hooks/test/unittest_syncschema.py
branch: stable
changeset 6340 470d8e828fda
parent 5891 99024ad59223
child 6410 2e7a7b0829ed
equal deleted inserted replaced
6339:bdc3dc94d744 6340:470d8e828fda
    96         # now we should be able to insert and query Societe2
    96         # now we should be able to insert and query Societe2
    97         s2eid = self.execute('INSERT Societe2 X: X name "logilab"')[0][0]
    97         s2eid = self.execute('INSERT Societe2 X: X name "logilab"')[0][0]
    98         self.execute('Societe2 X WHERE X name "logilab"')
    98         self.execute('Societe2 X WHERE X name "logilab"')
    99         self.execute('SET X concerne2 X WHERE X name "logilab"')
    99         self.execute('SET X concerne2 X WHERE X name "logilab"')
   100         rset = self.execute('Any X WHERE X concerne2 Y')
   100         rset = self.execute('Any X WHERE X concerne2 Y')
   101         self.assertEquals(rset.rows, [[s2eid]])
   101         self.assertEqual(rset.rows, [[s2eid]])
   102         # check that when a relation definition is deleted, existing relations are deleted
   102         # check that when a relation definition is deleted, existing relations are deleted
   103         rdefeid = self.execute('INSERT CWRelation X: X cardinality "**", X relation_type RT, '
   103         rdefeid = self.execute('INSERT CWRelation X: X cardinality "**", X relation_type RT, '
   104                                '   X from_entity E, X to_entity E '
   104                                '   X from_entity E, X to_entity E '
   105                                'WHERE RT name "concerne2", E name "CWUser"')[0][0]
   105                                'WHERE RT name "concerne2", E name "CWUser"')[0][0]
   106         self._set_perms(rdefeid)
   106         self._set_perms(rdefeid)
   123         self.failIf('concerne2' in schema['CWUser'].subject_relations())
   123         self.failIf('concerne2' in schema['CWUser'].subject_relations())
   124 
   124 
   125     def test_is_instance_of_insertions(self):
   125     def test_is_instance_of_insertions(self):
   126         seid = self.execute('INSERT Transition T: T name "subdiv"')[0][0]
   126         seid = self.execute('INSERT Transition T: T name "subdiv"')[0][0]
   127         is_etypes = [etype for etype, in self.execute('Any ETN WHERE X eid %s, X is ET, ET name ETN' % seid)]
   127         is_etypes = [etype for etype, in self.execute('Any ETN WHERE X eid %s, X is ET, ET name ETN' % seid)]
   128         self.assertEquals(is_etypes, ['Transition'])
   128         self.assertEqual(is_etypes, ['Transition'])
   129         instanceof_etypes = [etype for etype, in self.execute('Any ETN WHERE X eid %s, X is_instance_of ET, ET name ETN' % seid)]
   129         instanceof_etypes = [etype for etype, in self.execute('Any ETN WHERE X eid %s, X is_instance_of ET, ET name ETN' % seid)]
   130         self.assertEquals(sorted(instanceof_etypes), ['BaseTransition', 'Transition'])
   130         self.assertEqual(sorted(instanceof_etypes), ['BaseTransition', 'Transition'])
   131         snames = [name for name, in self.execute('Any N WHERE S is BaseTransition, S name N')]
   131         snames = [name for name, in self.execute('Any N WHERE S is BaseTransition, S name N')]
   132         self.failIf('subdiv' in snames)
   132         self.failIf('subdiv' in snames)
   133         snames = [name for name, in self.execute('Any N WHERE S is_instance_of BaseTransition, S name N')]
   133         snames = [name for name, in self.execute('Any N WHERE S is_instance_of BaseTransition, S name N')]
   134         self.failUnless('subdiv' in snames)
   134         self.failUnless('subdiv' in snames)
   135 
   135 
   136 
   136 
   137     def test_perms_synchronization_1(self):
   137     def test_perms_synchronization_1(self):
   138         schema = self.repo.schema
   138         schema = self.repo.schema
   139         self.assertEquals(schema['CWUser'].get_groups('read'), set(('managers', 'users')))
   139         self.assertEqual(schema['CWUser'].get_groups('read'), set(('managers', 'users')))
   140         self.failUnless(self.execute('Any X, Y WHERE X is CWEType, X name "CWUser", Y is CWGroup, Y name "users"')[0])
   140         self.failUnless(self.execute('Any X, Y WHERE X is CWEType, X name "CWUser", Y is CWGroup, Y name "users"')[0])
   141         self.execute('DELETE X read_permission Y WHERE X is CWEType, X name "CWUser", Y name "users"')
   141         self.execute('DELETE X read_permission Y WHERE X is CWEType, X name "CWUser", Y name "users"')
   142         self.assertEquals(schema['CWUser'].get_groups('read'), set(('managers', 'users', )))
   142         self.assertEqual(schema['CWUser'].get_groups('read'), set(('managers', 'users', )))
   143         self.commit()
   143         self.commit()
   144         self.assertEquals(schema['CWUser'].get_groups('read'), set(('managers',)))
   144         self.assertEqual(schema['CWUser'].get_groups('read'), set(('managers',)))
   145         self.execute('SET X read_permission Y WHERE X is CWEType, X name "CWUser", Y name "users"')
   145         self.execute('SET X read_permission Y WHERE X is CWEType, X name "CWUser", Y name "users"')
   146         self.commit()
   146         self.commit()
   147         self.assertEquals(schema['CWUser'].get_groups('read'), set(('managers', 'users',)))
   147         self.assertEqual(schema['CWUser'].get_groups('read'), set(('managers', 'users',)))
   148 
   148 
   149     def test_perms_synchronization_2(self):
   149     def test_perms_synchronization_2(self):
   150         schema = self.repo.schema['in_group'].rdefs[('CWUser', 'CWGroup')]
   150         schema = self.repo.schema['in_group'].rdefs[('CWUser', 'CWGroup')]
   151         self.assertEquals(schema.get_groups('read'), set(('managers', 'users', 'guests')))
   151         self.assertEqual(schema.get_groups('read'), set(('managers', 'users', 'guests')))
   152         self.execute('DELETE X read_permission Y WHERE X relation_type RT, RT name "in_group", Y name "guests"')
   152         self.execute('DELETE X read_permission Y WHERE X relation_type RT, RT name "in_group", Y name "guests"')
   153         self.assertEquals(schema.get_groups('read'), set(('managers', 'users', 'guests')))
   153         self.assertEqual(schema.get_groups('read'), set(('managers', 'users', 'guests')))
   154         self.commit()
   154         self.commit()
   155         self.assertEquals(schema.get_groups('read'), set(('managers', 'users')))
   155         self.assertEqual(schema.get_groups('read'), set(('managers', 'users')))
   156         self.execute('SET X read_permission Y WHERE X relation_type RT, RT name "in_group", Y name "guests"')
   156         self.execute('SET X read_permission Y WHERE X relation_type RT, RT name "in_group", Y name "guests"')
   157         self.assertEquals(schema.get_groups('read'), set(('managers', 'users')))
   157         self.assertEqual(schema.get_groups('read'), set(('managers', 'users')))
   158         self.commit()
   158         self.commit()
   159         self.assertEquals(schema.get_groups('read'), set(('managers', 'users', 'guests')))
   159         self.assertEqual(schema.get_groups('read'), set(('managers', 'users', 'guests')))
   160 
   160 
   161     def test_nonregr_user_edit_itself(self):
   161     def test_nonregr_user_edit_itself(self):
   162         ueid = self.session.user.eid
   162         ueid = self.session.user.eid
   163         groupeids = [eid for eid, in self.execute('CWGroup G WHERE G name in ("managers", "users")')]
   163         groupeids = [eid for eid, in self.execute('CWGroup G WHERE G name in ("managers", "users")')]
   164         self.execute('DELETE X in_group Y WHERE X eid %s' % ueid)
   164         self.execute('DELETE X in_group Y WHERE X eid %s' % ueid)
   185             self.failUnless(self.schema['state_of'].inlined)
   185             self.failUnless(self.schema['state_of'].inlined)
   186             self.commit()
   186             self.commit()
   187             self.failIf(self.schema['state_of'].inlined)
   187             self.failIf(self.schema['state_of'].inlined)
   188             self.failIf(self.index_exists('State', 'state_of'))
   188             self.failIf(self.index_exists('State', 'state_of'))
   189             rset = self.execute('Any X, Y WHERE X state_of Y')
   189             rset = self.execute('Any X, Y WHERE X state_of Y')
   190             self.assertEquals(len(rset), 2) # user states
   190             self.assertEqual(len(rset), 2) # user states
   191         except:
   191         except:
   192             import traceback
   192             import traceback
   193             traceback.print_exc()
   193             traceback.print_exc()
   194         finally:
   194         finally:
   195             self.execute('SET X inlined TRUE WHERE X name "state_of"')
   195             self.execute('SET X inlined TRUE WHERE X name "state_of"')
   196             self.failIf(self.schema['state_of'].inlined)
   196             self.failIf(self.schema['state_of'].inlined)
   197             self.commit()
   197             self.commit()
   198             self.failUnless(self.schema['state_of'].inlined)
   198             self.failUnless(self.schema['state_of'].inlined)
   199             self.failUnless(self.index_exists('State', 'state_of'))
   199             self.failUnless(self.index_exists('State', 'state_of'))
   200             rset = self.execute('Any X, Y WHERE X state_of Y')
   200             rset = self.execute('Any X, Y WHERE X state_of Y')
   201             self.assertEquals(len(rset), 2)
   201             self.assertEqual(len(rset), 2)
   202 
   202 
   203     def test_indexed_change(self):
   203     def test_indexed_change(self):
   204         self.session.set_pool()
   204         self.session.set_pool()
   205         dbhelper = self.session.pool.source('system').dbhelper
   205         dbhelper = self.session.pool.source('system').dbhelper
   206         sqlcursor = self.session.pool['system']
   206         sqlcursor = self.session.pool['system']
   315 
   315 
   316     def test_update_constraint(self):
   316     def test_update_constraint(self):
   317         rdef = self.schema['Transition'].rdef('type')
   317         rdef = self.schema['Transition'].rdef('type')
   318         cstr = rdef.constraint_by_type('StaticVocabularyConstraint')
   318         cstr = rdef.constraint_by_type('StaticVocabularyConstraint')
   319         if not getattr(cstr, 'eid', None):
   319         if not getattr(cstr, 'eid', None):
   320             self.skip('start me alone') # bug in schema reloading, constraint's eid not restored
   320             self.skipTest('start me alone') # bug in schema reloading, constraint's eid not restored
   321         self.execute('SET X value %(v)s WHERE X eid %(x)s',
   321         self.execute('SET X value %(v)s WHERE X eid %(x)s',
   322                      {'x': cstr.eid, 'v': u"u'normal', u'auto', u'new'"})
   322                      {'x': cstr.eid, 'v': u"u'normal', u'auto', u'new'"})
   323         self.execute('INSERT CWConstraint X: X value %(value)s, X cstrtype CT, EDEF constrained_by X '
   323         self.execute('INSERT CWConstraint X: X value %(value)s, X cstrtype CT, EDEF constrained_by X '
   324                      'WHERE CT name %(ct)s, EDEF eid %(x)s',
   324                      'WHERE CT name %(ct)s, EDEF eid %(x)s',
   325                      {'ct': 'SizeConstraint', 'value': u'max=10', 'x': rdef.eid})
   325                      {'ct': 'SizeConstraint', 'value': u'max=10', 'x': rdef.eid})
   326         self.commit()
   326         self.commit()
   327         cstr = rdef.constraint_by_type('StaticVocabularyConstraint')
   327         cstr = rdef.constraint_by_type('StaticVocabularyConstraint')
   328         self.assertEquals(cstr.values, (u'normal', u'auto', u'new'))
   328         self.assertEqual(cstr.values, (u'normal', u'auto', u'new'))
   329         self.execute('INSERT Transition T: T name "hop", T type "new"')
   329         self.execute('INSERT Transition T: T name "hop", T type "new"')
   330 
   330 
   # Script entry point: calls unittest_main() when the module is executed directly
   # (unittest_main is defined or imported outside the visible part of the file).
   331 if __name__ == '__main__':
   331 if __name__ == '__main__':
   332     unittest_main()
   332     unittest_main()