1 # copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved. |
|
2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr |
|
3 # |
|
4 # This file is part of CubicWeb. |
|
5 # |
|
6 # CubicWeb is free software: you can redistribute it and/or modify it under the |
|
7 # terms of the GNU Lesser General Public License as published by the Free |
|
8 # Software Foundation, either version 2.1 of the License, or (at your option) |
|
9 # any later version. |
|
10 # |
|
11 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT |
|
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
|
13 # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more |
|
14 # details. |
|
15 # |
|
16 # You should have received a copy of the GNU Lesser General Public License along |
|
17 # with CubicWeb. If not, see <http://www.gnu.org/licenses/>. |
|
18 """unit tests for module cubicweb.server.migractions""" |
|
19 |
|
20 from datetime import date |
|
21 import os, os.path as osp |
|
22 from contextlib import contextmanager |
|
23 |
|
24 from logilab.common.testlib import unittest_main, Tags, tag |
|
25 from logilab.common import tempattr |
|
26 |
|
27 from yams.constraints import UniqueConstraint |
|
28 |
|
29 from cubicweb import ConfigurationError, ValidationError, ExecutionError |
|
30 from cubicweb.devtools import startpgcluster, stoppgcluster |
|
31 from cubicweb.devtools.testlib import CubicWebTC |
|
32 from cubicweb.server.sqlutils import SQL_PREFIX |
|
33 from cubicweb.server.migractions import ServerMigrationHelper |
|
34 |
|
35 import cubicweb.devtools |
|
36 |
|
37 |
|
38 HERE = osp.dirname(osp.abspath(__file__)) |
|
39 |
|
40 |
|
def setUpModule():
    """Module-level fixture: call ``startpgcluster`` for this test file."""
    this_file = __file__
    startpgcluster(this_file)
|
43 |
|
44 |
|
# Target schema used by the migration helper; assigned by
# MigrationTC._init_repo once the repository has been initialised.
migrschema = None


def tearDownModule(*args):
    """Module-level cleanup.

    Removes the module-level ``migrschema`` name and any ``origschema``
    attribute left on the test classes, then calls ``stoppgcluster`` for
    this test file.
    """
    global migrschema
    del migrschema
    if hasattr(MigrationCommandsTC, 'origschema'):
        delattr(MigrationCommandsTC, 'origschema')
    if hasattr(MigrationCommandsComputedTC, 'origschema'):
        delattr(MigrationCommandsComputedTC, 'origschema')
    stoppgcluster(__file__)
|
54 |
|
55 |
|
class MigrationConfig(cubicweb.devtools.TestServerConfiguration):
    """Test server configuration used by the migration tests.

    Uses ``DEFAULT_PSQL_SOURCES`` as sources and appends the
    ``data-migractions/cubes`` directory (next to this file) to the cubes
    search path.
    """
    default_sources = cubicweb.devtools.DEFAULT_PSQL_SOURCES
    CUBES_PATH = (
        cubicweb.devtools.TestServerConfiguration.CUBES_PATH
        + [osp.join(HERE, 'data-migractions', 'cubes')]
    )
|
60 |
|
61 |
|
class MigrationTC(CubicWebTC):
    """Base test case for migration action tests on the 'data-migractions' app.

    Provides the ``mh`` context manager, which yields a repository connection
    together with a ``ServerMigrationHelper`` built on the module-level
    ``migrschema``, plus two helpers that inspect SQL tables through
    ``information_schema``.
    """

    appid = 'data-migractions'

    configcls = MigrationConfig

    tags = CubicWebTC.tags | Tags(('server', 'migration', 'migractions'))

    def _init_repo(self):
        """Initialise the repository and load the target schema.

        The schema found under the ``migratedapp`` sub-directory of the
        application is loaded into the module-level ``migrschema``.
        """
        global migrschema
        super(MigrationTC, self)._init_repo()
        # we have to read schema from the database to get eid for schema entities
        self.repo.set_schema(self.repo.deserialize_schema(), resetvreg=False)
        # hack: temporarily point the configuration at the 'migratedapp'
        # directory so that load_schema() reads the migrated schema
        config = self.config
        config.appid = osp.join(self.appid, 'migratedapp')
        config._apphome = osp.join(HERE, config.appid)
        try:
            migrschema = config.load_schema()
        finally:
            # restore the configuration even if load_schema() fails, so the
            # config is never left pointing at the wrong application
            config.appid = self.appid
            config._apphome = osp.join(HERE, self.appid)

    def setUp(self):
        """Adjust ``sys.path`` through the config class, then run the base setUp."""
        self.configcls.cls_adjust_sys_path()
        super(MigrationTC, self).setUp()

    def tearDown(self):
        """Run the base tearDown, then clear the caches of the 'etypes' registry."""
        super(MigrationTC, self).tearDown()
        self.repo.vreg['etypes'].clear_caches()

    @contextmanager
    def mh(self):
        """Yield ``(cnx, helper)`` for the duration of the ``with`` block.

        ``cnx`` is a repository connection obtained from ``admin_access``;
        ``helper`` is a non-interactive ``ServerMigrationHelper`` bound to
        that connection and to ``migrschema``.
        """
        with self.admin_access.repo_cnx() as cnx:
            helper = ServerMigrationHelper(self.repo.config, migrschema,
                                           repo=self.repo, cnx=cnx,
                                           interactive=False)
            yield cnx, helper

    def table_sql(self, mh, tablename):
        """Return the stored name of table `tablename`, or None if it is missing.

        The lookup is case-insensitive.
        """
        rows = mh.sqlexec("SELECT table_name FROM information_schema.tables WHERE LOWER(table_name)=%(table)s",
                          {'table': tablename.lower()})
        if rows:
            return rows[0][0]
        return None  # no such table

    def table_schema(self, mh, tablename):
        """Return ``{column_name: (data_type, character_maximum_length)}``.

        The table lookup is case-insensitive; an AssertionError is raised
        when no column is found for `tablename`.
        """
        rows = mh.sqlexec("SELECT column_name, data_type, character_maximum_length FROM information_schema.columns "
                          "WHERE LOWER(table_name) = %(table)s", {'table': tablename.lower()})
        assert rows, 'no table %s' % tablename
        return {row[0]: (row[1], row[2]) for row in rows}
|
110 |
|
111 |
|
112 class MigrationCommandsTC(MigrationTC): |
|
113 |
|
def _init_repo(self):
    """Run the inherited repository initialisation, then sanity-check that
    the loaded ``migrschema`` defines the 'Folder' entity type."""
    super(MigrationCommandsTC, self)._init_repo()
    # migrschema has just been (re)loaded by the parent implementation
    assert 'Folder' in migrschema
|
117 |
|
def test_add_attribute_bool(self):
    """Add the Boolean attribute ``yesno`` to Note and check its default.

    The default (False) must show on the schema, on a Note created before
    the attribute was added, and on a Note created afterwards.
    """
    with self.mh() as (cnx, mh):
        self.assertNotIn('yesno', self.schema)
        # a Note that exists before the attribute is added
        cnx.create_entity('Note')
        cnx.commit()
        mh.cmd_add_attribute('Note', 'yesno')
        self.assertIn('yesno', self.schema)
        yesno = self.schema['yesno']
        self.assertEqual(yesno.subjects(), ('Note',))
        self.assertEqual(yesno.objects(), ('Boolean',))
        self.assertEqual(self.schema['Note'].default('yesno'), False)
        # default value set on existing entities
        existing = cnx.execute('Note X').get_entity(0, 0)
        self.assertEqual(existing.yesno, False)
        # default value set for next entities
        created = cnx.create_entity('Note')
        self.assertEqual(created.yesno, False)
|
133 |
|
def test_add_attribute_int(self):
    """Add the Int attribute ``whatever`` to Note.

    Checks the schema, the default value (0) on existing and new entities,
    and that the ``ordernum`` of Note's other relation definitions is
    shifted to make room for the new attribute.
    """
    order_rql = ('Any RTN, O WHERE X name "Note", RDEF from_entity X, '
                 'RDEF relation_type RT, RDEF ordernum O, RT name RTN')
    with self.mh() as (cnx, mh):
        self.assertNotIn('whatever', self.schema)
        cnx.create_entity('Note')
        cnx.commit()
        order_before = dict(mh.rqlexec(order_rql))
        mh.cmd_add_attribute('Note', 'whatever')
        self.assertIn('whatever', self.schema)
        whatever = self.schema['whatever']
        self.assertEqual(whatever.subjects(), ('Note',))
        self.assertEqual(whatever.objects(), ('Int',))
        self.assertEqual(self.schema['Note'].default('whatever'), 0)
        # default value set on existing entities
        existing = cnx.execute('Note X').get_entity(0, 0)
        self.assertIsInstance(existing.whatever, int)
        self.assertEqual(existing.whatever, 0)
        # default value set for next entities
        created = cnx.create_entity('Note')
        self.assertEqual(created.whatever, 0)
        # attribute order: everything at or after the new attribute's
        # position is expected to have moved one slot further
        order_after = dict(mh.rqlexec(order_rql))
        new_order = migrschema['whatever'].rdef('Note', 'Int').order
        expected = {
            rtype: (order + 1 if order >= new_order else order)
            for rtype, order in order_before.items()
        }
        expected['whatever'] = new_order
        self.assertDictEqual(expected, order_after)
        # NB (historical note): using commit instead of rollback made the
        # following test fail with py2.5; this looked like a pysqlite/2.5
        # bug (the same eid being given to two different entities)
|
168 |
|
def test_add_attribute_varchar(self):
    """Add the String attribute ``shortpara`` to Note.

    Checks the schema, that the SQL column is a 64-character varchar, and
    that the default value 'hop' is set on existing and new entities.
    """
    with self.mh() as (cnx, mh):
        self.assertNotIn('whatever', self.schema)
        cnx.create_entity('Note')
        cnx.commit()
        self.assertNotIn('shortpara', self.schema)
        mh.cmd_add_attribute('Note', 'shortpara')
        self.assertIn('shortpara', self.schema)
        shortpara = self.schema['shortpara']
        self.assertEqual(shortpara.subjects(), ('Note', ))
        self.assertEqual(shortpara.objects(), ('String', ))
        # the created column must actually be a varchar(64)
        columns = self.table_schema(mh, '%sNote' % SQL_PREFIX)
        column_name = '%sshortpara' % SQL_PREFIX
        self.assertEqual(columns[column_name], ('character varying', 64))
        # default value set on existing entities
        existing = cnx.execute('Note X').get_entity(0, 0)
        self.assertEqual(existing.shortpara, 'hop')
        # default value set for next entities
        created = cnx.create_entity('Note')
        self.assertEqual(created.shortpara, 'hop')
|
186 |
|
def test_add_datetime_with_default_value_attribute(self):
    """Add three Date attributes to Note and check their default values.

    ``mydate`` is expected to default to today's date unless given
    explicitly; ``oldstyledefaultdate`` and ``newstyledefaultdate`` are both
    expected to default to 2013-01-01.
    """
    new_attrs = ('mydate', 'oldstyledefaultdate', 'newstyledefaultdate')
    with self.mh() as (cnx, mh):

        def date_of(eid, attr):
            # value of date attribute `attr` for the entity with this eid
            rql = 'Any D WHERE X eid %%(x)s, X %s D' % attr
            return mh.rqlexec(rql, {'x': eid})[0][0]

        for attr in new_attrs:
            self.assertNotIn(attr, self.schema)
        for attr in new_attrs:
            mh.cmd_add_attribute('Note', attr)
        for attr in new_attrs:
            self.assertIn(attr, self.schema)
        mydate = self.schema['mydate']
        self.assertEqual(mydate.subjects(), ('Note', ))
        self.assertEqual(mydate.objects(), ('Date', ))
        explicit_date = date(2005, 12, 13)
        eid_default = mh.rqlexec('INSERT Note N')[0][0]
        eid_explicit = mh.rqlexec('INSERT Note N: N mydate %(mydate)s',
                                  {'mydate': explicit_date})[0][0]
        default_mydate = date_of(eid_default, 'mydate')
        explicit_mydate = date_of(eid_explicit, 'mydate')
        oldstyle = date_of(eid_default, 'oldstyledefaultdate')
        newstyle = date_of(eid_default, 'newstyledefaultdate')
        self.assertEqual(default_mydate, date.today())
        self.assertEqual(explicit_mydate, explicit_date)
        myfavoritedate = date(2013, 1, 1)
        self.assertEqual(oldstyle, myfavoritedate)
        self.assertEqual(newstyle, myfavoritedate)
|
212 |
|
def test_drop_chosen_constraints_ctxmanager(self):
    """Within ``cmd_dropped_constraints(..., UniqueConstraint)`` only the
    unique constraint of Note.unique_id is lifted: a too-long value is
    still rejected, while duplicate values are accepted."""
    with self.mh() as (cnx, mh):
        with mh.cmd_dropped_constraints('Note', 'unique_id', UniqueConstraint):
            mh.cmd_add_attribute('Note', 'unique_id')
            # make sure the maxsize constraint is not dropped
            with self.assertRaises(ValidationError):
                mh.rqlexec('INSERT Note N: N unique_id "xyz"')
            mh.rollback()
            # make sure the unique constraint is dropped: inserting the
            # same value twice must not fail
            duplicate_insert = 'INSERT Note N: N unique_id "x"'
            mh.rqlexec(duplicate_insert)
            mh.rqlexec(duplicate_insert)
            mh.rqlexec('DELETE Note N')
|
226 |
|
def test_drop_required_ctxmanager(self):
    """``cmd_dropped_constraints(..., droprequired=True)`` lets a Note be
    inserted without ``unique_id`` inside the block; once the block is left,
    such an insertion raises ValidationError again."""
    with self.mh() as (cnx, mh):
        dropped = mh.cmd_dropped_constraints('Note', 'unique_id',
                                             cstrtype=None,
                                             droprequired=True)
        with dropped:
            mh.cmd_add_attribute('Note', 'unique_id')
            mh.rqlexec('INSERT Note N')
            mh.rqlexec('SET N unique_id "x"')
        # make sure the required=True was restored
        with self.assertRaises(ValidationError):
            mh.rqlexec('INSERT Note N')
        mh.rollback()
|
237 |
|
def test_rename_attribute(self):
    """Rename Personne.sexe to ``civility`` and check that the schema is
    updated and that existing values (including NULL) are carried over."""
    with self.mh() as (cnx, mh):
        self.assertNotIn('civility', self.schema)
        eid1 = mh.rqlexec('INSERT Personne X: X nom "lui", X sexe "M"')[0][0]
        eid2 = mh.rqlexec('INSERT Personne X: X nom "l\'autre", X sexe NULL')[0][0]
        mh.cmd_rename_attribute('Personne', 'sexe', 'civility')
        self.assertNotIn('sexe', self.schema)
        self.assertIn('civility', self.schema)
        # test data has been backported; the eid is passed as a query
        # argument rather than interpolated into the RQL string
        civility_rql = 'Any C WHERE X eid %(x)s, X civility C'
        c1 = mh.rqlexec(civility_rql, {'x': eid1})[0][0]
        self.assertEqual(c1, 'M')
        c2 = mh.rqlexec(civility_rql, {'x': eid2})[0][0]
        self.assertIsNone(c2)
|
251 |
|
def test_workflow_actions(self):
    """Add workflow 'foo' for Personne and Email and check that, for each
    type, it is both a workflow of the type and its default workflow."""
    etypes = ('Personne', 'Email')
    with self.mh() as (cnx, mh):
        mh.cmd_add_workflow(u'foo', etypes, ensure_workflowable=False)
        for etype in etypes:
            # the entity type name is passed as a query argument
            args = {'et': etype}
            wf_name = mh.rqlexec('Any N WHERE WF workflow_of ET, '
                                 'ET name %(et)s, WF name N', args)[0][0]
            self.assertEqual(wf_name, "foo")
            default_wf_name = mh.rqlexec('Any N WHERE ET default_workflow WF, '
                                         'ET name %(et)s, WF name N', args)[0][0]
            self.assertEqual(default_wf_name, "foo")
|
263 |
|
def test_add_entity_type(self):
    """Add entity type ``Folder2`` and check what comes with it.

    The ``filed_under2`` relation type must be added too, both must exist
    as schema entities in the database, Folder2 must have the expected
    subject/object relations, and the constraints of its ``name``
    attribute must carry an ``eid``.
    """
    expected_subject_relations = [
        'created_by', 'creation_date', 'cw_source', 'cwuri',
        'description', 'description_format',
        'eid',
        'filed_under2', 'has_text',
        'identity', 'in_basket', 'is', 'is_instance_of',
        'modification_date', 'name', 'owned_by',
    ]
    with self.mh() as (cnx, mh):
        self.assertNotIn('Folder2', self.schema)
        self.assertNotIn('filed_under2', self.schema)
        mh.cmd_add_entity_type('Folder2')
        self.assertIn('Folder2', self.schema)
        self.assertIn('Old', self.schema)
        self.assertTrue(cnx.execute('CWEType X WHERE X name "Folder2"'))
        self.assertIn('filed_under2', self.schema)
        self.assertTrue(cnx.execute('CWRType X WHERE X name "filed_under2"'))
        subject_relations = sorted(
            str(rs) for rs in self.schema['Folder2'].subject_relations())
        self.assertEqual(subject_relations, expected_subject_relations)
        object_relations = [
            str(rs) for rs in self.schema['Folder2'].object_relations()]
        self.assertCountEqual(object_relations, ['filed_under2', 'identity'])
        # Old will be missing as it has been renamed into 'New' in the migrated
        # schema while New hasn't been added here.
        actual_subjects = sorted(
            str(e) for e in self.schema['filed_under2'].subjects())
        expected_subjects = sorted(
            str(e) for e in self.schema.entities()
            if not e.final and e != 'Old')
        self.assertEqual(actual_subjects, expected_subjects)
        self.assertEqual(self.schema['filed_under2'].objects(), ('Folder2',))
        folder2 = self.schema.eschema('Folder2')
        for constraint in folder2.rdef('name').constraints:
            self.assertTrue(hasattr(constraint, 'eid'))
|
291 |
|
def test_add_cube_with_custom_final_type(self):
    """Add the 'fakecustomtype' cube and check its final type ``Numeric``:
    schema flags, scale/precision of Location.num and the SQL column type.
    The cube is dropped again whatever the outcome."""
    with self.mh() as (cnx, mh):
        try:
            mh.cmd_add_cube('fakecustomtype')
            self.assertIn('Numeric', self.schema)
            numeric = self.schema['Numeric']
            self.assertTrue(numeric.final)
            num_rdef = self.schema['num'].rdefs[('Location', 'Numeric')]
            self.assertEqual(num_rdef.scale, 10)
            self.assertEqual(num_rdef.precision, 18)
            columns = self.table_schema(mh, '%sLocation' % SQL_PREFIX)
            column_name = '%snum' % SQL_PREFIX
            self.assertEqual(columns[column_name], ('numeric', None))  # XXX
        finally:
            mh.cmd_drop_cube('fakecustomtype')
|
305 |
|
def test_add_drop_entity_type(self):
    """Add ``Folder2`` with a small workflow, drop the type again and check
    that the type and its workflow, states and transitions are all gone."""
    with self.mh() as (cnx, mh):
        mh.cmd_add_entity_type('Folder2')
        workflow = mh.cmd_add_workflow(u'folder2 wf', 'Folder2',
                                       ensure_workflowable=False)
        state_todo = workflow.add_state(u'todo', initial=True)
        state_done = workflow.add_state(u'done')
        workflow.add_transition(u'redoit', state_done, state_todo)
        workflow.add_transition(u'markasdone', state_todo, state_done)
        cnx.commit()
        # looked up before the drop, as in the original test
        eschema = self.schema.eschema('Folder2')
        mh.cmd_drop_entity_type('Folder2')
        self.assertNotIn('Folder2', self.schema)
        self.assertFalse(cnx.execute('CWEType X WHERE X name "Folder2"'))
        # test automatic workflow deletion: nothing may be left orphaned
        orphan_queries = (
            'Workflow X WHERE NOT X workflow_of ET',
            'State X WHERE NOT X state_of WF',
            'Transition X WHERE NOT X transition_of WF',
        )
        for rql in orphan_queries:
            self.assertFalse(cnx.execute(rql))
|
324 |
|
def test_rename_entity_type(self):
    """Rename entity type Old to New, then rename its ``name`` attribute.

    An 'Old' entity is created and looked up through
    ``type_and_source_from_eid`` before the rename; the test passes when
    none of the calls raises.
    """
    with self.mh() as (cnx, mh):
        old_entity = mh.create_entity('Old', name=u'old')
        self.repo.type_and_source_from_eid(old_entity.eid, old_entity._cw)
        mh.cmd_rename_entity_type('Old', 'New')
        mh.cmd_rename_attribute('New', 'name', 'new_name')
|
331 |
|
def test_add_drop_relation_type(self):
    """Add then drop relation type ``filed_under2``; dropping it a second
    time must not crash."""
    with self.mh() as (cnx, mh):
        mh.cmd_add_entity_type('Folder2', auto=False)
        mh.cmd_add_relation_type('filed_under2')
        self.assertIn('filed_under2', self.schema)
        # Old will be missing as it has been renamed into 'New' in the migrated
        # schema while New hasn't been added here.
        actual_subjects = sorted(
            str(e) for e in self.schema['filed_under2'].subjects())
        expected_subjects = sorted(
            str(e) for e in self.schema.entities()
            if not e.final and e != 'Old')
        self.assertEqual(actual_subjects, expected_subjects)
        self.assertEqual(self.schema['filed_under2'].objects(), ('Folder2',))
        mh.cmd_drop_relation_type('filed_under2')
        self.assertNotIn('filed_under2', self.schema)
        # this should not crash
        mh.cmd_drop_relation_type('filed_under2')
|
347 |
|
def test_add_relation_definition_nortype(self):
    """Add ``concerne2`` relation definitions while the relation type does
    not exist yet, then drop them one by one: the relation type stays in
    the schema until its last definition is dropped."""
    with self.mh() as (cnx, mh):
        mh.cmd_add_relation_definition('Personne', 'concerne2', 'Affaire')
        concerne2 = self.schema['concerne2']
        self.assertEqual(concerne2.subjects(), ('Personne',))
        self.assertEqual(self.schema['concerne2'].objects(), ('Affaire', ))
        cardinality = self.schema['concerne2'].rdef('Personne', 'Affaire').cardinality
        self.assertEqual(cardinality, '1*')
        mh.cmd_add_relation_definition('Personne', 'concerne2', 'Note')
        object_types = sorted(self.schema['concerne2'].objects())
        self.assertEqual(object_types, ['Affaire', 'Note'])
        # create some related data before dropping the definitions
        mh.create_entity('Personne', nom=u'tot')
        mh.create_entity('Affaire')
        mh.rqlexec('SET X concerne2 Y WHERE X is Personne, Y is Affaire')
        cnx.commit()
        mh.cmd_drop_relation_definition('Personne', 'concerne2', 'Affaire')
        self.assertIn('concerne2', self.schema)
        mh.cmd_drop_relation_definition('Personne', 'concerne2', 'Note')
        self.assertNotIn('concerne2', self.schema)
|
367 |
|
def test_drop_relation_definition_existant_rtype(self):
    """Drop then re-add the (Personne, concerne, Affaire) definition and
    check the subject/object types of ``concerne`` at each step."""
    with self.mh() as (cnx, mh):

        def subject_types():
            return sorted(str(e) for e in self.schema['concerne'].subjects())

        def object_types():
            return sorted(str(e) for e in self.schema['concerne'].objects())

        self.assertEqual(subject_types(), ['Affaire', 'Personne'])
        self.assertEqual(object_types(),
                         ['Affaire', 'Division', 'Note', 'Societe', 'SubDivision'])
        mh.cmd_drop_relation_definition('Personne', 'concerne', 'Affaire')
        self.assertEqual(subject_types(), ['Affaire'])
        self.assertEqual(object_types(),
                         ['Division', 'Note', 'Societe', 'SubDivision'])
        mh.cmd_add_relation_definition('Personne', 'concerne', 'Affaire')
        self.assertEqual(subject_types(), ['Affaire', 'Personne'])
        self.assertEqual(object_types(),
                         ['Affaire', 'Division', 'Note', 'Societe', 'SubDivision'])
        # trick: overwrite self.maxeid to avoid deletion of just reintroduced types
        max_eid = cnx.execute('Any MAX(X)')[0][0]
        self.maxeid = max_eid
|
386 |
|
def test_drop_relation_definition_with_specialization(self):
    """Drop then re-add the (Affaire, concerne, Societe) definition.

    After the drop, 'Division', 'Societe' and 'SubDivision' are all
    expected to disappear from the object types of ``concerne``; they come
    back once the definition is added again.
    """
    with self.mh() as (cnx, mh):

        def subject_types():
            return sorted(str(e) for e in self.schema['concerne'].subjects())

        def object_types():
            return sorted(str(e) for e in self.schema['concerne'].objects())

        self.assertEqual(subject_types(), ['Affaire', 'Personne'])
        self.assertEqual(object_types(),
                         ['Affaire', 'Division', 'Note', 'Societe', 'SubDivision'])
        mh.cmd_drop_relation_definition('Affaire', 'concerne', 'Societe')
        self.assertEqual(subject_types(), ['Affaire', 'Personne'])
        self.assertEqual(object_types(), ['Affaire', 'Note'])
        mh.cmd_add_relation_definition('Affaire', 'concerne', 'Societe')
        self.assertEqual(subject_types(), ['Affaire', 'Personne'])
        self.assertEqual(object_types(),
                         ['Affaire', 'Division', 'Note', 'Societe', 'SubDivision'])
        # trick: overwrite self.maxeid to avoid deletion of just reintroduced types
        max_eid = cnx.execute('Any MAX(X)')[0][0]
        self.maxeid = max_eid
|
405 |
|
def test_rename_relation(self):
    """Placeholder: the test is not written yet and is reported as skipped."""
    reason = 'implement me'
    self.skipTest(reason)
|
408 |
|
def test_change_relation_props_non_final(self):
    """Change the cardinality of (Affaire, concerne, Societe) from '**' to
    '?*' and check the schema; the original value is always put back."""
    rdef_key = ('Affaire', 'concerne', 'Societe')
    with self.mh() as (cnx, mh):
        concerne = self.schema['concerne']
        self.assertEqual(concerne.rdef('Affaire', 'Societe').cardinality, '**')
        try:
            mh.cmd_change_relation_props(*rdef_key, cardinality='?*')
            changed = concerne.rdef('Affaire', 'Societe').cardinality
            self.assertEqual(changed, '?*')
        finally:
            # restore the initial cardinality whatever happened above
            mh.cmd_change_relation_props(*rdef_key, cardinality='**')
|
422 |
|
def test_change_relation_props_final(self):
    """Switch ``fulltextindexed`` on for Personne.adel and check the schema;
    the property is always switched back off afterwards."""
    rdef_key = ('Personne', 'adel', 'String')
    with self.mh() as (cnx, mh):
        adel = self.schema['adel']
        indexed = adel.rdef('Personne', 'String').fulltextindexed
        self.assertEqual(indexed, False)
        try:
            mh.cmd_change_relation_props(*rdef_key, fulltextindexed=True)
            indexed = adel.rdef('Personne', 'String').fulltextindexed
            self.assertEqual(indexed, True)
        finally:
            # restore the initial value whatever happened above
            mh.cmd_change_relation_props(*rdef_key, fulltextindexed=False)
|
436 |
|
def test_sync_schema_props_perms_rqlconstraints(self):
    """Synchronising a relation type must drop the RQL constraints that are
    absent from the migrated schema: one of them for ``evaluee``, all of
    them for ``travaille``."""
    with self.mh() as (cnx, mh):
        # Drop one of the RQLConstraint.
        evaluee_rdef = self.schema['evaluee'].rdefs[('Personne', 'Note')]
        expressions_before = [cstr.expression
                              for cstr in evaluee_rdef.constraints]
        self.assertIn('S created_by U', expressions_before)
        mh.cmd_sync_schema_props_perms('evaluee', commit=True)
        expressions_after = [cstr.expression
                             for cstr in evaluee_rdef.constraints]
        self.assertNotIn('S created_by U', expressions_after)

        # Drop all RQLConstraint.
        travaille_rdef = self.schema['travaille'].rdefs[('Personne', 'Societe')]
        self.assertEqual(len(travaille_rdef.constraints), 2)
        mh.cmd_sync_schema_props_perms('travaille', commit=True)
        # the relation definition is fetched again after the synchronisation
        travaille_rdef = self.schema['travaille'].rdefs[('Personne', 'Societe')]
        self.assertEqual(len(travaille_rdef.constraints), 0)
|
457 |
|
@tag('longrun')
def test_sync_schema_props_perms(self):
    """Synchronise properties and permissions with the migrated schema.

    Some properties of ``migrschema`` are changed first (attribute order
    and descriptions). Permissions are then synchronised alone and
    everything a second time, after which the test checks descriptions,
    attribute order, RQL expression permissions and, after commit, the
    unique-together constraint on Personne.
    """
    with self.mh() as (cnx, mh):
        count_rql = 'Any COUNT(X) WHERE X is RQLExpression'
        initial_expr_count = cnx.execute(count_rql)[0][0]
        personne_string = ('Personne', 'String')
        migrschema['titre'].rdefs[personne_string].order = 7
        migrschema['adel'].rdefs[personne_string].order = 6
        migrschema['ass'].rdefs[personne_string].order = 5
        migrschema['Personne'].description = 'blabla bla'
        migrschema['titre'].description = 'usually a title'
        migrschema['titre'].rdefs[personne_string].description = 'title for this person'
        delete_concerne_rqlexpr = self._rrqlexpr_rset(cnx, 'delete', 'concerne')
        add_concerne_rqlexpr = self._rrqlexpr_rset(cnx, 'add', 'concerne')

        # make sure properties (e.g. etype descriptions) are synced by the
        # second call to sync_schema
        mh.cmd_sync_schema_props_perms(syncprops=False, commit=False)
        mh.cmd_sync_schema_props_perms(commit=False)

        personne_descr = cnx.execute(
            'Any D WHERE X name "Personne", X description D')[0][0]
        self.assertEqual(personne_descr, 'blabla bla')
        titre_descr = cnx.execute(
            'Any D WHERE X name "titre", X description D')[0][0]
        self.assertEqual(titre_descr, 'usually a title')
        titre_rdef_descr = cnx.execute(
            'Any D WHERE X relation_type RT, RT name "titre",'
            'X from_entity FE, FE name "Personne",'
            'X description D')[0][0]
        self.assertEqual(titre_rdef_descr, 'title for this person')
        rinorder = [n for n, in cnx.execute(
            'Any N ORDERBY O,N WHERE X is CWAttribute, X relation_type RT, RT name N,'
            'X from_entity FE, FE name "Personne",'
            'X ordernum O')]
        expected = [u'nom', u'prenom', u'sexe', u'promo', u'ass', u'adel', u'titre',
                    u'web', u'tel', u'fax', u'datenaiss', u'test', u'tzdatenaiss',
                    u'description', u'firstname',
                    u'creation_date', u'cwuri', u'modification_date']
        self.assertEqual(expected, rinorder)

        # test permissions synchronization ####################################
        # new rql expr to add note entity
        note_add_expr = self._erqlexpr_entity(cnx, 'add', 'Note')
        self.assertEqual(note_add_expr.expression,
                         'X ecrit_part PE, U in_group G, '
                         'PE require_permission P, P name "add_note", P require_group G')
        self.assertEqual([et.name for et in note_add_expr.reverse_add_permission],
                         ['Note'])
        self.assertEqual(note_add_expr.reverse_read_permission, ())
        self.assertEqual(note_add_expr.reverse_delete_permission, ())
        self.assertEqual(note_add_expr.reverse_update_permission, ())
        self.assertTrue(self._rrqlexpr_rset(cnx, 'add', 'para'))
        # no rqlexpr to delete para attribute
        self.assertFalse(self._rrqlexpr_rset(cnx, 'delete', 'para'))
        # new rql expr to add ecrit_par relation
        ecrit_par_add_expr = self._rrqlexpr_entity(cnx, 'add', 'ecrit_par')
        self.assertEqual(ecrit_par_add_expr.expression,
                         'O require_permission P, P name "add_note", '
                         'U in_group G, P require_group G')
        self.assertEqual([rdef.rtype.name
                          for rdef in ecrit_par_add_expr.reverse_add_permission],
                         ['ecrit_par'])
        self.assertEqual(ecrit_par_add_expr.reverse_read_permission, ())
        self.assertEqual(ecrit_par_add_expr.reverse_delete_permission, ())
        # no more rqlexpr to delete and add travaille relation
        self.assertFalse(self._rrqlexpr_rset(cnx, 'add', 'travaille'))
        self.assertFalse(self._rrqlexpr_rset(cnx, 'delete', 'travaille'))
        # no more rqlexpr to delete and update Societe entity
        self.assertFalse(self._erqlexpr_rset(cnx, 'update', 'Societe'))
        self.assertFalse(self._erqlexpr_rset(cnx, 'delete', 'Societe'))
        # no more rqlexpr to read Affaire entity
        self.assertFalse(self._erqlexpr_rset(cnx, 'read', 'Affaire'))
        # rqlexpr to update Affaire entity has been updated
        affaire_update_expr = self._erqlexpr_entity(cnx, 'update', 'Affaire')
        self.assertEqual(affaire_update_expr.expression,
                         'X concerne S, S owned_by U')
        # no change for rqlexpr to add and delete Affaire entity
        self.assertEqual(len(self._erqlexpr_rset(cnx, 'delete', 'Affaire')), 1)
        self.assertEqual(len(self._erqlexpr_rset(cnx, 'add', 'Affaire')), 1)
        # no change for rqlexpr to add and delete concerne relation
        self.assertEqual(len(self._rrqlexpr_rset(cnx, 'delete', 'concerne')),
                         len(delete_concerne_rqlexpr))
        self.assertEqual(len(self._rrqlexpr_rset(cnx, 'add', 'concerne')),
                         len(add_concerne_rqlexpr))
        # * migrschema involve:
        #   * 7 erqlexprs deletions (2 in (Affaire + Societe + Note.para) + 1 Note.something
        #   * 2 rrqlexprs deletions (travaille)
        #   * 1 update (Affaire update)
        #   * 2 new (Note add, ecrit_par add)
        #   * 2 implicit new for attributes (Note.para, Person.test)
        # remaining orphan rql expr which should be deleted at commit (composite relation)
        # unattached expressions -> pending deletion on commit
        orphan_erql_count = cnx.execute(
            'Any COUNT(X) WHERE X is RQLExpression, X exprtype "ERQLExpression",'
            'NOT ET1 read_permission X, NOT ET2 add_permission X, '
            'NOT ET3 delete_permission X, NOT ET4 update_permission X')[0][0]
        self.assertEqual(orphan_erql_count, 7)
        orphan_rrql_count = cnx.execute(
            'Any COUNT(X) WHERE X is RQLExpression, X exprtype "RRQLExpression",'
            'NOT ET1 read_permission X, NOT ET2 add_permission X, '
            'NOT ET3 delete_permission X, NOT ET4 update_permission X')[0][0]
        self.assertEqual(orphan_rrql_count, 2)
        # finally
        final_expr_count = cnx.execute(count_rql)[0][0]
        self.assertEqual(final_expr_count,
                         initial_expr_count + 1 + 2 + 2 + 2)
        cnx.commit()
        # unique_together test
        unique_attrs = ('nom', 'prenom', 'datenaiss')
        self.assertEqual(len(self.schema.eschema('Personne')._unique_together), 1)
        self.assertCountEqual(self.schema.eschema('Personne')._unique_together[0],
                              unique_attrs)
        rset = cnx.execute('Any C WHERE C is CWUniqueTogetherConstraint, C constraint_of ET, ET name "Personne"')
        self.assertEqual(len(rset), 1)
        relations = [r.name for r in rset.get_entity(0, 0).relations]
        self.assertCountEqual(relations, unique_attrs)
|
562 |
|
def _erqlexpr_rset(self, cnx, action, ertype):
    """Return the result set of RQLExpression entities granting `action`
    (e.g. 'add', 'read') on the entity type named `ertype`."""
    # `action` selects the <action>_permission relation; the type name is
    # passed as a query argument
    return cnx.execute(
        'RQLExpression X WHERE ET is CWEType, ET %s_permission X, ET name %%(name)s' % action,
        {'name': ertype})
|
566 |
|
def _erqlexpr_entity(self, cnx, action, ertype):
    """Return the single RQLExpression entity granting `action` on entity
    type `ertype`; the test fails unless there is exactly one."""
    matches = self._erqlexpr_rset(cnx, action, ertype)
    self.assertEqual(len(matches), 1)
    return matches.get_entity(0, 0)
|
571 |
|
def _rrqlexpr_rset(self, cnx, action, ertype):
    """Return the result set of RQLExpression entities granting `action`
    on relation definitions of the relation type named `ertype`."""
    # `action` selects the <action>_permission relation; the type name is
    # passed as a query argument
    return cnx.execute(
        'RQLExpression X WHERE RT is CWRType, RDEF %s_permission X, RT name %%(name)s, RDEF relation_type RT' % action,
        {'name': ertype})
|
575 |
|
def _rrqlexpr_entity(self, cnx, action, ertype):
    """Return the single RQLExpression entity granting `action` on relation
    type `ertype`; the test fails unless there is exactly one."""
    matches = self._rrqlexpr_rset(cnx, action, ertype)
    self.assertEqual(len(matches), 1)
    return matches.get_entity(0, 0)
|
580 |
|
def test_set_size_constraint(self):
    """Call ``cmd_set_size_constraint`` on an attribute that already has a
    size limit (CWEType.name) and on one that has none
    (CWEType.description), restoring the initial state each time."""
    with self.mh() as (cnx, mh):
        set_size = mh.cmd_set_size_constraint
        # existing previous value
        try:
            set_size('CWEType', 'name', 128)
        finally:
            set_size('CWEType', 'name', 64)
        # non existing previous value
        try:
            set_size('CWEType', 'description', 256)
        finally:
            set_size('CWEType', 'description', None)
|
593 |
|
@tag('longrun')
def test_add_drop_cube_and_deps(self):
    """Drop the 'fakeemail' cube together with its dependencies, then add
    it back.

    After the drop, neither 'fakeemail' nor 'file' may remain in the
    configuration, the schema or the stored cube versions; after re-adding
    'fakeemail' everything must be back.
    """
    cube_ertypes = ('Email', 'EmailThread', 'EmailPart', 'File',
                    'sender', 'in_thread', 'reply_to', 'data_format')
    with self.mh() as (cnx, mh):
        schema = self.repo.schema
        initial_rdefs = sorted(
            (str(s), str(o)) for s, o in schema['see_also'].rdefs)
        self.assertEqual(initial_rdefs,
                         sorted([('EmailThread', 'EmailThread'), ('Folder', 'Folder'),
                                 ('Bookmark', 'Bookmark'), ('Bookmark', 'Note'),
                                 ('Note', 'Note'), ('Note', 'Bookmark')]))
        try:
            mh.cmd_drop_cube('fakeemail', removedeps=True)
            # file was there because it's an email dependency, should have been removed
            self.assertNotIn('fakeemail', self.config.cubes())
            self.assertNotIn(self.config.cube_dir('fakeemail'), self.config.cubes_path())
            self.assertNotIn('file', self.config.cubes())
            self.assertNotIn(self.config.cube_dir('file'), self.config.cubes_path())
            for ertype in cube_ertypes:
                self.assertNotIn(ertype, schema)
            self.assertEqual(sorted(schema['see_also'].rdefs),
                             sorted([('Folder', 'Folder'),
                                     ('Bookmark', 'Bookmark'),
                                     ('Bookmark', 'Note'),
                                     ('Note', 'Note'),
                                     ('Note', 'Bookmark')]))
            remaining_types = ['Bookmark', 'Folder', 'Note']
            self.assertEqual(sorted(schema['see_also'].subjects()), remaining_types)
            self.assertEqual(sorted(schema['see_also'].objects()), remaining_types)
            self.assertEqual(cnx.execute('Any X WHERE X pkey "system.version.fakeemail"').rowcount, 0)
            self.assertEqual(cnx.execute('Any X WHERE X pkey "system.version.file"').rowcount, 0)
        finally:
            # always put the cube back so that later tests find it
            mh.cmd_add_cube('fakeemail')
        self.assertIn('fakeemail', self.config.cubes())
        self.assertIn(self.config.cube_dir('fakeemail'), self.config.cubes_path())
        self.assertIn('file', self.config.cubes())
        self.assertIn(self.config.cube_dir('file'), self.config.cubes_path())
        for ertype in cube_ertypes:
            self.assertIn(ertype, schema)
        self.assertEqual(sorted(schema['see_also'].rdefs),
                         sorted([('EmailThread', 'EmailThread'), ('Folder', 'Folder'),
                                 ('Bookmark', 'Bookmark'),
                                 ('Bookmark', 'Note'),
                                 ('Note', 'Note'),
                                 ('Note', 'Bookmark')]))
        restored_types = ['Bookmark', 'EmailThread', 'Folder', 'Note']
        self.assertEqual(sorted(schema['see_also'].subjects()), restored_types)
        self.assertEqual(sorted(schema['see_also'].objects()), restored_types)
        from cubes.fakeemail.__pkginfo__ import version as email_version
        from cubes.file.__pkginfo__ import version as file_version
        stored_email_version = cnx.execute(
            'Any V WHERE X value V, X pkey "system.version.fakeemail"')[0][0]
        self.assertEqual(stored_email_version, email_version)
        stored_file_version = cnx.execute(
            'Any V WHERE X value V, X pkey "system.version.file"')[0][0]
        self.assertEqual(stored_file_version, file_version)
        # trick: overwrite self.maxeid to avoid deletion of just reintroduced
        # types (and their associated tables!)
        self.maxeid = cnx.execute('Any MAX(X)')[0][0]
        # why this commit is necessary is unclear to me (though without it
        # next test may fail complaining of missing tables)
        cnx.commit()
|
651 |
|
652 |
|
@tag('longrun')
def test_add_drop_cube_no_deps(self):
    """Drop the 'fakeemail' cube alone, then add it back.

    While the cube is dropped, the test checks that:

    * 'fakeemail' is no longer listed by ``self.config.cubes()`` whereas
      'file' still is;
    * the entity and relation types 'Email', 'EmailThread', 'EmailPart',
      'sender', 'in_thread' and 'reply_to' are gone from the repository
      schema.

    The cube is re-added from the ``finally`` clause, so restoration is
    attempted even when one of the assertions above fails.
    """
    with self.mh() as (cnx, mh):
        schema = self.repo.schema
        try:
            mh.cmd_drop_cube('fakeemail')
            self.assertNotIn('fakeemail', self.config.cubes())
            self.assertIn('file', self.config.cubes())
            for ertype in ('Email', 'EmailThread', 'EmailPart',
                           'sender', 'in_thread', 'reply_to'):
                self.assertNotIn(ertype, schema)
        finally:
            mh.cmd_add_cube('fakeemail')
            self.assertIn('fakeemail', self.config.cubes())
            # trick: overwrite self.maxeid to avoid deletion of just
            # reintroduced types (and their associated tables!)
            self.maxeid = cnx.execute('Any MAX(X)')[0][0]
            # why this commit is necessary is unclear (though without it the
            # next test may fail complaining of missing tables)
            cnx.commit()
|
675 |
|
def test_drop_dep_cube(self):
    """Dropping the 'file' cube must be refused.

    ``cmd_drop_cube('file')`` is expected to raise ConfigurationError whose
    message says the cube is used as a dependency.
    """
    expected_message = "can't remove cube file, used as a dependency"
    with self.mh() as (_cnx, helper):
        with self.assertRaises(ConfigurationError) as raised:
            helper.cmd_drop_cube('file')
        self.assertEqual(str(raised.exception), expected_message)
|
681 |
|
@tag('longrun')
def test_introduce_base_class(self):
    """Add 'Para' then 'Text' and check the specialization links.

    After each addition the schema must report the expected types
    specializing 'Para'. Entities are then inserted and linked through
    'newnotinlined' and 'newinlined' to make sure the corresponding
    columns/tables exist. Finally the ``specializes`` relations are removed
    by hand and the schema is checked to be back to "no specialization".
    """
    def para_children():
        # always go through self.schema so the current schema is inspected
        return sorted(etype.type
                      for etype in self.schema['Para'].specialized_by())

    with self.mh() as (cnx, helper):
        helper.cmd_add_entity_type('Para')
        self.assertEqual(para_children(), ['Note'])
        self.assertEqual(self.schema['Note'].specializes().type, 'Para')

        helper.cmd_add_entity_type('Text')
        self.assertEqual(para_children(), ['Note', 'Text'])
        self.assertEqual(self.schema['Text'].specializes().type, 'Para')

        # check columns have actually been added by inserting real data
        text_rset = cnx.execute(
            'INSERT Text X: X para "hip", X summary "hop", X newattr "momo"')
        text = text_rset.get_entity(0, 0)
        note_rset = cnx.execute(
            'INSERT Note X: X para "hip", X shortpara "hop", X newattr "momo", X unique_id "x"')
        note = note_rset.get_entity(0, 0)
        aff = cnx.execute('INSERT Affaire X').get_entity(0, 0)

        link_queries = (
            'SET X newnotinlined Y WHERE X eid %(x)s, Y eid %(y)s',
            'SET X newinlined Y WHERE X eid %(x)s, Y eid %(y)s',
        )
        for rql in link_queries:
            for subject in (text, note):
                self.assertTrue(
                    cnx.execute(rql, {'x': subject.eid, 'y': aff.eid}))

        # XXX the specializes relations are removed explicitly here,
        # otherwise tearDown fails when removing Para because Note still
        # inherits from it. Moving the MemSchemaCWETypeDel(session, name)
        # operation to the after_delete_entity(CWEType) hook could fix it
        # (the MemSchemaSpecializesDel operation would then be removed
        # first), but it is not clear that this is a desired behaviour.
        #
        # More tests about introducing/removing base classes or
        # specialization relationships are needed as well.
        cnx.execute('DELETE X specializes Y WHERE Y name "Para"')
        cnx.commit()

        self.assertEqual(para_children(), [])
        for etype_name in ('Note', 'Text'):
            self.assertEqual(self.schema[etype_name].specializes(), None)
|
719 |
|
def test_add_symmetric_relation_type(self):
    """Adding the 'same_as' relation type must create its table.

    ``table_sql`` is falsy for 'same_as_relation' before the relation type
    is added and truthy afterwards.
    """
    table_name = 'same_as_relation'
    with self.mh() as (_cnx, helper):
        before = self.table_sql(helper, table_name)
        self.assertFalse(before)
        helper.cmd_add_relation_type('same_as')
        after = self.table_sql(helper, table_name)
        self.assertTrue(after)
|
725 |
|
def test_change_attribute_type(self):
    """Change Societe.tel to 'Float' and check schema and stored value.

    A Societe with ``tel=1`` is created first; once the attribute type has
    been changed, the ('Societe', 'Int') definition must be replaced by
    ('Societe', 'Float') and the stored value must come back as the float
    1.0.
    """
    int_key = ('Societe', 'Int')
    float_key = ('Societe', 'Float')
    with self.mh() as (_cnx, helper):
        helper.cmd_create_entity('Societe', tel=1)
        helper.commit()
        helper.change_attribute_type('Societe', 'tel', 'Float')

        tel_rdefs = self.schema['tel'].rdefs
        self.assertNotIn(int_key, tel_rdefs)
        self.assertIn(float_key, tel_rdefs)
        self.assertEqual(tel_rdefs[float_key].object, 'Float')

        rows = helper.rqlexec('Any T WHERE X tel T')
        stored_tel = rows[0][0]
        self.assertEqual(stored_tel, 1.0)
        self.assertIsInstance(stored_tel, float)
|
737 |
|
def test_drop_required_inlined_relation(self):
    """Check that a required inlined relation type can be dropped.

    The (Note, Personne) definition of 'ecrit_par' is temporarily given
    cardinality '1*' on the file-system schema and synchronized, then the
    whole relation type is dropped. The relation's column must be gone
    afterwards.
    """
    with self.mh() as (cnx, mh):
        bob = mh.cmd_create_entity('Personne', nom=u'bob')
        # a Note using the relation must exist when the relation is dropped
        mh.cmd_create_entity('Note', ecrit_par=bob)
        mh.commit()
        rdef = mh.fs_schema.rschema('ecrit_par').rdefs[('Note', 'Personne')]
        with tempattr(rdef, 'cardinality', '1*'):
            mh.sync_schema_props_perms('ecrit_par', syncperms=False)
        mh.cmd_drop_relation_type('ecrit_par')
        column = '%secrit_par' % SQL_PREFIX
        # An inlined relation is stored as a column of its *subject* entity
        # table, so the Note table is where the column used to live; only
        # checking the Personne table would pass even if the column had not
        # been dropped.
        self.assertNotIn(column,
                         self.table_schema(mh, '%sNote' % SQL_PREFIX))
        self.assertNotIn(column,
                         self.table_schema(mh, '%sPersonne' % SQL_PREFIX))
|
749 |
|
def test_drop_inlined_rdef_delete_data(self):
    """Dropping an inlined relation definition must delete its data.

    A Note written by the connected user is created, then the
    (Note, ecrit_par, CWUser) definition is dropped; no row of ``cw_Note``
    may keep a non-NULL ``cw_ecrit_par`` afterwards.
    """
    with self.mh() as (cnx, helper):
        author_eid = cnx.user.eid
        helper.cmd_create_entity('Note', ecrit_par=author_eid)
        helper.commit()
        helper.drop_relation_definition('Note', 'ecrit_par', 'CWUser')
        leftovers = helper.sqlexec(
            'SELECT * FROM cw_Note WHERE cw_ecrit_par IS NOT NULL')
        self.assertFalse(leftovers)
|
756 |
|
class MigrationCommandsComputedTC(MigrationTC):
    """Migration command tests dealing with computed relations and computed
    attributes, run against the 'datacomputed' test application.
    """
    appid = 'datacomputed'

    def setUp(self):
        """Run the base set-up, then push the repository schema to the
        registry."""
        MigrationTC.setUp(self)
        # the vregistry has to be reloaded: hooks generated for computed
        # attributes need it
        repo = self.repo
        repo.vreg.set_schema(repo.schema)

    # computed relations migration #############################################

    def test_computed_relation_add_relation_definition(self):
        """Adding a relation definition for a computed relation is refused."""
        self.assertNotIn('works_for', self.schema)
        with self.mh() as (_cnx, helper):
            with self.assertRaises(ExecutionError) as raised:
                helper.cmd_add_relation_definition(
                    'Employee', 'works_for', 'Company')
        expected = ('Cannot add a relation definition for a computed '
                    'relation (works_for)')
        self.assertEqual(str(raised.exception), expected)

    def test_computed_relation_drop_relation_definition(self):
        """Dropping a relation definition of a computed relation is refused."""
        self.assertIn('notes', self.schema)
        with self.mh() as (_cnx, helper):
            with self.assertRaises(ExecutionError) as raised:
                helper.cmd_drop_relation_definition('Company', 'notes', 'Note')
        expected = ('Cannot drop a relation definition for a computed '
                    'relation (notes)')
        self.assertEqual(str(raised.exception), expected)

    def test_computed_relation_add_relation_type(self):
        """Add the computed 'works_for' relation type and query through it.

        The schema must expose the rule, subjects and objects; no
        'works_for_relation' table may be reported by ``table_sql``; and
        only the employee (not the associate) must be reachable through
        ``reverse_works_for``.
        """
        self.assertNotIn('works_for', self.schema)
        with self.mh() as (cnx, helper):
            helper.cmd_add_relation_type('works_for')
            self.assertIn('works_for', self.schema)
            self.assertEqual(self.schema['works_for'].rule,
                             'O employees S, NOT EXISTS (O associates S)')
            self.assertEqual(self.schema['works_for'].objects(), ('Company',))
            self.assertEqual(self.schema['works_for'].subjects(), ('Employee',))
            self.assertFalse(self.table_sql(helper, 'works_for_relation'))
            employee = cnx.create_entity('Employee')
            associate = cnx.create_entity('Employee')
            cnx.create_entity('Company',
                              employees=employee, associates=associate)
            cnx.commit()
            company = cnx.execute('Company X').get_entity(0, 0)
            workers = [worker.eid for worker in company.reverse_works_for]
            self.assertEqual([employee.eid], workers)
            helper.rollback()

    def test_computed_relation_drop_relation_type(self):
        """Dropping the computed 'notes' relation type removes it from the
        schema."""
        self.assertIn('notes', self.schema)
        with self.mh() as (_cnx, helper):
            helper.cmd_drop_relation_type('notes')
        self.assertNotIn('notes', self.schema)

    def test_computed_relation_sync_schema_props_perms(self):
        """Synchronizing the computed 'whatever' relation sets its rule."""
        self.assertIn('whatever', self.schema)
        with self.mh() as (_cnx, helper):
            helper.cmd_sync_schema_props_perms('whatever')
            self.assertEqual(self.schema['whatever'].rule,
                             'S employees E, O associates E')
            self.assertEqual(self.schema['whatever'].objects(), ('Company',))
            self.assertEqual(self.schema['whatever'].subjects(), ('Company',))
            self.assertFalse(self.table_sql(helper, 'whatever_relation'))

    def test_computed_relation_sync_schema_props_perms_security(self):
        """Synchronizing 'perm_changes' updates the read permission on both
        the relation type and its relation definition."""
        def first_rdef():
            # pick whichever definition comes first, as the schema gives it
            return next(iter(self.schema['perm_changes'].rdefs.values()))

        with self.mh() as (_cnx, helper):
            self.assertEqual(first_rdef().permissions,
                             {'add': (), 'delete': (),
                              'read': ('managers', 'users')})
            helper.cmd_sync_schema_props_perms('perm_changes')
            self.assertEqual(self.schema['perm_changes'].permissions,
                             {'read': ('managers',)})
            self.assertEqual(first_rdef().permissions,
                             {'add': (), 'delete': (),
                              'read': ('managers',)})

    def test_computed_relation_sync_schema_props_perms_on_rdef(self):
        """Synchronizing a single definition of a computed relation is
        refused."""
        self.assertIn('whatever', self.schema)
        rdef_key = ('Company', 'whatever', 'Person')
        with self.mh() as (_cnx, helper):
            with self.assertRaises(ExecutionError) as raised:
                helper.cmd_sync_schema_props_perms(rdef_key)
        expected = ('Cannot synchronize a relation definition for a computed '
                    'relation (whatever)')
        self.assertEqual(str(raised.exception), expected)

    def test_computed_relation_rename_relation_type(self):
        """Renaming 'to_be_renamed' to 'renamed' is reflected in the schema."""
        with self.mh() as (_cnx, helper):
            helper.cmd_rename_relation_type('to_be_renamed', 'renamed')
            self.assertIn('renamed', self.schema)
            self.assertNotIn('to_be_renamed', self.schema)

    # computed attributes migration ############################################

    def setup_add_score(self):
        """Create one Company with two Employees, each concerned by a Note
        (noted 2 and 4 respectively), and commit."""
        with self.admin_access.client_cnx() as cnx:
            assert not cnx.execute('Company X')
            company = cnx.create_entity('Company')
            for grade in (2, 4):
                employee = cnx.create_entity('Employee',
                                             reverse_employees=company)
                cnx.create_entity('Note', note=grade, concerns=employee)
            cnx.commit()

    def assert_score_initialized(self, mh):
        """Check the 'score' formula, its SQL column and its computed value
        (3.0) using the migration helper `mh`."""
        score_rdef = self.schema['score'].rdefs['Company', 'Float']
        self.assertEqual(
            score_rdef.formula,
            'Any AVG(NN) WHERE X employees E, N concerns E, N note NN')
        columns = self.table_schema(mh, '%sCompany' % SQL_PREFIX)
        self.assertEqual(columns['%sscore' % SQL_PREFIX],
                         ('double precision', None))
        rset = mh.rqlexec('Any CS WHERE C score CS, C is Company')
        self.assertEqual([[3.0]], rset.rows)

    def test_computed_attribute_add_relation_type(self):
        """Add the computed attribute 'score' through add_relation_type."""
        self.assertNotIn('score', self.schema)
        self.setup_add_score()
        with self.mh() as (_cnx, helper):
            helper.cmd_add_relation_type('score')
            self.assertIn('score', self.schema)
            self.assertEqual(self.schema['score'].objects(), ('Float',))
            self.assertEqual(self.schema['score'].subjects(), ('Company',))
            self.assert_score_initialized(helper)

    def test_computed_attribute_add_attribute(self):
        """Add the computed attribute 'score' through add_attribute."""
        self.assertNotIn('score', self.schema)
        self.setup_add_score()
        with self.mh() as (_cnx, helper):
            helper.cmd_add_attribute('Company', 'score')
            self.assertIn('score', self.schema)
            self.assert_score_initialized(helper)

    def assert_computed_attribute_dropped(self):
        """Check 'note20' is gone from both the schema and the Note table."""
        self.assertNotIn('note20', self.schema)
        with self.mh() as (_cnx, helper):
            columns = self.table_schema(helper, '%sNote' % SQL_PREFIX)
        self.assertNotIn('%snote20' % SQL_PREFIX, columns)

    def test_computed_attribute_drop_type(self):
        """Drop the computed attribute 'note20' through drop_relation_type."""
        self.assertIn('note20', self.schema)
        with self.mh() as (_cnx, helper):
            helper.cmd_drop_relation_type('note20')
        self.assert_computed_attribute_dropped()

    def test_computed_attribute_drop_relation_definition(self):
        """Drop the computed attribute 'note20' through
        drop_relation_definition."""
        self.assertIn('note20', self.schema)
        with self.mh() as (_cnx, helper):
            helper.cmd_drop_relation_definition('Note', 'note20', 'Int')
        self.assert_computed_attribute_dropped()

    def test_computed_attribute_drop_attribute(self):
        """Drop the computed attribute 'note20' through drop_attribute."""
        self.assertIn('note20', self.schema)
        with self.mh() as (_cnx, helper):
            helper.cmd_drop_attribute('Note', 'note20')
        self.assert_computed_attribute_dropped()

    def test_computed_attribute_sync_schema_props_perms_rtype(self):
        """Synchronizing the 'note100' relation type updates its formula."""
        self.assertIn('note100', self.schema)
        with self.mh() as (_cnx, helper):
            helper.cmd_sync_schema_props_perms('note100')
            note100_rdef = self.schema['note100'].rdefs['Note', 'Int']
            self.assertEqual(note100_rdef.formula_select.as_string(),
                             'Any (N * 100) WHERE X note N, X is Note')
            self.assertEqual(note100_rdef.formula, 'Any N*100 WHERE X note N')

    def test_computed_attribute_sync_schema_props_perms_rdef(self):
        """Synchronizing the (Note, note100, Int) definition recomputes the
        'note100' values and the 'score100' value of the Company."""
        self.setup_add_score()
        with self.mh() as (cnx, helper):
            helper.cmd_sync_schema_props_perms(('Note', 'note100', 'Int'))
            note_rset = cnx.execute('Any N ORDERBY N WHERE X note100 N')
            self.assertEqual([[200], [400]], note_rset.rows)
            score_rset = cnx.execute(
                'Any CS WHERE C score100 CS, C is Company')
            self.assertEqual([[300]], score_rset.rows)
|
930 |
|
931 |
|
# run this module's tests when it is executed directly as a script
if __name__ == '__main__':
    unittest_main()
|