96 # now we should be able to insert and query Societe2 |
96 # now we should be able to insert and query Societe2 |
97 s2eid = self.execute('INSERT Societe2 X: X name "logilab"')[0][0] |
97 s2eid = self.execute('INSERT Societe2 X: X name "logilab"')[0][0] |
98 self.execute('Societe2 X WHERE X name "logilab"') |
98 self.execute('Societe2 X WHERE X name "logilab"') |
99 self.execute('SET X concerne2 X WHERE X name "logilab"') |
99 self.execute('SET X concerne2 X WHERE X name "logilab"') |
100 rset = self.execute('Any X WHERE X concerne2 Y') |
100 rset = self.execute('Any X WHERE X concerne2 Y') |
101 self.assertEquals(rset.rows, [[s2eid]]) |
101 self.assertEqual(rset.rows, [[s2eid]]) |
102 # check that when a relation definition is deleted, existing relations are deleted |
102 # check that when a relation definition is deleted, existing relations are deleted |
103 rdefeid = self.execute('INSERT CWRelation X: X cardinality "**", X relation_type RT, ' |
103 rdefeid = self.execute('INSERT CWRelation X: X cardinality "**", X relation_type RT, ' |
104 ' X from_entity E, X to_entity E ' |
104 ' X from_entity E, X to_entity E ' |
105 'WHERE RT name "concerne2", E name "CWUser"')[0][0] |
105 'WHERE RT name "concerne2", E name "CWUser"')[0][0] |
106 self._set_perms(rdefeid) |
106 self._set_perms(rdefeid) |
123 self.failIf('concerne2' in schema['CWUser'].subject_relations()) |
123 self.failIf('concerne2' in schema['CWUser'].subject_relations()) |
124 |
124 |
125 def test_is_instance_of_insertions(self): |
125 def test_is_instance_of_insertions(self): |
126 seid = self.execute('INSERT Transition T: T name "subdiv"')[0][0] |
126 seid = self.execute('INSERT Transition T: T name "subdiv"')[0][0] |
127 is_etypes = [etype for etype, in self.execute('Any ETN WHERE X eid %s, X is ET, ET name ETN' % seid)] |
127 is_etypes = [etype for etype, in self.execute('Any ETN WHERE X eid %s, X is ET, ET name ETN' % seid)] |
128 self.assertEquals(is_etypes, ['Transition']) |
128 self.assertEqual(is_etypes, ['Transition']) |
129 instanceof_etypes = [etype for etype, in self.execute('Any ETN WHERE X eid %s, X is_instance_of ET, ET name ETN' % seid)] |
129 instanceof_etypes = [etype for etype, in self.execute('Any ETN WHERE X eid %s, X is_instance_of ET, ET name ETN' % seid)] |
130 self.assertEquals(sorted(instanceof_etypes), ['BaseTransition', 'Transition']) |
130 self.assertEqual(sorted(instanceof_etypes), ['BaseTransition', 'Transition']) |
131 snames = [name for name, in self.execute('Any N WHERE S is BaseTransition, S name N')] |
131 snames = [name for name, in self.execute('Any N WHERE S is BaseTransition, S name N')] |
132 self.failIf('subdiv' in snames) |
132 self.failIf('subdiv' in snames) |
133 snames = [name for name, in self.execute('Any N WHERE S is_instance_of BaseTransition, S name N')] |
133 snames = [name for name, in self.execute('Any N WHERE S is_instance_of BaseTransition, S name N')] |
134 self.failUnless('subdiv' in snames) |
134 self.failUnless('subdiv' in snames) |
135 |
135 |
136 |
136 |
137 def test_perms_synchronization_1(self): |
137 def test_perms_synchronization_1(self): |
138 schema = self.repo.schema |
138 schema = self.repo.schema |
139 self.assertEquals(schema['CWUser'].get_groups('read'), set(('managers', 'users'))) |
139 self.assertEqual(schema['CWUser'].get_groups('read'), set(('managers', 'users'))) |
140 self.failUnless(self.execute('Any X, Y WHERE X is CWEType, X name "CWUser", Y is CWGroup, Y name "users"')[0]) |
140 self.failUnless(self.execute('Any X, Y WHERE X is CWEType, X name "CWUser", Y is CWGroup, Y name "users"')[0]) |
141 self.execute('DELETE X read_permission Y WHERE X is CWEType, X name "CWUser", Y name "users"') |
141 self.execute('DELETE X read_permission Y WHERE X is CWEType, X name "CWUser", Y name "users"') |
142 self.assertEquals(schema['CWUser'].get_groups('read'), set(('managers', 'users', ))) |
142 self.assertEqual(schema['CWUser'].get_groups('read'), set(('managers', 'users', ))) |
143 self.commit() |
143 self.commit() |
144 self.assertEquals(schema['CWUser'].get_groups('read'), set(('managers',))) |
144 self.assertEqual(schema['CWUser'].get_groups('read'), set(('managers',))) |
145 self.execute('SET X read_permission Y WHERE X is CWEType, X name "CWUser", Y name "users"') |
145 self.execute('SET X read_permission Y WHERE X is CWEType, X name "CWUser", Y name "users"') |
146 self.commit() |
146 self.commit() |
147 self.assertEquals(schema['CWUser'].get_groups('read'), set(('managers', 'users',))) |
147 self.assertEqual(schema['CWUser'].get_groups('read'), set(('managers', 'users',))) |
148 |
148 |
149 def test_perms_synchronization_2(self): |
149 def test_perms_synchronization_2(self): |
150 schema = self.repo.schema['in_group'].rdefs[('CWUser', 'CWGroup')] |
150 schema = self.repo.schema['in_group'].rdefs[('CWUser', 'CWGroup')] |
151 self.assertEquals(schema.get_groups('read'), set(('managers', 'users', 'guests'))) |
151 self.assertEqual(schema.get_groups('read'), set(('managers', 'users', 'guests'))) |
152 self.execute('DELETE X read_permission Y WHERE X relation_type RT, RT name "in_group", Y name "guests"') |
152 self.execute('DELETE X read_permission Y WHERE X relation_type RT, RT name "in_group", Y name "guests"') |
153 self.assertEquals(schema.get_groups('read'), set(('managers', 'users', 'guests'))) |
153 self.assertEqual(schema.get_groups('read'), set(('managers', 'users', 'guests'))) |
154 self.commit() |
154 self.commit() |
155 self.assertEquals(schema.get_groups('read'), set(('managers', 'users'))) |
155 self.assertEqual(schema.get_groups('read'), set(('managers', 'users'))) |
156 self.execute('SET X read_permission Y WHERE X relation_type RT, RT name "in_group", Y name "guests"') |
156 self.execute('SET X read_permission Y WHERE X relation_type RT, RT name "in_group", Y name "guests"') |
157 self.assertEquals(schema.get_groups('read'), set(('managers', 'users'))) |
157 self.assertEqual(schema.get_groups('read'), set(('managers', 'users'))) |
158 self.commit() |
158 self.commit() |
159 self.assertEquals(schema.get_groups('read'), set(('managers', 'users', 'guests'))) |
159 self.assertEqual(schema.get_groups('read'), set(('managers', 'users', 'guests'))) |
160 |
160 |
161 def test_nonregr_user_edit_itself(self): |
161 def test_nonregr_user_edit_itself(self): |
162 ueid = self.session.user.eid |
162 ueid = self.session.user.eid |
163 groupeids = [eid for eid, in self.execute('CWGroup G WHERE G name in ("managers", "users")')] |
163 groupeids = [eid for eid, in self.execute('CWGroup G WHERE G name in ("managers", "users")')] |
164 self.execute('DELETE X in_group Y WHERE X eid %s' % ueid) |
164 self.execute('DELETE X in_group Y WHERE X eid %s' % ueid) |
185 self.failUnless(self.schema['state_of'].inlined) |
185 self.failUnless(self.schema['state_of'].inlined) |
186 self.commit() |
186 self.commit() |
187 self.failIf(self.schema['state_of'].inlined) |
187 self.failIf(self.schema['state_of'].inlined) |
188 self.failIf(self.index_exists('State', 'state_of')) |
188 self.failIf(self.index_exists('State', 'state_of')) |
189 rset = self.execute('Any X, Y WHERE X state_of Y') |
189 rset = self.execute('Any X, Y WHERE X state_of Y') |
190 self.assertEquals(len(rset), 2) # user states |
190 self.assertEqual(len(rset), 2) # user states |
191 except: |
191 except: |
192 import traceback |
192 import traceback |
193 traceback.print_exc() |
193 traceback.print_exc() |
194 finally: |
194 finally: |
195 self.execute('SET X inlined TRUE WHERE X name "state_of"') |
195 self.execute('SET X inlined TRUE WHERE X name "state_of"') |
196 self.failIf(self.schema['state_of'].inlined) |
196 self.failIf(self.schema['state_of'].inlined) |
197 self.commit() |
197 self.commit() |
198 self.failUnless(self.schema['state_of'].inlined) |
198 self.failUnless(self.schema['state_of'].inlined) |
199 self.failUnless(self.index_exists('State', 'state_of')) |
199 self.failUnless(self.index_exists('State', 'state_of')) |
200 rset = self.execute('Any X, Y WHERE X state_of Y') |
200 rset = self.execute('Any X, Y WHERE X state_of Y') |
201 self.assertEquals(len(rset), 2) |
201 self.assertEqual(len(rset), 2) |
202 |
202 |
203 def test_indexed_change(self): |
203 def test_indexed_change(self): |
204 self.session.set_pool() |
204 self.session.set_pool() |
205 dbhelper = self.session.pool.source('system').dbhelper |
205 dbhelper = self.session.pool.source('system').dbhelper |
206 sqlcursor = self.session.pool['system'] |
206 sqlcursor = self.session.pool['system'] |
315 |
315 |
316 def test_update_constraint(self): |
316 def test_update_constraint(self): |
317 rdef = self.schema['Transition'].rdef('type') |
317 rdef = self.schema['Transition'].rdef('type') |
318 cstr = rdef.constraint_by_type('StaticVocabularyConstraint') |
318 cstr = rdef.constraint_by_type('StaticVocabularyConstraint') |
319 if not getattr(cstr, 'eid', None): |
319 if not getattr(cstr, 'eid', None): |
320 self.skip('start me alone') # bug in schema reloading, constraint's eid not restored |
320 self.skipTest('start me alone') # bug in schema reloading, constraint's eid not restored |
321 self.execute('SET X value %(v)s WHERE X eid %(x)s', |
321 self.execute('SET X value %(v)s WHERE X eid %(x)s', |
322 {'x': cstr.eid, 'v': u"u'normal', u'auto', u'new'"}) |
322 {'x': cstr.eid, 'v': u"u'normal', u'auto', u'new'"}) |
323 self.execute('INSERT CWConstraint X: X value %(value)s, X cstrtype CT, EDEF constrained_by X ' |
323 self.execute('INSERT CWConstraint X: X value %(value)s, X cstrtype CT, EDEF constrained_by X ' |
324 'WHERE CT name %(ct)s, EDEF eid %(x)s', |
324 'WHERE CT name %(ct)s, EDEF eid %(x)s', |
325 {'ct': 'SizeConstraint', 'value': u'max=10', 'x': rdef.eid}) |
325 {'ct': 'SizeConstraint', 'value': u'max=10', 'x': rdef.eid}) |
326 self.commit() |
326 self.commit() |
327 cstr = rdef.constraint_by_type('StaticVocabularyConstraint') |
327 cstr = rdef.constraint_by_type('StaticVocabularyConstraint') |
328 self.assertEquals(cstr.values, (u'normal', u'auto', u'new')) |
328 self.assertEqual(cstr.values, (u'normal', u'auto', u'new')) |
329 self.execute('INSERT Transition T: T name "hop", T type "new"') |
329 self.execute('INSERT Transition T: T name "hop", T type "new"') |
330 |
330 |
331 if __name__ == '__main__': |
331 if __name__ == '__main__': |
332 unittest_main() |
332 unittest_main() |