hooks/test/unittest_syncschema.py
brancholdstable
changeset 6665 90f2f20367bc
parent 6412 370357e68837
child 6781 5062d86d6ffe
equal deleted inserted replaced
6018:f4d1d5d9ccbb 6665:90f2f20367bc
    13 # FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
    13 # FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
    14 # details.
    14 # details.
    15 #
    15 #
    16 # You should have received a copy of the GNU Lesser General Public License along
    16 # You should have received a copy of the GNU Lesser General Public License along
    17 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
    17 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
       
    18 """cubicweb.server.hooks.syncschema unit and functional tests"""
       
    19 
    18 from logilab.common.testlib import TestCase, unittest_main
    20 from logilab.common.testlib import TestCase, unittest_main
    19 
    21 
    20 from cubicweb import ValidationError
    22 from cubicweb import ValidationError
    21 from cubicweb.devtools.testlib import CubicWebTC
    23 from cubicweb.devtools.testlib import CubicWebTC
    22 from cubicweb.server.sqlutils import SQL_PREFIX
    24 from cubicweb.server.sqlutils import SQL_PREFIX
    96         # now we should be able to insert and query Societe2
    98         # now we should be able to insert and query Societe2
    97         s2eid = self.execute('INSERT Societe2 X: X name "logilab"')[0][0]
    99         s2eid = self.execute('INSERT Societe2 X: X name "logilab"')[0][0]
    98         self.execute('Societe2 X WHERE X name "logilab"')
   100         self.execute('Societe2 X WHERE X name "logilab"')
    99         self.execute('SET X concerne2 X WHERE X name "logilab"')
   101         self.execute('SET X concerne2 X WHERE X name "logilab"')
   100         rset = self.execute('Any X WHERE X concerne2 Y')
   102         rset = self.execute('Any X WHERE X concerne2 Y')
   101         self.assertEquals(rset.rows, [[s2eid]])
   103         self.assertEqual(rset.rows, [[s2eid]])
   102         # check that when a relation definition is deleted, existing relations are deleted
   104         # check that when a relation definition is deleted, existing relations are deleted
   103         rdefeid = self.execute('INSERT CWRelation X: X cardinality "**", X relation_type RT, '
   105         rdefeid = self.execute('INSERT CWRelation X: X cardinality "**", X relation_type RT, '
   104                                '   X from_entity E, X to_entity E '
   106                                '   X from_entity E, X to_entity E '
   105                                'WHERE RT name "concerne2", E name "CWUser"')[0][0]
   107                                'WHERE RT name "concerne2", E name "CWUser"')[0][0]
   106         self._set_perms(rdefeid)
   108         self._set_perms(rdefeid)
   123         self.failIf('concerne2' in schema['CWUser'].subject_relations())
   125         self.failIf('concerne2' in schema['CWUser'].subject_relations())
   124 
   126 
   125     def test_is_instance_of_insertions(self):
   127     def test_is_instance_of_insertions(self):
   126         seid = self.execute('INSERT Transition T: T name "subdiv"')[0][0]
   128         seid = self.execute('INSERT Transition T: T name "subdiv"')[0][0]
   127         is_etypes = [etype for etype, in self.execute('Any ETN WHERE X eid %s, X is ET, ET name ETN' % seid)]
   129         is_etypes = [etype for etype, in self.execute('Any ETN WHERE X eid %s, X is ET, ET name ETN' % seid)]
   128         self.assertEquals(is_etypes, ['Transition'])
   130         self.assertEqual(is_etypes, ['Transition'])
   129         instanceof_etypes = [etype for etype, in self.execute('Any ETN WHERE X eid %s, X is_instance_of ET, ET name ETN' % seid)]
   131         instanceof_etypes = [etype for etype, in self.execute('Any ETN WHERE X eid %s, X is_instance_of ET, ET name ETN' % seid)]
   130         self.assertEquals(sorted(instanceof_etypes), ['BaseTransition', 'Transition'])
   132         self.assertEqual(sorted(instanceof_etypes), ['BaseTransition', 'Transition'])
   131         snames = [name for name, in self.execute('Any N WHERE S is BaseTransition, S name N')]
   133         snames = [name for name, in self.execute('Any N WHERE S is BaseTransition, S name N')]
   132         self.failIf('subdiv' in snames)
   134         self.failIf('subdiv' in snames)
   133         snames = [name for name, in self.execute('Any N WHERE S is_instance_of BaseTransition, S name N')]
   135         snames = [name for name, in self.execute('Any N WHERE S is_instance_of BaseTransition, S name N')]
   134         self.failUnless('subdiv' in snames)
   136         self.failUnless('subdiv' in snames)
   135 
   137 
   136 
   138 
   137     def test_perms_synchronization_1(self):
   139     def test_perms_synchronization_1(self):
   138         schema = self.repo.schema
   140         schema = self.repo.schema
   139         self.assertEquals(schema['CWUser'].get_groups('read'), set(('managers', 'users')))
   141         self.assertEqual(schema['CWUser'].get_groups('read'), set(('managers', 'users')))
   140         self.failUnless(self.execute('Any X, Y WHERE X is CWEType, X name "CWUser", Y is CWGroup, Y name "users"')[0])
   142         self.failUnless(self.execute('Any X, Y WHERE X is CWEType, X name "CWUser", Y is CWGroup, Y name "users"')[0])
   141         self.execute('DELETE X read_permission Y WHERE X is CWEType, X name "CWUser", Y name "users"')
   143         self.execute('DELETE X read_permission Y WHERE X is CWEType, X name "CWUser", Y name "users"')
   142         self.assertEquals(schema['CWUser'].get_groups('read'), set(('managers', 'users', )))
   144         self.assertEqual(schema['CWUser'].get_groups('read'), set(('managers', 'users', )))
   143         self.commit()
   145         self.commit()
   144         self.assertEquals(schema['CWUser'].get_groups('read'), set(('managers',)))
   146         self.assertEqual(schema['CWUser'].get_groups('read'), set(('managers',)))
   145         self.execute('SET X read_permission Y WHERE X is CWEType, X name "CWUser", Y name "users"')
   147         self.execute('SET X read_permission Y WHERE X is CWEType, X name "CWUser", Y name "users"')
   146         self.commit()
   148         self.commit()
   147         self.assertEquals(schema['CWUser'].get_groups('read'), set(('managers', 'users',)))
   149         self.assertEqual(schema['CWUser'].get_groups('read'), set(('managers', 'users',)))
   148 
   150 
   149     def test_perms_synchronization_2(self):
   151     def test_perms_synchronization_2(self):
   150         schema = self.repo.schema['in_group'].rdefs[('CWUser', 'CWGroup')]
   152         schema = self.repo.schema['in_group'].rdefs[('CWUser', 'CWGroup')]
   151         self.assertEquals(schema.get_groups('read'), set(('managers', 'users', 'guests')))
   153         self.assertEqual(schema.get_groups('read'), set(('managers', 'users', 'guests')))
   152         self.execute('DELETE X read_permission Y WHERE X relation_type RT, RT name "in_group", Y name "guests"')
   154         self.execute('DELETE X read_permission Y WHERE X relation_type RT, RT name "in_group", Y name "guests"')
   153         self.assertEquals(schema.get_groups('read'), set(('managers', 'users', 'guests')))
   155         self.assertEqual(schema.get_groups('read'), set(('managers', 'users', 'guests')))
   154         self.commit()
   156         self.commit()
   155         self.assertEquals(schema.get_groups('read'), set(('managers', 'users')))
   157         self.assertEqual(schema.get_groups('read'), set(('managers', 'users')))
   156         self.execute('SET X read_permission Y WHERE X relation_type RT, RT name "in_group", Y name "guests"')
   158         self.execute('SET X read_permission Y WHERE X relation_type RT, RT name "in_group", Y name "guests"')
   157         self.assertEquals(schema.get_groups('read'), set(('managers', 'users')))
   159         self.assertEqual(schema.get_groups('read'), set(('managers', 'users')))
   158         self.commit()
   160         self.commit()
   159         self.assertEquals(schema.get_groups('read'), set(('managers', 'users', 'guests')))
   161         self.assertEqual(schema.get_groups('read'), set(('managers', 'users', 'guests')))
   160 
   162 
   161     def test_nonregr_user_edit_itself(self):
   163     def test_nonregr_user_edit_itself(self):
   162         ueid = self.session.user.eid
   164         ueid = self.session.user.eid
   163         groupeids = [eid for eid, in self.execute('CWGroup G WHERE G name in ("managers", "users")')]
   165         groupeids = [eid for eid, in self.execute('CWGroup G WHERE G name in ("managers", "users")')]
   164         self.execute('DELETE X in_group Y WHERE X eid %s' % ueid)
   166         self.execute('DELETE X in_group Y WHERE X eid %s' % ueid)
   185             self.failUnless(self.schema['state_of'].inlined)
   187             self.failUnless(self.schema['state_of'].inlined)
   186             self.commit()
   188             self.commit()
   187             self.failIf(self.schema['state_of'].inlined)
   189             self.failIf(self.schema['state_of'].inlined)
   188             self.failIf(self.index_exists('State', 'state_of'))
   190             self.failIf(self.index_exists('State', 'state_of'))
   189             rset = self.execute('Any X, Y WHERE X state_of Y')
   191             rset = self.execute('Any X, Y WHERE X state_of Y')
   190             self.assertEquals(len(rset), 2) # user states
   192             self.assertEqual(len(rset), 2) # user states
       
   193         except:
       
   194             import traceback
       
   195             traceback.print_exc()
   191         finally:
   196         finally:
   192             self.execute('SET X inlined TRUE WHERE X name "state_of"')
   197             self.execute('SET X inlined TRUE WHERE X name "state_of"')
   193             self.failIf(self.schema['state_of'].inlined)
   198             self.failIf(self.schema['state_of'].inlined)
   194             self.commit()
   199             self.commit()
   195             self.failUnless(self.schema['state_of'].inlined)
   200             self.failUnless(self.schema['state_of'].inlined)
   196             self.failUnless(self.index_exists('State', 'state_of'))
   201             self.failUnless(self.index_exists('State', 'state_of'))
   197             rset = self.execute('Any X, Y WHERE X state_of Y')
   202             rset = self.execute('Any X, Y WHERE X state_of Y')
   198             self.assertEquals(len(rset), 2)
   203             self.assertEqual(len(rset), 2)
   199 
   204 
   200     def test_indexed_change(self):
   205     def test_indexed_change(self):
   201         self.session.set_pool()
   206         self.session.set_pool()
   202         dbhelper = self.session.pool.source('system').dbhelper
   207         dbhelper = self.session.pool.source('system').dbhelper
   203         sqlcursor = self.session.pool['system']
   208         sqlcursor = self.session.pool['system']
   253                      'WHERE DEF relation_type RT, DEF from_entity E,'
   258                      'WHERE DEF relation_type RT, DEF from_entity E,'
   254                      'RT name "surname", E name "CWUser"')
   259                      'RT name "surname", E name "CWUser"')
   255         self.commit()
   260         self.commit()
   256         # should not be able anymore to add cwuser without surname
   261         # should not be able anymore to add cwuser without surname
   257         self.assertRaises(ValidationError, self.create_user, "toto")
   262         self.assertRaises(ValidationError, self.create_user, "toto")
       
   263         self.rollback()
   258         self.execute('SET DEF cardinality "?1" '
   264         self.execute('SET DEF cardinality "?1" '
   259                      'WHERE DEF relation_type RT, DEF from_entity E,'
   265                      'WHERE DEF relation_type RT, DEF from_entity E,'
   260                      'RT name "surname", E name "CWUser"')
   266                      'RT name "surname", E name "CWUser"')
   261         self.commit()
   267         self.commit()
   262 
   268 
   312 
   318 
   313     def test_update_constraint(self):
   319     def test_update_constraint(self):
   314         rdef = self.schema['Transition'].rdef('type')
   320         rdef = self.schema['Transition'].rdef('type')
   315         cstr = rdef.constraint_by_type('StaticVocabularyConstraint')
   321         cstr = rdef.constraint_by_type('StaticVocabularyConstraint')
   316         if not getattr(cstr, 'eid', None):
   322         if not getattr(cstr, 'eid', None):
   317             self.skip('start me alone') # bug in schema reloading, constraint's eid not restored
   323             self.skipTest('start me alone') # bug in schema reloading, constraint's eid not restored
   318         self.execute('SET X value %(v)s WHERE X eid %(x)s',
   324         self.execute('SET X value %(v)s WHERE X eid %(x)s',
   319                      {'x': cstr.eid, 'v': u"u'normal', u'auto', u'new'"})
   325                      {'x': cstr.eid, 'v': u"u'normal', u'auto', u'new'"})
   320         self.execute('INSERT CWConstraint X: X value %(value)s, X cstrtype CT, EDEF constrained_by X '
   326         self.execute('INSERT CWConstraint X: X value %(value)s, X cstrtype CT, EDEF constrained_by X '
   321                      'WHERE CT name %(ct)s, EDEF eid %(x)s',
   327                      'WHERE CT name %(ct)s, EDEF eid %(x)s',
   322                      {'ct': 'SizeConstraint', 'value': u'max=10', 'x': rdef.eid})
   328                      {'ct': 'SizeConstraint', 'value': u'max=10', 'x': rdef.eid})
   323         self.commit()
   329         self.commit()
   324         cstr = rdef.constraint_by_type('StaticVocabularyConstraint')
   330         cstr = rdef.constraint_by_type('StaticVocabularyConstraint')
   325         self.assertEquals(cstr.values, (u'normal', u'auto', u'new'))
   331         self.assertEqual(cstr.values, (u'normal', u'auto', u'new'))
   326         self.execute('INSERT Transition T: T name "hop", T type "new"')
   332         self.execute('INSERT Transition T: T name "hop", T type "new"')
   327 
   333 
   328 if __name__ == '__main__':
   334 if __name__ == '__main__':
   329     unittest_main()
   335     unittest_main()