--- a/server/test/unittest_querier.py Tue Jun 10 14:25:20 2014 +0200
+++ b/server/test/unittest_querier.py Mon Jun 02 16:10:16 2014 +0200
@@ -1,5 +1,5 @@
# -*- coding: iso-8859-1 -*-
-# copyright 2003-2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
@@ -110,9 +110,12 @@
pass
def test_preprocess_1(self):
- reid = self.execute('Any X WHERE X is CWRType, X name "owned_by"')[0][0]
- rqlst = self._prepare('Any COUNT(RDEF) WHERE RDEF relation_type X, X eid %(x)s', {'x': reid})
- self.assertEqual(rqlst.solutions, [{'RDEF': 'CWAttribute'}, {'RDEF': 'CWRelation'}])
+ with self.session.new_cnx() as cnx:
+ reid = cnx.execute('Any X WHERE X is CWRType, X name "owned_by"')[0][0]
+ rqlst = self._prepare('Any COUNT(RDEF) WHERE RDEF relation_type X, X eid %(x)s',
+ {'x': reid})
+ self.assertEqual([{'RDEF': 'CWAttribute'}, {'RDEF': 'CWRelation'}],
+ rqlst.solutions)
def test_preprocess_2(self):
teid = self.qexecute("INSERT Tag X: X name 'tag'")[0][0]
@@ -122,7 +125,12 @@
rqlst = self._prepare('Any X WHERE E eid %(x)s, E tags X', {'x': teid})
# the query may be optimized, should keep only one solution
# (any one, etype will be discarded)
- self.assertEqual(len(rqlst.solutions), 1)
+ self.assertEqual(1, len(rqlst.solutions))
+
+ def assertRQLEqual(self, expected, got):
+ from rql import parse
+ self.assertMultiLineEqual(unicode(parse(expected)),
+ unicode(parse(got)))
def test_preprocess_security(self):
plan = self._prepare_plan('Any ETN,COUNT(X) GROUPBY ETN '
@@ -140,13 +148,17 @@
['ETN'])
partrqls = sorted(((rqlst.as_string(), rqlst.solutions) for rqlst in subq.children))
rql, solutions = partrqls[0]
- self.assertEqual(rql,
- 'Any ETN,X WHERE X is ET, ET name ETN, (EXISTS(X owned_by %(B)s))'
- ' OR ((((EXISTS(D concerne C?, C owned_by %(B)s, X identity D, C is Division, D is Affaire))'
- ' OR (EXISTS(H concerne G?, G owned_by %(B)s, G is SubDivision, X identity H, H is Affaire)))'
- ' OR (EXISTS(I concerne F?, F owned_by %(B)s, F is Societe, X identity I, I is Affaire)))'
- ' OR (EXISTS(J concerne E?, E owned_by %(B)s, E is Note, X identity J, J is Affaire)))'
- ', ET is CWEType, X is Affaire')
+ self.assertRQLEqual(rql,
+ 'Any ETN,X WHERE X is ET, ET name ETN, (EXISTS(X owned_by %(B)s))'
+ ' OR ((((EXISTS(D concerne C?, C owned_by %(B)s, '
+ ' X identity D, C is Division, D is Affaire))'
+ ' OR (EXISTS(H concerne G?, G owned_by %(B)s, G is SubDivision, '
+ ' X identity H, H is Affaire)))'
+ ' OR (EXISTS(I concerne F?, F owned_by %(B)s, F is Societe, '
+ ' X identity I, I is Affaire)))'
+ ' OR (EXISTS(J concerne E?, E owned_by %(B)s, E is Note, '
+ ' X identity J, J is Affaire)))'
+ ', ET is CWEType, X is Affaire')
self.assertEqual(solutions, [{'C': 'Division',
'D': 'Affaire',
'E': 'Note',
@@ -158,7 +170,13 @@
'X': 'Affaire',
'ET': 'CWEType', 'ETN': 'String'}])
rql, solutions = partrqls[1]
- self.assertEqual(rql, 'Any ETN,X WHERE X is ET, ET name ETN, ET is CWEType, X is IN(BaseTransition, Bookmark, CWAttribute, CWCache, CWConstraint, CWConstraintType, CWEType, CWGroup, CWPermission, CWProperty, CWRType, CWRelation, CWSource, CWUniqueTogetherConstraint, CWUser, Card, Comment, Division, Email, EmailPart, EmailThread, ExternalUri, File, Folder, Note, Old, Personne, RQLExpression, Societe, State, SubDivision, SubWorkflowExitPoint, Tag, TrInfo, Transition, Workflow, WorkflowTransition)')
+ self.assertRQLEqual(rql, 'Any ETN,X WHERE X is ET, ET name ETN, ET is CWEType, '
+ 'X is IN(BaseTransition, Bookmark, CWAttribute, CWCache, CWConstraint, '
+ ' CWConstraintType, CWEType, CWGroup, CWPermission, CWProperty, CWRType, '
+ ' CWRelation, CWSource, CWUniqueTogetherConstraint, CWUser, Card, Comment, '
+ ' Division, Email, EmailPart, EmailThread, ExternalUri, File, Folder, Note, '
+ ' Old, Personne, RQLExpression, Societe, State, SubDivision, '
+ ' SubWorkflowExitPoint, Tag, TrInfo, Transition, Workflow, WorkflowTransition)')
self.assertListEqual(sorted(solutions),
sorted([{'X': 'BaseTransition', 'ETN': 'String', 'ET': 'CWEType'},
{'X': 'Bookmark', 'ETN': 'String', 'ET': 'CWEType'},
@@ -254,14 +272,16 @@
self.assertEqual(rset.description[0][0], 'String')
def test_build_descr1(self):
- rset = self.execute('(Any U,L WHERE U login L) UNION (Any G,N WHERE G name N, G is CWGroup)')
- rset.req = self.session
- orig_length = len(rset)
- rset.rows[0][0] = 9999999
- description = manual_build_descr(rset.req, rset.syntax_tree(), None, rset.rows)
- self.assertEqual(len(description), orig_length - 1)
- self.assertEqual(len(rset.rows), orig_length - 1)
- self.assertNotEqual(rset.rows[0][0], 9999999)
+ with self.session.new_cnx() as cnx:
+ rset = cnx.execute('(Any U,L WHERE U login L) UNION '
+ '(Any G,N WHERE G name N, G is CWGroup)')
+ # rset.req = self.session
+ orig_length = len(rset)
+ rset.rows[0][0] = 9999999
+ description = manual_build_descr(cnx, rset.syntax_tree(), None, rset.rows)
+ self.assertEqual(len(description), orig_length - 1)
+ self.assertEqual(len(rset.rows), orig_length - 1)
+ self.assertNotEqual(rset.rows[0][0], 9999999)
def test_build_descr2(self):
rset = self.qexecute('Any X,Y WITH X,Y BEING ((Any G,NULL WHERE G is CWGroup) UNION '
@@ -655,16 +675,15 @@
## def test_select_simplified(self):
## ueid = self.session.user.eid
-## rset = self.execute('Any L WHERE %s login L'%ueid)
+## rset = self.qexecute('Any L WHERE %s login L'%ueid)
## self.assertEqual(rset.rows[0][0], 'admin')
-## rset = self.execute('Any L WHERE %(x)s login L', {'x':ueid})
+## rset = self.qexecute('Any L WHERE %(x)s login L', {'x':ueid})
## self.assertEqual(rset.rows[0][0], 'admin')
def test_select_searchable_text_1(self):
rset = self.qexecute(u"INSERT Personne X: X nom 'bidüle'")
rset = self.qexecute(u"INSERT Societe X: X nom 'bidüle'")
rset = self.qexecute("INSERT Societe X: X nom 'chouette'")
- self.commit()
rset = self.qexecute('Any X where X has_text %(text)s', {'text': u'bidüle'})
self.assertEqual(len(rset.rows), 2, rset.rows)
rset = self.qexecute(u'Any N where N has_text "bidüle"')
@@ -679,7 +698,6 @@
rset = self.qexecute("INSERT Personne X: X nom 'bidule'")
rset = self.qexecute("INSERT Personne X: X nom 'chouette'")
rset = self.qexecute("INSERT Societe X: X nom 'bidule'")
- self.commit()
rset = self.qexecute('Personne N where N has_text "bidule"')
self.assertEqual(len(rset.rows), 1, rset.rows)
@@ -687,7 +705,6 @@
rset = self.qexecute("INSERT Personne X: X nom 'bidule', X sexe 'M'")
rset = self.qexecute("INSERT Personne X: X nom 'bidule', X sexe 'F'")
rset = self.qexecute("INSERT Societe X: X nom 'bidule'")
- self.commit()
rset = self.qexecute('Any X where X has_text "bidule" and X sexe "M"')
self.assertEqual(len(rset.rows), 1, rset.rows)
@@ -695,7 +712,6 @@
self.qexecute(u"INSERT Personne X: X nom 'bidüle'")
self.qexecute("INSERT Societe X: X nom 'chouette', S travaille X")
self.qexecute(u"INSERT Personne X: X nom 'bidüle'")
- self.commit()
rset = self.qexecute('Personne X WHERE X has_text %(text)s, X travaille S, S has_text %(text2)s',
{'text': u'bidüle',
'text2': u'chouette',}
@@ -736,10 +752,10 @@
self.assertEqual(len(rset.rows), 2, rset.rows)
def test_select_inline(self):
- self.execute("INSERT Personne X: X nom 'bidule'")
- self.execute("INSERT Note X: X type 'a'")
- self.execute("SET X ecrit_par Y WHERE X type 'a', Y nom 'bidule'")
- rset = self.execute('Any N where N ecrit_par X, X nom "bidule"')
+ self.qexecute("INSERT Personne X: X nom 'bidule'")
+ self.qexecute("INSERT Note X: X type 'a'")
+ self.qexecute("SET X ecrit_par Y WHERE X type 'a', Y nom 'bidule'")
+ rset = self.qexecute('Any N where N ecrit_par X, X nom "bidule"')
self.assertEqual(len(rset.rows), 1, rset.rows)
def test_select_creation_date(self):
@@ -753,10 +769,12 @@
self.qexecute("INSERT Societe X: X nom 'logilab'")
self.qexecute("INSERT Societe X: X nom 'caesium'")
self.qexecute("SET P travaille S WHERE P nom 'bidule', S nom 'logilab'")
- rset = self.qexecute('DISTINCT Any P WHERE P travaille S1 OR P travaille S2, S1 nom "logilab", S2 nom "caesium"')
+ rset = self.qexecute('DISTINCT Any P WHERE P travaille S1 OR P travaille S2, '
+ 'S1 nom "logilab", S2 nom "caesium"')
self.assertEqual(len(rset.rows), 1)
self.qexecute("SET P travaille S WHERE P nom 'chouette', S nom 'caesium'")
- rset = self.qexecute('DISTINCT Any P WHERE P travaille S1 OR P travaille S2, S1 nom "logilab", S2 nom "caesium"')
+ rset = self.qexecute('DISTINCT Any P WHERE P travaille S1 OR P travaille S2, '
+ 'S1 nom "logilab", S2 nom "caesium"')
self.assertEqual(len(rset.rows), 2)
def test_select_or_sym_relation(self):
@@ -808,7 +826,8 @@
self.assertEqual(len(rset.rows), 2)
def test_select_explicit_eid(self):
- rset = self.qexecute('Any X,E WHERE X owned_by U, X eid E, U eid %(u)s', {'u': self.session.user.eid})
+ rset = self.qexecute('Any X,E WHERE X owned_by U, X eid E, U eid %(u)s',
+ {'u': self.session.user.eid})
self.assertTrue(rset)
self.assertEqual(rset.description[0][1], 'Int')
@@ -852,12 +871,12 @@
'Password', 'String',
'TZDatetime', 'TZTime',
'Time'])
- req = self.session
- req.create_entity('Personne', nom=u'louis', test=True)
- self.assertEqual(len(req.execute('Any X WHERE X test %(val)s', {'val': True})), 1)
- self.assertEqual(len(req.execute('Any X WHERE X test TRUE')), 1)
- self.assertEqual(len(req.execute('Any X WHERE X test %(val)s', {'val': False})), 0)
- self.assertEqual(len(req.execute('Any X WHERE X test FALSE')), 0)
+ with self.session.new_cnx() as cnx:
+ cnx.create_entity('Personne', nom=u'louis', test=True)
+ self.assertEqual(len(cnx.execute('Any X WHERE X test %(val)s', {'val': True})), 1)
+ self.assertEqual(len(cnx.execute('Any X WHERE X test TRUE')), 1)
+ self.assertEqual(len(cnx.execute('Any X WHERE X test %(val)s', {'val': False})), 0)
+ self.assertEqual(len(cnx.execute('Any X WHERE X test FALSE')), 0)
def test_select_constant(self):
rset = self.qexecute('Any X, "toto" ORDERBY X WHERE X is CWGroup')
@@ -897,15 +916,18 @@
'(Any N,COUNT(X) GROUPBY N ORDERBY 2 WHERE X login N)')
def test_select_union_aggregat_independant_group(self):
- self.execute('INSERT State X: X name "hop"')
- self.execute('INSERT State X: X name "hop"')
- self.execute('INSERT Transition X: X name "hop"')
- self.execute('INSERT Transition X: X name "hop"')
- rset = self.execute('Any N,NX ORDERBY 2 WITH N,NX BEING '
- '((Any N,COUNT(X) GROUPBY N WHERE X name N, X is State HAVING COUNT(X)>1)'
- ' UNION '
- '(Any N,COUNT(X) GROUPBY N WHERE X name N, X is Transition HAVING COUNT(X)>1))')
- self.assertEqual(rset.rows, [[u'hop', 2], [u'hop', 2]])
+ with self.session.new_cnx() as cnx:
+ cnx.execute('INSERT State X: X name "hop"')
+ cnx.execute('INSERT State X: X name "hop"')
+ cnx.execute('INSERT Transition X: X name "hop"')
+ cnx.execute('INSERT Transition X: X name "hop"')
+ rset = cnx.execute('Any N,NX ORDERBY 2 WITH N,NX BEING '
+ '((Any N,COUNT(X) GROUPBY N WHERE X name N, '
+ ' X is State HAVING COUNT(X)>1)'
+ ' UNION '
+ '(Any N,COUNT(X) GROUPBY N WHERE X name N, '
+ ' X is Transition HAVING COUNT(X)>1))')
+ self.assertEqual(rset.rows, [[u'hop', 2], [u'hop', 2]])
def test_select_union_selection_with_diff_variables(self):
rset = self.qexecute('(Any N WHERE X name N, X is State)'
@@ -1144,10 +1166,13 @@
def test_delete_3(self):
s = self.user_groups_session('users')
- peid, = self.o.execute(s, "INSERT Personne P: P nom 'toto'")[0]
- seid, = self.o.execute(s, "INSERT Societe S: S nom 'logilab'")[0]
- self.o.execute(s, "SET P travaille S")
- rset = self.execute('Personne P WHERE P travaille S')
+ with s.new_cnx() as cnx:
+ with cnx.ensure_cnx_set:
+ peid, = self.o.execute(cnx, "INSERT Personne P: P nom 'toto'")[0]
+ seid, = self.o.execute(cnx, "INSERT Societe S: S nom 'logilab'")[0]
+ self.o.execute(cnx, "SET P travaille S")
+ cnx.commit()
+ rset = self.qexecute('Personne P WHERE P travaille S')
self.assertEqual(len(rset.rows), 1)
self.qexecute("DELETE X travaille Y WHERE X eid %s, Y eid %s" % (peid, seid))
rset = self.qexecute('Personne P WHERE P travaille S')
@@ -1174,27 +1199,28 @@
(using cachekey on sql generation returned always the same query for an eid,
whatever the relation)
"""
- aeid, = self.execute('INSERT EmailAddress X: X address "toto@logilab.fr", X alias "hop"')[0]
+ aeid, = self.qexecute('INSERT EmailAddress X: X address "toto@logilab.fr", X alias "hop"')[0]
# XXX would be nice if the rql below was enough...
#'INSERT Email X: X messageid "<1234>", X subject "test", X sender Y, X recipients Y'
- eeid, = self.execute('INSERT Email X: X messageid "<1234>", X subject "test", X sender Y, X recipients Y WHERE Y is EmailAddress')[0]
- self.execute("DELETE Email X")
- sqlc = self.session.cnxset.cu
- sqlc.execute('SELECT * FROM recipients_relation')
- self.assertEqual(len(sqlc.fetchall()), 0)
- sqlc.execute('SELECT * FROM owned_by_relation WHERE eid_from=%s'%eeid)
- self.assertEqual(len(sqlc.fetchall()), 0)
+ eeid, = self.qexecute('INSERT Email X: X messageid "<1234>", X subject "test", '
+ 'X sender Y, X recipients Y WHERE Y is EmailAddress')[0]
+ self.qexecute("DELETE Email X")
+ with self.session.new_cnx() as cnx:
+ with cnx.ensure_cnx_set:
+ sqlc = cnx.cnxset.cu
+ sqlc.execute('SELECT * FROM recipients_relation')
+ self.assertEqual(len(sqlc.fetchall()), 0)
+ sqlc.execute('SELECT * FROM owned_by_relation WHERE eid_from=%s'%eeid)
+ self.assertEqual(len(sqlc.fetchall()), 0)
def test_nonregr_delete_cache2(self):
eid = self.qexecute("INSERT Folder T: T name 'toto'")[0][0]
- self.commit()
# fill the cache
self.qexecute("Any X WHERE X eid %(x)s", {'x': eid})
self.qexecute("Any X WHERE X eid %s" % eid)
self.qexecute("Folder X WHERE X eid %(x)s", {'x': eid})
self.qexecute("Folder X WHERE X eid %s" % eid)
self.qexecute("DELETE Folder T WHERE T eid %s" % eid)
- self.commit()
rset = self.qexecute("Any X WHERE X eid %(x)s", {'x': eid})
self.assertEqual(rset.rows, [])
rset = self.qexecute("Any X WHERE X eid %s" % eid)
@@ -1250,15 +1276,16 @@
self.assertEqual(self.qexecute('Any X WHERE X nom "tutu"').rows, [[peid2]])
def test_update_multiple2(self):
- ueid = self.execute("INSERT CWUser X: X login 'bob', X upassword 'toto'")[0][0]
- peid1 = self.execute("INSERT Personne Y: Y nom 'turlu'")[0][0]
- peid2 = self.execute("INSERT Personne Y: Y nom 'tutu'")[0][0]
- self.execute('SET P1 owned_by U, P2 owned_by U '
- 'WHERE P1 eid %s, P2 eid %s, U eid %s' % (peid1, peid2, ueid))
- self.assertTrue(self.execute('Any X WHERE X eid %s, X owned_by U, U eid %s'
- % (peid1, ueid)))
- self.assertTrue(self.execute('Any X WHERE X eid %s, X owned_by U, U eid %s'
- % (peid2, ueid)))
+ with self.session.new_cnx() as cnx:
+ ueid = cnx.execute("INSERT CWUser X: X login 'bob', X upassword 'toto'")[0][0]
+ peid1 = cnx.execute("INSERT Personne Y: Y nom 'turlu'")[0][0]
+ peid2 = cnx.execute("INSERT Personne Y: Y nom 'tutu'")[0][0]
+ cnx.execute('SET P1 owned_by U, P2 owned_by U '
+ 'WHERE P1 eid %s, P2 eid %s, U eid %s' % (peid1, peid2, ueid))
+ self.assertTrue(cnx.execute('Any X WHERE X eid %s, X owned_by U, U eid %s'
+ % (peid1, ueid)))
+ self.assertTrue(cnx.execute('Any X WHERE X eid %s, X owned_by U, U eid %s'
+ % (peid2, ueid)))
def test_update_math_expr(self):
orders = [r[0] for r in self.qexecute('Any O ORDERBY O WHERE ST name "Personne", '
@@ -1324,35 +1351,41 @@
# upassword encryption tests #################################################
def test_insert_upassword(self):
- rset = self.execute("INSERT CWUser X: X login 'bob', X upassword 'toto'")
+ rset = self.qexecute("INSERT CWUser X: X login 'bob', X upassword 'toto', "
+ "X in_group G WHERE G name 'users'")
self.assertEqual(len(rset.rows), 1)
self.assertEqual(rset.description, [('CWUser',)])
self.assertRaises(Unauthorized,
- self.execute, "Any P WHERE X is CWUser, X login 'bob', X upassword P")
- cursor = self.cnxset.cu
- cursor.execute("SELECT %supassword from %sCWUser WHERE %slogin='bob'"
- % (SQL_PREFIX, SQL_PREFIX, SQL_PREFIX))
- passwd = str(cursor.fetchone()[0])
- self.assertEqual(passwd, crypt_password('toto', passwd))
- rset = self.execute("Any X WHERE X is CWUser, X login 'bob', X upassword %(pwd)s",
+ self.qexecute, "Any P WHERE X is CWUser, X login 'bob', X upassword P")
+ with self.session.new_cnx() as cnx:
+ with cnx.ensure_cnx_set:
+ cursor = cnx.cnxset.cu
+ cursor.execute("SELECT %supassword from %sCWUser WHERE %slogin='bob'"
+ % (SQL_PREFIX, SQL_PREFIX, SQL_PREFIX))
+ passwd = str(cursor.fetchone()[0])
+ self.assertEqual(passwd, crypt_password('toto', passwd))
+ rset = self.qexecute("Any X WHERE X is CWUser, X login 'bob', X upassword %(pwd)s",
{'pwd': Binary(passwd)})
self.assertEqual(len(rset.rows), 1)
self.assertEqual(rset.description, [('CWUser',)])
def test_update_upassword(self):
- rset = self.execute("INSERT CWUser X: X login 'bob', X upassword %(pwd)s", {'pwd': 'toto'})
- self.assertEqual(rset.description[0][0], 'CWUser')
- rset = self.execute("SET X upassword %(pwd)s WHERE X is CWUser, X login 'bob'",
- {'pwd': 'tutu'})
- cursor = self.cnxset.cu
- cursor.execute("SELECT %supassword from %sCWUser WHERE %slogin='bob'"
- % (SQL_PREFIX, SQL_PREFIX, SQL_PREFIX))
- passwd = str(cursor.fetchone()[0])
- self.assertEqual(passwd, crypt_password('tutu', passwd))
- rset = self.execute("Any X WHERE X is CWUser, X login 'bob', X upassword %(pwd)s",
- {'pwd': Binary(passwd)})
- self.assertEqual(len(rset.rows), 1)
- self.assertEqual(rset.description, [('CWUser',)])
+ with self.session.new_cnx() as cnx:
+ with cnx.ensure_cnx_set:
+ rset = cnx.execute("INSERT CWUser X: X login 'bob', X upassword %(pwd)s",
+ {'pwd': 'toto'})
+ self.assertEqual(rset.description[0][0], 'CWUser')
+ rset = cnx.execute("SET X upassword %(pwd)s WHERE X is CWUser, X login 'bob'",
+ {'pwd': 'tutu'})
+ cursor = cnx.cnxset.cu
+ cursor.execute("SELECT %supassword from %sCWUser WHERE %slogin='bob'"
+ % (SQL_PREFIX, SQL_PREFIX, SQL_PREFIX))
+ passwd = str(cursor.fetchone()[0])
+ self.assertEqual(passwd, crypt_password('tutu', passwd))
+ rset = cnx.execute("Any X WHERE X is CWUser, X login 'bob', X upassword %(pwd)s",
+ {'pwd': Binary(passwd)})
+ self.assertEqual(len(rset.rows), 1)
+ self.assertEqual(rset.description, [('CWUser',)])
# ZT datetime tests ########################################################
@@ -1473,7 +1506,6 @@
def test_nonregr_has_text_cache(self):
eid1 = self.qexecute("INSERT Personne X: X nom 'bidule'")[0][0]
eid2 = self.qexecute("INSERT Personne X: X nom 'tag'")[0][0]
- self.commit()
rset = self.qexecute("Any X WHERE X has_text %(text)s", {'text': 'bidule'})
self.assertEqual(rset.rows, [[eid1]])
rset = self.qexecute("Any X WHERE X has_text %(text)s", {'text': 'tag'})
@@ -1518,12 +1550,6 @@
# huum, psycopg specific
self.qexecute('SET X creation_date %(date)s WHERE X eid 1', {'date': date.today()})
- def test_nonregr_set_query(self):
- ueid = self.execute("INSERT CWUser X: X login 'bob', X upassword 'toto'")[0][0]
- self.execute("SET E in_group G, E firstname %(firstname)s, E surname %(surname)s "
- "WHERE E eid %(x)s, G name 'users'",
- {'x':ueid, 'firstname': u'jean', 'surname': u'paul'})
-
def test_nonregr_u_owned_by_u(self):
ueid = self.qexecute("INSERT CWUser X: X login 'bob', X upassword 'toto', X in_group G "
"WHERE G name 'users'")[0][0]
@@ -1569,7 +1595,6 @@
peid = self.qexecute("INSERT CWUser X: X login 'bidule', X upassword 'bidule', "
"X in_group G WHERE G name 'users'")[0][0]
aeid = self.qexecute("INSERT Affaire X: X ref 'bidule'")[0][0]
- self.commit()
rset = self.qexecute('Any X WHERE X is CWUser, X has_text "bidule"')
self.assertEqual(rset.rows, [[peid]])
rset = self.qexecute('Any X WHERE X is CWUser, X has_text "bidule", '
@@ -1588,39 +1613,39 @@
class NonRegressionTC(CubicWebTC):
def test_has_text_security_cache_bug(self):
- req = self.request()
- self.create_user(req, 'user', ('users',))
- aff1 = req.create_entity('Societe', nom=u'aff1')
- aff2 = req.create_entity('Societe', nom=u'aff2')
- self.commit()
- with self.login('user', password='user'):
- res = self.execute('Any X WHERE X has_text %(text)s', {'text': 'aff1'})
+ with self.admin_access.repo_cnx() as cnx:
+ self.create_user(cnx, 'user', ('users',))
+ aff1 = cnx.create_entity('Societe', nom=u'aff1')
+ aff2 = cnx.create_entity('Societe', nom=u'aff2')
+ cnx.commit()
+ with self.new_access('user').repo_cnx() as cnx:
+ res = cnx.execute('Any X WHERE X has_text %(text)s', {'text': 'aff1'})
self.assertEqual(res.rows, [[aff1.eid]])
- res = self.execute('Any X WHERE X has_text %(text)s', {'text': 'aff2'})
+ res = cnx.execute('Any X WHERE X has_text %(text)s', {'text': 'aff2'})
self.assertEqual(res.rows, [[aff2.eid]])
def test_set_relations_eid(self):
- req = self.request()
- # create 3 email addresses
- a1 = req.create_entity('EmailAddress', address=u'a1')
- a2 = req.create_entity('EmailAddress', address=u'a2')
- a3 = req.create_entity('EmailAddress', address=u'a3')
- # SET relations using '>=' operator on eids
- req.execute('SET U use_email A WHERE U login "admin", A eid >= %s' % a2.eid)
- self.assertEqual(
- [[a2.eid], [a3.eid]],
- req.execute('Any A ORDERBY A WHERE U use_email A, U login "admin"').rows)
- # DELETE
- req.execute('DELETE U use_email A WHERE U login "admin", A eid > %s' % a2.eid)
- self.assertEqual(
- [[a2.eid]],
- req.execute('Any A ORDERBY A WHERE U use_email A, U login "admin"').rows)
- req.execute('DELETE U use_email A WHERE U login "admin"')
- # SET relations using '<' operator on eids
- req.execute('SET U use_email A WHERE U login "admin", A eid < %s' % a2.eid)
- self.assertEqual(
- [[a1.eid]],
- req.execute('Any A ORDERBY A WHERE U use_email A, U login "admin"').rows)
+ with self.admin_access.repo_cnx() as cnx:
+ # create 3 email addresses
+ a1 = cnx.create_entity('EmailAddress', address=u'a1')
+ a2 = cnx.create_entity('EmailAddress', address=u'a2')
+ a3 = cnx.create_entity('EmailAddress', address=u'a3')
+ # SET relations using '>=' operator on eids
+ cnx.execute('SET U use_email A WHERE U login "admin", A eid >= %s' % a2.eid)
+ self.assertEqual(
+ [[a2.eid], [a3.eid]],
+ cnx.execute('Any A ORDERBY A WHERE U use_email A, U login "admin"').rows)
+ # DELETE
+ cnx.execute('DELETE U use_email A WHERE U login "admin", A eid > %s' % a2.eid)
+ self.assertEqual(
+ [[a2.eid]],
+ cnx.execute('Any A ORDERBY A WHERE U use_email A, U login "admin"').rows)
+ cnx.execute('DELETE U use_email A WHERE U login "admin"')
+ # SET relations using '<' operator on eids
+ cnx.execute('SET U use_email A WHERE U login "admin", A eid < %s' % a2.eid)
+ self.assertEqual(
+ [[a1.eid]],
+ cnx.execute('Any A ORDERBY A WHERE U use_email A, U login "admin"').rows)
if __name__ == '__main__':
unittest_main()