19 from yams.constraints import UniqueConstraint |
19 from yams.constraints import UniqueConstraint |
20 |
20 |
21 from cubicweb import (BadConnectionId, RepositoryError, ValidationError, |
21 from cubicweb import (BadConnectionId, RepositoryError, ValidationError, |
22 UnknownEid, AuthenticationError) |
22 UnknownEid, AuthenticationError) |
23 from cubicweb.schema import CubicWebSchema, RQLConstraint |
23 from cubicweb.schema import CubicWebSchema, RQLConstraint |
24 from cubicweb.dbapi import connect, repo_connect, multiple_connections_unfix |
24 from cubicweb.dbapi import connect, multiple_connections_unfix |
25 from cubicweb.devtools.testlib import CubicWebTC |
25 from cubicweb.devtools.testlib import CubicWebTC |
26 from cubicweb.devtools.repotest import tuplify |
26 from cubicweb.devtools.repotest import tuplify |
27 from cubicweb.server import repository, hook |
27 from cubicweb.server import repository, hook |
28 from cubicweb.server.sqlutils import SQL_PREFIX |
28 from cubicweb.server.sqlutils import SQL_PREFIX |
29 |
29 |
36 """ singleton providing access to a persistent storage for entities |
36 """ singleton providing access to a persistent storage for entities |
37 and relation |
37 and relation |
38 """ |
38 """ |
39 |
39 |
40 def test_fill_schema(self): |
40 def test_fill_schema(self): |
41 self.repo.schema = CubicWebSchema(self.repo.config.appid) |
41 origshema = self.repo.schema |
42 self.repo.config._cubes = None # avoid assertion error |
42 try: |
43 self.repo.config.repairing = True # avoid versions checking |
43 self.repo.schema = CubicWebSchema(self.repo.config.appid) |
44 self.repo.fill_schema() |
44 self.repo.config._cubes = None # avoid assertion error |
45 table = SQL_PREFIX + 'CWEType' |
45 self.repo.config.repairing = True # avoid versions checking |
46 namecol = SQL_PREFIX + 'name' |
46 self.repo.fill_schema() |
47 finalcol = SQL_PREFIX + 'final' |
47 table = SQL_PREFIX + 'CWEType' |
48 self.session.set_pool() |
48 namecol = SQL_PREFIX + 'name' |
49 cu = self.session.system_sql('SELECT %s FROM %s WHERE %s is NULL' % ( |
49 finalcol = SQL_PREFIX + 'final' |
50 namecol, table, finalcol)) |
50 self.session.set_pool() |
51 self.assertEquals(cu.fetchall(), []) |
51 cu = self.session.system_sql('SELECT %s FROM %s WHERE %s is NULL' % ( |
52 cu = self.session.system_sql('SELECT %s FROM %s WHERE %s=%%(final)s ORDER BY %s' |
52 namecol, table, finalcol)) |
53 % (namecol, table, finalcol, namecol), {'final': 'TRUE'}) |
53 self.assertEquals(cu.fetchall(), []) |
54 self.assertEquals(cu.fetchall(), [(u'Boolean',), (u'Bytes',), |
54 cu = self.session.system_sql('SELECT %s FROM %s WHERE %s=%%(final)s ORDER BY %s' |
55 (u'Date',), (u'Datetime',), |
55 % (namecol, table, finalcol, namecol), {'final': 'TRUE'}) |
56 (u'Decimal',),(u'Float',), |
56 self.assertEquals(cu.fetchall(), [(u'Boolean',), (u'Bytes',), |
57 (u'Int',), |
57 (u'Date',), (u'Datetime',), |
58 (u'Interval',), (u'Password',), |
58 (u'Decimal',),(u'Float',), |
59 (u'String',), (u'Time',)]) |
59 (u'Int',), |
|
60 (u'Interval',), (u'Password',), |
|
61 (u'String',), (u'Time',)]) |
|
62 finally: |
|
63 self.repo.set_schema(origshema) |
60 |
64 |
61 def test_schema_has_owner(self): |
65 def test_schema_has_owner(self): |
62 repo = self.repo |
66 repo = self.repo |
63 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
67 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
64 self.failIf(repo.execute(cnxid, 'CWEType X WHERE NOT X owned_by U')) |
68 self.failIf(repo.execute(cnxid, 'CWEType X WHERE NOT X owned_by U')) |
261 t.join(1) |
265 t.join(1) |
262 if t.isAlive(): |
266 if t.isAlive(): |
263 self.fail('something went wrong, thread still alive') |
267 self.fail('something went wrong, thread still alive') |
264 finally: |
268 finally: |
265 repository.pyro_unregister(self.repo.config) |
269 repository.pyro_unregister(self.repo.config) |
|
270 from logilab.common import pyro_ext |
|
271 pyro_ext._DAEMONS.clear() |
266 |
272 |
267 def _pyro_client(self, done): |
273 def _pyro_client(self, done): |
268 cnx = connect(self.repo.config.appid, u'admin', password='gingkow') |
274 cnx = connect(self.repo.config.appid, u'admin', password='gingkow') |
269 try: |
275 try: |
270 # check we can get the schema |
276 # check we can get the schema |
454 u'system.version.cubicweb', u'system.version.email', |
460 u'system.version.cubicweb', u'system.version.email', |
455 u'system.version.file', u'system.version.folder', |
461 u'system.version.file', u'system.version.folder', |
456 u'system.version.tag']) |
462 u'system.version.tag']) |
457 |
463 |
458 CALLED = [] |
464 CALLED = [] |
459 class EcritParHook(hook.Hook): |
|
460 __regid__ = 'inlinedrelhook' |
|
461 __select__ = hook.Hook.__select__ & hook.match_rtype('ecrit_par') |
|
462 events = ('before_add_relation', 'after_add_relation', |
|
463 'before_delete_relation', 'after_delete_relation') |
|
464 def __call__(self): |
|
465 CALLED.append((self.event, self.eidfrom, self.rtype, self.eidto)) |
|
466 |
465 |
467 class InlineRelHooksTC(CubicWebTC): |
466 class InlineRelHooksTC(CubicWebTC): |
468 """test relation hooks are called for inlined relations |
467 """test relation hooks are called for inlined relations |
469 """ |
468 """ |
470 def setUp(self): |
469 def setUp(self): |
475 def _after_relation_hook(self, pool, fromeid, rtype, toeid): |
474 def _after_relation_hook(self, pool, fromeid, rtype, toeid): |
476 self.called.append((fromeid, rtype, toeid)) |
475 self.called.append((fromeid, rtype, toeid)) |
477 |
476 |
478 def test_inline_relation(self): |
477 def test_inline_relation(self): |
479 """make sure <event>_relation hooks are called for inlined relation""" |
478 """make sure <event>_relation hooks are called for inlined relation""" |
|
479 class EcritParHook(hook.Hook): |
|
480 __regid__ = 'inlinedrelhook' |
|
481 __select__ = hook.Hook.__select__ & hook.match_rtype('ecrit_par') |
|
482 events = ('before_add_relation', 'after_add_relation', |
|
483 'before_delete_relation', 'after_delete_relation') |
|
484 def __call__(self): |
|
485 CALLED.append((self.event, self.eidfrom, self.rtype, self.eidto)) |
|
486 |
480 self.hm.register(EcritParHook) |
487 self.hm.register(EcritParHook) |
481 eidp = self.execute('INSERT Personne X: X nom "toto"')[0][0] |
488 eidp = self.execute('INSERT Personne X: X nom "toto"')[0][0] |
482 eidn = self.execute('INSERT Note X: X type "T"')[0][0] |
489 eidn = self.execute('INSERT Note X: X type "T"')[0][0] |
483 self.execute('SET N ecrit_par Y WHERE N type "T", Y nom "toto"') |
490 self.execute('SET N ecrit_par Y WHERE N type "T", Y nom "toto"') |
484 self.assertEquals(CALLED, [('before_add_relation', eidn, 'ecrit_par', eidp), |
491 self.assertEquals(CALLED, [('before_add_relation', eidn, 'ecrit_par', eidp), |