|
1 # -*- coding: iso-8859-1 -*- |
|
2 # copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved. |
|
3 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr |
|
4 # |
|
5 # This file is part of CubicWeb. |
|
6 # |
|
7 # CubicWeb is free software: you can redistribute it and/or modify it under the |
|
8 # terms of the GNU Lesser General Public License as published by the Free |
|
9 # Software Foundation, either version 2.1 of the License, or (at your option) |
|
10 # any later version. |
|
11 # |
|
12 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT |
|
13 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
|
14 # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more |
|
15 # details. |
|
16 # |
|
17 # You should have received a copy of the GNU Lesser General Public License along |
|
18 # with CubicWeb. If not, see <http://www.gnu.org/licenses/>. |
|
19 """unit tests for module cubicweb.server.repository""" |
|
20 |
|
21 import threading |
|
22 import time |
|
23 import logging |
|
24 |
|
25 from six.moves import range |
|
26 |
|
27 from yams.constraints import UniqueConstraint |
|
28 from yams import register_base_type, unregister_base_type |
|
29 |
|
30 from logilab.database import get_db_helper |
|
31 |
|
32 from cubicweb import (BadConnectionId, ValidationError, |
|
33 UnknownEid, AuthenticationError, Unauthorized, QueryError) |
|
34 from cubicweb.predicates import is_instance |
|
35 from cubicweb.schema import RQLConstraint |
|
36 from cubicweb.devtools.testlib import CubicWebTC |
|
37 from cubicweb.devtools.repotest import tuplify |
|
38 from cubicweb.server import hook |
|
39 from cubicweb.server.sqlutils import SQL_PREFIX |
|
40 from cubicweb.server.hook import Hook |
|
41 from cubicweb.server.sources import native |
|
42 from cubicweb.server.session import SessionClosedError |
|
43 |
|
44 |
|
class RepositoryTC(CubicWebTC):
    """Unit tests driving the repository through ``self.repo`` and through
    connections opened with ``self.admin_access.repo_cnx()``.
    """

    def _check_rollback_on_hook_error(self, hook_cls, exc_cls):
        """Shared scenario for the ``test_rollback_on_execute_*`` tests.

        With `hook_cls` temporarily registered, renaming the "guests" group
        must raise `exc_cls`; the rename stays visible in the connection,
        ``commit()`` is refused with a QueryError and only ``rollback()``
        discards the rename.
        """
        renamed_rql = 'Any X WHERE X is CWGroup, X name "toto"'
        with self.admin_access.repo_cnx() as cnx:
            with self.temporary_appobjects(hook_cls):
                self.assertRaises(
                    exc_cls, cnx.execute,
                    'SET X name "toto" WHERE X is CWGroup, X name "guests"')
                self.assertTrue(cnx.execute(renamed_rql))
                with self.assertRaises(QueryError) as cm:
                    cnx.commit()
                self.assertEqual(str(cm.exception), 'transaction must be rolled back')
                cnx.rollback()
                self.assertFalse(cnx.execute(renamed_rql))

    def test_unique_together_constraint(self):
        """Inserting the same Societe twice raises a ValidationError naming
        every attribute of the violated unicity constraint."""
        insert_rql = 'INSERT Societe S: S nom "Logilab", S type "SSLL", S cp "75013"'
        with self.admin_access.repo_cnx() as cnx:
            cnx.execute(insert_rql)
            with self.assertRaises(ValidationError) as wraperr:
                cnx.execute(insert_rql)
            self.assertEqual(
                {'cp': u'%(KEY-rtype)s is part of violated unicity constraint',
                 'nom': u'%(KEY-rtype)s is part of violated unicity constraint',
                 'type': u'%(KEY-rtype)s is part of violated unicity constraint',
                 '': u'some relations violate a unicity constraint'},
                wraperr.exception.args[1])

    def test_unique_together_schema(self):
        """Personne declares exactly one unique-together group."""
        person = self.repo.schema.eschema('Personne')
        self.assertEqual(len(person._unique_together), 1)
        self.assertItemsEqual(person._unique_together[0],
                              ('nom', 'prenom', 'inline2'))

    def test_all_entities_have_owner(self):
        """No entity lacks an `owned_by` relation."""
        with self.admin_access.repo_cnx() as cnx:
            self.assertFalse(cnx.execute('Any X WHERE NOT X owned_by U'))

    def test_all_entities_have_is(self):
        """No entity lacks an `is` relation."""
        with self.admin_access.repo_cnx() as cnx:
            self.assertFalse(cnx.execute('Any X WHERE NOT X is ET'))

    def test_all_entities_have_cw_source(self):
        """No entity lacks a `cw_source` relation."""
        with self.admin_access.repo_cnx() as cnx:
            self.assertFalse(cnx.execute('Any X WHERE NOT X cw_source S'))

    def test_connect(self):
        """Valid credentials yield a connection id; wrong, empty or missing
        login/password raise AuthenticationError."""
        cnxid = self.repo.connect(self.admlogin, password=self.admpassword)
        self.assertTrue(cnxid)
        self.repo.close(cnxid)
        self.assertRaises(AuthenticationError,
                          self.repo.connect, self.admlogin, password='nimportnawak')
        self.assertRaises(AuthenticationError,
                          self.repo.connect, self.admlogin, password='')
        self.assertRaises(AuthenticationError,
                          self.repo.connect, self.admlogin, password=None)
        self.assertRaises(AuthenticationError,
                          self.repo.connect, None, password=None)
        self.assertRaises(AuthenticationError,
                          self.repo.connect, self.admlogin)
        self.assertRaises(AuthenticationError,
                          self.repo.connect, None)

    def test_login_upassword_accent(self):
        """A user whose login and password contain accented characters can
        connect."""
        with self.admin_access.repo_cnx() as cnx:
            cnx.execute('INSERT CWUser X: X login %(login)s, X upassword %(passwd)s, '
                        'X in_group G WHERE G name "users"',
                        {'login': u"barnabé", 'passwd': u"héhéhé".encode('UTF8')})
            cnx.commit()
        repo = self.repo
        cnxid = repo.connect(u"barnabé", password=u"héhéhé".encode('UTF8'))
        self.assertTrue(cnxid)
        repo.close(cnxid)

    def test_rollback_on_execute_validation_error(self):
        """A ValidationError raised by a hook forces the transaction to be
        rolled back."""
        class ValidationErrorAfterHook(Hook):
            __regid__ = 'valerror-after-hook'
            __select__ = Hook.__select__ & is_instance('CWGroup')
            events = ('after_update_entity',)

            def __call__(self):
                raise ValidationError(self.entity.eid, {})

        self._check_rollback_on_hook_error(ValidationErrorAfterHook, ValidationError)

    def test_rollback_on_execute_unauthorized(self):
        """An Unauthorized raised by a hook forces the transaction to be
        rolled back."""
        class UnauthorizedAfterHook(Hook):
            __regid__ = 'unauthorized-after-hook'
            __select__ = Hook.__select__ & is_instance('CWGroup')
            events = ('after_update_entity',)

            def __call__(self):
                raise Unauthorized()

        self._check_rollback_on_hook_error(UnauthorizedAfterHook, Unauthorized)

    def test_close(self):
        """A connection obtained from ``connect`` can be closed."""
        repo = self.repo
        cnxid = repo.connect(self.admlogin, password=self.admpassword)
        self.assertTrue(cnxid)
        repo.close(cnxid)

    def test_initial_schema(self):
        """Check attribute ordering, main attributes and a few constraints of
        the schema held by the repository."""
        schema = self.repo.schema
        # check order of attributes is respected
        notin = {'eid', 'is', 'is_instance_of', 'identity',
                 'creation_date', 'modification_date', 'cwuri',
                 'owned_by', 'created_by', 'cw_source',
                 'update_permission', 'read_permission',
                 'add_permission', 'in_basket'}
        self.assertListEqual(['relation_type',
                              'from_entity', 'to_entity',
                              'constrained_by',
                              'cardinality', 'ordernum', 'formula',
                              'indexed', 'fulltextindexed', 'internationalizable',
                              'defaultval', 'extra_props',
                              'description', 'description_format'],
                             [r.type
                              for r in schema.eschema('CWAttribute').ordered_relations()
                              if r.type not in notin])

        self.assertEqual(schema.eschema('CWEType').main_attribute(), 'name')
        self.assertEqual(schema.eschema('State').main_attribute(), 'name')

        constraints = schema.rschema('name').rdef('CWEType', 'String').constraints
        self.assertEqual(len(constraints), 2)
        # Drop the unique constraint from a *copy*: `constraints` is the list
        # held by the schema's relation definition and must stay untouched.
        remaining = list(constraints)
        for cstr in constraints:
            if isinstance(cstr, UniqueConstraint):
                remaining.remove(cstr)
                break
        else:
            self.fail('unique constraint not found')
        sizeconstraint = remaining[0]
        self.assertEqual(sizeconstraint.min, None)
        self.assertEqual(sizeconstraint.max, 64)

        constraints = schema.rschema('relation_type').rdef('CWAttribute', 'CWRType').constraints
        self.assertEqual(len(constraints), 1)
        cstr = constraints[0]
        self.assertIsInstance(cstr, RQLConstraint)
        self.assertEqual(cstr.expression, 'O final TRUE')

        ownedby = schema.rschema('owned_by')
        self.assertEqual(ownedby.objects('CWEType'), ('CWUser',))

    def test_internal_api(self):
        """``type_and_source_from_eid`` / ``type_from_eid`` resolve eid 2 to
        CWGroup."""
        repo = self.repo
        cnxid = repo.connect(self.admlogin, password=self.admpassword)
        try:
            session = repo._get_session(cnxid)
            with session.new_cnx() as cnx:
                self.assertEqual(repo.type_and_source_from_eid(2, cnx),
                                 ('CWGroup', None, 'system'))
                self.assertEqual(repo.type_from_eid(2, cnx), 'CWGroup')
        finally:
            # close the connection even when an assertion above fails
            repo.close(cnxid)

    def test_public_api(self):
        """Check ``get_schema``, ``source_defs`` and ``properties``."""
        self.assertEqual(self.repo.get_schema(), self.repo.schema)
        self.assertEqual(self.repo.source_defs(),
                         {'system': {'type': 'native',
                                     'uri': 'system',
                                     'use-cwuri-as-url': False}})
        # .properties() return a result set
        expected_rql = 'Any K,V WHERE P is CWProperty,P pkey K, P value V, NOT P for_user U'
        self.assertEqual(self.repo.properties().rql, expected_rql)

    def test_schema_is_relation(self):
        """Same check as ``test_all_entities_have_is``, reporting the result
        set description on failure."""
        with self.admin_access.repo_cnx() as cnx:
            no_is_rset = cnx.execute('Any X WHERE NOT X is ET')
            self.assertFalse(no_is_rset, no_is_rset.description)

    def test_delete_if_singlecard1(self):
        """Setting `todo_by` a second time leaves a single related Personne:
        the most recently set one."""
        set_rql = 'SET A todo_by P WHERE A eid %(x)s, P eid %(p)s'
        get_rql = 'Any P WHERE A todo_by P, A eid %(x)s'
        with self.admin_access.repo_cnx() as cnx:
            affaire = cnx.create_entity('Affaire')
            p1 = cnx.create_entity('Personne', nom=u'toto')
            cnx.execute(set_rql, {'x': affaire.eid, 'p': p1.eid})
            rset = cnx.execute(get_rql, {'x': affaire.eid})
            self.assertEqual(len(rset), 1)
            p2 = cnx.create_entity('Personne', nom=u'tutu')
            cnx.execute(set_rql, {'x': affaire.eid, 'p': p2.eid})
            rset = cnx.execute(get_rql, {'x': affaire.eid})
            self.assertEqual(len(rset), 1)
            self.assertEqual(rset.rows[0][0], p2.eid)

    def test_delete_if_object_inlined_singlecard(self):
        """After two Personne were created with the same `fiche`, the Card
        has a single `reverse_fiche`."""
        with self.admin_access.repo_cnx() as cnx:
            c = cnx.create_entity('Card', title=u'Carte')
            cnx.create_entity('Personne', nom=u'Vincent', fiche=c)
            cnx.create_entity('Personne', nom=u'Florent', fiche=c)
            cnx.commit()
            self.assertEqual(len(c.reverse_fiche), 1)

    def test_delete_computed_relation_nonregr(self):
        """Non-regression: a Personne created with `login_user` set can be
        deleted and the deletion committed."""
        with self.admin_access.repo_cnx() as cnx:
            c = cnx.create_entity('Personne', nom=u'Adam', login_user=cnx.user.eid)
            cnx.commit()
            c.cw_delete()
            cnx.commit()

    def test_cw_set_in_before_update(self):
        """``cw_set`` called from a before_update_entity hook is applied
        along with the update that triggered the hook."""
        # local hook
        class DummyBeforeHook(Hook):
            __regid__ = 'dummy-before-hook'
            __select__ = Hook.__select__ & is_instance('EmailAddress')
            events = ('before_update_entity',)

            def __call__(self):
                # safety belt: avoid potential infinite recursion if the test
                # fails (i.e. RuntimeError not raised)
                pendings = self._cw.transaction_data.setdefault('pending', set())
                if self.entity.eid not in pendings:
                    pendings.add(self.entity.eid)
                    self.entity.cw_set(alias=u'foo')

        with self.admin_access.repo_cnx() as cnx:
            with self.temporary_appobjects(DummyBeforeHook):
                addr = cnx.create_entity('EmailAddress', address=u'a@b.fr')
                addr.cw_set(address=u'a@b.com')
                rset = cnx.execute('Any A,AA WHERE X eid %(x)s, X address A, X alias AA',
                                   {'x': addr.eid})
                self.assertEqual(rset.rows, [[u'a@b.com', u'foo']])

    def test_cw_set_in_before_add(self):
        """``cw_set`` called from a before_add_entity hook raises an
        AssertionError."""
        # local hook
        class DummyBeforeHook(Hook):
            __regid__ = 'dummy-before-hook'
            __select__ = Hook.__select__ & is_instance('EmailAddress')
            events = ('before_add_entity',)

            def __call__(self):
                # cw_set is forbidden within before_add_entity()
                self.entity.cw_set(alias=u'foo')

        with self.admin_access.repo_cnx() as cnx:
            with self.temporary_appobjects(DummyBeforeHook):
                # XXX will fail with python -O
                self.assertRaises(AssertionError, cnx.create_entity,
                                  'EmailAddress', address=u'a@b.fr')

    def test_multiple_edit_cw_set(self):
        """make sure cw_edited doesn't get cluttered
        by previous entities on multiple set
        """
        # local hook
        class DummyBeforeHook(Hook):
            _test = self  # keep reference to test instance
            __regid__ = 'dummy-before-hook'
            __select__ = Hook.__select__ & is_instance('Affaire')
            events = ('before_update_entity',)

            def __call__(self):
                # invoiced attribute shouldn't be considered "edited" before the hook
                self._test.assertNotIn('invoiced', self.entity.cw_edited,
                                       'cw_edited cluttered by previous update')
                self.entity.cw_edited['invoiced'] = 10

        with self.admin_access.repo_cnx() as cnx:
            with self.temporary_appobjects(DummyBeforeHook):
                cnx.create_entity('Affaire', ref=u'AFF01')
                cnx.create_entity('Affaire', ref=u'AFF02')
                cnx.execute('SET A duration 10 WHERE A is Affaire')

    def test_user_friendly_error(self):
        """A custom IUserFriendlyUniqueTogether adapter controls the
        ValidationError raised on unicity violation, both at creation and on
        update."""
        from cubicweb.entities.adapters import IUserFriendlyUniqueTogether

        class MyIUserFriendlyUniqueTogether(IUserFriendlyUniqueTogether):
            __select__ = IUserFriendlyUniqueTogether.__select__ & is_instance('Societe')

            def raise_user_exception(self):
                raise ValidationError(self.entity.eid, {'hip': 'hop'})

        with self.admin_access.repo_cnx() as cnx:
            with self.temporary_appobjects(MyIUserFriendlyUniqueTogether):
                s = cnx.create_entity('Societe', nom=u'Logilab', type=u'ssll', cp=u'75013')
                cnx.commit()
                with self.assertRaises(ValidationError) as cm:
                    cnx.create_entity('Societe', nom=u'Logilab', type=u'ssll', cp=u'75013')
                self.assertEqual(cm.exception.errors, {'hip': 'hop'})
                cnx.rollback()
                cnx.create_entity('Societe', nom=u'Logilab', type=u'ssll', cp=u'31400')
                with self.assertRaises(ValidationError) as cm:
                    s.cw_set(cp=u'31400')
                self.assertEqual(cm.exception.entity, s.eid)
                self.assertEqual(cm.exception.errors, {'hip': 'hop'})
                cnx.rollback()

    def test_attribute_cache(self):
        """An attribute read on the entity reflects ``cw_set`` both before
        and after commit."""
        with self.admin_access.repo_cnx() as cnx:
            bk = cnx.create_entity('Bookmark', title=u'index', path=u'/')
            cnx.commit()
            self.assertEqual(bk.title, 'index')
            bk.cw_set(title=u'root')
            self.assertEqual(bk.title, 'root')
            cnx.commit()
            self.assertEqual(bk.title, 'root')
|
345 |
|
class SchemaDeserialTC(CubicWebTC):
    """Tests of ``repo.deserialize_schema()`` run against the
    'data-schemaserial' test application, with an extra base type
    ('BabarTestType') registered for the lifetime of the class.
    """

    appid = 'data-schemaserial'

    @classmethod
    def setUpClass(cls):
        """Register 'BabarTestType' and its sqlite mapping/converter."""
        register_base_type('BabarTestType', ('jungle_speed',))
        helper = get_db_helper('sqlite')
        helper.TYPE_MAPPING['BabarTestType'] = 'TEXT'
        helper.TYPE_CONVERTERS['BabarTestType'] = lambda x: '"%s"' % x
        super(SchemaDeserialTC, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Undo the registrations made by ``setUpClass``."""
        unregister_base_type('BabarTestType')
        helper = get_db_helper('sqlite')
        helper.TYPE_MAPPING.pop('BabarTestType', None)
        helper.TYPE_CONVERTERS.pop('BabarTestType', None)
        super(SchemaDeserialTC, cls).tearDownClass()

    def test_deserialization_base(self):
        """Check the following deserialization

        * all CWEtype has name
        * Final type
        * CWUniqueTogetherConstraint
        * _unique_together content"""
        repo = self.repo
        origschema = repo.schema
        was_repairing = getattr(repo.config, 'repairing', False)
        try:
            repo.config.repairing = True  # avoid versions checking
            repo.set_schema(repo.deserialize_schema())
            table = SQL_PREFIX + 'CWEType'
            namecol = SQL_PREFIX + 'name'
            finalcol = SQL_PREFIX + 'final'
            with self.admin_access.repo_cnx() as cnx:
                cu = cnx.system_sql('SELECT %s FROM %s WHERE %s is NULL'
                                    % (namecol, table, finalcol))
                self.assertEqual(cu.fetchall(), [])
                cu = cnx.system_sql('SELECT %s FROM %s '
                                    'WHERE %s=%%(final)s ORDER BY %s'
                                    % (namecol, table, finalcol, namecol),
                                    {'final': True})
                self.assertEqual(cu.fetchall(),
                                 [(u'BabarTestType',),
                                  (u'BigInt',), (u'Boolean',), (u'Bytes',),
                                  (u'Date',), (u'Datetime',),
                                  (u'Decimal',), (u'Float',),
                                  (u'Int',),
                                  (u'Interval',), (u'Password',),
                                  (u'String',),
                                  (u'TZDatetime',), (u'TZTime',), (u'Time',)])
                sql = ("SELECT etype.cw_eid, etype.cw_name, cstr.cw_eid, rel.eid_to "
                       "FROM cw_CWUniqueTogetherConstraint as cstr, "
                       "     relations_relation as rel, "
                       "     cw_CWEType as etype "
                       "WHERE cstr.cw_eid = rel.eid_from "
                       "  AND cstr.cw_constraint_of = etype.cw_eid "
                       "  AND etype.cw_name = 'Personne' "
                       ";")
                cu = cnx.system_sql(sql)
                rows = cu.fetchall()
                self.assertEqual(len(rows), 3)
                person = repo.schema.eschema('Personne')
                self.assertEqual(len(person._unique_together), 1)
                self.assertItemsEqual(person._unique_together[0],
                                      ('nom', 'prenom', 'inline2'))
        finally:
            try:
                repo.set_schema(origschema)
            finally:
                # put the flag back only once the schema is restored, so the
                # restoration runs under the same setting as the test body
                repo.config.repairing = was_repairing

    def test_custom_attribute_param(self):
        """The custom 'jungle_speed' parameter is present on the deserialized
        'custom_field_of_jungle' relation definition, with value 42."""
        repo = self.repo
        origschema = repo.schema
        was_repairing = getattr(repo.config, 'repairing', False)
        try:
            repo.config.repairing = True  # avoid versions checking
            repo.set_schema(repo.deserialize_schema())
            pes = repo.schema['Personne']
            attr = pes.rdef('custom_field_of_jungle')
            self.assertIn('jungle_speed', vars(attr))
            self.assertEqual(42, attr.jungle_speed)
        finally:
            try:
                repo.set_schema(origschema)
            finally:
                # see test_deserialization_base: restore the flag last
                repo.config.repairing = was_repairing
|
428 |
|
429 |
|
430 |
|
class DataHelpersTC(CubicWebTC):
    """Tests of the repository's eid/type helpers and of the bookkeeping
    done in the ``entities`` system table."""

    def test_type_from_eid(self):
        """eid 2 resolves to the 'CWGroup' entity type."""
        with self.admin_access.repo_cnx() as cnx:
            etype = self.repo.type_from_eid(2, cnx)
            self.assertEqual(etype, 'CWGroup')

    def test_type_from_eid_raise(self):
        """An eid that does not exist raises UnknownEid."""
        with self.admin_access.repo_cnx() as cnx:
            with self.assertRaises(UnknownEid):
                self.repo.type_from_eid(-2, cnx)

    def test_add_delete_info(self):
        """``add_info`` inserts a row in ``entities`` and
        ``delete_info_multi`` removes it."""
        select_sql = 'SELECT * FROM entities WHERE eid = -1'
        repo = self.repo
        with self.admin_access.repo_cnx() as cnx:
            personne_cls = repo.vreg['etypes'].etype_class('Personne')
            fake = personne_cls(cnx)
            fake.eid = -1
            # neutralise `complete` on this instance
            fake.complete = lambda x: None
            repo.add_info(cnx, fake, repo.system_source)
            rows = cnx.system_sql(select_sql).fetchall()
            self.assertEqual(tuplify(rows), [(-1, 'Personne', 'system', None)])
            repo._delete_cascade_multi(cnx, [fake])
            repo.system_source.delete_info_multi(cnx, [fake])
            rows = cnx.system_sql(select_sql).fetchall()
            self.assertEqual(rows, [])
|
455 |
|
456 |
|
class FTITC(CubicWebTC):
    """Tests of full-text indexation as seen through ``has_text`` queries."""

    @staticmethod
    def _has_text(cnx, text):
        """Return the rows of entities matching `text` through ``has_text``."""
        return cnx.execute('Any X WHERE X has_text %(t)s', {'t': text}).rows

    def test_fulltext_container_entity(self):
        """The text of an EmailAddress is indexed on the user linked to it
        through `use_email`, and follows relation and attribute changes."""
        with self.admin_access.repo_cnx() as cnx:
            # precondition of the scenario; a real assertion so it is still
            # checked when python runs with -O
            self.assertEqual(self.schema.rschema('use_email').fulltext_container,
                             'subject')
            toto = cnx.create_entity('EmailAddress', address=u'toto@logilab.fr')
            cnx.commit()
            self.assertEqual(self._has_text(cnx, 'toto'), [])
            cnx.user.cw_set(use_email=toto)
            cnx.commit()
            self.assertEqual(self._has_text(cnx, 'toto'), [[cnx.user.eid]])
            cnx.execute('DELETE X use_email Y WHERE X login "admin", Y eid %(y)s',
                        {'y': toto.eid})
            cnx.commit()
            self.assertEqual(self._has_text(cnx, 'toto'), [])
            tutu = cnx.create_entity('EmailAddress', address=u'tutu@logilab.fr')
            cnx.user.cw_set(use_email=tutu)
            cnx.commit()
            self.assertEqual(self._has_text(cnx, 'tutu'), [[cnx.user.eid]])
            tutu.cw_set(address=u'hip@logilab.fr')
            cnx.commit()
            self.assertEqual(self._has_text(cnx, 'tutu'), [])
            self.assertEqual(self._has_text(cnx, 'hip'), [[cnx.user.eid]])

    def test_no_uncessary_ftiindex_op(self):
        """Creating a Workflow schedules no FTIndexEntityOp operation."""
        with self.admin_access.repo_cnx() as cnx:
            cnx.create_entity('Workflow',
                              name=u'dummy workflow',
                              description=u'huuuuu')
            # test for the presence of such an operation, not its truthiness
            self.assertFalse(any(isinstance(op, native.FTIndexEntityOp)
                                 for op in cnx.pending_operations))
|
494 |
|
495 |
|
class DBInitTC(CubicWebTC):
    """Checks on the content of a freshly initialised test database."""

    def test_versions_inserted(self):
        """The expected ``system.version.*`` property keys exist, in
        alphabetical order."""
        expected_keys = [u'system.version.basket',
                         u'system.version.card',
                         u'system.version.comment',
                         u'system.version.cubicweb',
                         u'system.version.file',
                         u'system.version.localperms',
                         u'system.version.tag']
        with self.admin_access.repo_cnx() as cnx:
            rset = cnx.execute('Any K ORDERBY K '
                               'WHERE P pkey K, P pkey ~= "system.version.%"')
            found_keys = []
            for row in rset:
                found_keys.append(row[0])
            self.assertEqual(found_keys, expected_keys)
|
511 |
|
# Calls recorded by the hook defined in InlineRelHooksTC.test_inline_relation;
# emptied in place before each test of that class.
CALLED = []


class InlineRelHooksTC(CubicWebTC):
    """test relation hooks are called for inlined relations
    """

    def setUp(self):
        super(InlineRelHooksTC, self).setUp()
        del CALLED[:]

    def test_inline_relation(self):
        """make sure <event>_relation hooks are called for inlined relation"""

        class EcritParHook(hook.Hook):
            __regid__ = 'inlinedrelhook'
            __select__ = hook.Hook.__select__ & hook.match_rtype('ecrit_par')
            events = ('before_add_relation', 'after_add_relation',
                      'before_delete_relation', 'after_delete_relation')

            def __call__(self):
                record = (self.event, self.eidfrom, self.rtype, self.eidto)
                CALLED.append(record)

        def expected_calls(action, note_eid, person_eid):
            # the before/after pair expected for one add or delete
            return [('before_%s_relation' % action, note_eid, 'ecrit_par', person_eid),
                    ('after_%s_relation' % action, note_eid, 'ecrit_par', person_eid)]

        with self.temporary_appobjects(EcritParHook):
            with self.admin_access.repo_cnx() as cnx:
                eidp = cnx.execute('INSERT Personne X: X nom "toto"')[0][0]
                eidn = cnx.execute('INSERT Note X: X type "T"')[0][0]
                cnx.execute('SET N ecrit_par Y WHERE N type "T", Y nom "toto"')
                self.assertEqual(CALLED, expected_calls('add', eidn, eidp))
                del CALLED[:]
                cnx.execute('DELETE N ecrit_par Y WHERE N type "T", Y nom "toto"')
                self.assertEqual(CALLED, expected_calls('delete', eidn, eidp))
                del CALLED[:]
                eidn = cnx.execute('INSERT Note N: N ecrit_par P WHERE P nom "toto"')[0][0]
                self.assertEqual(CALLED, expected_calls('add', eidn, eidp))

    def test_unique_contraint(self):
        """Committing a second Note with the same type and `inline1` target
        fails with the RQLUniqueConstraint error."""
        with self.admin_access.repo_cnx() as cnx:
            person = cnx.create_entity('Personne', nom=u'toto')
            affaire = cnx.create_entity('Affaire', ref=u'A01', todo_by=person)
            cnx.commit()
            # the first note commits fine, the duplicate must be rejected
            cnx.create_entity('Note', type=u'todo', inline1=affaire)
            cnx.commit()
            cnx.create_entity('Note', type=u'todo', inline1=affaire)
            with self.assertRaises(ValidationError) as cm:
                cnx.commit()
            expected_errors = {
                'inline1-subject': u'RQLUniqueConstraint S type T, S inline1 A1, '
                'A1 todo_by C, Y type T, Y inline1 A2, A2 todo_by C failed'}
            self.assertEqual(cm.exception.errors, expected_errors)

    def test_add_relations_at_creation_with_del_existing_rel(self):
        """A CWUser can be created with `reverse_login_user` and `in_group`
        given at creation time, and committed."""
        with self.admin_access.repo_cnx() as cnx:
            person = cnx.create_entity('Personne',
                                       nom=u'Toto',
                                       prenom=u'Lanturlu',
                                       sexe=u'M')
            rset = cnx.execute('Any U WHERE U is CWGroup, U name "users"')
            users_group = rset.get_entity(0, 0)
            cnx.create_entity('CWUser',
                              login=u'Toto',
                              upassword=u'firstname',
                              firstname=u'firstname',
                              surname=u'surname',
                              reverse_login_user=person,
                              in_group=users_group)
            cnx.commit()
|
578 |
|
579 |
|
class PerformanceTest(CubicWebTC):
    """Scenarios creating and linking a few thousand entities.

    Durations are reported through the 'cubicweb.session' logger at INFO
    level; none of the tests asserts on them.
    """

    def setUp(self):
        super(PerformanceTest, self).setUp()
        logger = logging.getLogger('cubicweb.session')
        # to see the timings on stdout (needs `import sys`):
        # logger.handlers = [logging.StreamHandler(sys.stdout)]
        logger.setLevel(logging.INFO)
        self.info = logger.info

    def tearDown(self):
        super(PerformanceTest, self).tearDown()
        logger = logging.getLogger('cubicweb.session')
        # NOTE(review): forces CRITICAL rather than restoring the level found
        # in setUp -- presumably the level the test environment uses; confirm.
        logger.setLevel(logging.CRITICAL)

    @staticmethod
    def _create_personnes(cnx, count=2000):
        """Create `count` Personne entities ('Doe000', 'Doe001', ...) and
        return them as a list, in creation order."""
        return [cnx.create_entity('Personne', nom=u'Doe%03d' % i,
                                  prenom=u'John', sexe=u'M')
                for i in range(count)]

    @staticmethod
    def _create_abraham(cnx, **relations):
        """Create the 'Abraham' Personne; `relations` are passed through to
        ``create_entity``."""
        return cnx.create_entity('Personne', nom=u'Abraham', prenom=u'John',
                                 sexe=u'M', **relations)

    def test_composite_deletion(self):
        """Time the creation, then the deletion, of a Personne holding 2000
        `personne_composite` relations."""
        with self.admin_access.repo_cnx() as cnx:
            t0 = time.time()
            personnes = self._create_personnes(cnx)
            abraham = self._create_abraham(cnx)
            for j in range(0, 2000, 100):
                abraham.cw_set(personne_composite=personnes[j:j + 100])
            t1 = time.time()
            self.info('creation: %.2gs', (t1 - t0))
            cnx.commit()
            t2 = time.time()
            self.info('commit creation: %.2gs', (t2 - t1))
            cnx.execute('DELETE Personne P WHERE P eid %(eid)s', {'eid': abraham.eid})
            t3 = time.time()
            self.info('deletion: %.2gs', (t3 - t2))
            cnx.commit()
            t4 = time.time()
            self.info('commit deletion: %.2gs', (t4 - t3))

    def test_add_relation_non_inlined(self):
        """Time adding `personne_composite` relations by batches of 100
        through ``create_entity`` / ``cw_set``."""
        with self.admin_access.repo_cnx() as cnx:
            personnes = self._create_personnes(cnx)
            cnx.commit()
            t0 = time.time()
            abraham = self._create_abraham(cnx, personne_composite=personnes[:100])
            t1 = time.time()
            self.info('creation: %.2gs', (t1 - t0))
            for j in range(100, 2000, 100):
                abraham.cw_set(personne_composite=personnes[j:j + 100])
            t2 = time.time()
            self.info('more relations: %.2gs', (t2 - t1))
            cnx.commit()
            t3 = time.time()
            self.info('commit creation: %.2gs', (t3 - t2))

    def test_add_relation_inlined(self):
        """Time adding `personne_inlined` relations by batches of 100
        through ``create_entity`` / ``cw_set``."""
        with self.admin_access.repo_cnx() as cnx:
            personnes = self._create_personnes(cnx)
            cnx.commit()
            t0 = time.time()
            abraham = self._create_abraham(cnx, personne_inlined=personnes[:100])
            t1 = time.time()
            self.info('creation: %.2gs', (t1 - t0))
            for j in range(100, 2000, 100):
                abraham.cw_set(personne_inlined=personnes[j:j + 100])
            t2 = time.time()
            self.info('more relations: %.2gs', (t2 - t1))
            cnx.commit()
            t3 = time.time()
            self.info('commit creation: %.2gs', (t3 - t2))

    def test_session_add_relation(self):
        """ to be compared with test_session_add_relations"""
        with self.admin_access.repo_cnx() as cnx:
            personnes = self._create_personnes(cnx)
            abraham = self._create_abraham(cnx)
            cnx.commit()
            t0 = time.time()
            add_relation = cnx.add_relation
            for p in personnes:
                add_relation(abraham.eid, 'personne_composite', p.eid)
            cnx.commit()
            t1 = time.time()
            self.info('add relation: %.2gs', t1 - t0)

    def test_session_add_relations(self):
        """ to be compared with test_session_add_relation"""
        with self.admin_access.repo_cnx() as cnx:
            personnes = self._create_personnes(cnx)
            abraham = self._create_abraham(cnx)
            cnx.commit()
            t0 = time.time()
            add_relations = cnx.add_relations
            relations = [('personne_composite', [(abraham.eid, p.eid) for p in personnes])]
            add_relations(relations)
            cnx.commit()
            t1 = time.time()
            self.info('add relations: %.2gs', t1 - t0)

    def test_session_add_relation_inlined(self):
        """ to be compared with test_session_add_relations_inlined"""
        with self.admin_access.repo_cnx() as cnx:
            personnes = self._create_personnes(cnx)
            abraham = self._create_abraham(cnx)
            cnx.commit()
            t0 = time.time()
            add_relation = cnx.add_relation
            for p in personnes:
                add_relation(abraham.eid, 'personne_inlined', p.eid)
            cnx.commit()
            t1 = time.time()
            self.info('add relation (inlined): %.2gs', t1 - t0)

    def test_session_add_relations_inlined(self):
        """ to be compared with test_session_add_relation_inlined"""
        with self.admin_access.repo_cnx() as cnx:
            personnes = self._create_personnes(cnx)
            abraham = self._create_abraham(cnx)
            cnx.commit()
            t0 = time.time()
            add_relations = cnx.add_relations
            relations = [('personne_inlined', [(abraham.eid, p.eid) for p in personnes])]
            add_relations(relations)
            cnx.commit()
            t1 = time.time()
            self.info('add relations (inlined): %.2gs', t1 - t0)

    def test_optional_relation_reset_1(self):
        """Setting `todo_by` to two Personne at once keeps only the last."""
        with self.admin_access.repo_cnx() as cnx:
            p1 = cnx.create_entity('Personne', nom=u'Vincent')
            p2 = cnx.create_entity('Personne', nom=u'Florent')
            w = cnx.create_entity('Affaire', ref=u'wc')
            w.cw_set(todo_by=[p1, p2])
            w.cw_clear_all_caches()
            cnx.commit()
            self.assertEqual(len(w.todo_by), 1)
            self.assertEqual(w.todo_by[0].eid, p2.eid)

    def test_optional_relation_reset_2(self):
        """Setting `todo_by` again after a commit replaces the previous
        target."""
        with self.admin_access.repo_cnx() as cnx:
            p1 = cnx.create_entity('Personne', nom=u'Vincent')
            p2 = cnx.create_entity('Personne', nom=u'Florent')
            w = cnx.create_entity('Affaire', ref=u'wc')
            w.cw_set(todo_by=p1)
            cnx.commit()
            w.cw_set(todo_by=p2)
            w.cw_clear_all_caches()
            cnx.commit()
            self.assertEqual(len(w.todo_by), 1)
            self.assertEqual(w.todo_by[0].eid, p2.eid)
|
747 |
|
748 |
|
# Run the tests of this module when it is executed as a script.
if __name__ == '__main__':
    from logilab.common.testlib import unittest_main
    unittest_main()