--- a/hooks/test/unittest_hooks.py Tue Dec 08 19:16:56 2009 +0100
+++ b/hooks/test/unittest_hooks.py Tue Dec 08 19:17:52 2009 +0100
@@ -112,31 +112,34 @@
self.assertEquals(rset.get_entity(0, 0).reverse_parts[0].messageid, '<2345>')
def test_unsatisfied_constraints(self):
- self.execute('INSERT CWRelation X: X from_entity FE, X relation_type RT, X to_entity TE '
- 'WHERE FE name "CWUser", RT name "in_group", TE name "String"')
+ releid = self.execute('INSERT CWRelation X: X from_entity FE, X relation_type RT, X to_entity TE '
+ 'WHERE FE name "CWUser", RT name "in_group", TE name "String"')[0][0]
+ self.execute('SET X read_permission Y WHERE X eid %(x)s, Y name "managers"',
+ {'x': releid}, 'x')
ex = self.assertRaises(ValidationError,
self.commit)
- self.assertEquals(str(ex), '612 (to_entity): constraint O final FALSE failed')
+ self.assertEquals(ex.errors, {'to_entity': 'RQLConstraint O final FALSE failed'})
def test_html_tidy_hook(self):
- entity = self.add_entity('Workflow', name=u'wf1', description_format=u'text/html',
+ req = self.request()
+ entity = req.create_entity('Workflow', name=u'wf1', description_format=u'text/html',
description=u'yo')
self.assertEquals(entity.description, u'yo')
- entity = self.add_entity('Workflow', name=u'wf2', description_format=u'text/html',
+ entity = req.create_entity('Workflow', name=u'wf2', description_format=u'text/html',
description=u'<b>yo')
self.assertEquals(entity.description, u'<b>yo</b>')
- entity = self.add_entity('Workflow', name=u'wf3', description_format=u'text/html',
+ entity = req.create_entity('Workflow', name=u'wf3', description_format=u'text/html',
description=u'<b>yo</b>')
self.assertEquals(entity.description, u'<b>yo</b>')
- entity = self.add_entity('Workflow', name=u'wf4', description_format=u'text/html',
+ entity = req.create_entity('Workflow', name=u'wf4', description_format=u'text/html',
description=u'<b>R&D</b>')
self.assertEquals(entity.description, u'<b>R&D</b>')
- entity = self.add_entity('Workflow', name=u'wf5', description_format=u'text/html',
+ entity = req.create_entity('Workflow', name=u'wf5', description_format=u'text/html',
description=u"<div>c'est <b>l'été")
self.assertEquals(entity.description, u"<div>c'est <b>l'été</b></div>")
def test_nonregr_html_tidy_hook_no_update(self):
- entity = self.add_entity('Workflow', name=u'wf1', description_format=u'text/html',
+ entity = self.request().create_entity('Workflow', name=u'wf1', description_format=u'text/html',
description=u'yo')
entity.set_attributes(name=u'wf2')
self.assertEquals(entity.description, u'yo')
@@ -146,23 +149,23 @@
def test_metadata_cwuri(self):
- entity = self.add_entity('Workflow', name=u'wf1')
+ entity = self.request().create_entity('Workflow', name=u'wf1')
self.assertEquals(entity.cwuri, self.repo.config['base-url'] + 'eid/%s' % entity.eid)
def test_metadata_creation_modification_date(self):
_now = datetime.now()
- entity = self.add_entity('Workflow', name=u'wf1')
+ entity = self.request().create_entity('Workflow', name=u'wf1')
self.assertEquals((entity.creation_date - _now).seconds, 0)
self.assertEquals((entity.modification_date - _now).seconds, 0)
def test_metadata_created_by(self):
- entity = self.add_entity('Bookmark', title=u'wf1', path=u'/view')
+ entity = self.request().create_entity('Bookmark', title=u'wf1', path=u'/view')
self.commit() # fire operations
self.assertEquals(len(entity.created_by), 1) # make sure we have only one creator
self.assertEquals(entity.created_by[0].eid, self.session.user.eid)
def test_metadata_owned_by(self):
- entity = self.add_entity('Bookmark', title=u'wf1', path=u'/view')
+ entity = self.request().create_entity('Bookmark', title=u'wf1', path=u'/view')
self.commit() # fire operations
self.assertEquals(len(entity.owned_by), 1) # make sure we have only one owner
self.assertEquals(entity.owned_by[0].eid, self.session.user.eid)
@@ -281,6 +284,14 @@
sqlcursor = self.session.pool['system']
return dbhelper.index_exists(sqlcursor, SQL_PREFIX + etype, SQL_PREFIX + attr, unique=unique)
+ def _set_perms(self, eid):
+ self.execute('SET X read_permission G WHERE X eid %(x)s, G is CWGroup',
+ {'x': eid}, 'x')
+ self.execute('SET X add_permission G WHERE X eid %(x)s, G is CWGroup, G name "managers"',
+ {'x': eid}, 'x')
+ self.execute('SET X delete_permission G WHERE X eid %(x)s, G is CWGroup, G name "owners"',
+ {'x': eid}, 'x')
+
def test_base(self):
schema = self.repo.schema
self.session.set_pool()
@@ -289,25 +300,23 @@
self.failIf(schema.has_entity('Societe2'))
self.failIf(schema.has_entity('concerne2'))
# schema should be update on insertion (after commit)
- self.execute('INSERT CWEType X: X name "Societe2", X description "", X final FALSE')
+ eeid = self.execute('INSERT CWEType X: X name "Societe2", X description "", X final FALSE')[0][0]
+ self._set_perms(eeid)
self.execute('INSERT CWRType X: X name "concerne2", X description "", X final FALSE, X symetric FALSE')
self.failIf(schema.has_entity('Societe2'))
self.failIf(schema.has_entity('concerne2'))
- self.execute('SET X read_permission G WHERE X is CWEType, X name "Societe2", G is CWGroup')
- self.execute('SET X read_permission G WHERE X is CWRType, X name "concerne2", G is CWGroup')
- self.execute('SET X add_permission G WHERE X is CWEType, X name "Societe2", G is CWGroup, G name "managers"')
- self.execute('SET X add_permission G WHERE X is CWRType, X name "concerne2", G is CWGroup, G name "managers"')
- self.execute('SET X delete_permission G WHERE X is CWEType, X name "Societe2", G is CWGroup, G name "owners"')
- self.execute('SET X delete_permission G WHERE X is CWRType, X name "concerne2", G is CWGroup, G name "owners"')
# have to commit before adding definition relations
self.commit()
self.failUnless(schema.has_entity('Societe2'))
self.failUnless(schema.has_relation('concerne2'))
- self.execute('INSERT CWAttribute X: X cardinality "11", X defaultval "noname", X indexed TRUE, X relation_type RT, X from_entity E, X to_entity F '
- 'WHERE RT name "name", E name "Societe2", F name "String"')
+ attreid = self.execute('INSERT CWAttribute X: X cardinality "11", X defaultval "noname", '
+ ' X indexed TRUE, X relation_type RT, X from_entity E, X to_entity F '
+ 'WHERE RT name "name", E name "Societe2", F name "String"')[0][0]
+ self._set_perms(attreid)
concerne2_rdef_eid = self.execute(
'INSERT CWRelation X: X cardinality "**", X relation_type RT, X from_entity E, X to_entity E '
'WHERE RT name "concerne2", E name "Societe2"')[0][0]
+ self._set_perms(concerne2_rdef_eid)
self.failIf('name' in schema['Societe2'].subject_relations())
self.failIf('concerne2' in schema['Societe2'].subject_relations())
self.failIf(self.index_exists('Societe2', 'name'))
@@ -322,8 +331,10 @@
rset = self.execute('Any X WHERE X concerne2 Y')
self.assertEquals(rset.rows, [[s2eid]])
# check that when a relation definition is deleted, existing relations are deleted
- self.execute('INSERT CWRelation X: X cardinality "**", X relation_type RT, X from_entity E, X to_entity E '
- 'WHERE RT name "concerne2", E name "CWUser"')
+ rdefeid = self.execute('INSERT CWRelation X: X cardinality "**", X relation_type RT, '
+ ' X from_entity E, X to_entity E '
+ 'WHERE RT name "concerne2", E name "CWUser"')[0][0]
+ self._set_perms(rdefeid)
self.commit()
self.execute('DELETE CWRelation X WHERE X eid %(x)s', {'x': concerne2_rdef_eid}, 'x')
self.commit()
@@ -482,13 +493,15 @@
def test_add_attribute_to_base_class(self):
- self.execute('INSERT CWAttribute X: X cardinality "11", X defaultval "noname", X indexed TRUE, X relation_type RT, X from_entity E, X to_entity F '
- 'WHERE RT name "nom", E name "BaseTransition", F name "String"')
+ attreid = self.execute('INSERT CWAttribute X: X cardinality "11", X defaultval "noname", X indexed TRUE, X relation_type RT, X from_entity E, X to_entity F '
+ 'WHERE RT name "messageid", E name "BaseTransition", F name "String"')[0][0]
+ assert self.execute('SET X read_permission Y WHERE X eid %(x)s, Y name "managers"',
+ {'x': attreid}, 'x')
self.commit()
self.schema.rebuild_infered_relations()
- self.failUnless('Transition' in self.schema['nom'].subjects())
- self.failUnless('WorkflowTransition' in self.schema['nom'].subjects())
- self.execute('Any X WHERE X is_instance_of BaseTransition, X nom "hop"')
+ self.failUnless('Transition' in self.schema['messageid'].subjects())
+ self.failUnless('WorkflowTransition' in self.schema['messageid'].subjects())
+ self.execute('Any X WHERE X is_instance_of BaseTransition, X messageid "hop"')
if __name__ == '__main__':
unittest_main()