|
1 from logilab.common.testlib import TestCase, unittest_main |
|
2 from cubicweb.devtools.testlib import CubicWebTC |
|
3 |
|
4 ################# |
|
5 # <required ?> # |
|
6 ################# |
|
7 |
|
8 |
|
9 from datetime import datetime |
|
10 |
|
11 from cubicweb import (ConnectionError, ValidationError, AuthenticationError, |
|
12 BadConnectionId) |
|
13 from cubicweb.devtools.testlib import get_versions |
|
14 |
|
15 from cubicweb.server.sqlutils import SQL_PREFIX |
|
16 from cubicweb.server.repository import Repository |
|
17 |
|
18 orig_get_versions = Repository.get_versions |
|
19 ################# |
|
20 # </required ?> # |
|
21 ################# |
|
22 |
|
23 def setup_module(*args): |
|
24 Repository.get_versions = get_versions |
|
25 |
|
26 def teardown_module(*args): |
|
27 Repository.get_versions = orig_get_versions |
|
28 |
|
29 class SchemaModificationHooksTC(CubicWebTC): |
|
30 |
|
31 @classmethod |
|
32 def init_config(cls, config): |
|
33 super(SchemaModificationHooksTC, cls).init_config(config) |
|
34 config._cubes = None |
|
35 cls.repo.fill_schema() |
|
36 |
|
37 def index_exists(self, etype, attr, unique=False): |
|
38 self.session.set_pool() |
|
39 dbhelper = self.session.pool.source('system').dbhelper |
|
40 sqlcursor = self.session.pool['system'] |
|
41 return dbhelper.index_exists(sqlcursor, SQL_PREFIX + etype, SQL_PREFIX + attr, unique=unique) |
|
42 |
|
43 def _set_perms(self, eid): |
|
44 self.execute('SET X read_permission G WHERE X eid %(x)s, G is CWGroup', |
|
45 {'x': eid}, 'x') |
|
46 self.execute('SET X add_permission G WHERE X eid %(x)s, G is CWGroup, G name "managers"', |
|
47 {'x': eid}, 'x') |
|
48 self.execute('SET X delete_permission G WHERE X eid %(x)s, G is CWGroup, G name "owners"', |
|
49 {'x': eid}, 'x') |
|
50 |
|
51 def _set_attr_perms(self, eid): |
|
52 self.execute('SET X read_permission G WHERE X eid %(x)s, G is CWGroup', |
|
53 {'x': eid}, 'x') |
|
54 self.execute('SET X update_permission G WHERE X eid %(x)s, G is CWGroup, G name "managers"', |
|
55 {'x': eid}, 'x') |
|
56 |
|
57 def test_base(self): |
|
58 schema = self.repo.schema |
|
59 self.session.set_pool() |
|
60 dbhelper = self.session.pool.source('system').dbhelper |
|
61 sqlcursor = self.session.pool['system'] |
|
62 self.failIf(schema.has_entity('Societe2')) |
|
63 self.failIf(schema.has_entity('concerne2')) |
|
64 # schema should be update on insertion (after commit) |
|
65 eeid = self.execute('INSERT CWEType X: X name "Societe2", X description "", X final FALSE')[0][0] |
|
66 self._set_perms(eeid) |
|
67 self.execute('INSERT CWRType X: X name "concerne2", X description "", X final FALSE, X symmetric FALSE') |
|
68 self.failIf(schema.has_entity('Societe2')) |
|
69 self.failIf(schema.has_entity('concerne2')) |
|
70 # have to commit before adding definition relations |
|
71 self.commit() |
|
72 self.failUnless(schema.has_entity('Societe2')) |
|
73 self.failUnless(schema.has_relation('concerne2')) |
|
74 attreid = self.execute('INSERT CWAttribute X: X cardinality "11", X defaultval "noname", ' |
|
75 ' X indexed TRUE, X relation_type RT, X from_entity E, X to_entity F ' |
|
76 'WHERE RT name "name", E name "Societe2", F name "String"')[0][0] |
|
77 self._set_attr_perms(attreid) |
|
78 concerne2_rdef_eid = self.execute( |
|
79 'INSERT CWRelation X: X cardinality "**", X relation_type RT, X from_entity E, X to_entity E ' |
|
80 'WHERE RT name "concerne2", E name "Societe2"')[0][0] |
|
81 self._set_perms(concerne2_rdef_eid) |
|
82 self.failIf('name' in schema['Societe2'].subject_relations()) |
|
83 self.failIf('concerne2' in schema['Societe2'].subject_relations()) |
|
84 self.failIf(self.index_exists('Societe2', 'name')) |
|
85 self.commit() |
|
86 self.failUnless('name' in schema['Societe2'].subject_relations()) |
|
87 self.failUnless('concerne2' in schema['Societe2'].subject_relations()) |
|
88 self.failUnless(self.index_exists('Societe2', 'name')) |
|
89 # now we should be able to insert and query Societe2 |
|
90 s2eid = self.execute('INSERT Societe2 X: X name "logilab"')[0][0] |
|
91 self.execute('Societe2 X WHERE X name "logilab"') |
|
92 self.execute('SET X concerne2 X WHERE X name "logilab"') |
|
93 rset = self.execute('Any X WHERE X concerne2 Y') |
|
94 self.assertEquals(rset.rows, [[s2eid]]) |
|
95 # check that when a relation definition is deleted, existing relations are deleted |
|
96 rdefeid = self.execute('INSERT CWRelation X: X cardinality "**", X relation_type RT, ' |
|
97 ' X from_entity E, X to_entity E ' |
|
98 'WHERE RT name "concerne2", E name "CWUser"')[0][0] |
|
99 self._set_perms(rdefeid) |
|
100 self.commit() |
|
101 self.execute('DELETE CWRelation X WHERE X eid %(x)s', {'x': concerne2_rdef_eid}, 'x') |
|
102 self.commit() |
|
103 self.failUnless('concerne2' in schema['CWUser'].subject_relations()) |
|
104 self.failIf('concerne2' in schema['Societe2'].subject_relations()) |
|
105 self.failIf(self.execute('Any X WHERE X concerne2 Y')) |
|
106 # schema should be cleaned on delete (after commit) |
|
107 self.execute('DELETE CWEType X WHERE X name "Societe2"') |
|
108 self.execute('DELETE CWRType X WHERE X name "concerne2"') |
|
109 self.failUnless(self.index_exists('Societe2', 'name')) |
|
110 self.failUnless(schema.has_entity('Societe2')) |
|
111 self.failUnless(schema.has_relation('concerne2')) |
|
112 self.commit() |
|
113 self.failIf(self.index_exists('Societe2', 'name')) |
|
114 self.failIf(schema.has_entity('Societe2')) |
|
115 self.failIf(schema.has_entity('concerne2')) |
|
116 self.failIf('concerne2' in schema['CWUser'].subject_relations()) |
|
117 |
|
118 def test_is_instance_of_insertions(self): |
|
119 seid = self.execute('INSERT Transition T: T name "subdiv"')[0][0] |
|
120 is_etypes = [etype for etype, in self.execute('Any ETN WHERE X eid %s, X is ET, ET name ETN' % seid)] |
|
121 self.assertEquals(is_etypes, ['Transition']) |
|
122 instanceof_etypes = [etype for etype, in self.execute('Any ETN WHERE X eid %s, X is_instance_of ET, ET name ETN' % seid)] |
|
123 self.assertEquals(sorted(instanceof_etypes), ['BaseTransition', 'Transition']) |
|
124 snames = [name for name, in self.execute('Any N WHERE S is BaseTransition, S name N')] |
|
125 self.failIf('subdiv' in snames) |
|
126 snames = [name for name, in self.execute('Any N WHERE S is_instance_of BaseTransition, S name N')] |
|
127 self.failUnless('subdiv' in snames) |
|
128 |
|
129 |
|
130 def test_perms_synchronization_1(self): |
|
131 schema = self.repo.schema |
|
132 self.assertEquals(schema['CWUser'].get_groups('read'), set(('managers', 'users'))) |
|
133 self.failUnless(self.execute('Any X, Y WHERE X is CWEType, X name "CWUser", Y is CWGroup, Y name "users"')[0]) |
|
134 self.execute('DELETE X read_permission Y WHERE X is CWEType, X name "CWUser", Y name "users"') |
|
135 self.assertEquals(schema['CWUser'].get_groups('read'), set(('managers', 'users', ))) |
|
136 self.commit() |
|
137 self.assertEquals(schema['CWUser'].get_groups('read'), set(('managers', ))) |
|
138 self.execute('SET X read_permission Y WHERE X is CWEType, X name "CWUser", Y name "users"') |
|
139 self.commit() |
|
140 self.assertEquals(schema['CWUser'].get_groups('read'), set(('managers', 'users',))) |
|
141 |
|
142 def test_perms_synchronization_2(self): |
|
143 schema = self.repo.schema['in_group'].rdefs[('CWUser', 'CWGroup')] |
|
144 self.assertEquals(schema.get_groups('read'), set(('managers', 'users', 'guests'))) |
|
145 self.execute('DELETE X read_permission Y WHERE X relation_type RT, RT name "in_group", Y name "guests"') |
|
146 self.assertEquals(schema.get_groups('read'), set(('managers', 'users', 'guests'))) |
|
147 self.commit() |
|
148 self.assertEquals(schema.get_groups('read'), set(('managers', 'users'))) |
|
149 self.execute('SET X read_permission Y WHERE X relation_type RT, RT name "in_group", Y name "guests"') |
|
150 self.assertEquals(schema.get_groups('read'), set(('managers', 'users'))) |
|
151 self.commit() |
|
152 self.assertEquals(schema.get_groups('read'), set(('managers', 'users', 'guests'))) |
|
153 |
|
154 def test_nonregr_user_edit_itself(self): |
|
155 ueid = self.session.user.eid |
|
156 groupeids = [eid for eid, in self.execute('CWGroup G WHERE G name in ("managers", "users")')] |
|
157 self.execute('DELETE X in_group Y WHERE X eid %s' % ueid) |
|
158 self.execute('SET X surname "toto" WHERE X eid %s' % ueid) |
|
159 self.execute('SET X in_group Y WHERE X eid %s, Y name "managers"' % ueid) |
|
160 self.commit() |
|
161 eeid = self.execute('Any X WHERE X is CWEType, X name "CWEType"')[0][0] |
|
162 self.execute('DELETE X read_permission Y WHERE X eid %s' % eeid) |
|
163 self.execute('SET X final FALSE WHERE X eid %s' % eeid) |
|
164 self.execute('SET X read_permission Y WHERE X eid %s, Y eid in (%s, %s)' |
|
165 % (eeid, groupeids[0], groupeids[1])) |
|
166 self.commit() |
|
167 self.execute('Any X WHERE X is CWEType, X name "CWEType"') |
|
168 |
|
169 # schema modification hooks tests ######################################### |
|
170 |
|
171 def test_uninline_relation(self): |
|
172 self.session.set_pool() |
|
173 dbhelper = self.session.pool.source('system').dbhelper |
|
174 sqlcursor = self.session.pool['system'] |
|
175 self.failUnless(self.schema['state_of'].inlined) |
|
176 try: |
|
177 self.execute('SET X inlined FALSE WHERE X name "state_of"') |
|
178 self.failUnless(self.schema['state_of'].inlined) |
|
179 self.commit() |
|
180 self.failIf(self.schema['state_of'].inlined) |
|
181 self.failIf(self.index_exists('State', 'state_of')) |
|
182 rset = self.execute('Any X, Y WHERE X state_of Y') |
|
183 self.assertEquals(len(rset), 2) # user states |
|
184 finally: |
|
185 self.execute('SET X inlined TRUE WHERE X name "state_of"') |
|
186 self.failIf(self.schema['state_of'].inlined) |
|
187 self.commit() |
|
188 self.failUnless(self.schema['state_of'].inlined) |
|
189 self.failUnless(self.index_exists('State', 'state_of')) |
|
190 rset = self.execute('Any X, Y WHERE X state_of Y') |
|
191 self.assertEquals(len(rset), 2) |
|
192 |
|
193 def test_indexed_change(self): |
|
194 self.session.set_pool() |
|
195 dbhelper = self.session.pool.source('system').dbhelper |
|
196 sqlcursor = self.session.pool['system'] |
|
197 try: |
|
198 self.execute('SET X indexed FALSE WHERE X relation_type R, R name "name"') |
|
199 self.failUnless(self.schema['name'].rdef('Workflow', 'String').indexed) |
|
200 self.failUnless(self.index_exists('Workflow', 'name')) |
|
201 self.commit() |
|
202 self.failIf(self.schema['name'].rdef('Workflow', 'String').indexed) |
|
203 self.failIf(self.index_exists('Workflow', 'name')) |
|
204 finally: |
|
205 self.execute('SET X indexed TRUE WHERE X relation_type R, R name "name"') |
|
206 self.failIf(self.schema['name'].rdef('Workflow', 'String').indexed) |
|
207 self.failIf(self.index_exists('Workflow', 'name')) |
|
208 self.commit() |
|
209 self.failUnless(self.schema['name'].rdef('Workflow', 'String').indexed) |
|
210 self.failUnless(self.index_exists('Workflow', 'name')) |
|
211 |
|
212 def test_unique_change(self): |
|
213 self.session.set_pool() |
|
214 dbhelper = self.session.pool.source('system').dbhelper |
|
215 sqlcursor = self.session.pool['system'] |
|
216 try: |
|
217 self.execute('INSERT CWConstraint X: X cstrtype CT, DEF constrained_by X ' |
|
218 'WHERE CT name "UniqueConstraint", DEF relation_type RT, DEF from_entity E,' |
|
219 'RT name "name", E name "Workflow"') |
|
220 self.failIf(self.schema['Workflow'].has_unique_values('name')) |
|
221 self.failIf(self.index_exists('Workflow', 'name', unique=True)) |
|
222 self.commit() |
|
223 self.failUnless(self.schema['Workflow'].has_unique_values('name')) |
|
224 self.failUnless(self.index_exists('Workflow', 'name', unique=True)) |
|
225 finally: |
|
226 self.execute('DELETE DEF constrained_by X WHERE X cstrtype CT, ' |
|
227 'CT name "UniqueConstraint", DEF relation_type RT, DEF from_entity E,' |
|
228 'RT name "name", E name "Workflow"') |
|
229 self.failUnless(self.schema['Workflow'].has_unique_values('name')) |
|
230 self.failUnless(self.index_exists('Workflow', 'name', unique=True)) |
|
231 self.commit() |
|
232 self.failIf(self.schema['Workflow'].has_unique_values('name')) |
|
233 self.failIf(self.index_exists('Workflow', 'name', unique=True)) |
|
234 |
|
235 def test_required_change_1(self): |
|
236 self.execute('SET DEF cardinality "?1" ' |
|
237 'WHERE DEF relation_type RT, DEF from_entity E,' |
|
238 'RT name "title", E name "Bookmark"') |
|
239 self.commit() |
|
240 # should now be able to add bookmark without title |
|
241 self.execute('INSERT Bookmark X: X path "/view"') |
|
242 self.commit() |
|
243 |
|
244 def test_required_change_2(self): |
|
245 self.execute('SET DEF cardinality "11" ' |
|
246 'WHERE DEF relation_type RT, DEF from_entity E,' |
|
247 'RT name "surname", E name "CWUser"') |
|
248 self.commit() |
|
249 # should not be able anymore to add cwuser without surname |
|
250 self.assertRaises(ValidationError, self.create_user, "toto") |
|
251 self.execute('SET DEF cardinality "?1" ' |
|
252 'WHERE DEF relation_type RT, DEF from_entity E,' |
|
253 'RT name "surname", E name "CWUser"') |
|
254 self.commit() |
|
255 |
|
256 |
|
257 def test_add_attribute_to_base_class(self): |
|
258 attreid = self.execute('INSERT CWAttribute X: X cardinality "11", X defaultval "noname", X indexed TRUE, X relation_type RT, X from_entity E, X to_entity F ' |
|
259 'WHERE RT name "messageid", E name "BaseTransition", F name "String"')[0][0] |
|
260 assert self.execute('SET X read_permission Y WHERE X eid %(x)s, Y name "managers"', |
|
261 {'x': attreid}, 'x') |
|
262 self.commit() |
|
263 self.schema.rebuild_infered_relations() |
|
264 self.failUnless('Transition' in self.schema['messageid'].subjects()) |
|
265 self.failUnless('WorkflowTransition' in self.schema['messageid'].subjects()) |
|
266 self.execute('Any X WHERE X is_instance_of BaseTransition, X messageid "hop"') |
|
267 |
|
268 def test_change_fulltextindexed(self): |
|
269 target = self.request().create_entity(u'EmailAddress', address=u'rick.roll@dance.com') |
|
270 self.commit() |
|
271 rset = self.execute('Any X Where X has_text "rick.roll"') |
|
272 self.assertIn(target.eid, [item[0] for item in rset]) |
|
273 |
|
274 assert self.execute('''SET A fulltextindexed False |
|
275 WHERE E is CWEType, |
|
276 E name "EmailAddress", |
|
277 A is CWAttribute, |
|
278 A from_entity E, |
|
279 A relation_type R, |
|
280 R name "address" |
|
281 ''') |
|
282 self.commit() |
|
283 rset = self.execute('Any X Where X has_text "rick.roll"') |
|
284 self.assertNotIn(target.eid, [item[0] for item in rset]) |
|
285 |