hooks/test/unittest_syncschema.py
changeset 7791 31bb51ea5485
parent 7398 26695dd703d8
child 7845 2172978be237
equal deleted inserted replaced
7790:7e16e056eecb 7791:31bb51ea5485
    58     def test_base(self):
    58     def test_base(self):
    59         schema = self.repo.schema
    59         schema = self.repo.schema
    60         self.session.set_cnxset()
    60         self.session.set_cnxset()
    61         dbhelper = self.session.cnxset.source('system').dbhelper
    61         dbhelper = self.session.cnxset.source('system').dbhelper
    62         sqlcursor = self.session.cnxset['system']
    62         sqlcursor = self.session.cnxset['system']
    63         self.failIf(schema.has_entity('Societe2'))
    63         self.assertFalse(schema.has_entity('Societe2'))
    64         self.failIf(schema.has_entity('concerne2'))
    64         self.assertFalse(schema.has_entity('concerne2'))
    65         # schema should be updated on insertion (after commit)
    65         # schema should be updated on insertion (after commit)
    66         eeid = self.execute('INSERT CWEType X: X name "Societe2", X description "", X final FALSE')[0][0]
    66         eeid = self.execute('INSERT CWEType X: X name "Societe2", X description "", X final FALSE')[0][0]
    67         self._set_perms(eeid)
    67         self._set_perms(eeid)
    68         self.execute('INSERT CWRType X: X name "concerne2", X description "", X final FALSE, X symmetric FALSE')
    68         self.execute('INSERT CWRType X: X name "concerne2", X description "", X final FALSE, X symmetric FALSE')
    69         self.failIf(schema.has_entity('Societe2'))
    69         self.assertFalse(schema.has_entity('Societe2'))
    70         self.failIf(schema.has_entity('concerne2'))
    70         self.assertFalse(schema.has_entity('concerne2'))
    71         # have to commit before adding definition relations
    71         # have to commit before adding definition relations
    72         self.commit()
    72         self.commit()
    73         self.failUnless(schema.has_entity('Societe2'))
    73         self.assertTrue(schema.has_entity('Societe2'))
    74         self.failUnless(schema.has_relation('concerne2'))
    74         self.assertTrue(schema.has_relation('concerne2'))
    75         attreid = self.execute('INSERT CWAttribute X: X cardinality "11", X defaultval "noname", '
    75         attreid = self.execute('INSERT CWAttribute X: X cardinality "11", X defaultval "noname", '
    76                                '   X indexed TRUE, X relation_type RT, X from_entity E, X to_entity F '
    76                                '   X indexed TRUE, X relation_type RT, X from_entity E, X to_entity F '
    77                                'WHERE RT name "name", E name "Societe2", F name "String"')[0][0]
    77                                'WHERE RT name "name", E name "Societe2", F name "String"')[0][0]
    78         self._set_attr_perms(attreid)
    78         self._set_attr_perms(attreid)
    79         concerne2_rdef_eid = self.execute(
    79         concerne2_rdef_eid = self.execute(
    80             'INSERT CWRelation X: X cardinality "**", X relation_type RT, X from_entity E, X to_entity E '
    80             'INSERT CWRelation X: X cardinality "**", X relation_type RT, X from_entity E, X to_entity E '
    81             'WHERE RT name "concerne2", E name "Societe2"')[0][0]
    81             'WHERE RT name "concerne2", E name "Societe2"')[0][0]
    82         self._set_perms(concerne2_rdef_eid)
    82         self._set_perms(concerne2_rdef_eid)
    83         self.failIf('name' in schema['Societe2'].subject_relations())
    83         self.assertFalse('name' in schema['Societe2'].subject_relations())
    84         self.failIf('concerne2' in schema['Societe2'].subject_relations())
    84         self.assertFalse('concerne2' in schema['Societe2'].subject_relations())
    85         self.failIf(self.index_exists('Societe2', 'name'))
    85         self.assertFalse(self.index_exists('Societe2', 'name'))
    86         self.commit()
    86         self.commit()
    87         self.failUnless('name' in schema['Societe2'].subject_relations())
    87         self.assertTrue('name' in schema['Societe2'].subject_relations())
    88         self.failUnless('concerne2' in schema['Societe2'].subject_relations())
    88         self.assertTrue('concerne2' in schema['Societe2'].subject_relations())
    89         self.failUnless(self.index_exists('Societe2', 'name'))
    89         self.assertTrue(self.index_exists('Societe2', 'name'))
    90         # now we should be able to insert and query Societe2
    90         # now we should be able to insert and query Societe2
    91         s2eid = self.execute('INSERT Societe2 X: X name "logilab"')[0][0]
    91         s2eid = self.execute('INSERT Societe2 X: X name "logilab"')[0][0]
    92         self.execute('Societe2 X WHERE X name "logilab"')
    92         self.execute('Societe2 X WHERE X name "logilab"')
    93         self.execute('SET X concerne2 X WHERE X name "logilab"')
    93         self.execute('SET X concerne2 X WHERE X name "logilab"')
    94         rset = self.execute('Any X WHERE X concerne2 Y')
    94         rset = self.execute('Any X WHERE X concerne2 Y')
    99                                'WHERE RT name "concerne2", E name "CWUser"')[0][0]
    99                                'WHERE RT name "concerne2", E name "CWUser"')[0][0]
   100         self._set_perms(rdefeid)
   100         self._set_perms(rdefeid)
   101         self.commit()
   101         self.commit()
   102         self.execute('DELETE CWRelation X WHERE X eid %(x)s', {'x': concerne2_rdef_eid})
   102         self.execute('DELETE CWRelation X WHERE X eid %(x)s', {'x': concerne2_rdef_eid})
   103         self.commit()
   103         self.commit()
   104         self.failUnless('concerne2' in schema['CWUser'].subject_relations())
   104         self.assertTrue('concerne2' in schema['CWUser'].subject_relations())
   105         self.failIf('concerne2' in schema['Societe2'].subject_relations())
   105         self.assertFalse('concerne2' in schema['Societe2'].subject_relations())
   106         self.failIf(self.execute('Any X WHERE X concerne2 Y'))
   106         self.assertFalse(self.execute('Any X WHERE X concerne2 Y'))
   107         # schema should be cleaned on delete (after commit)
   107         # schema should be cleaned on delete (after commit)
   108         self.execute('DELETE CWEType X WHERE X name "Societe2"')
   108         self.execute('DELETE CWEType X WHERE X name "Societe2"')
   109         self.execute('DELETE CWRType X WHERE X name "concerne2"')
   109         self.execute('DELETE CWRType X WHERE X name "concerne2"')
   110         self.failUnless(self.index_exists('Societe2', 'name'))
   110         self.assertTrue(self.index_exists('Societe2', 'name'))
   111         self.failUnless(schema.has_entity('Societe2'))
   111         self.assertTrue(schema.has_entity('Societe2'))
   112         self.failUnless(schema.has_relation('concerne2'))
   112         self.assertTrue(schema.has_relation('concerne2'))
   113         self.commit()
   113         self.commit()
   114         self.failIf(self.index_exists('Societe2', 'name'))
   114         self.assertFalse(self.index_exists('Societe2', 'name'))
   115         self.failIf(schema.has_entity('Societe2'))
   115         self.assertFalse(schema.has_entity('Societe2'))
   116         self.failIf(schema.has_entity('concerne2'))
   116         self.assertFalse(schema.has_entity('concerne2'))
   117         self.failIf('concerne2' in schema['CWUser'].subject_relations())
   117         self.assertFalse('concerne2' in schema['CWUser'].subject_relations())
   118 
   118 
   119     def test_is_instance_of_insertions(self):
   119     def test_is_instance_of_insertions(self):
   120         seid = self.execute('INSERT Transition T: T name "subdiv"')[0][0]
   120         seid = self.execute('INSERT Transition T: T name "subdiv"')[0][0]
   121         is_etypes = [etype for etype, in self.execute('Any ETN WHERE X eid %s, X is ET, ET name ETN' % seid)]
   121         is_etypes = [etype for etype, in self.execute('Any ETN WHERE X eid %s, X is ET, ET name ETN' % seid)]
   122         self.assertEqual(is_etypes, ['Transition'])
   122         self.assertEqual(is_etypes, ['Transition'])
   123         instanceof_etypes = [etype for etype, in self.execute('Any ETN WHERE X eid %s, X is_instance_of ET, ET name ETN' % seid)]
   123         instanceof_etypes = [etype for etype, in self.execute('Any ETN WHERE X eid %s, X is_instance_of ET, ET name ETN' % seid)]
   124         self.assertEqual(sorted(instanceof_etypes), ['BaseTransition', 'Transition'])
   124         self.assertEqual(sorted(instanceof_etypes), ['BaseTransition', 'Transition'])
   125         snames = [name for name, in self.execute('Any N WHERE S is BaseTransition, S name N')]
   125         snames = [name for name, in self.execute('Any N WHERE S is BaseTransition, S name N')]
   126         self.failIf('subdiv' in snames)
   126         self.assertFalse('subdiv' in snames)
   127         snames = [name for name, in self.execute('Any N WHERE S is_instance_of BaseTransition, S name N')]
   127         snames = [name for name, in self.execute('Any N WHERE S is_instance_of BaseTransition, S name N')]
   128         self.failUnless('subdiv' in snames)
   128         self.assertTrue('subdiv' in snames)
   129 
   129 
   130 
   130 
   131     def test_perms_synchronization_1(self):
   131     def test_perms_synchronization_1(self):
   132         schema = self.repo.schema
   132         schema = self.repo.schema
   133         self.assertEqual(schema['CWUser'].get_groups('read'), set(('managers', 'users')))
   133         self.assertEqual(schema['CWUser'].get_groups('read'), set(('managers', 'users')))
   134         self.failUnless(self.execute('Any X, Y WHERE X is CWEType, X name "CWUser", Y is CWGroup, Y name "users"')[0])
   134         self.assertTrue(self.execute('Any X, Y WHERE X is CWEType, X name "CWUser", Y is CWGroup, Y name "users"')[0])
   135         self.execute('DELETE X read_permission Y WHERE X is CWEType, X name "CWUser", Y name "users"')
   135         self.execute('DELETE X read_permission Y WHERE X is CWEType, X name "CWUser", Y name "users"')
   136         self.assertEqual(schema['CWUser'].get_groups('read'), set(('managers', 'users', )))
   136         self.assertEqual(schema['CWUser'].get_groups('read'), set(('managers', 'users', )))
   137         self.commit()
   137         self.commit()
   138         self.assertEqual(schema['CWUser'].get_groups('read'), set(('managers',)))
   138         self.assertEqual(schema['CWUser'].get_groups('read'), set(('managers',)))
   139         self.execute('SET X read_permission Y WHERE X is CWEType, X name "CWUser", Y name "users"')
   139         self.execute('SET X read_permission Y WHERE X is CWEType, X name "CWUser", Y name "users"')
   171 
   171 
   172     def test_uninline_relation(self):
   172     def test_uninline_relation(self):
   173         self.session.set_cnxset()
   173         self.session.set_cnxset()
   174         dbhelper = self.session.cnxset.source('system').dbhelper
   174         dbhelper = self.session.cnxset.source('system').dbhelper
   175         sqlcursor = self.session.cnxset['system']
   175         sqlcursor = self.session.cnxset['system']
   176         self.failUnless(self.schema['state_of'].inlined)
   176         self.assertTrue(self.schema['state_of'].inlined)
   177         try:
   177         try:
   178             self.execute('SET X inlined FALSE WHERE X name "state_of"')
   178             self.execute('SET X inlined FALSE WHERE X name "state_of"')
   179             self.failUnless(self.schema['state_of'].inlined)
   179             self.assertTrue(self.schema['state_of'].inlined)
   180             self.commit()
   180             self.commit()
   181             self.failIf(self.schema['state_of'].inlined)
   181             self.assertFalse(self.schema['state_of'].inlined)
   182             self.failIf(self.index_exists('State', 'state_of'))
   182             self.assertFalse(self.index_exists('State', 'state_of'))
   183             rset = self.execute('Any X, Y WHERE X state_of Y')
   183             rset = self.execute('Any X, Y WHERE X state_of Y')
   184             self.assertEqual(len(rset), 2) # user states
   184             self.assertEqual(len(rset), 2) # user states
   185         except:
   185         except:
   186             import traceback
   186             import traceback
   187             traceback.print_exc()
   187             traceback.print_exc()
   188         finally:
   188         finally:
   189             self.execute('SET X inlined TRUE WHERE X name "state_of"')
   189             self.execute('SET X inlined TRUE WHERE X name "state_of"')
   190             self.failIf(self.schema['state_of'].inlined)
   190             self.assertFalse(self.schema['state_of'].inlined)
   191             self.commit()
   191             self.commit()
   192             self.failUnless(self.schema['state_of'].inlined)
   192             self.assertTrue(self.schema['state_of'].inlined)
   193             self.failUnless(self.index_exists('State', 'state_of'))
   193             self.assertTrue(self.index_exists('State', 'state_of'))
   194             rset = self.execute('Any X, Y WHERE X state_of Y')
   194             rset = self.execute('Any X, Y WHERE X state_of Y')
   195             self.assertEqual(len(rset), 2)
   195             self.assertEqual(len(rset), 2)
   196 
   196 
   197     def test_indexed_change(self):
   197     def test_indexed_change(self):
   198         self.session.set_cnxset()
   198         self.session.set_cnxset()
   199         dbhelper = self.session.cnxset.source('system').dbhelper
   199         dbhelper = self.session.cnxset.source('system').dbhelper
   200         sqlcursor = self.session.cnxset['system']
   200         sqlcursor = self.session.cnxset['system']
   201         try:
   201         try:
   202             self.execute('SET X indexed FALSE WHERE X relation_type R, R name "name"')
   202             self.execute('SET X indexed FALSE WHERE X relation_type R, R name "name"')
   203             self.failUnless(self.schema['name'].rdef('Workflow', 'String').indexed)
   203             self.assertTrue(self.schema['name'].rdef('Workflow', 'String').indexed)
   204             self.failUnless(self.index_exists('Workflow', 'name'))
   204             self.assertTrue(self.index_exists('Workflow', 'name'))
   205             self.commit()
   205             self.commit()
   206             self.failIf(self.schema['name'].rdef('Workflow', 'String').indexed)
   206             self.assertFalse(self.schema['name'].rdef('Workflow', 'String').indexed)
   207             self.failIf(self.index_exists('Workflow', 'name'))
   207             self.assertFalse(self.index_exists('Workflow', 'name'))
   208         finally:
   208         finally:
   209             self.execute('SET X indexed TRUE WHERE X relation_type R, R name "name"')
   209             self.execute('SET X indexed TRUE WHERE X relation_type R, R name "name"')
   210             self.failIf(self.schema['name'].rdef('Workflow', 'String').indexed)
   210             self.assertFalse(self.schema['name'].rdef('Workflow', 'String').indexed)
   211             self.failIf(self.index_exists('Workflow', 'name'))
   211             self.assertFalse(self.index_exists('Workflow', 'name'))
   212             self.commit()
   212             self.commit()
   213             self.failUnless(self.schema['name'].rdef('Workflow', 'String').indexed)
   213             self.assertTrue(self.schema['name'].rdef('Workflow', 'String').indexed)
   214             self.failUnless(self.index_exists('Workflow', 'name'))
   214             self.assertTrue(self.index_exists('Workflow', 'name'))
   215 
   215 
   216     def test_unique_change(self):
   216     def test_unique_change(self):
   217         self.session.set_cnxset()
   217         self.session.set_cnxset()
   218         dbhelper = self.session.cnxset.source('system').dbhelper
   218         dbhelper = self.session.cnxset.source('system').dbhelper
   219         sqlcursor = self.session.cnxset['system']
   219         sqlcursor = self.session.cnxset['system']
   220         try:
   220         try:
   221             self.execute('INSERT CWConstraint X: X cstrtype CT, DEF constrained_by X '
   221             self.execute('INSERT CWConstraint X: X cstrtype CT, DEF constrained_by X '
   222                          'WHERE CT name "UniqueConstraint", DEF relation_type RT, DEF from_entity E,'
   222                          'WHERE CT name "UniqueConstraint", DEF relation_type RT, DEF from_entity E,'
   223                          'RT name "name", E name "Workflow"')
   223                          'RT name "name", E name "Workflow"')
   224             self.failIf(self.schema['Workflow'].has_unique_values('name'))
   224             self.assertFalse(self.schema['Workflow'].has_unique_values('name'))
   225             self.failIf(self.index_exists('Workflow', 'name', unique=True))
   225             self.assertFalse(self.index_exists('Workflow', 'name', unique=True))
   226             self.commit()
   226             self.commit()
   227             self.failUnless(self.schema['Workflow'].has_unique_values('name'))
   227             self.assertTrue(self.schema['Workflow'].has_unique_values('name'))
   228             self.failUnless(self.index_exists('Workflow', 'name', unique=True))
   228             self.assertTrue(self.index_exists('Workflow', 'name', unique=True))
   229         finally:
   229         finally:
   230             self.execute('DELETE DEF constrained_by X WHERE X cstrtype CT, '
   230             self.execute('DELETE DEF constrained_by X WHERE X cstrtype CT, '
   231                          'CT name "UniqueConstraint", DEF relation_type RT, DEF from_entity E,'
   231                          'CT name "UniqueConstraint", DEF relation_type RT, DEF from_entity E,'
   232                          'RT name "name", E name "Workflow"')
   232                          'RT name "name", E name "Workflow"')
   233             self.failUnless(self.schema['Workflow'].has_unique_values('name'))
   233             self.assertTrue(self.schema['Workflow'].has_unique_values('name'))
   234             self.failUnless(self.index_exists('Workflow', 'name', unique=True))
   234             self.assertTrue(self.index_exists('Workflow', 'name', unique=True))
   235             self.commit()
   235             self.commit()
   236             self.failIf(self.schema['Workflow'].has_unique_values('name'))
   236             self.assertFalse(self.schema['Workflow'].has_unique_values('name'))
   237             self.failIf(self.index_exists('Workflow', 'name', unique=True))
   237             self.assertFalse(self.index_exists('Workflow', 'name', unique=True))
   238 
   238 
   239     def test_required_change_1(self):
   239     def test_required_change_1(self):
   240         self.execute('SET DEF cardinality "?1" '
   240         self.execute('SET DEF cardinality "?1" '
   241                      'WHERE DEF relation_type RT, DEF from_entity E,'
   241                      'WHERE DEF relation_type RT, DEF from_entity E,'
   242                      'RT name "title", E name "Bookmark"')
   242                      'RT name "title", E name "Bookmark"')
   265                                'WHERE RT name "messageid", E name "BaseTransition", F name "String"')[0][0]
   265                                'WHERE RT name "messageid", E name "BaseTransition", F name "String"')[0][0]
   266         assert self.execute('SET X read_permission Y WHERE X eid %(x)s, Y name "managers"',
   266         assert self.execute('SET X read_permission Y WHERE X eid %(x)s, Y name "managers"',
   267                      {'x': attreid})
   267                      {'x': attreid})
   268         self.commit()
   268         self.commit()
   269         self.schema.rebuild_infered_relations()
   269         self.schema.rebuild_infered_relations()
   270         self.failUnless('Transition' in self.schema['messageid'].subjects())
   270         self.assertTrue('Transition' in self.schema['messageid'].subjects())
   271         self.failUnless('WorkflowTransition' in self.schema['messageid'].subjects())
   271         self.assertTrue('WorkflowTransition' in self.schema['messageid'].subjects())
   272         self.execute('Any X WHERE X is_instance_of BaseTransition, X messageid "hop"')
   272         self.execute('Any X WHERE X is_instance_of BaseTransition, X messageid "hop"')
   273 
   273 
   274     def test_change_fulltextindexed(self):
   274     def test_change_fulltextindexed(self):
   275         req = self.request()
   275         req = self.request()
   276         target = req.create_entity(u'Email', messageid=u'1234',
   276         target = req.create_entity(u'Email', messageid=u'1234',
   281         assert req.execute('SET A fulltextindexed FALSE '
   281         assert req.execute('SET A fulltextindexed FALSE '
   282                             'WHERE E is CWEType, E name "Email", A is CWAttribute,'
   282                             'WHERE E is CWEType, E name "Email", A is CWAttribute,'
   283                             'A from_entity E, A relation_type R, R name "subject"')
   283                             'A from_entity E, A relation_type R, R name "subject"')
   284         self.commit()
   284         self.commit()
   285         rset = req.execute('Any X WHERE X has_text "rick.roll"')
   285         rset = req.execute('Any X WHERE X has_text "rick.roll"')
   286         self.failIf(rset)
   286         self.assertFalse(rset)
   287         assert req.execute('SET A fulltextindexed TRUE '
   287         assert req.execute('SET A fulltextindexed TRUE '
   288                            'WHERE A from_entity E, A relation_type R, '
   288                            'WHERE A from_entity E, A relation_type R, '
   289                            'E name "Email", R name "subject"')
   289                            'E name "Email", R name "subject"')
   290         self.commit()
   290         self.commit()
   291         rset = req.execute('Any X WHERE X has_text "rick.roll"')
   291         rset = req.execute('Any X WHERE X has_text "rick.roll"')