96 # now we should be able to insert and query Societe2 |
98 # now we should be able to insert and query Societe2 |
97 s2eid = self.execute('INSERT Societe2 X: X name "logilab"')[0][0] |
99 s2eid = self.execute('INSERT Societe2 X: X name "logilab"')[0][0] |
98 self.execute('Societe2 X WHERE X name "logilab"') |
100 self.execute('Societe2 X WHERE X name "logilab"') |
99 self.execute('SET X concerne2 X WHERE X name "logilab"') |
101 self.execute('SET X concerne2 X WHERE X name "logilab"') |
100 rset = self.execute('Any X WHERE X concerne2 Y') |
102 rset = self.execute('Any X WHERE X concerne2 Y') |
101 self.assertEquals(rset.rows, [[s2eid]]) |
103 self.assertEqual(rset.rows, [[s2eid]]) |
102 # check that when a relation definition is deleted, existing relations are deleted |
104 # check that when a relation definition is deleted, existing relations are deleted |
103 rdefeid = self.execute('INSERT CWRelation X: X cardinality "**", X relation_type RT, ' |
105 rdefeid = self.execute('INSERT CWRelation X: X cardinality "**", X relation_type RT, ' |
104 ' X from_entity E, X to_entity E ' |
106 ' X from_entity E, X to_entity E ' |
105 'WHERE RT name "concerne2", E name "CWUser"')[0][0] |
107 'WHERE RT name "concerne2", E name "CWUser"')[0][0] |
106 self._set_perms(rdefeid) |
108 self._set_perms(rdefeid) |
123 self.failIf('concerne2' in schema['CWUser'].subject_relations()) |
125 self.failIf('concerne2' in schema['CWUser'].subject_relations()) |
124 |
126 |
125 def test_is_instance_of_insertions(self): |
127 def test_is_instance_of_insertions(self): |
126 seid = self.execute('INSERT Transition T: T name "subdiv"')[0][0] |
128 seid = self.execute('INSERT Transition T: T name "subdiv"')[0][0] |
127 is_etypes = [etype for etype, in self.execute('Any ETN WHERE X eid %s, X is ET, ET name ETN' % seid)] |
129 is_etypes = [etype for etype, in self.execute('Any ETN WHERE X eid %s, X is ET, ET name ETN' % seid)] |
128 self.assertEquals(is_etypes, ['Transition']) |
130 self.assertEqual(is_etypes, ['Transition']) |
129 instanceof_etypes = [etype for etype, in self.execute('Any ETN WHERE X eid %s, X is_instance_of ET, ET name ETN' % seid)] |
131 instanceof_etypes = [etype for etype, in self.execute('Any ETN WHERE X eid %s, X is_instance_of ET, ET name ETN' % seid)] |
130 self.assertEquals(sorted(instanceof_etypes), ['BaseTransition', 'Transition']) |
132 self.assertEqual(sorted(instanceof_etypes), ['BaseTransition', 'Transition']) |
131 snames = [name for name, in self.execute('Any N WHERE S is BaseTransition, S name N')] |
133 snames = [name for name, in self.execute('Any N WHERE S is BaseTransition, S name N')] |
132 self.failIf('subdiv' in snames) |
134 self.failIf('subdiv' in snames) |
133 snames = [name for name, in self.execute('Any N WHERE S is_instance_of BaseTransition, S name N')] |
135 snames = [name for name, in self.execute('Any N WHERE S is_instance_of BaseTransition, S name N')] |
134 self.failUnless('subdiv' in snames) |
136 self.failUnless('subdiv' in snames) |
135 |
137 |
136 |
138 |
137 def test_perms_synchronization_1(self): |
139 def test_perms_synchronization_1(self): |
138 schema = self.repo.schema |
140 schema = self.repo.schema |
139 self.assertEquals(schema['CWUser'].get_groups('read'), set(('managers', 'users'))) |
141 self.assertEqual(schema['CWUser'].get_groups('read'), set(('managers', 'users'))) |
140 self.failUnless(self.execute('Any X, Y WHERE X is CWEType, X name "CWUser", Y is CWGroup, Y name "users"')[0]) |
142 self.failUnless(self.execute('Any X, Y WHERE X is CWEType, X name "CWUser", Y is CWGroup, Y name "users"')[0]) |
141 self.execute('DELETE X read_permission Y WHERE X is CWEType, X name "CWUser", Y name "users"') |
143 self.execute('DELETE X read_permission Y WHERE X is CWEType, X name "CWUser", Y name "users"') |
142 self.assertEquals(schema['CWUser'].get_groups('read'), set(('managers', 'users', ))) |
144 self.assertEqual(schema['CWUser'].get_groups('read'), set(('managers', 'users', ))) |
143 self.commit() |
145 self.commit() |
144 self.assertEquals(schema['CWUser'].get_groups('read'), set(('managers',))) |
146 self.assertEqual(schema['CWUser'].get_groups('read'), set(('managers',))) |
145 self.execute('SET X read_permission Y WHERE X is CWEType, X name "CWUser", Y name "users"') |
147 self.execute('SET X read_permission Y WHERE X is CWEType, X name "CWUser", Y name "users"') |
146 self.commit() |
148 self.commit() |
147 self.assertEquals(schema['CWUser'].get_groups('read'), set(('managers', 'users',))) |
149 self.assertEqual(schema['CWUser'].get_groups('read'), set(('managers', 'users',))) |
148 |
150 |
149 def test_perms_synchronization_2(self): |
151 def test_perms_synchronization_2(self): |
150 schema = self.repo.schema['in_group'].rdefs[('CWUser', 'CWGroup')] |
152 schema = self.repo.schema['in_group'].rdefs[('CWUser', 'CWGroup')] |
151 self.assertEquals(schema.get_groups('read'), set(('managers', 'users', 'guests'))) |
153 self.assertEqual(schema.get_groups('read'), set(('managers', 'users', 'guests'))) |
152 self.execute('DELETE X read_permission Y WHERE X relation_type RT, RT name "in_group", Y name "guests"') |
154 self.execute('DELETE X read_permission Y WHERE X relation_type RT, RT name "in_group", Y name "guests"') |
153 self.assertEquals(schema.get_groups('read'), set(('managers', 'users', 'guests'))) |
155 self.assertEqual(schema.get_groups('read'), set(('managers', 'users', 'guests'))) |
154 self.commit() |
156 self.commit() |
155 self.assertEquals(schema.get_groups('read'), set(('managers', 'users'))) |
157 self.assertEqual(schema.get_groups('read'), set(('managers', 'users'))) |
156 self.execute('SET X read_permission Y WHERE X relation_type RT, RT name "in_group", Y name "guests"') |
158 self.execute('SET X read_permission Y WHERE X relation_type RT, RT name "in_group", Y name "guests"') |
157 self.assertEquals(schema.get_groups('read'), set(('managers', 'users'))) |
159 self.assertEqual(schema.get_groups('read'), set(('managers', 'users'))) |
158 self.commit() |
160 self.commit() |
159 self.assertEquals(schema.get_groups('read'), set(('managers', 'users', 'guests'))) |
161 self.assertEqual(schema.get_groups('read'), set(('managers', 'users', 'guests'))) |
160 |
162 |
161 def test_nonregr_user_edit_itself(self): |
163 def test_nonregr_user_edit_itself(self): |
162 ueid = self.session.user.eid |
164 ueid = self.session.user.eid |
163 groupeids = [eid for eid, in self.execute('CWGroup G WHERE G name in ("managers", "users")')] |
165 groupeids = [eid for eid, in self.execute('CWGroup G WHERE G name in ("managers", "users")')] |
164 self.execute('DELETE X in_group Y WHERE X eid %s' % ueid) |
166 self.execute('DELETE X in_group Y WHERE X eid %s' % ueid) |
185 self.failUnless(self.schema['state_of'].inlined) |
187 self.failUnless(self.schema['state_of'].inlined) |
186 self.commit() |
188 self.commit() |
187 self.failIf(self.schema['state_of'].inlined) |
189 self.failIf(self.schema['state_of'].inlined) |
188 self.failIf(self.index_exists('State', 'state_of')) |
190 self.failIf(self.index_exists('State', 'state_of')) |
189 rset = self.execute('Any X, Y WHERE X state_of Y') |
191 rset = self.execute('Any X, Y WHERE X state_of Y') |
190 self.assertEquals(len(rset), 2) # user states |
192 self.assertEqual(len(rset), 2) # user states |
|
193 except: |
|
194 import traceback |
|
195 traceback.print_exc() |
191 finally: |
196 finally: |
192 self.execute('SET X inlined TRUE WHERE X name "state_of"') |
197 self.execute('SET X inlined TRUE WHERE X name "state_of"') |
193 self.failIf(self.schema['state_of'].inlined) |
198 self.failIf(self.schema['state_of'].inlined) |
194 self.commit() |
199 self.commit() |
195 self.failUnless(self.schema['state_of'].inlined) |
200 self.failUnless(self.schema['state_of'].inlined) |
196 self.failUnless(self.index_exists('State', 'state_of')) |
201 self.failUnless(self.index_exists('State', 'state_of')) |
197 rset = self.execute('Any X, Y WHERE X state_of Y') |
202 rset = self.execute('Any X, Y WHERE X state_of Y') |
198 self.assertEquals(len(rset), 2) |
203 self.assertEqual(len(rset), 2) |
199 |
204 |
200 def test_indexed_change(self): |
205 def test_indexed_change(self): |
201 self.session.set_pool() |
206 self.session.set_pool() |
202 dbhelper = self.session.pool.source('system').dbhelper |
207 dbhelper = self.session.pool.source('system').dbhelper |
203 sqlcursor = self.session.pool['system'] |
208 sqlcursor = self.session.pool['system'] |
312 |
318 |
313 def test_update_constraint(self): |
319 def test_update_constraint(self): |
314 rdef = self.schema['Transition'].rdef('type') |
320 rdef = self.schema['Transition'].rdef('type') |
315 cstr = rdef.constraint_by_type('StaticVocabularyConstraint') |
321 cstr = rdef.constraint_by_type('StaticVocabularyConstraint') |
316 if not getattr(cstr, 'eid', None): |
322 if not getattr(cstr, 'eid', None): |
317 self.skip('start me alone') # bug in schema reloading, constraint's eid not restored |
323 self.skipTest('start me alone') # bug in schema reloading, constraint's eid not restored |
318 self.execute('SET X value %(v)s WHERE X eid %(x)s', |
324 self.execute('SET X value %(v)s WHERE X eid %(x)s', |
319 {'x': cstr.eid, 'v': u"u'normal', u'auto', u'new'"}) |
325 {'x': cstr.eid, 'v': u"u'normal', u'auto', u'new'"}) |
320 self.execute('INSERT CWConstraint X: X value %(value)s, X cstrtype CT, EDEF constrained_by X ' |
326 self.execute('INSERT CWConstraint X: X value %(value)s, X cstrtype CT, EDEF constrained_by X ' |
321 'WHERE CT name %(ct)s, EDEF eid %(x)s', |
327 'WHERE CT name %(ct)s, EDEF eid %(x)s', |
322 {'ct': 'SizeConstraint', 'value': u'max=10', 'x': rdef.eid}) |
328 {'ct': 'SizeConstraint', 'value': u'max=10', 'x': rdef.eid}) |
323 self.commit() |
329 self.commit() |
324 cstr = rdef.constraint_by_type('StaticVocabularyConstraint') |
330 cstr = rdef.constraint_by_type('StaticVocabularyConstraint') |
325 self.assertEquals(cstr.values, (u'normal', u'auto', u'new')) |
331 self.assertEqual(cstr.values, (u'normal', u'auto', u'new')) |
326 self.execute('INSERT Transition T: T name "hop", T type "new"') |
332 self.execute('INSERT Transition T: T name "hop", T type "new"') |
327 |
333 |
328 if __name__ == '__main__': |
334 if __name__ == '__main__': |
329 unittest_main() |
335 unittest_main() |