268 except ValidationError, ex: |
268 except ValidationError, ex: |
269 self.assertIsInstance(ex.entity, int) |
269 self.assertIsInstance(ex.entity, int) |
270 self.assertEquals(ex.errors, {'login': 'the value "admin" is already used, use another one'}) |
270 self.assertEquals(ex.errors, {'login': 'the value "admin" is already used, use another one'}) |
271 |
271 |
272 |
272 |
class SchemaModificationHooksTC(CubicWebTC):
    """Tests for schema modifications performed through RQL.

    The tests insert, update and delete schema entities (CWEType, CWRType,
    CWAttribute, CWRelation, CWConstraint, permission relations) and check
    that the in-memory schema and the system database indexes are left
    untouched until commit, then reflect the change once committed.
    """

    @classmethod
    def init_config(cls, config):
        """Run the parent configuration setup, then refill the repository schema."""
        super(SchemaModificationHooksTC, cls).init_config(config)
        # NOTE(review): presumably clears a cached list of cubes so that
        # fill_schema() recomputes it -- confirm against the config class
        config._cubes = None
        cls.repo.fill_schema()

    def index_exists(self, etype, attr, unique=False):
        """Ask the system source's db helper whether an index exists.

        :param etype: entity type name; SQL_PREFIX is prepended to it
        :param attr: attribute / relation name; SQL_PREFIX is prepended to it
        :param unique: forwarded as the helper's ``unique`` keyword
        :return: whatever ``dbhelper.index_exists`` returns
        """
        self.session.set_pool()
        dbhelper = self.session.pool.source('system').dbhelper
        sqlcursor = self.session.pool['system']
        return dbhelper.index_exists(sqlcursor, SQL_PREFIX + etype,
                                     SQL_PREFIX + attr, unique=unique)

    def _set_perms(self, eid):
        """Give the entity `eid` read permission for every CWGroup, add
        permission for "managers" and delete permission for "owners".
        """
        self.execute('SET X read_permission G WHERE X eid %(x)s, G is CWGroup',
                     {'x': eid}, 'x')
        self.execute('SET X add_permission G WHERE X eid %(x)s, G is CWGroup, G name "managers"',
                     {'x': eid}, 'x')
        self.execute('SET X delete_permission G WHERE X eid %(x)s, G is CWGroup, G name "owners"',
                     {'x': eid}, 'x')

    def _set_attr_perms(self, eid):
        """Give the entity `eid` read permission for every CWGroup and
        update permission for "managers".
        """
        self.execute('SET X read_permission G WHERE X eid %(x)s, G is CWGroup',
                     {'x': eid}, 'x')
        self.execute('SET X update_permission G WHERE X eid %(x)s, G is CWGroup, G name "managers"',
                     {'x': eid}, 'x')

    def test_base(self):
        # Create an entity type and a relation type through RQL, use them,
        # then delete them, checking at each step that the in-memory schema
        # and the database index only change once the transaction commits.
        schema = self.repo.schema
        self.session.set_pool()
        self.failIf(schema.has_entity('Societe2'))
        # 'concerne2' is a relation type: it must be looked up with
        # has_relation(), has_entity() could never report it
        self.failIf(schema.has_relation('concerne2'))
        # schema should be updated on insertion (after commit)
        eeid = self.execute('INSERT CWEType X: X name "Societe2", X description "", X final FALSE')[0][0]
        self._set_perms(eeid)
        self.execute('INSERT CWRType X: X name "concerne2", X description "", X final FALSE, X symmetric FALSE')
        self.failIf(schema.has_entity('Societe2'))
        self.failIf(schema.has_relation('concerne2'))
        # have to commit before adding definition relations
        self.commit()
        self.failUnless(schema.has_entity('Societe2'))
        self.failUnless(schema.has_relation('concerne2'))
        attreid = self.execute('INSERT CWAttribute X: X cardinality "11", X defaultval "noname", '
                               ' X indexed TRUE, X relation_type RT, X from_entity E, X to_entity F '
                               'WHERE RT name "name", E name "Societe2", F name "String"')[0][0]
        self._set_attr_perms(attreid)
        concerne2_rdef_eid = self.execute(
            'INSERT CWRelation X: X cardinality "**", X relation_type RT, X from_entity E, X to_entity E '
            'WHERE RT name "concerne2", E name "Societe2"')[0][0]
        self._set_perms(concerne2_rdef_eid)
        self.failIf('name' in schema['Societe2'].subject_relations())
        self.failIf('concerne2' in schema['Societe2'].subject_relations())
        self.failIf(self.index_exists('Societe2', 'name'))
        self.commit()
        self.failUnless('name' in schema['Societe2'].subject_relations())
        self.failUnless('concerne2' in schema['Societe2'].subject_relations())
        self.failUnless(self.index_exists('Societe2', 'name'))
        # now we should be able to insert and query Societe2
        s2eid = self.execute('INSERT Societe2 X: X name "logilab"')[0][0]
        self.execute('Societe2 X WHERE X name "logilab"')
        self.execute('SET X concerne2 X WHERE X name "logilab"')
        rset = self.execute('Any X WHERE X concerne2 Y')
        self.assertEquals(rset.rows, [[s2eid]])
        # check that when a relation definition is deleted, existing relations are deleted
        rdefeid = self.execute('INSERT CWRelation X: X cardinality "**", X relation_type RT, '
                               ' X from_entity E, X to_entity E '
                               'WHERE RT name "concerne2", E name "CWUser"')[0][0]
        self._set_perms(rdefeid)
        self.commit()
        self.execute('DELETE CWRelation X WHERE X eid %(x)s', {'x': concerne2_rdef_eid}, 'x')
        self.commit()
        self.failUnless('concerne2' in schema['CWUser'].subject_relations())
        self.failIf('concerne2' in schema['Societe2'].subject_relations())
        self.failIf(self.execute('Any X WHERE X concerne2 Y'))
        # schema should be cleaned on delete (after commit)
        self.execute('DELETE CWEType X WHERE X name "Societe2"')
        self.execute('DELETE CWRType X WHERE X name "concerne2"')
        self.failUnless(self.index_exists('Societe2', 'name'))
        self.failUnless(schema.has_entity('Societe2'))
        self.failUnless(schema.has_relation('concerne2'))
        self.commit()
        self.failIf(self.index_exists('Societe2', 'name'))
        self.failIf(schema.has_entity('Societe2'))
        self.failIf(schema.has_relation('concerne2'))
        self.failIf('concerne2' in schema['CWUser'].subject_relations())

    def test_is_instance_of_insertions(self):
        # A new Transition must be reported as `is` Transition only, but as
        # `is_instance_of` both Transition and BaseTransition.
        seid = self.execute('INSERT Transition T: T name "subdiv"')[0][0]
        is_etypes = [etype for etype, in self.execute('Any ETN WHERE X eid %s, X is ET, ET name ETN' % seid)]
        self.assertEquals(is_etypes, ['Transition'])
        instanceof_etypes = [etype for etype, in self.execute('Any ETN WHERE X eid %s, X is_instance_of ET, ET name ETN' % seid)]
        self.assertEquals(sorted(instanceof_etypes), ['BaseTransition', 'Transition'])
        snames = [name for name, in self.execute('Any N WHERE S is BaseTransition, S name N')]
        self.failIf('subdiv' in snames)
        snames = [name for name, in self.execute('Any N WHERE S is_instance_of BaseTransition, S name N')]
        self.failUnless('subdiv' in snames)

    def test_perms_synchronization_1(self):
        # Read groups of the CWUser entity type in the in-memory schema must
        # only follow read_permission changes once they are committed.
        schema = self.repo.schema
        self.assertEquals(schema['CWUser'].get_groups('read'), set(('managers', 'users')))
        self.failUnless(self.execute('Any X, Y WHERE X is CWEType, X name "CWUser", Y is CWGroup, Y name "users"')[0])
        self.execute('DELETE X read_permission Y WHERE X is CWEType, X name "CWUser", Y name "users"')
        # not committed yet: schema still lists both groups
        self.assertEquals(schema['CWUser'].get_groups('read'), set(('managers', 'users', )))
        self.commit()
        self.assertEquals(schema['CWUser'].get_groups('read'), set(('managers', )))
        self.execute('SET X read_permission Y WHERE X is CWEType, X name "CWUser", Y name "users"')
        self.commit()
        self.assertEquals(schema['CWUser'].get_groups('read'), set(('managers', 'users',)))

    def test_perms_synchronization_2(self):
        # Same check as above, on the (CWUser, CWGroup) definition of the
        # in_group relation.
        schema = self.repo.schema['in_group'].rdefs[('CWUser', 'CWGroup')]
        self.assertEquals(schema.get_groups('read'), set(('managers', 'users', 'guests')))
        self.execute('DELETE X read_permission Y WHERE X relation_type RT, RT name "in_group", Y name "guests"')
        self.assertEquals(schema.get_groups('read'), set(('managers', 'users', 'guests')))
        self.commit()
        self.assertEquals(schema.get_groups('read'), set(('managers', 'users')))
        self.execute('SET X read_permission Y WHERE X relation_type RT, RT name "in_group", Y name "guests"')
        self.assertEquals(schema.get_groups('read'), set(('managers', 'users')))
        self.commit()
        self.assertEquals(schema.get_groups('read'), set(('managers', 'users', 'guests')))

    def test_nonregr_user_edit_itself(self):
        # Non-regression scenario: the session user drops and re-adds its own
        # groups, then the read permissions of the CWEType entity type are
        # dropped and restored. The test passes if no query or commit raises.
        ueid = self.session.user.eid
        groupeids = [eid for eid, in self.execute('CWGroup G WHERE G name in ("managers", "users")')]
        self.execute('DELETE X in_group Y WHERE X eid %s' % ueid)
        self.execute('SET X surname "toto" WHERE X eid %s' % ueid)
        self.execute('SET X in_group Y WHERE X eid %s, Y name "managers"' % ueid)
        self.commit()
        eeid = self.execute('Any X WHERE X is CWEType, X name "CWEType"')[0][0]
        self.execute('DELETE X read_permission Y WHERE X eid %s' % eeid)
        self.execute('SET X final FALSE WHERE X eid %s' % eeid)
        self.execute('SET X read_permission Y WHERE X eid %s, Y eid in (%s, %s)'
                     % (eeid, groupeids[0], groupeids[1]))
        self.commit()
        self.execute('Any X WHERE X is CWEType, X name "CWEType"')

    # schema modification hooks tests #########################################

    def test_uninline_relation(self):
        # Switch state_of from inlined to not inlined and back; the schema
        # flag and the index on State.state_of must change at commit only.
        self.session.set_pool()
        self.failUnless(self.schema['state_of'].inlined)
        try:
            self.execute('SET X inlined FALSE WHERE X name "state_of"')
            self.failUnless(self.schema['state_of'].inlined)
            self.commit()
            self.failIf(self.schema['state_of'].inlined)
            self.failIf(self.index_exists('State', 'state_of'))
            rset = self.execute('Any X, Y WHERE X state_of Y')
            self.assertEquals(len(rset), 2) # user states
        finally:
            # always restore the inlined relation for the other tests
            self.execute('SET X inlined TRUE WHERE X name "state_of"')
            self.failIf(self.schema['state_of'].inlined)
            self.commit()
            self.failUnless(self.schema['state_of'].inlined)
            self.failUnless(self.index_exists('State', 'state_of'))
            rset = self.execute('Any X, Y WHERE X state_of Y')
            self.assertEquals(len(rset), 2)

    def test_indexed_change(self):
        # Toggle the `indexed` flag of the "name" relation definitions and
        # check the Workflow.name index is dropped / recreated at commit.
        self.session.set_pool()
        try:
            self.execute('SET X indexed FALSE WHERE X relation_type R, R name "name"')
            self.failUnless(self.schema['name'].rdef('Workflow', 'String').indexed)
            self.failUnless(self.index_exists('Workflow', 'name'))
            self.commit()
            self.failIf(self.schema['name'].rdef('Workflow', 'String').indexed)
            self.failIf(self.index_exists('Workflow', 'name'))
        finally:
            # always restore the index for the other tests
            self.execute('SET X indexed TRUE WHERE X relation_type R, R name "name"')
            self.failIf(self.schema['name'].rdef('Workflow', 'String').indexed)
            self.failIf(self.index_exists('Workflow', 'name'))
            self.commit()
            self.failUnless(self.schema['name'].rdef('Workflow', 'String').indexed)
            self.failUnless(self.index_exists('Workflow', 'name'))

    def test_unique_change(self):
        # Add then remove a UniqueConstraint on Workflow.name and check the
        # schema and the unique index follow at commit.
        self.session.set_pool()
        try:
            self.execute('INSERT CWConstraint X: X cstrtype CT, DEF constrained_by X '
                         'WHERE CT name "UniqueConstraint", DEF relation_type RT, DEF from_entity E,'
                         'RT name "name", E name "Workflow"')
            self.failIf(self.schema['Workflow'].has_unique_values('name'))
            self.failIf(self.index_exists('Workflow', 'name', unique=True))
            self.commit()
            self.failUnless(self.schema['Workflow'].has_unique_values('name'))
            self.failUnless(self.index_exists('Workflow', 'name', unique=True))
        finally:
            # always remove the constraint for the other tests
            self.execute('DELETE DEF constrained_by X WHERE X cstrtype CT, '
                         'CT name "UniqueConstraint", DEF relation_type RT, DEF from_entity E,'
                         'RT name "name", E name "Workflow"')
            self.failUnless(self.schema['Workflow'].has_unique_values('name'))
            self.failUnless(self.index_exists('Workflow', 'name', unique=True))
            self.commit()
            self.failIf(self.schema['Workflow'].has_unique_values('name'))
            self.failIf(self.index_exists('Workflow', 'name', unique=True))

    def test_required_change_1(self):
        # Relax the cardinality of Bookmark.title to "?1".
        self.execute('SET DEF cardinality "?1" '
                     'WHERE DEF relation_type RT, DEF from_entity E,'
                     'RT name "title", E name "Bookmark"')
        self.commit()
        # should now be able to add bookmark without title
        self.execute('INSERT Bookmark X: X path "/view"')
        self.commit()

    def test_required_change_2(self):
        # Make CWUser.surname required ("11"), check the effect, then set the
        # cardinality back to "?1".
        self.execute('SET DEF cardinality "11" '
                     'WHERE DEF relation_type RT, DEF from_entity E,'
                     'RT name "surname", E name "CWUser"')
        self.commit()
        # should not be able anymore to add cwuser without surname
        self.assertRaises(ValidationError, self.create_user, "toto")
        self.execute('SET DEF cardinality "?1" '
                     'WHERE DEF relation_type RT, DEF from_entity E,'
                     'RT name "surname", E name "CWUser"')
        self.commit()

    def test_add_attribute_to_base_class(self):
        # Add a "messageid" String attribute to BaseTransition and check that,
        # once inferred relations are rebuilt, the Transition and
        # WorkflowTransition types have it as well.
        attreid = self.execute('INSERT CWAttribute X: X cardinality "11", X defaultval "noname", '
                               'X indexed TRUE, X relation_type RT, X from_entity E, X to_entity F '
                               'WHERE RT name "messageid", E name "BaseTransition", F name "String"')[0][0]
        # run the query outside of an `assert` statement: under `python -O`
        # asserts are stripped and the permission would never be set
        rset = self.execute('SET X read_permission Y WHERE X eid %(x)s, Y name "managers"',
                            {'x': attreid}, 'x')
        self.failUnless(rset)
        self.commit()
        self.schema.rebuild_infered_relations()
        self.failUnless('Transition' in self.schema['messageid'].subjects())
        self.failUnless('WorkflowTransition' in self.schema['messageid'].subjects())
        self.execute('Any X WHERE X is_instance_of BaseTransition, X messageid "hop"')
|
511 |
|
if __name__ == '__main__':
    # run the tests through unittest_main when the module is executed as a script
    unittest_main()