30 sqlcursor = self.session.pool['system'] |
30 sqlcursor = self.session.pool['system'] |
31 return dbhelper.index_exists(sqlcursor, SQL_PREFIX + etype, SQL_PREFIX + attr, unique=unique) |
31 return dbhelper.index_exists(sqlcursor, SQL_PREFIX + etype, SQL_PREFIX + attr, unique=unique) |
32 |
32 |
33 def _set_perms(self, eid): |
33 def _set_perms(self, eid): |
34 self.execute('SET X read_permission G WHERE X eid %(x)s, G is CWGroup', |
34 self.execute('SET X read_permission G WHERE X eid %(x)s, G is CWGroup', |
35 {'x': eid}, 'x') |
35 {'x': eid}) |
36 self.execute('SET X add_permission G WHERE X eid %(x)s, G is CWGroup, G name "managers"', |
36 self.execute('SET X add_permission G WHERE X eid %(x)s, G is CWGroup, G name "managers"', |
37 {'x': eid}, 'x') |
37 {'x': eid}) |
38 self.execute('SET X delete_permission G WHERE X eid %(x)s, G is CWGroup, G name "owners"', |
38 self.execute('SET X delete_permission G WHERE X eid %(x)s, G is CWGroup, G name "owners"', |
39 {'x': eid}, 'x') |
39 {'x': eid}) |
40 |
40 |
41 def _set_attr_perms(self, eid): |
41 def _set_attr_perms(self, eid): |
42 self.execute('SET X read_permission G WHERE X eid %(x)s, G is CWGroup', |
42 self.execute('SET X read_permission G WHERE X eid %(x)s, G is CWGroup', |
43 {'x': eid}, 'x') |
43 {'x': eid}) |
44 self.execute('SET X update_permission G WHERE X eid %(x)s, G is CWGroup, G name "managers"', |
44 self.execute('SET X update_permission G WHERE X eid %(x)s, G is CWGroup, G name "managers"', |
45 {'x': eid}, 'x') |
45 {'x': eid}) |
46 |
46 |
47 def test_base(self): |
47 def test_base(self): |
48 schema = self.repo.schema |
48 schema = self.repo.schema |
49 self.session.set_pool() |
49 self.session.set_pool() |
50 dbhelper = self.session.pool.source('system').dbhelper |
50 dbhelper = self.session.pool.source('system').dbhelper |
86 rdefeid = self.execute('INSERT CWRelation X: X cardinality "**", X relation_type RT, ' |
86 rdefeid = self.execute('INSERT CWRelation X: X cardinality "**", X relation_type RT, ' |
87 ' X from_entity E, X to_entity E ' |
87 ' X from_entity E, X to_entity E ' |
88 'WHERE RT name "concerne2", E name "CWUser"')[0][0] |
88 'WHERE RT name "concerne2", E name "CWUser"')[0][0] |
89 self._set_perms(rdefeid) |
89 self._set_perms(rdefeid) |
90 self.commit() |
90 self.commit() |
91 self.execute('DELETE CWRelation X WHERE X eid %(x)s', {'x': concerne2_rdef_eid}, 'x') |
91 self.execute('DELETE CWRelation X WHERE X eid %(x)s', {'x': concerne2_rdef_eid}) |
92 self.commit() |
92 self.commit() |
93 self.failUnless('concerne2' in schema['CWUser'].subject_relations()) |
93 self.failUnless('concerne2' in schema['CWUser'].subject_relations()) |
94 self.failIf('concerne2' in schema['Societe2'].subject_relations()) |
94 self.failIf('concerne2' in schema['Societe2'].subject_relations()) |
95 self.failIf(self.execute('Any X WHERE X concerne2 Y')) |
95 self.failIf(self.execute('Any X WHERE X concerne2 Y')) |
96 # schema should be cleaned on delete (after commit) |
96 # schema should be cleaned on delete (after commit) |
246 |
246 |
247 def test_add_attribute_to_base_class(self): |
247 def test_add_attribute_to_base_class(self): |
248 attreid = self.execute('INSERT CWAttribute X: X cardinality "11", X defaultval "noname", X indexed TRUE, X relation_type RT, X from_entity E, X to_entity F ' |
248 attreid = self.execute('INSERT CWAttribute X: X cardinality "11", X defaultval "noname", X indexed TRUE, X relation_type RT, X from_entity E, X to_entity F ' |
249 'WHERE RT name "messageid", E name "BaseTransition", F name "String"')[0][0] |
249 'WHERE RT name "messageid", E name "BaseTransition", F name "String"')[0][0] |
250 assert self.execute('SET X read_permission Y WHERE X eid %(x)s, Y name "managers"', |
250 assert self.execute('SET X read_permission Y WHERE X eid %(x)s, Y name "managers"', |
251 {'x': attreid}, 'x') |
251 {'x': attreid}) |
252 self.commit() |
252 self.commit() |
253 self.schema.rebuild_infered_relations() |
253 self.schema.rebuild_infered_relations() |
254 self.failUnless('Transition' in self.schema['messageid'].subjects()) |
254 self.failUnless('Transition' in self.schema['messageid'].subjects()) |
255 self.failUnless('WorkflowTransition' in self.schema['messageid'].subjects()) |
255 self.failUnless('WorkflowTransition' in self.schema['messageid'].subjects()) |
256 self.execute('Any X WHERE X is_instance_of BaseTransition, X messageid "hop"') |
256 self.execute('Any X WHERE X is_instance_of BaseTransition, X messageid "hop"') |
297 rdef = self.schema['Transition'].rdef('type') |
297 rdef = self.schema['Transition'].rdef('type') |
298 cstr = rdef.constraint_by_type('StaticVocabularyConstraint') |
298 cstr = rdef.constraint_by_type('StaticVocabularyConstraint') |
299 if not getattr(cstr, 'eid', None): |
299 if not getattr(cstr, 'eid', None): |
300 self.skip('start me alone') # bug in schema reloading, constraint's eid not restored |
300 self.skip('start me alone') # bug in schema reloading, constraint's eid not restored |
301 self.execute('SET X value %(v)s WHERE X eid %(x)s', |
301 self.execute('SET X value %(v)s WHERE X eid %(x)s', |
302 {'x': cstr.eid, 'v': u"u'normal', u'auto', u'new'"}, 'x') |
302 {'x': cstr.eid, 'v': u"u'normal', u'auto', u'new'"}) |
303 self.execute('INSERT CWConstraint X: X value %(value)s, X cstrtype CT, EDEF constrained_by X ' |
303 self.execute('INSERT CWConstraint X: X value %(value)s, X cstrtype CT, EDEF constrained_by X ' |
304 'WHERE CT name %(ct)s, EDEF eid %(x)s', |
304 'WHERE CT name %(ct)s, EDEF eid %(x)s', |
305 {'ct': 'SizeConstraint', 'value': u'max=10', 'x': rdef.eid}, 'x') |
305 {'ct': 'SizeConstraint', 'value': u'max=10', 'x': rdef.eid}) |
306 self.commit() |
306 self.commit() |
307 cstr = rdef.constraint_by_type('StaticVocabularyConstraint') |
307 cstr = rdef.constraint_by_type('StaticVocabularyConstraint') |
308 self.assertEquals(cstr.values, (u'normal', u'auto', u'new')) |
308 self.assertEquals(cstr.values, (u'normal', u'auto', u'new')) |
309 self.execute('INSERT Transition T: T name "hop", T type "new"') |
309 self.execute('INSERT Transition T: T name "hop", T type "new"') |
310 |
310 |