# HG changeset patch # User Sylvain Thénault # Date 1489164764 -3600 # Node ID 7514626e1dc580deb80cf10ee7b5965e1c48ab9a # Parent 3c60180481ac50dede6b764b6bde1527a8a09ec5 [session+test] Stop storing / accessing session when it's not necessary diff -r 3c60180481ac -r 7514626e1dc5 cubicweb/devtools/repotest.py --- a/cubicweb/devtools/repotest.py Fri Mar 10 17:46:06 2017 +0100 +++ b/cubicweb/devtools/repotest.py Fri Mar 10 17:52:44 2017 +0100 @@ -21,9 +21,9 @@ """ from __future__ import print_function +from contextlib import contextmanager from pprint import pprint -from logilab.common.decorators import cachedproperty from logilab.common.testlib import SkipTest from cubicweb.devtools.testlib import RepoAccess @@ -39,7 +39,7 @@ def check_plan(self, rql, expected, kwargs=None): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: plan = self._prepare_plan(cnx, rql, kwargs) self.planner.build_plan(plan) try: @@ -138,7 +138,6 @@ from cubicweb.devtools.fake import FakeRepo, FakeConfig, FakeRequest, FakeConnection from cubicweb.server import set_debug, debugged from cubicweb.server.querier import QuerierHelper -from cubicweb.server.session import Session from cubicweb.server.sources.rql2sql import SQLGenerator, remove_unused_solutions class RQLGeneratorTC(BaseTestCase): @@ -193,14 +192,10 @@ class BaseQuerierTC(TestCase): repo = None # set this in concrete class - @cachedproperty - def session(self): - return self._access._session - def setUp(self): self.o = self.repo.querier self._access = RepoAccess(self.repo, 'admin', FakeRequest) - self.ueid = self.session.user.eid + self.ueid = self._access._user.eid assert self.ueid != -1 self.repo._type_cache = {} # clear cache self.maxeid = self.get_max_eid() @@ -208,18 +203,18 @@ self._dumb_sessions = [] def get_max_eid(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: return cnx.execute('Any MAX(X)')[0][0] def cleanup(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: cnx.execute('DELETE Any X WHERE X eid > %s' % self.maxeid) cnx.commit() def tearDown(self): undo_monkey_patch() self.cleanup() - assert self.session.user.eid != -1 + assert self._access._user.eid != -1 def set_debug(self, debug): set_debug(debug) @@ -250,17 +245,17 @@ rqlst.solutions = remove_unused_solutions(rqlst, rqlst.solutions, self.repo.schema)[0] return rqlst + @contextmanager def user_groups_session(self, *groups): """lightweight session using the current user with hi-jacked groups""" - # use self.session.user.eid to get correct owned_by relation, unless explicit eid - with self.session.new_cnx() as cnx: - user_eid = self.session.user.eid - session = Session(self.repo._build_user(cnx, user_eid), self.repo) - session.data['%s-groups' % user_eid] = set(groups) - return session + # use cnx.user.eid to get correct owned_by relation, unless explicit eid + with self._access.cnx() as cnx: + user_eid = cnx.user.eid + cnx.user._cw.data['groups-%s' % user_eid] = set(groups) + yield cnx def qexecute(self, rql, args=None, build_descr=True): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: try: return self.o.execute(cnx, rql, args, build_descr) finally: diff -r 3c60180481ac -r 7514626e1dc5 cubicweb/devtools/testlib.py --- a/cubicweb/devtools/testlib.py Fri Mar 10 17:46:06 2017 +0100 +++ b/cubicweb/devtools/testlib.py Fri Mar 10 17:52:44 2017 +0100 @@ -309,7 +309,6 @@ cls.config.mode = 'test' def __init__(self, *args, **kwargs): - self._admin_session = None self.repo = None self._open_access = set() super(CubicWebTC, self).__init__(*args, **kwargs) @@ -340,11 +339,6 @@ except BadConnectionId: continue # already closed - @property - def session(self): - """return admin session""" - return self._admin_session - def _init_repo(self): """init the repository and connection to it. """ @@ -356,7 +350,6 @@ # get an admin session (without actual login) login = text_type(db_handler.config.default_admin_config['login']) self.admin_access = self.new_access(login) - self._admin_session = self.admin_access._session # config management ######################################################## @@ -432,9 +425,6 @@ MAILBOX[:] = [] # reset mailbox def tearDown(self): - # XXX hack until logilab.common.testlib is fixed - if self._admin_session is not None: - self._admin_session = None while self._cleanups: cleanup, args, kwargs = self._cleanups.pop(-1) cleanup(*args, **kwargs) diff -r 3c60180481ac -r 7514626e1dc5 cubicweb/server/migractions.py --- a/cubicweb/server/migractions.py Fri Mar 10 17:46:06 2017 +0100 +++ b/cubicweb/server/migractions.py Fri Mar 10 17:52:44 2017 +0100 @@ -96,12 +96,9 @@ assert repo self.cnx = cnx self.repo = repo - self.session = cnx.session elif connect: self.repo = config.repository() self.set_cnx() - else: - self.session = None # no config on shell to a remote instance if config is not None and (cnx or connect): repo = self.repo @@ -154,7 +151,6 @@ except (KeyboardInterrupt, EOFError): print('aborting...') sys.exit(0) - self.session = self.cnx.session def cube_upgraded(self, cube, version): self.cmd_set_property('system.version.%s' % cube.lower(), diff -r 3c60180481ac -r 7514626e1dc5 cubicweb/server/session.py --- a/cubicweb/server/session.py Fri Mar 10 17:46:06 2017 +0100 +++ b/cubicweb/server/session.py Fri Mar 10 17:52:44 2017 +0100 @@ -291,12 +291,12 @@ @_open_only def get_schema(self): """Return the schema currently used by the repository.""" - return self.session.repo.source_defs() + return self.repo.source_defs() @_open_only def get_option_value(self, option): """Return the value for `option` in the configuration.""" - return self.session.repo.get_option_value(option) + return self.repo.get_option_value(option) # transaction api diff -r 3c60180481ac -r 7514626e1dc5 cubicweb/server/test/unittest_postgres.py --- a/cubicweb/server/test/unittest_postgres.py Fri Mar 10 17:46:06 2017 +0100 +++ b/cubicweb/server/test/unittest_postgres.py Fri Mar 10 17:52:44 2017 +0100 @@ -52,7 +52,7 @@ def test_eid_range(self): # concurrent allocation of eid ranges - source = self.session.repo.sources_by_uri['system'] + source = self.repo.sources_by_uri['system'] range1 = [] range2 = [] def allocate_eid_ranges(session, target): diff -r 3c60180481ac -r 7514626e1dc5 cubicweb/server/test/unittest_querier.py --- a/cubicweb/server/test/unittest_querier.py Fri Mar 10 17:46:06 2017 +0100 +++ b/cubicweb/server/test/unittest_querier.py Fri Mar 10 17:52:44 2017 +0100 @@ -120,7 +120,7 @@ pass def test_preprocess_1(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: reid = cnx.execute('Any X WHERE X is CWRType, X name "owned_by"')[0][0] rqlst = self._prepare(cnx, 'Any COUNT(RDEF) WHERE RDEF relation_type X, X eid %(x)s', {'x': reid}) @@ -128,7 +128,7 @@ rqlst.solutions) def test_preprocess_2(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: teid = cnx.execute("INSERT Tag X: X name 'tag'")[0][0] #geid = self.execute("CWGroup G WHERE G name 'users'")[0][0] #self.execute("SET X tags Y WHERE X eid %(t)s, Y eid %(g)s", @@ -144,8 +144,7 @@ text_type(parse(got))) def test_preprocess_security(self): - s = self.user_groups_session('users') - with s.new_cnx() as cnx: + with self.user_groups_session('users') as cnx: plan = self._prepare_plan(cnx, 'Any ETN,COUNT(X) GROUPBY ETN ' 'WHERE X is ET, ET name ETN') union = plan.rqlst @@ -241,8 +240,7 @@ self.assertEqual(solutions, [{'X': 'Basket', 'ET': 'CWEType', 'ETN': 'String'}]) def test_preprocess_security_aggregat(self): - s = self.user_groups_session('users') - with s.new_cnx() as cnx: + with self.user_groups_session('users') as cnx: plan = self._prepare_plan(cnx, 'Any MAX(X)') union = plan.rqlst plan.preprocess(union) @@ -254,13 +252,13 @@ ['MAX(X)']) def test_preprocess_nonregr(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Any S ORDERBY SI WHERE NOT S ecrit_par O, S para SI') self.assertEqual(len(rqlst.solutions), 1) def test_build_description(self): # should return an empty result set - rset = self.qexecute('Any X WHERE X eid %(x)s', {'x': self.session.user.eid}) + rset = self.qexecute('Any X WHERE X eid %(x)s', {'x': self._access._user.eid}) self.assertEqual(rset.description[0][0], 'CWUser') rset = self.qexecute('Any 1') self.assertEqual(rset.description[0][0], 'Int') @@ -289,10 +287,9 @@ self.assertEqual(rset.description[0][0], 'String') def test_build_descr1(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rset = cnx.execute('(Any U,L WHERE U login L) UNION ' '(Any G,N WHERE G name N, G is CWGroup)') - # rset.req = self.session orig_length = len(rset) rset.rows[0][0] = 9999999 description = manual_build_descr(cnx, rset.syntax_tree(), None, rset.rows) @@ -493,7 +490,7 @@ rset = self.qexecute('DISTINCT Any G WHERE U? in_group G') self.assertEqual(len(rset), 4) rset = self.qexecute('DISTINCT Any G WHERE U? in_group G, U eid %(x)s', - {'x': self.session.user.eid}) + {'x': self._access._user.eid}) self.assertEqual(len(rset), 4) def test_select_ambigous_outer_join(self): @@ -687,7 +684,7 @@ self.assertEqual(rset.rows[0][0], 12) ## def test_select_simplified(self): -## ueid = self.session.user.eid +## ueid = self._access._user.eid ## rset = self.qexecute('Any L WHERE %s login L'%ueid) ## self.assertEqual(rset.rows[0][0], 'admin') ## rset = self.qexecute('Any L WHERE %(x)s login L', {'x':ueid}) @@ -840,7 +837,7 @@ def test_select_explicit_eid(self): rset = self.qexecute('Any X,E WHERE X owned_by U, X eid E, U eid %(u)s', - {'u': self.session.user.eid}) + {'u': self._access._user.eid}) self.assertTrue(rset) self.assertEqual(rset.description[0][1], 'Int') @@ -891,7 +888,7 @@ 'Password', 'String', 'TZDatetime', 'TZTime', 'Time']) - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: cnx.create_entity('Personne', nom=u'louis', test=True) self.assertEqual(len(cnx.execute('Any X WHERE X test %(val)s', {'val': True})), 1) self.assertEqual(len(cnx.execute('Any X WHERE X test TRUE')), 1) @@ -936,7 +933,7 @@ '(Any N,COUNT(X) GROUPBY N ORDERBY 2 WHERE X login N)') def test_select_union_aggregat_independant_group(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: cnx.execute('INSERT State X: X name "hop"') cnx.execute('INSERT State X: X name "hop"') cnx.execute('INSERT Transition X: X name "hop"') @@ -1185,8 +1182,7 @@ self.assertEqual(len(rset.rows), 0, rset.rows) def test_delete_3(self): - s = self.user_groups_session('users') - with s.new_cnx() as cnx: + with self.user_groups_session('users') as cnx: peid, = self.o.execute(cnx, "INSERT Personne P: P nom 'toto'")[0] seid, = self.o.execute(cnx, "INSERT Societe S: S nom 'logilab'")[0] self.o.execute(cnx, "SET P travaille S") @@ -1224,7 +1220,7 @@ eeid, = self.qexecute('INSERT Email X: X messageid "<1234>", X subject "test", ' 'X sender Y, X recipients Y WHERE Y is EmailAddress')[0] self.qexecute("DELETE Email X") - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: sqlc = cnx.cnxset.cu sqlc.execute('SELECT * FROM recipients_relation') self.assertEqual(len(sqlc.fetchall()), 0) @@ -1294,7 +1290,7 @@ self.assertEqual(self.qexecute('Any X WHERE X nom "tutu"').rows, [[peid2]]) def test_update_multiple2(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: ueid = cnx.execute("INSERT CWUser X: X login 'bob', X upassword 'toto'")[0][0] peid1 = cnx.execute("INSERT Personne Y: Y nom 'turlu'")[0][0] peid2 = cnx.execute("INSERT Personne Y: Y nom 'tutu'")[0][0] @@ -1375,7 +1371,7 @@ self.assertEqual(rset.description, [('CWUser',)]) self.assertRaises(Unauthorized, self.qexecute, "Any P WHERE X is CWUser, X login 'bob', X upassword P") - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: cursor = cnx.cnxset.cu cursor.execute("SELECT %supassword from %sCWUser WHERE %slogin='bob'" % (SQL_PREFIX, SQL_PREFIX, SQL_PREFIX)) @@ -1387,7 +1383,7 @@ self.assertEqual(rset.description, [('CWUser',)]) def test_update_upassword(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rset = cnx.execute("INSERT CWUser X: X login 'bob', X upassword %(pwd)s", {'pwd': 'toto'}) self.assertEqual(rset.description[0][0], 'CWUser') @@ -1495,7 +1491,7 @@ 'creation_date': '2000/07/03 11:00'}) rset = self.qexecute('Any lower(N) ORDERBY LOWER(N) WHERE X is Tag, X name N,' 'X owned_by U, U eid %(x)s', - {'x':self.session.user.eid}) + {'x':self._access._user.eid}) self.assertEqual(rset.rows, [[u'\xe9name0']]) def test_nonregr_description(self): @@ -1543,7 +1539,7 @@ self.qexecute('Any X ORDERBY D DESC WHERE X creation_date D') def test_nonregr_extra_joins(self): - ueid = self.session.user.eid + ueid = self._access._user.eid teid1 = self.qexecute("INSERT Folder X: X name 'folder1'")[0][0] teid2 = self.qexecute("INSERT Folder X: X name 'folder2'")[0][0] neid1 = self.qexecute("INSERT Note X: X para 'note1'")[0][0] diff -r 3c60180481ac -r 7514626e1dc5 cubicweb/server/test/unittest_rqlannotation.py --- a/cubicweb/server/test/unittest_rqlannotation.py Fri Mar 10 17:46:06 2017 +0100 +++ b/cubicweb/server/test/unittest_rqlannotation.py Fri Mar 10 17:52:44 2017 +0100 @@ -40,7 +40,7 @@ pass def test_0_1(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Any SEN,RN,OEN WHERE X from_entity SE, ' 'SE eid 44, X relation_type R, R eid 139, ' 'X to_entity OE, OE eid 42, R name RN, SE name SEN, ' @@ -53,14 +53,14 @@ self.assertEqual(rqlst.defined_vars['R'].stinfo['attrvar'], None) def test_0_2(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Any O WHERE NOT S ecrit_par O, S eid 1, ' 'S inline1 P, O inline2 P') self.assertEqual(rqlst.defined_vars['P']._q_invariant, True) self.assertEqual(rqlst.defined_vars['O'].stinfo['attrvar'], None) def test_0_4(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Any A,B,C WHERE A eid 12,A comment B, ' 'A ?wf_info_for C') self.assertEqual(rqlst.defined_vars['A']._q_invariant, False) @@ -71,18 +71,18 @@ {'A': 'TrInfo', 'B': 'String', 'C': 'Note'}]) def test_0_5(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Any P WHERE N ecrit_par P, N eid 0') self.assertEqual(rqlst.defined_vars['N']._q_invariant, False) self.assertEqual(rqlst.defined_vars['P']._q_invariant, True) def test_0_6(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Any P WHERE NOT N ecrit_par P, N eid 512') self.assertEqual(rqlst.defined_vars['P']._q_invariant, False) def test_0_7(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Personne X,Y where X nom NX, ' 'Y nom NX, X eid XE, not Y eid XE') self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) @@ -90,25 +90,25 @@ self.assertTrue(rqlst.defined_vars['XE'].stinfo['attrvar']) def test_0_8(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Any P WHERE X eid 0, NOT X connait P') self.assertEqual(rqlst.defined_vars['P']._q_invariant, False) self.assertEqual(len(rqlst.solutions), 1, rqlst.solutions) def test_0_10(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Any X WHERE X concerne Y, Y is Note') self.assertEqual(rqlst.defined_vars['X']._q_invariant, True) self.assertEqual(rqlst.defined_vars['Y']._q_invariant, False) def test_0_11(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Any X WHERE X todo_by Y, X is Affaire') self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) self.assertEqual(rqlst.defined_vars['Y']._q_invariant, True) def test_0_12(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Personne P WHERE P concerne A, ' 'A concerne S, S nom "Logilab"') self.assertEqual(rqlst.defined_vars['P']._q_invariant, True) @@ -116,31 +116,31 @@ self.assertEqual(rqlst.defined_vars['S']._q_invariant, False) def test_1_0(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Any X,Y WHERE X created_by Y, ' 'X eid 5, NOT Y eid 6') self.assertEqual(rqlst.defined_vars['Y']._q_invariant, True) def test_1_1(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Any X,Y WHERE X created_by Y, X eid 5, ' 'NOT Y eid IN (6,7)') self.assertEqual(rqlst.defined_vars['Y']._q_invariant, True) def test_2(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Any X WHERE X identity Y, Y eid 1') self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) def test_7(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Personne X,Y where X nom NX, Y nom NX, ' 'X eid XE, not Y eid XE') self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) self.assertEqual(rqlst.defined_vars['Y']._q_invariant, False) def test_8(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: # DISTINCT Any P WHERE P require_group %(g)s, # NOT %(u)s has_group_permission P, P is CWPermission rqlst = self._prepare(cnx, 'DISTINCT Any X WHERE A concerne X, ' @@ -149,69 +149,69 @@ self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) def test_diff_scope_identity_deamb(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Any X WHERE X concerne Y, Y is Note, ' 'EXISTS(Y identity Z, Z migrated_from N)') self.assertEqual(rqlst.defined_vars['Z']._q_invariant, True) self.assertEqual(rqlst.defined_vars['Y']._q_invariant, True) def test_optional_inlined(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Any X,S where X from_state S?') self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) self.assertEqual(rqlst.defined_vars['S']._q_invariant, True) def test_optional_inlined_2(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Any N,A WHERE N? inline1 A') self.assertEqual(rqlst.defined_vars['N']._q_invariant, False) self.assertEqual(rqlst.defined_vars['A']._q_invariant, False) def test_optional_1(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Any X,S WHERE X travaille S?') self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) self.assertEqual(rqlst.defined_vars['S']._q_invariant, True) def test_greater_eid(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Any X WHERE X eid > 5') self.assertEqual(rqlst.defined_vars['X']._q_invariant, True) def test_greater_eid_typed(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Any X WHERE X eid > 5, X is Note') self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) def test_max_eid(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Any MAX(X)') self.assertEqual(rqlst.defined_vars['X']._q_invariant, True) def test_max_eid_typed(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Any MAX(X) WHERE X is Note') self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) def test_all_entities(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Any X') self.assertEqual(rqlst.defined_vars['X']._q_invariant, True) def test_all_typed_entity(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Any X WHERE X is Note') self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) def test_has_text_1(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Any X WHERE X has_text "toto tata"') self.assertEqual(rqlst.defined_vars['X']._q_invariant, True) self.assertEqual(rqlst.defined_vars['X'].stinfo['principal'].r_type, 'has_text') def test_has_text_2(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Any X WHERE X is Personne, ' 'X has_text "coucou"') self.assertEqual(rqlst.defined_vars['X']._q_invariant, True) @@ -219,7 +219,7 @@ 'has_text') def test_not_relation_1(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: # P can't be invariant since deambiguification caused by "NOT X require_permission P" # is not considered by generated sql (NOT EXISTS(...)) rqlst = self._prepare(cnx, 'Any P,G WHERE P require_group G, ' @@ -229,134 +229,134 @@ self.assertEqual(rqlst.defined_vars['X']._q_invariant, True) def test_not_relation_2(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'TrInfo X WHERE X eid 2, ' 'NOT X from_state Y, Y is State') self.assertEqual(rqlst.defined_vars['Y']._q_invariant, True) self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) def test_not_relation_3(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Any X, Y WHERE X eid 1, Y eid in (2, 3)') self.assertEqual(rqlst.defined_vars['Y']._q_invariant, False) def test_not_relation_4_1(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Note X WHERE NOT Y evaluee X') self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) self.assertEqual(rqlst.defined_vars['Y']._q_invariant, True) def test_not_relation_4_2(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Any X WHERE NOT Y evaluee X') self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) self.assertEqual(rqlst.defined_vars['Y']._q_invariant, True) def test_not_relation_4_3(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Any Y WHERE NOT Y evaluee X') self.assertEqual(rqlst.defined_vars['X']._q_invariant, True) self.assertEqual(rqlst.defined_vars['Y']._q_invariant, False) def test_not_relation_4_4(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Any X WHERE NOT Y evaluee X, Y is CWUser') self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) self.assertEqual(rqlst.defined_vars['Y']._q_invariant, False) def test_not_relation_4_5(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Any X WHERE NOT Y evaluee X, ' 'Y eid %s, X is Note' % self.ueid) self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) self.assertEqual(rqlst.solutions, [{'X': 'Note'}]) def test_not_relation_5_1(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Any X,Y WHERE X name "CWGroup", ' 'Y eid IN(1, 2, 3), NOT X read_permission Y') self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) self.assertEqual(rqlst.defined_vars['Y']._q_invariant, False) def test_not_relation_5_2(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'DISTINCT Any X,Y WHERE X name "CWGroup", ' 'Y eid IN(1, 2, 3), NOT X read_permission Y') self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) self.assertEqual(rqlst.defined_vars['Y']._q_invariant, False) def test_not_relation_6(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Personne P where NOT P concerne A') self.assertEqual(rqlst.defined_vars['P']._q_invariant, False) self.assertEqual(rqlst.defined_vars['A']._q_invariant, True) def test_not_relation_7(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Any K,V WHERE P is CWProperty, ' 'P pkey K, P value V, NOT P for_user U') self.assertEqual(rqlst.defined_vars['P']._q_invariant, False) self.assertEqual(rqlst.defined_vars['U']._q_invariant, True) def test_exists_1(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Any U WHERE U eid IN (1,2), EXISTS(X owned_by U)') self.assertEqual(rqlst.defined_vars['U']._q_invariant, False) self.assertEqual(rqlst.defined_vars['X']._q_invariant, True) def test_exists_2(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Any U WHERE EXISTS(U eid IN (1,2), X owned_by U)') self.assertEqual(rqlst.defined_vars['U']._q_invariant, False) self.assertEqual(rqlst.defined_vars['X']._q_invariant, True) def test_exists_3(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Any U WHERE EXISTS(X owned_by U, X bookmarked_by U)') self.assertEqual(rqlst.defined_vars['U']._q_invariant, False) self.assertEqual(rqlst.defined_vars['X']._q_invariant, True) def test_exists_4(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Any X,Y WHERE X name "CWGroup", ' 'Y eid IN(1, 2, 3), EXISTS(X read_permission Y)') self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) self.assertEqual(rqlst.defined_vars['Y']._q_invariant, False) def test_exists_5(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'DISTINCT Any X,Y WHERE X name "CWGroup", ' 'Y eid IN(1, 2, 3), EXISTS(X read_permission Y)') self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) self.assertEqual(rqlst.defined_vars['Y']._q_invariant, True) def test_not_exists_1(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Any U WHERE NOT EXISTS(X owned_by U, ' 'X bookmarked_by U)') self.assertEqual(rqlst.defined_vars['U']._q_invariant, False) self.assertEqual(rqlst.defined_vars['X']._q_invariant, True) def test_not_exists_2(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Any X,Y WHERE X name "CWGroup", ' 'Y eid IN(1, 2, 3), NOT EXISTS(X read_permission Y)') self.assertEqual(rqlst.defined_vars['Y']._q_invariant, False) def test_not_exists_distinct_1(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'DISTINCT Any X,Y WHERE X name "CWGroup", ' 'Y eid IN(1, 2, 3), NOT EXISTS(X read_permission Y)') self.assertEqual(rqlst.defined_vars['Y']._q_invariant, False) def test_or_1(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Any X WHERE X concerne B OR ' 'C concerne X, B eid 12, C eid 13') self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) def test_or_2(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Any X WHERE X created_by U, X concerne B OR ' 'C concerne X, B eid 12, C eid 13') self.assertEqual(rqlst.defined_vars['X']._q_invariant, True) @@ -364,14 +364,14 @@ self.assertEqual(rqlst.defined_vars['X'].stinfo['principal'].r_type, 'created_by') def test_or_3(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Any N WHERE A evaluee N or EXISTS(N todo_by U)') self.assertEqual(rqlst.defined_vars['N']._q_invariant, False) self.assertEqual(rqlst.defined_vars['A']._q_invariant, True) self.assertEqual(rqlst.defined_vars['U']._q_invariant, True) def test_or_exists_1(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: # query generated by security rewriting rqlst = self._prepare(cnx, 'DISTINCT Any A,S WHERE A is Affaire, S nom "chouette", ' 'S is IN(Division, Societe, SubDivision),' @@ -388,7 +388,7 @@ self.assertEqual(rqlst.defined_vars['S']._q_invariant, False) def test_or_exists_2(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Any U WHERE EXISTS(U in_group G, G name "managers") OR ' 'EXISTS(X owned_by U, X bookmarked_by U)') self.assertEqual(rqlst.defined_vars['U']._q_invariant, False) @@ -396,7 +396,7 @@ self.assertEqual(rqlst.defined_vars['X']._q_invariant, True) def test_or_exists_3(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Any COUNT(S),CS GROUPBY CS ORDERBY 1 DESC LIMIT 10 ' 'WHERE C is Societe, S concerne C, C nom CS, ' '(EXISTS(S owned_by D)) ' @@ -409,14 +409,14 @@ self.assertEqual(rqlst.defined_vars['S']._q_invariant, True) def test_nonregr_ambiguity(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Note N WHERE N attachment F') # N may be an image as well, not invariant self.assertEqual(rqlst.defined_vars['N']._q_invariant, False) self.assertEqual(rqlst.defined_vars['F']._q_invariant, True) def test_nonregr_ambiguity_2(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Any S,SN WHERE X has_text "tot", ' 'X in_state S, S name SN, X is CWUser') # X use has_text but should not be invariant as ambiguous, and has_text @@ -425,19 +425,19 @@ self.assertEqual(rqlst.defined_vars['S']._q_invariant, False) def test_remove_from_deleted_source_1(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Note X WHERE X eid 999998, NOT X cw_source Y') self.assertNotIn('X', rqlst.defined_vars) # simplified self.assertEqual(rqlst.defined_vars['Y']._q_invariant, True) def test_remove_from_deleted_source_2(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Note X WHERE X eid IN (999998, 999999), NOT X cw_source Y') self.assertEqual(rqlst.defined_vars['X']._q_invariant, False) self.assertEqual(rqlst.defined_vars['Y']._q_invariant, True) def test_has_text_security_cache_bug(self): - with self.session.new_cnx() as cnx: + with self._access.cnx() as cnx: rqlst = self._prepare(cnx, 'Any X WHERE X has_text "toto" WITH X BEING ' '(Any C WHERE C is Societe, C nom CS)') self.assertTrue(rqlst.parent.has_text_query) diff -r 3c60180481ac -r 7514626e1dc5 cubicweb/web/test/unittest_views_searchrestriction.py --- a/cubicweb/web/test/unittest_views_searchrestriction.py Fri Mar 10 17:46:06 2017 +0100 +++ b/cubicweb/web/test/unittest_views_searchrestriction.py Fri Mar 10 17:52:44 2017 +0100 @@ -23,8 +23,9 @@ class InsertAttrRelationTC(CubicWebTC): def parse(self, query): - rqlst = self.vreg.parse(self.session, query) - select = rqlst.children[0] + with self.admin_access.cnx() as cnx: + rqlst = self.vreg.parse(cnx, query) + rqlst.children[0] return rqlst def _generate(self, rqlst, rel, role, attr):