63 namecol = SQL_PREFIX + 'name' |
63 namecol = SQL_PREFIX + 'name' |
64 finalcol = SQL_PREFIX + 'final' |
64 finalcol = SQL_PREFIX + 'final' |
65 self.session.set_pool() |
65 self.session.set_pool() |
66 cu = self.session.system_sql('SELECT %s FROM %s WHERE %s is NULL' % ( |
66 cu = self.session.system_sql('SELECT %s FROM %s WHERE %s is NULL' % ( |
67 namecol, table, finalcol)) |
67 namecol, table, finalcol)) |
68 self.assertEquals(cu.fetchall(), []) |
68 self.assertEqual(cu.fetchall(), []) |
69 cu = self.session.system_sql('SELECT %s FROM %s WHERE %s=%%(final)s ORDER BY %s' |
69 cu = self.session.system_sql('SELECT %s FROM %s WHERE %s=%%(final)s ORDER BY %s' |
70 % (namecol, table, finalcol, namecol), {'final': 'TRUE'}) |
70 % (namecol, table, finalcol, namecol), {'final': 'TRUE'}) |
71 self.assertEquals(cu.fetchall(), [(u'Boolean',), (u'Bytes',), |
71 self.assertEqual(cu.fetchall(), [(u'Boolean',), (u'Bytes',), |
72 (u'Date',), (u'Datetime',), |
72 (u'Date',), (u'Datetime',), |
73 (u'Decimal',),(u'Float',), |
73 (u'Decimal',),(u'Float',), |
74 (u'Int',), |
74 (u'Int',), |
75 (u'Interval',), (u'Password',), |
75 (u'Interval',), (u'Password',), |
76 (u'String',), (u'Time',)]) |
76 (u'String',), (u'Time',)]) |
134 {'login': u"barnabé", 'passwd': u"héhéhé".encode('UTF8')}) |
134 {'login': u"barnabé", 'passwd': u"héhéhé".encode('UTF8')}) |
135 repo.commit(cnxid) |
135 repo.commit(cnxid) |
136 repo.close(cnxid) |
136 repo.close(cnxid) |
137 self.assert_(repo.connect(u"barnabé", password=u"héhéhé".encode('UTF8'))) |
137 self.assert_(repo.connect(u"barnabé", password=u"héhéhé".encode('UTF8'))) |
138 |
138 |
139 def test_invalid_entity_rollback(self): |
139 def test_rollback_on_commit_error(self): |
140 cnxid = self.repo.connect(self.admlogin, password=self.admpassword) |
140 cnxid = self.repo.connect(self.admlogin, password=self.admpassword) |
141 # no group |
|
142 self.repo.execute(cnxid, |
141 self.repo.execute(cnxid, |
143 'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s', |
142 'INSERT CWUser X: X login %(login)s, X upassword %(passwd)s', |
144 {'login': u"tutetute", 'passwd': 'tutetute'}) |
143 {'login': u"tutetute", 'passwd': 'tutetute'}) |
145 self.assertRaises(ValidationError, self.repo.commit, cnxid) |
144 self.assertRaises(ValidationError, self.repo.commit, cnxid) |
146 self.failIf(self.repo.execute(cnxid, 'CWUser X WHERE X login "tutetute"')) |
145 self.failIf(self.repo.execute(cnxid, 'CWUser X WHERE X login "tutetute"')) |
147 |
146 |
|
147 def test_rollback_on_execute_validation_error(self): |
|
148 class ValidationErrorAfterHook(Hook): |
|
149 __regid__ = 'valerror-after-hook' |
|
150 __select__ = Hook.__select__ & is_instance('CWGroup') |
|
151 events = ('after_update_entity',) |
|
152 def __call__(self): |
|
153 raise ValidationError(self.entity.eid, {}) |
|
154 with self.temporary_appobjects(ValidationErrorAfterHook): |
|
155 self.assertRaises(ValidationError, |
|
156 self.execute, 'SET X name "toto" WHERE X is CWGroup, X name "guests"') |
|
157 self.failIf(self.execute('Any X WHERE X is CWGroup, X name "toto"')) |
|
158 |
|
159 def test_rollback_on_execute_unauthorized(self): |
|
160 class UnauthorizedAfterHook(Hook): |
|
161 __regid__ = 'unauthorized-after-hook' |
|
162 __select__ = Hook.__select__ & is_instance('CWGroup') |
|
163 events = ('after_update_entity',) |
|
164 def __call__(self): |
|
165 raise Unauthorized() |
|
166 with self.temporary_appobjects(UnauthorizedAfterHook): |
|
167 self.assertRaises(Unauthorized, |
|
168 self.execute, 'SET X name "toto" WHERE X is CWGroup, X name "guests"') |
|
169 self.failIf(self.execute('Any X WHERE X is CWGroup, X name "toto"')) |
|
170 |
|
171 |
148 def test_close(self): |
172 def test_close(self): |
149 repo = self.repo |
173 repo = self.repo |
150 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
174 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
151 self.assert_(cnxid) |
175 self.assert_(cnxid) |
152 repo.close(cnxid) |
176 repo.close(cnxid) |
159 def test_shared_data(self): |
183 def test_shared_data(self): |
160 repo = self.repo |
184 repo = self.repo |
161 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
185 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
162 repo.set_shared_data(cnxid, 'data', 4) |
186 repo.set_shared_data(cnxid, 'data', 4) |
163 cnxid2 = repo.connect(self.admlogin, password=self.admpassword) |
187 cnxid2 = repo.connect(self.admlogin, password=self.admpassword) |
164 self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4) |
188 self.assertEqual(repo.get_shared_data(cnxid, 'data'), 4) |
165 self.assertEquals(repo.get_shared_data(cnxid2, 'data'), None) |
189 self.assertEqual(repo.get_shared_data(cnxid2, 'data'), None) |
166 repo.set_shared_data(cnxid2, 'data', 5) |
190 repo.set_shared_data(cnxid2, 'data', 5) |
167 self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4) |
191 self.assertEqual(repo.get_shared_data(cnxid, 'data'), 4) |
168 self.assertEquals(repo.get_shared_data(cnxid2, 'data'), 5) |
192 self.assertEqual(repo.get_shared_data(cnxid2, 'data'), 5) |
169 repo.get_shared_data(cnxid2, 'data', pop=True) |
193 repo.get_shared_data(cnxid2, 'data', pop=True) |
170 self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4) |
194 self.assertEqual(repo.get_shared_data(cnxid, 'data'), 4) |
171 self.assertEquals(repo.get_shared_data(cnxid2, 'data'), None) |
195 self.assertEqual(repo.get_shared_data(cnxid2, 'data'), None) |
172 repo.close(cnxid) |
196 repo.close(cnxid) |
173 repo.close(cnxid2) |
197 repo.close(cnxid2) |
174 self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid, 'data') |
198 self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid, 'data') |
175 self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid2, 'data') |
199 self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid2, 'data') |
176 self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid, 'data', 1) |
200 self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid, 'data', 1) |
186 def test_transaction_base(self): |
210 def test_transaction_base(self): |
187 repo = self.repo |
211 repo = self.repo |
188 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
212 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
189 # check db state |
213 # check db state |
190 result = repo.execute(cnxid, 'Personne X') |
214 result = repo.execute(cnxid, 'Personne X') |
191 self.assertEquals(result.rowcount, 0) |
215 self.assertEqual(result.rowcount, 0) |
192 # rollback entity insertion |
216 # rollback entity insertion |
193 repo.execute(cnxid, "INSERT Personne X: X nom 'bidule'") |
217 repo.execute(cnxid, "INSERT Personne X: X nom 'bidule'") |
194 result = repo.execute(cnxid, 'Personne X') |
218 result = repo.execute(cnxid, 'Personne X') |
195 self.assertEquals(result.rowcount, 1) |
219 self.assertEqual(result.rowcount, 1) |
196 repo.rollback(cnxid) |
220 repo.rollback(cnxid) |
197 result = repo.execute(cnxid, 'Personne X') |
221 result = repo.execute(cnxid, 'Personne X') |
198 self.assertEquals(result.rowcount, 0, result.rows) |
222 self.assertEqual(result.rowcount, 0, result.rows) |
199 # commit |
223 # commit |
200 repo.execute(cnxid, "INSERT Personne X: X nom 'bidule'") |
224 repo.execute(cnxid, "INSERT Personne X: X nom 'bidule'") |
201 repo.commit(cnxid) |
225 repo.commit(cnxid) |
202 result = repo.execute(cnxid, 'Personne X') |
226 result = repo.execute(cnxid, 'Personne X') |
203 self.assertEquals(result.rowcount, 1) |
227 self.assertEqual(result.rowcount, 1) |
204 |
228 |
205 def test_transaction_base2(self): |
229 def test_transaction_base2(self): |
206 repo = self.repo |
230 repo = self.repo |
207 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
231 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
208 # rollback relation insertion |
232 # rollback relation insertion |
209 repo.execute(cnxid, "SET U in_group G WHERE U login 'admin', G name 'guests'") |
233 repo.execute(cnxid, "SET U in_group G WHERE U login 'admin', G name 'guests'") |
210 result = repo.execute(cnxid, "Any U WHERE U in_group G, U login 'admin', G name 'guests'") |
234 result = repo.execute(cnxid, "Any U WHERE U in_group G, U login 'admin', G name 'guests'") |
211 self.assertEquals(result.rowcount, 1) |
235 self.assertEqual(result.rowcount, 1) |
212 repo.rollback(cnxid) |
236 repo.rollback(cnxid) |
213 result = repo.execute(cnxid, "Any U WHERE U in_group G, U login 'admin', G name 'guests'") |
237 result = repo.execute(cnxid, "Any U WHERE U in_group G, U login 'admin', G name 'guests'") |
214 self.assertEquals(result.rowcount, 0, result.rows) |
238 self.assertEqual(result.rowcount, 0, result.rows) |
215 |
239 |
216 def test_transaction_base3(self): |
240 def test_transaction_base3(self): |
217 repo = self.repo |
241 repo = self.repo |
218 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
242 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
219 # rollback state change which trigger TrInfo insertion |
243 # rollback state change which trigger TrInfo insertion |
220 session = repo._get_session(cnxid) |
244 session = repo._get_session(cnxid) |
221 session.set_pool() |
245 session.set_pool() |
222 user = session.user |
246 user = session.user |
223 user.cw_adapt_to('IWorkflowable').fire_transition('deactivate') |
247 user.cw_adapt_to('IWorkflowable').fire_transition('deactivate') |
224 rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': user.eid}) |
248 rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': user.eid}) |
225 self.assertEquals(len(rset), 1) |
249 self.assertEqual(len(rset), 1) |
226 repo.rollback(cnxid) |
250 repo.rollback(cnxid) |
227 rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': user.eid}) |
251 rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': user.eid}) |
228 self.assertEquals(len(rset), 0) |
252 self.assertEqual(len(rset), 0) |
229 |
253 |
230 def test_transaction_interleaved(self): |
254 def test_transaction_interleaved(self): |
231 self.skip('implement me') |
255 self.skipTest('implement me') |
232 |
256 |
233 def test_close_kill_processing_request(self): |
257 def test_close_kill_processing_request(self): |
234 repo = self.repo |
258 repo = self.repo |
235 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
259 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
236 repo.execute(cnxid, 'INSERT CWUser X: X login "toto", X upassword "tutu", X in_group G WHERE G name "users"') |
260 repo.execute(cnxid, 'INSERT CWUser X: X login "toto", X upassword "tutu", X in_group G WHERE G name "users"') |
244 def run_transaction(): |
268 def run_transaction(): |
245 repo.execute(cnxid, 'DELETE CWUser X WHERE X login "toto"') |
269 repo.execute(cnxid, 'DELETE CWUser X WHERE X login "toto"') |
246 repo.commit(cnxid) |
270 repo.commit(cnxid) |
247 try: |
271 try: |
248 ex = self.assertRaises(Exception, run_transaction) |
272 ex = self.assertRaises(Exception, run_transaction) |
249 self.assertEquals(str(ex), 'try to access pool on a closed session') |
273 self.assertEqual(str(ex), 'try to access pool on a closed session') |
250 finally: |
274 finally: |
251 t.join() |
275 t.join() |
252 |
276 |
253 def test_initial_schema(self): |
277 def test_initial_schema(self): |
254 schema = self.repo.schema |
278 schema = self.repo.schema |
255 # check order of attributes is respected |
279 # check order of attributes is respected |
256 self.assertListEquals([r.type for r in schema.eschema('CWAttribute').ordered_relations() |
280 self.assertListEqual([r.type for r in schema.eschema('CWAttribute').ordered_relations() |
257 if not r.type in ('eid', 'is', 'is_instance_of', 'identity', |
281 if not r.type in ('eid', 'is', 'is_instance_of', 'identity', |
258 'creation_date', 'modification_date', 'cwuri', |
282 'creation_date', 'modification_date', 'cwuri', |
259 'owned_by', 'created_by', |
283 'owned_by', 'created_by', |
260 'update_permission', 'read_permission', |
284 'update_permission', 'read_permission', |
261 'in_basket')], |
285 'in_basket')], |
264 'constrained_by', |
288 'constrained_by', |
265 'cardinality', 'ordernum', |
289 'cardinality', 'ordernum', |
266 'indexed', 'fulltextindexed', 'internationalizable', |
290 'indexed', 'fulltextindexed', 'internationalizable', |
267 'defaultval', 'description', 'description_format']) |
291 'defaultval', 'description', 'description_format']) |
268 |
292 |
269 self.assertEquals(schema.eschema('CWEType').main_attribute(), 'name') |
293 self.assertEqual(schema.eschema('CWEType').main_attribute(), 'name') |
270 self.assertEquals(schema.eschema('State').main_attribute(), 'name') |
294 self.assertEqual(schema.eschema('State').main_attribute(), 'name') |
271 |
295 |
272 constraints = schema.rschema('name').rdef('CWEType', 'String').constraints |
296 constraints = schema.rschema('name').rdef('CWEType', 'String').constraints |
273 self.assertEquals(len(constraints), 2) |
297 self.assertEqual(len(constraints), 2) |
274 for cstr in constraints[:]: |
298 for cstr in constraints[:]: |
275 if isinstance(cstr, UniqueConstraint): |
299 if isinstance(cstr, UniqueConstraint): |
276 constraints.remove(cstr) |
300 constraints.remove(cstr) |
277 break |
301 break |
278 else: |
302 else: |
279 self.fail('unique constraint not found') |
303 self.fail('unique constraint not found') |
280 sizeconstraint = constraints[0] |
304 sizeconstraint = constraints[0] |
281 self.assertEquals(sizeconstraint.min, None) |
305 self.assertEqual(sizeconstraint.min, None) |
282 self.assertEquals(sizeconstraint.max, 64) |
306 self.assertEqual(sizeconstraint.max, 64) |
283 |
307 |
284 constraints = schema.rschema('relation_type').rdef('CWAttribute', 'CWRType').constraints |
308 constraints = schema.rschema('relation_type').rdef('CWAttribute', 'CWRType').constraints |
285 self.assertEquals(len(constraints), 1) |
309 self.assertEqual(len(constraints), 1) |
286 cstr = constraints[0] |
310 cstr = constraints[0] |
287 self.assert_(isinstance(cstr, RQLConstraint)) |
311 self.assert_(isinstance(cstr, RQLConstraint)) |
288 self.assertEquals(cstr.restriction, 'O final TRUE') |
312 self.assertEqual(cstr.restriction, 'O final TRUE') |
289 |
313 |
290 ownedby = schema.rschema('owned_by') |
314 ownedby = schema.rschema('owned_by') |
291 self.assertEquals(ownedby.objects('CWEType'), ('CWUser',)) |
315 self.assertEqual(ownedby.objects('CWEType'), ('CWUser',)) |
292 |
316 |
293 def test_pyro(self): |
317 def test_pyro(self): |
294 import Pyro |
318 import Pyro |
295 Pyro.config.PYRO_MULTITHREADED = 0 |
319 Pyro.config.PYRO_MULTITHREADED = 0 |
296 done = [] |
320 done = [] |
335 |
359 |
336 def test_internal_api(self): |
360 def test_internal_api(self): |
337 repo = self.repo |
361 repo = self.repo |
338 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
362 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
339 session = repo._get_session(cnxid, setpool=True) |
363 session = repo._get_session(cnxid, setpool=True) |
340 self.assertEquals(repo.type_and_source_from_eid(1, session), |
364 self.assertEqual(repo.type_and_source_from_eid(1, session), |
341 ('CWGroup', 'system', None)) |
365 ('CWGroup', 'system', None)) |
342 self.assertEquals(repo.type_from_eid(1, session), 'CWGroup') |
366 self.assertEqual(repo.type_from_eid(1, session), 'CWGroup') |
343 self.assertEquals(repo.source_from_eid(1, session).uri, 'system') |
367 self.assertEqual(repo.source_from_eid(1, session).uri, 'system') |
344 self.assertEquals(repo.eid2extid(repo.system_source, 1, session), None) |
368 self.assertEqual(repo.eid2extid(repo.system_source, 1, session), None) |
345 class dummysource: uri = 'toto' |
369 class dummysource: uri = 'toto' |
346 self.assertRaises(UnknownEid, repo.eid2extid, dummysource, 1, session) |
370 self.assertRaises(UnknownEid, repo.eid2extid, dummysource, 1, session) |
347 |
371 |
348 def test_public_api(self): |
372 def test_public_api(self): |
349 self.assertEquals(self.repo.get_schema(), self.repo.schema) |
373 self.assertEqual(self.repo.get_schema(), self.repo.schema) |
350 self.assertEquals(self.repo.source_defs(), {'system': {'adapter': 'native', 'uri': 'system'}}) |
374 self.assertEqual(self.repo.source_defs(), {'system': {'adapter': 'native', 'uri': 'system'}}) |
351 # .properties() return a result set |
375 # .properties() return a result set |
352 self.assertEquals(self.repo.properties().rql, 'Any K,V WHERE P is CWProperty,P pkey K, P value V, NOT P for_user U') |
376 self.assertEqual(self.repo.properties().rql, 'Any K,V WHERE P is CWProperty,P pkey K, P value V, NOT P for_user U') |
353 |
377 |
354 def test_session_api(self): |
378 def test_session_api(self): |
355 repo = self.repo |
379 repo = self.repo |
356 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
380 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
357 self.assertEquals(repo.user_info(cnxid), (5, 'admin', set([u'managers']), {})) |
381 self.assertEqual(repo.user_info(cnxid), (5, 'admin', set([u'managers']), {})) |
358 self.assertEquals(repo.describe(cnxid, 1), (u'CWGroup', u'system', None)) |
382 self.assertEqual(repo.describe(cnxid, 1), (u'CWGroup', u'system', None)) |
359 repo.close(cnxid) |
383 repo.close(cnxid) |
360 self.assertRaises(BadConnectionId, repo.user_info, cnxid) |
384 self.assertRaises(BadConnectionId, repo.user_info, cnxid) |
361 self.assertRaises(BadConnectionId, repo.describe, cnxid, 1) |
385 self.assertRaises(BadConnectionId, repo.describe, cnxid, 1) |
362 |
386 |
363 def test_shared_data_api(self): |
387 def test_shared_data_api(self): |
364 repo = self.repo |
388 repo = self.repo |
365 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
389 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
366 self.assertEquals(repo.get_shared_data(cnxid, 'data'), None) |
390 self.assertEqual(repo.get_shared_data(cnxid, 'data'), None) |
367 repo.set_shared_data(cnxid, 'data', 4) |
391 repo.set_shared_data(cnxid, 'data', 4) |
368 self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4) |
392 self.assertEqual(repo.get_shared_data(cnxid, 'data'), 4) |
369 repo.get_shared_data(cnxid, 'data', pop=True) |
393 repo.get_shared_data(cnxid, 'data', pop=True) |
370 repo.get_shared_data(cnxid, 'whatever', pop=True) |
394 repo.get_shared_data(cnxid, 'whatever', pop=True) |
371 self.assertEquals(repo.get_shared_data(cnxid, 'data'), None) |
395 self.assertEqual(repo.get_shared_data(cnxid, 'data'), None) |
372 repo.close(cnxid) |
396 repo.close(cnxid) |
373 self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid, 'data', 0) |
397 self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid, 'data', 0) |
374 self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid, 'data') |
398 self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid, 'data') |
375 |
399 |
376 def test_schema_is_relation(self): |
400 def test_schema_is_relation(self): |
392 p1 = self.request().create_entity('Personne', nom=u'toto') |
416 p1 = self.request().create_entity('Personne', nom=u'toto') |
393 self.execute('SET A todo_by P WHERE A eid %(x)s, P eid %(p)s', |
417 self.execute('SET A todo_by P WHERE A eid %(x)s, P eid %(p)s', |
394 {'x': note.eid, 'p': p1.eid}) |
418 {'x': note.eid, 'p': p1.eid}) |
395 rset = self.execute('Any P WHERE A todo_by P, A eid %(x)s', |
419 rset = self.execute('Any P WHERE A todo_by P, A eid %(x)s', |
396 {'x': note.eid}) |
420 {'x': note.eid}) |
397 self.assertEquals(len(rset), 1) |
421 self.assertEqual(len(rset), 1) |
398 p2 = self.request().create_entity('Personne', nom=u'tutu') |
422 p2 = self.request().create_entity('Personne', nom=u'tutu') |
399 self.execute('SET A todo_by P WHERE A eid %(x)s, P eid %(p)s', |
423 self.execute('SET A todo_by P WHERE A eid %(x)s, P eid %(p)s', |
400 {'x': note.eid, 'p': p2.eid}) |
424 {'x': note.eid, 'p': p2.eid}) |
401 rset = self.execute('Any P WHERE A todo_by P, A eid %(x)s', |
425 rset = self.execute('Any P WHERE A todo_by P, A eid %(x)s', |
402 {'x': note.eid}) |
426 {'x': note.eid}) |
403 self.assertEquals(len(rset), 1) |
427 self.assertEqual(len(rset), 1) |
404 self.assertEquals(rset.rows[0][0], p2.eid) |
428 self.assertEqual(rset.rows[0][0], p2.eid) |
405 |
429 |
406 def test_delete_if_object_inlined_singlecard(self): |
430 def test_delete_if_object_inlined_singlecard(self): |
407 req = self.request() |
431 req = self.request() |
408 c = req.create_entity('Card', title=u'Carte') |
432 c = req.create_entity('Card', title=u'Carte') |
409 req.create_entity('Personne', nom=u'Vincent', fiche=c) |
433 req.create_entity('Personne', nom=u'Vincent', fiche=c) |
410 req.create_entity('Personne', nom=u'Florent', fiche=c) |
434 req.create_entity('Personne', nom=u'Florent', fiche=c) |
411 self.commit() |
435 self.commit() |
412 self.assertEquals(len(c.reverse_fiche), 1) |
436 self.assertEqual(len(c.reverse_fiche), 1) |
413 |
437 |
414 def test_set_attributes_in_before_update(self): |
438 def test_set_attributes_in_before_update(self): |
415 # local hook |
439 # local hook |
416 class DummyBeforeHook(Hook): |
440 class DummyBeforeHook(Hook): |
417 __regid__ = 'dummy-before-hook' |
441 __regid__ = 'dummy-before-hook' |
475 self.session.set_pool() |
499 self.session.set_pool() |
476 self.assert_(self.repo.system_source.create_eid(self.session)) |
500 self.assert_(self.repo.system_source.create_eid(self.session)) |
477 |
501 |
478 def test_source_from_eid(self): |
502 def test_source_from_eid(self): |
479 self.session.set_pool() |
503 self.session.set_pool() |
480 self.assertEquals(self.repo.source_from_eid(1, self.session), |
504 self.assertEqual(self.repo.source_from_eid(1, self.session), |
481 self.repo.sources_by_uri['system']) |
505 self.repo.sources_by_uri['system']) |
482 |
506 |
483 def test_source_from_eid_raise(self): |
507 def test_source_from_eid_raise(self): |
484 self.session.set_pool() |
508 self.session.set_pool() |
485 self.assertRaises(UnknownEid, self.repo.source_from_eid, -2, self.session) |
509 self.assertRaises(UnknownEid, self.repo.source_from_eid, -2, self.session) |
486 |
510 |
487 def test_type_from_eid(self): |
511 def test_type_from_eid(self): |
488 self.session.set_pool() |
512 self.session.set_pool() |
489 self.assertEquals(self.repo.type_from_eid(1, self.session), 'CWGroup') |
513 self.assertEqual(self.repo.type_from_eid(1, self.session), 'CWGroup') |
490 |
514 |
491 def test_type_from_eid_raise(self): |
515 def test_type_from_eid_raise(self): |
492 self.session.set_pool() |
516 self.session.set_pool() |
493 self.assertRaises(UnknownEid, self.repo.type_from_eid, -2, self.session) |
517 self.assertRaises(UnknownEid, self.repo.type_from_eid, -2, self.session) |
494 |
518 |
501 cu = self.session.system_sql('SELECT * FROM entities WHERE eid = -1') |
525 cu = self.session.system_sql('SELECT * FROM entities WHERE eid = -1') |
502 data = cu.fetchall() |
526 data = cu.fetchall() |
503 self.assertIsInstance(data[0][3], datetime) |
527 self.assertIsInstance(data[0][3], datetime) |
504 data[0] = list(data[0]) |
528 data[0] = list(data[0]) |
505 data[0][3] = None |
529 data[0][3] = None |
506 self.assertEquals(tuplify(data), [(-1, 'Personne', 'system', None, None)]) |
530 self.assertEqual(tuplify(data), [(-1, 'Personne', 'system', None, None)]) |
507 self.repo.delete_info(self.session, entity, 'system', None) |
531 self.repo.delete_info(self.session, entity, 'system', None) |
508 #self.repo.commit() |
532 #self.repo.commit() |
509 cu = self.session.system_sql('SELECT * FROM entities WHERE eid = -1') |
533 cu = self.session.system_sql('SELECT * FROM entities WHERE eid = -1') |
510 data = cu.fetchall() |
534 data = cu.fetchall() |
511 self.assertEquals(data, []) |
535 self.assertEqual(data, []) |
512 |
536 |
513 |
537 |
514 class FTITC(CubicWebTC): |
538 class FTITC(CubicWebTC): |
515 |
539 |
516 def test_reindex_and_modified_since(self): |
540 def test_reindex_and_modified_since(self): |
517 self.repo.system_source.multisources_etypes.add('Personne') |
541 self.repo.system_source.multisources_etypes.add('Personne') |
518 eidp = self.execute('INSERT Personne X: X nom "toto", X prenom "tutu"')[0][0] |
542 eidp = self.execute('INSERT Personne X: X nom "toto", X prenom "tutu"')[0][0] |
519 self.commit() |
543 self.commit() |
520 ts = datetime.now() |
544 ts = datetime.now() |
521 self.assertEquals(len(self.execute('Personne X WHERE X has_text "tutu"')), 1) |
545 self.assertEqual(len(self.execute('Personne X WHERE X has_text "tutu"')), 1) |
522 self.session.set_pool() |
546 self.session.set_pool() |
523 cu = self.session.system_sql('SELECT mtime, eid FROM entities WHERE eid = %s' % eidp) |
547 cu = self.session.system_sql('SELECT mtime, eid FROM entities WHERE eid = %s' % eidp) |
524 omtime = cu.fetchone()[0] |
548 omtime = cu.fetchone()[0] |
525 # our sqlite datetime adapter is ignore seconds fraction, so we have to |
549 # our sqlite datetime adapter is ignore seconds fraction, so we have to |
526 # ensure update is done the next seconds |
550 # ensure update is done the next seconds |
527 time.sleep(1 - (ts.second - int(ts.second))) |
551 time.sleep(1 - (ts.second - int(ts.second))) |
528 self.execute('SET X nom "tata" WHERE X eid %(x)s', {'x': eidp}) |
552 self.execute('SET X nom "tata" WHERE X eid %(x)s', {'x': eidp}) |
529 self.commit() |
553 self.commit() |
530 self.assertEquals(len(self.execute('Personne X WHERE X has_text "tutu"')), 1) |
554 self.assertEqual(len(self.execute('Personne X WHERE X has_text "tutu"')), 1) |
531 self.session.set_pool() |
555 self.session.set_pool() |
532 cu = self.session.system_sql('SELECT mtime FROM entities WHERE eid = %s' % eidp) |
556 cu = self.session.system_sql('SELECT mtime FROM entities WHERE eid = %s' % eidp) |
533 mtime = cu.fetchone()[0] |
557 mtime = cu.fetchone()[0] |
534 self.failUnless(omtime < mtime) |
558 self.failUnless(omtime < mtime) |
535 self.commit() |
559 self.commit() |
536 date, modified, deleted = self.repo.entities_modified_since(('Personne',), omtime) |
560 date, modified, deleted = self.repo.entities_modified_since(('Personne',), omtime) |
537 self.assertEquals(modified, [('Personne', eidp)]) |
561 self.assertEqual(modified, [('Personne', eidp)]) |
538 self.assertEquals(deleted, []) |
562 self.assertEqual(deleted, []) |
539 date, modified, deleted = self.repo.entities_modified_since(('Personne',), mtime) |
563 date, modified, deleted = self.repo.entities_modified_since(('Personne',), mtime) |
540 self.assertEquals(modified, []) |
564 self.assertEqual(modified, []) |
541 self.assertEquals(deleted, []) |
565 self.assertEqual(deleted, []) |
542 self.execute('DELETE Personne X WHERE X eid %(x)s', {'x': eidp}) |
566 self.execute('DELETE Personne X WHERE X eid %(x)s', {'x': eidp}) |
543 self.commit() |
567 self.commit() |
544 date, modified, deleted = self.repo.entities_modified_since(('Personne',), omtime) |
568 date, modified, deleted = self.repo.entities_modified_since(('Personne',), omtime) |
545 self.assertEquals(modified, []) |
569 self.assertEqual(modified, []) |
546 self.assertEquals(deleted, [('Personne', eidp)]) |
570 self.assertEqual(deleted, [('Personne', eidp)]) |
547 |
571 |
548 def test_fulltext_container_entity(self): |
572 def test_fulltext_container_entity(self): |
549 assert self.schema.rschema('use_email').fulltext_container == 'subject' |
573 assert self.schema.rschema('use_email').fulltext_container == 'subject' |
550 req = self.request() |
574 req = self.request() |
551 toto = req.create_entity('EmailAddress', address=u'toto@logilab.fr') |
575 toto = req.create_entity('EmailAddress', address=u'toto@logilab.fr') |
552 self.commit() |
576 self.commit() |
553 rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'}) |
577 rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'}) |
554 self.assertEquals(rset.rows, []) |
578 self.assertEqual(rset.rows, []) |
555 req.user.set_relations(use_email=toto) |
579 req.user.set_relations(use_email=toto) |
556 self.commit() |
580 self.commit() |
557 rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'}) |
581 rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'}) |
558 self.assertEquals(rset.rows, [[req.user.eid]]) |
582 self.assertEqual(rset.rows, [[req.user.eid]]) |
559 req.execute('DELETE X use_email Y WHERE X login "admin", Y eid %(y)s', |
583 req.execute('DELETE X use_email Y WHERE X login "admin", Y eid %(y)s', |
560 {'y': toto.eid}) |
584 {'y': toto.eid}) |
561 self.commit() |
585 self.commit() |
562 rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'}) |
586 rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'}) |
563 self.assertEquals(rset.rows, []) |
587 self.assertEqual(rset.rows, []) |
564 tutu = req.create_entity('EmailAddress', address=u'tutu@logilab.fr') |
588 tutu = req.create_entity('EmailAddress', address=u'tutu@logilab.fr') |
565 req.user.set_relations(use_email=tutu) |
589 req.user.set_relations(use_email=tutu) |
566 self.commit() |
590 self.commit() |
567 rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'tutu'}) |
591 rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'tutu'}) |
568 self.assertEquals(rset.rows, [[req.user.eid]]) |
592 self.assertEqual(rset.rows, [[req.user.eid]]) |
569 tutu.set_attributes(address=u'hip@logilab.fr') |
593 tutu.set_attributes(address=u'hip@logilab.fr') |
570 self.commit() |
594 self.commit() |
571 rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'tutu'}) |
595 rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'tutu'}) |
572 self.assertEquals(rset.rows, []) |
596 self.assertEqual(rset.rows, []) |
573 rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'hip'}) |
597 rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'hip'}) |
574 self.assertEquals(rset.rows, [[req.user.eid]]) |
598 self.assertEqual(rset.rows, [[req.user.eid]]) |
575 |
599 |
576 def test_no_uncessary_ftiindex_op(self): |
600 def test_no_uncessary_ftiindex_op(self): |
577 req = self.request() |
601 req = self.request() |
578 req.create_entity('Workflow', name=u'dummy workflow', description=u'huuuuu') |
602 req.create_entity('Workflow', name=u'dummy workflow', description=u'huuuuu') |
579 self.failIf(any(x for x in self.session.pending_operations |
603 self.failIf(any(x for x in self.session.pending_operations |
614 |
638 |
615 with self.temporary_appobjects(EcritParHook): |
639 with self.temporary_appobjects(EcritParHook): |
616 eidp = self.execute('INSERT Personne X: X nom "toto"')[0][0] |
640 eidp = self.execute('INSERT Personne X: X nom "toto"')[0][0] |
617 eidn = self.execute('INSERT Note X: X type "T"')[0][0] |
641 eidn = self.execute('INSERT Note X: X type "T"')[0][0] |
618 self.execute('SET N ecrit_par Y WHERE N type "T", Y nom "toto"') |
642 self.execute('SET N ecrit_par Y WHERE N type "T", Y nom "toto"') |
619 self.assertEquals(CALLED, [('before_add_relation', eidn, 'ecrit_par', eidp), |
643 self.assertEqual(CALLED, [('before_add_relation', eidn, 'ecrit_par', eidp), |
620 ('after_add_relation', eidn, 'ecrit_par', eidp)]) |
644 ('after_add_relation', eidn, 'ecrit_par', eidp)]) |
621 CALLED[:] = () |
645 CALLED[:] = () |
622 self.execute('DELETE N ecrit_par Y WHERE N type "T", Y nom "toto"') |
646 self.execute('DELETE N ecrit_par Y WHERE N type "T", Y nom "toto"') |
623 self.assertEquals(CALLED, [('before_delete_relation', eidn, 'ecrit_par', eidp), |
647 self.assertEqual(CALLED, [('before_delete_relation', eidn, 'ecrit_par', eidp), |
624 ('after_delete_relation', eidn, 'ecrit_par', eidp)]) |
648 ('after_delete_relation', eidn, 'ecrit_par', eidp)]) |
625 CALLED[:] = () |
649 CALLED[:] = () |
626 eidn = self.execute('INSERT Note N: N ecrit_par P WHERE P nom "toto"')[0][0] |
650 eidn = self.execute('INSERT Note N: N ecrit_par P WHERE P nom "toto"')[0][0] |
627 self.assertEquals(CALLED, [('before_add_relation', eidn, 'ecrit_par', eidp), |
651 self.assertEqual(CALLED, [('before_add_relation', eidn, 'ecrit_par', eidp), |
628 ('after_add_relation', eidn, 'ecrit_par', eidp)]) |
652 ('after_add_relation', eidn, 'ecrit_par', eidp)]) |
629 |
653 |
630 def test_unique_contraint(self): |
654 def test_unique_contraint(self): |
631 req = self.request() |
655 req = self.request() |
632 toto = req.create_entity('Personne', nom=u'toto') |
656 toto = req.create_entity('Personne', nom=u'toto') |