15 # |
15 # |
16 # You should have received a copy of the GNU Lesser General Public License along |
16 # You should have received a copy of the GNU Lesser General Public License along |
17 # with CubicWeb. If not, see <http://www.gnu.org/licenses/>. |
17 # with CubicWeb. If not, see <http://www.gnu.org/licenses/>. |
18 """cubicweb.server.hooks.syncschema unit and functional tests""" |
18 """cubicweb.server.hooks.syncschema unit and functional tests""" |
19 |
19 |
20 from logilab.common.testlib import unittest_main |
|
21 |
|
22 from yams.constraints import BoundaryConstraint |
20 from yams.constraints import BoundaryConstraint |
|
21 |
23 from cubicweb import ValidationError, Binary |
22 from cubicweb import ValidationError, Binary |
24 from cubicweb.schema import META_RTYPES |
23 from cubicweb.schema import META_RTYPES |
25 from cubicweb.devtools import startpgcluster, stoppgcluster, PostgresApptestConfiguration |
24 from cubicweb.devtools import startpgcluster, stoppgcluster, PostgresApptestConfiguration |
26 from cubicweb.devtools.testlib import CubicWebTC |
25 from cubicweb.devtools.testlib import CubicWebTC |
27 from cubicweb.server.sqlutils import SQL_PREFIX |
26 from cubicweb.server.sqlutils import SQL_PREFIX |
85 self.assertTrue(schema.has_entity('Societe2')) |
84 self.assertTrue(schema.has_entity('Societe2')) |
86 self.assertTrue(schema.has_relation('concerne2')) |
85 self.assertTrue(schema.has_relation('concerne2')) |
87 attreid = cnx.execute('INSERT CWAttribute X: X cardinality "11", ' |
86 attreid = cnx.execute('INSERT CWAttribute X: X cardinality "11", ' |
88 'X defaultval %(default)s, X indexed TRUE, ' |
87 'X defaultval %(default)s, X indexed TRUE, ' |
89 'X relation_type RT, X from_entity E, X to_entity F ' |
88 'X relation_type RT, X from_entity E, X to_entity F ' |
90 'WHERE RT name "name", E name "Societe2", ' |
89 'WHERE RT name "name", E name "Societe2", ' |
91 'F name "String"', |
90 'F name "String"', |
92 {'default': Binary.zpickle('noname')})[0][0] |
91 {'default': Binary.zpickle('noname')})[0][0] |
93 self._set_attr_perms(cnx, attreid) |
92 self._set_attr_perms(cnx, attreid) |
94 concerne2_rdef_eid = cnx.execute( |
93 concerne2_rdef_eid = cnx.execute( |
95 'INSERT CWRelation X: X cardinality "**", X relation_type RT, ' |
94 'INSERT CWRelation X: X cardinality "**", X relation_type RT, ' |
109 cnx.execute('SET X concerne2 X WHERE X name "logilab"') |
108 cnx.execute('SET X concerne2 X WHERE X name "logilab"') |
110 rset = cnx.execute('Any X WHERE X concerne2 Y') |
109 rset = cnx.execute('Any X WHERE X concerne2 Y') |
111 self.assertEqual(rset.rows, [[s2eid]]) |
110 self.assertEqual(rset.rows, [[s2eid]]) |
112 # check that when a relation definition is deleted, existing relations are deleted |
111 # check that when a relation definition is deleted, existing relations are deleted |
113 rdefeid = cnx.execute('INSERT CWRelation X: X cardinality "**", X relation_type RT, ' |
112 rdefeid = cnx.execute('INSERT CWRelation X: X cardinality "**", X relation_type RT, ' |
114 ' X from_entity E, X to_entity E ' |
113 ' X from_entity E, X to_entity E ' |
115 'WHERE RT name "concerne2", E name "CWUser"')[0][0] |
114 'WHERE RT name "concerne2", E name "CWUser"')[0][0] |
116 self._set_perms(cnx, rdefeid) |
115 self._set_perms(cnx, rdefeid) |
117 cnx.commit() |
116 cnx.commit() |
118 cnx.execute('DELETE CWRelation X WHERE X eid %(x)s', {'x': concerne2_rdef_eid}) |
117 cnx.execute('DELETE CWRelation X WHERE X eid %(x)s', {'x': concerne2_rdef_eid}) |
119 cnx.commit() |
118 cnx.commit() |
120 self.assertIn('concerne2', schema['CWUser'].subject_relations()) |
119 self.assertIn('concerne2', schema['CWUser'].subject_relations()) |
134 |
133 |
135 def test_metartype_with_nordefs(self): |
134 def test_metartype_with_nordefs(self): |
136 with self.admin_access.repo_cnx() as cnx: |
135 with self.admin_access.repo_cnx() as cnx: |
137 META_RTYPES.add('custom_meta') |
136 META_RTYPES.add('custom_meta') |
138 cnx.execute('INSERT CWRType X: X name "custom_meta", X description "", ' |
137 cnx.execute('INSERT CWRType X: X name "custom_meta", X description "", ' |
139 'X final FALSE, X symmetric FALSE') |
138 'X final FALSE, X symmetric FALSE') |
140 cnx.commit() |
139 cnx.commit() |
141 eeid = cnx.execute('INSERT CWEType X: X name "NEWEtype", ' |
140 eeid = cnx.execute('INSERT CWEType X: X name "NEWEtype", ' |
142 'X description "", X final FALSE')[0][0] |
141 'X description "", X final FALSE')[0][0] |
143 self._set_perms(cnx, eeid) |
142 self._set_perms(cnx, eeid) |
144 cnx.commit() |
143 cnx.commit() |
145 META_RTYPES.remove('custom_meta') |
144 META_RTYPES.remove('custom_meta') |
146 |
145 |
147 def test_metartype_with_somerdefs(self): |
146 def test_metartype_with_somerdefs(self): |
148 with self.admin_access.repo_cnx() as cnx: |
147 with self.admin_access.repo_cnx() as cnx: |
149 META_RTYPES.add('custom_meta') |
148 META_RTYPES.add('custom_meta') |
150 cnx.execute('INSERT CWRType X: X name "custom_meta", X description "", ' |
149 cnx.execute('INSERT CWRType X: X name "custom_meta", X description "", ' |
151 'X final FALSE, X symmetric FALSE') |
150 'X final FALSE, X symmetric FALSE') |
152 cnx.commit() |
151 cnx.commit() |
153 rdefeid = cnx.execute('INSERT CWRelation X: X cardinality "**", X relation_type RT, ' |
152 rdefeid = cnx.execute('INSERT CWRelation X: X cardinality "**", X relation_type RT, ' |
154 ' X from_entity E, X to_entity E ' |
153 ' X from_entity E, X to_entity E ' |
155 'WHERE RT name "custom_meta", E name "CWUser"')[0][0] |
154 'WHERE RT name "custom_meta", E name "CWUser"')[0][0] |
156 self._set_perms(cnx, rdefeid) |
155 self._set_perms(cnx, rdefeid) |
157 cnx.commit() |
156 cnx.commit() |
158 eeid = cnx.execute('INSERT CWEType X: X name "NEWEtype", ' |
157 eeid = cnx.execute('INSERT CWEType X: X name "NEWEtype", ' |
159 'X description "", X final FALSE')[0][0] |
158 'X description "", X final FALSE')[0][0] |
160 self._set_perms(cnx, eeid) |
159 self._set_perms(cnx, eeid) |
161 cnx.commit() |
160 cnx.commit() |
162 META_RTYPES.remove('custom_meta') |
161 META_RTYPES.remove('custom_meta') |
163 |
162 |
164 def test_is_instance_of_insertions(self): |
163 def test_is_instance_of_insertions(self): |
176 self.assertNotIn('subdiv', snames) |
175 self.assertNotIn('subdiv', snames) |
177 snames = [name for name, in cnx.execute('Any N WHERE S is_instance_of BaseTransition, ' |
176 snames = [name for name, in cnx.execute('Any N WHERE S is_instance_of BaseTransition, ' |
178 'S name N')] |
177 'S name N')] |
179 self.assertIn('subdiv', snames) |
178 self.assertIn('subdiv', snames) |
180 |
179 |
181 |
|
182 def test_perms_synchronization_1(self): |
180 def test_perms_synchronization_1(self): |
183 with self.admin_access.repo_cnx() as cnx: |
181 with self.admin_access.repo_cnx() as cnx: |
184 schema = self.repo.schema |
182 schema = self.repo.schema |
185 self.assertEqual(schema['CWUser'].get_groups('read'), set(('managers', 'users'))) |
183 self.assertEqual(schema['CWUser'].get_groups('read'), set(('managers', 'users'))) |
186 self.assertTrue(cnx.execute('Any X, Y WHERE X is CWEType, X name "CWUser", ' |
184 self.assertTrue(cnx.execute('Any X, Y WHERE X is CWEType, X name "CWUser", ' |
187 'Y is CWGroup, Y name "users"')[0]) |
185 'Y is CWGroup, Y name "users"')[0]) |
188 cnx.execute('DELETE X read_permission Y WHERE X is CWEType, X name "CWUser", Y name "users"') |
186 cnx.execute('DELETE X read_permission Y ' |
|
187 'WHERE X is CWEType, X name "CWUser", Y name "users"') |
189 self.assertEqual(schema['CWUser'].get_groups('read'), set(('managers', 'users', ))) |
188 self.assertEqual(schema['CWUser'].get_groups('read'), set(('managers', 'users', ))) |
190 cnx.commit() |
189 cnx.commit() |
191 self.assertEqual(schema['CWUser'].get_groups('read'), set(('managers',))) |
190 self.assertEqual(schema['CWUser'].get_groups('read'), set(('managers',))) |
192 cnx.execute('SET X read_permission Y WHERE X is CWEType, ' |
191 cnx.execute('SET X read_permission Y WHERE X is CWEType, ' |
193 'X name "CWUser", Y name "users"') |
192 'X name "CWUser", Y name "users"') |
226 cnx.commit() |
225 cnx.commit() |
227 eeid = cnx.execute('Any X WHERE X is CWEType, X name "CWEType"')[0][0] |
226 eeid = cnx.execute('Any X WHERE X is CWEType, X name "CWEType"')[0][0] |
228 cnx.execute('DELETE X read_permission Y WHERE X eid %s' % eeid) |
227 cnx.execute('DELETE X read_permission Y WHERE X eid %s' % eeid) |
229 cnx.execute('SET X final FALSE WHERE X eid %s' % eeid) |
228 cnx.execute('SET X final FALSE WHERE X eid %s' % eeid) |
230 cnx.execute('SET X read_permission Y WHERE X eid %s, Y eid in (%s, %s)' |
229 cnx.execute('SET X read_permission Y WHERE X eid %s, Y eid in (%s, %s)' |
231 % (eeid, groupeids[0], groupeids[1])) |
230 % (eeid, groupeids[0], groupeids[1])) |
232 cnx.commit() |
231 cnx.commit() |
233 cnx.execute('Any X WHERE X is CWEType, X name "CWEType"') |
232 cnx.execute('Any X WHERE X is CWEType, X name "CWEType"') |
234 |
233 |
235 # schema modification hooks tests ######################################### |
234 # schema modification hooks tests ######################################### |
236 |
235 |
242 self.assertTrue(self.schema['state_of'].inlined) |
241 self.assertTrue(self.schema['state_of'].inlined) |
243 cnx.commit() |
242 cnx.commit() |
244 self.assertFalse(self.schema['state_of'].inlined) |
243 self.assertFalse(self.schema['state_of'].inlined) |
245 self.assertFalse(self.index_exists(cnx, 'State', 'state_of')) |
244 self.assertFalse(self.index_exists(cnx, 'State', 'state_of')) |
246 rset = cnx.execute('Any X, Y WHERE X state_of Y') |
245 rset = cnx.execute('Any X, Y WHERE X state_of Y') |
247 self.assertEqual(len(rset), 2) # user states |
246 self.assertEqual(len(rset), 2) # user states |
248 finally: |
247 finally: |
249 cnx.execute('SET X inlined TRUE WHERE X name "state_of"') |
248 cnx.execute('SET X inlined TRUE WHERE X name "state_of"') |
250 self.assertFalse(self.schema['state_of'].inlined) |
249 self.assertFalse(self.schema['state_of'].inlined) |
251 cnx.commit() |
250 cnx.commit() |
252 self.assertTrue(self.schema['state_of'].inlined) |
251 self.assertTrue(self.schema['state_of'].inlined) |
291 self.assertFalse(self.index_exists(cnx, 'Workflow', 'name', unique=True)) |
290 self.assertFalse(self.index_exists(cnx, 'Workflow', 'name', unique=True)) |
292 |
291 |
293 def test_required_change_1(self): |
292 def test_required_change_1(self): |
294 with self.admin_access.repo_cnx() as cnx: |
293 with self.admin_access.repo_cnx() as cnx: |
295 cnx.execute('SET DEF cardinality "?1" ' |
294 cnx.execute('SET DEF cardinality "?1" ' |
296 'WHERE DEF relation_type RT, DEF from_entity E,' |
295 'WHERE DEF relation_type RT, DEF from_entity E,' |
297 'RT name "title", E name "Bookmark"') |
296 'RT name "title", E name "Bookmark"') |
298 cnx.commit() |
297 cnx.commit() |
299 # should now be able to add bookmark without title |
298 # should now be able to add bookmark without title |
300 cnx.execute('INSERT Bookmark X: X path "/view"') |
299 cnx.execute('INSERT Bookmark X: X path "/view"') |
301 cnx.commit() |
300 cnx.commit() |
302 |
301 |
303 def test_required_change_2(self): |
302 def test_required_change_2(self): |
304 with self.admin_access.repo_cnx() as cnx: |
303 with self.admin_access.repo_cnx() as cnx: |
305 cnx.execute('SET DEF cardinality "11" ' |
304 cnx.execute('SET DEF cardinality "11" ' |
306 'WHERE DEF relation_type RT, DEF from_entity E,' |
305 'WHERE DEF relation_type RT, DEF from_entity E,' |
307 'RT name "surname", E name "CWUser"') |
306 'RT name "surname", E name "CWUser"') |
308 cnx.execute('SET U surname "Doe" WHERE U surname NULL') |
307 cnx.execute('SET U surname "Doe" WHERE U surname NULL') |
309 cnx.commit() |
308 cnx.commit() |
310 # should not be able anymore to add cwuser without surname |
309 # should not be able anymore to add cwuser without surname |
311 self.assertRaises(ValidationError, self.create_user, cnx, "toto") |
310 self.assertRaises(ValidationError, self.create_user, cnx, "toto") |
312 cnx.rollback() |
311 cnx.rollback() |
313 cnx.execute('SET DEF cardinality "?1" ' |
312 cnx.execute('SET DEF cardinality "?1" ' |
314 'WHERE DEF relation_type RT, DEF from_entity E,' |
313 'WHERE DEF relation_type RT, DEF from_entity E,' |
315 'RT name "surname", E name "CWUser"') |
314 'RT name "surname", E name "CWUser"') |
316 cnx.commit() |
315 cnx.commit() |
317 |
316 |
318 def test_add_attribute_to_base_class(self): |
317 def test_add_attribute_to_base_class(self): |
319 with self.admin_access.repo_cnx() as cnx: |
318 with self.admin_access.repo_cnx() as cnx: |
320 attreid = cnx.execute('INSERT CWAttribute X: X cardinality "11", X defaultval %(default)s, ' |
319 attreid = cnx.execute( |
321 'X indexed TRUE, X relation_type RT, X from_entity E, X to_entity F ' |
320 'INSERT CWAttribute X: X cardinality "11", X defaultval %(default)s, ' |
322 'WHERE RT name "messageid", E name "BaseTransition", F name "String"', |
321 'X indexed TRUE, X relation_type RT, X from_entity E, X to_entity F ' |
323 {'default': Binary.zpickle('noname')})[0][0] |
322 'WHERE RT name "messageid", E name "BaseTransition", F name "String"', |
|
323 {'default': Binary.zpickle('noname')})[0][0] |
324 assert cnx.execute('SET X read_permission Y WHERE X eid %(x)s, Y name "managers"', |
324 assert cnx.execute('SET X read_permission Y WHERE X eid %(x)s, Y name "managers"', |
325 {'x': attreid}) |
325 {'x': attreid}) |
326 cnx.commit() |
326 cnx.commit() |
327 self.schema.rebuild_infered_relations() |
327 self.schema.rebuild_infered_relations() |
328 self.assertIn('Transition', self.schema['messageid'].subjects()) |
328 self.assertIn('Transition', self.schema['messageid'].subjects()) |
355 target.cw_set(reverse_use_email=cnx.user) |
355 target.cw_set(reverse_use_email=cnx.user) |
356 cnx.commit() |
356 cnx.commit() |
357 rset = cnx.execute('Any X WHERE X has_text "rick.roll"') |
357 rset = cnx.execute('Any X WHERE X has_text "rick.roll"') |
358 self.assertIn(cnx.user.eid, [item[0] for item in rset]) |
358 self.assertIn(cnx.user.eid, [item[0] for item in rset]) |
359 assert cnx.execute('SET R fulltext_container NULL ' |
359 assert cnx.execute('SET R fulltext_container NULL ' |
360 'WHERE R name "use_email"') |
360 'WHERE R name "use_email"') |
361 cnx.commit() |
361 cnx.commit() |
362 rset = cnx.execute('Any X WHERE X has_text "rick.roll"') |
362 rset = cnx.execute('Any X WHERE X has_text "rick.roll"') |
363 self.assertIn(target.eid, [item[0] for item in rset]) |
363 self.assertIn(target.eid, [item[0] for item in rset]) |
364 assert cnx.execute('SET R fulltext_container "subject" ' |
364 assert cnx.execute('SET R fulltext_container "subject" ' |
365 'WHERE R name "use_email"') |
365 'WHERE R name "use_email"') |
366 cnx.commit() |
366 cnx.commit() |
367 rset = cnx.execute('Any X WHERE X has_text "rick.roll"') |
367 rset = cnx.execute('Any X WHERE X has_text "rick.roll"') |
368 self.assertIn(cnx.user.eid, [item[0] for item in rset]) |
368 self.assertIn(cnx.user.eid, [item[0] for item in rset]) |
369 |
369 |
370 def test_update_constraint(self): |
370 def test_update_constraint(self): |
373 cstr = rdef.constraint_by_type('StaticVocabularyConstraint') |
373 cstr = rdef.constraint_by_type('StaticVocabularyConstraint') |
374 if not getattr(cstr, 'eid', None): |
374 if not getattr(cstr, 'eid', None): |
375 # bug in schema reloading, constraint's eid not restored |
375 # bug in schema reloading, constraint's eid not restored |
376 self.skipTest('start me alone') |
376 self.skipTest('start me alone') |
377 cnx.execute('SET X value %(v)s WHERE X eid %(x)s', |
377 cnx.execute('SET X value %(v)s WHERE X eid %(x)s', |
378 {'x': cstr.eid, 'v': u"u'normal', u'auto', u'new'"}) |
378 {'x': cstr.eid, 'v': u"u'normal', u'auto', u'new'"}) |
379 cnx.execute('INSERT CWConstraint X: X value %(value)s, X cstrtype CT, ' |
379 cnx.execute('INSERT CWConstraint X: X value %(value)s, X cstrtype CT, ' |
380 'EDEF constrained_by X WHERE CT name %(ct)s, EDEF eid %(x)s', |
380 'EDEF constrained_by X WHERE CT name %(ct)s, EDEF eid %(x)s', |
381 {'ct': 'SizeConstraint', 'value': u'max=10', 'x': rdef.eid}) |
381 {'ct': 'SizeConstraint', 'value': u'max=10', 'x': rdef.eid}) |
382 cnx.commit() |
382 cnx.commit() |
383 cstr = rdef.constraint_by_type('StaticVocabularyConstraint') |
383 cstr = rdef.constraint_by_type('StaticVocabularyConstraint') |
384 self.assertEqual(cstr.values, (u'normal', u'auto', u'new')) |
384 self.assertEqual(cstr.values, (u'normal', u'auto', u'new')) |
385 cnx.execute('INSERT Transition T: T name "hop", T type "new"') |
385 cnx.execute('INSERT Transition T: T name "hop", T type "new"') |
386 |
386 |