44 self.assertEquals(make_schema((Variable('A'), Variable('B')), solution, |
44 self.assertEquals(make_schema((Variable('A'), Variable('B')), solution, |
45 'table0', TYPEMAP), |
45 'table0', TYPEMAP), |
46 ('C0 text,C1 integer', {'A': 'table0.C0', 'B': 'table0.C1'})) |
46 ('C0 text,C1 integer', {'A': 'table0.C0', 'B': 'table0.C1'})) |
47 |
47 |
48 |
48 |
49 repo, cnx = init_test_database('sqlite') |
49 repo, cnx = init_test_database() |
50 |
50 |
51 |
51 |
52 |
52 |
53 class UtilsTC(BaseQuerierTC): |
53 class UtilsTC(BaseQuerierTC): |
54 repo = repo |
54 repo = repo |
76 self.assertEquals(len(rqlst.solutions), 1) |
76 self.assertEquals(len(rqlst.solutions), 1) |
77 |
77 |
78 def test_preprocess_security(self): |
78 def test_preprocess_security(self): |
79 plan = self._prepare_plan('Any ETN,COUNT(X) GROUPBY ETN ' |
79 plan = self._prepare_plan('Any ETN,COUNT(X) GROUPBY ETN ' |
80 'WHERE X is ET, ET name ETN') |
80 'WHERE X is ET, ET name ETN') |
81 plan.session = self._user_session(('users',))[1] |
81 plan.session = self.user_groups_session('users') |
82 union = plan.rqlst |
82 union = plan.rqlst |
83 plan.preprocess(union) |
83 plan.preprocess(union) |
84 self.assertEquals(len(union.children), 1) |
84 self.assertEquals(len(union.children), 1) |
85 self.assertEquals(len(union.children[0].with_), 1) |
85 self.assertEquals(len(union.children[0].with_), 1) |
86 subq = union.children[0].with_[0].query |
86 subq = union.children[0].with_[0].query |
156 'ETN': 'String', |
156 'ETN': 'String', |
157 }]) |
157 }]) |
158 |
158 |
159 def test_preprocess_security_aggregat(self): |
159 def test_preprocess_security_aggregat(self): |
160 plan = self._prepare_plan('Any MAX(X)') |
160 plan = self._prepare_plan('Any MAX(X)') |
161 plan.session = self._user_session(('users',))[1] |
161 plan.session = self.user_groups_session('users') |
162 union = plan.rqlst |
162 union = plan.rqlst |
163 plan.preprocess(union) |
163 plan.preprocess(union) |
164 self.assertEquals(len(union.children), 1) |
164 self.assertEquals(len(union.children), 1) |
165 self.assertEquals(len(union.children[0].with_), 1) |
165 self.assertEquals(len(union.children[0].with_), 1) |
166 subq = union.children[0].with_[0].query |
166 subq = union.children[0].with_[0].query |
255 def test_select_is_aggr(self): |
255 def test_select_is_aggr(self): |
256 rset = self.execute('Any TN, COUNT(X) GROUPBY TN ORDERBY 2 DESC WHERE X is T, T name TN') |
256 rset = self.execute('Any TN, COUNT(X) GROUPBY TN ORDERBY 2 DESC WHERE X is T, T name TN') |
257 result, descr = rset.rows, rset.description |
257 result, descr = rset.rows, rset.description |
258 self.assertEquals(descr[0][0], 'String') |
258 self.assertEquals(descr[0][0], 'String') |
259 self.assertEquals(descr[0][1], 'Int') |
259 self.assertEquals(descr[0][1], 'Int') |
260 self.assertEquals(result[0][0], 'CWRelation') |
260 self.assertEquals(result[0][0], 'CWRelation') # XXX may change as schema evolve |
261 |
261 |
262 def test_select_groupby_orderby(self): |
262 def test_select_groupby_orderby(self): |
263 rset = self.execute('Any N GROUPBY N ORDERBY N WHERE X is CWGroup, X name N') |
263 rset = self.execute('Any N GROUPBY N ORDERBY N WHERE X is CWGroup, X name N') |
264 self.assertEquals(tuplify(rset.rows), [('guests',), ('managers',), ('owners',), ('users',)]) |
264 self.assertEquals(tuplify(rset.rows), [('guests',), ('managers',), ('owners',), ('users',)]) |
265 self.assertEquals(rset.description, [('String',), ('String',), ('String',), ('String',)]) |
265 self.assertEquals(rset.description, [('String',), ('String',), ('String',), ('String',)]) |
557 self.assertEquals(tuplify(rset.rows), [(1,), (2,)]) |
557 self.assertEquals(tuplify(rset.rows), [(1,), (2,)]) |
558 self.assertEquals(rset.description, [('CWGroup',), ('CWGroup',)]) |
558 self.assertEquals(rset.description, [('CWGroup',), ('CWGroup',)]) |
559 rset = self.execute('CWGroup X ORDERBY N LIMIT 2 OFFSET 2 WHERE X name N') |
559 rset = self.execute('CWGroup X ORDERBY N LIMIT 2 OFFSET 2 WHERE X name N') |
560 self.assertEquals(tuplify(rset.rows), [(3,), (4,)]) |
560 self.assertEquals(tuplify(rset.rows), [(3,), (4,)]) |
561 |
561 |
562 def test_select_symetric(self): |
562 def test_select_symmetric(self): |
563 self.execute("INSERT Personne X: X nom 'machin'") |
563 self.execute("INSERT Personne X: X nom 'machin'") |
564 self.execute("INSERT Personne X: X nom 'bidule'") |
564 self.execute("INSERT Personne X: X nom 'bidule'") |
565 self.execute("INSERT Personne X: X nom 'chouette'") |
565 self.execute("INSERT Personne X: X nom 'chouette'") |
566 self.execute("INSERT Personne X: X nom 'trucmuche'") |
566 self.execute("INSERT Personne X: X nom 'trucmuche'") |
567 self.execute("SET X connait Y WHERE X nom 'chouette', Y nom 'bidule'") |
567 self.execute("SET X connait Y WHERE X nom 'chouette', Y nom 'bidule'") |
860 self.execute("INSERT Societe Y: Y nom 'toto', X travaille Y WHERE X nom 'bidule'") |
860 self.execute("INSERT Societe Y: Y nom 'toto', X travaille Y WHERE X nom 'bidule'") |
861 rset = self.execute('Any X, Y WHERE X nom "bidule", Y nom "toto", X travaille Y') |
861 rset = self.execute('Any X, Y WHERE X nom "bidule", Y nom "toto", X travaille Y') |
862 self.assert_(rset.rows) |
862 self.assert_(rset.rows) |
863 self.assertEquals(rset.description, [('Personne', 'Societe',)]) |
863 self.assertEquals(rset.description, [('Personne', 'Societe',)]) |
864 |
864 |
|
865 def test_insert_5bis(self): |
|
866 peid = self.execute("INSERT Personne X: X nom 'bidule'")[0][0] |
|
867 self.execute("INSERT Societe Y: Y nom 'toto', X travaille Y WHERE X eid %(x)s", |
|
868 {'x': peid}, 'x') |
|
869 rset = self.execute('Any X, Y WHERE X nom "bidule", Y nom "toto", X travaille Y') |
|
870 self.assert_(rset.rows) |
|
871 self.assertEquals(rset.description, [('Personne', 'Societe',)]) |
|
872 |
865 def test_insert_6(self): |
873 def test_insert_6(self): |
866 self.execute("INSERT Personne X, Societe Y: X nom 'bidule', Y nom 'toto', X travaille Y") |
874 self.execute("INSERT Personne X, Societe Y: X nom 'bidule', Y nom 'toto', X travaille Y") |
867 rset = self.execute('Any X, Y WHERE X nom "bidule", Y nom "toto", X travaille Y') |
875 rset = self.execute('Any X, Y WHERE X nom "bidule", Y nom "toto", X travaille Y') |
868 self.assert_(rset.rows) |
876 self.assert_(rset.rows) |
869 self.assertEquals(rset.description, [('Personne', 'Societe',)]) |
877 self.assertEquals(rset.description, [('Personne', 'Societe',)]) |
905 |
913 |
906 def test_delete_1(self): |
914 def test_delete_1(self): |
907 self.execute("INSERT Personne Y: Y nom 'toto'") |
915 self.execute("INSERT Personne Y: Y nom 'toto'") |
908 rset = self.execute('Personne X WHERE X nom "toto"') |
916 rset = self.execute('Personne X WHERE X nom "toto"') |
909 self.assertEqual(len(rset.rows), 1) |
917 self.assertEqual(len(rset.rows), 1) |
910 self.execute("DELETE Personne Y WHERE Y nom 'toto'") |
918 drset = self.execute("DELETE Personne Y WHERE Y nom 'toto'") |
|
919 self.assertEqual(drset.rows, rset.rows) |
911 rset = self.execute('Personne X WHERE X nom "toto"') |
920 rset = self.execute('Personne X WHERE X nom "toto"') |
912 self.assertEqual(len(rset.rows), 0) |
921 self.assertEqual(len(rset.rows), 0) |
913 |
922 |
914 def test_delete_2(self): |
923 def test_delete_2(self): |
915 rset = self.execute("INSERT Personne X, Personne Y, Societe Z : X nom 'syt', Y nom 'adim', Z nom 'Logilab', X travaille Z, Y travaille Z") |
924 rset = self.execute("INSERT Personne X, Personne Y, Societe Z : X nom 'syt', Y nom 'adim', Z nom 'Logilab', X travaille Z, Y travaille Z") |
925 self.execute("DELETE X travaille Y WHERE X is Personne, Y nom 'Logilab'") |
934 self.execute("DELETE X travaille Y WHERE X is Personne, Y nom 'Logilab'") |
926 rset = self.execute('Personne X WHERE X travaille Y, Y nom "Logilab"') |
935 rset = self.execute('Personne X WHERE X travaille Y, Y nom "Logilab"') |
927 self.assertEqual(len(rset.rows), 0, rset.rows) |
936 self.assertEqual(len(rset.rows), 0, rset.rows) |
928 |
937 |
929 def test_delete_3(self): |
938 def test_delete_3(self): |
930 u, s = self._user_session(('users',)) |
939 s = self.user_groups_session('users') |
931 peid, = self.o.execute(s, "INSERT Personne P: P nom 'toto'")[0] |
940 peid, = self.o.execute(s, "INSERT Personne P: P nom 'toto'")[0] |
932 seid, = self.o.execute(s, "INSERT Societe S: S nom 'logilab'")[0] |
941 seid, = self.o.execute(s, "INSERT Societe S: S nom 'logilab'")[0] |
933 self.o.execute(s, "SET P travaille S") |
942 self.o.execute(s, "SET P travaille S") |
934 rset = self.execute('Personne P WHERE P travaille S') |
943 rset = self.execute('Personne P WHERE P travaille S') |
935 self.assertEqual(len(rset.rows), 1) |
944 self.assertEqual(len(rset.rows), 1) |
936 self.execute("DELETE X travaille Y WHERE X eid %s, Y eid %s" % (peid, seid)) |
945 self.execute("DELETE X travaille Y WHERE X eid %s, Y eid %s" % (peid, seid)) |
937 rset = self.execute('Personne P WHERE P travaille S') |
946 rset = self.execute('Personne P WHERE P travaille S') |
938 self.assertEqual(len(rset.rows), 0) |
947 self.assertEqual(len(rset.rows), 0) |
939 |
948 |
940 def test_delete_symetric(self): |
949 def test_delete_symmetric(self): |
941 teid1 = self.execute("INSERT Folder T: T name 'toto'")[0][0] |
950 teid1 = self.execute("INSERT Folder T: T name 'toto'")[0][0] |
942 teid2 = self.execute("INSERT Folder T: T name 'tutu'")[0][0] |
951 teid2 = self.execute("INSERT Folder T: T name 'tutu'")[0][0] |
943 self.execute('SET X see_also Y WHERE X eid %s, Y eid %s' % (teid1, teid2)) |
952 self.execute('SET X see_also Y WHERE X eid %s, Y eid %s' % (teid1, teid2)) |
944 rset = self.execute('Any X,Y WHERE X see_also Y') |
953 rset = self.execute('Any X,Y WHERE X see_also Y') |
945 self.assertEquals(len(rset) , 2, rset.rows) |
954 self.assertEquals(len(rset) , 2, rset.rows) |
956 def test_nonregr_delete_cache(self): |
965 def test_nonregr_delete_cache(self): |
957 """test that relations are properly cleaned when an entity is deleted |
966 """test that relations are properly cleaned when an entity is deleted |
958 (using cachekey on sql generation returned always the same query for an eid, |
967 (using cachekey on sql generation returned always the same query for an eid, |
959 whatever the relation) |
968 whatever the relation) |
960 """ |
969 """ |
961 u, s = self._user_session(('users',)) |
970 s = self.user_groups_session('users') |
962 aeid, = self.o.execute(s, 'INSERT EmailAddress X: X address "toto@logilab.fr", X alias "hop"')[0] |
971 aeid, = self.o.execute(s, 'INSERT EmailAddress X: X address "toto@logilab.fr", X alias "hop"')[0] |
963 # XXX would be nice if the rql below was enough... |
972 # XXX would be nice if the rql below was enough... |
964 #'INSERT Email X: X messageid "<1234>", X subject "test", X sender Y, X recipients Y' |
973 #'INSERT Email X: X messageid "<1234>", X subject "test", X sender Y, X recipients Y' |
965 eeid, = self.o.execute(s, 'INSERT Email X: X messageid "<1234>", X subject "test", X sender Y, X recipients Y WHERE Y is EmailAddress')[0] |
974 eeid, = self.o.execute(s, 'INSERT Email X: X messageid "<1234>", X subject "test", X sender Y, X recipients Y WHERE Y is EmailAddress')[0] |
966 self.o.execute(s, "DELETE Email X") |
975 self.o.execute(s, "DELETE Email X") |