|
1 """unit tests for module cubicweb.server.migractions |
|
2 """ |
|
3 |
|
4 from mx.DateTime import DateTime, today |
|
5 |
|
6 from logilab.common.testlib import TestCase, unittest_main |
|
7 from cubicweb.devtools.apptest import RepositoryBasedTC, get_versions |
|
8 |
|
9 from cubicweb.server.repository import Repository |
|
10 from cubicweb.server.migractions import * |
|
11 |
|
12 orig_get_versions = Repository.get_versions |
|
13 |
|
14 def setup_module(*args): |
|
15 Repository.get_versions = get_versions |
|
16 |
|
17 def teardown_module(*args): |
|
18 Repository.get_versions = orig_get_versions |
|
19 |
|
20 |
|
21 class MigrationCommandsTC(RepositoryBasedTC): |
|
22 copy_schema = True |
|
23 |
|
24 def setUp(self): |
|
25 if not hasattr(self, '_repo'): |
|
26 # first initialization |
|
27 repo = self.repo # set by the RepositoryBasedTC metaclass |
|
28 # force to read schema from the database |
|
29 repo.config._cubes = None |
|
30 repo.fill_schema() |
|
31 # hack to read the schema from data/migrschema |
|
32 from cubicweb.schema import CubicWebSchemaLoader |
|
33 CubicWebSchemaLoader.main_schema_directory = 'migrschema' |
|
34 global migrschema |
|
35 migrschema = self.repo.config.load_schema() |
|
36 del CubicWebSchemaLoader.main_schema_directory |
|
37 assert 'Folder' in migrschema |
|
38 self.repo.hm.deactivate_verification_hooks() |
|
39 RepositoryBasedTC.setUp(self) |
|
40 self.mh = ServerMigrationHelper(self.repo.config, migrschema, |
|
41 repo=self.repo, cnx=self.cnx, |
|
42 interactive=False) |
|
43 |
|
44 def test_add_attribute_int(self): |
|
45 self.failIf('whatever' in self.schema) |
|
46 paraordernum = self.mh.rqlexec('Any O WHERE X name "Note", RT name "para", RDEF from_entity X, RDEF relation_type RT, RDEF ordernum O')[0][0] |
|
47 self.mh.cmd_add_attribute('Note', 'whatever') |
|
48 self.failUnless('whatever' in self.schema) |
|
49 self.assertEquals(self.schema['whatever'].subjects(), ('Note',)) |
|
50 self.assertEquals(self.schema['whatever'].objects(), ('Int',)) |
|
51 paraordernum2 = self.mh.rqlexec('Any O WHERE X name "Note", RT name "para", RDEF from_entity X, RDEF relation_type RT, RDEF ordernum O')[0][0] |
|
52 self.assertEquals(paraordernum2, paraordernum+1) |
|
53 #self.assertEquals([r.type for r in self.schema['Note'].ordered_relations()], |
|
54 # ['modification_date', 'creation_date', 'owned_by', |
|
55 # 'eid', 'ecrit_par', 'inline1', 'date', 'type', |
|
56 # 'whatever', 'para', 'in_basket']) |
|
57 # NB: commit instead of rollback make following test fail with py2.5 |
|
58 # this sounds like a pysqlite/2.5 bug (the same eid is affected to |
|
59 # two different entities) |
|
60 self.mh.rollback() |
|
61 |
|
62 def test_add_attribute_varchar(self): |
|
63 self.failIf('shortpara' in self.schema) |
|
64 self.mh.cmd_add_attribute('Note', 'shortpara') |
|
65 self.failUnless('shortpara' in self.schema) |
|
66 self.assertEquals(self.schema['shortpara'].subjects(), ('Note', )) |
|
67 self.assertEquals(self.schema['shortpara'].objects(), ('String', )) |
|
68 # test created column is actually a varchar(64) |
|
69 notesql = self.mh.sqlexec("SELECT sql FROM sqlite_master WHERE type='table' and name='Note'")[0][0] |
|
70 fields = dict(x.strip().split()[:2] for x in notesql.split('(', 1)[1].rsplit(')', 1)[0].split(',')) |
|
71 self.assertEquals(fields['shortpara'], 'varchar(64)') |
|
72 self.mh.rollback() |
|
73 |
|
74 def test_add_datetime_with_default_value_attribute(self): |
|
75 self.failIf('mydate' in self.schema) |
|
76 self.mh.cmd_add_attribute('Note', 'mydate') |
|
77 self.failUnless('mydate' in self.schema) |
|
78 self.assertEquals(self.schema['mydate'].subjects(), ('Note', )) |
|
79 self.assertEquals(self.schema['mydate'].objects(), ('Date', )) |
|
80 testdate = DateTime(2005, 12, 13) |
|
81 eid1 = self.mh.rqlexec('INSERT Note N')[0][0] |
|
82 eid2 = self.mh.rqlexec('INSERT Note N: N mydate %(mydate)s', {'mydate' : testdate})[0][0] |
|
83 d1 = self.mh.rqlexec('Any D WHERE X eid %(x)s, X mydate D', {'x': eid1}, 'x')[0][0] |
|
84 d2 = self.mh.rqlexec('Any D WHERE X eid %(x)s, X mydate D', {'x': eid2}, 'x')[0][0] |
|
85 self.assertEquals(d1, today()) |
|
86 self.assertEquals(d2, testdate) |
|
87 self.mh.rollback() |
|
88 |
|
89 def test_rename_attribute(self): |
|
90 self.failIf('civility' in self.schema) |
|
91 eid1 = self.mh.rqlexec('INSERT Personne X: X nom "lui", X sexe "M"')[0][0] |
|
92 eid2 = self.mh.rqlexec('INSERT Personne X: X nom "l\'autre", X sexe NULL')[0][0] |
|
93 self.mh.cmd_rename_attribute('Personne', 'sexe', 'civility') |
|
94 self.failIf('sexe' in self.schema) |
|
95 self.failUnless('civility' in self.schema) |
|
96 # test data has been backported |
|
97 c1 = self.mh.rqlexec('Any C WHERE X eid %s, X civility C' % eid1)[0][0] |
|
98 self.failUnlessEqual(c1, 'M') |
|
99 c2 = self.mh.rqlexec('Any C WHERE X eid %s, X civility C' % eid2)[0][0] |
|
100 self.failUnlessEqual(c2, None) |
|
101 |
|
102 |
|
103 def test_workflow_actions(self): |
|
104 foo = self.mh.cmd_add_state(u'foo', ('Personne', 'Email'), initial=True) |
|
105 for etype in ('Personne', 'Email'): |
|
106 s1 = self.mh.rqlexec('Any N WHERE S state_of ET, ET name "%s", S name N' % |
|
107 etype)[0][0] |
|
108 self.assertEquals(s1, "foo") |
|
109 s1 = self.mh.rqlexec('Any N WHERE ET initial_state S, ET name "%s", S name N' % |
|
110 etype)[0][0] |
|
111 self.assertEquals(s1, "foo") |
|
112 bar = self.mh.cmd_add_state(u'bar', ('Personne', 'Email'), initial=True) |
|
113 baz = self.mh.cmd_add_transition(u'baz', ('Personne', 'Email'), |
|
114 (foo,), bar, ('managers',)) |
|
115 for etype in ('Personne', 'Email'): |
|
116 t1 = self.mh.rqlexec('Any N WHERE T transition_of ET, ET name "%s", T name N' % |
|
117 etype)[0][0] |
|
118 self.assertEquals(t1, "baz") |
|
119 gn = self.mh.rqlexec('Any GN WHERE T require_group G, G name GN, T eid %s' % baz)[0][0] |
|
120 self.assertEquals(gn, 'managers') |
|
121 |
|
122 def test_add_entity_type(self): |
|
123 self.failIf('Folder2' in self.schema) |
|
124 self.failIf('filed_under2' in self.schema) |
|
125 self.mh.cmd_add_entity_type('Folder2') |
|
126 self.failUnless('Folder2' in self.schema) |
|
127 self.failUnless(self.execute('EEType X WHERE X name "Folder2"')) |
|
128 self.failUnless('filed_under2' in self.schema) |
|
129 self.failUnless(self.execute('ERType X WHERE X name "filed_under2"')) |
|
130 self.assertEquals(sorted(str(rs) for rs in self.schema['Folder2'].subject_relations()), |
|
131 ['created_by', 'creation_date', 'description', 'description_format', 'eid', |
|
132 'filed_under2', 'has_text', 'identity', 'is', 'is_instance_of', |
|
133 'modification_date', 'name', 'owned_by']) |
|
134 self.assertEquals([str(rs) for rs in self.schema['Folder2'].object_relations()], |
|
135 ['filed_under2', 'identity']) |
|
136 self.assertEquals(sorted(str(e) for e in self.schema['filed_under2'].subjects()), |
|
137 ['Affaire', 'Card', 'Division', 'Email', 'EmailThread', 'File', 'Folder2', |
|
138 'Image', 'Note', 'Personne', 'Societe', 'SubDivision']) |
|
139 self.assertEquals(self.schema['filed_under2'].objects(), ('Folder2',)) |
|
140 eschema = self.schema.eschema('Folder2') |
|
141 for cstr in eschema.constraints('name'): |
|
142 self.failUnless(hasattr(cstr, 'eid')) |
|
143 |
|
144 def test_drop_entity_type(self): |
|
145 self.mh.cmd_add_entity_type('Folder2') |
|
146 todoeid = self.mh.cmd_add_state(u'todo', 'Folder2', initial=True) |
|
147 doneeid = self.mh.cmd_add_state(u'done', 'Folder2') |
|
148 self.mh.cmd_add_transition(u'redoit', 'Folder2', (doneeid,), todoeid) |
|
149 self.mh.cmd_add_transition(u'markasdone', 'Folder2', (todoeid,), doneeid) |
|
150 self.commit() |
|
151 eschema = self.schema.eschema('Folder2') |
|
152 self.mh.cmd_drop_entity_type('Folder2') |
|
153 self.failIf('Folder2' in self.schema) |
|
154 self.failIf(self.execute('EEType X WHERE X name "Folder2"')) |
|
155 # test automatic workflow deletion |
|
156 self.failIf(self.execute('State X WHERE NOT X state_of ET')) |
|
157 self.failIf(self.execute('Transition X WHERE NOT X transition_of ET')) |
|
158 |
|
159 def test_add_relation_type(self): |
|
160 self.mh.cmd_add_entity_type('Folder2', auto=False) |
|
161 self.mh.cmd_add_relation_type('filed_under2') |
|
162 self.failUnless('filed_under2' in self.schema) |
|
163 self.assertEquals(sorted(str(e) for e in self.schema['filed_under2'].subjects()), |
|
164 ['Affaire', 'Card', 'Division', 'Email', 'EmailThread', 'File', 'Folder2', |
|
165 'Image', 'Note', 'Personne', 'Societe', 'SubDivision']) |
|
166 self.assertEquals(self.schema['filed_under2'].objects(), ('Folder2',)) |
|
167 |
|
168 |
|
169 def test_drop_relation_type(self): |
|
170 self.mh.cmd_add_entity_type('Folder2', auto=False) |
|
171 self.mh.cmd_add_relation_type('filed_under2') |
|
172 self.failUnless('filed_under2' in self.schema) |
|
173 self.mh.cmd_drop_relation_type('filed_under2') |
|
174 self.failIf('filed_under2' in self.schema) |
|
175 |
|
176 def test_add_relation_definition(self): |
|
177 self.mh.cmd_add_relation_definition('Societe', 'in_state', 'State') |
|
178 self.assertEquals(sorted(self.schema['in_state'].subjects()), |
|
179 ['Affaire', 'Division', 'EUser', 'Note', 'Societe', 'SubDivision']) |
|
180 self.assertEquals(self.schema['in_state'].objects(), ('State',)) |
|
181 |
|
182 def test_add_relation_definition_nortype(self): |
|
183 self.mh.cmd_add_relation_definition('Personne', 'concerne2', 'Affaire') |
|
184 self.assertEquals(self.schema['concerne2'].subjects(), |
|
185 ('Personne',)) |
|
186 self.assertEquals(self.schema['concerne2'].objects(), ('Affaire',)) |
|
187 |
|
188 def test_drop_relation_definition1(self): |
|
189 self.failUnless('concerne' in self.schema) |
|
190 self.assertEquals(sorted(str(e) for e in self.schema['concerne'].subjects()), ['Affaire', 'Personne']) |
|
191 self.assertEquals(sorted(str(e) for e in self.schema['concerne'].objects()), ['Affaire', 'Division', 'Note', 'Societe', 'SubDivision']) |
|
192 self.mh.cmd_drop_relation_definition('Personne', 'concerne', 'Affaire') |
|
193 self.assertEquals(sorted(str(e) for e in self.schema['concerne'].subjects()), ['Affaire']) |
|
194 self.assertEquals(sorted(str(e) for e in self.schema['concerne'].objects()), ['Division', 'Note', 'Societe', 'SubDivision']) |
|
195 |
|
196 def test_drop_relation_definition_with_specialization(self): |
|
197 self.failUnless('concerne' in self.schema) |
|
198 self.assertEquals(sorted(str(e) for e in self.schema['concerne'].subjects()), ['Affaire', 'Personne']) |
|
199 self.assertEquals(sorted(str(e) for e in self.schema['concerne'].objects()), ['Affaire', 'Division', 'Note', 'Societe', 'SubDivision']) |
|
200 self.mh.cmd_drop_relation_definition('Affaire', 'concerne', 'Societe') |
|
201 self.mh.cmd_drop_relation_definition('None', 'concerne', 'Societe') |
|
202 self.assertEquals(sorted(str(e) for e in self.schema['concerne'].subjects()), ['Affaire', 'Personne']) |
|
203 self.assertEquals(sorted(str(e) for e in self.schema['concerne'].objects()), ['Affaire', 'Note']) |
|
204 |
|
205 def test_drop_relation_definition2(self): |
|
206 self.failUnless('evaluee' in self.schema) |
|
207 self.mh.cmd_drop_relation_definition('Personne', 'evaluee', 'Note') |
|
208 self.failUnless('evaluee' in self.schema) |
|
209 self.assertEquals(sorted(self.schema['evaluee'].subjects()), |
|
210 ['Division', 'EUser', 'Societe', 'SubDivision']) |
|
211 self.assertEquals(sorted(self.schema['evaluee'].objects()), |
|
212 ['Note']) |
|
213 |
|
214 def test_rename_relation(self): |
|
215 self.skip('implement me') |
|
216 |
|
217 def test_change_relation_props_non_final(self): |
|
218 rschema = self.schema['concerne'] |
|
219 card = rschema.rproperty('Affaire', 'Societe', 'cardinality') |
|
220 self.assertEquals(card, '**') |
|
221 try: |
|
222 self.mh.cmd_change_relation_props('Affaire', 'concerne', 'Societe', |
|
223 cardinality='?*') |
|
224 card = rschema.rproperty('Affaire', 'Societe', 'cardinality') |
|
225 self.assertEquals(card, '?*') |
|
226 finally: |
|
227 self.mh.cmd_change_relation_props('Affaire', 'concerne', 'Societe', |
|
228 cardinality='**') |
|
229 |
|
230 def test_change_relation_props_final(self): |
|
231 rschema = self.schema['adel'] |
|
232 card = rschema.rproperty('Personne', 'String', 'fulltextindexed') |
|
233 self.assertEquals(card, False) |
|
234 try: |
|
235 self.mh.cmd_change_relation_props('Personne', 'adel', 'String', |
|
236 fulltextindexed=True) |
|
237 card = rschema.rproperty('Personne', 'String', 'fulltextindexed') |
|
238 self.assertEquals(card, True) |
|
239 finally: |
|
240 self.mh.cmd_change_relation_props('Personne', 'adel', 'String', |
|
241 fulltextindexed=False) |
|
242 |
|
243 def test_synchronize_schema(self): |
|
244 cursor = self.mh.rqlcursor |
|
245 nbrqlexpr_start = len(cursor.execute('RQLExpression X')) |
|
246 migrschema['titre']._rproperties[('Personne', 'String')]['order'] = 7 |
|
247 migrschema['adel']._rproperties[('Personne', 'String')]['order'] = 6 |
|
248 migrschema['ass']._rproperties[('Personne', 'String')]['order'] = 5 |
|
249 # expected = ['eid', 'has_text', 'creation_date', 'modification_date', |
|
250 # 'nom', 'prenom', 'civility', 'promo', 'ass', 'adel', 'titre', |
|
251 # 'web', 'tel', 'fax', 'datenaiss', 'test'] |
|
252 # self.assertEquals([rs.type for rs in migrschema['Personne'].ordered_relations() if rs.is_final()], |
|
253 # expected) |
|
254 migrschema['Personne'].description = 'blabla bla' |
|
255 migrschema['titre'].description = 'usually a title' |
|
256 migrschema['titre']._rproperties[('Personne', 'String')]['description'] = 'title for this person' |
|
257 # rinorderbefore = cursor.execute('Any O,N WHERE X is EFRDef, X relation_type RT, RT name N,' |
|
258 # 'X from_entity FE, FE name "Personne",' |
|
259 # 'X ordernum O ORDERBY O') |
|
260 # expected = [u'creation_date', u'modification_date', u'nom', u'prenom', |
|
261 # u'sexe', u'promo', u'titre', u'adel', u'ass', u'web', u'tel', |
|
262 # u'fax', u'datenaiss', u'test', u'description'] |
|
263 # self.assertListEquals(rinorderbefore, map(list, zip([0, 0]+range(1, len(expected)), expected))) |
|
264 |
|
265 self.mh.cmd_synchronize_schema(commit=False) |
|
266 |
|
267 self.assertEquals(cursor.execute('Any D WHERE X name "Personne", X description D')[0][0], |
|
268 'blabla bla') |
|
269 self.assertEquals(cursor.execute('Any D WHERE X name "titre", X description D')[0][0], |
|
270 'usually a title') |
|
271 self.assertEquals(cursor.execute('Any D WHERE X relation_type RT, RT name "titre",' |
|
272 'X from_entity FE, FE name "Personne",' |
|
273 'X description D')[0][0], |
|
274 'title for this person') |
|
275 # skip "sexe" and "description" since they aren't in the migration |
|
276 # schema and so behaviour is undefined |
|
277 # "civility" is also skipped since it may have been added by |
|
278 # test_rename_attribut :o/ |
|
279 rinorder = [n for n, in cursor.execute('Any N ORDERBY O WHERE X is EFRDef, X relation_type RT, RT name N,' |
|
280 'X from_entity FE, FE name "Personne",' |
|
281 'X ordernum O') if n not in ('sexe', 'description', 'civility')] |
|
282 expected = [u'nom', u'prenom', u'promo', u'ass', u'adel', u'titre', |
|
283 u'web', u'tel', u'fax', u'datenaiss', u'test', u'firstname', |
|
284 u'creation_date', u'modification_date'] |
|
285 self.assertEquals(rinorder, expected) |
|
286 |
|
287 # test permissions synchronization #################################### |
|
288 # new rql expr to add note entity |
|
289 eexpr = self._erqlexpr_entity('add', 'Note') |
|
290 self.assertEquals(eexpr.expression, |
|
291 'X ecrit_part PE, U in_group G, ' |
|
292 'PE require_permission P, P name "add_note", P require_group G') |
|
293 self.assertEquals([et.name for et in eexpr.reverse_add_permission], ['Note']) |
|
294 self.assertEquals(eexpr.reverse_read_permission, []) |
|
295 self.assertEquals(eexpr.reverse_delete_permission, []) |
|
296 self.assertEquals(eexpr.reverse_update_permission, []) |
|
297 # no more rqlexpr to delete and add para attribute |
|
298 self.failIf(self._rrqlexpr_rset('add', 'para')) |
|
299 self.failIf(self._rrqlexpr_rset('delete', 'para')) |
|
300 # new rql expr to add ecrit_par relation |
|
301 rexpr = self._rrqlexpr_entity('add', 'ecrit_par') |
|
302 self.assertEquals(rexpr.expression, |
|
303 'O require_permission P, P name "add_note", ' |
|
304 'U in_group G, P require_group G') |
|
305 self.assertEquals([rt.name for rt in rexpr.reverse_add_permission], ['ecrit_par']) |
|
306 self.assertEquals(rexpr.reverse_read_permission, []) |
|
307 self.assertEquals(rexpr.reverse_delete_permission, []) |
|
308 # no more rqlexpr to delete and add travaille relation |
|
309 self.failIf(self._rrqlexpr_rset('add', 'travaille')) |
|
310 self.failIf(self._rrqlexpr_rset('delete', 'travaille')) |
|
311 # no more rqlexpr to delete and update Societe entity |
|
312 self.failIf(self._erqlexpr_rset('update', 'Societe')) |
|
313 self.failIf(self._erqlexpr_rset('delete', 'Societe')) |
|
314 # no more rqlexpr to read Affaire entity |
|
315 self.failIf(self._erqlexpr_rset('read', 'Affaire')) |
|
316 # rqlexpr to update Affaire entity has been updated |
|
317 eexpr = self._erqlexpr_entity('update', 'Affaire') |
|
318 self.assertEquals(eexpr.expression, 'X concerne S, S owned_by U') |
|
319 # no change for rqlexpr to add and delete Affaire entity |
|
320 self.assertEquals(len(self._erqlexpr_rset('delete', 'Affaire')), 1) |
|
321 self.assertEquals(len(self._erqlexpr_rset('add', 'Affaire')), 1) |
|
322 # no change for rqlexpr to add and delete concerne relation |
|
323 self.assertEquals(len(self._rrqlexpr_rset('delete', 'concerne')), 1) |
|
324 self.assertEquals(len(self._rrqlexpr_rset('add', 'concerne')), 1) |
|
325 # * migrschema involve: |
|
326 # * 8 deletion (2 in Affaire read + Societe + travaille + para rqlexprs) |
|
327 # * 1 update (Affaire update) |
|
328 # * 2 new (Note add, ecrit_par add) |
|
329 # remaining orphan rql expr which should be deleted at commit (composite relation) |
|
330 self.assertEquals(len(cursor.execute('RQLExpression X WHERE NOT ET1 read_permission X, NOT ET2 add_permission X, ' |
|
331 'NOT ET3 delete_permission X, NOT ET4 update_permission X')), 8+1) |
|
332 # finally |
|
333 self.assertEquals(len(cursor.execute('RQLExpression X')), nbrqlexpr_start + 1 + 2) |
|
334 |
|
335 self.mh.rollback() |
|
336 |
|
337 def _erqlexpr_rset(self, action, ertype): |
|
338 rql = 'RQLExpression X WHERE ET is EEType, ET %s_permission X, ET name %%(name)s' % action |
|
339 return self.mh.rqlcursor.execute(rql, {'name': ertype}) |
|
340 def _erqlexpr_entity(self, action, ertype): |
|
341 rset = self._erqlexpr_rset(action, ertype) |
|
342 self.assertEquals(len(rset), 1) |
|
343 return rset.get_entity(0, 0) |
|
344 def _rrqlexpr_rset(self, action, ertype): |
|
345 rql = 'RQLExpression X WHERE ET is ERType, ET %s_permission X, ET name %%(name)s' % action |
|
346 return self.mh.rqlcursor.execute(rql, {'name': ertype}) |
|
347 def _rrqlexpr_entity(self, action, ertype): |
|
348 rset = self._rrqlexpr_rset(action, ertype) |
|
349 self.assertEquals(len(rset), 1) |
|
350 return rset.get_entity(0, 0) |
|
351 |
|
352 def test_set_size_constraint(self): |
|
353 # existing previous value |
|
354 try: |
|
355 self.mh.cmd_set_size_constraint('EEType', 'name', 128) |
|
356 finally: |
|
357 self.mh.cmd_set_size_constraint('EEType', 'name', 64) |
|
358 # non existing previous value |
|
359 try: |
|
360 self.mh.cmd_set_size_constraint('EEType', 'description', 256) |
|
361 finally: |
|
362 self.mh.cmd_set_size_constraint('EEType', 'description', None) |
|
363 |
|
364 def test_add_remove_cube(self): |
|
365 cubes = set(self.config.cubes()) |
|
366 schema = self.repo.schema |
|
367 try: |
|
368 self.mh.cmd_remove_cube('eemail') |
|
369 # efile was there because it's an eemail dependancy, should have been removed |
|
370 cubes.remove('eemail') |
|
371 cubes.remove('efile') |
|
372 self.assertEquals(set(self.config.cubes()), cubes) |
|
373 for ertype in ('Email', 'EmailThread', 'EmailPart', 'File', 'Image', |
|
374 'sender', 'in_thread', 'reply_to', 'data_format'): |
|
375 self.failIf(ertype in schema, ertype) |
|
376 self.assertEquals(sorted(schema['see_also']._rproperties.keys()), |
|
377 [('Folder', 'Folder')]) |
|
378 self.assertEquals(schema['see_also'].subjects(), ('Folder',)) |
|
379 self.assertEquals(schema['see_also'].objects(), ('Folder',)) |
|
380 self.assertEquals(self.execute('Any X WHERE X pkey "system.version.eemail"').rowcount, 0) |
|
381 self.assertEquals(self.execute('Any X WHERE X pkey "system.version.efile"').rowcount, 0) |
|
382 self.failIf('eemail' in self.config.cubes()) |
|
383 self.failIf('efile' in self.config.cubes()) |
|
384 finally: |
|
385 self.mh.cmd_add_cube('eemail') |
|
386 cubes.add('eemail') |
|
387 cubes.add('efile') |
|
388 self.assertEquals(set(self.config.cubes()), cubes) |
|
389 for ertype in ('Email', 'EmailThread', 'EmailPart', 'File', 'Image', |
|
390 'sender', 'in_thread', 'reply_to', 'data_format'): |
|
391 self.failUnless(ertype in schema, ertype) |
|
392 self.assertEquals(sorted(schema['see_also']._rproperties.keys()), |
|
393 [('EmailThread', 'EmailThread'), ('Folder', 'Folder')]) |
|
394 self.assertEquals(sorted(schema['see_also'].subjects()), ['EmailThread', 'Folder']) |
|
395 self.assertEquals(sorted(schema['see_also'].objects()), ['EmailThread', 'Folder']) |
|
396 from eemail.__pkginfo__ import version as eemail_version |
|
397 from efile.__pkginfo__ import version as efile_version |
|
398 self.assertEquals(self.execute('Any V WHERE X value V, X pkey "system.version.eemail"')[0][0], |
|
399 eemail_version) |
|
400 self.assertEquals(self.execute('Any V WHERE X value V, X pkey "system.version.efile"')[0][0], |
|
401 efile_version) |
|
402 self.failUnless('eemail' in self.config.cubes()) |
|
403 self.failUnless('efile' in self.config.cubes()) |
|
404 # trick: overwrite self.maxeid to avoid deletion of just reintroduced |
|
405 # types (and their associated tables!) |
|
406 self.maxeid = self.execute('Any MAX(X)')[0][0] |
|
407 # why this commit is necessary is unclear to me (though without it |
|
408 # next test may fail complaining of missing tables |
|
409 self.commit() |
|
410 |
|
411 if __name__ == '__main__': |
|
412 unittest_main() |