110 rset = self.execute('Any X WHERE X is EmailPart') |
110 rset = self.execute('Any X WHERE X is EmailPart') |
111 self.assertEquals(len(rset), 1) |
111 self.assertEquals(len(rset), 1) |
112 self.assertEquals(rset.get_entity(0, 0).reverse_parts[0].messageid, '<2345>') |
112 self.assertEquals(rset.get_entity(0, 0).reverse_parts[0].messageid, '<2345>') |
113 |
113 |
114 def test_unsatisfied_constraints(self): |
114 def test_unsatisfied_constraints(self): |
115 self.execute('INSERT CWRelation X: X from_entity FE, X relation_type RT, X to_entity TE ' |
115 releid = self.execute('INSERT CWRelation X: X from_entity FE, X relation_type RT, X to_entity TE ' |
116 'WHERE FE name "CWUser", RT name "in_group", TE name "String"') |
116 'WHERE FE name "CWUser", RT name "in_group", TE name "String"')[0][0] |
|
117 self.execute('SET X read_permission Y WHERE X eid %(x)s, Y name "managers"', |
|
118 {'x': releid}, 'x') |
117 ex = self.assertRaises(ValidationError, |
119 ex = self.assertRaises(ValidationError, |
118 self.commit) |
120 self.commit) |
119 self.assertEquals(str(ex), '612 (to_entity): constraint O final FALSE failed') |
121 self.assertEquals(ex.errors, {'to_entity': 'RQLConstraint O final FALSE failed'}) |
120 |
122 |
121 def test_html_tidy_hook(self): |
123 def test_html_tidy_hook(self): |
122 entity = self.add_entity('Workflow', name=u'wf1', description_format=u'text/html', |
124 req = self.request() |
|
125 entity = req.create_entity('Workflow', name=u'wf1', description_format=u'text/html', |
123 description=u'yo') |
126 description=u'yo') |
124 self.assertEquals(entity.description, u'yo') |
127 self.assertEquals(entity.description, u'yo') |
125 entity = self.add_entity('Workflow', name=u'wf2', description_format=u'text/html', |
128 entity = req.create_entity('Workflow', name=u'wf2', description_format=u'text/html', |
126 description=u'<b>yo') |
129 description=u'<b>yo') |
127 self.assertEquals(entity.description, u'<b>yo</b>') |
130 self.assertEquals(entity.description, u'<b>yo</b>') |
128 entity = self.add_entity('Workflow', name=u'wf3', description_format=u'text/html', |
131 entity = req.create_entity('Workflow', name=u'wf3', description_format=u'text/html', |
129 description=u'<b>yo</b>') |
132 description=u'<b>yo</b>') |
130 self.assertEquals(entity.description, u'<b>yo</b>') |
133 self.assertEquals(entity.description, u'<b>yo</b>') |
131 entity = self.add_entity('Workflow', name=u'wf4', description_format=u'text/html', |
134 entity = req.create_entity('Workflow', name=u'wf4', description_format=u'text/html', |
132 description=u'<b>R&D</b>') |
135 description=u'<b>R&D</b>') |
133 self.assertEquals(entity.description, u'<b>R&D</b>') |
136 self.assertEquals(entity.description, u'<b>R&D</b>') |
134 entity = self.add_entity('Workflow', name=u'wf5', description_format=u'text/html', |
137 entity = req.create_entity('Workflow', name=u'wf5', description_format=u'text/html', |
135 description=u"<div>c'est <b>l'été") |
138 description=u"<div>c'est <b>l'été") |
136 self.assertEquals(entity.description, u"<div>c'est <b>l'été</b></div>") |
139 self.assertEquals(entity.description, u"<div>c'est <b>l'été</b></div>") |
137 |
140 |
138 def test_nonregr_html_tidy_hook_no_update(self): |
141 def test_nonregr_html_tidy_hook_no_update(self): |
139 entity = self.add_entity('Workflow', name=u'wf1', description_format=u'text/html', |
142 entity = self.request().create_entity('Workflow', name=u'wf1', description_format=u'text/html', |
140 description=u'yo') |
143 description=u'yo') |
141 entity.set_attributes(name=u'wf2') |
144 entity.set_attributes(name=u'wf2') |
142 self.assertEquals(entity.description, u'yo') |
145 self.assertEquals(entity.description, u'yo') |
143 entity.set_attributes(description=u'R&D<p>yo') |
146 entity.set_attributes(description=u'R&D<p>yo') |
144 entity.pop('description') |
147 entity.pop('description') |
145 self.assertEquals(entity.description, u'R&D<p>yo</p>') |
148 self.assertEquals(entity.description, u'R&D<p>yo</p>') |
146 |
149 |
147 |
150 |
148 def test_metadata_cwuri(self): |
151 def test_metadata_cwuri(self): |
149 entity = self.add_entity('Workflow', name=u'wf1') |
152 entity = self.request().create_entity('Workflow', name=u'wf1') |
150 self.assertEquals(entity.cwuri, self.repo.config['base-url'] + 'eid/%s' % entity.eid) |
153 self.assertEquals(entity.cwuri, self.repo.config['base-url'] + 'eid/%s' % entity.eid) |
151 |
154 |
152 def test_metadata_creation_modification_date(self): |
155 def test_metadata_creation_modification_date(self): |
153 _now = datetime.now() |
156 _now = datetime.now() |
154 entity = self.add_entity('Workflow', name=u'wf1') |
157 entity = self.request().create_entity('Workflow', name=u'wf1') |
155 self.assertEquals((entity.creation_date - _now).seconds, 0) |
158 self.assertEquals((entity.creation_date - _now).seconds, 0) |
156 self.assertEquals((entity.modification_date - _now).seconds, 0) |
159 self.assertEquals((entity.modification_date - _now).seconds, 0) |
157 |
160 |
158 def test_metadata_created_by(self): |
161 def test_metadata_created_by(self): |
159 entity = self.add_entity('Bookmark', title=u'wf1', path=u'/view') |
162 entity = self.request().create_entity('Bookmark', title=u'wf1', path=u'/view') |
160 self.commit() # fire operations |
163 self.commit() # fire operations |
161 self.assertEquals(len(entity.created_by), 1) # make sure we have only one creator |
164 self.assertEquals(len(entity.created_by), 1) # make sure we have only one creator |
162 self.assertEquals(entity.created_by[0].eid, self.session.user.eid) |
165 self.assertEquals(entity.created_by[0].eid, self.session.user.eid) |
163 |
166 |
164 def test_metadata_owned_by(self): |
167 def test_metadata_owned_by(self): |
165 entity = self.add_entity('Bookmark', title=u'wf1', path=u'/view') |
168 entity = self.request().create_entity('Bookmark', title=u'wf1', path=u'/view') |
166 self.commit() # fire operations |
169 self.commit() # fire operations |
167 self.assertEquals(len(entity.owned_by), 1) # make sure we have only one owner |
170 self.assertEquals(len(entity.owned_by), 1) # make sure we have only one owner |
168 self.assertEquals(entity.owned_by[0].eid, self.session.user.eid) |
171 self.assertEquals(entity.owned_by[0].eid, self.session.user.eid) |
169 |
172 |
170 def test_user_login_stripped(self): |
173 def test_user_login_stripped(self): |
279 self.session.set_pool() |
282 self.session.set_pool() |
280 dbhelper = self.session.pool.source('system').dbhelper |
283 dbhelper = self.session.pool.source('system').dbhelper |
281 sqlcursor = self.session.pool['system'] |
284 sqlcursor = self.session.pool['system'] |
282 return dbhelper.index_exists(sqlcursor, SQL_PREFIX + etype, SQL_PREFIX + attr, unique=unique) |
285 return dbhelper.index_exists(sqlcursor, SQL_PREFIX + etype, SQL_PREFIX + attr, unique=unique) |
283 |
286 |
|
287 def _set_perms(self, eid): |
|
288 self.execute('SET X read_permission G WHERE X eid %(x)s, G is CWGroup', |
|
289 {'x': eid}, 'x') |
|
290 self.execute('SET X add_permission G WHERE X eid %(x)s, G is CWGroup, G name "managers"', |
|
291 {'x': eid}, 'x') |
|
292 self.execute('SET X delete_permission G WHERE X eid %(x)s, G is CWGroup, G name "owners"', |
|
293 {'x': eid}, 'x') |
|
294 |
284 def test_base(self): |
295 def test_base(self): |
285 schema = self.repo.schema |
296 schema = self.repo.schema |
286 self.session.set_pool() |
297 self.session.set_pool() |
287 dbhelper = self.session.pool.source('system').dbhelper |
298 dbhelper = self.session.pool.source('system').dbhelper |
288 sqlcursor = self.session.pool['system'] |
299 sqlcursor = self.session.pool['system'] |
289 self.failIf(schema.has_entity('Societe2')) |
300 self.failIf(schema.has_entity('Societe2')) |
290 self.failIf(schema.has_entity('concerne2')) |
301 self.failIf(schema.has_entity('concerne2')) |
291 # schema should be update on insertion (after commit) |
302 # schema should be update on insertion (after commit) |
292 self.execute('INSERT CWEType X: X name "Societe2", X description "", X final FALSE') |
303 eeid = self.execute('INSERT CWEType X: X name "Societe2", X description "", X final FALSE')[0][0] |
|
304 self._set_perms(eeid) |
293 self.execute('INSERT CWRType X: X name "concerne2", X description "", X final FALSE, X symetric FALSE') |
305 self.execute('INSERT CWRType X: X name "concerne2", X description "", X final FALSE, X symetric FALSE') |
294 self.failIf(schema.has_entity('Societe2')) |
306 self.failIf(schema.has_entity('Societe2')) |
295 self.failIf(schema.has_entity('concerne2')) |
307 self.failIf(schema.has_entity('concerne2')) |
296 self.execute('SET X read_permission G WHERE X is CWEType, X name "Societe2", G is CWGroup') |
|
297 self.execute('SET X read_permission G WHERE X is CWRType, X name "concerne2", G is CWGroup') |
|
298 self.execute('SET X add_permission G WHERE X is CWEType, X name "Societe2", G is CWGroup, G name "managers"') |
|
299 self.execute('SET X add_permission G WHERE X is CWRType, X name "concerne2", G is CWGroup, G name "managers"') |
|
300 self.execute('SET X delete_permission G WHERE X is CWEType, X name "Societe2", G is CWGroup, G name "owners"') |
|
301 self.execute('SET X delete_permission G WHERE X is CWRType, X name "concerne2", G is CWGroup, G name "owners"') |
|
302 # have to commit before adding definition relations |
308 # have to commit before adding definition relations |
303 self.commit() |
309 self.commit() |
304 self.failUnless(schema.has_entity('Societe2')) |
310 self.failUnless(schema.has_entity('Societe2')) |
305 self.failUnless(schema.has_relation('concerne2')) |
311 self.failUnless(schema.has_relation('concerne2')) |
306 self.execute('INSERT CWAttribute X: X cardinality "11", X defaultval "noname", X indexed TRUE, X relation_type RT, X from_entity E, X to_entity F ' |
312 attreid = self.execute('INSERT CWAttribute X: X cardinality "11", X defaultval "noname", ' |
307 'WHERE RT name "name", E name "Societe2", F name "String"') |
313 ' X indexed TRUE, X relation_type RT, X from_entity E, X to_entity F ' |
|
314 'WHERE RT name "name", E name "Societe2", F name "String"')[0][0] |
|
315 self._set_perms(attreid) |
308 concerne2_rdef_eid = self.execute( |
316 concerne2_rdef_eid = self.execute( |
309 'INSERT CWRelation X: X cardinality "**", X relation_type RT, X from_entity E, X to_entity E ' |
317 'INSERT CWRelation X: X cardinality "**", X relation_type RT, X from_entity E, X to_entity E ' |
310 'WHERE RT name "concerne2", E name "Societe2"')[0][0] |
318 'WHERE RT name "concerne2", E name "Societe2"')[0][0] |
|
319 self._set_perms(concerne2_rdef_eid) |
311 self.failIf('name' in schema['Societe2'].subject_relations()) |
320 self.failIf('name' in schema['Societe2'].subject_relations()) |
312 self.failIf('concerne2' in schema['Societe2'].subject_relations()) |
321 self.failIf('concerne2' in schema['Societe2'].subject_relations()) |
313 self.failIf(self.index_exists('Societe2', 'name')) |
322 self.failIf(self.index_exists('Societe2', 'name')) |
314 self.commit() |
323 self.commit() |
315 self.failUnless('name' in schema['Societe2'].subject_relations()) |
324 self.failUnless('name' in schema['Societe2'].subject_relations()) |
320 self.execute('Societe2 X WHERE X name "logilab"') |
329 self.execute('Societe2 X WHERE X name "logilab"') |
321 self.execute('SET X concerne2 X WHERE X name "logilab"') |
330 self.execute('SET X concerne2 X WHERE X name "logilab"') |
322 rset = self.execute('Any X WHERE X concerne2 Y') |
331 rset = self.execute('Any X WHERE X concerne2 Y') |
323 self.assertEquals(rset.rows, [[s2eid]]) |
332 self.assertEquals(rset.rows, [[s2eid]]) |
324 # check that when a relation definition is deleted, existing relations are deleted |
333 # check that when a relation definition is deleted, existing relations are deleted |
325 self.execute('INSERT CWRelation X: X cardinality "**", X relation_type RT, X from_entity E, X to_entity E ' |
334 rdefeid = self.execute('INSERT CWRelation X: X cardinality "**", X relation_type RT, ' |
326 'WHERE RT name "concerne2", E name "CWUser"') |
335 ' X from_entity E, X to_entity E ' |
|
336 'WHERE RT name "concerne2", E name "CWUser"')[0][0] |
|
337 self._set_perms(rdefeid) |
327 self.commit() |
338 self.commit() |
328 self.execute('DELETE CWRelation X WHERE X eid %(x)s', {'x': concerne2_rdef_eid}, 'x') |
339 self.execute('DELETE CWRelation X WHERE X eid %(x)s', {'x': concerne2_rdef_eid}, 'x') |
329 self.commit() |
340 self.commit() |
330 self.failUnless('concerne2' in schema['CWUser'].subject_relations()) |
341 self.failUnless('concerne2' in schema['CWUser'].subject_relations()) |
331 self.failIf('concerne2' in schema['Societe2'].subject_relations()) |
342 self.failIf('concerne2' in schema['Societe2'].subject_relations()) |
480 'RT name "surname", E name "CWUser"') |
491 'RT name "surname", E name "CWUser"') |
481 self.commit() |
492 self.commit() |
482 |
493 |
483 |
494 |
484 def test_add_attribute_to_base_class(self): |
495 def test_add_attribute_to_base_class(self): |
485 self.execute('INSERT CWAttribute X: X cardinality "11", X defaultval "noname", X indexed TRUE, X relation_type RT, X from_entity E, X to_entity F ' |
496 attreid = self.execute('INSERT CWAttribute X: X cardinality "11", X defaultval "noname", X indexed TRUE, X relation_type RT, X from_entity E, X to_entity F ' |
486 'WHERE RT name "nom", E name "BaseTransition", F name "String"') |
497 'WHERE RT name "messageid", E name "BaseTransition", F name "String"')[0][0] |
|
498 assert self.execute('SET X read_permission Y WHERE X eid %(x)s, Y name "managers"', |
|
499 {'x': attreid}, 'x') |
487 self.commit() |
500 self.commit() |
488 self.schema.rebuild_infered_relations() |
501 self.schema.rebuild_infered_relations() |
489 self.failUnless('Transition' in self.schema['nom'].subjects()) |
502 self.failUnless('Transition' in self.schema['messageid'].subjects()) |
490 self.failUnless('WorkflowTransition' in self.schema['nom'].subjects()) |
503 self.failUnless('WorkflowTransition' in self.schema['messageid'].subjects()) |
491 self.execute('Any X WHERE X is_instance_of BaseTransition, X nom "hop"') |
504 self.execute('Any X WHERE X is_instance_of BaseTransition, X messageid "hop"') |
492 |
505 |
493 if __name__ == '__main__': |
506 if __name__ == '__main__': |
494 unittest_main() |
507 unittest_main() |