34 super(SchemaModificationHooksTC, self).setUp() |
34 super(SchemaModificationHooksTC, self).setUp() |
35 self.repo.fill_schema() |
35 self.repo.fill_schema() |
36 self.__class__.schema_eids = schema_eids_idx(self.repo.schema) |
36 self.__class__.schema_eids = schema_eids_idx(self.repo.schema) |
37 |
37 |
38 def index_exists(self, etype, attr, unique=False): |
38 def index_exists(self, etype, attr, unique=False): |
39 self.session.set_pool() |
39 self.session.set_cnxset() |
40 dbhelper = self.session.pool.source('system').dbhelper |
40 dbhelper = self.session.cnxset.source('system').dbhelper |
41 sqlcursor = self.session.pool['system'] |
41 sqlcursor = self.session.cnxset['system'] |
42 return dbhelper.index_exists(sqlcursor, SQL_PREFIX + etype, SQL_PREFIX + attr, unique=unique) |
42 return dbhelper.index_exists(sqlcursor, SQL_PREFIX + etype, SQL_PREFIX + attr, unique=unique) |
43 |
43 |
44 def _set_perms(self, eid): |
44 def _set_perms(self, eid): |
45 self.execute('SET X read_permission G WHERE X eid %(x)s, G is CWGroup', |
45 self.execute('SET X read_permission G WHERE X eid %(x)s, G is CWGroup', |
46 {'x': eid}) |
46 {'x': eid}) |
55 self.execute('SET X update_permission G WHERE X eid %(x)s, G is CWGroup, G name "managers"', |
55 self.execute('SET X update_permission G WHERE X eid %(x)s, G is CWGroup, G name "managers"', |
56 {'x': eid}) |
56 {'x': eid}) |
57 |
57 |
58 def test_base(self): |
58 def test_base(self): |
59 schema = self.repo.schema |
59 schema = self.repo.schema |
60 self.session.set_pool() |
60 self.session.set_cnxset() |
61 dbhelper = self.session.pool.source('system').dbhelper |
61 dbhelper = self.session.cnxset.source('system').dbhelper |
62 sqlcursor = self.session.pool['system'] |
62 sqlcursor = self.session.cnxset['system'] |
63 self.failIf(schema.has_entity('Societe2')) |
63 self.failIf(schema.has_entity('Societe2')) |
64 self.failIf(schema.has_entity('concerne2')) |
64 self.failIf(schema.has_entity('concerne2')) |
65 # schema should be update on insertion (after commit) |
65 # schema should be update on insertion (after commit) |
66 eeid = self.execute('INSERT CWEType X: X name "Societe2", X description "", X final FALSE')[0][0] |
66 eeid = self.execute('INSERT CWEType X: X name "Societe2", X description "", X final FALSE')[0][0] |
67 self._set_perms(eeid) |
67 self._set_perms(eeid) |
168 self.execute('Any X WHERE X is CWEType, X name "CWEType"') |
168 self.execute('Any X WHERE X is CWEType, X name "CWEType"') |
169 |
169 |
170 # schema modification hooks tests ######################################### |
170 # schema modification hooks tests ######################################### |
171 |
171 |
172 def test_uninline_relation(self): |
172 def test_uninline_relation(self): |
173 self.session.set_pool() |
173 self.session.set_cnxset() |
174 dbhelper = self.session.pool.source('system').dbhelper |
174 dbhelper = self.session.cnxset.source('system').dbhelper |
175 sqlcursor = self.session.pool['system'] |
175 sqlcursor = self.session.cnxset['system'] |
176 self.failUnless(self.schema['state_of'].inlined) |
176 self.failUnless(self.schema['state_of'].inlined) |
177 try: |
177 try: |
178 self.execute('SET X inlined FALSE WHERE X name "state_of"') |
178 self.execute('SET X inlined FALSE WHERE X name "state_of"') |
179 self.failUnless(self.schema['state_of'].inlined) |
179 self.failUnless(self.schema['state_of'].inlined) |
180 self.commit() |
180 self.commit() |
193 self.failUnless(self.index_exists('State', 'state_of')) |
193 self.failUnless(self.index_exists('State', 'state_of')) |
194 rset = self.execute('Any X, Y WHERE X state_of Y') |
194 rset = self.execute('Any X, Y WHERE X state_of Y') |
195 self.assertEqual(len(rset), 2) |
195 self.assertEqual(len(rset), 2) |
196 |
196 |
197 def test_indexed_change(self): |
197 def test_indexed_change(self): |
198 self.session.set_pool() |
198 self.session.set_cnxset() |
199 dbhelper = self.session.pool.source('system').dbhelper |
199 dbhelper = self.session.cnxset.source('system').dbhelper |
200 sqlcursor = self.session.pool['system'] |
200 sqlcursor = self.session.cnxset['system'] |
201 try: |
201 try: |
202 self.execute('SET X indexed FALSE WHERE X relation_type R, R name "name"') |
202 self.execute('SET X indexed FALSE WHERE X relation_type R, R name "name"') |
203 self.failUnless(self.schema['name'].rdef('Workflow', 'String').indexed) |
203 self.failUnless(self.schema['name'].rdef('Workflow', 'String').indexed) |
204 self.failUnless(self.index_exists('Workflow', 'name')) |
204 self.failUnless(self.index_exists('Workflow', 'name')) |
205 self.commit() |
205 self.commit() |
212 self.commit() |
212 self.commit() |
213 self.failUnless(self.schema['name'].rdef('Workflow', 'String').indexed) |
213 self.failUnless(self.schema['name'].rdef('Workflow', 'String').indexed) |
214 self.failUnless(self.index_exists('Workflow', 'name')) |
214 self.failUnless(self.index_exists('Workflow', 'name')) |
215 |
215 |
216 def test_unique_change(self): |
216 def test_unique_change(self): |
217 self.session.set_pool() |
217 self.session.set_cnxset() |
218 dbhelper = self.session.pool.source('system').dbhelper |
218 dbhelper = self.session.cnxset.source('system').dbhelper |
219 sqlcursor = self.session.pool['system'] |
219 sqlcursor = self.session.cnxset['system'] |
220 try: |
220 try: |
221 self.execute('INSERT CWConstraint X: X cstrtype CT, DEF constrained_by X ' |
221 self.execute('INSERT CWConstraint X: X cstrtype CT, DEF constrained_by X ' |
222 'WHERE CT name "UniqueConstraint", DEF relation_type RT, DEF from_entity E,' |
222 'WHERE CT name "UniqueConstraint", DEF relation_type RT, DEF from_entity E,' |
223 'RT name "name", E name "Workflow"') |
223 'RT name "name", E name "Workflow"') |
224 self.failIf(self.schema['Workflow'].has_unique_values('name')) |
224 self.failIf(self.schema['Workflow'].has_unique_values('name')) |