server/test/unittest_repository.py
changeset 4191 01638461d4b0
parent 3777 3ef8cdb5fb1c
child 4252 6c4f109c2b03
equal deleted inserted replaced
4190:742e3eb16f81 4191:01638461d4b0
    58                                           (u'Interval',), (u'Password',),
    58                                           (u'Interval',), (u'Password',),
    59                                           (u'String',), (u'Time',)])
    59                                           (u'String',), (u'Time',)])
    60 
    60 
    61     def test_schema_has_owner(self):
    61     def test_schema_has_owner(self):
    62         repo = self.repo
    62         repo = self.repo
    63         cnxid = repo.connect(self.admlogin, self.admpassword)
    63         cnxid = repo.connect(self.admlogin, password=self.admpassword)
    64         self.failIf(repo.execute(cnxid, 'CWEType X WHERE NOT X owned_by U'))
    64         self.failIf(repo.execute(cnxid, 'CWEType X WHERE NOT X owned_by U'))
    65         self.failIf(repo.execute(cnxid, 'CWRType X WHERE NOT X owned_by U'))
    65         self.failIf(repo.execute(cnxid, 'CWRType X WHERE NOT X owned_by U'))
    66         self.failIf(repo.execute(cnxid, 'CWAttribute X WHERE NOT X owned_by U'))
    66         self.failIf(repo.execute(cnxid, 'CWAttribute X WHERE NOT X owned_by U'))
    67         self.failIf(repo.execute(cnxid, 'CWRelation X WHERE NOT X owned_by U'))
    67         self.failIf(repo.execute(cnxid, 'CWRelation X WHERE NOT X owned_by U'))
    68         self.failIf(repo.execute(cnxid, 'CWConstraint X WHERE NOT X owned_by U'))
    68         self.failIf(repo.execute(cnxid, 'CWConstraint X WHERE NOT X owned_by U'))
    69         self.failIf(repo.execute(cnxid, 'CWConstraintType X WHERE NOT X owned_by U'))
    69         self.failIf(repo.execute(cnxid, 'CWConstraintType X WHERE NOT X owned_by U'))
    70 
    70 
    71     def test_connect(self):
    71     def test_connect(self):
    72         self.assert_(self.repo.connect(self.admlogin, self.admpassword))
    72         self.assert_(self.repo.connect(self.admlogin, password=self.admpassword))
    73         self.assertRaises(AuthenticationError,
    73         self.assertRaises(AuthenticationError,
    74                           self.repo.connect, self.admlogin, 'nimportnawak')
    74                           self.repo.connect, self.admlogin, password='nimportnawak')
    75         self.assertRaises(AuthenticationError,
    75         self.assertRaises(AuthenticationError,
    76                           self.repo.connect, self.admlogin, None)
    76                           self.repo.connect, self.admlogin, password=None)
    77         self.assertRaises(AuthenticationError,
    77         self.assertRaises(AuthenticationError,
    78                           self.repo.connect, None, None)
    78                           self.repo.connect, None, password=None)
       
    79         self.assertRaises(AuthenticationError,
       
    80                           self.repo.connect, self.admlogin)
       
    81         self.assertRaises(AuthenticationError,
       
    82                           self.repo.connect, None)
    79 
    83 
    80     def test_execute(self):
    84     def test_execute(self):
    81         repo = self.repo
    85         repo = self.repo
    82         cnxid = repo.connect(self.admlogin, self.admpassword)
    86         cnxid = repo.connect(self.admlogin, password=self.admpassword)
    83         repo.execute(cnxid, 'Any X')
    87         repo.execute(cnxid, 'Any X')
    84         repo.execute(cnxid, 'Any X where X is Personne')
    88         repo.execute(cnxid, 'Any X where X is Personne')
    85         repo.execute(cnxid, 'Any X where X is Personne, X nom ~= "to"')
    89         repo.execute(cnxid, 'Any X where X is Personne, X nom ~= "to"')
    86         repo.execute(cnxid, 'Any X WHERE X has_text %(text)s', {'text': u'\xe7a'})
    90         repo.execute(cnxid, 'Any X WHERE X has_text %(text)s', {'text': u'\xe7a'})
    87         repo.close(cnxid)
    91         repo.close(cnxid)
    88 
    92 
    89     def test_login_upassword_accent(self):
    93     def test_login_upassword_accent(self):
    90         repo = self.repo
    94         repo = self.repo
    91         cnxid = repo.connect(self.admlogin, self.admpassword)
    95         cnxid = repo.connect(self.admlogin, password=self.admpassword)
    92         repo.execute(cnxid, 'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s, X in_group G WHERE G name "users"',
    96         repo.execute(cnxid, 'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s, X in_group G WHERE G name "users"',
    93                      {'login': u"barnabé", 'passwd': u"héhéhé".encode('UTF8')})
    97                      {'login': u"barnabé", 'passwd': u"héhéhé".encode('UTF8')})
    94         repo.commit(cnxid)
    98         repo.commit(cnxid)
    95         repo.close(cnxid)
    99         repo.close(cnxid)
    96         self.assert_(repo.connect(u"barnabé", u"héhéhé".encode('UTF8')))
   100         self.assert_(repo.connect(u"barnabé", password=u"héhéhé".encode('UTF8')))
    97 
   101 
    98     def test_invalid_entity_rollback(self):
   102     def test_invalid_entity_rollback(self):
    99         cnxid = self.repo.connect(self.admlogin, self.admpassword)
   103         cnxid = self.repo.connect(self.admlogin, password=self.admpassword)
   100         # no group
   104         # no group
   101         self.repo.execute(cnxid,
   105         self.repo.execute(cnxid,
   102                           'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s',
   106                           'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s',
   103                           {'login': u"tutetute", 'passwd': 'tutetute'})
   107                           {'login': u"tutetute", 'passwd': 'tutetute'})
   104         self.assertRaises(ValidationError, self.repo.commit, cnxid)
   108         self.assertRaises(ValidationError, self.repo.commit, cnxid)
   105         self.failIf(self.repo.execute(cnxid, 'CWUser X WHERE X login "tutetute"'))
   109         self.failIf(self.repo.execute(cnxid, 'CWUser X WHERE X login "tutetute"'))
   106 
   110 
   107     def test_close(self):
   111     def test_close(self):
   108         repo = self.repo
   112         repo = self.repo
   109         cnxid = repo.connect(self.admlogin, self.admpassword)
   113         cnxid = repo.connect(self.admlogin, password=self.admpassword)
   110         self.assert_(cnxid)
   114         self.assert_(cnxid)
   111         repo.close(cnxid)
   115         repo.close(cnxid)
   112         self.assertRaises(BadConnectionId, repo.execute, cnxid, 'Any X')
   116         self.assertRaises(BadConnectionId, repo.execute, cnxid, 'Any X')
   113 
   117 
   114     def test_invalid_cnxid(self):
   118     def test_invalid_cnxid(self):
   115         self.assertRaises(BadConnectionId, self.repo.execute, 0, 'Any X')
   119         self.assertRaises(BadConnectionId, self.repo.execute, 0, 'Any X')
   116         self.assertRaises(BadConnectionId, self.repo.close, None)
   120         self.assertRaises(BadConnectionId, self.repo.close, None)
   117 
   121 
   118     def test_shared_data(self):
   122     def test_shared_data(self):
   119         repo = self.repo
   123         repo = self.repo
   120         cnxid = repo.connect(self.admlogin, self.admpassword)
   124         cnxid = repo.connect(self.admlogin, password=self.admpassword)
   121         repo.set_shared_data(cnxid, 'data', 4)
   125         repo.set_shared_data(cnxid, 'data', 4)
   122         cnxid2 = repo.connect(self.admlogin, self.admpassword)
   126         cnxid2 = repo.connect(self.admlogin, password=self.admpassword)
   123         self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4)
   127         self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4)
   124         self.assertEquals(repo.get_shared_data(cnxid2, 'data'), None)
   128         self.assertEquals(repo.get_shared_data(cnxid2, 'data'), None)
   125         repo.set_shared_data(cnxid2, 'data', 5)
   129         repo.set_shared_data(cnxid2, 'data', 5)
   126         self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4)
   130         self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4)
   127         self.assertEquals(repo.get_shared_data(cnxid2, 'data'), 5)
   131         self.assertEquals(repo.get_shared_data(cnxid2, 'data'), 5)
   135         self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid, 'data', 1)
   139         self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid, 'data', 1)
   136         self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid2, 'data', 1)
   140         self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid2, 'data', 1)
   137 
   141 
   138     def test_check_session(self):
   142     def test_check_session(self):
   139         repo = self.repo
   143         repo = self.repo
   140         cnxid = repo.connect(self.admlogin, self.admpassword)
   144         cnxid = repo.connect(self.admlogin, password=self.admpassword)
   141         self.assertEquals(repo.check_session(cnxid), None)
   145         self.assertEquals(repo.check_session(cnxid), None)
   142         repo.close(cnxid)
   146         repo.close(cnxid)
   143         self.assertRaises(BadConnectionId, repo.check_session, cnxid)
   147         self.assertRaises(BadConnectionId, repo.check_session, cnxid)
   144 
   148 
   145     def test_transaction_base(self):
   149     def test_transaction_base(self):
   146         repo = self.repo
   150         repo = self.repo
   147         cnxid = repo.connect(self.admlogin, self.admpassword)
   151         cnxid = repo.connect(self.admlogin, password=self.admpassword)
   148         # check db state
   152         # check db state
   149         result = repo.execute(cnxid, 'Personne X')
   153         result = repo.execute(cnxid, 'Personne X')
   150         self.assertEquals(result.rowcount, 0)
   154         self.assertEquals(result.rowcount, 0)
   151         # rollback entity insertion
   155         # rollback entity insertion
   152         repo.execute(cnxid, "INSERT Personne X: X nom 'bidule'")
   156         repo.execute(cnxid, "INSERT Personne X: X nom 'bidule'")
   161         result = repo.execute(cnxid, 'Personne X')
   165         result = repo.execute(cnxid, 'Personne X')
   162         self.assertEquals(result.rowcount, 1)
   166         self.assertEquals(result.rowcount, 1)
   163 
   167 
   164     def test_transaction_base2(self):
   168     def test_transaction_base2(self):
   165         repo = self.repo
   169         repo = self.repo
   166         cnxid = repo.connect(self.admlogin, self.admpassword)
   170         cnxid = repo.connect(self.admlogin, password=self.admpassword)
   167         # rollback relation insertion
   171         # rollback relation insertion
   168         repo.execute(cnxid, "SET U in_group G WHERE U login 'admin', G name 'guests'")
   172         repo.execute(cnxid, "SET U in_group G WHERE U login 'admin', G name 'guests'")
   169         result = repo.execute(cnxid, "Any U WHERE U in_group G, U login 'admin', G name 'guests'")
   173         result = repo.execute(cnxid, "Any U WHERE U in_group G, U login 'admin', G name 'guests'")
   170         self.assertEquals(result.rowcount, 1)
   174         self.assertEquals(result.rowcount, 1)
   171         repo.rollback(cnxid)
   175         repo.rollback(cnxid)
   172         result = repo.execute(cnxid, "Any U WHERE U in_group G, U login 'admin', G name 'guests'")
   176         result = repo.execute(cnxid, "Any U WHERE U in_group G, U login 'admin', G name 'guests'")
   173         self.assertEquals(result.rowcount, 0, result.rows)
   177         self.assertEquals(result.rowcount, 0, result.rows)
   174 
   178 
   175     def test_transaction_base3(self):
   179     def test_transaction_base3(self):
   176         repo = self.repo
   180         repo = self.repo
   177         cnxid = repo.connect(self.admlogin, self.admpassword)
   181         cnxid = repo.connect(self.admlogin, password=self.admpassword)
   178         # rollback state change which trigger TrInfo insertion
   182         # rollback state change which trigger TrInfo insertion
   179         user = repo._get_session(cnxid).user
   183         user = repo._get_session(cnxid).user
   180         user.fire_transition('deactivate')
   184         user.fire_transition('deactivate')
   181         rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': user.eid})
   185         rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': user.eid})
   182         self.assertEquals(len(rset), 1)
   186         self.assertEquals(len(rset), 1)
   187     def test_transaction_interleaved(self):
   191     def test_transaction_interleaved(self):
   188         self.skip('implement me')
   192         self.skip('implement me')
   189 
   193 
   190     def test_close_wait_processing_request(self):
   194     def test_close_wait_processing_request(self):
   191         repo = self.repo
   195         repo = self.repo
   192         cnxid = repo.connect(self.admlogin, self.admpassword)
   196         cnxid = repo.connect(self.admlogin, password=self.admpassword)
   193         repo.execute(cnxid, 'INSERT CWUser X: X login "toto", X upassword "tutu", X in_group G WHERE G name "users"')
   197         repo.execute(cnxid, 'INSERT CWUser X: X login "toto", X upassword "tutu", X in_group G WHERE G name "users"')
   194         repo.commit(cnxid)
   198         repo.commit(cnxid)
   195         # close has to be in the thread due to sqlite limitations
   199         # close has to be in the thread due to sqlite limitations
   196         def close_in_a_few_moment():
   200         def close_in_a_few_moment():
   197             time.sleep(0.1)
   201             time.sleep(0.1)
   208         schema = self.repo.schema
   212         schema = self.repo.schema
   209         # check order of attributes is respected
   213         # check order of attributes is respected
   210         self.assertListEquals([r.type for r in schema.eschema('CWAttribute').ordered_relations()
   214         self.assertListEquals([r.type for r in schema.eschema('CWAttribute').ordered_relations()
   211                                if not r.type in ('eid', 'is', 'is_instance_of', 'identity',
   215                                if not r.type in ('eid', 'is', 'is_instance_of', 'identity',
   212                                                  'creation_date', 'modification_date', 'cwuri',
   216                                                  'creation_date', 'modification_date', 'cwuri',
   213                                                  'owned_by', 'created_by')],
   217                                                  'owned_by', 'created_by',
   214                               ['relation_type', 'from_entity', 'to_entity', 'in_basket', 'constrained_by',
   218                                                  'add_permission', 'delete_permission', 'read_permission')],
       
   219                               ['relation_type',
       
   220                                'from_entity', 'to_entity',
       
   221                                'in_basket', 'constrained_by', 
   215                                'cardinality', 'ordernum',
   222                                'cardinality', 'ordernum',
   216                                'indexed', 'fulltextindexed', 'internationalizable',
   223                                'indexed', 'fulltextindexed', 'internationalizable',
   217                                'defaultval', 'description', 'description_format'])
   224                                'defaultval', 'description', 'description_format'])
   218 
   225 
   219         self.assertEquals(schema.eschema('CWEType').main_attribute(), 'name')
   226         self.assertEquals(schema.eschema('CWEType').main_attribute(), 'name')
   220         self.assertEquals(schema.eschema('State').main_attribute(), 'name')
   227         self.assertEquals(schema.eschema('State').main_attribute(), 'name')
   221 
   228 
   222         constraints = schema.rschema('name').rproperty('CWEType', 'String', 'constraints')
   229         constraints = schema.rschema('name').rdef('CWEType', 'String').constraints
   223         self.assertEquals(len(constraints), 2)
   230         self.assertEquals(len(constraints), 2)
   224         for cstr in constraints[:]:
   231         for cstr in constraints[:]:
   225             if isinstance(cstr, UniqueConstraint):
   232             if isinstance(cstr, UniqueConstraint):
   226                 constraints.remove(cstr)
   233                 constraints.remove(cstr)
   227                 break
   234                 break
   229             self.fail('unique constraint not found')
   236             self.fail('unique constraint not found')
   230         sizeconstraint = constraints[0]
   237         sizeconstraint = constraints[0]
   231         self.assertEquals(sizeconstraint.min, None)
   238         self.assertEquals(sizeconstraint.min, None)
   232         self.assertEquals(sizeconstraint.max, 64)
   239         self.assertEquals(sizeconstraint.max, 64)
   233 
   240 
   234         constraints = schema.rschema('relation_type').rproperty('CWAttribute', 'CWRType', 'constraints')
   241         constraints = schema.rschema('relation_type').rdef('CWAttribute', 'CWRType').constraints
   235         self.assertEquals(len(constraints), 1)
   242         self.assertEquals(len(constraints), 1)
   236         cstr = constraints[0]
   243         cstr = constraints[0]
   237         self.assert_(isinstance(cstr, RQLConstraint))
   244         self.assert_(isinstance(cstr, RQLConstraint))
   238         self.assertEquals(cstr.restriction, 'O final TRUE')
   245         self.assertEquals(cstr.restriction, 'O final TRUE')
   239 
   246 
   256                 self.fail('something went wrong, thread still alive')
   263                 self.fail('something went wrong, thread still alive')
   257         finally:
   264         finally:
   258             repository.pyro_unregister(self.repo.config)
   265             repository.pyro_unregister(self.repo.config)
   259 
   266 
   260     def _pyro_client(self, done):
   267     def _pyro_client(self, done):
   261         cnx = connect(self.repo.config.appid, u'admin', 'gingkow')
   268         cnx = connect(self.repo.config.appid, u'admin', password='gingkow')
   262         try:
   269         try:
   263             # check we can get the schema
   270             # check we can get the schema
   264             schema = cnx.get_schema()
   271             schema = cnx.get_schema()
   265             self.assertEquals(schema.__hashmode__, None)
   272             self.assertEquals(schema.__hashmode__, None)
   266             cu = cnx.cursor()
   273             cu = cnx.cursor()
   271             # connect monkey path some method by default, remove them
   278             # connect monkey path some method by default, remove them
   272             multiple_connections_unfix()
   279             multiple_connections_unfix()
   273 
   280 
   274     def test_internal_api(self):
   281     def test_internal_api(self):
   275         repo = self.repo
   282         repo = self.repo
   276         cnxid = repo.connect(self.admlogin, self.admpassword)
   283         cnxid = repo.connect(self.admlogin, password=self.admpassword)
   277         session = repo._get_session(cnxid, setpool=True)
   284         session = repo._get_session(cnxid, setpool=True)
   278         self.assertEquals(repo.type_and_source_from_eid(1, session),
   285         self.assertEquals(repo.type_and_source_from_eid(1, session),
   279                           ('CWGroup', 'system', None))
   286                           ('CWGroup', 'system', None))
   280         self.assertEquals(repo.type_from_eid(1, session), 'CWGroup')
   287         self.assertEquals(repo.type_from_eid(1, session), 'CWGroup')
   281         self.assertEquals(repo.source_from_eid(1, session).uri, 'system')
   288         self.assertEquals(repo.source_from_eid(1, session).uri, 'system')
   289         # .properties() return a result set
   296         # .properties() return a result set
   290         self.assertEquals(self.repo.properties().rql, 'Any K,V WHERE P is CWProperty,P pkey K, P value V, NOT P for_user U')
   297         self.assertEquals(self.repo.properties().rql, 'Any K,V WHERE P is CWProperty,P pkey K, P value V, NOT P for_user U')
   291 
   298 
   292     def test_session_api(self):
   299     def test_session_api(self):
   293         repo = self.repo
   300         repo = self.repo
   294         cnxid = repo.connect(self.admlogin, self.admpassword)
   301         cnxid = repo.connect(self.admlogin, password=self.admpassword)
   295         self.assertEquals(repo.user_info(cnxid), (5, 'admin', set([u'managers']), {}))
   302         self.assertEquals(repo.user_info(cnxid), (5, 'admin', set([u'managers']), {}))
   296         self.assertEquals(repo.describe(cnxid, 1), (u'CWGroup', u'system', None))
   303         self.assertEquals(repo.describe(cnxid, 1), (u'CWGroup', u'system', None))
   297         repo.close(cnxid)
   304         repo.close(cnxid)
   298         self.assertRaises(BadConnectionId, repo.user_info, cnxid)
   305         self.assertRaises(BadConnectionId, repo.user_info, cnxid)
   299         self.assertRaises(BadConnectionId, repo.describe, cnxid, 1)
   306         self.assertRaises(BadConnectionId, repo.describe, cnxid, 1)
   300 
   307 
   301     def test_shared_data_api(self):
   308     def test_shared_data_api(self):
   302         repo = self.repo
   309         repo = self.repo
   303         cnxid = repo.connect(self.admlogin, self.admpassword)
   310         cnxid = repo.connect(self.admlogin, password=self.admpassword)
   304         self.assertEquals(repo.get_shared_data(cnxid, 'data'), None)
   311         self.assertEquals(repo.get_shared_data(cnxid, 'data'), None)
   305         repo.set_shared_data(cnxid, 'data', 4)
   312         repo.set_shared_data(cnxid, 'data', 4)
   306         self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4)
   313         self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4)
   307         repo.get_shared_data(cnxid, 'data', pop=True)
   314         repo.get_shared_data(cnxid, 'data', pop=True)
   308         repo.get_shared_data(cnxid, 'whatever', pop=True)
   315         repo.get_shared_data(cnxid, 'whatever', pop=True)
   324 #         finally:
   331 #         finally:
   325 #             self.set_debug(False)
   332 #             self.set_debug(False)
   326 #         print 'test time: %.3f (time) %.3f (cpu)' % ((time() - t), clock() - c)
   333 #         print 'test time: %.3f (time) %.3f (cpu)' % ((time() - t), clock() - c)
   327 
   334 
   328     def test_delete_if_singlecard1(self):
   335     def test_delete_if_singlecard1(self):
   329         note = self.add_entity('Affaire')
   336         note = self.request().create_entity('Affaire')
   330         p1 = self.add_entity('Personne', nom=u'toto')
   337         p1 = self.request().create_entity('Personne', nom=u'toto')
   331         self.execute('SET A todo_by P WHERE A eid %(x)s, P eid %(p)s',
   338         self.execute('SET A todo_by P WHERE A eid %(x)s, P eid %(p)s',
   332                      {'x': note.eid, 'p': p1.eid})
   339                      {'x': note.eid, 'p': p1.eid})
   333         rset = self.execute('Any P WHERE A todo_by P, A eid %(x)s',
   340         rset = self.execute('Any P WHERE A todo_by P, A eid %(x)s',
   334                             {'x': note.eid})
   341                             {'x': note.eid})
   335         self.assertEquals(len(rset), 1)
   342         self.assertEquals(len(rset), 1)
   336         p2 = self.add_entity('Personne', nom=u'tutu')
   343         p2 = self.request().create_entity('Personne', nom=u'tutu')
   337         self.execute('SET A todo_by P WHERE A eid %(x)s, P eid %(p)s',
   344         self.execute('SET A todo_by P WHERE A eid %(x)s, P eid %(p)s',
   338                      {'x': note.eid, 'p': p2.eid})
   345                      {'x': note.eid, 'p': p2.eid})
   339         rset = self.execute('Any P WHERE A todo_by P, A eid %(x)s',
   346         rset = self.execute('Any P WHERE A todo_by P, A eid %(x)s',
   340                             {'x': note.eid})
   347                             {'x': note.eid})
   341         self.assertEquals(len(rset), 1)
   348         self.assertEquals(len(rset), 1)
   417         self.assertEquals(modified, [])
   424         self.assertEquals(modified, [])
   418         self.assertEquals(deleted, [('Personne', eidp)])
   425         self.assertEquals(deleted, [('Personne', eidp)])
   419 
   426 
   420     def test_composite_entity(self):
   427     def test_composite_entity(self):
   421         assert self.schema.rschema('use_email').fulltext_container == 'subject'
   428         assert self.schema.rschema('use_email').fulltext_container == 'subject'
   422         eid = self.add_entity('EmailAddress', address=u'toto@logilab.fr').eid
   429         eid = self.request().create_entity('EmailAddress', address=u'toto@logilab.fr').eid
   423         self.commit()
   430         self.commit()
   424         rset = self.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'})
   431         rset = self.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'})
   425         self.assertEquals(rset.rows, [[eid]])
   432         self.assertEquals(rset.rows, [[eid]])
   426         self.execute('SET X use_email Y WHERE X login "admin", Y eid %(y)s', {'y': eid})
   433         self.execute('SET X use_email Y WHERE X login "admin", Y eid %(y)s', {'y': eid})
   427         self.commit()
   434         self.commit()
   429         self.assertEquals(rset.rows, [[self.session.user.eid]])
   436         self.assertEquals(rset.rows, [[self.session.user.eid]])
   430         self.execute('DELETE X use_email Y WHERE X login "admin", Y eid %(y)s', {'y': eid})
   437         self.execute('DELETE X use_email Y WHERE X login "admin", Y eid %(y)s', {'y': eid})
   431         self.commit()
   438         self.commit()
   432         rset = self.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'})
   439         rset = self.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'})
   433         self.assertEquals(rset.rows, [])
   440         self.assertEquals(rset.rows, [])
   434         eid = self.add_entity('EmailAddress', address=u'tutu@logilab.fr').eid
   441         eid = self.request().create_entity('EmailAddress', address=u'tutu@logilab.fr').eid
   435         self.execute('SET X use_email Y WHERE X login "admin", Y eid %(y)s', {'y': eid})
   442         self.execute('SET X use_email Y WHERE X login "admin", Y eid %(y)s', {'y': eid})
   436         self.commit()
   443         self.commit()
   437         rset = self.execute('Any X WHERE X has_text %(t)s', {'t': 'tutu'})
   444         rset = self.execute('Any X WHERE X has_text %(t)s', {'t': 'tutu'})
   438         self.assertEquals(rset.rows, [[self.session.user.eid]])
   445         self.assertEquals(rset.rows, [[self.session.user.eid]])
   439 
   446