62 (u'Int',), |
62 (u'Int',), |
63 (u'Interval',), (u'Password',), |
63 (u'Interval',), (u'Password',), |
64 (u'String',), (u'Time',)]) |
64 (u'String',), (u'Time',)]) |
65 finally: |
65 finally: |
66 self.repo._free_pool(pool) |
66 self.repo._free_pool(pool) |
67 |
67 |
def test_schema_has_owner(self):
    """Check that no schema entity lacks an ``owned_by`` relation.

    For each schema entity type, the query selecting entities *without*
    an owner must return an empty (falsy) result.
    """
    repo = self.repo
    cnxid = repo.connect(*self.default_user_password())
    try:
        for etype in ('CWEType', 'CWRType', 'CWAttribute', 'CWRelation',
                      'CWConstraint', 'CWConstraintType'):
            rset = repo.execute(cnxid, '%s X WHERE NOT X owned_by U' % etype)
            # pass the type as message: the loop would otherwise hide
            # which query returned rows
            self.assertFalse(rset, '%s entities without owner' % etype)
    finally:
        # do not leave the connection open on the repository
        repo.close(cnxid)
77 |
77 |
def test_connect(self):
    """Check connect() accepts valid credentials and rejects invalid ones.

    A wrong password, a missing password and missing login/password must
    each raise AuthenticationError.
    """
    repo = self.repo
    login, passwd = self.default_user_password()
    cnxid = repo.connect(login, passwd)
    self.assertTrue(cnxid)
    # close the connection opened only to check that login succeeds
    repo.close(cnxid)
    for bad_login, bad_passwd in ((login, 'nimportnawak'),
                                  (login, None),
                                  (None, None)):
        self.assertRaises(AuthenticationError,
                          repo.connect, bad_login, bad_passwd)
87 |
87 |
def test_execute(self):
    """Run a few RQL queries through the repository.

    No result is inspected: the test only checks that the queries
    execute without raising.
    """
    repo = self.repo
    cnxid = repo.connect(*self.default_user_password())
    try:
        repo.execute(cnxid, 'Any X')
        repo.execute(cnxid, 'Any X where X is Personne')
        repo.execute(cnxid, 'Any X where X is Personne, X nom ~= "to"')
        repo.execute(cnxid, 'Any X WHERE X has_text %(text)s',
                     {'text': u'\xe7a'})
    finally:
        # close the connection even when one of the queries fails
        repo.close(cnxid)
96 |
96 |
def test_login_upassword_accent(self):
    """Check a user with non-ASCII login and password can connect.

    The user is inserted and committed through a first connection, then
    a new connection is opened with the same (UTF-8 encoded) password.
    """
    repo = self.repo
    # single definition of the credentials used for both insert and login
    login = u"barnabé"
    passwd = u"héhéhé".encode('UTF8')
    cnxid = repo.connect(*self.default_user_password())
    repo.execute(cnxid,
                 'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s, '
                 'X in_state S, X in_group G '
                 'WHERE S name "activated", G name "users"',
                 {'login': login, 'passwd': passwd})
    repo.commit(cnxid)
    repo.close(cnxid)
    user_cnxid = repo.connect(login, passwd)
    self.assertTrue(user_cnxid)
    # do not leave the new user's connection open on the repository
    repo.close(user_cnxid)
105 |
105 |
def test_invalid_entity_rollback(self):
    """Check nothing is kept from a transaction whose commit is rejected.

    The commit must raise ValidationError, and the inserted user must not
    be found afterwards.
    """
    repo = self.repo
    cnxid = repo.connect(*self.default_user_password())
    # no group: the user is inserted without any in_group relation
    # (presumably what makes validation fail -- confirm against schema)
    repo.execute(cnxid,
                 'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s, '
                 'X in_state S WHERE S name "activated"',
                 {'login': u"tutetute", 'passwd': 'tutetute'})
    self.assertRaises(ValidationError, repo.commit, cnxid)
    rset = repo.execute(cnxid, 'CWUser X WHERE X login "tutetute"')
    self.assertEqual(rset.rowcount, 0)
114 |
115 |
def test_close(self):
    """Check a connection id is rejected once it has been closed."""
    repo = self.repo
    cnxid = repo.connect(*self.default_user_password())
    self.assertTrue(cnxid)
    repo.close(cnxid)
    # executing on the closed connection must fail
    self.assertRaises(BadConnectionId, repo.execute, cnxid, 'Any X')
121 |
122 |
def test_invalid_cnxid(self):
    """Check execute() and close() raise BadConnectionId on bogus ids."""
    repo = self.repo
    # 0 and None are not connection ids handed out by connect() here
    self.assertRaises(BadConnectionId, repo.execute, 0, 'Any X')
    self.assertRaises(BadConnectionId, repo.close, None)
125 |
126 |
126 def test_shared_data(self): |
127 def test_shared_data(self): |
127 repo = self.repo |
128 repo = self.repo |
128 cnxid = repo.connect(*self.default_user_password()) |
129 cnxid = repo.connect(*self.default_user_password()) |
129 repo.set_shared_data(cnxid, 'data', 4) |
130 repo.set_shared_data(cnxid, 'data', 4) |
130 cnxid2 = repo.connect(*self.default_user_password()) |
131 cnxid2 = repo.connect(*self.default_user_password()) |
192 rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': ueid}) |
193 rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': ueid}) |
193 self.assertEquals(len(rset), 2) |
194 self.assertEquals(len(rset), 2) |
194 repo.rollback(cnxid) |
195 repo.rollback(cnxid) |
195 rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': ueid}) |
196 rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': ueid}) |
196 self.assertEquals(len(rset), 1) |
197 self.assertEquals(len(rset), 1) |
197 |
198 |
def test_transaction_interleaved(self):
    """Placeholder: the test is not written yet and is skipped."""
    reason = 'implement me'
    self.skip(reason)
200 |
201 |
201 def test_initial_schema(self): |
202 def test_initial_schema(self): |
202 schema = self.repo.schema |
203 schema = self.repo.schema |
203 # check order of attributes is respected |
204 # check order of attributes is respected |
204 self.assertListEquals([r.type for r in schema.eschema('CWAttribute').ordered_relations() |
205 self.assertListEquals([r.type for r in schema.eschema('CWAttribute').ordered_relations() |
205 if not r.type in ('eid', 'is', 'is_instance_of', 'identity', |
206 if not r.type in ('eid', 'is', 'is_instance_of', 'identity', |
206 'creation_date', 'modification_date', |
207 'creation_date', 'modification_date', |
207 'owned_by', 'created_by')], |
208 'owned_by', 'created_by')], |
208 ['relation_type', 'from_entity', 'to_entity', 'constrained_by', |
209 ['relation_type', 'from_entity', 'to_entity', 'constrained_by', |
209 'cardinality', 'ordernum', |
210 'cardinality', 'ordernum', |
210 'indexed', 'fulltextindexed', 'internationalizable', |
211 'indexed', 'fulltextindexed', 'internationalizable', |
211 'defaultval', 'description_format', 'description']) |
212 'defaultval', 'description_format', 'description']) |
212 |
213 |
213 self.assertEquals(schema.eschema('CWEType').main_attribute(), 'name') |
214 self.assertEquals(schema.eschema('CWEType').main_attribute(), 'name') |
214 self.assertEquals(schema.eschema('State').main_attribute(), 'name') |
215 self.assertEquals(schema.eschema('State').main_attribute(), 'name') |
299 repo.get_shared_data(cnxid, 'whatever', pop=True) |
300 repo.get_shared_data(cnxid, 'whatever', pop=True) |
300 self.assertEquals(repo.get_shared_data(cnxid, 'data'), None) |
301 self.assertEquals(repo.get_shared_data(cnxid, 'data'), None) |
301 repo.close(cnxid) |
302 repo.close(cnxid) |
302 self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid, 'data', 0) |
303 self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid, 'data', 0) |
303 self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid, 'data') |
304 self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid, 'data') |
304 |
305 |
305 |
306 |
306 class DataHelpersTC(RepositoryBasedTC): |
307 class DataHelpersTC(RepositoryBasedTC): |
307 |
308 |
def setUp(self):
    """Called before each test from this class.

    Opens a repository connection with the default credentials, stores
    the matching session on ``self.session`` and sets its pool.
    """
    repo = self.repo
    login_args = self.default_user_password()
    self.session = repo._sessions[repo.connect(*login_args)]
    self.session.set_pool()
313 |
314 |
def tearDown(self):
    """Roll back the session used by the test."""
    session = self.session
    session.rollback()
316 |
317 |
def test_create_eid(self):
    """Check the system source returns a truthy eid for the session."""
    system_source = self.repo.system_source
    self.assertTrue(system_source.create_eid(self.session))
319 |
320 |
320 def test_source_from_eid(self): |
321 def test_source_from_eid(self): |
321 self.assertEquals(self.repo.source_from_eid(1, self.session), |
322 self.assertEquals(self.repo.source_from_eid(1, self.session), |
def test_source_from_eid_raise(self):
    """Check source_from_eid() raises UnknownEid for eid -2."""
    lookup = self.repo.source_from_eid
    self.assertRaises(UnknownEid, lookup, -2, self.session)
326 |
327 |
def test_type_from_eid(self):
    """Check type_from_eid() reports 'CWGroup' for eid 1."""
    etype = self.repo.type_from_eid(1, self.session)
    self.assertEqual(etype, 'CWGroup')
329 |
330 |
def test_type_from_eid_raise(self):
    """Check type_from_eid() raises UnknownEid for eid -2."""
    lookup = self.repo.type_from_eid
    self.assertRaises(UnknownEid, lookup, -2, self.session)
332 |
333 |
333 def test_add_delete_info(self): |
334 def test_add_delete_info(self): |
334 entity = self.repo.vreg.etype_class('Personne')(self.session, None, None) |
335 entity = self.repo.vreg.etype_class('Personne')(self.session, None, None) |
335 entity.eid = -1 |
336 entity.eid = -1 |
336 entity.complete = lambda x: None |
337 entity.complete = lambda x: None |
337 self.repo.add_info(self.session, entity, self.repo.sources_by_uri['system']) |
338 self.repo.add_info(self.session, entity, self.repo.sources_by_uri['system']) |
398 eid = self.add_entity('EmailAddress', address=u'tutu@logilab.fr').eid |
399 eid = self.add_entity('EmailAddress', address=u'tutu@logilab.fr').eid |
399 self.execute('SET X use_email Y WHERE X login "admin", Y eid %(y)s', {'y': eid}) |
400 self.execute('SET X use_email Y WHERE X login "admin", Y eid %(y)s', {'y': eid}) |
400 self.commit() |
401 self.commit() |
401 rset = self.execute('Any X WHERE X has_text %(t)s', {'t': 'tutu'}) |
402 rset = self.execute('Any X WHERE X has_text %(t)s', {'t': 'tutu'}) |
402 self.assertEquals(rset.rows, [[self.session.user.eid]]) |
403 self.assertEquals(rset.rows, [[self.session.user.eid]]) |
403 |
404 |
404 |
405 |
class DBInitTC(RepositoryBasedTC):
    """Checks on the version properties stored in the database."""

    def test_versions_inserted(self):
        """Check the ``system.version.*`` property keys that are recorded.

        Keys are fetched ordered, so the comparison is against a sorted
        list.
        """
        rset = self.execute('Any K ORDERBY K WHERE P pkey K, '
                            'P pkey ~= "system.version.%"')
        inserted = [row[0] for row in rset]
        self.assertEqual(inserted,
                         [u'system.version.basket',
                          u'system.version.card',
                          u'system.version.comment',
                          u'system.version.cubicweb',
                          u'system.version.email',
                          u'system.version.file',
                          u'system.version.folder',
                          u'system.version.tag'])
414 |
415 |
415 class InlineRelHooksTC(RepositoryBasedTC): |
416 class InlineRelHooksTC(RepositoryBasedTC): |
416 """test relation hooks are called for inlined relations |
417 """test relation hooks are called for inlined relations |
417 """ |
418 """ |
def setUp(self):
    """Run the base set up, then prepare hook recording.

    ``self.called`` collects the arguments seen by the test hooks and
    ``self.hm`` is a shortcut to the repository's ``hm`` attribute.
    """
    RepositoryBasedTC.setUp(self)
    self.called = []
    self.hm = self.repo.hm
422 |
423 |
def _before_relation_hook(self, pool, fromeid, rtype, toeid):
    """Record the relation triple the hook was called with.

    ``pool`` is accepted to match the hook signature but is not used.
    """
    triple = (fromeid, rtype, toeid)
    self.called.append(triple)
425 |
426 |
def _after_relation_hook(self, pool, fromeid, rtype, toeid):
    """Record the relation triple the hook was called with.

    ``pool`` is accepted to match the hook signature but is not used.
    """
    triple = (fromeid, rtype, toeid)
    self.called.append(triple)
428 |
429 |
def test_before_add_inline_relation(self):
    """make sure before_<event>_relation hooks are called directly

    After the SET query, the recording hook must have been called exactly
    once with (note eid, 'ecrit_par', person eid).
    """
    self.hm.register_hook(self._before_relation_hook,
                          'before_add_relation', 'ecrit_par')
    eidp = self.execute('INSERT Personne X: X nom "toto"')[0][0]
    eidn = self.execute('INSERT Note X: X type "T"')[0][0]
    self.execute('SET N ecrit_par Y WHERE N type "T", Y nom "toto"')
    self.assertEqual(self.called, [(eidn, 'ecrit_par', eidp)])
437 |
438 |
def test_after_add_inline_relation(self):
    """make sure after_<event>_relation hooks are deferred

    The hook must not have been called after the two INSERT queries, and
    must have been called once with (note eid, 'ecrit_par', person eid)
    after the SET query.
    """
    self.hm.register_hook(self._after_relation_hook,
                          'after_add_relation', 'ecrit_par')
    eidp = self.execute('INSERT Personne X: X nom "toto"')[0][0]
    eidn = self.execute('INSERT Note X: X type "T"')[0][0]
    # creating the entities alone must not trigger the relation hook
    self.assertEqual(self.called, [])
    self.execute('SET N ecrit_par Y WHERE N type "T", Y nom "toto"')
    self.assertEqual(self.called, [(eidn, 'ecrit_par', eidp)])
447 |
448 |
def test_after_add_inline(self):
    """make sure after_<event>_relation hooks are deferred

    Here the ``in_state`` relation is set by the INSERT query itself; the
    hook must have been called once with (user eid, 'in_state', state eid).
    """
    self.hm.register_hook(self._after_relation_hook,
                          'after_add_relation', 'in_state')
    eidp = self.execute('INSERT CWUser X: X login "toto", X upassword "tutu", '
                        'X in_state S WHERE S name "activated"')[0][0]
    eids = self.execute('State X WHERE X name "activated"')[0][0]
    self.assertEqual(self.called, [(eidp, 'in_state', eids)])
455 |
456 |
456 def test_before_delete_inline_relation(self): |
457 def test_before_delete_inline_relation(self): |
457 """make sure before_<event>_relation hooks are called directly""" |
458 """make sure before_<event>_relation hooks are called directly""" |
458 self.hm.register_hook(self._before_relation_hook, |
459 self.hm.register_hook(self._before_relation_hook, |
459 'before_delete_relation', 'ecrit_par') |
460 'before_delete_relation', 'ecrit_par') |
460 eidp = self.execute('INSERT Personne X: X nom "toto"')[0][0] |
461 eidp = self.execute('INSERT Personne X: X nom "toto"')[0][0] |