# HG changeset patch # User Aurelien Campeas # Date 1401881304 -7200 # Node ID 5ef9dd383ae25bd2b15fdb2e7a5a3fa442cbe21c # Parent b18ef631e72c95e32fbf931a01e0c8ad6e6890a6 [tests/rqlannotation,querier] use the new connection api diff -r b18ef631e72c -r 5ef9dd383ae2 devtools/repotest.py --- a/devtools/repotest.py Wed Jul 02 17:31:54 2014 +0200 +++ b/devtools/repotest.py Wed Jun 04 13:28:24 2014 +0200 @@ -157,8 +157,9 @@ def setUp(self): self.repo = FakeRepo(self.schema, config=FakeConfig(apphome=self.datadir)) self.repo.system_source = mock_object(dbdriver=self.backend) - self.rqlhelper = RQLHelper(self.schema, special_relations={'eid': 'uid', - 'has_text': 'fti'}, + self.rqlhelper = RQLHelper(self.schema, + special_relations={'eid': 'uid', + 'has_text': 'fti'}, backend=self.backend) self.qhelper = QuerierHelper(self.repo, self.schema) ExecutionPlan._check_permissions = _dummy_check_permissions @@ -230,7 +231,7 @@ rqlhelper._analyser.uid_func_mapping = {} return rqlhelper - def _prepare_plan(self, rql, kwargs=None, simplify=True): + def _prepare_plan(self, cnx, rql, kwargs=None, simplify=True): rqlhelper = self._rqlhelper() rqlst = rqlhelper.parse(rql) rqlhelper.compute_solutions(rqlst, kwargs=kwargs) @@ -238,10 +239,10 @@ rqlhelper.simplify(rqlst) for select in rqlst.children: select.solutions.sort() - return self.o.plan_factory(rqlst, kwargs, self.session) + return self.o.plan_factory(rqlst, kwargs, cnx) - def _prepare(self, rql, kwargs=None): - plan = self._prepare_plan(rql, kwargs, simplify=False) + def _prepare(self, cnx, rql, kwargs=None): + plan = self._prepare_plan(cnx, rql, kwargs, simplify=False) plan.preprocess(plan.rqlst) rqlst = plan.rqlst.children[0] rqlst.solutions = remove_unused_solutions(rqlst, rqlst.solutions, {}, self.repo.schema)[0] diff -r b18ef631e72c -r 5ef9dd383ae2 server/test/unittest_querier.py --- a/server/test/unittest_querier.py Wed Jul 02 17:31:54 2014 +0200 +++ b/server/test/unittest_querier.py Wed Jun 04 13:28:24 2014 +0200 @@ -112,20 +112,21 @@ def test_preprocess_1(self): with self.session.new_cnx() as cnx: reid = cnx.execute('Any X WHERE X is CWRType, X name "owned_by"')[0][0] - rqlst = self._prepare('Any COUNT(RDEF) WHERE RDEF relation_type X, X eid %(x)s', + rqlst = self._prepare(cnx, 'Any COUNT(RDEF) WHERE RDEF relation_type X, X eid %(x)s', {'x': reid}) self.assertEqual([{'RDEF': 'CWAttribute'}, {'RDEF': 'CWRelation'}], rqlst.solutions) def test_preprocess_2(self): - teid = self.qexecute("INSERT Tag X: X name 'tag'")[0][0] - #geid = self.execute("CWGroup G WHERE G name 'users'")[0][0] - #self.execute("SET X tags Y WHERE X eid %(t)s, Y eid %(g)s", - # {'g': geid, 't': teid}, 'g') - rqlst = self._prepare('Any X WHERE E eid %(x)s, E tags X', {'x': teid}) - # the query may be optimized, should keep only one solution - # (any one, etype will be discarded) - self.assertEqual(1, len(rqlst.solutions)) + with self.session.new_cnx() as cnx: + teid = cnx.execute("INSERT Tag X: X name 'tag'")[0][0] + #geid = self.execute("CWGroup G WHERE G name 'users'")[0][0] + #self.execute("SET X tags Y WHERE X eid %(t)s, Y eid %(g)s", + # {'g': geid, 't': teid}, 'g') + rqlst = self._prepare(cnx, 'Any X WHERE E eid %(x)s, E tags X', {'x': teid}) + # the query may be optimized, should keep only one solution + # (any one, etype will be discarded) + self.assertEqual(1, len(rqlst.solutions)) def assertRQLEqual(self, expected, got): from rql import parse @@ -133,114 +134,117 @@ unicode(parse(got))) def test_preprocess_security(self): - plan = self._prepare_plan('Any ETN,COUNT(X) GROUPBY ETN ' - 'WHERE X is ET, ET name ETN') - plan.cnx = self.user_groups_session('users') - union = plan.rqlst - plan.preprocess(union) - self.assertEqual(len(union.children), 1) - self.assertEqual(len(union.children[0].with_), 1) - subq = union.children[0].with_[0].query - self.assertEqual(len(subq.children), 4) - self.assertEqual([t.as_string() for t in union.children[0].selection], - ['ETN','COUNT(X)']) - self.assertEqual([t.as_string() for t in union.children[0].groupby], - ['ETN']) - partrqls = sorted(((rqlst.as_string(), rqlst.solutions) for rqlst in subq.children)) - rql, solutions = partrqls[0] - self.assertRQLEqual(rql, - 'Any ETN,X WHERE X is ET, ET name ETN, (EXISTS(X owned_by %(B)s))' - ' OR ((((EXISTS(D concerne C?, C owned_by %(B)s, ' - ' X identity D, C is Division, D is Affaire))' - ' OR (EXISTS(H concerne G?, G owned_by %(B)s, G is SubDivision, ' - ' X identity H, H is Affaire)))' - ' OR (EXISTS(I concerne F?, F owned_by %(B)s, F is Societe, ' - ' X identity I, I is Affaire)))' - ' OR (EXISTS(J concerne E?, E owned_by %(B)s, E is Note, ' - ' X identity J, J is Affaire)))' - ', ET is CWEType, X is Affaire') - self.assertEqual(solutions, [{'C': 'Division', - 'D': 'Affaire', - 'E': 'Note', - 'F': 'Societe', - 'G': 'SubDivision', - 'H': 'Affaire', - 'I': 'Affaire', - 'J': 'Affaire', - 'X': 'Affaire', - 'ET': 'CWEType', 'ETN': 'String'}]) - rql, solutions = partrqls[1] - self.assertRQLEqual(rql, 'Any ETN,X WHERE X is ET, ET name ETN, ET is CWEType, ' - 'X is IN(BaseTransition, Bookmark, CWAttribute, CWCache, CWConstraint, ' - ' CWConstraintType, CWEType, CWGroup, CWPermission, CWProperty, CWRType, ' - ' CWRelation, CWSource, CWUniqueTogetherConstraint, CWUser, Card, Comment, ' - ' Division, Email, EmailPart, EmailThread, ExternalUri, File, Folder, Note, ' - ' Old, Personne, RQLExpression, Societe, State, SubDivision, ' - ' SubWorkflowExitPoint, Tag, TrInfo, Transition, Workflow, WorkflowTransition)') - self.assertListEqual(sorted(solutions), - sorted([{'X': 'BaseTransition', 'ETN': 'String', 'ET': 'CWEType'}, - {'X': 'Bookmark', 'ETN': 'String', 'ET': 'CWEType'}, - {'X': 'Card', 'ETN': 'String', 'ET': 'CWEType'}, - {'X': 'Comment', 'ETN': 'String', 'ET': 'CWEType'}, - {'X': 'Division', 'ETN': 'String', 'ET': 'CWEType'}, - {'X': 'CWCache', 'ETN': 'String', 'ET': 'CWEType'}, - {'X': 'CWConstraint', 'ETN': 'String', 'ET': 'CWEType'}, - {'X': 'CWConstraintType', 'ETN': 'String', 'ET': 'CWEType'}, - {'X': 'CWEType', 'ETN': 'String', 'ET': 'CWEType'}, - {'X': 'CWAttribute', 'ETN': 'String', 'ET': 'CWEType'}, - {'X': 'CWGroup', 'ETN': 'String', 'ET': 'CWEType'}, - {'X': 'CWRelation', 'ETN': 'String', 'ET': 'CWEType'}, - {'X': 'CWPermission', 'ETN': 'String', 'ET': 'CWEType'}, - {'X': 'CWProperty', 'ETN': 'String', 'ET': 'CWEType'}, - {'X': 'CWRType', 'ETN': 'String', 'ET': 'CWEType'}, - {'X': 'CWSource', 'ETN': 'String', 'ET': 'CWEType'}, - {'X': 'CWUniqueTogetherConstraint', 'ETN': 'String', 'ET': 'CWEType'}, - {'X': 'CWUser', 'ETN': 'String', 'ET': 'CWEType'}, - {'X': 'Email', 'ETN': 'String', 'ET': 'CWEType'}, - {'X': 'EmailPart', 'ETN': 'String', 'ET': 'CWEType'}, - {'X': 'EmailThread', 'ETN': 'String', 'ET': 'CWEType'}, - {'X': 'ExternalUri', 'ETN': 'String', 'ET': 'CWEType'}, - {'X': 'File', 'ETN': 'String', 'ET': 'CWEType'}, - {'X': 'Folder', 'ETN': 'String', 'ET': 'CWEType'}, - {'X': 'Note', 'ETN': 'String', 'ET': 'CWEType'}, - {'X': 'Old', 'ETN': 'String', 'ET': 'CWEType'}, - {'X': 'Personne', 'ETN': 'String', 'ET': 'CWEType'}, - {'X': 'RQLExpression', 'ETN': 'String', 'ET': 'CWEType'}, - {'X': 'Societe', 'ETN': 'String', 'ET': 'CWEType'}, - {'X': 'State', 'ETN': 'String', 'ET': 'CWEType'}, - {'X': 'SubDivision', 'ETN': 'String', 'ET': 'CWEType'}, - {'X': 'SubWorkflowExitPoint', 'ETN': 'String', 'ET': 'CWEType'}, - {'X': 'Tag', 'ETN': 'String', 'ET': 'CWEType'}, - {'X': 'Transition', 'ETN': 'String', 'ET': 'CWEType'}, - {'X': 'TrInfo', 'ETN': 'String', 'ET': 'CWEType'}, - {'X': 'Workflow', 'ETN': 'String', 'ET': 'CWEType'}, - {'X': 'WorkflowTransition', 'ETN': 'String', 'ET': 'CWEType'}])) - rql, solutions = partrqls[2] - self.assertEqual(rql, - 'Any ETN,X WHERE X is ET, ET name ETN, EXISTS(%(D)s use_email X), ' - 'ET is CWEType, X is EmailAddress') - self.assertEqual(solutions, [{'X': 'EmailAddress', 'ET': 'CWEType', 'ETN': 'String'}]) - rql, solutions = partrqls[3] - self.assertEqual(rql, - 'Any ETN,X WHERE X is ET, ET name ETN, EXISTS(X owned_by %(C)s), ' - 'ET is CWEType, X is Basket') - self.assertEqual(solutions, [{'X': 'Basket', 'ET': 'CWEType', 'ETN': 'String'}]) + s = self.user_groups_session('users') + with s.new_cnx() as cnx: + plan = self._prepare_plan(cnx, 'Any ETN,COUNT(X) GROUPBY ETN ' + 'WHERE X is ET, ET name ETN') + union = plan.rqlst + plan.preprocess(union) + self.assertEqual(len(union.children), 1) + self.assertEqual(len(union.children[0].with_), 1) + subq = union.children[0].with_[0].query + self.assertEqual(len(subq.children), 4) + self.assertEqual([t.as_string() for t in union.children[0].selection], + ['ETN','COUNT(X)']) + self.assertEqual([t.as_string() for t in union.children[0].groupby], + ['ETN']) + partrqls = sorted(((rqlst.as_string(), rqlst.solutions) for rqlst in subq.children)) + rql, solutions = partrqls[0] + self.assertRQLEqual(rql, + 'Any ETN,X WHERE X is ET, ET name ETN, (EXISTS(X owned_by %(B)s))' + ' OR ((((EXISTS(D concerne C?, C owned_by %(B)s, ' + ' X identity D, C is Division, D is Affaire))' + ' OR (EXISTS(H concerne G?, G owned_by %(B)s, G is SubDivision, ' + ' X identity H, H is Affaire)))' + ' OR (EXISTS(I concerne F?, F owned_by %(B)s, F is Societe, ' + ' X identity I, I is Affaire)))' + ' OR (EXISTS(J concerne E?, E owned_by %(B)s, E is Note, ' + ' X identity J, J is Affaire)))' + ', ET is CWEType, X is Affaire') + self.assertEqual(solutions, [{'C': 'Division', + 'D': 'Affaire', + 'E': 'Note', + 'F': 'Societe', + 'G': 'SubDivision', + 'H': 'Affaire', + 'I': 'Affaire', + 'J': 'Affaire', + 'X': 'Affaire', + 'ET': 'CWEType', 'ETN': 'String'}]) + rql, solutions = partrqls[1] + self.assertRQLEqual(rql, 'Any ETN,X WHERE X is ET, ET name ETN, ET is CWEType, ' + 'X is IN(BaseTransition, Bookmark, CWAttribute, CWCache, CWConstraint, ' + ' CWConstraintType, CWEType, CWGroup, CWPermission, CWProperty, CWRType, ' + ' CWRelation, CWSource, CWUniqueTogetherConstraint, CWUser, Card, Comment, ' + ' Division, Email, EmailPart, EmailThread, ExternalUri, File, Folder, Note, ' + ' Old, Personne, RQLExpression, Societe, State, SubDivision, ' + ' SubWorkflowExitPoint, Tag, TrInfo, Transition, Workflow, WorkflowTransition)') + self.assertListEqual(sorted(solutions), + sorted([{'X': 'BaseTransition', 'ETN': 'String', 'ET': 'CWEType'}, + {'X': 'Bookmark', 'ETN': 'String', 'ET': 'CWEType'}, + {'X': 'Card', 'ETN': 'String', 'ET': 'CWEType'}, + {'X': 'Comment', 'ETN': 'String', 'ET': 'CWEType'}, + {'X': 'Division', 'ETN': 'String', 'ET': 'CWEType'}, + {'X': 'CWCache', 'ETN': 'String', 'ET': 'CWEType'}, + {'X': 'CWConstraint', 'ETN': 'String', 'ET': 'CWEType'}, + {'X': 'CWConstraintType', 'ETN': 'String', 'ET': 'CWEType'}, + {'X': 'CWEType', 'ETN': 'String', 'ET': 'CWEType'}, + {'X': 'CWAttribute', 'ETN': 'String', 'ET': 'CWEType'}, + {'X': 'CWGroup', 'ETN': 'String', 'ET': 'CWEType'}, + {'X': 'CWRelation', 'ETN': 'String', 'ET': 'CWEType'}, + {'X': 'CWPermission', 'ETN': 'String', 'ET': 'CWEType'}, + {'X': 'CWProperty', 'ETN': 'String', 'ET': 'CWEType'}, + {'X': 'CWRType', 'ETN': 'String', 'ET': 'CWEType'}, + {'X': 'CWSource', 'ETN': 'String', 'ET': 'CWEType'}, + {'X': 'CWUniqueTogetherConstraint', 'ETN': 'String', 'ET': 'CWEType'}, + {'X': 'CWUser', 'ETN': 'String', 'ET': 'CWEType'}, + {'X': 'Email', 'ETN': 'String', 'ET': 'CWEType'}, + {'X': 'EmailPart', 'ETN': 'String', 'ET': 'CWEType'}, + {'X': 'EmailThread', 'ETN': 'String', 'ET': 'CWEType'}, + {'X': 'ExternalUri', 'ETN': 'String', 'ET': 'CWEType'}, + {'X': 'File', 'ETN': 'String', 'ET': 'CWEType'}, + {'X': 'Folder', 'ETN': 'String', 'ET': 'CWEType'}, + {'X': 'Note', 'ETN': 'String', 'ET': 'CWEType'}, + {'X': 'Old', 'ETN': 'String', 'ET': 'CWEType'}, + {'X': 'Personne', 'ETN': 'String', 'ET': 'CWEType'}, + {'X': 'RQLExpression', 'ETN': 'String', 'ET': 'CWEType'}, + {'X': 'Societe', 'ETN': 'String', 'ET': 'CWEType'}, + {'X': 'State', 'ETN': 'String', 'ET': 'CWEType'}, + {'X': 'SubDivision', 'ETN': 'String', 'ET': 'CWEType'}, + {'X': 'SubWorkflowExitPoint', 'ETN': 'String', 'ET': 'CWEType'}, + {'X': 'Tag', 'ETN': 'String', 'ET': 'CWEType'}, + {'X': 'Transition', 'ETN': 'String', 'ET': 'CWEType'}, + {'X': 'TrInfo', 'ETN': 'String', 'ET': 'CWEType'}, + {'X': 'Workflow', 'ETN': 'String', 'ET': 'CWEType'}, + {'X': 'WorkflowTransition', 'ETN': 'String', 'ET': 'CWEType'}])) + rql, solutions = partrqls[2] + self.assertEqual(rql, + 'Any ETN,X WHERE X is ET, ET name ETN, EXISTS(%(D)s use_email X), ' + 'ET is CWEType, X is EmailAddress') + self.assertEqual(solutions, [{'X': 'EmailAddress', 'ET': 'CWEType', 'ETN': 'String'}]) + rql, solutions = partrqls[3] + self.assertEqual(rql, + 'Any ETN,X WHERE X is ET, ET name ETN, EXISTS(X owned_by %(C)s), ' + 'ET is CWEType, X is Basket') + self.assertEqual(solutions, [{'X': 'Basket', 'ET': 'CWEType', 'ETN': 'String'}]) def test_preprocess_security_aggregat(self): - plan = self._prepare_plan('Any MAX(X)') - plan.cnx = self.user_groups_session('users') - union = plan.rqlst - plan.preprocess(union) - self.assertEqual(len(union.children), 1) - self.assertEqual(len(union.children[0].with_), 1) - subq = union.children[0].with_[0].query - self.assertEqual(len(subq.children), 4) - self.assertEqual([t.as_string() for t in union.children[0].selection], - ['MAX(X)']) + s = self.user_groups_session('users') + with s.new_cnx() as cnx: + plan = self._prepare_plan(cnx, 'Any MAX(X)') + union = plan.rqlst + plan.preprocess(union) + self.assertEqual(len(union.children), 1) + self.assertEqual(len(union.children[0].with_), 1) + subq = union.children[0].with_[0].query + self.assertEqual(len(subq.children), 4) + self.assertEqual([t.as_string() for t in union.children[0].selection], + ['MAX(X)']) def test_preprocess_nonregr(self): - rqlst = self._prepare('Any S ORDERBY SI WHERE NOT S ecrit_par O, S para SI') - self.assertEqual(len(rqlst.solutions), 1) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Any S ORDERBY SI WHERE NOT S ecrit_par O, S para SI') + self.assertEqual(len(rqlst.solutions), 1) def test_build_description(self): # should return an empty result set diff -r b18ef631e72c -r 5ef9dd383ae2 server/test/unittest_rqlannotation.py --- a/server/test/unittest_rqlannotation.py Wed Jul 02 17:31:54 2014 +0200 +++ b/server/test/unittest_rqlannotation.py Wed Jun 04 13:28:24 2014 +0200 @@ -1,5 +1,5 @@ # -*- coding: iso-8859-1 -*- -# copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved. +# copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved. # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr # # This file is part of CubicWeb. @@ -21,340 +21,424 @@ from cubicweb.devtools import TestServerConfiguration, get_test_db_handler from cubicweb.devtools.repotest import BaseQuerierTC - -def setUpModule(*args): - handler = get_test_db_handler(TestServerConfiguration( - 'data2', apphome=SQLGenAnnotatorTC.datadir)) - handler.build_db_cache() - global repo, cnx - repo, cnx = handler.get_repo_and_cnx() - -def tearDownModule(*args): - global repo, cnx - del repo, cnx - - class SQLGenAnnotatorTC(BaseQuerierTC): def setUp(self): + handler = get_test_db_handler(TestServerConfiguration( + 'data2', apphome=SQLGenAnnotatorTC.datadir)) + handler.build_db_cache() + repo, _cnx = handler.get_repo_and_cnx() self.__class__.repo = repo super(SQLGenAnnotatorTC, self).setUp() def get_max_eid(self): # no need for cleanup here return None + def cleanup(self): # no need for cleanup here pass def test_0_1(self): - rqlst = self._prepare('Any SEN,RN,OEN WHERE X from_entity SE, SE eid 44, X relation_type R, R eid 139, X to_entity OE, OE eid 42, R name RN, SE name SEN, OE name OEN') - self.assertEqual(rqlst.defined_vars['SE']._q_invariant, False) - self.assertEqual(rqlst.defined_vars['OE']._q_invariant, False) - self.assertEqual(rqlst.defined_vars['R']._q_invariant, False) - self.assertEqual(rqlst.defined_vars['SE'].stinfo['attrvar'], None) - self.assertEqual(rqlst.defined_vars['OE'].stinfo['attrvar'], None) - self.assertEqual(rqlst.defined_vars['R'].stinfo['attrvar'], None) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Any SEN,RN,OEN WHERE X from_entity SE, ' + 'SE eid 44, X relation_type R, R eid 139, ' + 'X to_entity OE, OE eid 42, R name RN, SE name SEN, ' + 'OE name OEN') + self.assertEqual(rqlst.defined_vars['SE']._q_invariant, False) + self.assertEqual(rqlst.defined_vars['OE']._q_invariant, False) + self.assertEqual(rqlst.defined_vars['R']._q_invariant, False) + self.assertEqual(rqlst.defined_vars['SE'].stinfo['attrvar'], None) + self.assertEqual(rqlst.defined_vars['OE'].stinfo['attrvar'], None) + self.assertEqual(rqlst.defined_vars['R'].stinfo['attrvar'], None) def test_0_2(self): - rqlst = self._prepare('Any O WHERE NOT S ecrit_par O, S eid 1, S inline1 P, O inline2 P') - self.assertEqual(rqlst.defined_vars['P']._q_invariant, True) - self.assertEqual(rqlst.defined_vars['O'].stinfo['attrvar'], None) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Any O WHERE NOT S ecrit_par O, S eid 1, ' + 'S inline1 P, O inline2 P') + self.assertEqual(rqlst.defined_vars['P']._q_invariant, True) + self.assertEqual(rqlst.defined_vars['O'].stinfo['attrvar'], None) def test_0_4(self): - rqlst = self._prepare('Any A,B,C WHERE A eid 12,A comment B, A ?wf_info_for C') - self.assertEqual(rqlst.defined_vars['A']._q_invariant, False) - self.assert_(rqlst.defined_vars['B'].stinfo['attrvar']) - self.assertEqual(rqlst.defined_vars['C']._q_invariant, False) - self.assertEqual(rqlst.solutions, [{'A': 'TrInfo', 'B': 'String', 'C': 'Affaire'}, - {'A': 'TrInfo', 'B': 'String', 'C': 'CWUser'}, - {'A': 'TrInfo', 'B': 'String', 'C': 'Note'}]) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Any A,B,C WHERE A eid 12,A comment B, ' + 'A ?wf_info_for C') + self.assertEqual(rqlst.defined_vars['A']._q_invariant, False) + self.assert_(rqlst.defined_vars['B'].stinfo['attrvar']) + self.assertEqual(rqlst.defined_vars['C']._q_invariant, False) + self.assertEqual(rqlst.solutions, [{'A': 'TrInfo', 'B': 'String', 'C': 'Affaire'}, + {'A': 'TrInfo', 'B': 'String', 'C': 'CWUser'}, + {'A': 'TrInfo', 'B': 'String', 'C': 'Note'}]) def test_0_5(self): - rqlst = self._prepare('Any P WHERE N ecrit_par P, N eid 0') - self.assertEqual(rqlst.defined_vars['N']._q_invariant, False) - self.assertEqual(rqlst.defined_vars['P']._q_invariant, True) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Any P WHERE N ecrit_par P, N eid 0') + self.assertEqual(rqlst.defined_vars['N']._q_invariant, False) + self.assertEqual(rqlst.defined_vars['P']._q_invariant, True) def test_0_6(self): - rqlst = self._prepare('Any P WHERE NOT N ecrit_par P, N eid 512') - self.assertEqual(rqlst.defined_vars['P']._q_invariant, False) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Any P WHERE NOT N ecrit_par P, N eid 512') + self.assertEqual(rqlst.defined_vars['P']._q_invariant, False) def test_0_7(self): - rqlst = self._prepare('Personne X,Y where X nom NX, Y nom NX, X eid XE, not Y eid XE') - self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) - self.assertEqual(rqlst.defined_vars['Y']._q_invariant, False) - self.assert_(rqlst.defined_vars['XE'].stinfo['attrvar']) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Personne X,Y where X nom NX, ' + 'Y nom NX, X eid XE, not Y eid XE') + self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) + self.assertEqual(rqlst.defined_vars['Y']._q_invariant, False) + self.assert_(rqlst.defined_vars['XE'].stinfo['attrvar']) def test_0_8(self): - rqlst = self._prepare('Any P WHERE X eid 0, NOT X connait P') - self.assertEqual(rqlst.defined_vars['P']._q_invariant, False) - #self.assertEqual(rqlst.defined_vars['X']._q_invariant, True) - self.assertEqual(len(rqlst.solutions), 1, rqlst.solutions) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Any P WHERE X eid 0, NOT X connait P') + self.assertEqual(rqlst.defined_vars['P']._q_invariant, False) + #self.assertEqual(rqlst.defined_vars['X']._q_invariant, True) + self.assertEqual(len(rqlst.solutions), 1, rqlst.solutions) def test_0_10(self): - rqlst = self._prepare('Any X WHERE X concerne Y, Y is Note') - self.assertEqual(rqlst.defined_vars['X']._q_invariant, True) - self.assertEqual(rqlst.defined_vars['Y']._q_invariant, False) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Any X WHERE X concerne Y, Y is Note') + self.assertEqual(rqlst.defined_vars['X']._q_invariant, True) + self.assertEqual(rqlst.defined_vars['Y']._q_invariant, False) def test_0_11(self): - rqlst = self._prepare('Any X WHERE X todo_by Y, X is Affaire') - self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) - self.assertEqual(rqlst.defined_vars['Y']._q_invariant, True) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Any X WHERE X todo_by Y, X is Affaire') + self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) + self.assertEqual(rqlst.defined_vars['Y']._q_invariant, True) def test_0_12(self): - rqlst = self._prepare('Personne P WHERE P concerne A, A concerne S, S nom "Logilab"') - self.assertEqual(rqlst.defined_vars['P']._q_invariant, True) - self.assertEqual(rqlst.defined_vars['A']._q_invariant, True) - self.assertEqual(rqlst.defined_vars['S']._q_invariant, False) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Personne P WHERE P concerne A, ' + 'A concerne S, S nom "Logilab"') + self.assertEqual(rqlst.defined_vars['P']._q_invariant, True) + self.assertEqual(rqlst.defined_vars['A']._q_invariant, True) + self.assertEqual(rqlst.defined_vars['S']._q_invariant, False) def test_1_0(self): - rqlst = self._prepare('Any X,Y WHERE X created_by Y, X eid 5, NOT Y eid 6') - self.assertEqual(rqlst.defined_vars['Y']._q_invariant, True) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Any X,Y WHERE X created_by Y, ' + 'X eid 5, NOT Y eid 6') + self.assertEqual(rqlst.defined_vars['Y']._q_invariant, True) def test_1_1(self): - rqlst = self._prepare('Any X,Y WHERE X created_by Y, X eid 5, NOT Y eid IN (6,7)') - self.assertEqual(rqlst.defined_vars['Y']._q_invariant, True) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Any X,Y WHERE X created_by Y, X eid 5, ' + 'NOT Y eid IN (6,7)') + self.assertEqual(rqlst.defined_vars['Y']._q_invariant, True) def test_2(self): - rqlst = self._prepare('Any X WHERE X identity Y, Y eid 1') - self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Any X WHERE X identity Y, Y eid 1') + self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) def test_7(self): - rqlst = self._prepare('Personne X,Y where X nom NX, Y nom NX, X eid XE, not Y eid XE') - self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) - self.assertEqual(rqlst.defined_vars['Y']._q_invariant, False) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Personne X,Y where X nom NX, Y nom NX, ' + 'X eid XE, not Y eid XE') + self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) + self.assertEqual(rqlst.defined_vars['Y']._q_invariant, False) def test_8(self): - # DISTINCT Any P WHERE P require_group %(g)s, NOT %(u)s has_group_permission P, P is CWPermission - rqlst = self._prepare('DISTINCT Any X WHERE A concerne X, NOT N migrated_from X, ' - 'X is Note, N eid 1') - self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) + with self.session.new_cnx() as cnx: + # DISTINCT Any P WHERE P require_group %(g)s, + # NOT %(u)s has_group_permission P, P is CWPermission + rqlst = self._prepare(cnx, 'DISTINCT Any X WHERE A concerne X, ' + 'NOT N migrated_from X, ' + 'X is Note, N eid 1') + self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) def test_diff_scope_identity_deamb(self): - rqlst = self._prepare('Any X WHERE X concerne Y, Y is Note, EXISTS(Y identity Z, Z migrated_from N)') - self.assertEqual(rqlst.defined_vars['Z']._q_invariant, True) - self.assertEqual(rqlst.defined_vars['Y']._q_invariant, True) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Any X WHERE X concerne Y, Y is Note, ' + 'EXISTS(Y identity Z, Z migrated_from N)') + self.assertEqual(rqlst.defined_vars['Z']._q_invariant, True) + self.assertEqual(rqlst.defined_vars['Y']._q_invariant, True) def test_optional_inlined(self): - rqlst = self._prepare('Any X,S where X from_state S?') - self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) - self.assertEqual(rqlst.defined_vars['S']._q_invariant, True) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Any X,S where X from_state S?') + self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) + self.assertEqual(rqlst.defined_vars['S']._q_invariant, True) def test_optional_inlined_2(self): - rqlst = self._prepare('Any N,A WHERE N? inline1 A') - self.assertEqual(rqlst.defined_vars['N']._q_invariant, False) - self.assertEqual(rqlst.defined_vars['A']._q_invariant, False) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Any N,A WHERE N? inline1 A') + self.assertEqual(rqlst.defined_vars['N']._q_invariant, False) + self.assertEqual(rqlst.defined_vars['A']._q_invariant, False) def test_optional_1(self): - rqlst = self._prepare('Any X,S WHERE X travaille S?') - self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) - self.assertEqual(rqlst.defined_vars['S']._q_invariant, True) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Any X,S WHERE X travaille S?') + self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) + self.assertEqual(rqlst.defined_vars['S']._q_invariant, True) def test_greater_eid(self): - rqlst = self._prepare('Any X WHERE X eid > 5') - self.assertEqual(rqlst.defined_vars['X']._q_invariant, True) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Any X WHERE X eid > 5') + self.assertEqual(rqlst.defined_vars['X']._q_invariant, True) def test_greater_eid_typed(self): - rqlst = self._prepare('Any X WHERE X eid > 5, X is Note') - self.assertEqual(rqlst.defined_vars['X']._q_invariant, True) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Any X WHERE X eid > 5, X is Note') + self.assertEqual(rqlst.defined_vars['X']._q_invariant, True) def test_max_eid(self): - rqlst = self._prepare('Any MAX(X)') - self.assertEqual(rqlst.defined_vars['X']._q_invariant, True) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Any MAX(X)') + self.assertEqual(rqlst.defined_vars['X']._q_invariant, True) def test_max_eid_typed(self): - rqlst = self._prepare('Any MAX(X) WHERE X is Note') - self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Any MAX(X) WHERE X is Note') + self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) def test_all_entities(self): - rqlst = self._prepare('Any X') - self.assertEqual(rqlst.defined_vars['X']._q_invariant, True) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Any X') + self.assertEqual(rqlst.defined_vars['X']._q_invariant, True) def test_all_typed_entity(self): - rqlst = self._prepare('Any X WHERE X is Note') - self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Any X WHERE X is Note') + self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) def test_has_text_1(self): - rqlst = self._prepare('Any X WHERE X has_text "toto tata"') - self.assertEqual(rqlst.defined_vars['X']._q_invariant, True) - self.assertEqual(rqlst.defined_vars['X'].stinfo['principal'].r_type, 'has_text') + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Any X WHERE X has_text "toto tata"') + self.assertEqual(rqlst.defined_vars['X']._q_invariant, True) + self.assertEqual(rqlst.defined_vars['X'].stinfo['principal'].r_type, + 'has_text') def test_has_text_2(self): - rqlst = self._prepare('Any X WHERE X is Personne, X has_text "coucou"') - self.assertEqual(rqlst.defined_vars['X']._q_invariant, True) - self.assertEqual(rqlst.defined_vars['X'].stinfo['principal'].r_type, 'has_text') + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Any X WHERE X is Personne, ' + 'X has_text "coucou"') + self.assertEqual(rqlst.defined_vars['X']._q_invariant, True) + self.assertEqual(rqlst.defined_vars['X'].stinfo['principal'].r_type, + 'has_text') def test_not_relation_1(self): - # P can't be invariant since deambiguification caused by "NOT X require_permission P" - # is not considered by generated sql (NOT EXISTS(...)) - rqlst = self._prepare('Any P,G WHERE P require_group G, NOT X require_permission P') - self.assertEqual(rqlst.defined_vars['P']._q_invariant, False) - self.assertEqual(rqlst.defined_vars['G']._q_invariant, True) - self.assertEqual(rqlst.defined_vars['X']._q_invariant, True) + with self.session.new_cnx() as cnx: + # P can't be invariant since deambiguification caused by "NOT X require_permission P" + # is not considered by generated sql (NOT EXISTS(...)) + rqlst = self._prepare(cnx, 'Any P,G WHERE P require_group G, ' + 'NOT X require_permission P') + self.assertEqual(rqlst.defined_vars['P']._q_invariant, False) + self.assertEqual(rqlst.defined_vars['G']._q_invariant, True) + self.assertEqual(rqlst.defined_vars['X']._q_invariant, True) def test_not_relation_2(self): - rqlst = self._prepare('TrInfo X WHERE X eid 2, NOT X from_state Y, Y is State') - self.assertEqual(rqlst.defined_vars['Y']._q_invariant, True) - self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'TrInfo X WHERE X eid 2, ' + 'NOT X from_state Y, Y is State') + self.assertEqual(rqlst.defined_vars['Y']._q_invariant, True) + self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) def test_not_relation_3(self): - rqlst = self._prepare('Any X, Y WHERE X eid 1, Y eid in (2, 3)') - self.assertEqual(rqlst.defined_vars['Y']._q_invariant, False) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Any X, Y WHERE X eid 1, Y eid in (2, 3)') + self.assertEqual(rqlst.defined_vars['Y']._q_invariant, False) def test_not_relation_4_1(self): - rqlst = self._prepare('Note X WHERE NOT Y evaluee X') - self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) - self.assertEqual(rqlst.defined_vars['Y']._q_invariant, True) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Note X WHERE NOT Y evaluee X') + self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) + self.assertEqual(rqlst.defined_vars['Y']._q_invariant, True) def test_not_relation_4_2(self): - rqlst = self._prepare('Any X WHERE NOT Y evaluee X') - self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) - self.assertEqual(rqlst.defined_vars['Y']._q_invariant, True) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Any X WHERE NOT Y evaluee X') + self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) + self.assertEqual(rqlst.defined_vars['Y']._q_invariant, True) def test_not_relation_4_3(self): - rqlst = self._prepare('Any Y WHERE NOT Y evaluee X') - self.assertEqual(rqlst.defined_vars['X']._q_invariant, True) - self.assertEqual(rqlst.defined_vars['Y']._q_invariant, False) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Any Y WHERE NOT Y evaluee X') + self.assertEqual(rqlst.defined_vars['X']._q_invariant, True) + self.assertEqual(rqlst.defined_vars['Y']._q_invariant, False) def test_not_relation_4_4(self): - rqlst = self._prepare('Any X WHERE NOT Y evaluee X, Y is CWUser') - self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) - self.assertEqual(rqlst.defined_vars['Y']._q_invariant, False) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Any X WHERE NOT Y evaluee X, Y is CWUser') + self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) + self.assertEqual(rqlst.defined_vars['Y']._q_invariant, False) def test_not_relation_4_5(self): - rqlst = self._prepare('Any X WHERE NOT Y evaluee X, Y eid %s, X is Note' % self.ueid) - self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) - self.assertEqual(rqlst.solutions, [{'X': 'Note'}]) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Any X WHERE NOT Y evaluee X, ' + 'Y eid %s, X is Note' % self.ueid) + self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) + self.assertEqual(rqlst.solutions, [{'X': 'Note'}]) def test_not_relation_5_1(self): - rqlst = self._prepare('Any X,Y WHERE X name "CWGroup", Y eid IN(1, 2, 3), NOT X read_permission Y') - self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) - self.assertEqual(rqlst.defined_vars['Y']._q_invariant, False) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Any X,Y WHERE X name "CWGroup", ' + 'Y eid IN(1, 2, 3), NOT X read_permission Y') + self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) + self.assertEqual(rqlst.defined_vars['Y']._q_invariant, False) def test_not_relation_5_2(self): - rqlst = self._prepare('DISTINCT Any X,Y WHERE X name "CWGroup", Y eid IN(1, 2, 3), NOT X read_permission Y') - self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) - self.assertEqual(rqlst.defined_vars['Y']._q_invariant, False) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'DISTINCT Any X,Y WHERE X name "CWGroup", ' + 'Y eid IN(1, 2, 3), NOT X read_permission Y') + self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) + self.assertEqual(rqlst.defined_vars['Y']._q_invariant, False) def test_not_relation_6(self): - rqlst = self._prepare('Personne P where NOT P concerne A') - self.assertEqual(rqlst.defined_vars['P']._q_invariant, False) - self.assertEqual(rqlst.defined_vars['A']._q_invariant, True) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Personne P where NOT P concerne A') + self.assertEqual(rqlst.defined_vars['P']._q_invariant, False) + self.assertEqual(rqlst.defined_vars['A']._q_invariant, True) def test_not_relation_7(self): - rqlst = self._prepare('Any K,V WHERE P is CWProperty, P pkey K, P value V, NOT P for_user U') - self.assertEqual(rqlst.defined_vars['P']._q_invariant, False) - self.assertEqual(rqlst.defined_vars['U']._q_invariant, True) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Any K,V WHERE P is CWProperty, ' + 'P pkey K, P value V, NOT P for_user U') + self.assertEqual(rqlst.defined_vars['P']._q_invariant, False) + self.assertEqual(rqlst.defined_vars['U']._q_invariant, True) def test_exists_1(self): - rqlst = self._prepare('Any U WHERE U eid IN (1,2), EXISTS(X owned_by U)') - self.assertEqual(rqlst.defined_vars['U']._q_invariant, False) - self.assertEqual(rqlst.defined_vars['X']._q_invariant, True) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Any U WHERE U eid IN (1,2), EXISTS(X owned_by U)') + self.assertEqual(rqlst.defined_vars['U']._q_invariant, False) + self.assertEqual(rqlst.defined_vars['X']._q_invariant, True) def test_exists_2(self): - rqlst = self._prepare('Any U WHERE EXISTS(U eid IN (1,2), X owned_by U)') - self.assertEqual(rqlst.defined_vars['U']._q_invariant, False) - self.assertEqual(rqlst.defined_vars['X']._q_invariant, True) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Any U WHERE EXISTS(U eid IN (1,2), X owned_by U)') + self.assertEqual(rqlst.defined_vars['U']._q_invariant, False) + self.assertEqual(rqlst.defined_vars['X']._q_invariant, True) def test_exists_3(self): - rqlst = self._prepare('Any U WHERE EXISTS(X owned_by U, X bookmarked_by U)') - self.assertEqual(rqlst.defined_vars['U']._q_invariant, False) - self.assertEqual(rqlst.defined_vars['X']._q_invariant, True) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Any U WHERE EXISTS(X owned_by U, X bookmarked_by U)') + self.assertEqual(rqlst.defined_vars['U']._q_invariant, False) + self.assertEqual(rqlst.defined_vars['X']._q_invariant, True) def test_exists_4(self): - rqlst = self._prepare('Any X,Y WHERE X name "CWGroup", Y eid IN(1, 2, 3), EXISTS(X read_permission Y)') - self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) - self.assertEqual(rqlst.defined_vars['Y']._q_invariant, False) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Any X,Y WHERE X name "CWGroup", ' + 'Y eid IN(1, 2, 3), EXISTS(X read_permission Y)') + self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) + self.assertEqual(rqlst.defined_vars['Y']._q_invariant, False) def test_exists_5(self): - rqlst = self._prepare('DISTINCT Any X,Y WHERE X name "CWGroup", Y eid IN(1, 2, 3), EXISTS(X read_permission Y)') - self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) - self.assertEqual(rqlst.defined_vars['Y']._q_invariant, True) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'DISTINCT Any X,Y WHERE X name "CWGroup", ' + 'Y eid IN(1, 2, 3), EXISTS(X read_permission Y)') + self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) + self.assertEqual(rqlst.defined_vars['Y']._q_invariant, True) def test_not_exists_1(self): - rqlst = self._prepare('Any U WHERE NOT EXISTS(X owned_by U, X bookmarked_by U)') - self.assertEqual(rqlst.defined_vars['U']._q_invariant, False) - self.assertEqual(rqlst.defined_vars['X']._q_invariant, True) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Any U WHERE NOT EXISTS(X owned_by U, ' + 'X bookmarked_by U)') + self.assertEqual(rqlst.defined_vars['U']._q_invariant, False) + self.assertEqual(rqlst.defined_vars['X']._q_invariant, True) def test_not_exists_2(self): - rqlst = self._prepare('Any X,Y WHERE X name "CWGroup", Y eid IN(1, 2, 3), NOT EXISTS(X read_permission Y)') - self.assertEqual(rqlst.defined_vars['Y']._q_invariant, False) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Any X,Y WHERE X name "CWGroup", ' + 'Y eid IN(1, 2, 3), NOT EXISTS(X read_permission Y)') + self.assertEqual(rqlst.defined_vars['Y']._q_invariant, False) def test_not_exists_distinct_1(self): - rqlst = self._prepare('DISTINCT Any X,Y WHERE X name "CWGroup", Y eid IN(1, 2, 3), NOT EXISTS(X read_permission Y)') - self.assertEqual(rqlst.defined_vars['Y']._q_invariant, False) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'DISTINCT Any X,Y WHERE X name "CWGroup", ' + 'Y eid IN(1, 2, 3), NOT EXISTS(X read_permission Y)') + self.assertEqual(rqlst.defined_vars['Y']._q_invariant, False) def test_or_1(self): - rqlst = self._prepare('Any X WHERE X concerne B OR C concerne X, B eid 12, C eid 13') - self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Any X WHERE X concerne B OR ' + 'C concerne X, B eid 12, C eid 13') + self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) def test_or_2(self): - rqlst = self._prepare('Any X WHERE X created_by U, X concerne B OR C concerne X, B eid 12, C eid 13') - self.assertEqual(rqlst.defined_vars['X']._q_invariant, True) - self.assertEqual(rqlst.defined_vars['U']._q_invariant, True) - self.assertEqual(rqlst.defined_vars['X'].stinfo['principal'].r_type, 'created_by') + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Any X WHERE X created_by U, X concerne B OR ' + 'C concerne X, B eid 12, C eid 13') + self.assertEqual(rqlst.defined_vars['X']._q_invariant, True) + self.assertEqual(rqlst.defined_vars['U']._q_invariant, True) + self.assertEqual(rqlst.defined_vars['X'].stinfo['principal'].r_type, 'created_by') def test_or_3(self): - rqlst = self._prepare('Any N WHERE A evaluee N or EXISTS(N todo_by U)') - self.assertEqual(rqlst.defined_vars['N']._q_invariant, False) - self.assertEqual(rqlst.defined_vars['A']._q_invariant, True) - self.assertEqual(rqlst.defined_vars['U']._q_invariant, True) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Any N WHERE A evaluee N or EXISTS(N todo_by U)') + self.assertEqual(rqlst.defined_vars['N']._q_invariant, False) + self.assertEqual(rqlst.defined_vars['A']._q_invariant, True) + self.assertEqual(rqlst.defined_vars['U']._q_invariant, True) def test_or_exists_1(self): - # query generated by security rewriting - rqlst = self._prepare('DISTINCT Any A,S WHERE A is Affaire, S nom "chouette", S is IN(Division, Societe, SubDivision),' - '(EXISTS(A owned_by D)) ' - 'OR ((((EXISTS(E concerne C?, C owned_by D, A identity E, C is Note, E is Affaire)) ' - 'OR (EXISTS(I concerne H?, H owned_by D, H is Societe, A identity I, I is Affaire))) ' - 'OR (EXISTS(J concerne G?, G owned_by D, G is SubDivision, A identity J, J is Affaire))) ' - 'OR (EXISTS(K concerne F?, F owned_by D, F is Division, A identity K, K is Affaire)))') - self.assertEqual(rqlst.defined_vars['A']._q_invariant, False) - self.assertEqual(rqlst.defined_vars['S']._q_invariant, False) + with self.session.new_cnx() as cnx: + # query generated by security rewriting + rqlst = self._prepare(cnx, 'DISTINCT Any A,S WHERE A is Affaire, S nom "chouette", ' + 'S is IN(Division, Societe, SubDivision),' + '(EXISTS(A owned_by D)) ' + 'OR ((((EXISTS(E concerne C?, C owned_by D, A identity E, ' + ' C is Note, E is Affaire)) ' + 'OR (EXISTS(I concerne H?, H owned_by D, H is Societe, ' + ' A identity I, I is Affaire))) ' + 'OR (EXISTS(J concerne G?, G owned_by D, G is SubDivision, ' + ' A identity J, J is Affaire))) ' + 'OR (EXISTS(K concerne F?, F owned_by D, F is Division, ' + ' A identity K, K is Affaire)))') + self.assertEqual(rqlst.defined_vars['A']._q_invariant, False) + self.assertEqual(rqlst.defined_vars['S']._q_invariant, False) def test_or_exists_2(self): - rqlst = self._prepare('Any U WHERE EXISTS(U in_group G, G name "managers") OR EXISTS(X owned_by U, X bookmarked_by U)') - self.assertEqual(rqlst.defined_vars['U']._q_invariant, False) - self.assertEqual(rqlst.defined_vars['G']._q_invariant, False) - self.assertEqual(rqlst.defined_vars['X']._q_invariant, True) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Any U WHERE EXISTS(U in_group G, G name "managers") OR ' + 'EXISTS(X owned_by U, X bookmarked_by U)') + self.assertEqual(rqlst.defined_vars['U']._q_invariant, False) + self.assertEqual(rqlst.defined_vars['G']._q_invariant, False) + self.assertEqual(rqlst.defined_vars['X']._q_invariant, True) def test_or_exists_3(self): - rqlst = self._prepare('Any COUNT(S),CS GROUPBY CS ORDERBY 1 DESC LIMIT 10 ' - 'WHERE C is Societe, S concerne C, C nom CS, ' - '(EXISTS(S owned_by D)) OR (EXISTS(S documented_by N, N title "published"))') - self.assertEqual(rqlst.defined_vars['S']._q_invariant, True) - rqlst = self._prepare('Any COUNT(S),CS GROUPBY CS ORDERBY 1 DESC LIMIT 10 ' - 'WHERE S is Affaire, C is Societe, S concerne C, C nom CS, ' - '(EXISTS(S owned_by D)) OR (EXISTS(S documented_by N, N title "published"))') - self.assertEqual(rqlst.defined_vars['S']._q_invariant, True) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Any COUNT(S),CS GROUPBY CS ORDERBY 1 DESC LIMIT 10 ' + 'WHERE C is Societe, S concerne C, C nom CS, ' + '(EXISTS(S owned_by D)) OR (EXISTS(S documented_by N, N title "published"))') + self.assertEqual(rqlst.defined_vars['S']._q_invariant, True) + rqlst = self._prepare(cnx, 'Any COUNT(S),CS GROUPBY CS ORDERBY 1 DESC LIMIT 10 ' + 'WHERE S is Affaire, C is Societe, S concerne C, C nom CS, ' + '(EXISTS(S owned_by D)) OR (EXISTS(S documented_by N, N title "published"))') + self.assertEqual(rqlst.defined_vars['S']._q_invariant, True) def test_nonregr_ambiguity(self): - rqlst = self._prepare('Note N WHERE N attachment F') - # N may be an image as well, not invariant - self.assertEqual(rqlst.defined_vars['N']._q_invariant, False) - self.assertEqual(rqlst.defined_vars['F']._q_invariant, True) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Note N WHERE N attachment F') + # N may be an image as well, not invariant + self.assertEqual(rqlst.defined_vars['N']._q_invariant, False) + self.assertEqual(rqlst.defined_vars['F']._q_invariant, True) def test_nonregr_ambiguity_2(self): - rqlst = self._prepare('Any S,SN WHERE X has_text "tot", X in_state S, S name SN, X is CWUser') - # X use has_text but should not be invariant as ambiguous, and has_text - # may not be its principal - self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) - self.assertEqual(rqlst.defined_vars['S']._q_invariant, False) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Any S,SN WHERE X has_text "tot", X in_state S, S name SN, X is CWUser') + # X use has_text but should not be invariant as ambiguous, and has_text + # may not be its principal + self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) + self.assertEqual(rqlst.defined_vars['S']._q_invariant, False) def test_remove_from_deleted_source_1(self): - rqlst = self._prepare('Note X WHERE X eid 999998, NOT X cw_source Y') - self.assertNotIn('X', rqlst.defined_vars) # simplified - self.assertEqual(rqlst.defined_vars['Y']._q_invariant, True) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Note X WHERE X eid 999998, NOT X cw_source Y') + self.assertNotIn('X', rqlst.defined_vars) # simplified + self.assertEqual(rqlst.defined_vars['Y']._q_invariant, True) def test_remove_from_deleted_source_2(self): - rqlst = self._prepare('Note X WHERE X eid IN (999998, 999999), NOT X cw_source Y') - self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) - self.assertEqual(rqlst.defined_vars['Y']._q_invariant, True) - + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Note X WHERE X eid IN (999998, 999999), NOT X cw_source Y') + self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) + self.assertEqual(rqlst.defined_vars['Y']._q_invariant, True) def test_has_text_security_cache_bug(self): - rqlst = self._prepare('Any X WHERE X has_text "toto" WITH X BEING ' - '(Any C WHERE C is Societe, C nom CS)') - self.assertTrue(rqlst.parent.has_text_query) + with self.session.new_cnx() as cnx: + rqlst = self._prepare(cnx, 'Any X WHERE X has_text "toto" WITH X BEING ' + '(Any C WHERE C is Societe, C nom CS)') + self.assertTrue(rqlst.parent.has_text_query) if __name__ == '__main__': from logilab.common.testlib import unittest_main