server/test/unittest_migractions.py
changeset 0 b97547f5f1fa
child 47 54087a269bdd
equal deleted inserted replaced
-1:000000000000 0:b97547f5f1fa
       
     1 """unit tests for module cubicweb.server.migractions
       
     2 """
       
     3 
       
     4 from mx.DateTime import DateTime, today
       
     5 
       
     6 from logilab.common.testlib import TestCase, unittest_main
       
     7 from cubicweb.devtools.apptest import RepositoryBasedTC, get_versions
       
     8 
       
     9 from cubicweb.server.repository import Repository
       
    10 from cubicweb.server.migractions import *
       
    11 
       
    12 orig_get_versions = Repository.get_versions
       
    13 
       
    14 def setup_module(*args):
       
    15     Repository.get_versions = get_versions
       
    16 
       
    17 def teardown_module(*args):
       
    18     Repository.get_versions = orig_get_versions
       
    19 
       
    20     
       
    21 class MigrationCommandsTC(RepositoryBasedTC):
       
    22     copy_schema = True
       
    23     
       
    def setUp(self):
        """Prepare the repository and build the migration helper under test.

        On first initialization (detected by the absence of a ``_repo``
        attribute) the repository schema is re-read from the database, the
        migration target schema is loaded from the ``migrschema`` schema
        directory into the module-level ``migrschema`` global, and the
        verification hooks are deactivated.

        Every call then runs ``RepositoryBasedTC.setUp`` and stores a
        non-interactive ``ServerMigrationHelper`` in ``self.mh``.
        """
        global migrschema
        if not hasattr(self, '_repo'):
            # first initialization
            repo = self.repo # set by the RepositoryBasedTC metaclass
            # force to read schema from the database
            repo.config._cubes = None
            repo.fill_schema()
            # hack to read the schema from data/migrschema: temporarily
            # override the schema directory on the loader class
            from cubicweb.schema import CubicWebSchemaLoader
            CubicWebSchemaLoader.main_schema_directory = 'migrschema'
            try:
                migrschema = self.repo.config.load_schema()
            finally:
                # always undo the class-level override, even when loading
                # fails, so it cannot leak into whatever runs afterwards
                del CubicWebSchemaLoader.main_schema_directory
            assert 'Folder' in migrschema
            self.repo.hm.deactivate_verification_hooks()
        RepositoryBasedTC.setUp(self)
        self.mh = ServerMigrationHelper(self.repo.config, migrschema,
                                        repo=self.repo, cnx=self.cnx,
                                        interactive=False)
       
    43         
       
    def test_add_attribute_int(self):
        # Adding 'whatever' to Note must register it in the schema as a
        # Note -> Int attribute; the order number of Note's 'para' attribute
        # is expected to be one higher afterwards.
        para_ordernum_rql = ('Any O WHERE X name "Note", RT name "para", '
                             'RDEF from_entity X, RDEF relation_type RT, '
                             'RDEF ordernum O')
        self.failIf('whatever' in self.schema)
        ordernum_before = self.mh.rqlexec(para_ordernum_rql)[0][0]
        self.mh.cmd_add_attribute('Note', 'whatever')
        self.failUnless('whatever' in self.schema)
        whatever = self.schema['whatever']
        self.assertEquals(whatever.subjects(), ('Note',))
        self.assertEquals(whatever.objects(), ('Int',))
        ordernum_after = self.mh.rqlexec(para_ordernum_rql)[0][0]
        self.assertEquals(ordernum_after, ordernum_before + 1)
        # (a stricter check on the full result of Note's ordered_relations()
        # used to follow here but is disabled)
        #
        # NB: commit instead of rollback make following test fail with py2.5
        #     this sounds like a pysqlite/2.5 bug (the same eid is affected to
        #     two different entities)
        self.mh.rollback()
       
    61 
       
    def test_add_attribute_varchar(self):
        # Adding 'shortpara' to Note must yield a Note -> String attribute
        # whose SQL column is declared as varchar(64).
        self.failIf('shortpara' in self.schema)
        self.mh.cmd_add_attribute('Note', 'shortpara')
        self.failUnless('shortpara' in self.schema)
        shortpara = self.schema['shortpara']
        self.assertEquals(shortpara.subjects(), ('Note', ))
        self.assertEquals(shortpara.objects(), ('String', ))
        # fetch the CREATE TABLE statement recorded by sqlite for table Note
        create_sql = self.mh.sqlexec("SELECT sql FROM sqlite_master WHERE type='table' and name='Note'")[0][0]
        # keep what lies between the first '(' and the last ')', then map the
        # first word of each comma separated column definition to the second
        column_defs = create_sql.split('(', 1)[1].rsplit(')', 1)[0]
        sqltypes = {}
        for column_def in column_defs.split(','):
            colname, coltype = column_def.strip().split()[:2]
            sqltypes[colname] = coltype
        self.assertEquals(sqltypes['shortpara'], 'varchar(64)')
        self.mh.rollback()
       
    73         
       
    def test_add_datetime_with_default_value_attribute(self):
        # Adding 'mydate' to Note must yield a Note -> Date attribute.  A Note
        # inserted without a value is expected to read back today's date,
        # while a Note inserted with an explicit date reads back that date.
        self.failIf('mydate' in self.schema)
        self.mh.cmd_add_attribute('Note', 'mydate')
        self.failUnless('mydate' in self.schema)
        mydate = self.schema['mydate']
        self.assertEquals(mydate.subjects(), ('Note', ))
        self.assertEquals(mydate.objects(), ('Date', ))
        explicit_date = DateTime(2005, 12, 13)
        select_rql = 'Any D WHERE X eid %(x)s, X mydate D'
        defaulted_eid = self.mh.rqlexec('INSERT Note N')[0][0]
        dated_eid = self.mh.rqlexec('INSERT Note N: N mydate %(mydate)s',
                                    {'mydate' : explicit_date})[0][0]
        defaulted_value = self.mh.rqlexec(select_rql, {'x': defaulted_eid}, 'x')[0][0]
        dated_value = self.mh.rqlexec(select_rql, {'x': dated_eid}, 'x')[0][0]
        self.assertEquals(defaulted_value, today())
        self.assertEquals(dated_value, explicit_date)
        self.mh.rollback()
       
    88             
       
    def test_rename_attribute(self):
        # Renaming Personne's 'sexe' attribute to 'civility' must replace the
        # name in the schema and carry the existing values over, including a
        # NULL one.
        self.failIf('civility' in self.schema)
        male_eid = self.mh.rqlexec('INSERT Personne X: X nom "lui", X sexe "M"')[0][0]
        unset_eid = self.mh.rqlexec('INSERT Personne X: X nom "l\'autre", X sexe NULL')[0][0]
        self.mh.cmd_rename_attribute('Personne', 'sexe', 'civility')
        self.failIf('sexe' in self.schema)
        self.failUnless('civility' in self.schema)
        # test data has been backported
        for eid, expected_civility in ((male_eid, 'M'), (unset_eid, None)):
            civility = self.mh.rqlexec('Any C WHERE X eid %s, X civility C' % eid)[0][0]
            self.failUnlessEqual(civility, expected_civility)
       
   101 
       
   102 
       
   103     def test_workflow_actions(self):
       
   104         foo = self.mh.cmd_add_state(u'foo', ('Personne', 'Email'), initial=True)
       
   105         for etype in ('Personne', 'Email'):
       
   106             s1 = self.mh.rqlexec('Any N WHERE S state_of ET, ET name "%s", S name N' %
       
   107                                  etype)[0][0]
       
   108             self.assertEquals(s1, "foo")
       
   109             s1 = self.mh.rqlexec('Any N WHERE ET initial_state S, ET name "%s", S name N' %
       
   110                                  etype)[0][0]
       
   111             self.assertEquals(s1, "foo")
       
   112         bar = self.mh.cmd_add_state(u'bar', ('Personne', 'Email'), initial=True)
       
   113         baz = self.mh.cmd_add_transition(u'baz', ('Personne', 'Email'),
       
   114                                          (foo,), bar, ('managers',))
       
   115         for etype in ('Personne', 'Email'):
       
   116             t1 = self.mh.rqlexec('Any N WHERE T transition_of ET, ET name "%s", T name N' %
       
   117                                  etype)[0][0]
       
   118             self.assertEquals(t1, "baz")
       
   119         gn = self.mh.rqlexec('Any GN WHERE T require_group G, G name GN, T eid %s' % baz)[0][0]
       
   120         self.assertEquals(gn, 'managers')
       
   121         
       
   122     def test_add_entity_type(self):
       
   123         self.failIf('Folder2' in self.schema)
       
   124         self.failIf('filed_under2' in self.schema)
       
   125         self.mh.cmd_add_entity_type('Folder2')
       
   126         self.failUnless('Folder2' in self.schema)
       
   127         self.failUnless(self.execute('EEType X WHERE X name "Folder2"'))
       
   128         self.failUnless('filed_under2' in self.schema)
       
   129         self.failUnless(self.execute('ERType X WHERE X name "filed_under2"'))
       
   130         self.assertEquals(sorted(str(rs) for rs in self.schema['Folder2'].subject_relations()),
       
   131                           ['created_by', 'creation_date', 'description', 'description_format', 'eid',
       
   132                            'filed_under2', 'has_text', 'identity', 'is', 'is_instance_of',
       
   133                            'modification_date', 'name', 'owned_by'])
       
   134         self.assertEquals([str(rs) for rs in self.schema['Folder2'].object_relations()],
       
   135                           ['filed_under2', 'identity'])
       
   136         self.assertEquals(sorted(str(e) for e in self.schema['filed_under2'].subjects()),
       
   137                           ['Affaire', 'Card', 'Division', 'Email', 'EmailThread', 'File', 'Folder2',
       
   138                            'Image', 'Note', 'Personne', 'Societe', 'SubDivision'])
       
   139         self.assertEquals(self.schema['filed_under2'].objects(), ('Folder2',))
       
   140         eschema = self.schema.eschema('Folder2')
       
   141         for cstr in eschema.constraints('name'):
       
   142             self.failUnless(hasattr(cstr, 'eid'))
       
   143 
       
   144     def test_drop_entity_type(self):
       
   145         self.mh.cmd_add_entity_type('Folder2')
       
   146         todoeid = self.mh.cmd_add_state(u'todo', 'Folder2', initial=True)
       
   147         doneeid = self.mh.cmd_add_state(u'done', 'Folder2')
       
   148         self.mh.cmd_add_transition(u'redoit', 'Folder2', (doneeid,), todoeid)
       
   149         self.mh.cmd_add_transition(u'markasdone', 'Folder2', (todoeid,), doneeid)
       
   150         self.commit()
       
   151         eschema = self.schema.eschema('Folder2')
       
   152         self.mh.cmd_drop_entity_type('Folder2')
       
   153         self.failIf('Folder2' in self.schema)
       
   154         self.failIf(self.execute('EEType X WHERE X name "Folder2"'))
       
   155         # test automatic workflow deletion
       
   156         self.failIf(self.execute('State X WHERE NOT X state_of ET'))
       
   157         self.failIf(self.execute('Transition X WHERE NOT X transition_of ET'))
       
   158 
       
   159     def test_add_relation_type(self):
       
   160         self.mh.cmd_add_entity_type('Folder2', auto=False)
       
   161         self.mh.cmd_add_relation_type('filed_under2')
       
   162         self.failUnless('filed_under2' in self.schema)
       
   163         self.assertEquals(sorted(str(e) for e in self.schema['filed_under2'].subjects()),
       
   164                           ['Affaire', 'Card', 'Division', 'Email', 'EmailThread', 'File', 'Folder2',
       
   165                            'Image', 'Note', 'Personne', 'Societe', 'SubDivision'])
       
   166         self.assertEquals(self.schema['filed_under2'].objects(), ('Folder2',))
       
   167 
       
   168 
       
   169     def test_drop_relation_type(self):
       
   170         self.mh.cmd_add_entity_type('Folder2', auto=False)
       
   171         self.mh.cmd_add_relation_type('filed_under2')
       
   172         self.failUnless('filed_under2' in self.schema)
       
   173         self.mh.cmd_drop_relation_type('filed_under2')
       
   174         self.failIf('filed_under2' in self.schema)
       
   175 
       
   176     def test_add_relation_definition(self):
       
   177         self.mh.cmd_add_relation_definition('Societe', 'in_state', 'State')
       
   178         self.assertEquals(sorted(self.schema['in_state'].subjects()),
       
   179                           ['Affaire', 'Division', 'EUser', 'Note', 'Societe', 'SubDivision'])
       
   180         self.assertEquals(self.schema['in_state'].objects(), ('State',))
       
   181 
       
   182     def test_add_relation_definition_nortype(self):
       
   183         self.mh.cmd_add_relation_definition('Personne', 'concerne2', 'Affaire')
       
   184         self.assertEquals(self.schema['concerne2'].subjects(),
       
   185                           ('Personne',))
       
   186         self.assertEquals(self.schema['concerne2'].objects(), ('Affaire',))
       
   187 
       
   188     def test_drop_relation_definition1(self):
       
   189         self.failUnless('concerne' in self.schema)
       
   190         self.assertEquals(sorted(str(e) for e in self.schema['concerne'].subjects()), ['Affaire', 'Personne'])
       
   191         self.assertEquals(sorted(str(e) for e in self.schema['concerne'].objects()), ['Affaire', 'Division', 'Note', 'Societe', 'SubDivision'])
       
   192         self.mh.cmd_drop_relation_definition('Personne', 'concerne', 'Affaire')
       
   193         self.assertEquals(sorted(str(e) for e in self.schema['concerne'].subjects()), ['Affaire'])
       
   194         self.assertEquals(sorted(str(e) for e in self.schema['concerne'].objects()), ['Division', 'Note', 'Societe', 'SubDivision'])
       
   195         
       
   196     def test_drop_relation_definition_with_specialization(self):
       
   197         self.failUnless('concerne' in self.schema)
       
   198         self.assertEquals(sorted(str(e) for e in self.schema['concerne'].subjects()), ['Affaire', 'Personne'])
       
   199         self.assertEquals(sorted(str(e) for e in self.schema['concerne'].objects()), ['Affaire', 'Division', 'Note', 'Societe', 'SubDivision'])
       
   200         self.mh.cmd_drop_relation_definition('Affaire', 'concerne', 'Societe')
       
   201         self.mh.cmd_drop_relation_definition('None', 'concerne', 'Societe')
       
   202         self.assertEquals(sorted(str(e) for e in self.schema['concerne'].subjects()), ['Affaire', 'Personne'])
       
   203         self.assertEquals(sorted(str(e) for e in self.schema['concerne'].objects()), ['Affaire', 'Note'])
       
   204         
       
   205     def test_drop_relation_definition2(self):
       
   206         self.failUnless('evaluee' in self.schema)
       
   207         self.mh.cmd_drop_relation_definition('Personne', 'evaluee', 'Note')
       
   208         self.failUnless('evaluee' in self.schema)
       
   209         self.assertEquals(sorted(self.schema['evaluee'].subjects()),
       
   210                           ['Division', 'EUser', 'Societe', 'SubDivision'])
       
   211         self.assertEquals(sorted(self.schema['evaluee'].objects()),
       
   212                           ['Note'])
       
   213 
       
   214     def test_rename_relation(self):
       
   215         self.skip('implement me')
       
   216 
       
   217     def test_change_relation_props_non_final(self):
       
   218         rschema = self.schema['concerne']
       
   219         card = rschema.rproperty('Affaire', 'Societe', 'cardinality')
       
   220         self.assertEquals(card, '**')
       
   221         try:
       
   222             self.mh.cmd_change_relation_props('Affaire', 'concerne', 'Societe',
       
   223                                               cardinality='?*')
       
   224             card = rschema.rproperty('Affaire', 'Societe', 'cardinality')
       
   225             self.assertEquals(card, '?*')
       
   226         finally:
       
   227             self.mh.cmd_change_relation_props('Affaire', 'concerne', 'Societe',
       
   228                                               cardinality='**')
       
   229             
       
   230     def test_change_relation_props_final(self):
       
   231         rschema = self.schema['adel']
       
   232         card = rschema.rproperty('Personne', 'String', 'fulltextindexed')
       
   233         self.assertEquals(card, False)
       
   234         try:
       
   235             self.mh.cmd_change_relation_props('Personne', 'adel', 'String',
       
   236                                               fulltextindexed=True)
       
   237             card = rschema.rproperty('Personne', 'String', 'fulltextindexed')
       
   238             self.assertEquals(card, True)
       
   239         finally:
       
   240             self.mh.cmd_change_relation_props('Personne', 'adel', 'String',
       
   241                                               fulltextindexed=False)
       
   242 
       
   243     def test_synchronize_schema(self):
       
   244         cursor = self.mh.rqlcursor
       
   245         nbrqlexpr_start = len(cursor.execute('RQLExpression X'))
       
   246         migrschema['titre']._rproperties[('Personne', 'String')]['order'] = 7
       
   247         migrschema['adel']._rproperties[('Personne', 'String')]['order'] = 6
       
   248         migrschema['ass']._rproperties[('Personne', 'String')]['order'] = 5
       
   249 #         expected = ['eid', 'has_text', 'creation_date', 'modification_date',
       
   250 #                     'nom', 'prenom', 'civility', 'promo', 'ass', 'adel', 'titre',
       
   251 #                     'web', 'tel', 'fax', 'datenaiss', 'test']
       
   252 #         self.assertEquals([rs.type for rs in migrschema['Personne'].ordered_relations() if rs.is_final()],
       
   253 #                           expected)
       
   254         migrschema['Personne'].description = 'blabla bla'
       
   255         migrschema['titre'].description = 'usually a title' 
       
   256         migrschema['titre']._rproperties[('Personne', 'String')]['description'] = 'title for this person'
       
   257 #         rinorderbefore = cursor.execute('Any O,N WHERE X is EFRDef, X relation_type RT, RT name N,'
       
   258 #                                         'X from_entity FE, FE name "Personne",'
       
   259 #                                         'X ordernum O ORDERBY O')
       
   260 #         expected = [u'creation_date', u'modification_date', u'nom', u'prenom',
       
   261 #                     u'sexe', u'promo', u'titre', u'adel', u'ass', u'web', u'tel',
       
   262 #                     u'fax', u'datenaiss', u'test', u'description']
       
   263 #        self.assertListEquals(rinorderbefore, map(list, zip([0, 0]+range(1, len(expected)), expected)))
       
   264         
       
   265         self.mh.cmd_synchronize_schema(commit=False)
       
   266         
       
   267         self.assertEquals(cursor.execute('Any D WHERE X name "Personne", X description D')[0][0],
       
   268                           'blabla bla')
       
   269         self.assertEquals(cursor.execute('Any D WHERE X name "titre", X description D')[0][0],
       
   270                           'usually a title')
       
   271         self.assertEquals(cursor.execute('Any D WHERE X relation_type RT, RT name "titre",'
       
   272                                          'X from_entity FE, FE name "Personne",'
       
   273                                          'X description D')[0][0],
       
   274                           'title for this person')
       
   275         # skip "sexe" and "description" since they aren't in the migration
       
   276         # schema and so behaviour is undefined
       
   277         # "civility" is also skipped since it may have been added by
       
   278         # test_rename_attribut :o/
       
   279         rinorder = [n for n, in cursor.execute('Any N ORDERBY O WHERE X is EFRDef, X relation_type RT, RT name N,'
       
   280                                                'X from_entity FE, FE name "Personne",'
       
   281                                                'X ordernum O') if n not in ('sexe', 'description', 'civility')]
       
   282         expected = [u'nom', u'prenom', u'promo', u'ass', u'adel', u'titre',
       
   283                     u'web', u'tel', u'fax', u'datenaiss', u'test', u'firstname',
       
   284                     u'creation_date', u'modification_date']
       
   285         self.assertEquals(rinorder, expected)
       
   286 
       
   287         # test permissions synchronization ####################################
       
   288         # new rql expr to add note entity
       
   289         eexpr = self._erqlexpr_entity('add', 'Note')
       
   290         self.assertEquals(eexpr.expression,
       
   291                           'X ecrit_part PE, U in_group G, '
       
   292                           'PE require_permission P, P name "add_note", P require_group G')
       
   293         self.assertEquals([et.name for et in eexpr.reverse_add_permission], ['Note'])
       
   294         self.assertEquals(eexpr.reverse_read_permission, [])
       
   295         self.assertEquals(eexpr.reverse_delete_permission, [])
       
   296         self.assertEquals(eexpr.reverse_update_permission, [])
       
   297         # no more rqlexpr to delete and add para attribute
       
   298         self.failIf(self._rrqlexpr_rset('add', 'para'))
       
   299         self.failIf(self._rrqlexpr_rset('delete', 'para'))
       
   300         # new rql expr to add ecrit_par relation        
       
   301         rexpr = self._rrqlexpr_entity('add', 'ecrit_par')
       
   302         self.assertEquals(rexpr.expression,
       
   303                           'O require_permission P, P name "add_note", '
       
   304                           'U in_group G, P require_group G')
       
   305         self.assertEquals([rt.name for rt in rexpr.reverse_add_permission], ['ecrit_par'])
       
   306         self.assertEquals(rexpr.reverse_read_permission, [])
       
   307         self.assertEquals(rexpr.reverse_delete_permission, [])
       
   308         # no more rqlexpr to delete and add travaille relation
       
   309         self.failIf(self._rrqlexpr_rset('add', 'travaille'))
       
   310         self.failIf(self._rrqlexpr_rset('delete', 'travaille'))
       
   311         # no more rqlexpr to delete and update Societe entity
       
   312         self.failIf(self._erqlexpr_rset('update', 'Societe'))
       
   313         self.failIf(self._erqlexpr_rset('delete', 'Societe'))
       
   314         # no more rqlexpr to read Affaire entity
       
   315         self.failIf(self._erqlexpr_rset('read', 'Affaire'))
       
   316         # rqlexpr to update Affaire entity has been updated
       
   317         eexpr = self._erqlexpr_entity('update', 'Affaire')
       
   318         self.assertEquals(eexpr.expression, 'X concerne S, S owned_by U')
       
   319         # no change for rqlexpr to add and delete Affaire entity
       
   320         self.assertEquals(len(self._erqlexpr_rset('delete', 'Affaire')), 1)
       
   321         self.assertEquals(len(self._erqlexpr_rset('add', 'Affaire')), 1)
       
   322         # no change for rqlexpr to add and delete concerne relation
       
   323         self.assertEquals(len(self._rrqlexpr_rset('delete', 'concerne')), 1)
       
   324         self.assertEquals(len(self._rrqlexpr_rset('add', 'concerne')), 1)
       
   325         # * migrschema involve:
       
   326         #   * 8 deletion (2 in Affaire read + Societe + travaille + para rqlexprs)
       
   327         #   * 1 update (Affaire update)
       
   328         #   * 2 new (Note add, ecrit_par add)
       
   329         # remaining orphan rql expr which should be deleted at commit (composite relation)
       
   330         self.assertEquals(len(cursor.execute('RQLExpression X WHERE NOT ET1 read_permission X, NOT ET2 add_permission X, '
       
   331                                              'NOT ET3 delete_permission X, NOT ET4 update_permission X')), 8+1)
       
   332         # finally
       
   333         self.assertEquals(len(cursor.execute('RQLExpression X')), nbrqlexpr_start + 1 + 2) 
       
   334                           
       
   335         self.mh.rollback()
       
   336 
       
   337     def _erqlexpr_rset(self, action, ertype):
       
   338         rql = 'RQLExpression X WHERE ET is EEType, ET %s_permission X, ET name %%(name)s' % action
       
   339         return self.mh.rqlcursor.execute(rql, {'name': ertype})
       
   340     def _erqlexpr_entity(self, action, ertype):
       
   341         rset = self._erqlexpr_rset(action, ertype)
       
   342         self.assertEquals(len(rset), 1)
       
   343         return rset.get_entity(0, 0)
       
   344     def _rrqlexpr_rset(self, action, ertype):
       
   345         rql = 'RQLExpression X WHERE ET is ERType, ET %s_permission X, ET name %%(name)s' % action
       
   346         return self.mh.rqlcursor.execute(rql, {'name': ertype})
       
   347     def _rrqlexpr_entity(self, action, ertype):
       
   348         rset = self._rrqlexpr_rset(action, ertype)
       
   349         self.assertEquals(len(rset), 1)
       
   350         return rset.get_entity(0, 0)
       
   351     
       
   352     def test_set_size_constraint(self):
       
   353         # existing previous value
       
   354         try:
       
   355             self.mh.cmd_set_size_constraint('EEType', 'name', 128)
       
   356         finally:
       
   357             self.mh.cmd_set_size_constraint('EEType', 'name', 64)
       
   358         # non existing previous value
       
   359         try:
       
   360             self.mh.cmd_set_size_constraint('EEType', 'description', 256)
       
   361         finally:
       
   362             self.mh.cmd_set_size_constraint('EEType', 'description', None)
       
   363 
       
   364     def test_add_remove_cube(self):
       
   365         cubes = set(self.config.cubes())
       
   366         schema = self.repo.schema
       
   367         try:
       
   368             self.mh.cmd_remove_cube('eemail')
       
   369             # efile was there because it's an eemail dependancy, should have been removed
       
   370             cubes.remove('eemail')
       
   371             cubes.remove('efile')
       
   372             self.assertEquals(set(self.config.cubes()), cubes)
       
   373             for ertype in ('Email', 'EmailThread', 'EmailPart', 'File', 'Image', 
       
   374                            'sender', 'in_thread', 'reply_to', 'data_format'):
       
   375                 self.failIf(ertype in schema, ertype)
       
   376             self.assertEquals(sorted(schema['see_also']._rproperties.keys()),
       
   377                               [('Folder', 'Folder')])
       
   378             self.assertEquals(schema['see_also'].subjects(), ('Folder',))
       
   379             self.assertEquals(schema['see_also'].objects(), ('Folder',))
       
   380             self.assertEquals(self.execute('Any X WHERE X pkey "system.version.eemail"').rowcount, 0)
       
   381             self.assertEquals(self.execute('Any X WHERE X pkey "system.version.efile"').rowcount, 0)
       
   382             self.failIf('eemail' in self.config.cubes())
       
   383             self.failIf('efile' in self.config.cubes())
       
   384         finally:
       
   385             self.mh.cmd_add_cube('eemail')
       
   386             cubes.add('eemail')
       
   387             cubes.add('efile')
       
   388             self.assertEquals(set(self.config.cubes()), cubes)
       
   389             for ertype in ('Email', 'EmailThread', 'EmailPart', 'File', 'Image', 
       
   390                            'sender', 'in_thread', 'reply_to', 'data_format'):
       
   391                 self.failUnless(ertype in schema, ertype)
       
   392             self.assertEquals(sorted(schema['see_also']._rproperties.keys()),
       
   393                               [('EmailThread', 'EmailThread'), ('Folder', 'Folder')])
       
   394             self.assertEquals(sorted(schema['see_also'].subjects()), ['EmailThread', 'Folder'])
       
   395             self.assertEquals(sorted(schema['see_also'].objects()), ['EmailThread', 'Folder'])
       
   396             from eemail.__pkginfo__ import version as eemail_version
       
   397             from efile.__pkginfo__ import version as efile_version
       
   398             self.assertEquals(self.execute('Any V WHERE X value V, X pkey "system.version.eemail"')[0][0],
       
   399                               eemail_version)
       
   400             self.assertEquals(self.execute('Any V WHERE X value V, X pkey "system.version.efile"')[0][0],
       
   401                               efile_version)
       
   402             self.failUnless('eemail' in self.config.cubes())
       
   403             self.failUnless('efile' in self.config.cubes())
       
   404             # trick: overwrite self.maxeid to avoid deletion of just reintroduced
       
   405             #        types (and their associated tables!)
       
   406             self.maxeid = self.execute('Any MAX(X)')[0][0]
       
   407             # why this commit is necessary is unclear to me (though without it
       
   408             # next test may fail complaining of missing tables
       
   409             self.commit() 
       
   410         
       
   411 if __name__ == '__main__':
       
   412     unittest_main()