hooks/test/unittest_syncschema.py
changeset 7398 26695dd703d8
parent 7244 a918f76441ce
child 7791 31bb51ea5485
child 7815 2a164a9cf81c
equal deleted inserted replaced
7397:6a9e66d788b3 7398:26695dd703d8
    34         super(SchemaModificationHooksTC, self).setUp()
    34         super(SchemaModificationHooksTC, self).setUp()
    35         self.repo.fill_schema()
    35         self.repo.fill_schema()
    36         self.__class__.schema_eids = schema_eids_idx(self.repo.schema)
    36         self.__class__.schema_eids = schema_eids_idx(self.repo.schema)
    37 
    37 
    38     def index_exists(self, etype, attr, unique=False):
    38     def index_exists(self, etype, attr, unique=False):
    39         self.session.set_pool()
    39         self.session.set_cnxset()
    40         dbhelper = self.session.pool.source('system').dbhelper
    40         dbhelper = self.session.cnxset.source('system').dbhelper
    41         sqlcursor = self.session.pool['system']
    41         sqlcursor = self.session.cnxset['system']
    42         return dbhelper.index_exists(sqlcursor, SQL_PREFIX + etype, SQL_PREFIX + attr, unique=unique)
    42         return dbhelper.index_exists(sqlcursor, SQL_PREFIX + etype, SQL_PREFIX + attr, unique=unique)
    43 
    43 
    44     def _set_perms(self, eid):
    44     def _set_perms(self, eid):
    45         self.execute('SET X read_permission G WHERE X eid %(x)s, G is CWGroup',
    45         self.execute('SET X read_permission G WHERE X eid %(x)s, G is CWGroup',
    46                      {'x': eid})
    46                      {'x': eid})
    55         self.execute('SET X update_permission G WHERE X eid %(x)s, G is CWGroup, G name "managers"',
    55         self.execute('SET X update_permission G WHERE X eid %(x)s, G is CWGroup, G name "managers"',
    56                      {'x': eid})
    56                      {'x': eid})
    57 
    57 
    58     def test_base(self):
    58     def test_base(self):
    59         schema = self.repo.schema
    59         schema = self.repo.schema
    60         self.session.set_pool()
    60         self.session.set_cnxset()
    61         dbhelper = self.session.pool.source('system').dbhelper
    61         dbhelper = self.session.cnxset.source('system').dbhelper
    62         sqlcursor = self.session.pool['system']
    62         sqlcursor = self.session.cnxset['system']
    63         self.failIf(schema.has_entity('Societe2'))
    63         self.failIf(schema.has_entity('Societe2'))
    64         self.failIf(schema.has_entity('concerne2'))
    64         self.failIf(schema.has_entity('concerne2'))
    65         # schema should be update on insertion (after commit)
    65         # schema should be update on insertion (after commit)
    66         eeid = self.execute('INSERT CWEType X: X name "Societe2", X description "", X final FALSE')[0][0]
    66         eeid = self.execute('INSERT CWEType X: X name "Societe2", X description "", X final FALSE')[0][0]
    67         self._set_perms(eeid)
    67         self._set_perms(eeid)
   168         self.execute('Any X WHERE X is CWEType, X name "CWEType"')
   168         self.execute('Any X WHERE X is CWEType, X name "CWEType"')
   169 
   169 
   170     # schema modification hooks tests #########################################
   170     # schema modification hooks tests #########################################
   171 
   171 
   172     def test_uninline_relation(self):
   172     def test_uninline_relation(self):
   173         self.session.set_pool()
   173         self.session.set_cnxset()
   174         dbhelper = self.session.pool.source('system').dbhelper
   174         dbhelper = self.session.cnxset.source('system').dbhelper
   175         sqlcursor = self.session.pool['system']
   175         sqlcursor = self.session.cnxset['system']
   176         self.failUnless(self.schema['state_of'].inlined)
   176         self.failUnless(self.schema['state_of'].inlined)
   177         try:
   177         try:
   178             self.execute('SET X inlined FALSE WHERE X name "state_of"')
   178             self.execute('SET X inlined FALSE WHERE X name "state_of"')
   179             self.failUnless(self.schema['state_of'].inlined)
   179             self.failUnless(self.schema['state_of'].inlined)
   180             self.commit()
   180             self.commit()
   193             self.failUnless(self.index_exists('State', 'state_of'))
   193             self.failUnless(self.index_exists('State', 'state_of'))
   194             rset = self.execute('Any X, Y WHERE X state_of Y')
   194             rset = self.execute('Any X, Y WHERE X state_of Y')
   195             self.assertEqual(len(rset), 2)
   195             self.assertEqual(len(rset), 2)
   196 
   196 
   197     def test_indexed_change(self):
   197     def test_indexed_change(self):
   198         self.session.set_pool()
   198         self.session.set_cnxset()
   199         dbhelper = self.session.pool.source('system').dbhelper
   199         dbhelper = self.session.cnxset.source('system').dbhelper
   200         sqlcursor = self.session.pool['system']
   200         sqlcursor = self.session.cnxset['system']
   201         try:
   201         try:
   202             self.execute('SET X indexed FALSE WHERE X relation_type R, R name "name"')
   202             self.execute('SET X indexed FALSE WHERE X relation_type R, R name "name"')
   203             self.failUnless(self.schema['name'].rdef('Workflow', 'String').indexed)
   203             self.failUnless(self.schema['name'].rdef('Workflow', 'String').indexed)
   204             self.failUnless(self.index_exists('Workflow', 'name'))
   204             self.failUnless(self.index_exists('Workflow', 'name'))
   205             self.commit()
   205             self.commit()
   212             self.commit()
   212             self.commit()
   213             self.failUnless(self.schema['name'].rdef('Workflow', 'String').indexed)
   213             self.failUnless(self.schema['name'].rdef('Workflow', 'String').indexed)
   214             self.failUnless(self.index_exists('Workflow', 'name'))
   214             self.failUnless(self.index_exists('Workflow', 'name'))
   215 
   215 
   216     def test_unique_change(self):
   216     def test_unique_change(self):
   217         self.session.set_pool()
   217         self.session.set_cnxset()
   218         dbhelper = self.session.pool.source('system').dbhelper
   218         dbhelper = self.session.cnxset.source('system').dbhelper
   219         sqlcursor = self.session.pool['system']
   219         sqlcursor = self.session.cnxset['system']
   220         try:
   220         try:
   221             self.execute('INSERT CWConstraint X: X cstrtype CT, DEF constrained_by X '
   221             self.execute('INSERT CWConstraint X: X cstrtype CT, DEF constrained_by X '
   222                          'WHERE CT name "UniqueConstraint", DEF relation_type RT, DEF from_entity E,'
   222                          'WHERE CT name "UniqueConstraint", DEF relation_type RT, DEF from_entity E,'
   223                          'RT name "name", E name "Workflow"')
   223                          'RT name "name", E name "Workflow"')
   224             self.failIf(self.schema['Workflow'].has_unique_values('name'))
   224             self.failIf(self.schema['Workflow'].has_unique_values('name'))