|
1 # -*- coding: iso-8859-1 -*- |
|
2 """unit tests for module cubicweb.server.repository""" |
|
3 |
|
4 import os |
|
5 import sys |
|
6 import threading |
|
7 import time |
|
8 from copy import deepcopy |
|
9 |
|
10 from mx.DateTime import DateTimeType, now |
|
11 from logilab.common.testlib import TestCase, unittest_main |
|
12 from cubicweb.devtools.apptest import RepositoryBasedTC |
|
13 from cubicweb.devtools.repotest import tuplify |
|
14 |
|
15 from yams.constraints import UniqueConstraint |
|
16 |
|
17 from cubicweb import BadConnectionId, RepositoryError, ValidationError, UnknownEid, AuthenticationError |
|
18 from cubicweb.schema import CubicWebSchema, RQLConstraint |
|
19 from cubicweb.dbapi import connect, repo_connect |
|
20 |
|
21 from cubicweb.server import repository |
|
22 |
|
23 |
|
# Start the Pyro name server in the background unconditionally: if one is
# already running the new process simply fails, and its output is discarded.
os.system('pyro-ns >/dev/null 2>/dev/null &')
|
26 |
|
27 |
|
class RepositoryTC(RepositoryBasedTC):
    """Tests of the repository object exposed as ``self.repo``: schema
    content, connections and sessions, transactions, pyro access and the
    internal / public / session level APIs.
    """

    def test_fill_schema(self):
        """Check the EEType table after fill_schema() on a fresh schema."""
        self.repo.schema = CubicWebSchema(self.repo.config.appid)
        self.repo.config._cubes = None # avoid assertion error
        self.repo.fill_schema()
        pool = self.repo._get_pool()
        try:
            sqlcursor = pool['system']
            # every entity type must have its `final` flag set
            sqlcursor.execute('SELECT name FROM EEType WHERE final is NULL')
            self.assertEquals(sqlcursor.fetchall(), [])
            sqlcursor.execute('SELECT name FROM EEType WHERE final=%(final)s ORDER BY name', {'final': 'TRUE'})
            self.assertEquals(sqlcursor.fetchall(), [(u'Boolean',), (u'Bytes',),
                                                     (u'Date',), (u'Datetime',),
                                                     (u'Decimal',), (u'Float',),
                                                     (u'Int',),
                                                     (u'Interval',), (u'Password',),
                                                     (u'String',), (u'Time',)])
        finally:
            # always give the pool back, even when an assertion failed
            self.repo._free_pool(pool)

    def test_schema_has_owner(self):
        """No schema entity may be left without an `owned_by` relation."""
        repo = self.repo
        cnxid = repo.connect(*self.default_user_password())
        for etype in ('EEType', 'ERType', 'EFRDef', 'ENFRDef',
                      'EConstraint', 'EConstraintType'):
            self.failIf(repo.execute(cnxid, '%s X WHERE NOT X owned_by U' % etype))

    def test_connect(self):
        """connect() accepts the default credentials and rejects bad ones."""
        login, passwd = self.default_user_password()
        self.assert_(self.repo.connect(login, passwd))
        self.assertRaises(AuthenticationError,
                          self.repo.connect, login, 'nimportnawak')
        self.assertRaises(AuthenticationError,
                          self.repo.connect, login, None)
        self.assertRaises(AuthenticationError,
                          self.repo.connect, None, None)

    def test_execute(self):
        """A few RQL queries, one with a non-ascii argument, can be executed."""
        repo = self.repo
        cnxid = repo.connect(*self.default_user_password())
        repo.execute(cnxid, 'Any X')
        repo.execute(cnxid, 'Any X where X is Personne')
        repo.execute(cnxid, 'Any X where X is Personne, X nom ~= "to"')
        repo.execute(cnxid, 'Any X WHERE X has_text %(text)s', {'text': u'\xe7a'})
        repo.close(cnxid)

    def test_login_upassword_accent(self):
        """A user whose login and password hold accents can connect."""
        repo = self.repo
        cnxid = repo.connect(*self.default_user_password())
        repo.execute(cnxid, 'INSERT EUser X: X login %(login)s, X upassword %(passwd)s, X in_state S, X in_group G WHERE S name "activated", G name "users"',
                     {'login': u"barnabé", 'passwd': u"héhéhé".encode('UTF8')})
        repo.commit(cnxid)
        repo.close(cnxid)
        self.assert_(repo.connect(u"barnabé", u"héhéhé".encode('UTF8')))

    def test_invalid_entity_rollback(self):
        """A commit raising ValidationError leaves no trace of the entity."""
        repo = self.repo
        cnxid = repo.connect(*self.default_user_password())
        # the inserted user has no `in_group` relation, unlike the one
        # created in test_login_upassword_accent
        repo.execute(cnxid, 'INSERT EUser X: X login %(login)s, X upassword %(passwd)s, X in_state S WHERE S name "activated"',
                     {'login': u"tutetute", 'passwd': 'tutetute'})
        self.assertRaises(ValidationError, repo.commit, cnxid)
        rset = repo.execute(cnxid, 'EUser X WHERE X login "tutetute"')
        self.assertEquals(rset.rowcount, 0)

    def test_close(self):
        """A closed connection id can't be used to execute queries."""
        repo = self.repo
        cnxid = repo.connect(*self.default_user_password())
        self.assert_(cnxid)
        repo.close(cnxid)
        self.assertRaises(BadConnectionId, repo.execute, cnxid, 'Any X')

    def test_invalid_cnxid(self):
        """Unknown connection ids are rejected with BadConnectionId."""
        self.assertRaises(BadConnectionId, self.repo.execute, 0, 'Any X')
        self.assertRaises(BadConnectionId, self.repo.close, None)

    def test_shared_data(self):
        """Shared data is stored per connection and dies with it."""
        repo = self.repo
        cnxid = repo.connect(*self.default_user_password())
        repo.set_shared_data(cnxid, 'data', 4)
        cnxid2 = repo.connect(*self.default_user_password())
        self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4)
        self.assertEquals(repo.get_shared_data(cnxid2, 'data'), None)
        repo.set_shared_data(cnxid2, 'data', 5)
        self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4)
        self.assertEquals(repo.get_shared_data(cnxid2, 'data'), 5)
        # popping on the second connection must not affect the first one
        repo.get_shared_data(cnxid2, 'data', pop=True)
        self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4)
        self.assertEquals(repo.get_shared_data(cnxid2, 'data'), None)
        repo.close(cnxid)
        repo.close(cnxid2)
        self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid, 'data')
        self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid2, 'data')
        self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid, 'data', 1)
        self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid2, 'data', 1)

    def test_check_session(self):
        """check_session() returns None when open, raises once closed."""
        repo = self.repo
        cnxid = repo.connect(*self.default_user_password())
        self.assertEquals(repo.check_session(cnxid), None)
        repo.close(cnxid)
        self.assertRaises(BadConnectionId, repo.check_session, cnxid)

    def test_transaction_base(self):
        """Entity insertion is undone by rollback and kept by commit."""
        repo = self.repo
        cnxid = repo.connect(*self.default_user_password())
        # check db state
        result = repo.execute(cnxid, 'Personne X')
        self.assertEquals(result.rowcount, 0)
        # rollback entity insertion
        repo.execute(cnxid, "INSERT Personne X: X nom 'bidule'")
        result = repo.execute(cnxid, 'Personne X')
        self.assertEquals(result.rowcount, 1)
        repo.rollback(cnxid)
        result = repo.execute(cnxid, 'Personne X')
        self.assertEquals(result.rowcount, 0, result.rows)
        # commit
        repo.execute(cnxid, "INSERT Personne X: X nom 'bidule'")
        repo.commit(cnxid)
        result = repo.execute(cnxid, 'Personne X')
        self.assertEquals(result.rowcount, 1)

    def test_transaction_base2(self):
        """Relation insertion is undone by rollback."""
        repo = self.repo
        cnxid = repo.connect(*self.default_user_password())
        # rollback relation insertion
        repo.execute(cnxid, "SET U in_group G WHERE U login 'admin', G name 'guests'")
        result = repo.execute(cnxid, "Any U WHERE U in_group G, U login 'admin', G name 'guests'")
        self.assertEquals(result.rowcount, 1)
        repo.rollback(cnxid)
        result = repo.execute(cnxid, "Any U WHERE U in_group G, U login 'admin', G name 'guests'")
        self.assertEquals(result.rowcount, 0, result.rows)

    def test_transaction_base3(self):
        """Rollback also undoes the TrInfo added by a state change."""
        repo = self.repo
        cnxid = repo.connect(*self.default_user_password())
        # rollback state change which triggers TrInfo insertion
        ueid = repo._get_session(cnxid).user.eid
        rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': ueid})
        self.assertEquals(len(rset), 1)
        repo.execute(cnxid, 'SET X in_state S WHERE X eid %(x)s, S name "deactivated"',
                     {'x': ueid}, 'x')
        rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': ueid})
        self.assertEquals(len(rset), 2)
        repo.rollback(cnxid)
        rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': ueid})
        self.assertEquals(len(rset), 1)

    def test_transaction_interleaved(self):
        """Placeholder: not implemented yet, the test is skipped."""
        self.skip('implement me')

    def test_initial_schema(self):
        """Check a few properties of the schema held by the repository."""
        schema = self.repo.schema
        # check order of attributes is respected
        self.assertListEquals([r.type for r in schema.eschema('EFRDef').ordered_relations()
                               if not r.type in ('eid', 'is', 'is_instance_of', 'identity',
                                                 'creation_date', 'modification_date',
                                                 'owned_by', 'created_by')],
                              ['relation_type', 'from_entity', 'to_entity', 'constrained_by',
                               'cardinality', 'ordernum',
                               'indexed', 'fulltextindexed', 'internationalizable',
                               'defaultval', 'description_format', 'description'])

        self.assertEquals(schema.eschema('EEType').main_attribute(), 'name')
        self.assertEquals(schema.eschema('State').main_attribute(), 'name')

        # Work on a copy: the list returned by rproperty() may be the one
        # held by the schema, and the removal below must not alter the
        # schema seen by the other tests.
        constraints = list(schema.rschema('name').rproperty('EEType', 'String', 'constraints'))
        self.assertEquals(len(constraints), 2)
        for cstr in constraints[:]:
            if isinstance(cstr, UniqueConstraint):
                constraints.remove(cstr)
                break
        else:
            self.fail('unique constraint not found')
        # the remaining constraint is expected to be the size constraint
        sizeconstraint = constraints[0]
        self.assertEquals(sizeconstraint.min, None)
        self.assertEquals(sizeconstraint.max, 64)

        constraints = schema.rschema('relation_type').rproperty('EFRDef', 'ERType', 'constraints')
        self.assertEquals(len(constraints), 1)
        cstr = constraints[0]
        self.assert_(isinstance(cstr, RQLConstraint))
        self.assertEquals(cstr.restriction, 'O final TRUE')

        ownedby = schema.rschema('owned_by')
        self.assertEquals(ownedby.objects('EEType'), ('EUser',))

    def test_pyro(self):
        """Serve a pyro client running in another thread.

        Exceptions raised in the client thread (including failed
        assertions) are collected and reported as a failure of this test;
        otherwise they would die with the thread and go unnoticed.
        """
        import traceback
        import Pyro
        Pyro.config.PYRO_MULTITHREADED = 0
        lock = threading.Lock()
        client_errors = []

        def run_client():
            # a thread can't propagate its exceptions to the test runner:
            # keep the formatted traceback for the main thread
            try:
                self._pyro_client(lock)
            except Exception:
                client_errors.append(traceback.format_exc())

        # the client part has to be in the thread due to sqlite limitations
        t = threading.Thread(target=run_client)
        try:
            daemon = self.repo.pyro_register()
            t.start()
            # connection
            daemon.handleRequests(1.0)
            daemon.handleRequests(1.0)
            daemon.handleRequests(1.0)
            # get schema
            daemon.handleRequests(1.0)
            # execute
            daemon.handleRequests(1.0)
            t.join()
        finally:
            repository.pyro_unregister(self.repo.config)
        if client_errors:
            self.fail('pyro client thread failed:\n%s' % client_errors[0])

    def _pyro_client(self, lock):
        """Client side of test_pyro, meant to be run in its own thread.

        `lock` is part of the signature but is not used by this method.
        """
        cnx = connect(self.repo.config.appid, u'admin', 'gingkow')
        # check we can get the schema
        schema = cnx.get_schema()
        self.assertEquals(schema.__hashmode__, None)
        # check a query can be executed; its result is not inspected
        cnx.cursor().execute('Any U,G WHERE U in_group G')

    def test_internal_api(self):
        """Check the eid -> type / source / extid lookups for eid 1."""
        repo = self.repo
        cnxid = repo.connect(*self.default_user_password())
        session = repo._get_session(cnxid, setpool=True)
        self.assertEquals(repo.type_and_source_from_eid(1, session), ('EGroup', 'system', None))
        self.assertEquals(repo.type_from_eid(1, session), 'EGroup')
        self.assertEquals(repo.source_from_eid(1, session).uri, 'system')
        self.assertEquals(repo.eid2extid(repo.system_source, 1, session), None)
        # minimal stand-in for a source whose uri doesn't match the entity's
        class dummysource: uri = 'toto'
        self.assertRaises(UnknownEid, repo.eid2extid, dummysource, 1, session)

    def test_public_api(self):
        """Check get_schema(), source_defs() and properties()."""
        self.assertEquals(self.repo.get_schema(), self.repo.schema)
        self.assertEquals(self.repo.source_defs(), {'system': {'adapter': 'native', 'uri': 'system'}})
        # .properties() return a result set
        self.assertEquals(self.repo.properties().rql, 'Any K,V WHERE P is EProperty,P pkey K, P value V, NOT P for_user U')

    def test_session_api(self):
        """user_info() and describe() need an open connection."""
        repo = self.repo
        cnxid = repo.connect(*self.default_user_password())
        self.assertEquals(repo.user_info(cnxid), (5, 'admin', set([u'managers']), {}))
        self.assertEquals(repo.describe(cnxid, 1), (u'EGroup', u'system', None))
        repo.close(cnxid)
        self.assertRaises(BadConnectionId, repo.user_info, cnxid)
        self.assertRaises(BadConnectionId, repo.describe, cnxid, 1)

    def test_shared_data_api(self):
        """get/set/pop of shared data on a single connection."""
        repo = self.repo
        cnxid = repo.connect(*self.default_user_password())
        self.assertEquals(repo.get_shared_data(cnxid, 'data'), None)
        repo.set_shared_data(cnxid, 'data', 4)
        self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4)
        repo.get_shared_data(cnxid, 'data', pop=True)
        # popping a key which was never set must not raise
        repo.get_shared_data(cnxid, 'whatever', pop=True)
        self.assertEquals(repo.get_shared_data(cnxid, 'data'), None)
        repo.close(cnxid)
        self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid, 'data', 0)
        self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid, 'data')
|
299 |
|
300 |
|
class DataHelpersTC(RepositoryBasedTC):
    """Tests of the repository helpers dealing with eids (creation, type and
    source lookup, rows of the `entities` table), run against a session
    opened for the default user.
    """

    def setUp(self):
        """Connect as the default user and give a pool to the session."""
        credentials = self.default_user_password()
        sessionid = self.repo.connect(*credentials)
        self.session = self.repo._sessions[sessionid]
        self.session.set_pool()

    def tearDown(self):
        """Discard whatever the test did in the session."""
        self.session.rollback()

    def test_create_eid(self):
        """The system source hands out a true value as a new eid."""
        new_eid = self.repo.system_source.create_eid(self.session)
        self.assert_(new_eid)

    def test_source_from_eid(self):
        """Eid 1 is found in the system source."""
        found = self.repo.source_from_eid(1, self.session)
        self.assertEquals(found, self.repo.sources_by_uri['system'])

    def test_source_from_eid_raise(self):
        """Looking for the source of an unknown eid raises UnknownEid."""
        self.assertRaises(UnknownEid,
                          self.repo.source_from_eid, -2, self.session)

    def test_type_from_eid(self):
        """Eid 1 is an EGroup."""
        found = self.repo.type_from_eid(1, self.session)
        self.assertEquals(found, 'EGroup')

    def test_type_from_eid_raise(self):
        """Looking for the type of an unknown eid raises UnknownEid."""
        self.assertRaises(UnknownEid,
                          self.repo.type_from_eid, -2, self.session)

    def test_add_delete_info(self):
        """add_info() / delete_info() add and remove a row of `entities`."""
        personne_class = self.repo.vreg.etype_class('Personne')
        entity = personne_class(self.session, None, None)
        entity.eid = -1
        entity.complete = lambda x: None
        system_source = self.repo.sources_by_uri['system']
        self.repo.add_info(self.session, entity, system_source)
        cursor = self.session.pool['system']
        select = 'SELECT * FROM entities WHERE eid = -1'
        cursor.execute(select)
        rows = cursor.fetchall()
        self.assertIsInstance(rows[0][3], DateTimeType)
        # the date in the 4th column can't be predicted: blank it before
        # comparing the whole row
        first_row = list(rows[0])
        first_row[3] = None
        rows[0] = first_row
        self.assertEquals(tuplify(rows), [(-1, 'Personne', 'system', None, None)])
        self.repo.delete_info(self.session, -1)
        cursor.execute(select)
        self.assertEquals(cursor.fetchall(), [])
|
345 |
|
346 |
|
class FTITC(RepositoryBasedTC):
    """Tests built on `has_text` searches and on
    repo.entities_modified_since().
    """

    def test_reindex_and_modified_since(self):
        """Follow an entity through update and deletion, checking `has_text`
        results, its mtime and what entities_modified_since() reports.
        """
        syscursor = self.session.pool['system']
        etypes = ('Personne',)
        has_tutu = 'Personne X WHERE X has_text "tutu"'
        person_eid = self.execute('INSERT Personne X: X nom "toto", X prenom "tutu"')[0][0]
        self.commit()
        ts = now()
        self.assertEquals(len(self.execute(has_tutu)), 1)
        syscursor.execute('SELECT mtime, eid FROM entities WHERE eid = %s' % person_eid)
        old_mtime = syscursor.fetchone()[0]
        # our sqlite datetime adapter ignores fractions of a second, so the
        # update has to happen during the next second
        fraction = ts.second - int(ts.second)
        time.sleep(1 - fraction)
        self.execute('SET X nom "tata" WHERE X eid %(x)s', {'x': person_eid}, 'x')
        self.commit()
        self.assertEquals(len(self.execute(has_tutu)), 1)
        syscursor.execute('SELECT mtime FROM entities WHERE eid = %s' % person_eid)
        new_mtime = syscursor.fetchone()[0]
        self.failUnless(old_mtime < new_mtime)
        self.commit()
        # modified since the first mtime, but not since the second one
        date, modified, deleted = self.repo.entities_modified_since(etypes, old_mtime)
        self.assertEquals(modified, [('Personne', person_eid)])
        self.assertEquals(deleted, [])
        date, modified, deleted = self.repo.entities_modified_since(etypes, new_mtime)
        self.assertEquals(modified, [])
        self.assertEquals(deleted, [])
        # once deleted, the entity moves to the `deleted` list
        self.execute('DELETE Personne X WHERE X eid %(x)s', {'x': person_eid})
        self.commit()
        date, modified, deleted = self.repo.entities_modified_since(etypes, old_mtime)
        self.assertEquals(modified, [])
        self.assertEquals(deleted, [('Personne', person_eid)])

    def test_composite_entity(self):
        """Text of an email address is found through the address itself
        while it stands alone, and through the user once linked to it with
        `use_email`.
        """
        assert self.schema.rschema('use_email').fulltext_container == 'subject'
        search = 'Any X WHERE X has_text %(t)s'
        link = 'SET X use_email Y WHERE X login "admin", Y eid %(y)s'
        address_eid = self.add_entity('EmailAddress', address=u'toto@logilab.fr').eid
        self.commit()
        found = self.execute(search, {'t': 'toto'})
        self.assertEquals(found.rows, [[address_eid]])
        self.execute(link, {'y': address_eid})
        self.commit()
        found = self.execute(search, {'t': 'toto'})
        self.assertEquals(found.rows, [[self.session.user.eid]])
        self.execute('DELETE X use_email Y WHERE X login "admin", Y eid %(y)s', {'y': address_eid})
        self.commit()
        found = self.execute(search, {'t': 'toto'})
        self.assertEquals(found.rows, [])
        # same check with an address linked before its first commit
        address_eid = self.add_entity('EmailAddress', address=u'tutu@logilab.fr').eid
        self.execute(link, {'y': address_eid})
        self.commit()
        found = self.execute(search, {'t': 'tutu'})
        self.assertEquals(found.rows, [[self.session.user.eid]])
|
398 |
|
399 |
|
class DBInitTC(RepositoryBasedTC):
    """Tests on the ``system.version.*`` property keys found in the database."""

    def test_versions_inserted(self):
        """A ``system.version.<name>`` key exists for cubicweb and for each
        of the listed cubes.
        """
        inserted = [r[0] for r in self.execute('Any K ORDERBY K WHERE P pkey K, P pkey ~= "system.version.%"')]
        # The query sorts on the key (ORDERBY K), so the expected list has
        # to be sorted as well: 'cubicweb' comes before every 'e...' name.
        self.assertEquals(inserted,
                          [u'system.version.cubicweb',
                           u'system.version.ebasket', u'system.version.eclassfolders',
                           u'system.version.eclasstags', u'system.version.ecomment',
                           u'system.version.eemail', u'system.version.efile'])
|
409 |
|
410 |
|
class InlineRelHooksTC(RepositoryBasedTC):
    """Check relation hooks get called for inlined relations.

    NOTE(review): the hooks registered by the tests are never unregistered
    in this class -- confirm the base class takes care of it.
    """

    def setUp(self):
        """Run the base set up, then start with an empty call record."""
        RepositoryBasedTC.setUp(self)
        self.hm = self.repo.hm
        self.called = []

    def _before_relation_hook(self, pool, fromeid, rtype, toeid):
        """Record the relation a `before_*_relation` hook was called with."""
        call = (fromeid, rtype, toeid)
        self.called.append(call)

    def _after_relation_hook(self, pool, fromeid, rtype, toeid):
        """Record the relation an `after_*_relation` hook was called with."""
        call = (fromeid, rtype, toeid)
        self.called.append(call)

    def test_before_add_inline_relation(self):
        """A before_add_relation hook has been called once the SET query
        returns.
        """
        self.hm.register_hook(self._before_relation_hook,
                              'before_add_relation', 'ecrit_par')
        person_eid = self.execute('INSERT Personne X: X nom "toto"')[0][0]
        note_eid = self.execute('INSERT Note X: X type "T"')[0][0]
        self.execute('SET N ecrit_par Y WHERE N type "T", Y nom "toto"')
        expected = [(note_eid, 'ecrit_par', person_eid)]
        self.assertEquals(self.called, expected)

    def test_after_add_inline_relation(self):
        """An after_add_relation hook is not called by the entity
        insertions, only by the SET query adding the relation.
        """
        self.hm.register_hook(self._after_relation_hook,
                              'after_add_relation', 'ecrit_par')
        person_eid = self.execute('INSERT Personne X: X nom "toto"')[0][0]
        note_eid = self.execute('INSERT Note X: X type "T"')[0][0]
        self.assertEquals(self.called, [])
        self.execute('SET N ecrit_par Y WHERE N type "T", Y nom "toto"')
        expected = [(note_eid, 'ecrit_par', person_eid,)]
        self.assertEquals(self.called, expected)

    def test_after_add_inline(self):
        """An after_add_relation hook is called for a relation set by the
        INSERT query creating the entity.
        """
        self.hm.register_hook(self._after_relation_hook,
                              'after_add_relation', 'in_state')
        user_eid = self.execute('INSERT EUser X: X login "toto", X upassword "tutu", X in_state S WHERE S name "activated"')[0][0]
        state_eid = self.execute('State X WHERE X name "activated"')[0][0]
        expected = [(user_eid, 'in_state', state_eid,)]
        self.assertEquals(self.called, expected)

    def test_before_delete_inline_relation(self):
        """A before_delete_relation hook has been called once the DELETE
        query returns, and the relation is gone.
        """
        self.hm.register_hook(self._before_relation_hook,
                              'before_delete_relation', 'ecrit_par')
        person_eid = self.execute('INSERT Personne X: X nom "toto"')[0][0]
        note_eid = self.execute('INSERT Note X: X type "T"')[0][0]
        self.execute('SET N ecrit_par Y WHERE N type "T", Y nom "toto"')
        self.execute('DELETE N ecrit_par Y WHERE N type "T", Y nom "toto"')
        expected = [(note_eid, 'ecrit_par', person_eid)]
        self.assertEquals(self.called, expected)
        remaining = self.execute('Any Y where N ecrit_par Y, N type "T", Y nom "toto"')
        # make sure the relation is really deleted
        self.failUnless(len(remaining) == 0, "failed to delete inline relation")

    def test_after_delete_inline_relation(self):
        """An after_delete_relation hook is not called when the relation
        is added, only by the DELETE query removing it.
        """
        self.hm.register_hook(self._after_relation_hook,
                              'after_delete_relation', 'ecrit_par')
        person_eid = self.execute('INSERT Personne X: X nom "toto"')[0][0]
        note_eid = self.execute('INSERT Note X: X type "T"')[0][0]
        self.execute('SET N ecrit_par Y WHERE N type "T", Y nom "toto"')
        self.assertEquals(self.called, [])
        self.execute('DELETE N ecrit_par Y WHERE N type "T", Y nom "toto"')
        expected = [(note_eid, 'ecrit_par', person_eid,)]
        self.assertEquals(self.called, expected)
|
475 |
|
476 |
|
# run the tests when this module is executed as a script
if __name__ == '__main__':
    unittest_main()