server/test/unittest_repository.py
changeset 0 b97547f5f1fa
child 17 62ce3e6126e0
equal deleted inserted replaced
-1:000000000000 0:b97547f5f1fa
       
     1 # -*- coding: iso-8859-1 -*-
       
     2 """unit tests for module cubicweb.server.repository"""
       
     3 
       
     4 import os
       
     5 import sys
       
     6 import threading
       
     7 import time
       
     8 from copy import deepcopy
       
     9 
       
    10 from mx.DateTime import DateTimeType, now
       
    11 from logilab.common.testlib import TestCase, unittest_main
       
    12 from cubicweb.devtools.apptest import RepositoryBasedTC
       
    13 from cubicweb.devtools.repotest import tuplify
       
    14 
       
    15 from yams.constraints import UniqueConstraint
       
    16 
       
    17 from cubicweb import BadConnectionId, RepositoryError, ValidationError, UnknownEid, AuthenticationError
       
    18 from cubicweb.schema import CubicWebSchema, RQLConstraint
       
    19 from cubicweb.dbapi import connect, repo_connect
       
    20 
       
    21 from cubicweb.server import repository 
       
    22 
       
    23 
       
    # Always try to launch the Pyro name server in the background (trailing
    # `&`) with its output discarded; if one is already running, the newly
    # spawned process simply fails and the result is ignored.
    os.system('pyro-ns >/dev/null 2>/dev/null &')
       
    26 
       
    27 
       
    class RepositoryTC(RepositoryBasedTC):
        """Unit tests exercising the repository object available as ``self.repo``.

        Connections opened by a test are closed again (in a ``finally`` clause
        where assertions run in between) so that a failing assertion does not
        leave an open session behind in the repository.
        """

        def test_fill_schema(self):
            """call fill_schema() on a fresh CubicWebSchema and check EEType rows"""
            self.repo.schema = CubicWebSchema(self.repo.config.appid)
            self.repo.config._cubes = None # avoid assertion error
            self.repo.fill_schema()
            pool = self.repo._get_pool()
            try:
                sqlcursor = pool['system']
                # every entity type must have an explicit `final` value
                sqlcursor.execute('SELECT name FROM EEType WHERE final is NULL')
                self.assertEquals(sqlcursor.fetchall(), [])
                sqlcursor.execute('SELECT name FROM EEType WHERE final=%(final)s ORDER BY name', {'final': 'TRUE'})
                self.assertEquals(sqlcursor.fetchall(), [(u'Boolean',), (u'Bytes',),
                                                         (u'Date',), (u'Datetime',),
                                                         (u'Decimal',),(u'Float',),
                                                         (u'Int',),
                                                         (u'Interval',), (u'Password',),
                                                         (u'String',), (u'Time',)])
            finally:
                # always give the pool back, even when an assertion failed
                self.repo._free_pool(pool)

        def test_schema_has_owner(self):
            """no schema entity may be returned by a `NOT X owned_by U` query"""
            repo = self.repo
            cnxid = repo.connect(*self.default_user_password())
            try:
                self.failIf(repo.execute(cnxid, 'EEType X WHERE NOT X owned_by U'))
                self.failIf(repo.execute(cnxid, 'ERType X WHERE NOT X owned_by U'))
                self.failIf(repo.execute(cnxid, 'EFRDef X WHERE NOT X owned_by U'))
                self.failIf(repo.execute(cnxid, 'ENFRDef X WHERE NOT X owned_by U'))
                self.failIf(repo.execute(cnxid, 'EConstraint X WHERE NOT X owned_by U'))
                self.failIf(repo.execute(cnxid, 'EConstraintType X WHERE NOT X owned_by U'))
            finally:
                repo.close(cnxid)

        def test_connect(self):
            """valid credentials give a connection id, invalid ones raise"""
            login, passwd = self.default_user_password()
            cnxid = self.repo.connect(login, passwd)
            self.assert_(cnxid)
            # release the session opened above instead of leaking it
            self.repo.close(cnxid)
            self.assertRaises(AuthenticationError,
                              self.repo.connect, login, 'nimportnawak')
            self.assertRaises(AuthenticationError,
                              self.repo.connect, login, None)
            self.assertRaises(AuthenticationError,
                              self.repo.connect, None, None)

        def test_execute(self):
            """a few queries, including one with a non-ascii argument, must not raise"""
            repo = self.repo
            cnxid = repo.connect(*self.default_user_password())
            try:
                repo.execute(cnxid, 'Any X')
                repo.execute(cnxid, 'Any X where X is Personne')
                repo.execute(cnxid, 'Any X where X is Personne, X nom ~= "to"')
                repo.execute(cnxid, 'Any X WHERE X has_text %(text)s', {'text': u'\xe7a'})
            finally:
                repo.close(cnxid)

        def test_login_upassword_accent(self):
            """a user created with accented login and password can connect"""
            repo = self.repo
            cnxid = repo.connect(*self.default_user_password())
            try:
                repo.execute(cnxid, 'INSERT EUser X: X login %(login)s, X upassword %(passwd)s, X in_state S, X in_group G WHERE S name "activated", G name "users"',
                             {'login': u"barnabé", 'passwd': u"héhéhé".encode('UTF8')})
                repo.commit(cnxid)
            finally:
                repo.close(cnxid)
            usercnxid = repo.connect(u"barnabé", u"héhéhé".encode('UTF8'))
            self.assert_(usercnxid)
            repo.close(usercnxid)

        def test_invalid_entity_rollback(self):
            """a commit raising ValidationError must not leave the entity behind"""
            repo = self.repo
            cnxid = repo.connect(*self.default_user_password())
            try:
                repo.execute(cnxid, 'INSERT EUser X: X login %(login)s, X upassword %(passwd)s, X in_state S WHERE S name "activated"',
                             {'login': u"tutetute", 'passwd': 'tutetute'})
                self.assertRaises(ValidationError, repo.commit, cnxid)
                rset = repo.execute(cnxid, 'EUser X WHERE X login "tutetute"')
                self.assertEquals(rset.rowcount, 0)
            finally:
                repo.close(cnxid)

        def test_close(self):
            """a closed connection id is rejected by execute()"""
            repo = self.repo
            cnxid = repo.connect(*self.default_user_password())
            self.assert_(cnxid)
            repo.close(cnxid)
            self.assertRaises(BadConnectionId, repo.execute, cnxid, 'Any X')

        def test_invalid_cnxid(self):
            """connection ids that were never handed out are rejected"""
            self.assertRaises(BadConnectionId, self.repo.execute, 0, 'Any X')
            self.assertRaises(BadConnectionId, self.repo.close, None)

        def test_shared_data(self):
            """shared data is kept per connection and unreachable once closed"""
            repo = self.repo
            cnxid = repo.connect(*self.default_user_password())
            repo.set_shared_data(cnxid, 'data', 4)
            cnxid2 = repo.connect(*self.default_user_password())
            self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4)
            self.assertEquals(repo.get_shared_data(cnxid2, 'data'), None)
            repo.set_shared_data(cnxid2, 'data', 5)
            self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4)
            self.assertEquals(repo.get_shared_data(cnxid2, 'data'), 5)
            repo.get_shared_data(cnxid2, 'data', pop=True)
            self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4)
            self.assertEquals(repo.get_shared_data(cnxid2, 'data'), None)
            repo.close(cnxid)
            repo.close(cnxid2)
            self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid, 'data')
            self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid2, 'data')
            self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid, 'data', 1)
            self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid2, 'data', 1)

        def test_check_session(self):
            """check_session() returns None while open and raises once closed"""
            repo = self.repo
            cnxid = repo.connect(*self.default_user_password())
            self.assertEquals(repo.check_session(cnxid), None)
            repo.close(cnxid)
            self.assertRaises(BadConnectionId, repo.check_session, cnxid)

        def test_transaction_base(self):
            """rollback discards an inserted entity, commit keeps it"""
            repo = self.repo
            cnxid = repo.connect(*self.default_user_password())
            try:
                # check db state
                result = repo.execute(cnxid, 'Personne X')
                self.assertEquals(result.rowcount, 0)
                # rollback entity insertion
                repo.execute(cnxid, "INSERT Personne X: X nom 'bidule'")
                result = repo.execute(cnxid, 'Personne X')
                self.assertEquals(result.rowcount, 1)
                repo.rollback(cnxid)
                result = repo.execute(cnxid, 'Personne X')
                self.assertEquals(result.rowcount, 0, result.rows)
                # commit
                repo.execute(cnxid, "INSERT Personne X: X nom 'bidule'")
                repo.commit(cnxid)
                result = repo.execute(cnxid, 'Personne X')
                self.assertEquals(result.rowcount, 1)
            finally:
                repo.close(cnxid)

        def test_transaction_base2(self):
            """rollback discards an inserted relation"""
            repo = self.repo
            cnxid = repo.connect(*self.default_user_password())
            try:
                # rollback relation insertion
                repo.execute(cnxid, "SET U in_group G WHERE U login 'admin', G name 'guests'")
                result = repo.execute(cnxid, "Any U WHERE U in_group G, U login 'admin', G name 'guests'")
                self.assertEquals(result.rowcount, 1)
                repo.rollback(cnxid)
                result = repo.execute(cnxid, "Any U WHERE U in_group G, U login 'admin', G name 'guests'")
                self.assertEquals(result.rowcount, 0, result.rows)
            finally:
                repo.close(cnxid)

        def test_transaction_base3(self):
            """rollback discards the TrInfo added by a state change"""
            repo = self.repo
            cnxid = repo.connect(*self.default_user_password())
            try:
                # rollback state change which trigger TrInfo insertion
                ueid = repo._get_session(cnxid).user.eid
                rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': ueid})
                self.assertEquals(len(rset), 1)
                repo.execute(cnxid, 'SET X in_state S WHERE X eid %(x)s, S name "deactivated"',
                             {'x': ueid}, 'x')
                rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': ueid})
                self.assertEquals(len(rset), 2)
                repo.rollback(cnxid)
                rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': ueid})
                self.assertEquals(len(rset), 1)
            finally:
                repo.close(cnxid)

        def test_transaction_interleaved(self):
            """placeholder, not implemented yet"""
            self.skip('implement me')

        def test_initial_schema(self):
            """check a few properties of the schema held by the repository"""
            schema = self.repo.schema
            # check order of attributes is respected
            ignored = ('eid', 'is', 'is_instance_of', 'identity',
                       'creation_date', 'modification_date',
                       'owned_by', 'created_by')
            self.assertListEquals([r.type for r in schema.eschema('EFRDef').ordered_relations()
                                   if r.type not in ignored],
                                  ['relation_type', 'from_entity', 'to_entity', 'constrained_by',
                                   'cardinality', 'ordernum',
                                   'indexed', 'fulltextindexed', 'internationalizable',
                                   'defaultval', 'description_format', 'description'])

            self.assertEquals(schema.eschema('EEType').main_attribute(), 'name')
            self.assertEquals(schema.eschema('State').main_attribute(), 'name')

            # work on a copy: an element is removed below, and the list returned
            # by rproperty() may be the one held by the schema itself
            constraints = list(schema.rschema('name').rproperty('EEType', 'String', 'constraints'))
            self.assertEquals(len(constraints), 2)
            for cstr in constraints:
                if isinstance(cstr, UniqueConstraint):
                    # safe while iterating since we leave the loop right away
                    constraints.remove(cstr)
                    break
            else:
                self.fail('unique constraint not found')
            # the remaining constraint is expected to carry min/max bounds
            sizeconstraint = constraints[0]
            self.assertEquals(sizeconstraint.min, None)
            self.assertEquals(sizeconstraint.max, 64)

            constraints = schema.rschema('relation_type').rproperty('EFRDef', 'ERType', 'constraints')
            self.assertEquals(len(constraints), 1)
            cstr = constraints[0]
            self.assertIsInstance(cstr, RQLConstraint)
            self.assertEquals(cstr.restriction, 'O final TRUE')

            ownedby = schema.rschema('owned_by')
            self.assertEquals(ownedby.objects('EEType'), ('EUser',))

        def test_pyro(self):
            """serve the requests issued by a Pyro client running in another thread"""
            import Pyro
            Pyro.config.PYRO_MULTITHREADED = 0
            lock = threading.Lock()
            # the client part has to be in the thread due to sqlite limitations
            # NOTE(review): an exception raised in the client thread (including
            # a failed assertion) is not propagated to this test's result
            t = threading.Thread(target=self._pyro_client, args=(lock,))
            try:
                daemon = self.repo.pyro_register()
                t.start()
                # connection
                daemon.handleRequests(1.0)
                daemon.handleRequests(1.0)
                daemon.handleRequests(1.0)
                # get schema
                daemon.handleRequests(1.0)
                # execute
                daemon.handleRequests(1.0)
                t.join()
            finally:
                repository.pyro_unregister(self.repo.config)

        def _pyro_client(self, lock):
            """client side of test_pyro, run in its own thread

            `lock` is received from test_pyro but is not used here.
            """
            cnx = connect(self.repo.config.appid, u'admin', 'gingkow')
            # check we can get the schema
            schema = cnx.get_schema()
            self.assertEquals(schema.__hashmode__, None)
            # only check the query can be executed, the result is not inspected
            cnx.cursor().execute('Any U,G WHERE U in_group G')

        def test_internal_api(self):
            """eid related helpers taking an explicit session"""
            repo = self.repo
            cnxid = repo.connect(*self.default_user_password())
            try:
                session = repo._get_session(cnxid, setpool=True)
                self.assertEquals(repo.type_and_source_from_eid(1, session), ('EGroup', 'system', None))
                self.assertEquals(repo.type_from_eid(1, session), 'EGroup')
                self.assertEquals(repo.source_from_eid(1, session).uri, 'system')
                self.assertEquals(repo.eid2extid(repo.system_source, 1, session), None)
                # minimal stand-in source: only the `uri` attribute is provided
                class dummysource:
                    uri = 'toto'
                self.assertRaises(UnknownEid, repo.eid2extid, dummysource, 1, session)
            finally:
                repo.close(cnxid)

        def test_public_api(self):
            """repository methods which do not take a connection id"""
            self.assertEquals(self.repo.get_schema(), self.repo.schema)
            self.assertEquals(self.repo.source_defs(), {'system': {'adapter': 'native', 'uri': 'system'}})
            # .properties() return a result set
            self.assertEquals(self.repo.properties().rql, 'Any K,V WHERE P is EProperty,P pkey K, P value V, NOT P for_user U')

        def test_session_api(self):
            """user_info() and describe() need an open connection"""
            repo = self.repo
            cnxid = repo.connect(*self.default_user_password())
            self.assertEquals(repo.user_info(cnxid), (5, 'admin', set([u'managers']), {}))
            self.assertEquals(repo.describe(cnxid, 1), (u'EGroup', u'system', None))
            repo.close(cnxid)
            self.assertRaises(BadConnectionId, repo.user_info, cnxid)
            self.assertRaises(BadConnectionId, repo.describe, cnxid, 1)

        def test_shared_data_api(self):
            """get/set/pop of shared data on a single connection"""
            repo = self.repo
            cnxid = repo.connect(*self.default_user_password())
            self.assertEquals(repo.get_shared_data(cnxid, 'data'), None)
            repo.set_shared_data(cnxid, 'data', 4)
            self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4)
            repo.get_shared_data(cnxid, 'data', pop=True)
            # popping a key which was never set must not raise
            repo.get_shared_data(cnxid, 'whatever', pop=True)
            self.assertEquals(repo.get_shared_data(cnxid, 'data'), None)
            repo.close(cnxid)
            self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid, 'data', 0)
            self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid, 'data')
       
   299         
       
   300 
       
   class DataHelpersTC(RepositoryBasedTC):
       """Tests for the repository helpers dealing with eids, types and sources.

       Each test runs with its own session (``self.session``) which is rolled
       back and closed once the test is done.
       """

       def setUp(self):
           """ called before each test from this class """
           # keep the connection id so that tearDown can close the session
           self._helpers_cnxid = self.repo.connect(*self.default_user_password())
           self.session = self.repo._sessions[self._helpers_cnxid]
           self.session.set_pool()

       def tearDown(self):
           """ called after each test from this class """
           self.session.rollback()
           # close the connection opened in setUp instead of leaking it
           self.repo.close(self._helpers_cnxid)

       def test_create_eid(self):
           """the system source hands out a true-ish eid"""
           self.assert_(self.repo.system_source.create_eid(self.session))

       def test_source_from_eid(self):
           """eid 1 is reported as coming from the 'system' source"""
           self.assertEquals(self.repo.source_from_eid(1, self.session),
                             self.repo.sources_by_uri['system'])

       def test_source_from_eid_raise(self):
           """source_from_eid() raises UnknownEid for eid -2"""
           self.assertRaises(UnknownEid, self.repo.source_from_eid, -2, self.session)

       def test_type_from_eid(self):
           """eid 1 is reported as an EGroup"""
           self.assertEquals(self.repo.type_from_eid(1, self.session), 'EGroup')

       def test_type_from_eid_raise(self):
           """type_from_eid() raises UnknownEid for eid -2"""
           self.assertRaises(UnknownEid, self.repo.type_from_eid, -2, self.session)

       def test_add_delete_info(self):
           """add_info() / delete_info() insert and remove the `entities` row"""
           entity = self.repo.vreg.etype_class('Personne')(self.session, None, None)
           entity.eid = -1
           # replace complete() by a no-op on this entity
           entity.complete = lambda x: None
           self.repo.add_info(self.session, entity, self.repo.sources_by_uri['system'])
           cursor = self.session.pool['system']
           cursor.execute('SELECT * FROM entities WHERE eid = -1')
           data = cursor.fetchall()
           self.assertIsInstance(data[0][3], DateTimeType)
           # the date stored in the 4th column can't be predicted: blank it
           # out before comparing the whole row
           data[0] = list(data[0])
           data[0][3] = None
           self.assertEquals(tuplify(data), [(-1, 'Personne', 'system', None, None)])
           self.repo.delete_info(self.session, -1)
           cursor.execute('SELECT * FROM entities WHERE eid = -1')
           data = cursor.fetchall()
           self.assertEquals(data, [])
       
   345 
       
   346 
       
   class FTITC(RepositoryBasedTC):
       """Tests built around `has_text` queries and entities_modified_since()."""

       def test_reindex_and_modified_since(self):
           """follow a Personne through insertion, update and deletion"""
           syscursor = self.session.pool['system']
           inserted = self.execute('INSERT Personne X: X nom "toto", X prenom "tutu"')
           person_eid = inserted[0][0]
           self.commit()
           started = now()
           matches = self.execute('Personne X WHERE X has_text "tutu"')
           self.assertEquals(len(matches), 1)
           syscursor.execute('SELECT mtime, eid FROM entities WHERE eid = %s' % person_eid)
           old_mtime = syscursor.fetchone()[0]
           # the sqlite datetime adapter ignores fractions of a second, so wait
           # for the next second before updating to get a distinct mtime
           fraction = started.second - int(started.second)
           time.sleep(1 - fraction)
           self.execute('SET X nom "tata" WHERE X eid %(x)s', {'x': person_eid}, 'x')
           self.commit()
           matches = self.execute('Personne X WHERE X has_text "tutu"')
           self.assertEquals(len(matches), 1)
           syscursor.execute('SELECT mtime FROM entities WHERE eid = %s' % person_eid)
           new_mtime = syscursor.fetchone()[0]
           self.failUnless(old_mtime < new_mtime)
           self.commit()
           # asked from the old mtime: the entity is reported as modified
           stamp, modified, deleted = self.repo.entities_modified_since(('Personne',), old_mtime)
           self.assertEquals(modified, [('Personne', person_eid)])
           self.assertEquals(deleted, [])
           # asked from the new mtime: nothing is reported
           stamp, modified, deleted = self.repo.entities_modified_since(('Personne',), new_mtime)
           self.assertEquals(modified, [])
           self.assertEquals(deleted, [])
           self.execute('DELETE Personne X WHERE X eid %(x)s', {'x': person_eid})
           self.commit()
           # once deleted, the entity is reported as deleted only
           stamp, modified, deleted = self.repo.entities_modified_since(('Personne',), old_mtime)
           self.assertEquals(modified, [])
           self.assertEquals(deleted, [('Personne', person_eid)])

       def test_composite_entity(self):
           """search an email address text before / after linking it to a user"""
           # precondition of the whole test
           assert self.schema.rschema('use_email').fulltext_container == 'subject'
           search = 'Any X WHERE X has_text %(t)s'
           link = 'SET X use_email Y WHERE X login "admin", Y eid %(y)s'
           unlink = 'DELETE X use_email Y WHERE X login "admin", Y eid %(y)s'
           address_eid = self.add_entity('EmailAddress', address=u'toto@logilab.fr').eid
           self.commit()
           # not linked yet: the address itself is found
           found = self.execute(search, {'t': 'toto'})
           self.assertEquals(found.rows, [[address_eid]])
           self.execute(link, {'y': address_eid})
           self.commit()
           # linked: the user is found instead
           found = self.execute(search, {'t': 'toto'})
           self.assertEquals(found.rows, [[self.session.user.eid]])
           self.execute(unlink, {'y': address_eid})
           self.commit()
           # unlinked again: nothing is found
           found = self.execute(search, {'t': 'toto'})
           self.assertEquals(found.rows, [])
           address_eid = self.add_entity('EmailAddress', address=u'tutu@logilab.fr').eid
           self.execute(link, {'y': address_eid})
           self.commit()
           found = self.execute(search, {'t': 'tutu'})
           self.assertEquals(found.rows, [[self.session.user.eid]])
       
   398         
       
   399         
       
   class DBInitTC(RepositoryBasedTC):
       """Tests on the content of a freshly initialized database."""

       def test_versions_inserted(self):
           """a `system.version.*` property exists for cubicweb and each cube"""
           inserted = [r[0] for r in self.execute('Any K ORDERBY K WHERE P pkey K, P pkey ~= "system.version.%"')]
           # the query sorts on the key (ORDERBY K), so the expected keys are
           # listed in ascending order: 'cubicweb' comes before the 'e...' cubes
           self.assertEquals(inserted,
                             [u'system.version.cubicweb', u'system.version.ebasket',
                              u'system.version.eclassfolders', u'system.version.eclasstags',
                              u'system.version.ecomment', u'system.version.eemail',
                              u'system.version.efile'])
       
   409 
       
   410         
       
   class InlineRelHooksTC(RepositoryBasedTC):
       """check relation hooks get called when the relation is an inlined one

       Hooks registered by the tests record their (fromeid, rtype, toeid)
       arguments into `self.called`.
       """

       def setUp(self):
           RepositoryBasedTC.setUp(self)
           self.called = []
           self.hm = self.repo.hm

       def _before_relation_hook(self, pool, fromeid, rtype, toeid):
           call = (fromeid, rtype, toeid)
           self.called.append(call)

       def _after_relation_hook(self, pool, fromeid, rtype, toeid):
           call = (fromeid, rtype, toeid)
           self.called.append(call)

       def test_before_add_inline_relation(self):
           """before_<event>_relation hooks have to be called directly"""
           self.hm.register_hook(self._before_relation_hook,
                                 'before_add_relation', 'ecrit_par')
           person = self.execute('INSERT Personne X: X nom "toto"')[0][0]
           note = self.execute('INSERT Note X: X type "T"')[0][0]
           self.execute('SET N ecrit_par Y WHERE N type "T", Y nom "toto"')
           expected = [(note, 'ecrit_par', person)]
           self.assertEquals(self.called, expected)

       def test_after_add_inline_relation(self):
           """after_<event>_relation hooks have to be deferred"""
           self.hm.register_hook(self._after_relation_hook,
                                 'after_add_relation', 'ecrit_par')
           person = self.execute('INSERT Personne X: X nom "toto"')[0][0]
           note = self.execute('INSERT Note X: X type "T"')[0][0]
           # nothing recorded as long as the relation has not been set
           self.assertEquals(self.called, [])
           self.execute('SET N ecrit_par Y WHERE N type "T", Y nom "toto"')
           expected = [(note, 'ecrit_par', person)]
           self.assertEquals(self.called, expected)

       def test_after_add_inline(self):
           """after_<event>_relation hooks have to be deferred"""
           self.hm.register_hook(self._after_relation_hook,
                                 'after_add_relation', 'in_state')
           # here the relation is set by the INSERT query itself
           insert = 'INSERT EUser X: X login "toto", X upassword "tutu", X in_state S WHERE S name "activated"'
           user = self.execute(insert)[0][0]
           state = self.execute('State X WHERE X name "activated"')[0][0]
           expected = [(user, 'in_state', state)]
           self.assertEquals(self.called, expected)

       def test_before_delete_inline_relation(self):
           """before_<event>_relation hooks have to be called directly"""
           self.hm.register_hook(self._before_relation_hook,
                                 'before_delete_relation', 'ecrit_par')
           person = self.execute('INSERT Personne X: X nom "toto"')[0][0]
           note = self.execute('INSERT Note X: X type "T"')[0][0]
           self.execute('SET N ecrit_par Y WHERE N type "T", Y nom "toto"')
           self.execute('DELETE N ecrit_par Y WHERE N type "T", Y nom "toto"')
           expected = [(note, 'ecrit_par', person)]
           self.assertEquals(self.called, expected)
           remaining = self.execute('Any Y where N ecrit_par Y, N type "T", Y nom "toto"')
           # make sure the relation is really deleted
           self.failUnless(len(remaining) == 0, "failed to delete inline relation")

       def test_after_delete_inline_relation(self):
           """after_<event>_relation hooks have to be deferred"""
           self.hm.register_hook(self._after_relation_hook,
                                 'after_delete_relation', 'ecrit_par')
           person = self.execute('INSERT Personne X: X nom "toto"')[0][0]
           note = self.execute('INSERT Note X: X type "T"')[0][0]
           self.execute('SET N ecrit_par Y WHERE N type "T", Y nom "toto"')
           # setting the relation must not trigger the delete hook
           self.assertEquals(self.called, [])
           self.execute('DELETE N ecrit_par Y WHERE N type "T", Y nom "toto"')
           expected = [(note, 'ecrit_par', person)]
           self.assertEquals(self.called, expected)
       
   475 
       
   476     
       
   # Run the tests through logilab's unittest_main() when this file is
   # executed as a script rather than imported.
   if __name__ == '__main__':
       unittest_main()