58 def test_base(self): |
58 def test_base(self): |
59 schema = self.repo.schema |
59 schema = self.repo.schema |
60 self.session.set_cnxset() |
60 self.session.set_cnxset() |
61 dbhelper = self.session.cnxset.source('system').dbhelper |
61 dbhelper = self.session.cnxset.source('system').dbhelper |
62 sqlcursor = self.session.cnxset['system'] |
62 sqlcursor = self.session.cnxset['system'] |
63 self.failIf(schema.has_entity('Societe2')) |
63 self.assertFalse(schema.has_entity('Societe2')) |
64 self.failIf(schema.has_entity('concerne2')) |
64 self.assertFalse(schema.has_entity('concerne2')) |
65 # schema should be update on insertion (after commit) |
65 # schema should be update on insertion (after commit) |
66 eeid = self.execute('INSERT CWEType X: X name "Societe2", X description "", X final FALSE')[0][0] |
66 eeid = self.execute('INSERT CWEType X: X name "Societe2", X description "", X final FALSE')[0][0] |
67 self._set_perms(eeid) |
67 self._set_perms(eeid) |
68 self.execute('INSERT CWRType X: X name "concerne2", X description "", X final FALSE, X symmetric FALSE') |
68 self.execute('INSERT CWRType X: X name "concerne2", X description "", X final FALSE, X symmetric FALSE') |
69 self.failIf(schema.has_entity('Societe2')) |
69 self.assertFalse(schema.has_entity('Societe2')) |
70 self.failIf(schema.has_entity('concerne2')) |
70 self.assertFalse(schema.has_entity('concerne2')) |
71 # have to commit before adding definition relations |
71 # have to commit before adding definition relations |
72 self.commit() |
72 self.commit() |
73 self.failUnless(schema.has_entity('Societe2')) |
73 self.assertTrue(schema.has_entity('Societe2')) |
74 self.failUnless(schema.has_relation('concerne2')) |
74 self.assertTrue(schema.has_relation('concerne2')) |
75 attreid = self.execute('INSERT CWAttribute X: X cardinality "11", X defaultval "noname", ' |
75 attreid = self.execute('INSERT CWAttribute X: X cardinality "11", X defaultval "noname", ' |
76 ' X indexed TRUE, X relation_type RT, X from_entity E, X to_entity F ' |
76 ' X indexed TRUE, X relation_type RT, X from_entity E, X to_entity F ' |
77 'WHERE RT name "name", E name "Societe2", F name "String"')[0][0] |
77 'WHERE RT name "name", E name "Societe2", F name "String"')[0][0] |
78 self._set_attr_perms(attreid) |
78 self._set_attr_perms(attreid) |
79 concerne2_rdef_eid = self.execute( |
79 concerne2_rdef_eid = self.execute( |
80 'INSERT CWRelation X: X cardinality "**", X relation_type RT, X from_entity E, X to_entity E ' |
80 'INSERT CWRelation X: X cardinality "**", X relation_type RT, X from_entity E, X to_entity E ' |
81 'WHERE RT name "concerne2", E name "Societe2"')[0][0] |
81 'WHERE RT name "concerne2", E name "Societe2"')[0][0] |
82 self._set_perms(concerne2_rdef_eid) |
82 self._set_perms(concerne2_rdef_eid) |
83 self.failIf('name' in schema['Societe2'].subject_relations()) |
83 self.assertFalse('name' in schema['Societe2'].subject_relations()) |
84 self.failIf('concerne2' in schema['Societe2'].subject_relations()) |
84 self.assertFalse('concerne2' in schema['Societe2'].subject_relations()) |
85 self.failIf(self.index_exists('Societe2', 'name')) |
85 self.assertFalse(self.index_exists('Societe2', 'name')) |
86 self.commit() |
86 self.commit() |
87 self.failUnless('name' in schema['Societe2'].subject_relations()) |
87 self.assertTrue('name' in schema['Societe2'].subject_relations()) |
88 self.failUnless('concerne2' in schema['Societe2'].subject_relations()) |
88 self.assertTrue('concerne2' in schema['Societe2'].subject_relations()) |
89 self.failUnless(self.index_exists('Societe2', 'name')) |
89 self.assertTrue(self.index_exists('Societe2', 'name')) |
90 # now we should be able to insert and query Societe2 |
90 # now we should be able to insert and query Societe2 |
91 s2eid = self.execute('INSERT Societe2 X: X name "logilab"')[0][0] |
91 s2eid = self.execute('INSERT Societe2 X: X name "logilab"')[0][0] |
92 self.execute('Societe2 X WHERE X name "logilab"') |
92 self.execute('Societe2 X WHERE X name "logilab"') |
93 self.execute('SET X concerne2 X WHERE X name "logilab"') |
93 self.execute('SET X concerne2 X WHERE X name "logilab"') |
94 rset = self.execute('Any X WHERE X concerne2 Y') |
94 rset = self.execute('Any X WHERE X concerne2 Y') |
99 'WHERE RT name "concerne2", E name "CWUser"')[0][0] |
99 'WHERE RT name "concerne2", E name "CWUser"')[0][0] |
100 self._set_perms(rdefeid) |
100 self._set_perms(rdefeid) |
101 self.commit() |
101 self.commit() |
102 self.execute('DELETE CWRelation X WHERE X eid %(x)s', {'x': concerne2_rdef_eid}) |
102 self.execute('DELETE CWRelation X WHERE X eid %(x)s', {'x': concerne2_rdef_eid}) |
103 self.commit() |
103 self.commit() |
104 self.failUnless('concerne2' in schema['CWUser'].subject_relations()) |
104 self.assertTrue('concerne2' in schema['CWUser'].subject_relations()) |
105 self.failIf('concerne2' in schema['Societe2'].subject_relations()) |
105 self.assertFalse('concerne2' in schema['Societe2'].subject_relations()) |
106 self.failIf(self.execute('Any X WHERE X concerne2 Y')) |
106 self.assertFalse(self.execute('Any X WHERE X concerne2 Y')) |
107 # schema should be cleaned on delete (after commit) |
107 # schema should be cleaned on delete (after commit) |
108 self.execute('DELETE CWEType X WHERE X name "Societe2"') |
108 self.execute('DELETE CWEType X WHERE X name "Societe2"') |
109 self.execute('DELETE CWRType X WHERE X name "concerne2"') |
109 self.execute('DELETE CWRType X WHERE X name "concerne2"') |
110 self.failUnless(self.index_exists('Societe2', 'name')) |
110 self.assertTrue(self.index_exists('Societe2', 'name')) |
111 self.failUnless(schema.has_entity('Societe2')) |
111 self.assertTrue(schema.has_entity('Societe2')) |
112 self.failUnless(schema.has_relation('concerne2')) |
112 self.assertTrue(schema.has_relation('concerne2')) |
113 self.commit() |
113 self.commit() |
114 self.failIf(self.index_exists('Societe2', 'name')) |
114 self.assertFalse(self.index_exists('Societe2', 'name')) |
115 self.failIf(schema.has_entity('Societe2')) |
115 self.assertFalse(schema.has_entity('Societe2')) |
116 self.failIf(schema.has_entity('concerne2')) |
116 self.assertFalse(schema.has_entity('concerne2')) |
117 self.failIf('concerne2' in schema['CWUser'].subject_relations()) |
117 self.assertFalse('concerne2' in schema['CWUser'].subject_relations()) |
118 |
118 |
119 def test_is_instance_of_insertions(self): |
119 def test_is_instance_of_insertions(self): |
120 seid = self.execute('INSERT Transition T: T name "subdiv"')[0][0] |
120 seid = self.execute('INSERT Transition T: T name "subdiv"')[0][0] |
121 is_etypes = [etype for etype, in self.execute('Any ETN WHERE X eid %s, X is ET, ET name ETN' % seid)] |
121 is_etypes = [etype for etype, in self.execute('Any ETN WHERE X eid %s, X is ET, ET name ETN' % seid)] |
122 self.assertEqual(is_etypes, ['Transition']) |
122 self.assertEqual(is_etypes, ['Transition']) |
123 instanceof_etypes = [etype for etype, in self.execute('Any ETN WHERE X eid %s, X is_instance_of ET, ET name ETN' % seid)] |
123 instanceof_etypes = [etype for etype, in self.execute('Any ETN WHERE X eid %s, X is_instance_of ET, ET name ETN' % seid)] |
124 self.assertEqual(sorted(instanceof_etypes), ['BaseTransition', 'Transition']) |
124 self.assertEqual(sorted(instanceof_etypes), ['BaseTransition', 'Transition']) |
125 snames = [name for name, in self.execute('Any N WHERE S is BaseTransition, S name N')] |
125 snames = [name for name, in self.execute('Any N WHERE S is BaseTransition, S name N')] |
126 self.failIf('subdiv' in snames) |
126 self.assertFalse('subdiv' in snames) |
127 snames = [name for name, in self.execute('Any N WHERE S is_instance_of BaseTransition, S name N')] |
127 snames = [name for name, in self.execute('Any N WHERE S is_instance_of BaseTransition, S name N')] |
128 self.failUnless('subdiv' in snames) |
128 self.assertTrue('subdiv' in snames) |
129 |
129 |
130 |
130 |
131 def test_perms_synchronization_1(self): |
131 def test_perms_synchronization_1(self): |
132 schema = self.repo.schema |
132 schema = self.repo.schema |
133 self.assertEqual(schema['CWUser'].get_groups('read'), set(('managers', 'users'))) |
133 self.assertEqual(schema['CWUser'].get_groups('read'), set(('managers', 'users'))) |
134 self.failUnless(self.execute('Any X, Y WHERE X is CWEType, X name "CWUser", Y is CWGroup, Y name "users"')[0]) |
134 self.assertTrue(self.execute('Any X, Y WHERE X is CWEType, X name "CWUser", Y is CWGroup, Y name "users"')[0]) |
135 self.execute('DELETE X read_permission Y WHERE X is CWEType, X name "CWUser", Y name "users"') |
135 self.execute('DELETE X read_permission Y WHERE X is CWEType, X name "CWUser", Y name "users"') |
136 self.assertEqual(schema['CWUser'].get_groups('read'), set(('managers', 'users', ))) |
136 self.assertEqual(schema['CWUser'].get_groups('read'), set(('managers', 'users', ))) |
137 self.commit() |
137 self.commit() |
138 self.assertEqual(schema['CWUser'].get_groups('read'), set(('managers',))) |
138 self.assertEqual(schema['CWUser'].get_groups('read'), set(('managers',))) |
139 self.execute('SET X read_permission Y WHERE X is CWEType, X name "CWUser", Y name "users"') |
139 self.execute('SET X read_permission Y WHERE X is CWEType, X name "CWUser", Y name "users"') |
171 |
171 |
172 def test_uninline_relation(self): |
172 def test_uninline_relation(self): |
173 self.session.set_cnxset() |
173 self.session.set_cnxset() |
174 dbhelper = self.session.cnxset.source('system').dbhelper |
174 dbhelper = self.session.cnxset.source('system').dbhelper |
175 sqlcursor = self.session.cnxset['system'] |
175 sqlcursor = self.session.cnxset['system'] |
176 self.failUnless(self.schema['state_of'].inlined) |
176 self.assertTrue(self.schema['state_of'].inlined) |
177 try: |
177 try: |
178 self.execute('SET X inlined FALSE WHERE X name "state_of"') |
178 self.execute('SET X inlined FALSE WHERE X name "state_of"') |
179 self.failUnless(self.schema['state_of'].inlined) |
179 self.assertTrue(self.schema['state_of'].inlined) |
180 self.commit() |
180 self.commit() |
181 self.failIf(self.schema['state_of'].inlined) |
181 self.assertFalse(self.schema['state_of'].inlined) |
182 self.failIf(self.index_exists('State', 'state_of')) |
182 self.assertFalse(self.index_exists('State', 'state_of')) |
183 rset = self.execute('Any X, Y WHERE X state_of Y') |
183 rset = self.execute('Any X, Y WHERE X state_of Y') |
184 self.assertEqual(len(rset), 2) # user states |
184 self.assertEqual(len(rset), 2) # user states |
185 except: |
185 except: |
186 import traceback |
186 import traceback |
187 traceback.print_exc() |
187 traceback.print_exc() |
188 finally: |
188 finally: |
189 self.execute('SET X inlined TRUE WHERE X name "state_of"') |
189 self.execute('SET X inlined TRUE WHERE X name "state_of"') |
190 self.failIf(self.schema['state_of'].inlined) |
190 self.assertFalse(self.schema['state_of'].inlined) |
191 self.commit() |
191 self.commit() |
192 self.failUnless(self.schema['state_of'].inlined) |
192 self.assertTrue(self.schema['state_of'].inlined) |
193 self.failUnless(self.index_exists('State', 'state_of')) |
193 self.assertTrue(self.index_exists('State', 'state_of')) |
194 rset = self.execute('Any X, Y WHERE X state_of Y') |
194 rset = self.execute('Any X, Y WHERE X state_of Y') |
195 self.assertEqual(len(rset), 2) |
195 self.assertEqual(len(rset), 2) |
196 |
196 |
197 def test_indexed_change(self): |
197 def test_indexed_change(self): |
198 self.session.set_cnxset() |
198 self.session.set_cnxset() |
199 dbhelper = self.session.cnxset.source('system').dbhelper |
199 dbhelper = self.session.cnxset.source('system').dbhelper |
200 sqlcursor = self.session.cnxset['system'] |
200 sqlcursor = self.session.cnxset['system'] |
201 try: |
201 try: |
202 self.execute('SET X indexed FALSE WHERE X relation_type R, R name "name"') |
202 self.execute('SET X indexed FALSE WHERE X relation_type R, R name "name"') |
203 self.failUnless(self.schema['name'].rdef('Workflow', 'String').indexed) |
203 self.assertTrue(self.schema['name'].rdef('Workflow', 'String').indexed) |
204 self.failUnless(self.index_exists('Workflow', 'name')) |
204 self.assertTrue(self.index_exists('Workflow', 'name')) |
205 self.commit() |
205 self.commit() |
206 self.failIf(self.schema['name'].rdef('Workflow', 'String').indexed) |
206 self.assertFalse(self.schema['name'].rdef('Workflow', 'String').indexed) |
207 self.failIf(self.index_exists('Workflow', 'name')) |
207 self.assertFalse(self.index_exists('Workflow', 'name')) |
208 finally: |
208 finally: |
209 self.execute('SET X indexed TRUE WHERE X relation_type R, R name "name"') |
209 self.execute('SET X indexed TRUE WHERE X relation_type R, R name "name"') |
210 self.failIf(self.schema['name'].rdef('Workflow', 'String').indexed) |
210 self.assertFalse(self.schema['name'].rdef('Workflow', 'String').indexed) |
211 self.failIf(self.index_exists('Workflow', 'name')) |
211 self.assertFalse(self.index_exists('Workflow', 'name')) |
212 self.commit() |
212 self.commit() |
213 self.failUnless(self.schema['name'].rdef('Workflow', 'String').indexed) |
213 self.assertTrue(self.schema['name'].rdef('Workflow', 'String').indexed) |
214 self.failUnless(self.index_exists('Workflow', 'name')) |
214 self.assertTrue(self.index_exists('Workflow', 'name')) |
215 |
215 |
216 def test_unique_change(self): |
216 def test_unique_change(self): |
217 self.session.set_cnxset() |
217 self.session.set_cnxset() |
218 dbhelper = self.session.cnxset.source('system').dbhelper |
218 dbhelper = self.session.cnxset.source('system').dbhelper |
219 sqlcursor = self.session.cnxset['system'] |
219 sqlcursor = self.session.cnxset['system'] |
220 try: |
220 try: |
221 self.execute('INSERT CWConstraint X: X cstrtype CT, DEF constrained_by X ' |
221 self.execute('INSERT CWConstraint X: X cstrtype CT, DEF constrained_by X ' |
222 'WHERE CT name "UniqueConstraint", DEF relation_type RT, DEF from_entity E,' |
222 'WHERE CT name "UniqueConstraint", DEF relation_type RT, DEF from_entity E,' |
223 'RT name "name", E name "Workflow"') |
223 'RT name "name", E name "Workflow"') |
224 self.failIf(self.schema['Workflow'].has_unique_values('name')) |
224 self.assertFalse(self.schema['Workflow'].has_unique_values('name')) |
225 self.failIf(self.index_exists('Workflow', 'name', unique=True)) |
225 self.assertFalse(self.index_exists('Workflow', 'name', unique=True)) |
226 self.commit() |
226 self.commit() |
227 self.failUnless(self.schema['Workflow'].has_unique_values('name')) |
227 self.assertTrue(self.schema['Workflow'].has_unique_values('name')) |
228 self.failUnless(self.index_exists('Workflow', 'name', unique=True)) |
228 self.assertTrue(self.index_exists('Workflow', 'name', unique=True)) |
229 finally: |
229 finally: |
230 self.execute('DELETE DEF constrained_by X WHERE X cstrtype CT, ' |
230 self.execute('DELETE DEF constrained_by X WHERE X cstrtype CT, ' |
231 'CT name "UniqueConstraint", DEF relation_type RT, DEF from_entity E,' |
231 'CT name "UniqueConstraint", DEF relation_type RT, DEF from_entity E,' |
232 'RT name "name", E name "Workflow"') |
232 'RT name "name", E name "Workflow"') |
233 self.failUnless(self.schema['Workflow'].has_unique_values('name')) |
233 self.assertTrue(self.schema['Workflow'].has_unique_values('name')) |
234 self.failUnless(self.index_exists('Workflow', 'name', unique=True)) |
234 self.assertTrue(self.index_exists('Workflow', 'name', unique=True)) |
235 self.commit() |
235 self.commit() |
236 self.failIf(self.schema['Workflow'].has_unique_values('name')) |
236 self.assertFalse(self.schema['Workflow'].has_unique_values('name')) |
237 self.failIf(self.index_exists('Workflow', 'name', unique=True)) |
237 self.assertFalse(self.index_exists('Workflow', 'name', unique=True)) |
238 |
238 |
239 def test_required_change_1(self): |
239 def test_required_change_1(self): |
240 self.execute('SET DEF cardinality "?1" ' |
240 self.execute('SET DEF cardinality "?1" ' |
241 'WHERE DEF relation_type RT, DEF from_entity E,' |
241 'WHERE DEF relation_type RT, DEF from_entity E,' |
242 'RT name "title", E name "Bookmark"') |
242 'RT name "title", E name "Bookmark"') |