44 def test_fill_schema(self): |
44 def test_fill_schema(self): |
45 self.repo.schema = CubicWebSchema(self.repo.config.appid) |
45 self.repo.schema = CubicWebSchema(self.repo.config.appid) |
46 self.repo.config._cubes = None # avoid assertion error |
46 self.repo.config._cubes = None # avoid assertion error |
47 self.repo.fill_schema() |
47 self.repo.fill_schema() |
48 pool = self.repo._get_pool() |
48 pool = self.repo._get_pool() |
49 table = SQL_PREFIX + 'EEType' |
49 table = SQL_PREFIX + 'CWEType' |
50 namecol = SQL_PREFIX + 'name' |
50 namecol = SQL_PREFIX + 'name' |
51 finalcol = SQL_PREFIX + 'final' |
51 finalcol = SQL_PREFIX + 'final' |
52 try: |
52 try: |
53 sqlcursor = pool['system'] |
53 sqlcursor = pool['system'] |
54 sqlcursor.execute('SELECT %s FROM %s WHERE %s is NULL' % ( |
54 sqlcursor.execute('SELECT %s FROM %s WHERE %s is NULL' % ( |
66 self.repo._free_pool(pool) |
66 self.repo._free_pool(pool) |
67 |
67 |
68 def test_schema_has_owner(self): |
68 def test_schema_has_owner(self): |
69 repo = self.repo |
69 repo = self.repo |
70 cnxid = repo.connect(*self.default_user_password()) |
70 cnxid = repo.connect(*self.default_user_password()) |
71 self.failIf(repo.execute(cnxid, 'EEType X WHERE NOT X owned_by U')) |
71 self.failIf(repo.execute(cnxid, 'CWEType X WHERE NOT X owned_by U')) |
72 self.failIf(repo.execute(cnxid, 'ERType X WHERE NOT X owned_by U')) |
72 self.failIf(repo.execute(cnxid, 'CWRType X WHERE NOT X owned_by U')) |
73 self.failIf(repo.execute(cnxid, 'EFRDef X WHERE NOT X owned_by U')) |
73 self.failIf(repo.execute(cnxid, 'CWAttribute X WHERE NOT X owned_by U')) |
74 self.failIf(repo.execute(cnxid, 'ENFRDef X WHERE NOT X owned_by U')) |
74 self.failIf(repo.execute(cnxid, 'CWRelation X WHERE NOT X owned_by U')) |
75 self.failIf(repo.execute(cnxid, 'EConstraint X WHERE NOT X owned_by U')) |
75 self.failIf(repo.execute(cnxid, 'CWConstraint X WHERE NOT X owned_by U')) |
76 self.failIf(repo.execute(cnxid, 'EConstraintType X WHERE NOT X owned_by U')) |
76 self.failIf(repo.execute(cnxid, 'CWConstraintType X WHERE NOT X owned_by U')) |
77 |
77 |
78 def test_connect(self): |
78 def test_connect(self): |
79 login, passwd = self.default_user_password() |
79 login, passwd = self.default_user_password() |
80 self.assert_(self.repo.connect(login, passwd)) |
80 self.assert_(self.repo.connect(login, passwd)) |
81 self.assertRaises(AuthenticationError, |
81 self.assertRaises(AuthenticationError, |
95 repo.close(cnxid) |
95 repo.close(cnxid) |
96 |
96 |
97 def test_login_upassword_accent(self): |
97 def test_login_upassword_accent(self): |
98 repo = self.repo |
98 repo = self.repo |
99 cnxid = repo.connect(*self.default_user_password()) |
99 cnxid = repo.connect(*self.default_user_password()) |
100 repo.execute(cnxid, 'INSERT EUser X: X login %(login)s, X upassword %(passwd)s, X in_state S, X in_group G WHERE S name "activated", G name "users"', |
100 repo.execute(cnxid, 'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s, X in_state S, X in_group G WHERE S name "activated", G name "users"', |
101 {'login': u"barnabé", 'passwd': u"héhéhé".encode('UTF8')}) |
101 {'login': u"barnabé", 'passwd': u"héhéhé".encode('UTF8')}) |
102 repo.commit(cnxid) |
102 repo.commit(cnxid) |
103 repo.close(cnxid) |
103 repo.close(cnxid) |
104 self.assert_(repo.connect(u"barnabé", u"héhéhé".encode('UTF8'))) |
104 self.assert_(repo.connect(u"barnabé", u"héhéhé".encode('UTF8'))) |
105 |
105 |
106 def test_invalid_entity_rollback(self): |
106 def test_invalid_entity_rollback(self): |
107 repo = self.repo |
107 repo = self.repo |
108 cnxid = repo.connect(*self.default_user_password()) |
108 cnxid = repo.connect(*self.default_user_password()) |
109 repo.execute(cnxid, 'INSERT EUser X: X login %(login)s, X upassword %(passwd)s, X in_state S WHERE S name "activated"', |
109 repo.execute(cnxid, 'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s, X in_state S WHERE S name "activated"', |
110 {'login': u"tutetute", 'passwd': 'tutetute'}) |
110 {'login': u"tutetute", 'passwd': 'tutetute'}) |
111 self.assertRaises(ValidationError, repo.commit, cnxid) |
111 self.assertRaises(ValidationError, repo.commit, cnxid) |
112 rset = repo.execute(cnxid, 'EUser X WHERE X login "tutetute"') |
112 rset = repo.execute(cnxid, 'CWUser X WHERE X login "tutetute"') |
113 self.assertEquals(rset.rowcount, 0) |
113 self.assertEquals(rset.rowcount, 0) |
114 |
114 |
115 def test_close(self): |
115 def test_close(self): |
116 repo = self.repo |
116 repo = self.repo |
117 cnxid = repo.connect(*self.default_user_password()) |
117 cnxid = repo.connect(*self.default_user_password()) |
199 self.skip('implement me') |
199 self.skip('implement me') |
200 |
200 |
201 def test_initial_schema(self): |
201 def test_initial_schema(self): |
202 schema = self.repo.schema |
202 schema = self.repo.schema |
203 # check order of attributes is respected |
203 # check order of attributes is respected |
204 self.assertListEquals([r.type for r in schema.eschema('EFRDef').ordered_relations() |
204 self.assertListEquals([r.type for r in schema.eschema('CWAttribute').ordered_relations() |
205 if not r.type in ('eid', 'is', 'is_instance_of', 'identity', |
205 if not r.type in ('eid', 'is', 'is_instance_of', 'identity', |
206 'creation_date', 'modification_date', |
206 'creation_date', 'modification_date', |
207 'owned_by', 'created_by')], |
207 'owned_by', 'created_by')], |
208 ['relation_type', 'from_entity', 'to_entity', 'constrained_by', |
208 ['relation_type', 'from_entity', 'to_entity', 'constrained_by', |
209 'cardinality', 'ordernum', |
209 'cardinality', 'ordernum', |
210 'indexed', 'fulltextindexed', 'internationalizable', |
210 'indexed', 'fulltextindexed', 'internationalizable', |
211 'defaultval', 'description_format', 'description']) |
211 'defaultval', 'description_format', 'description']) |
212 |
212 |
213 self.assertEquals(schema.eschema('EEType').main_attribute(), 'name') |
213 self.assertEquals(schema.eschema('CWEType').main_attribute(), 'name') |
214 self.assertEquals(schema.eschema('State').main_attribute(), 'name') |
214 self.assertEquals(schema.eschema('State').main_attribute(), 'name') |
215 |
215 |
216 constraints = schema.rschema('name').rproperty('EEType', 'String', 'constraints') |
216 constraints = schema.rschema('name').rproperty('CWEType', 'String', 'constraints') |
217 self.assertEquals(len(constraints), 2) |
217 self.assertEquals(len(constraints), 2) |
218 for cstr in constraints[:]: |
218 for cstr in constraints[:]: |
219 if isinstance(cstr, UniqueConstraint): |
219 if isinstance(cstr, UniqueConstraint): |
220 constraints.remove(cstr) |
220 constraints.remove(cstr) |
221 break |
221 break |
223 self.fail('unique constraint not found') |
223 self.fail('unique constraint not found') |
224 sizeconstraint = constraints[0] |
224 sizeconstraint = constraints[0] |
225 self.assertEquals(sizeconstraint.min, None) |
225 self.assertEquals(sizeconstraint.min, None) |
226 self.assertEquals(sizeconstraint.max, 64) |
226 self.assertEquals(sizeconstraint.max, 64) |
227 |
227 |
228 constraints = schema.rschema('relation_type').rproperty('EFRDef', 'ERType', 'constraints') |
228 constraints = schema.rschema('relation_type').rproperty('CWAttribute', 'CWRType', 'constraints') |
229 self.assertEquals(len(constraints), 1) |
229 self.assertEquals(len(constraints), 1) |
230 cstr = constraints[0] |
230 cstr = constraints[0] |
231 self.assert_(isinstance(cstr, RQLConstraint)) |
231 self.assert_(isinstance(cstr, RQLConstraint)) |
232 self.assertEquals(cstr.restriction, 'O final TRUE') |
232 self.assertEquals(cstr.restriction, 'O final TRUE') |
233 |
233 |
234 ownedby = schema.rschema('owned_by') |
234 ownedby = schema.rschema('owned_by') |
235 self.assertEquals(ownedby.objects('EEType'), ('EUser',)) |
235 self.assertEquals(ownedby.objects('CWEType'), ('CWUser',)) |
236 |
236 |
237 def test_pyro(self): |
237 def test_pyro(self): |
238 import Pyro |
238 import Pyro |
239 Pyro.config.PYRO_MULTITHREADED = 0 |
239 Pyro.config.PYRO_MULTITHREADED = 0 |
240 lock = threading.Lock() |
240 lock = threading.Lock() |
265 |
265 |
266 def test_internal_api(self): |
266 def test_internal_api(self): |
267 repo = self.repo |
267 repo = self.repo |
268 cnxid = repo.connect(*self.default_user_password()) |
268 cnxid = repo.connect(*self.default_user_password()) |
269 session = repo._get_session(cnxid, setpool=True) |
269 session = repo._get_session(cnxid, setpool=True) |
270 self.assertEquals(repo.type_and_source_from_eid(1, session), ('EGroup', 'system', None)) |
270 self.assertEquals(repo.type_and_source_from_eid(1, session), ('CWGroup', 'system', None)) |
271 self.assertEquals(repo.type_from_eid(1, session), 'EGroup') |
271 self.assertEquals(repo.type_from_eid(1, session), 'CWGroup') |
272 self.assertEquals(repo.source_from_eid(1, session).uri, 'system') |
272 self.assertEquals(repo.source_from_eid(1, session).uri, 'system') |
273 self.assertEquals(repo.eid2extid(repo.system_source, 1, session), None) |
273 self.assertEquals(repo.eid2extid(repo.system_source, 1, session), None) |
274 class dummysource: uri = 'toto' |
274 class dummysource: uri = 'toto' |
275 self.assertRaises(UnknownEid, repo.eid2extid, dummysource, 1, session) |
275 self.assertRaises(UnknownEid, repo.eid2extid, dummysource, 1, session) |
276 |
276 |
277 def test_public_api(self): |
277 def test_public_api(self): |
278 self.assertEquals(self.repo.get_schema(), self.repo.schema) |
278 self.assertEquals(self.repo.get_schema(), self.repo.schema) |
279 self.assertEquals(self.repo.source_defs(), {'system': {'adapter': 'native', 'uri': 'system'}}) |
279 self.assertEquals(self.repo.source_defs(), {'system': {'adapter': 'native', 'uri': 'system'}}) |
280 # .properties() return a result set |
280 # .properties() return a result set |
281 self.assertEquals(self.repo.properties().rql, 'Any K,V WHERE P is EProperty,P pkey K, P value V, NOT P for_user U') |
281 self.assertEquals(self.repo.properties().rql, 'Any K,V WHERE P is CWProperty,P pkey K, P value V, NOT P for_user U') |
282 |
282 |
283 def test_session_api(self): |
283 def test_session_api(self): |
284 repo = self.repo |
284 repo = self.repo |
285 cnxid = repo.connect(*self.default_user_password()) |
285 cnxid = repo.connect(*self.default_user_password()) |
286 self.assertEquals(repo.user_info(cnxid), (5, 'admin', set([u'managers']), {})) |
286 self.assertEquals(repo.user_info(cnxid), (5, 'admin', set([u'managers']), {})) |
287 self.assertEquals(repo.describe(cnxid, 1), (u'EGroup', u'system', None)) |
287 self.assertEquals(repo.describe(cnxid, 1), (u'CWGroup', u'system', None)) |
288 repo.close(cnxid) |
288 repo.close(cnxid) |
289 self.assertRaises(BadConnectionId, repo.user_info, cnxid) |
289 self.assertRaises(BadConnectionId, repo.user_info, cnxid) |
290 self.assertRaises(BadConnectionId, repo.describe, cnxid, 1) |
290 self.assertRaises(BadConnectionId, repo.describe, cnxid, 1) |
291 |
291 |
292 def test_shared_data_api(self): |
292 def test_shared_data_api(self): |
323 |
323 |
324 def test_source_from_eid_raise(self): |
324 def test_source_from_eid_raise(self): |
325 self.assertRaises(UnknownEid, self.repo.source_from_eid, -2, self.session) |
325 self.assertRaises(UnknownEid, self.repo.source_from_eid, -2, self.session) |
326 |
326 |
327 def test_type_from_eid(self): |
327 def test_type_from_eid(self): |
328 self.assertEquals(self.repo.type_from_eid(1, self.session), 'EGroup') |
328 self.assertEquals(self.repo.type_from_eid(1, self.session), 'CWGroup') |
329 |
329 |
330 def test_type_from_eid_raise(self): |
330 def test_type_from_eid_raise(self): |
331 self.assertRaises(UnknownEid, self.repo.type_from_eid, -2, self.session) |
331 self.assertRaises(UnknownEid, self.repo.type_from_eid, -2, self.session) |
332 |
332 |
333 def test_add_delete_info(self): |
333 def test_add_delete_info(self): |
447 |
447 |
448 def test_after_add_inline(self): |
448 def test_after_add_inline(self): |
449 """make sure after_<event>_relation hooks are deferred""" |
449 """make sure after_<event>_relation hooks are deferred""" |
450 self.hm.register_hook(self._after_relation_hook, |
450 self.hm.register_hook(self._after_relation_hook, |
451 'after_add_relation', 'in_state') |
451 'after_add_relation', 'in_state') |
452 eidp = self.execute('INSERT EUser X: X login "toto", X upassword "tutu", X in_state S WHERE S name "activated"')[0][0] |
452 eidp = self.execute('INSERT CWUser X: X login "toto", X upassword "tutu", X in_state S WHERE S name "activated"')[0][0] |
453 eids = self.execute('State X WHERE X name "activated"')[0][0] |
453 eids = self.execute('State X WHERE X name "activated"')[0][0] |
454 self.assertEquals(self.called, [(eidp, 'in_state', eids,)]) |
454 self.assertEquals(self.called, [(eidp, 'in_state', eids,)]) |
455 |
455 |
456 def test_before_delete_inline_relation(self): |
456 def test_before_delete_inline_relation(self): |
457 """make sure before_<event>_relation hooks are called directly""" |
457 """make sure before_<event>_relation hooks are called directly""" |