108 def cleanup(self): |
108 def cleanup(self): |
109 # no need for cleanup here |
109 # no need for cleanup here |
110 pass |
110 pass |
111 |
111 |
112 def test_preprocess_1(self): |
112 def test_preprocess_1(self): |
113 reid = self.execute('Any X WHERE X is CWRType, X name "owned_by"')[0][0] |
113 with self.session.new_cnx() as cnx: |
114 rqlst = self._prepare('Any COUNT(RDEF) WHERE RDEF relation_type X, X eid %(x)s', {'x': reid}) |
114 reid = cnx.execute('Any X WHERE X is CWRType, X name "owned_by"')[0][0] |
115 self.assertEqual(rqlst.solutions, [{'RDEF': 'CWAttribute'}, {'RDEF': 'CWRelation'}]) |
115 rqlst = self._prepare('Any COUNT(RDEF) WHERE RDEF relation_type X, X eid %(x)s', |
|
116 {'x': reid}) |
|
117 self.assertEqual([{'RDEF': 'CWAttribute'}, {'RDEF': 'CWRelation'}], |
|
118 rqlst.solutions) |
116 |
119 |
117 def test_preprocess_2(self): |
120 def test_preprocess_2(self): |
118 teid = self.qexecute("INSERT Tag X: X name 'tag'")[0][0] |
121 teid = self.qexecute("INSERT Tag X: X name 'tag'")[0][0] |
119 #geid = self.execute("CWGroup G WHERE G name 'users'")[0][0] |
122 #geid = self.execute("CWGroup G WHERE G name 'users'")[0][0] |
120 #self.execute("SET X tags Y WHERE X eid %(t)s, Y eid %(g)s", |
123 #self.execute("SET X tags Y WHERE X eid %(t)s, Y eid %(g)s", |
121 # {'g': geid, 't': teid}, 'g') |
124 # {'g': geid, 't': teid}, 'g') |
122 rqlst = self._prepare('Any X WHERE E eid %(x)s, E tags X', {'x': teid}) |
125 rqlst = self._prepare('Any X WHERE E eid %(x)s, E tags X', {'x': teid}) |
123 # the query may be optimized, should keep only one solution |
126 # the query may be optimized, should keep only one solution |
124 # (any one, etype will be discarded) |
127 # (any one, etype will be discarded) |
125 self.assertEqual(len(rqlst.solutions), 1) |
128 self.assertEqual(1, len(rqlst.solutions)) |
|
129 |
|
130 def assertRQLEqual(self, expected, got): |
|
131 from rql import parse |
|
132 self.assertMultiLineEqual(unicode(parse(expected)), |
|
133 unicode(parse(got))) |
126 |
134 |
127 def test_preprocess_security(self): |
135 def test_preprocess_security(self): |
128 plan = self._prepare_plan('Any ETN,COUNT(X) GROUPBY ETN ' |
136 plan = self._prepare_plan('Any ETN,COUNT(X) GROUPBY ETN ' |
129 'WHERE X is ET, ET name ETN') |
137 'WHERE X is ET, ET name ETN') |
130 plan.cnx = self.user_groups_session('users') |
138 plan.cnx = self.user_groups_session('users') |
138 ['ETN','COUNT(X)']) |
146 ['ETN','COUNT(X)']) |
139 self.assertEqual([t.as_string() for t in union.children[0].groupby], |
147 self.assertEqual([t.as_string() for t in union.children[0].groupby], |
140 ['ETN']) |
148 ['ETN']) |
141 partrqls = sorted(((rqlst.as_string(), rqlst.solutions) for rqlst in subq.children)) |
149 partrqls = sorted(((rqlst.as_string(), rqlst.solutions) for rqlst in subq.children)) |
142 rql, solutions = partrqls[0] |
150 rql, solutions = partrqls[0] |
143 self.assertEqual(rql, |
151 self.assertRQLEqual(rql, |
144 'Any ETN,X WHERE X is ET, ET name ETN, (EXISTS(X owned_by %(B)s))' |
152 'Any ETN,X WHERE X is ET, ET name ETN, (EXISTS(X owned_by %(B)s))' |
145 ' OR ((((EXISTS(D concerne C?, C owned_by %(B)s, X identity D, C is Division, D is Affaire))' |
153 ' OR ((((EXISTS(D concerne C?, C owned_by %(B)s, ' |
146 ' OR (EXISTS(H concerne G?, G owned_by %(B)s, G is SubDivision, X identity H, H is Affaire)))' |
154 ' X identity D, C is Division, D is Affaire))' |
147 ' OR (EXISTS(I concerne F?, F owned_by %(B)s, F is Societe, X identity I, I is Affaire)))' |
155 ' OR (EXISTS(H concerne G?, G owned_by %(B)s, G is SubDivision, ' |
148 ' OR (EXISTS(J concerne E?, E owned_by %(B)s, E is Note, X identity J, J is Affaire)))' |
156 ' X identity H, H is Affaire)))' |
149 ', ET is CWEType, X is Affaire') |
157 ' OR (EXISTS(I concerne F?, F owned_by %(B)s, F is Societe, ' |
|
158 ' X identity I, I is Affaire)))' |
|
159 ' OR (EXISTS(J concerne E?, E owned_by %(B)s, E is Note, ' |
|
160 ' X identity J, J is Affaire)))' |
|
161 ', ET is CWEType, X is Affaire') |
150 self.assertEqual(solutions, [{'C': 'Division', |
162 self.assertEqual(solutions, [{'C': 'Division', |
151 'D': 'Affaire', |
163 'D': 'Affaire', |
152 'E': 'Note', |
164 'E': 'Note', |
153 'F': 'Societe', |
165 'F': 'Societe', |
154 'G': 'SubDivision', |
166 'G': 'SubDivision', |
156 'I': 'Affaire', |
168 'I': 'Affaire', |
157 'J': 'Affaire', |
169 'J': 'Affaire', |
158 'X': 'Affaire', |
170 'X': 'Affaire', |
159 'ET': 'CWEType', 'ETN': 'String'}]) |
171 'ET': 'CWEType', 'ETN': 'String'}]) |
160 rql, solutions = partrqls[1] |
172 rql, solutions = partrqls[1] |
161 self.assertEqual(rql, 'Any ETN,X WHERE X is ET, ET name ETN, ET is CWEType, X is IN(BaseTransition, Bookmark, CWAttribute, CWCache, CWConstraint, CWConstraintType, CWEType, CWGroup, CWPermission, CWProperty, CWRType, CWRelation, CWSource, CWUniqueTogetherConstraint, CWUser, Card, Comment, Division, Email, EmailPart, EmailThread, ExternalUri, File, Folder, Note, Old, Personne, RQLExpression, Societe, State, SubDivision, SubWorkflowExitPoint, Tag, TrInfo, Transition, Workflow, WorkflowTransition)') |
173 self.assertRQLEqual(rql, 'Any ETN,X WHERE X is ET, ET name ETN, ET is CWEType, ' |
|
174 'X is IN(BaseTransition, Bookmark, CWAttribute, CWCache, CWConstraint, ' |
|
175 ' CWConstraintType, CWEType, CWGroup, CWPermission, CWProperty, CWRType, ' |
|
176 ' CWRelation, CWSource, CWUniqueTogetherConstraint, CWUser, Card, Comment, ' |
|
177 ' Division, Email, EmailPart, EmailThread, ExternalUri, File, Folder, Note, ' |
|
178 ' Old, Personne, RQLExpression, Societe, State, SubDivision, ' |
|
179 ' SubWorkflowExitPoint, Tag, TrInfo, Transition, Workflow, WorkflowTransition)') |
162 self.assertListEqual(sorted(solutions), |
180 self.assertListEqual(sorted(solutions), |
163 sorted([{'X': 'BaseTransition', 'ETN': 'String', 'ET': 'CWEType'}, |
181 sorted([{'X': 'BaseTransition', 'ETN': 'String', 'ET': 'CWEType'}, |
164 {'X': 'Bookmark', 'ETN': 'String', 'ET': 'CWEType'}, |
182 {'X': 'Bookmark', 'ETN': 'String', 'ET': 'CWEType'}, |
165 {'X': 'Card', 'ETN': 'String', 'ET': 'CWEType'}, |
183 {'X': 'Card', 'ETN': 'String', 'ET': 'CWEType'}, |
166 {'X': 'Comment', 'ETN': 'String', 'ET': 'CWEType'}, |
184 {'X': 'Comment', 'ETN': 'String', 'ET': 'CWEType'}, |
252 self.assertEqual(rset.description[0][0], 'String') |
270 self.assertEqual(rset.description[0][0], 'String') |
253 rset = self.qexecute('Any %(x)s', {'x': u'str'}) |
271 rset = self.qexecute('Any %(x)s', {'x': u'str'}) |
254 self.assertEqual(rset.description[0][0], 'String') |
272 self.assertEqual(rset.description[0][0], 'String') |
255 |
273 |
256 def test_build_descr1(self): |
274 def test_build_descr1(self): |
257 rset = self.execute('(Any U,L WHERE U login L) UNION (Any G,N WHERE G name N, G is CWGroup)') |
275 with self.session.new_cnx() as cnx: |
258 rset.req = self.session |
276 rset = cnx.execute('(Any U,L WHERE U login L) UNION ' |
259 orig_length = len(rset) |
277 '(Any G,N WHERE G name N, G is CWGroup)') |
260 rset.rows[0][0] = 9999999 |
278 # rset.req = self.session |
261 description = manual_build_descr(rset.req, rset.syntax_tree(), None, rset.rows) |
279 orig_length = len(rset) |
262 self.assertEqual(len(description), orig_length - 1) |
280 rset.rows[0][0] = 9999999 |
263 self.assertEqual(len(rset.rows), orig_length - 1) |
281 description = manual_build_descr(cnx, rset.syntax_tree(), None, rset.rows) |
264 self.assertNotEqual(rset.rows[0][0], 9999999) |
282 self.assertEqual(len(description), orig_length - 1) |
|
283 self.assertEqual(len(rset.rows), orig_length - 1) |
|
284 self.assertNotEqual(rset.rows[0][0], 9999999) |
265 |
285 |
266 def test_build_descr2(self): |
286 def test_build_descr2(self): |
267 rset = self.qexecute('Any X,Y WITH X,Y BEING ((Any G,NULL WHERE G is CWGroup) UNION ' |
287 rset = self.qexecute('Any X,Y WITH X,Y BEING ((Any G,NULL WHERE G is CWGroup) UNION ' |
268 '(Any U,G WHERE U in_group G))') |
288 '(Any U,G WHERE U in_group G))') |
269 for x, y in rset.description: |
289 for x, y in rset.description: |
653 rset = self.qexecute('Any ABS(D) WHERE X eid %(x)s, X duration D', {'x': eid}) |
673 rset = self.qexecute('Any ABS(D) WHERE X eid %(x)s, X duration D', {'x': eid}) |
654 self.assertEqual(rset.rows[0][0], 12) |
674 self.assertEqual(rset.rows[0][0], 12) |
655 |
675 |
656 ## def test_select_simplified(self): |
676 ## def test_select_simplified(self): |
657 ## ueid = self.session.user.eid |
677 ## ueid = self.session.user.eid |
658 ## rset = self.execute('Any L WHERE %s login L'%ueid) |
678 ## rset = self.qexecute('Any L WHERE %s login L'%ueid) |
659 ## self.assertEqual(rset.rows[0][0], 'admin') |
679 ## self.assertEqual(rset.rows[0][0], 'admin') |
660 ## rset = self.execute('Any L WHERE %(x)s login L', {'x':ueid}) |
680 ## rset = self.qexecute('Any L WHERE %(x)s login L', {'x':ueid}) |
661 ## self.assertEqual(rset.rows[0][0], 'admin') |
681 ## self.assertEqual(rset.rows[0][0], 'admin') |
662 |
682 |
663 def test_select_searchable_text_1(self): |
683 def test_select_searchable_text_1(self): |
664 rset = self.qexecute(u"INSERT Personne X: X nom 'bidüle'") |
684 rset = self.qexecute(u"INSERT Personne X: X nom 'bidüle'") |
665 rset = self.qexecute(u"INSERT Societe X: X nom 'bidüle'") |
685 rset = self.qexecute(u"INSERT Societe X: X nom 'bidüle'") |
666 rset = self.qexecute("INSERT Societe X: X nom 'chouette'") |
686 rset = self.qexecute("INSERT Societe X: X nom 'chouette'") |
667 self.commit() |
|
668 rset = self.qexecute('Any X where X has_text %(text)s', {'text': u'bidüle'}) |
687 rset = self.qexecute('Any X where X has_text %(text)s', {'text': u'bidüle'}) |
669 self.assertEqual(len(rset.rows), 2, rset.rows) |
688 self.assertEqual(len(rset.rows), 2, rset.rows) |
670 rset = self.qexecute(u'Any N where N has_text "bidüle"') |
689 rset = self.qexecute(u'Any N where N has_text "bidüle"') |
671 self.assertEqual(len(rset.rows), 2, rset.rows) |
690 self.assertEqual(len(rset.rows), 2, rset.rows) |
672 biduleeids = [r[0] for r in rset.rows] |
691 biduleeids = [r[0] for r in rset.rows] |
677 |
696 |
678 def test_select_searchable_text_2(self): |
697 def test_select_searchable_text_2(self): |
679 rset = self.qexecute("INSERT Personne X: X nom 'bidule'") |
698 rset = self.qexecute("INSERT Personne X: X nom 'bidule'") |
680 rset = self.qexecute("INSERT Personne X: X nom 'chouette'") |
699 rset = self.qexecute("INSERT Personne X: X nom 'chouette'") |
681 rset = self.qexecute("INSERT Societe X: X nom 'bidule'") |
700 rset = self.qexecute("INSERT Societe X: X nom 'bidule'") |
682 self.commit() |
|
683 rset = self.qexecute('Personne N where N has_text "bidule"') |
701 rset = self.qexecute('Personne N where N has_text "bidule"') |
684 self.assertEqual(len(rset.rows), 1, rset.rows) |
702 self.assertEqual(len(rset.rows), 1, rset.rows) |
685 |
703 |
686 def test_select_searchable_text_3(self): |
704 def test_select_searchable_text_3(self): |
687 rset = self.qexecute("INSERT Personne X: X nom 'bidule', X sexe 'M'") |
705 rset = self.qexecute("INSERT Personne X: X nom 'bidule', X sexe 'M'") |
688 rset = self.qexecute("INSERT Personne X: X nom 'bidule', X sexe 'F'") |
706 rset = self.qexecute("INSERT Personne X: X nom 'bidule', X sexe 'F'") |
689 rset = self.qexecute("INSERT Societe X: X nom 'bidule'") |
707 rset = self.qexecute("INSERT Societe X: X nom 'bidule'") |
690 self.commit() |
|
691 rset = self.qexecute('Any X where X has_text "bidule" and X sexe "M"') |
708 rset = self.qexecute('Any X where X has_text "bidule" and X sexe "M"') |
692 self.assertEqual(len(rset.rows), 1, rset.rows) |
709 self.assertEqual(len(rset.rows), 1, rset.rows) |
693 |
710 |
694 def test_select_multiple_searchable_text(self): |
711 def test_select_multiple_searchable_text(self): |
695 self.qexecute(u"INSERT Personne X: X nom 'bidüle'") |
712 self.qexecute(u"INSERT Personne X: X nom 'bidüle'") |
696 self.qexecute("INSERT Societe X: X nom 'chouette', S travaille X") |
713 self.qexecute("INSERT Societe X: X nom 'chouette', S travaille X") |
697 self.qexecute(u"INSERT Personne X: X nom 'bidüle'") |
714 self.qexecute(u"INSERT Personne X: X nom 'bidüle'") |
698 self.commit() |
|
699 rset = self.qexecute('Personne X WHERE X has_text %(text)s, X travaille S, S has_text %(text2)s', |
715 rset = self.qexecute('Personne X WHERE X has_text %(text)s, X travaille S, S has_text %(text2)s', |
700 {'text': u'bidüle', |
716 {'text': u'bidüle', |
701 'text2': u'chouette',} |
717 'text2': u'chouette',} |
702 ) |
718 ) |
703 self.assertEqual(len(rset.rows), 1, rset.rows) |
719 self.assertEqual(len(rset.rows), 1, rset.rows) |
734 self.assertEqual(len(rset.rows), 2, rset.rows) |
750 self.assertEqual(len(rset.rows), 2, rset.rows) |
735 rset = self.qexecute('Any P WHERE P2 connait P, P2 nom "chouette"') |
751 rset = self.qexecute('Any P WHERE P2 connait P, P2 nom "chouette"') |
736 self.assertEqual(len(rset.rows), 2, rset.rows) |
752 self.assertEqual(len(rset.rows), 2, rset.rows) |
737 |
753 |
738 def test_select_inline(self): |
754 def test_select_inline(self): |
739 self.execute("INSERT Personne X: X nom 'bidule'") |
755 self.qexecute("INSERT Personne X: X nom 'bidule'") |
740 self.execute("INSERT Note X: X type 'a'") |
756 self.qexecute("INSERT Note X: X type 'a'") |
741 self.execute("SET X ecrit_par Y WHERE X type 'a', Y nom 'bidule'") |
757 self.qexecute("SET X ecrit_par Y WHERE X type 'a', Y nom 'bidule'") |
742 rset = self.execute('Any N where N ecrit_par X, X nom "bidule"') |
758 rset = self.qexecute('Any N where N ecrit_par X, X nom "bidule"') |
743 self.assertEqual(len(rset.rows), 1, rset.rows) |
759 self.assertEqual(len(rset.rows), 1, rset.rows) |
744 |
760 |
745 def test_select_creation_date(self): |
761 def test_select_creation_date(self): |
746 self.qexecute("INSERT Personne X: X nom 'bidule'") |
762 self.qexecute("INSERT Personne X: X nom 'bidule'") |
747 rset = self.qexecute('Any D WHERE X nom "bidule", X creation_date D') |
763 rset = self.qexecute('Any D WHERE X nom "bidule", X creation_date D') |
751 self.qexecute("INSERT Personne X: X nom 'bidule'") |
767 self.qexecute("INSERT Personne X: X nom 'bidule'") |
752 self.qexecute("INSERT Personne X: X nom 'chouette'") |
768 self.qexecute("INSERT Personne X: X nom 'chouette'") |
753 self.qexecute("INSERT Societe X: X nom 'logilab'") |
769 self.qexecute("INSERT Societe X: X nom 'logilab'") |
754 self.qexecute("INSERT Societe X: X nom 'caesium'") |
770 self.qexecute("INSERT Societe X: X nom 'caesium'") |
755 self.qexecute("SET P travaille S WHERE P nom 'bidule', S nom 'logilab'") |
771 self.qexecute("SET P travaille S WHERE P nom 'bidule', S nom 'logilab'") |
756 rset = self.qexecute('DISTINCT Any P WHERE P travaille S1 OR P travaille S2, S1 nom "logilab", S2 nom "caesium"') |
772 rset = self.qexecute('DISTINCT Any P WHERE P travaille S1 OR P travaille S2, ' |
|
773 'S1 nom "logilab", S2 nom "caesium"') |
757 self.assertEqual(len(rset.rows), 1) |
774 self.assertEqual(len(rset.rows), 1) |
758 self.qexecute("SET P travaille S WHERE P nom 'chouette', S nom 'caesium'") |
775 self.qexecute("SET P travaille S WHERE P nom 'chouette', S nom 'caesium'") |
759 rset = self.qexecute('DISTINCT Any P WHERE P travaille S1 OR P travaille S2, S1 nom "logilab", S2 nom "caesium"') |
776 rset = self.qexecute('DISTINCT Any P WHERE P travaille S1 OR P travaille S2, ' |
|
777 'S1 nom "logilab", S2 nom "caesium"') |
760 self.assertEqual(len(rset.rows), 2) |
778 self.assertEqual(len(rset.rows), 2) |
761 |
779 |
762 def test_select_or_sym_relation(self): |
780 def test_select_or_sym_relation(self): |
763 self.qexecute("INSERT Personne X: X nom 'bidule'") |
781 self.qexecute("INSERT Personne X: X nom 'bidule'") |
764 self.qexecute("INSERT Personne X: X nom 'chouette'") |
782 self.qexecute("INSERT Personne X: X nom 'chouette'") |
806 self.assertEqual(len(rset.rows), 0) |
824 self.assertEqual(len(rset.rows), 0) |
807 rset = self.qexecute('Any U WHERE U in_group G, G name "guests" OR G name "managers"') |
825 rset = self.qexecute('Any U WHERE U in_group G, G name "guests" OR G name "managers"') |
808 self.assertEqual(len(rset.rows), 2) |
826 self.assertEqual(len(rset.rows), 2) |
809 |
827 |
810 def test_select_explicit_eid(self): |
828 def test_select_explicit_eid(self): |
811 rset = self.qexecute('Any X,E WHERE X owned_by U, X eid E, U eid %(u)s', {'u': self.session.user.eid}) |
829 rset = self.qexecute('Any X,E WHERE X owned_by U, X eid E, U eid %(u)s', |
|
830 {'u': self.session.user.eid}) |
812 self.assertTrue(rset) |
831 self.assertTrue(rset) |
813 self.assertEqual(rset.description[0][1], 'Int') |
832 self.assertEqual(rset.description[0][1], 'Int') |
814 |
833 |
815 # def test_select_rewritten_optional(self): |
834 # def test_select_rewritten_optional(self): |
816 # eid = self.qexecute("INSERT Affaire X: X sujet 'cool'")[0][0] |
835 # eid = self.qexecute("INSERT Affaire X: X sujet 'cool'")[0][0] |
850 'Decimal', 'Float', |
869 'Decimal', 'Float', |
851 'Int', 'Interval', |
870 'Int', 'Interval', |
852 'Password', 'String', |
871 'Password', 'String', |
853 'TZDatetime', 'TZTime', |
872 'TZDatetime', 'TZTime', |
854 'Time']) |
873 'Time']) |
855 req = self.session |
874 with self.session.new_cnx() as cnx: |
856 req.create_entity('Personne', nom=u'louis', test=True) |
875 cnx.create_entity('Personne', nom=u'louis', test=True) |
857 self.assertEqual(len(req.execute('Any X WHERE X test %(val)s', {'val': True})), 1) |
876 self.assertEqual(len(cnx.execute('Any X WHERE X test %(val)s', {'val': True})), 1) |
858 self.assertEqual(len(req.execute('Any X WHERE X test TRUE')), 1) |
877 self.assertEqual(len(cnx.execute('Any X WHERE X test TRUE')), 1) |
859 self.assertEqual(len(req.execute('Any X WHERE X test %(val)s', {'val': False})), 0) |
878 self.assertEqual(len(cnx.execute('Any X WHERE X test %(val)s', {'val': False})), 0) |
860 self.assertEqual(len(req.execute('Any X WHERE X test FALSE')), 0) |
879 self.assertEqual(len(cnx.execute('Any X WHERE X test FALSE')), 0) |
861 |
880 |
862 def test_select_constant(self): |
881 def test_select_constant(self): |
863 rset = self.qexecute('Any X, "toto" ORDERBY X WHERE X is CWGroup') |
882 rset = self.qexecute('Any X, "toto" ORDERBY X WHERE X is CWGroup') |
864 self.assertEqual(rset.rows, |
883 self.assertEqual(rset.rows, |
865 map(list, zip((2,3,4,5), ('toto','toto','toto','toto',)))) |
884 map(list, zip((2,3,4,5), ('toto','toto','toto','toto',)))) |
895 self.qexecute('(Any N,COUNT(X) GROUPBY N WHERE X name N, X is State)' |
914 self.qexecute('(Any N,COUNT(X) GROUPBY N WHERE X name N, X is State)' |
896 ' UNION ' |
915 ' UNION ' |
897 '(Any N,COUNT(X) GROUPBY N ORDERBY 2 WHERE X login N)') |
916 '(Any N,COUNT(X) GROUPBY N ORDERBY 2 WHERE X login N)') |
898 |
917 |
899 def test_select_union_aggregat_independant_group(self): |
918 def test_select_union_aggregat_independant_group(self): |
900 self.execute('INSERT State X: X name "hop"') |
919 with self.session.new_cnx() as cnx: |
901 self.execute('INSERT State X: X name "hop"') |
920 cnx.execute('INSERT State X: X name "hop"') |
902 self.execute('INSERT Transition X: X name "hop"') |
921 cnx.execute('INSERT State X: X name "hop"') |
903 self.execute('INSERT Transition X: X name "hop"') |
922 cnx.execute('INSERT Transition X: X name "hop"') |
904 rset = self.execute('Any N,NX ORDERBY 2 WITH N,NX BEING ' |
923 cnx.execute('INSERT Transition X: X name "hop"') |
905 '((Any N,COUNT(X) GROUPBY N WHERE X name N, X is State HAVING COUNT(X)>1)' |
924 rset = cnx.execute('Any N,NX ORDERBY 2 WITH N,NX BEING ' |
906 ' UNION ' |
925 '((Any N,COUNT(X) GROUPBY N WHERE X name N, ' |
907 '(Any N,COUNT(X) GROUPBY N WHERE X name N, X is Transition HAVING COUNT(X)>1))') |
926 ' X is State HAVING COUNT(X)>1)' |
908 self.assertEqual(rset.rows, [[u'hop', 2], [u'hop', 2]]) |
927 ' UNION ' |
|
928 '(Any N,COUNT(X) GROUPBY N WHERE X name N, ' |
|
929 ' X is Transition HAVING COUNT(X)>1))') |
|
930 self.assertEqual(rset.rows, [[u'hop', 2], [u'hop', 2]]) |
909 |
931 |
910 def test_select_union_selection_with_diff_variables(self): |
932 def test_select_union_selection_with_diff_variables(self): |
911 rset = self.qexecute('(Any N WHERE X name N, X is State)' |
933 rset = self.qexecute('(Any N WHERE X name N, X is State)' |
912 ' UNION ' |
934 ' UNION ' |
913 '(Any NN WHERE XX name NN, XX is Transition)') |
935 '(Any NN WHERE XX name NN, XX is Transition)') |
1142 rset = self.qexecute('Personne X WHERE X travaille Y, Y nom "Logilab"') |
1164 rset = self.qexecute('Personne X WHERE X travaille Y, Y nom "Logilab"') |
1143 self.assertEqual(len(rset.rows), 0, rset.rows) |
1165 self.assertEqual(len(rset.rows), 0, rset.rows) |
1144 |
1166 |
1145 def test_delete_3(self): |
1167 def test_delete_3(self): |
1146 s = self.user_groups_session('users') |
1168 s = self.user_groups_session('users') |
1147 peid, = self.o.execute(s, "INSERT Personne P: P nom 'toto'")[0] |
1169 with s.new_cnx() as cnx: |
1148 seid, = self.o.execute(s, "INSERT Societe S: S nom 'logilab'")[0] |
1170 with cnx.ensure_cnx_set: |
1149 self.o.execute(s, "SET P travaille S") |
1171 peid, = self.o.execute(cnx, "INSERT Personne P: P nom 'toto'")[0] |
1150 rset = self.execute('Personne P WHERE P travaille S') |
1172 seid, = self.o.execute(cnx, "INSERT Societe S: S nom 'logilab'")[0] |
|
1173 self.o.execute(cnx, "SET P travaille S") |
|
1174 cnx.commit() |
|
1175 rset = self.qexecute('Personne P WHERE P travaille S') |
1151 self.assertEqual(len(rset.rows), 1) |
1176 self.assertEqual(len(rset.rows), 1) |
1152 self.qexecute("DELETE X travaille Y WHERE X eid %s, Y eid %s" % (peid, seid)) |
1177 self.qexecute("DELETE X travaille Y WHERE X eid %s, Y eid %s" % (peid, seid)) |
1153 rset = self.qexecute('Personne P WHERE P travaille S') |
1178 rset = self.qexecute('Personne P WHERE P travaille S') |
1154 self.assertEqual(len(rset.rows), 0) |
1179 self.assertEqual(len(rset.rows), 0) |
1155 |
1180 |
1172 def test_nonregr_delete_cache(self): |
1197 def test_nonregr_delete_cache(self): |
1173 """test that relations are properly cleaned when an entity is deleted |
1198 """test that relations are properly cleaned when an entity is deleted |
1174 (using cachekey on sql generation returned always the same query for an eid, |
1199 (using cachekey on sql generation returned always the same query for an eid, |
1175 whatever the relation) |
1200 whatever the relation) |
1176 """ |
1201 """ |
1177 aeid, = self.execute('INSERT EmailAddress X: X address "toto@logilab.fr", X alias "hop"')[0] |
1202 aeid, = self.qexecute('INSERT EmailAddress X: X address "toto@logilab.fr", X alias "hop"')[0] |
1178 # XXX would be nice if the rql below was enough... |
1203 # XXX would be nice if the rql below was enough... |
1179 #'INSERT Email X: X messageid "<1234>", X subject "test", X sender Y, X recipients Y' |
1204 #'INSERT Email X: X messageid "<1234>", X subject "test", X sender Y, X recipients Y' |
1180 eeid, = self.execute('INSERT Email X: X messageid "<1234>", X subject "test", X sender Y, X recipients Y WHERE Y is EmailAddress')[0] |
1205 eeid, = self.qexecute('INSERT Email X: X messageid "<1234>", X subject "test", ' |
1181 self.execute("DELETE Email X") |
1206 'X sender Y, X recipients Y WHERE Y is EmailAddress')[0] |
1182 sqlc = self.session.cnxset.cu |
1207 self.qexecute("DELETE Email X") |
1183 sqlc.execute('SELECT * FROM recipients_relation') |
1208 with self.session.new_cnx() as cnx: |
1184 self.assertEqual(len(sqlc.fetchall()), 0) |
1209 with cnx.ensure_cnx_set: |
1185 sqlc.execute('SELECT * FROM owned_by_relation WHERE eid_from=%s'%eeid) |
1210 sqlc = cnx.cnxset.cu |
1186 self.assertEqual(len(sqlc.fetchall()), 0) |
1211 sqlc.execute('SELECT * FROM recipients_relation') |
|
1212 self.assertEqual(len(sqlc.fetchall()), 0) |
|
1213 sqlc.execute('SELECT * FROM owned_by_relation WHERE eid_from=%s'%eeid) |
|
1214 self.assertEqual(len(sqlc.fetchall()), 0) |
1187 |
1215 |
1188 def test_nonregr_delete_cache2(self): |
1216 def test_nonregr_delete_cache2(self): |
1189 eid = self.qexecute("INSERT Folder T: T name 'toto'")[0][0] |
1217 eid = self.qexecute("INSERT Folder T: T name 'toto'")[0][0] |
1190 self.commit() |
|
1191 # fill the cache |
1218 # fill the cache |
1192 self.qexecute("Any X WHERE X eid %(x)s", {'x': eid}) |
1219 self.qexecute("Any X WHERE X eid %(x)s", {'x': eid}) |
1193 self.qexecute("Any X WHERE X eid %s" % eid) |
1220 self.qexecute("Any X WHERE X eid %s" % eid) |
1194 self.qexecute("Folder X WHERE X eid %(x)s", {'x': eid}) |
1221 self.qexecute("Folder X WHERE X eid %(x)s", {'x': eid}) |
1195 self.qexecute("Folder X WHERE X eid %s" % eid) |
1222 self.qexecute("Folder X WHERE X eid %s" % eid) |
1196 self.qexecute("DELETE Folder T WHERE T eid %s" % eid) |
1223 self.qexecute("DELETE Folder T WHERE T eid %s" % eid) |
1197 self.commit() |
|
1198 rset = self.qexecute("Any X WHERE X eid %(x)s", {'x': eid}) |
1224 rset = self.qexecute("Any X WHERE X eid %(x)s", {'x': eid}) |
1199 self.assertEqual(rset.rows, []) |
1225 self.assertEqual(rset.rows, []) |
1200 rset = self.qexecute("Any X WHERE X eid %s" % eid) |
1226 rset = self.qexecute("Any X WHERE X eid %s" % eid) |
1201 self.assertEqual(rset.rows, []) |
1227 self.assertEqual(rset.rows, []) |
1202 rset = self.qexecute("Folder X WHERE X eid %(x)s", {'x': eid}) |
1228 rset = self.qexecute("Folder X WHERE X eid %(x)s", {'x': eid}) |
1248 self.qexecute("SET X nom 'tutu', Y nom 'toto' WHERE X nom 'toto', Y nom 'tutu'") |
1274 self.qexecute("SET X nom 'tutu', Y nom 'toto' WHERE X nom 'toto', Y nom 'tutu'") |
1249 self.assertEqual(self.qexecute('Any X WHERE X nom "toto"').rows, [[peid1]]) |
1275 self.assertEqual(self.qexecute('Any X WHERE X nom "toto"').rows, [[peid1]]) |
1250 self.assertEqual(self.qexecute('Any X WHERE X nom "tutu"').rows, [[peid2]]) |
1276 self.assertEqual(self.qexecute('Any X WHERE X nom "tutu"').rows, [[peid2]]) |
1251 |
1277 |
1252 def test_update_multiple2(self): |
1278 def test_update_multiple2(self): |
1253 ueid = self.execute("INSERT CWUser X: X login 'bob', X upassword 'toto'")[0][0] |
1279 with self.session.new_cnx() as cnx: |
1254 peid1 = self.execute("INSERT Personne Y: Y nom 'turlu'")[0][0] |
1280 ueid = cnx.execute("INSERT CWUser X: X login 'bob', X upassword 'toto'")[0][0] |
1255 peid2 = self.execute("INSERT Personne Y: Y nom 'tutu'")[0][0] |
1281 peid1 = cnx.execute("INSERT Personne Y: Y nom 'turlu'")[0][0] |
1256 self.execute('SET P1 owned_by U, P2 owned_by U ' |
1282 peid2 = cnx.execute("INSERT Personne Y: Y nom 'tutu'")[0][0] |
1257 'WHERE P1 eid %s, P2 eid %s, U eid %s' % (peid1, peid2, ueid)) |
1283 cnx.execute('SET P1 owned_by U, P2 owned_by U ' |
1258 self.assertTrue(self.execute('Any X WHERE X eid %s, X owned_by U, U eid %s' |
1284 'WHERE P1 eid %s, P2 eid %s, U eid %s' % (peid1, peid2, ueid)) |
1259 % (peid1, ueid))) |
1285 self.assertTrue(cnx.execute('Any X WHERE X eid %s, X owned_by U, U eid %s' |
1260 self.assertTrue(self.execute('Any X WHERE X eid %s, X owned_by U, U eid %s' |
1286 % (peid1, ueid))) |
1261 % (peid2, ueid))) |
1287 self.assertTrue(cnx.execute('Any X WHERE X eid %s, X owned_by U, U eid %s' |
|
1288 % (peid2, ueid))) |
1262 |
1289 |
1263 def test_update_math_expr(self): |
1290 def test_update_math_expr(self): |
1264 orders = [r[0] for r in self.qexecute('Any O ORDERBY O WHERE ST name "Personne", ' |
1291 orders = [r[0] for r in self.qexecute('Any O ORDERBY O WHERE ST name "Personne", ' |
1265 'X from_entity ST, X ordernum O')] |
1292 'X from_entity ST, X ordernum O')] |
1266 for i,v in enumerate(orders): |
1293 for i,v in enumerate(orders): |
1322 self.assertTrue(self.qexecute("DELETE Personne Y WHERE X tel XT HAVING XT&1=1")) |
1349 self.assertTrue(self.qexecute("DELETE Personne Y WHERE X tel XT HAVING XT&1=1")) |
1323 |
1350 |
1324 # upassword encryption tests ################################################# |
1351 # upassword encryption tests ################################################# |
1325 |
1352 |
1326 def test_insert_upassword(self): |
1353 def test_insert_upassword(self): |
1327 rset = self.execute("INSERT CWUser X: X login 'bob', X upassword 'toto'") |
1354 rset = self.qexecute("INSERT CWUser X: X login 'bob', X upassword 'toto', " |
|
1355 "X in_group G WHERE G name 'users'") |
1328 self.assertEqual(len(rset.rows), 1) |
1356 self.assertEqual(len(rset.rows), 1) |
1329 self.assertEqual(rset.description, [('CWUser',)]) |
1357 self.assertEqual(rset.description, [('CWUser',)]) |
1330 self.assertRaises(Unauthorized, |
1358 self.assertRaises(Unauthorized, |
1331 self.execute, "Any P WHERE X is CWUser, X login 'bob', X upassword P") |
1359 self.qexecute, "Any P WHERE X is CWUser, X login 'bob', X upassword P") |
1332 cursor = self.cnxset.cu |
1360 with self.session.new_cnx() as cnx: |
1333 cursor.execute("SELECT %supassword from %sCWUser WHERE %slogin='bob'" |
1361 with cnx.ensure_cnx_set: |
1334 % (SQL_PREFIX, SQL_PREFIX, SQL_PREFIX)) |
1362 cursor = cnx.cnxset.cu |
1335 passwd = str(cursor.fetchone()[0]) |
1363 cursor.execute("SELECT %supassword from %sCWUser WHERE %slogin='bob'" |
1336 self.assertEqual(passwd, crypt_password('toto', passwd)) |
1364 % (SQL_PREFIX, SQL_PREFIX, SQL_PREFIX)) |
1337 rset = self.execute("Any X WHERE X is CWUser, X login 'bob', X upassword %(pwd)s", |
1365 passwd = str(cursor.fetchone()[0]) |
|
1366 self.assertEqual(passwd, crypt_password('toto', passwd)) |
|
1367 rset = self.qexecute("Any X WHERE X is CWUser, X login 'bob', X upassword %(pwd)s", |
1338 {'pwd': Binary(passwd)}) |
1368 {'pwd': Binary(passwd)}) |
1339 self.assertEqual(len(rset.rows), 1) |
1369 self.assertEqual(len(rset.rows), 1) |
1340 self.assertEqual(rset.description, [('CWUser',)]) |
1370 self.assertEqual(rset.description, [('CWUser',)]) |
1341 |
1371 |
1342 def test_update_upassword(self): |
1372 def test_update_upassword(self): |
1343 rset = self.execute("INSERT CWUser X: X login 'bob', X upassword %(pwd)s", {'pwd': 'toto'}) |
1373 with self.session.new_cnx() as cnx: |
1344 self.assertEqual(rset.description[0][0], 'CWUser') |
1374 with cnx.ensure_cnx_set: |
1345 rset = self.execute("SET X upassword %(pwd)s WHERE X is CWUser, X login 'bob'", |
1375 rset = cnx.execute("INSERT CWUser X: X login 'bob', X upassword %(pwd)s", |
1346 {'pwd': 'tutu'}) |
1376 {'pwd': 'toto'}) |
1347 cursor = self.cnxset.cu |
1377 self.assertEqual(rset.description[0][0], 'CWUser') |
1348 cursor.execute("SELECT %supassword from %sCWUser WHERE %slogin='bob'" |
1378 rset = cnx.execute("SET X upassword %(pwd)s WHERE X is CWUser, X login 'bob'", |
1349 % (SQL_PREFIX, SQL_PREFIX, SQL_PREFIX)) |
1379 {'pwd': 'tutu'}) |
1350 passwd = str(cursor.fetchone()[0]) |
1380 cursor = cnx.cnxset.cu |
1351 self.assertEqual(passwd, crypt_password('tutu', passwd)) |
1381 cursor.execute("SELECT %supassword from %sCWUser WHERE %slogin='bob'" |
1352 rset = self.execute("Any X WHERE X is CWUser, X login 'bob', X upassword %(pwd)s", |
1382 % (SQL_PREFIX, SQL_PREFIX, SQL_PREFIX)) |
1353 {'pwd': Binary(passwd)}) |
1383 passwd = str(cursor.fetchone()[0]) |
1354 self.assertEqual(len(rset.rows), 1) |
1384 self.assertEqual(passwd, crypt_password('tutu', passwd)) |
1355 self.assertEqual(rset.description, [('CWUser',)]) |
1385 rset = cnx.execute("Any X WHERE X is CWUser, X login 'bob', X upassword %(pwd)s", |
|
1386 {'pwd': Binary(passwd)}) |
|
1387 self.assertEqual(len(rset.rows), 1) |
|
1388 self.assertEqual(rset.description, [('CWUser',)]) |
1356 |
1389 |
1357 # ZT datetime tests ######################################################## |
1390 # ZT datetime tests ######################################################## |
1358 |
1391 |
1359 def test_tz_datetime(self): |
1392 def test_tz_datetime(self): |
1360 self.qexecute("INSERT Personne X: X nom 'bob', X tzdatenaiss %(date)s", |
1393 self.qexecute("INSERT Personne X: X nom 'bob', X tzdatenaiss %(date)s", |
1471 self.assertEqual(rset.rows, [[peid]]) |
1504 self.assertEqual(rset.rows, [[peid]]) |
1472 |
1505 |
1473 def test_nonregr_has_text_cache(self): |
1506 def test_nonregr_has_text_cache(self): |
1474 eid1 = self.qexecute("INSERT Personne X: X nom 'bidule'")[0][0] |
1507 eid1 = self.qexecute("INSERT Personne X: X nom 'bidule'")[0][0] |
1475 eid2 = self.qexecute("INSERT Personne X: X nom 'tag'")[0][0] |
1508 eid2 = self.qexecute("INSERT Personne X: X nom 'tag'")[0][0] |
1476 self.commit() |
|
1477 rset = self.qexecute("Any X WHERE X has_text %(text)s", {'text': 'bidule'}) |
1509 rset = self.qexecute("Any X WHERE X has_text %(text)s", {'text': 'bidule'}) |
1478 self.assertEqual(rset.rows, [[eid1]]) |
1510 self.assertEqual(rset.rows, [[eid1]]) |
1479 rset = self.qexecute("Any X WHERE X has_text %(text)s", {'text': 'tag'}) |
1511 rset = self.qexecute("Any X WHERE X has_text %(text)s", {'text': 'tag'}) |
1480 self.assertEqual(rset.rows, [[eid2]]) |
1512 self.assertEqual(rset.rows, [[eid2]]) |
1481 |
1513 |
1516 |
1548 |
1517 def test_nonregr_set_datetime(self): |
1549 def test_nonregr_set_datetime(self): |
1518 # huum, psycopg specific |
1550 # huum, psycopg specific |
1519 self.qexecute('SET X creation_date %(date)s WHERE X eid 1', {'date': date.today()}) |
1551 self.qexecute('SET X creation_date %(date)s WHERE X eid 1', {'date': date.today()}) |
1520 |
1552 |
1521 def test_nonregr_set_query(self): |
|
1522 ueid = self.execute("INSERT CWUser X: X login 'bob', X upassword 'toto'")[0][0] |
|
1523 self.execute("SET E in_group G, E firstname %(firstname)s, E surname %(surname)s " |
|
1524 "WHERE E eid %(x)s, G name 'users'", |
|
1525 {'x':ueid, 'firstname': u'jean', 'surname': u'paul'}) |
|
1526 |
|
1527 def test_nonregr_u_owned_by_u(self): |
1553 def test_nonregr_u_owned_by_u(self): |
1528 ueid = self.qexecute("INSERT CWUser X: X login 'bob', X upassword 'toto', X in_group G " |
1554 ueid = self.qexecute("INSERT CWUser X: X login 'bob', X upassword 'toto', X in_group G " |
1529 "WHERE G name 'users'")[0][0] |
1555 "WHERE G name 'users'")[0][0] |
1530 rset = self.qexecute("CWUser U") |
1556 rset = self.qexecute("CWUser U") |
1531 self.assertEqual(len(rset), 3) # bob + admin + anon |
1557 self.assertEqual(len(rset), 3) # bob + admin + anon |
1567 |
1593 |
1568 def test_nonregr_has_text_ambiguity_1(self): |
1594 def test_nonregr_has_text_ambiguity_1(self): |
1569 peid = self.qexecute("INSERT CWUser X: X login 'bidule', X upassword 'bidule', " |
1595 peid = self.qexecute("INSERT CWUser X: X login 'bidule', X upassword 'bidule', " |
1570 "X in_group G WHERE G name 'users'")[0][0] |
1596 "X in_group G WHERE G name 'users'")[0][0] |
1571 aeid = self.qexecute("INSERT Affaire X: X ref 'bidule'")[0][0] |
1597 aeid = self.qexecute("INSERT Affaire X: X ref 'bidule'")[0][0] |
1572 self.commit() |
|
1573 rset = self.qexecute('Any X WHERE X is CWUser, X has_text "bidule"') |
1598 rset = self.qexecute('Any X WHERE X is CWUser, X has_text "bidule"') |
1574 self.assertEqual(rset.rows, [[peid]]) |
1599 self.assertEqual(rset.rows, [[peid]]) |
1575 rset = self.qexecute('Any X WHERE X is CWUser, X has_text "bidule", ' |
1600 rset = self.qexecute('Any X WHERE X is CWUser, X has_text "bidule", ' |
1576 'X in_state S, S name SN') |
1601 'X in_state S, S name SN') |
1577 self.assertEqual(rset.rows, [[peid]]) |
1602 self.assertEqual(rset.rows, [[peid]]) |
1586 |
1611 |
1587 |
1612 |
1588 class NonRegressionTC(CubicWebTC): |
1613 class NonRegressionTC(CubicWebTC): |
1589 |
1614 |
1590 def test_has_text_security_cache_bug(self): |
1615 def test_has_text_security_cache_bug(self): |
1591 req = self.request() |
1616 with self.admin_access.repo_cnx() as cnx: |
1592 self.create_user(req, 'user', ('users',)) |
1617 self.create_user(cnx, 'user', ('users',)) |
1593 aff1 = req.create_entity('Societe', nom=u'aff1') |
1618 aff1 = cnx.create_entity('Societe', nom=u'aff1') |
1594 aff2 = req.create_entity('Societe', nom=u'aff2') |
1619 aff2 = cnx.create_entity('Societe', nom=u'aff2') |
1595 self.commit() |
1620 cnx.commit() |
1596 with self.login('user', password='user'): |
1621 with self.new_access('user').repo_cnx() as cnx: |
1597 res = self.execute('Any X WHERE X has_text %(text)s', {'text': 'aff1'}) |
1622 res = cnx.execute('Any X WHERE X has_text %(text)s', {'text': 'aff1'}) |
1598 self.assertEqual(res.rows, [[aff1.eid]]) |
1623 self.assertEqual(res.rows, [[aff1.eid]]) |
1599 res = self.execute('Any X WHERE X has_text %(text)s', {'text': 'aff2'}) |
1624 res = cnx.execute('Any X WHERE X has_text %(text)s', {'text': 'aff2'}) |
1600 self.assertEqual(res.rows, [[aff2.eid]]) |
1625 self.assertEqual(res.rows, [[aff2.eid]]) |
1601 |
1626 |
1602 def test_set_relations_eid(self): |
1627 def test_set_relations_eid(self): |
1603 req = self.request() |
1628 with self.admin_access.repo_cnx() as cnx: |
1604 # create 3 email addresses |
1629 # create 3 email addresses |
1605 a1 = req.create_entity('EmailAddress', address=u'a1') |
1630 a1 = cnx.create_entity('EmailAddress', address=u'a1') |
1606 a2 = req.create_entity('EmailAddress', address=u'a2') |
1631 a2 = cnx.create_entity('EmailAddress', address=u'a2') |
1607 a3 = req.create_entity('EmailAddress', address=u'a3') |
1632 a3 = cnx.create_entity('EmailAddress', address=u'a3') |
1608 # SET relations using '>=' operator on eids |
1633 # SET relations using '>=' operator on eids |
1609 req.execute('SET U use_email A WHERE U login "admin", A eid >= %s' % a2.eid) |
1634 cnx.execute('SET U use_email A WHERE U login "admin", A eid >= %s' % a2.eid) |
1610 self.assertEqual( |
1635 self.assertEqual( |
1611 [[a2.eid], [a3.eid]], |
1636 [[a2.eid], [a3.eid]], |
1612 req.execute('Any A ORDERBY A WHERE U use_email A, U login "admin"').rows) |
1637 cnx.execute('Any A ORDERBY A WHERE U use_email A, U login "admin"').rows) |
1613 # DELETE |
1638 # DELETE |
1614 req.execute('DELETE U use_email A WHERE U login "admin", A eid > %s' % a2.eid) |
1639 cnx.execute('DELETE U use_email A WHERE U login "admin", A eid > %s' % a2.eid) |
1615 self.assertEqual( |
1640 self.assertEqual( |
1616 [[a2.eid]], |
1641 [[a2.eid]], |
1617 req.execute('Any A ORDERBY A WHERE U use_email A, U login "admin"').rows) |
1642 cnx.execute('Any A ORDERBY A WHERE U use_email A, U login "admin"').rows) |
1618 req.execute('DELETE U use_email A WHERE U login "admin"') |
1643 cnx.execute('DELETE U use_email A WHERE U login "admin"') |
1619 # SET relations using '<' operator on eids |
1644 # SET relations using '<' operator on eids |
1620 req.execute('SET U use_email A WHERE U login "admin", A eid < %s' % a2.eid) |
1645 cnx.execute('SET U use_email A WHERE U login "admin", A eid < %s' % a2.eid) |
1621 self.assertEqual( |
1646 self.assertEqual( |
1622 [[a1.eid]], |
1647 [[a1.eid]], |
1623 req.execute('Any A ORDERBY A WHERE U use_email A, U login "admin"').rows) |
1648 cnx.execute('Any A ORDERBY A WHERE U use_email A, U login "admin"').rows) |
1624 |
1649 |
1625 if __name__ == '__main__': |
1650 if __name__ == '__main__': |
1626 unittest_main() |
1651 unittest_main() |