36 self.repo.set_schema(self.repo.deserialize_schema(), resetvreg=False) |
36 self.repo.set_schema(self.repo.deserialize_schema(), resetvreg=False) |
37 self.__class__.schema_eids = schema_eids_idx(self.repo.schema) |
37 self.__class__.schema_eids = schema_eids_idx(self.repo.schema) |
38 |
38 |
# index_exists(self, etype, attr, unique=False): test helper that asks the
# system source's dbhelper whether an index exists for an entity attribute.
#
# Steps, as visible below:
#   1. self.session.set_cnxset() is called before the cnxset is accessed.
#   2. `dbhelper` is taken from the system source and `sqlcursor` from the
#      session's cnxset.
#   3. The result of dbhelper.index_exists(...) is returned unchanged; it is
#      called with SQL_PREFIX + etype and SQL_PREFIX + attr (presumably the
#      SQL table and column names -- confirm against the dbhelper API), and
#      `unique` is forwarded as a keyword argument (default False).
#
# NOTE(review): every line of this method appears twice. The two copies
# differ only on original lines 41 and 42: the first copy reads
# self.session.cnxset.source('system').dbhelper and
# self.session.cnxset['system'], the second reads
# self.repo.system_source.dbhelper and self.session.cnxset.cu. This looks
# like the old/new columns of a side-by-side diff -- confirm which revision
# is current before relying on either form.
39 def index_exists(self, etype, attr, unique=False): |
39 def index_exists(self, etype, attr, unique=False): |
40 self.session.set_cnxset() |
40 self.session.set_cnxset() |
41 dbhelper = self.session.cnxset.source('system').dbhelper |
41 dbhelper = self.repo.system_source.dbhelper |
42 sqlcursor = self.session.cnxset['system'] |
42 sqlcursor = self.session.cnxset.cu |
43 return dbhelper.index_exists(sqlcursor, SQL_PREFIX + etype, SQL_PREFIX + attr, unique=unique) |
43 return dbhelper.index_exists(sqlcursor, SQL_PREFIX + etype, SQL_PREFIX + attr, unique=unique) |
44 |
44 |
45 def _set_perms(self, eid): |
45 def _set_perms(self, eid): |
46 self.execute('SET X read_permission G WHERE X eid %(x)s, G is CWGroup', |
46 self.execute('SET X read_permission G WHERE X eid %(x)s, G is CWGroup', |
47 {'x': eid}) |
47 {'x': eid}) |
57 {'x': eid}) |
57 {'x': eid}) |
58 |
58 |
59 def test_base(self): |
59 def test_base(self): |
60 schema = self.repo.schema |
60 schema = self.repo.schema |
61 self.session.set_cnxset() |
61 self.session.set_cnxset() |
62 dbhelper = self.session.cnxset.source('system').dbhelper |
62 dbhelper = self.repo.system_source.dbhelper |
63 sqlcursor = self.session.cnxset['system'] |
63 sqlcursor = self.session.cnxset.cu |
64 self.assertFalse(schema.has_entity('Societe2')) |
64 self.assertFalse(schema.has_entity('Societe2')) |
65 self.assertFalse(schema.has_entity('concerne2')) |
65 self.assertFalse(schema.has_entity('concerne2')) |
66 # schema should be updated on insertion (after commit) |
66 # schema should be updated on insertion (after commit) |
67 eeid = self.execute('INSERT CWEType X: X name "Societe2", X description "", X final FALSE')[0][0] |
67 eeid = self.execute('INSERT CWEType X: X name "Societe2", X description "", X final FALSE')[0][0] |
68 self._set_perms(eeid) |
68 self._set_perms(eeid) |
198 |
198 |
199 # schema modification hooks tests ######################################### |
199 # schema modification hooks tests ######################################### |
200 |
200 |
201 def test_uninline_relation(self): |
201 def test_uninline_relation(self): |
202 self.session.set_cnxset() |
202 self.session.set_cnxset() |
203 dbhelper = self.session.cnxset.source('system').dbhelper |
203 dbhelper = self.repo.system_source.dbhelper |
204 sqlcursor = self.session.cnxset['system'] |
204 sqlcursor = self.session.cnxset.cu |
205 self.assertTrue(self.schema['state_of'].inlined) |
205 self.assertTrue(self.schema['state_of'].inlined) |
206 try: |
206 try: |
207 self.execute('SET X inlined FALSE WHERE X name "state_of"') |
207 self.execute('SET X inlined FALSE WHERE X name "state_of"') |
208 self.assertTrue(self.schema['state_of'].inlined) |
208 self.assertTrue(self.schema['state_of'].inlined) |
209 self.commit() |
209 self.commit() |
223 rset = self.execute('Any X, Y WHERE X state_of Y') |
223 rset = self.execute('Any X, Y WHERE X state_of Y') |
224 self.assertEqual(len(rset), 2) |
224 self.assertEqual(len(rset), 2) |
225 |
225 |
226 def test_indexed_change(self): |
226 def test_indexed_change(self): |
227 self.session.set_cnxset() |
227 self.session.set_cnxset() |
228 dbhelper = self.session.cnxset.source('system').dbhelper |
228 dbhelper = self.repo.system_source.dbhelper |
229 sqlcursor = self.session.cnxset['system'] |
229 sqlcursor = self.session.cnxset.cu |
230 try: |
230 try: |
231 self.execute('SET X indexed FALSE WHERE X relation_type R, R name "name"') |
231 self.execute('SET X indexed FALSE WHERE X relation_type R, R name "name"') |
232 self.assertTrue(self.schema['name'].rdef('Workflow', 'String').indexed) |
232 self.assertTrue(self.schema['name'].rdef('Workflow', 'String').indexed) |
233 self.assertTrue(self.index_exists('Workflow', 'name')) |
233 self.assertTrue(self.index_exists('Workflow', 'name')) |
234 self.commit() |
234 self.commit() |
242 self.assertTrue(self.schema['name'].rdef('Workflow', 'String').indexed) |
242 self.assertTrue(self.schema['name'].rdef('Workflow', 'String').indexed) |
243 self.assertTrue(self.index_exists('Workflow', 'name')) |
243 self.assertTrue(self.index_exists('Workflow', 'name')) |
244 |
244 |
245 def test_unique_change(self): |
245 def test_unique_change(self): |
246 self.session.set_cnxset() |
246 self.session.set_cnxset() |
247 dbhelper = self.session.cnxset.source('system').dbhelper |
247 dbhelper = self.repo.system_source.dbhelper |
248 sqlcursor = self.session.cnxset['system'] |
248 sqlcursor = self.session.cnxset.cu |
249 try: |
249 try: |
250 self.execute('INSERT CWConstraint X: X cstrtype CT, DEF constrained_by X ' |
250 self.execute('INSERT CWConstraint X: X cstrtype CT, DEF constrained_by X ' |
251 'WHERE CT name "UniqueConstraint", DEF relation_type RT, DEF from_entity E,' |
251 'WHERE CT name "UniqueConstraint", DEF relation_type RT, DEF from_entity E,' |
252 'RT name "name", E name "Workflow"') |
252 'RT name "name", E name "Workflow"') |
253 self.assertFalse(self.schema['Workflow'].has_unique_values('name')) |
253 self.assertFalse(self.schema['Workflow'].has_unique_values('name')) |