# HG changeset patch # User Aurelien Campeas # Date 1401718216 -7200 # Node ID 1485aab7ece6b0e84ef6d89f7a813d6d63df0800 # Parent 17abdb7af3e68016755f67b82a3f2ea67054155a [tests/querier] use the new connection api (part 3/3) Some adaptations to devtools/repotest: * dead code removal * remove session related code In the tests: * many tests actually don't check the querier but some generic rql property -- when such a test needs several statements to complete a commit, switch to session.new_cnx * provide an assertRQLEqual helper diff -r 17abdb7af3e6 -r 1485aab7ece6 devtools/repotest.py --- a/devtools/repotest.py Tue Jun 10 14:25:20 2014 +0200 +++ b/devtools/repotest.py Mon Jun 02 16:10:16 2014 +0200 @@ -1,4 +1,4 @@ -# copyright 2003-2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved. +# copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved. # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr # # This file is part of CubicWeb. @@ -22,10 +22,8 @@ __docformat__ = "restructuredtext en" -from copy import deepcopy from pprint import pprint -from logilab.common.decorators import clear_cache from logilab.common.testlib import SkipTest def tuplify(list): @@ -204,27 +202,22 @@ self.ueid = self.session.user.eid assert self.ueid != -1 self.repo._type_source_cache = {} # clear cache - self.cnxset = self.session.set_cnxset() self.maxeid = self.get_max_eid() do_monkey_patch() self._dumb_sessions = [] def get_max_eid(self): - return self.session.execute('Any MAX(X)')[0][0] + with self.session.new_cnx() as cnx: + return cnx.execute('Any MAX(X)')[0][0] + def cleanup(self): - self.session.set_cnxset() - self.session.execute('DELETE Any X WHERE X eid > %s' % self.maxeid) + with self.session.new_cnx() as cnx: + cnx.execute('DELETE Any X WHERE X eid > %s' % self.maxeid) + cnx.commit() def tearDown(self): undo_monkey_patch() - self.session.rollback() self.cleanup() - self.commit() - # properly close dumb sessions - for session in self._dumb_sessions: - session.rollback() - session.close() - self.repo._free_cnxset(self.cnxset) assert self.session.user.eid != -1 def set_debug(self, debug): @@ -259,21 +252,11 @@ def user_groups_session(self, *groups): """lightweight session using the current user with hi-jacked groups""" # use self.session.user.eid to get correct owned_by relation, unless explicit eid - u = self.repo._build_user(self.session, self.session.user.eid) - u._groups = set(groups) - s = Session(u, self.repo) - s._cnx.cnxset = self.cnxset - s._cnx.ctx_count = 1 - # register session to ensure it gets closed - self._dumb_sessions.append(s) - return s - - def execute(self, rql, args=None, build_descr=True): - return self.o.execute(self.session, rql, args, build_descr) - - def commit(self): - self.session.commit() - self.session.set_cnxset() + with self.session.new_cnx() as cnx: + u = self.repo._build_user(cnx, self.session.user.eid) + u._groups = set(groups) + s = Session(u, self.repo) + return s def qexecute(self, rql, args=None, build_descr=True): with self.session.new_cnx() as cnx: @@ -291,19 +274,13 @@ # XXX source_defs self.o = self.repo.querier self.session = self.repo._sessions.values()[0] - self.cnxset = self.session.set_cnxset() self.schema = self.o.schema self.system = self.repo.system_source do_monkey_patch() - self._dumb_sessions = [] # by hi-jacked parent setup self.repo.vreg.rqlhelper.backend = 'postgres' # so FTIRANK is considered def tearDown(self): undo_monkey_patch() - for session in self._dumb_sessions: - if session._cnx.cnxset is not None: - session._cnx.cnxset = None - session.close() def _prepare_plan(self, rql, kwargs=None): rqlst = self.o.parse(rql, annotate=True) @@ -363,24 +340,6 @@ _sort=lambda rels: sorted(rels, key=sort_key)) -def _merge_input_maps(*args, **kwargs): - return sorted(_orig_merge_input_maps(*args, **kwargs)) - -def _choose_term(self, source, sourceterms): - # predictable order for test purpose - def get_key(x): - try: - # variable - return x.name - except AttributeError: - try: - # relation - return x.r_type - except AttributeError: - # const - return x.value - return _orig_choose_term(self, source, DumbOrderedDict2(sourceterms, get_key)) - def _ordered_iter_relations(stinfo): return sorted(_orig_iter_relations(stinfo), key=lambda x:x.r_type) diff -r 17abdb7af3e6 -r 1485aab7ece6 server/test/unittest_querier.py --- a/server/test/unittest_querier.py Tue Jun 10 14:25:20 2014 +0200 +++ b/server/test/unittest_querier.py Mon Jun 02 16:10:16 2014 +0200 @@ -1,5 +1,5 @@ # -*- coding: iso-8859-1 -*- -# copyright 2003-2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved. +# copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved. # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr # # This file is part of CubicWeb. @@ -110,9 +110,12 @@ pass def test_preprocess_1(self): - reid = self.execute('Any X WHERE X is CWRType, X name "owned_by"')[0][0] - rqlst = self._prepare('Any COUNT(RDEF) WHERE RDEF relation_type X, X eid %(x)s', {'x': reid}) - self.assertEqual(rqlst.solutions, [{'RDEF': 'CWAttribute'}, {'RDEF': 'CWRelation'}]) + with self.session.new_cnx() as cnx: + reid = cnx.execute('Any X WHERE X is CWRType, X name "owned_by"')[0][0] + rqlst = self._prepare('Any COUNT(RDEF) WHERE RDEF relation_type X, X eid %(x)s', + {'x': reid}) + self.assertEqual([{'RDEF': 'CWAttribute'}, {'RDEF': 'CWRelation'}], + rqlst.solutions) def test_preprocess_2(self): teid = self.qexecute("INSERT Tag X: X name 'tag'")[0][0] @@ -122,7 +125,12 @@ rqlst = self._prepare('Any X WHERE E eid %(x)s, E tags X', {'x': teid}) # the query may be optimized, should keep only one solution # (any one, etype will be discarded) - self.assertEqual(len(rqlst.solutions), 1) + self.assertEqual(1, len(rqlst.solutions)) + + def assertRQLEqual(self, expected, got): + from rql import parse + self.assertMultiLineEqual(unicode(parse(expected)), + unicode(parse(got))) def test_preprocess_security(self): plan = self._prepare_plan('Any ETN,COUNT(X) GROUPBY ETN ' @@ -140,13 +148,17 @@ ['ETN']) partrqls = sorted(((rqlst.as_string(), rqlst.solutions) for rqlst in subq.children)) rql, solutions = partrqls[0] - self.assertEqual(rql, - 'Any ETN,X WHERE X is ET, ET name ETN, (EXISTS(X owned_by %(B)s))' - ' OR ((((EXISTS(D concerne C?, C owned_by %(B)s, X identity D, C is Division, D is Affaire))' - ' OR (EXISTS(H concerne G?, G owned_by %(B)s, G is SubDivision, X identity H, H is Affaire)))' - ' OR (EXISTS(I concerne F?, F owned_by %(B)s, F is Societe, X identity I, I is Affaire)))' - ' OR (EXISTS(J concerne E?, E owned_by %(B)s, E is Note, X identity J, J is Affaire)))' - ', ET is CWEType, X is Affaire') + self.assertRQLEqual(rql, + 'Any ETN,X WHERE X is ET, ET name ETN, (EXISTS(X owned_by %(B)s))' + ' OR ((((EXISTS(D concerne C?, C owned_by %(B)s, ' + ' X identity D, C is Division, D is Affaire))' + ' OR (EXISTS(H concerne G?, G owned_by %(B)s, G is SubDivision, ' + ' X identity H, H is Affaire)))' + ' OR (EXISTS(I concerne F?, F owned_by %(B)s, F is Societe, ' + ' X identity I, I is Affaire)))' + ' OR (EXISTS(J concerne E?, E owned_by %(B)s, E is Note, ' + ' X identity J, J is Affaire)))' + ', ET is CWEType, X is Affaire') self.assertEqual(solutions, [{'C': 'Division', 'D': 'Affaire', 'E': 'Note', @@ -158,7 +170,13 @@ 'X': 'Affaire', 'ET': 'CWEType', 'ETN': 'String'}]) rql, solutions = partrqls[1] - self.assertEqual(rql, 'Any ETN,X WHERE X is ET, ET name ETN, ET is CWEType, X is IN(BaseTransition, Bookmark, CWAttribute, CWCache, CWConstraint, CWConstraintType, CWEType, CWGroup, CWPermission, CWProperty, CWRType, CWRelation, CWSource, CWUniqueTogetherConstraint, CWUser, Card, Comment, Division, Email, EmailPart, EmailThread, ExternalUri, File, Folder, Note, Old, Personne, RQLExpression, Societe, State, SubDivision, SubWorkflowExitPoint, Tag, TrInfo, Transition, Workflow, WorkflowTransition)') + self.assertRQLEqual(rql, 'Any ETN,X WHERE X is ET, ET name ETN, ET is CWEType, ' + 'X is IN(BaseTransition, Bookmark, CWAttribute, CWCache, CWConstraint, ' + ' CWConstraintType, CWEType, CWGroup, CWPermission, CWProperty, CWRType, ' + ' CWRelation, CWSource, CWUniqueTogetherConstraint, CWUser, Card, Comment, ' + ' Division, Email, EmailPart, EmailThread, ExternalUri, File, Folder, Note, ' + ' Old, Personne, RQLExpression, Societe, State, SubDivision, ' + ' SubWorkflowExitPoint, Tag, TrInfo, Transition, Workflow, WorkflowTransition)') self.assertListEqual(sorted(solutions), sorted([{'X': 'BaseTransition', 'ETN': 'String', 'ET': 'CWEType'}, {'X': 'Bookmark', 'ETN': 'String', 'ET': 'CWEType'}, @@ -254,14 +272,16 @@ self.assertEqual(rset.description[0][0], 'String') def test_build_descr1(self): - rset = self.execute('(Any U,L WHERE U login L) UNION (Any G,N WHERE G name N, G is CWGroup)') - rset.req = self.session - orig_length = len(rset) - rset.rows[0][0] = 9999999 - description = manual_build_descr(rset.req, rset.syntax_tree(), None, rset.rows) - self.assertEqual(len(description), orig_length - 1) - self.assertEqual(len(rset.rows), orig_length - 1) - self.assertNotEqual(rset.rows[0][0], 9999999) + with self.session.new_cnx() as cnx: + rset = cnx.execute('(Any U,L WHERE U login L) UNION ' + '(Any G,N WHERE G name N, G is CWGroup)') + # rset.req = self.session + orig_length = len(rset) + rset.rows[0][0] = 9999999 + description = manual_build_descr(cnx, rset.syntax_tree(), None, rset.rows) + self.assertEqual(len(description), orig_length - 1) + self.assertEqual(len(rset.rows), orig_length - 1) + self.assertNotEqual(rset.rows[0][0], 9999999) def test_build_descr2(self): rset = self.qexecute('Any X,Y WITH X,Y BEING ((Any G,NULL WHERE G is CWGroup) UNION ' @@ -655,16 +675,15 @@ ## def test_select_simplified(self): ## ueid = self.session.user.eid -## rset = self.execute('Any L WHERE %s login L'%ueid) +## rset = self.qexecute('Any L WHERE %s login L'%ueid) ## self.assertEqual(rset.rows[0][0], 'admin') -## rset = self.execute('Any L WHERE %(x)s login L', {'x':ueid}) +## rset = self.qexecute('Any L WHERE %(x)s login L', {'x':ueid}) ## self.assertEqual(rset.rows[0][0], 'admin') def test_select_searchable_text_1(self): rset = self.qexecute(u"INSERT Personne X: X nom 'bidüle'") rset = self.qexecute(u"INSERT Societe X: X nom 'bidüle'") rset = self.qexecute("INSERT Societe X: X nom 'chouette'") - self.commit() rset = self.qexecute('Any X where X has_text %(text)s', {'text': u'bidüle'}) self.assertEqual(len(rset.rows), 2, rset.rows) rset = self.qexecute(u'Any N where N has_text "bidüle"') @@ -679,7 +698,6 @@ rset = self.qexecute("INSERT Personne X: X nom 'bidule'") rset = self.qexecute("INSERT Personne X: X nom 'chouette'") rset = self.qexecute("INSERT Societe X: X nom 'bidule'") - self.commit() rset = self.qexecute('Personne N where N has_text "bidule"') self.assertEqual(len(rset.rows), 1, rset.rows) @@ -687,7 +705,6 @@ rset = self.qexecute("INSERT Personne X: X nom 'bidule', X sexe 'M'") rset = self.qexecute("INSERT Personne X: X nom 'bidule', X sexe 'F'") rset = self.qexecute("INSERT Societe X: X nom 'bidule'") - self.commit() rset = self.qexecute('Any X where X has_text "bidule" and X sexe "M"') self.assertEqual(len(rset.rows), 1, rset.rows) @@ -695,7 +712,6 @@ self.qexecute(u"INSERT Personne X: X nom 'bidüle'") self.qexecute("INSERT Societe X: X nom 'chouette', S travaille X") self.qexecute(u"INSERT Personne X: X nom 'bidüle'") - self.commit() rset = self.qexecute('Personne X WHERE X has_text %(text)s, X travaille S, S has_text %(text2)s', {'text': u'bidüle', 'text2': u'chouette',} @@ -736,10 +752,10 @@ self.assertEqual(len(rset.rows), 2, rset.rows) def test_select_inline(self): - self.execute("INSERT Personne X: X nom 'bidule'") - self.execute("INSERT Note X: X type 'a'") - self.execute("SET X ecrit_par Y WHERE X type 'a', Y nom 'bidule'") - rset = self.execute('Any N where N ecrit_par X, X nom "bidule"') + self.qexecute("INSERT Personne X: X nom 'bidule'") + self.qexecute("INSERT Note X: X type 'a'") + self.qexecute("SET X ecrit_par Y WHERE X type 'a', Y nom 'bidule'") + rset = self.qexecute('Any N where N ecrit_par X, X nom "bidule"') self.assertEqual(len(rset.rows), 1, rset.rows) def test_select_creation_date(self): @@ -753,10 +769,12 @@ self.qexecute("INSERT Societe X: X nom 'logilab'") self.qexecute("INSERT Societe X: X nom 'caesium'") self.qexecute("SET P travaille S WHERE P nom 'bidule', S nom 'logilab'") - rset = self.qexecute('DISTINCT Any P WHERE P travaille S1 OR P travaille S2, S1 nom "logilab", S2 nom "caesium"') + rset = self.qexecute('DISTINCT Any P WHERE P travaille S1 OR P travaille S2, ' + 'S1 nom "logilab", S2 nom "caesium"') self.assertEqual(len(rset.rows), 1) self.qexecute("SET P travaille S WHERE P nom 'chouette', S nom 'caesium'") - rset = self.qexecute('DISTINCT Any P WHERE P travaille S1 OR P travaille S2, S1 nom "logilab", S2 nom "caesium"') + rset = self.qexecute('DISTINCT Any P WHERE P travaille S1 OR P travaille S2, ' + 'S1 nom "logilab", S2 nom "caesium"') self.assertEqual(len(rset.rows), 2) def test_select_or_sym_relation(self): @@ -808,7 +826,8 @@ self.assertEqual(len(rset.rows), 2) def test_select_explicit_eid(self): - rset = self.qexecute('Any X,E WHERE X owned_by U, X eid E, U eid %(u)s', {'u': self.session.user.eid}) + rset = self.qexecute('Any X,E WHERE X owned_by U, X eid E, U eid %(u)s', + {'u': self.session.user.eid}) self.assertTrue(rset) self.assertEqual(rset.description[0][1], 'Int') @@ -852,12 +871,12 @@ 'Password', 'String', 'TZDatetime', 'TZTime', 'Time']) - req = self.session - req.create_entity('Personne', nom=u'louis', test=True) - self.assertEqual(len(req.execute('Any X WHERE X test %(val)s', {'val': True})), 1) - self.assertEqual(len(req.execute('Any X WHERE X test TRUE')), 1) - self.assertEqual(len(req.execute('Any X WHERE X test %(val)s', {'val': False})), 0) - self.assertEqual(len(req.execute('Any X WHERE X test FALSE')), 0) + with self.session.new_cnx() as cnx: + cnx.create_entity('Personne', nom=u'louis', test=True) + self.assertEqual(len(cnx.execute('Any X WHERE X test %(val)s', {'val': True})), 1) + self.assertEqual(len(cnx.execute('Any X WHERE X test TRUE')), 1) + self.assertEqual(len(cnx.execute('Any X WHERE X test %(val)s', {'val': False})), 0) + self.assertEqual(len(cnx.execute('Any X WHERE X test FALSE')), 0) def test_select_constant(self): rset = self.qexecute('Any X, "toto" ORDERBY X WHERE X is CWGroup') @@ -897,15 +916,18 @@ '(Any N,COUNT(X) GROUPBY N ORDERBY 2 WHERE X login N)') def test_select_union_aggregat_independant_group(self): - self.execute('INSERT State X: X name "hop"') - self.execute('INSERT State X: X name "hop"') - self.execute('INSERT Transition X: X name "hop"') - self.execute('INSERT Transition X: X name "hop"') - rset = self.execute('Any N,NX ORDERBY 2 WITH N,NX BEING ' - '((Any N,COUNT(X) GROUPBY N WHERE X name N, X is State HAVING COUNT(X)>1)' - ' UNION ' - '(Any N,COUNT(X) GROUPBY N WHERE X name N, X is Transition HAVING COUNT(X)>1))') - self.assertEqual(rset.rows, [[u'hop', 2], [u'hop', 2]]) + with self.session.new_cnx() as cnx: + cnx.execute('INSERT State X: X name "hop"') + cnx.execute('INSERT State X: X name "hop"') + cnx.execute('INSERT Transition X: X name "hop"') + cnx.execute('INSERT Transition X: X name "hop"') + rset = cnx.execute('Any N,NX ORDERBY 2 WITH N,NX BEING ' + '((Any N,COUNT(X) GROUPBY N WHERE X name N, ' + ' X is State HAVING COUNT(X)>1)' + ' UNION ' + '(Any N,COUNT(X) GROUPBY N WHERE X name N, ' + ' X is Transition HAVING COUNT(X)>1))') + self.assertEqual(rset.rows, [[u'hop', 2], [u'hop', 2]]) def test_select_union_selection_with_diff_variables(self): rset = self.qexecute('(Any N WHERE X name N, X is State)' @@ -1144,10 +1166,13 @@ def test_delete_3(self): s = self.user_groups_session('users') - peid, = self.o.execute(s, "INSERT Personne P: P nom 'toto'")[0] - seid, = self.o.execute(s, "INSERT Societe S: S nom 'logilab'")[0] - self.o.execute(s, "SET P travaille S") - rset = self.execute('Personne P WHERE P travaille S') + with s.new_cnx() as cnx: + with cnx.ensure_cnx_set: + peid, = self.o.execute(cnx, "INSERT Personne P: P nom 'toto'")[0] + seid, = self.o.execute(cnx, "INSERT Societe S: S nom 'logilab'")[0] + self.o.execute(cnx, "SET P travaille S") + cnx.commit() + rset = self.qexecute('Personne P WHERE P travaille S') self.assertEqual(len(rset.rows), 1) self.qexecute("DELETE X travaille Y WHERE X eid %s, Y eid %s" % (peid, seid)) rset = self.qexecute('Personne P WHERE P travaille S') @@ -1174,27 +1199,28 @@ (using cachekey on sql generation returned always the same query for an eid, whatever the relation) """ - aeid, = self.execute('INSERT EmailAddress X: X address "toto@logilab.fr", X alias "hop"')[0] + aeid, = self.qexecute('INSERT EmailAddress X: X address "toto@logilab.fr", X alias "hop"')[0] # XXX would be nice if the rql below was enough... #'INSERT Email X: X messageid "<1234>", X subject "test", X sender Y, X recipients Y' - eeid, = self.execute('INSERT Email X: X messageid "<1234>", X subject "test", X sender Y, X recipients Y WHERE Y is EmailAddress')[0] - self.execute("DELETE Email X") - sqlc = self.session.cnxset.cu - sqlc.execute('SELECT * FROM recipients_relation') - self.assertEqual(len(sqlc.fetchall()), 0) - sqlc.execute('SELECT * FROM owned_by_relation WHERE eid_from=%s'%eeid) - self.assertEqual(len(sqlc.fetchall()), 0) + eeid, = self.qexecute('INSERT Email X: X messageid "<1234>", X subject "test", ' + 'X sender Y, X recipients Y WHERE Y is EmailAddress')[0] + self.qexecute("DELETE Email X") + with self.session.new_cnx() as cnx: + with cnx.ensure_cnx_set: + sqlc = cnx.cnxset.cu + sqlc.execute('SELECT * FROM recipients_relation') + self.assertEqual(len(sqlc.fetchall()), 0) + sqlc.execute('SELECT * FROM owned_by_relation WHERE eid_from=%s'%eeid) + self.assertEqual(len(sqlc.fetchall()), 0) def test_nonregr_delete_cache2(self): eid = self.qexecute("INSERT Folder T: T name 'toto'")[0][0] - self.commit() # fill the cache self.qexecute("Any X WHERE X eid %(x)s", {'x': eid}) self.qexecute("Any X WHERE X eid %s" % eid) self.qexecute("Folder X WHERE X eid %(x)s", {'x': eid}) self.qexecute("Folder X WHERE X eid %s" % eid) self.qexecute("DELETE Folder T WHERE T eid %s" % eid) - self.commit() rset = self.qexecute("Any X WHERE X eid %(x)s", {'x': eid}) self.assertEqual(rset.rows, []) rset = self.qexecute("Any X WHERE X eid %s" % eid) @@ -1250,15 +1276,16 @@ self.assertEqual(self.qexecute('Any X WHERE X nom "tutu"').rows, [[peid2]]) def test_update_multiple2(self): - ueid = self.execute("INSERT CWUser X: X login 'bob', X upassword 'toto'")[0][0] - peid1 = self.execute("INSERT Personne Y: Y nom 'turlu'")[0][0] - peid2 = self.execute("INSERT Personne Y: Y nom 'tutu'")[0][0] - self.execute('SET P1 owned_by U, P2 owned_by U ' - 'WHERE P1 eid %s, P2 eid %s, U eid %s' % (peid1, peid2, ueid)) - self.assertTrue(self.execute('Any X WHERE X eid %s, X owned_by U, U eid %s' - % (peid1, ueid))) - self.assertTrue(self.execute('Any X WHERE X eid %s, X owned_by U, U eid %s' - % (peid2, ueid))) + with self.session.new_cnx() as cnx: + ueid = cnx.execute("INSERT CWUser X: X login 'bob', X upassword 'toto'")[0][0] + peid1 = cnx.execute("INSERT Personne Y: Y nom 'turlu'")[0][0] + peid2 = cnx.execute("INSERT Personne Y: Y nom 'tutu'")[0][0] + cnx.execute('SET P1 owned_by U, P2 owned_by U ' + 'WHERE P1 eid %s, P2 eid %s, U eid %s' % (peid1, peid2, ueid)) + self.assertTrue(cnx.execute('Any X WHERE X eid %s, X owned_by U, U eid %s' + % (peid1, ueid))) + self.assertTrue(cnx.execute('Any X WHERE X eid %s, X owned_by U, U eid %s' + % (peid2, ueid))) def test_update_math_expr(self): orders = [r[0] for r in self.qexecute('Any O ORDERBY O WHERE ST name "Personne", ' @@ -1324,35 +1351,41 @@ # upassword encryption tests ################################################# def test_insert_upassword(self): - rset = self.execute("INSERT CWUser X: X login 'bob', X upassword 'toto'") + rset = self.qexecute("INSERT CWUser X: X login 'bob', X upassword 'toto', " + "X in_group G WHERE G name 'users'") self.assertEqual(len(rset.rows), 1) self.assertEqual(rset.description, [('CWUser',)]) self.assertRaises(Unauthorized, - self.execute, "Any P WHERE X is CWUser, X login 'bob', X upassword P") - cursor = self.cnxset.cu - cursor.execute("SELECT %supassword from %sCWUser WHERE %slogin='bob'" - % (SQL_PREFIX, SQL_PREFIX, SQL_PREFIX)) - passwd = str(cursor.fetchone()[0]) - self.assertEqual(passwd, crypt_password('toto', passwd)) - rset = self.execute("Any X WHERE X is CWUser, X login 'bob', X upassword %(pwd)s", + self.qexecute, "Any P WHERE X is CWUser, X login 'bob', X upassword P") + with self.session.new_cnx() as cnx: + with cnx.ensure_cnx_set: + cursor = cnx.cnxset.cu + cursor.execute("SELECT %supassword from %sCWUser WHERE %slogin='bob'" + % (SQL_PREFIX, SQL_PREFIX, SQL_PREFIX)) + passwd = str(cursor.fetchone()[0]) + self.assertEqual(passwd, crypt_password('toto', passwd)) + rset = self.qexecute("Any X WHERE X is CWUser, X login 'bob', X upassword %(pwd)s", {'pwd': Binary(passwd)}) self.assertEqual(len(rset.rows), 1) self.assertEqual(rset.description, [('CWUser',)]) def test_update_upassword(self): - rset = self.execute("INSERT CWUser X: X login 'bob', X upassword %(pwd)s", {'pwd': 'toto'}) - self.assertEqual(rset.description[0][0], 'CWUser') - rset = self.execute("SET X upassword %(pwd)s WHERE X is CWUser, X login 'bob'", - {'pwd': 'tutu'}) - cursor = self.cnxset.cu - cursor.execute("SELECT %supassword from %sCWUser WHERE %slogin='bob'" - % (SQL_PREFIX, SQL_PREFIX, SQL_PREFIX)) - passwd = str(cursor.fetchone()[0]) - self.assertEqual(passwd, crypt_password('tutu', passwd)) - rset = self.execute("Any X WHERE X is CWUser, X login 'bob', X upassword %(pwd)s", - {'pwd': Binary(passwd)}) - self.assertEqual(len(rset.rows), 1) - self.assertEqual(rset.description, [('CWUser',)]) + with self.session.new_cnx() as cnx: + with cnx.ensure_cnx_set: + rset = cnx.execute("INSERT CWUser X: X login 'bob', X upassword %(pwd)s", + {'pwd': 'toto'}) + self.assertEqual(rset.description[0][0], 'CWUser') + rset = cnx.execute("SET X upassword %(pwd)s WHERE X is CWUser, X login 'bob'", + {'pwd': 'tutu'}) + cursor = cnx.cnxset.cu + cursor.execute("SELECT %supassword from %sCWUser WHERE %slogin='bob'" + % (SQL_PREFIX, SQL_PREFIX, SQL_PREFIX)) + passwd = str(cursor.fetchone()[0]) + self.assertEqual(passwd, crypt_password('tutu', passwd)) + rset = cnx.execute("Any X WHERE X is CWUser, X login 'bob', X upassword %(pwd)s", + {'pwd': Binary(passwd)}) + self.assertEqual(len(rset.rows), 1) + self.assertEqual(rset.description, [('CWUser',)]) # ZT datetime tests ######################################################## @@ -1473,7 +1506,6 @@ def test_nonregr_has_text_cache(self): eid1 = self.qexecute("INSERT Personne X: X nom 'bidule'")[0][0] eid2 = self.qexecute("INSERT Personne X: X nom 'tag'")[0][0] - self.commit() rset = self.qexecute("Any X WHERE X has_text %(text)s", {'text': 'bidule'}) self.assertEqual(rset.rows, [[eid1]]) rset = self.qexecute("Any X WHERE X has_text %(text)s", {'text': 'tag'}) @@ -1518,12 +1550,6 @@ # huum, psycopg specific self.qexecute('SET X creation_date %(date)s WHERE X eid 1', {'date': date.today()}) - def test_nonregr_set_query(self): - ueid = self.execute("INSERT CWUser X: X login 'bob', X upassword 'toto'")[0][0] - self.execute("SET E in_group G, E firstname %(firstname)s, E surname %(surname)s " - "WHERE E eid %(x)s, G name 'users'", - {'x':ueid, 'firstname': u'jean', 'surname': u'paul'}) - def test_nonregr_u_owned_by_u(self): ueid = self.qexecute("INSERT CWUser X: X login 'bob', X upassword 'toto', X in_group G " "WHERE G name 'users'")[0][0] @@ -1569,7 +1595,6 @@ peid = self.qexecute("INSERT CWUser X: X login 'bidule', X upassword 'bidule', " "X in_group G WHERE G name 'users'")[0][0] aeid = self.qexecute("INSERT Affaire X: X ref 'bidule'")[0][0] - self.commit() rset = self.qexecute('Any X WHERE X is CWUser, X has_text "bidule"') self.assertEqual(rset.rows, [[peid]]) rset = self.qexecute('Any X WHERE X is CWUser, X has_text "bidule", ' @@ -1588,39 +1613,39 @@ class NonRegressionTC(CubicWebTC): def test_has_text_security_cache_bug(self): - req = self.request() - self.create_user(req, 'user', ('users',)) - aff1 = req.create_entity('Societe', nom=u'aff1') - aff2 = req.create_entity('Societe', nom=u'aff2') - self.commit() - with self.login('user', password='user'): - res = self.execute('Any X WHERE X has_text %(text)s', {'text': 'aff1'}) + with self.admin_access.repo_cnx() as cnx: + self.create_user(cnx, 'user', ('users',)) + aff1 = cnx.create_entity('Societe', nom=u'aff1') + aff2 = cnx.create_entity('Societe', nom=u'aff2') + cnx.commit() + with self.new_access('user').repo_cnx() as cnx: + res = cnx.execute('Any X WHERE X has_text %(text)s', {'text': 'aff1'}) self.assertEqual(res.rows, [[aff1.eid]]) - res = self.execute('Any X WHERE X has_text %(text)s', {'text': 'aff2'}) + res = cnx.execute('Any X WHERE X has_text %(text)s', {'text': 'aff2'}) self.assertEqual(res.rows, [[aff2.eid]]) def test_set_relations_eid(self): - req = self.request() - # create 3 email addresses - a1 = req.create_entity('EmailAddress', address=u'a1') - a2 = req.create_entity('EmailAddress', address=u'a2') - a3 = req.create_entity('EmailAddress', address=u'a3') - # SET relations using '>=' operator on eids - req.execute('SET U use_email A WHERE U login "admin", A eid >= %s' % a2.eid) - self.assertEqual( - [[a2.eid], [a3.eid]], - req.execute('Any A ORDERBY A WHERE U use_email A, U login "admin"').rows) - # DELETE - req.execute('DELETE U use_email A WHERE U login "admin", A eid > %s' % a2.eid) - self.assertEqual( - [[a2.eid]], - req.execute('Any A ORDERBY A WHERE U use_email A, U login "admin"').rows) - req.execute('DELETE U use_email A WHERE U login "admin"') - # SET relations using '<' operator on eids - req.execute('SET U use_email A WHERE U login "admin", A eid < %s' % a2.eid) - self.assertEqual( - [[a1.eid]], - req.execute('Any A ORDERBY A WHERE U use_email A, U login "admin"').rows) + with self.admin_access.repo_cnx() as cnx: + # create 3 email addresses + a1 = cnx.create_entity('EmailAddress', address=u'a1') + a2 = cnx.create_entity('EmailAddress', address=u'a2') + a3 = cnx.create_entity('EmailAddress', address=u'a3') + # SET relations using '>=' operator on eids + cnx.execute('SET U use_email A WHERE U login "admin", A eid >= %s' % a2.eid) + self.assertEqual( + [[a2.eid], [a3.eid]], + cnx.execute('Any A ORDERBY A WHERE U use_email A, U login "admin"').rows) + # DELETE + cnx.execute('DELETE U use_email A WHERE U login "admin", A eid > %s' % a2.eid) + self.assertEqual( + [[a2.eid]], + cnx.execute('Any A ORDERBY A WHERE U use_email A, U login "admin"').rows) + cnx.execute('DELETE U use_email A WHERE U login "admin"') + # SET relations using '<' operator on eids + cnx.execute('SET U use_email A WHERE U login "admin", A eid < %s' % a2.eid) + self.assertEqual( + [[a1.eid]], + cnx.execute('Any A ORDERBY A WHERE U use_email A, U login "admin"').rows) if __name__ == '__main__': unittest_main()