63 namecol = SQL_PREFIX + 'name' |
63 namecol = SQL_PREFIX + 'name' |
64 finalcol = SQL_PREFIX + 'final' |
64 finalcol = SQL_PREFIX + 'final' |
65 self.session.set_pool() |
65 self.session.set_pool() |
66 cu = self.session.system_sql('SELECT %s FROM %s WHERE %s is NULL' % ( |
66 cu = self.session.system_sql('SELECT %s FROM %s WHERE %s is NULL' % ( |
67 namecol, table, finalcol)) |
67 namecol, table, finalcol)) |
68 self.assertEquals(cu.fetchall(), []) |
68 self.assertEqual(cu.fetchall(), []) |
69 cu = self.session.system_sql('SELECT %s FROM %s WHERE %s=%%(final)s ORDER BY %s' |
69 cu = self.session.system_sql('SELECT %s FROM %s WHERE %s=%%(final)s ORDER BY %s' |
70 % (namecol, table, finalcol, namecol), {'final': 'TRUE'}) |
70 % (namecol, table, finalcol, namecol), {'final': 'TRUE'}) |
71 self.assertEquals(cu.fetchall(), [(u'Boolean',), (u'Bytes',), |
71 self.assertEqual(cu.fetchall(), [(u'Boolean',), (u'Bytes',), |
72 (u'Date',), (u'Datetime',), |
72 (u'Date',), (u'Datetime',), |
73 (u'Decimal',),(u'Float',), |
73 (u'Decimal',),(u'Float',), |
74 (u'Int',), |
74 (u'Int',), |
75 (u'Interval',), (u'Password',), |
75 (u'Interval',), (u'Password',), |
76 (u'String',), (u'Time',)]) |
76 (u'String',), (u'Time',)]) |
159 def test_shared_data(self): |
159 def test_shared_data(self): |
160 repo = self.repo |
160 repo = self.repo |
161 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
161 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
162 repo.set_shared_data(cnxid, 'data', 4) |
162 repo.set_shared_data(cnxid, 'data', 4) |
163 cnxid2 = repo.connect(self.admlogin, password=self.admpassword) |
163 cnxid2 = repo.connect(self.admlogin, password=self.admpassword) |
164 self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4) |
164 self.assertEqual(repo.get_shared_data(cnxid, 'data'), 4) |
165 self.assertEquals(repo.get_shared_data(cnxid2, 'data'), None) |
165 self.assertEqual(repo.get_shared_data(cnxid2, 'data'), None) |
166 repo.set_shared_data(cnxid2, 'data', 5) |
166 repo.set_shared_data(cnxid2, 'data', 5) |
167 self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4) |
167 self.assertEqual(repo.get_shared_data(cnxid, 'data'), 4) |
168 self.assertEquals(repo.get_shared_data(cnxid2, 'data'), 5) |
168 self.assertEqual(repo.get_shared_data(cnxid2, 'data'), 5) |
169 repo.get_shared_data(cnxid2, 'data', pop=True) |
169 repo.get_shared_data(cnxid2, 'data', pop=True) |
170 self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4) |
170 self.assertEqual(repo.get_shared_data(cnxid, 'data'), 4) |
171 self.assertEquals(repo.get_shared_data(cnxid2, 'data'), None) |
171 self.assertEqual(repo.get_shared_data(cnxid2, 'data'), None) |
172 repo.close(cnxid) |
172 repo.close(cnxid) |
173 repo.close(cnxid2) |
173 repo.close(cnxid2) |
174 self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid, 'data') |
174 self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid, 'data') |
175 self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid2, 'data') |
175 self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid2, 'data') |
176 self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid, 'data', 1) |
176 self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid, 'data', 1) |
177 self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid2, 'data', 1) |
177 self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid2, 'data', 1) |
178 |
178 |
179 def test_check_session(self): |
179 def test_check_session(self): |
180 repo = self.repo |
180 repo = self.repo |
181 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
181 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
182 self.assertEquals(repo.check_session(cnxid), None) |
182 self.assertEqual(repo.check_session(cnxid), None) |
183 repo.close(cnxid) |
183 repo.close(cnxid) |
184 self.assertRaises(BadConnectionId, repo.check_session, cnxid) |
184 self.assertRaises(BadConnectionId, repo.check_session, cnxid) |
185 |
185 |
186 def test_transaction_base(self): |
186 def test_transaction_base(self): |
187 repo = self.repo |
187 repo = self.repo |
188 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
188 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
189 # check db state |
189 # check db state |
190 result = repo.execute(cnxid, 'Personne X') |
190 result = repo.execute(cnxid, 'Personne X') |
191 self.assertEquals(result.rowcount, 0) |
191 self.assertEqual(result.rowcount, 0) |
192 # rollback entity insertion |
192 # rollback entity insertion |
193 repo.execute(cnxid, "INSERT Personne X: X nom 'bidule'") |
193 repo.execute(cnxid, "INSERT Personne X: X nom 'bidule'") |
194 result = repo.execute(cnxid, 'Personne X') |
194 result = repo.execute(cnxid, 'Personne X') |
195 self.assertEquals(result.rowcount, 1) |
195 self.assertEqual(result.rowcount, 1) |
196 repo.rollback(cnxid) |
196 repo.rollback(cnxid) |
197 result = repo.execute(cnxid, 'Personne X') |
197 result = repo.execute(cnxid, 'Personne X') |
198 self.assertEquals(result.rowcount, 0, result.rows) |
198 self.assertEqual(result.rowcount, 0, result.rows) |
199 # commit |
199 # commit |
200 repo.execute(cnxid, "INSERT Personne X: X nom 'bidule'") |
200 repo.execute(cnxid, "INSERT Personne X: X nom 'bidule'") |
201 repo.commit(cnxid) |
201 repo.commit(cnxid) |
202 result = repo.execute(cnxid, 'Personne X') |
202 result = repo.execute(cnxid, 'Personne X') |
203 self.assertEquals(result.rowcount, 1) |
203 self.assertEqual(result.rowcount, 1) |
204 |
204 |
205 def test_transaction_base2(self): |
205 def test_transaction_base2(self): |
206 repo = self.repo |
206 repo = self.repo |
207 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
207 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
208 # rollback relation insertion |
208 # rollback relation insertion |
209 repo.execute(cnxid, "SET U in_group G WHERE U login 'admin', G name 'guests'") |
209 repo.execute(cnxid, "SET U in_group G WHERE U login 'admin', G name 'guests'") |
210 result = repo.execute(cnxid, "Any U WHERE U in_group G, U login 'admin', G name 'guests'") |
210 result = repo.execute(cnxid, "Any U WHERE U in_group G, U login 'admin', G name 'guests'") |
211 self.assertEquals(result.rowcount, 1) |
211 self.assertEqual(result.rowcount, 1) |
212 repo.rollback(cnxid) |
212 repo.rollback(cnxid) |
213 result = repo.execute(cnxid, "Any U WHERE U in_group G, U login 'admin', G name 'guests'") |
213 result = repo.execute(cnxid, "Any U WHERE U in_group G, U login 'admin', G name 'guests'") |
214 self.assertEquals(result.rowcount, 0, result.rows) |
214 self.assertEqual(result.rowcount, 0, result.rows) |
215 |
215 |
216 def test_transaction_base3(self): |
216 def test_transaction_base3(self): |
217 repo = self.repo |
217 repo = self.repo |
218 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
218 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
219 # rollback state change which trigger TrInfo insertion |
219 # rollback state change which trigger TrInfo insertion |
220 session = repo._get_session(cnxid) |
220 session = repo._get_session(cnxid) |
221 session.set_pool() |
221 session.set_pool() |
222 user = session.user |
222 user = session.user |
223 user.cw_adapt_to('IWorkflowable').fire_transition('deactivate') |
223 user.cw_adapt_to('IWorkflowable').fire_transition('deactivate') |
224 rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': user.eid}) |
224 rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': user.eid}) |
225 self.assertEquals(len(rset), 1) |
225 self.assertEqual(len(rset), 1) |
226 repo.rollback(cnxid) |
226 repo.rollback(cnxid) |
227 rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': user.eid}) |
227 rset = repo.execute(cnxid, 'TrInfo T WHERE T wf_info_for X, X eid %(x)s', {'x': user.eid}) |
228 self.assertEquals(len(rset), 0) |
228 self.assertEqual(len(rset), 0) |
229 |
229 |
230 def test_transaction_interleaved(self): |
230 def test_transaction_interleaved(self): |
231 self.skip('implement me') |
231 self.skipTest('implement me') |
232 |
232 |
233 def test_close_kill_processing_request(self): |
233 def test_close_kill_processing_request(self): |
234 repo = self.repo |
234 repo = self.repo |
235 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
235 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
236 repo.execute(cnxid, 'INSERT CWUser X: X login "toto", X upassword "tutu", X in_group G WHERE G name "users"') |
236 repo.execute(cnxid, 'INSERT CWUser X: X login "toto", X upassword "tutu", X in_group G WHERE G name "users"') |
244 def run_transaction(): |
244 def run_transaction(): |
245 repo.execute(cnxid, 'DELETE CWUser X WHERE X login "toto"') |
245 repo.execute(cnxid, 'DELETE CWUser X WHERE X login "toto"') |
246 repo.commit(cnxid) |
246 repo.commit(cnxid) |
247 try: |
247 try: |
248 ex = self.assertRaises(Exception, run_transaction) |
248 ex = self.assertRaises(Exception, run_transaction) |
249 self.assertEquals(str(ex), 'try to access pool on a closed session') |
249 self.assertEqual(str(ex), 'try to access pool on a closed session') |
250 finally: |
250 finally: |
251 t.join() |
251 t.join() |
252 |
252 |
253 def test_initial_schema(self): |
253 def test_initial_schema(self): |
254 schema = self.repo.schema |
254 schema = self.repo.schema |
255 # check order of attributes is respected |
255 # check order of attributes is respected |
256 self.assertListEquals([r.type for r in schema.eschema('CWAttribute').ordered_relations() |
256 self.assertListEqual([r.type for r in schema.eschema('CWAttribute').ordered_relations() |
257 if not r.type in ('eid', 'is', 'is_instance_of', 'identity', |
257 if not r.type in ('eid', 'is', 'is_instance_of', 'identity', |
258 'creation_date', 'modification_date', 'cwuri', |
258 'creation_date', 'modification_date', 'cwuri', |
259 'owned_by', 'created_by', |
259 'owned_by', 'created_by', |
260 'update_permission', 'read_permission', |
260 'update_permission', 'read_permission', |
261 'in_basket')], |
261 'in_basket')], |
264 'constrained_by', |
264 'constrained_by', |
265 'cardinality', 'ordernum', |
265 'cardinality', 'ordernum', |
266 'indexed', 'fulltextindexed', 'internationalizable', |
266 'indexed', 'fulltextindexed', 'internationalizable', |
267 'defaultval', 'description', 'description_format']) |
267 'defaultval', 'description', 'description_format']) |
268 |
268 |
269 self.assertEquals(schema.eschema('CWEType').main_attribute(), 'name') |
269 self.assertEqual(schema.eschema('CWEType').main_attribute(), 'name') |
270 self.assertEquals(schema.eschema('State').main_attribute(), 'name') |
270 self.assertEqual(schema.eschema('State').main_attribute(), 'name') |
271 |
271 |
272 constraints = schema.rschema('name').rdef('CWEType', 'String').constraints |
272 constraints = schema.rschema('name').rdef('CWEType', 'String').constraints |
273 self.assertEquals(len(constraints), 2) |
273 self.assertEqual(len(constraints), 2) |
274 for cstr in constraints[:]: |
274 for cstr in constraints[:]: |
275 if isinstance(cstr, UniqueConstraint): |
275 if isinstance(cstr, UniqueConstraint): |
276 constraints.remove(cstr) |
276 constraints.remove(cstr) |
277 break |
277 break |
278 else: |
278 else: |
279 self.fail('unique constraint not found') |
279 self.fail('unique constraint not found') |
280 sizeconstraint = constraints[0] |
280 sizeconstraint = constraints[0] |
281 self.assertEquals(sizeconstraint.min, None) |
281 self.assertEqual(sizeconstraint.min, None) |
282 self.assertEquals(sizeconstraint.max, 64) |
282 self.assertEqual(sizeconstraint.max, 64) |
283 |
283 |
284 constraints = schema.rschema('relation_type').rdef('CWAttribute', 'CWRType').constraints |
284 constraints = schema.rschema('relation_type').rdef('CWAttribute', 'CWRType').constraints |
285 self.assertEquals(len(constraints), 1) |
285 self.assertEqual(len(constraints), 1) |
286 cstr = constraints[0] |
286 cstr = constraints[0] |
287 self.assert_(isinstance(cstr, RQLConstraint)) |
287 self.assert_(isinstance(cstr, RQLConstraint)) |
288 self.assertEquals(cstr.restriction, 'O final TRUE') |
288 self.assertEqual(cstr.restriction, 'O final TRUE') |
289 |
289 |
290 ownedby = schema.rschema('owned_by') |
290 ownedby = schema.rschema('owned_by') |
291 self.assertEquals(ownedby.objects('CWEType'), ('CWUser',)) |
291 self.assertEqual(ownedby.objects('CWEType'), ('CWUser',)) |
292 |
292 |
293 def test_pyro(self): |
293 def test_pyro(self): |
294 import Pyro |
294 import Pyro |
295 Pyro.config.PYRO_MULTITHREADED = 0 |
295 Pyro.config.PYRO_MULTITHREADED = 0 |
296 done = [] |
296 done = [] |
335 |
335 |
336 def test_internal_api(self): |
336 def test_internal_api(self): |
337 repo = self.repo |
337 repo = self.repo |
338 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
338 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
339 session = repo._get_session(cnxid, setpool=True) |
339 session = repo._get_session(cnxid, setpool=True) |
340 self.assertEquals(repo.type_and_source_from_eid(1, session), |
340 self.assertEqual(repo.type_and_source_from_eid(1, session), |
341 ('CWGroup', 'system', None)) |
341 ('CWGroup', 'system', None)) |
342 self.assertEquals(repo.type_from_eid(1, session), 'CWGroup') |
342 self.assertEqual(repo.type_from_eid(1, session), 'CWGroup') |
343 self.assertEquals(repo.source_from_eid(1, session).uri, 'system') |
343 self.assertEqual(repo.source_from_eid(1, session).uri, 'system') |
344 self.assertEquals(repo.eid2extid(repo.system_source, 1, session), None) |
344 self.assertEqual(repo.eid2extid(repo.system_source, 1, session), None) |
345 class dummysource: uri = 'toto' |
345 class dummysource: uri = 'toto' |
346 self.assertRaises(UnknownEid, repo.eid2extid, dummysource, 1, session) |
346 self.assertRaises(UnknownEid, repo.eid2extid, dummysource, 1, session) |
347 |
347 |
348 def test_public_api(self): |
348 def test_public_api(self): |
349 self.assertEquals(self.repo.get_schema(), self.repo.schema) |
349 self.assertEqual(self.repo.get_schema(), self.repo.schema) |
350 self.assertEquals(self.repo.source_defs(), {'system': {'adapter': 'native', 'uri': 'system'}}) |
350 self.assertEqual(self.repo.source_defs(), {'system': {'adapter': 'native', 'uri': 'system'}}) |
351 # .properties() return a result set |
351 # .properties() return a result set |
352 self.assertEquals(self.repo.properties().rql, 'Any K,V WHERE P is CWProperty,P pkey K, P value V, NOT P for_user U') |
352 self.assertEqual(self.repo.properties().rql, 'Any K,V WHERE P is CWProperty,P pkey K, P value V, NOT P for_user U') |
353 |
353 |
354 def test_session_api(self): |
354 def test_session_api(self): |
355 repo = self.repo |
355 repo = self.repo |
356 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
356 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
357 self.assertEquals(repo.user_info(cnxid), (5, 'admin', set([u'managers']), {})) |
357 self.assertEqual(repo.user_info(cnxid), (5, 'admin', set([u'managers']), {})) |
358 self.assertEquals(repo.describe(cnxid, 1), (u'CWGroup', u'system', None)) |
358 self.assertEqual(repo.describe(cnxid, 1), (u'CWGroup', u'system', None)) |
359 repo.close(cnxid) |
359 repo.close(cnxid) |
360 self.assertRaises(BadConnectionId, repo.user_info, cnxid) |
360 self.assertRaises(BadConnectionId, repo.user_info, cnxid) |
361 self.assertRaises(BadConnectionId, repo.describe, cnxid, 1) |
361 self.assertRaises(BadConnectionId, repo.describe, cnxid, 1) |
362 |
362 |
363 def test_shared_data_api(self): |
363 def test_shared_data_api(self): |
364 repo = self.repo |
364 repo = self.repo |
365 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
365 cnxid = repo.connect(self.admlogin, password=self.admpassword) |
366 self.assertEquals(repo.get_shared_data(cnxid, 'data'), None) |
366 self.assertEqual(repo.get_shared_data(cnxid, 'data'), None) |
367 repo.set_shared_data(cnxid, 'data', 4) |
367 repo.set_shared_data(cnxid, 'data', 4) |
368 self.assertEquals(repo.get_shared_data(cnxid, 'data'), 4) |
368 self.assertEqual(repo.get_shared_data(cnxid, 'data'), 4) |
369 repo.get_shared_data(cnxid, 'data', pop=True) |
369 repo.get_shared_data(cnxid, 'data', pop=True) |
370 repo.get_shared_data(cnxid, 'whatever', pop=True) |
370 repo.get_shared_data(cnxid, 'whatever', pop=True) |
371 self.assertEquals(repo.get_shared_data(cnxid, 'data'), None) |
371 self.assertEqual(repo.get_shared_data(cnxid, 'data'), None) |
372 repo.close(cnxid) |
372 repo.close(cnxid) |
373 self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid, 'data', 0) |
373 self.assertRaises(BadConnectionId, repo.set_shared_data, cnxid, 'data', 0) |
374 self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid, 'data') |
374 self.assertRaises(BadConnectionId, repo.get_shared_data, cnxid, 'data') |
375 |
375 |
376 def test_schema_is_relation(self): |
376 def test_schema_is_relation(self): |
392 p1 = self.request().create_entity('Personne', nom=u'toto') |
392 p1 = self.request().create_entity('Personne', nom=u'toto') |
393 self.execute('SET A todo_by P WHERE A eid %(x)s, P eid %(p)s', |
393 self.execute('SET A todo_by P WHERE A eid %(x)s, P eid %(p)s', |
394 {'x': note.eid, 'p': p1.eid}) |
394 {'x': note.eid, 'p': p1.eid}) |
395 rset = self.execute('Any P WHERE A todo_by P, A eid %(x)s', |
395 rset = self.execute('Any P WHERE A todo_by P, A eid %(x)s', |
396 {'x': note.eid}) |
396 {'x': note.eid}) |
397 self.assertEquals(len(rset), 1) |
397 self.assertEqual(len(rset), 1) |
398 p2 = self.request().create_entity('Personne', nom=u'tutu') |
398 p2 = self.request().create_entity('Personne', nom=u'tutu') |
399 self.execute('SET A todo_by P WHERE A eid %(x)s, P eid %(p)s', |
399 self.execute('SET A todo_by P WHERE A eid %(x)s, P eid %(p)s', |
400 {'x': note.eid, 'p': p2.eid}) |
400 {'x': note.eid, 'p': p2.eid}) |
401 rset = self.execute('Any P WHERE A todo_by P, A eid %(x)s', |
401 rset = self.execute('Any P WHERE A todo_by P, A eid %(x)s', |
402 {'x': note.eid}) |
402 {'x': note.eid}) |
403 self.assertEquals(len(rset), 1) |
403 self.assertEqual(len(rset), 1) |
404 self.assertEquals(rset.rows[0][0], p2.eid) |
404 self.assertEqual(rset.rows[0][0], p2.eid) |
405 |
405 |
406 def test_delete_if_object_inlined_singlecard(self): |
406 def test_delete_if_object_inlined_singlecard(self): |
407 req = self.request() |
407 req = self.request() |
408 c = req.create_entity('Card', title=u'Carte') |
408 c = req.create_entity('Card', title=u'Carte') |
409 req.create_entity('Personne', nom=u'Vincent', fiche=c) |
409 req.create_entity('Personne', nom=u'Vincent', fiche=c) |
410 req.create_entity('Personne', nom=u'Florent', fiche=c) |
410 req.create_entity('Personne', nom=u'Florent', fiche=c) |
411 self.commit() |
411 self.commit() |
412 self.assertEquals(len(c.reverse_fiche), 1) |
412 self.assertEqual(len(c.reverse_fiche), 1) |
413 |
413 |
414 def test_set_attributes_in_before_update(self): |
414 def test_set_attributes_in_before_update(self): |
415 # local hook |
415 # local hook |
416 class DummyBeforeHook(Hook): |
416 class DummyBeforeHook(Hook): |
417 __regid__ = 'dummy-before-hook' |
417 __regid__ = 'dummy-before-hook' |
475 self.session.set_pool() |
475 self.session.set_pool() |
476 self.assert_(self.repo.system_source.create_eid(self.session)) |
476 self.assert_(self.repo.system_source.create_eid(self.session)) |
477 |
477 |
478 def test_source_from_eid(self): |
478 def test_source_from_eid(self): |
479 self.session.set_pool() |
479 self.session.set_pool() |
480 self.assertEquals(self.repo.source_from_eid(1, self.session), |
480 self.assertEqual(self.repo.source_from_eid(1, self.session), |
481 self.repo.sources_by_uri['system']) |
481 self.repo.sources_by_uri['system']) |
482 |
482 |
483 def test_source_from_eid_raise(self): |
483 def test_source_from_eid_raise(self): |
484 self.session.set_pool() |
484 self.session.set_pool() |
485 self.assertRaises(UnknownEid, self.repo.source_from_eid, -2, self.session) |
485 self.assertRaises(UnknownEid, self.repo.source_from_eid, -2, self.session) |
486 |
486 |
487 def test_type_from_eid(self): |
487 def test_type_from_eid(self): |
488 self.session.set_pool() |
488 self.session.set_pool() |
489 self.assertEquals(self.repo.type_from_eid(1, self.session), 'CWGroup') |
489 self.assertEqual(self.repo.type_from_eid(1, self.session), 'CWGroup') |
490 |
490 |
491 def test_type_from_eid_raise(self): |
491 def test_type_from_eid_raise(self): |
492 self.session.set_pool() |
492 self.session.set_pool() |
493 self.assertRaises(UnknownEid, self.repo.type_from_eid, -2, self.session) |
493 self.assertRaises(UnknownEid, self.repo.type_from_eid, -2, self.session) |
494 |
494 |
501 cu = self.session.system_sql('SELECT * FROM entities WHERE eid = -1') |
501 cu = self.session.system_sql('SELECT * FROM entities WHERE eid = -1') |
502 data = cu.fetchall() |
502 data = cu.fetchall() |
503 self.assertIsInstance(data[0][3], datetime) |
503 self.assertIsInstance(data[0][3], datetime) |
504 data[0] = list(data[0]) |
504 data[0] = list(data[0]) |
505 data[0][3] = None |
505 data[0][3] = None |
506 self.assertEquals(tuplify(data), [(-1, 'Personne', 'system', None, None)]) |
506 self.assertEqual(tuplify(data), [(-1, 'Personne', 'system', None, None)]) |
507 self.repo.delete_info(self.session, entity, 'system', None) |
507 self.repo.delete_info(self.session, entity, 'system', None) |
508 #self.repo.commit() |
508 #self.repo.commit() |
509 cu = self.session.system_sql('SELECT * FROM entities WHERE eid = -1') |
509 cu = self.session.system_sql('SELECT * FROM entities WHERE eid = -1') |
510 data = cu.fetchall() |
510 data = cu.fetchall() |
511 self.assertEquals(data, []) |
511 self.assertEqual(data, []) |
512 |
512 |
513 |
513 |
514 class FTITC(CubicWebTC): |
514 class FTITC(CubicWebTC): |
515 |
515 |
516 def test_reindex_and_modified_since(self): |
516 def test_reindex_and_modified_since(self): |
517 self.repo.system_source.multisources_etypes.add('Personne') |
517 self.repo.system_source.multisources_etypes.add('Personne') |
518 eidp = self.execute('INSERT Personne X: X nom "toto", X prenom "tutu"')[0][0] |
518 eidp = self.execute('INSERT Personne X: X nom "toto", X prenom "tutu"')[0][0] |
519 self.commit() |
519 self.commit() |
520 ts = datetime.now() |
520 ts = datetime.now() |
521 self.assertEquals(len(self.execute('Personne X WHERE X has_text "tutu"')), 1) |
521 self.assertEqual(len(self.execute('Personne X WHERE X has_text "tutu"')), 1) |
522 self.session.set_pool() |
522 self.session.set_pool() |
523 cu = self.session.system_sql('SELECT mtime, eid FROM entities WHERE eid = %s' % eidp) |
523 cu = self.session.system_sql('SELECT mtime, eid FROM entities WHERE eid = %s' % eidp) |
524 omtime = cu.fetchone()[0] |
524 omtime = cu.fetchone()[0] |
525 # our sqlite datetime adapter is ignore seconds fraction, so we have to |
525 # our sqlite datetime adapter is ignore seconds fraction, so we have to |
526 # ensure update is done the next seconds |
526 # ensure update is done the next seconds |
527 time.sleep(1 - (ts.second - int(ts.second))) |
527 time.sleep(1 - (ts.second - int(ts.second))) |
528 self.execute('SET X nom "tata" WHERE X eid %(x)s', {'x': eidp}) |
528 self.execute('SET X nom "tata" WHERE X eid %(x)s', {'x': eidp}) |
529 self.commit() |
529 self.commit() |
530 self.assertEquals(len(self.execute('Personne X WHERE X has_text "tutu"')), 1) |
530 self.assertEqual(len(self.execute('Personne X WHERE X has_text "tutu"')), 1) |
531 self.session.set_pool() |
531 self.session.set_pool() |
532 cu = self.session.system_sql('SELECT mtime FROM entities WHERE eid = %s' % eidp) |
532 cu = self.session.system_sql('SELECT mtime FROM entities WHERE eid = %s' % eidp) |
533 mtime = cu.fetchone()[0] |
533 mtime = cu.fetchone()[0] |
534 self.failUnless(omtime < mtime) |
534 self.failUnless(omtime < mtime) |
535 self.commit() |
535 self.commit() |
536 date, modified, deleted = self.repo.entities_modified_since(('Personne',), omtime) |
536 date, modified, deleted = self.repo.entities_modified_since(('Personne',), omtime) |
537 self.assertEquals(modified, [('Personne', eidp)]) |
537 self.assertEqual(modified, [('Personne', eidp)]) |
538 self.assertEquals(deleted, []) |
538 self.assertEqual(deleted, []) |
539 date, modified, deleted = self.repo.entities_modified_since(('Personne',), mtime) |
539 date, modified, deleted = self.repo.entities_modified_since(('Personne',), mtime) |
540 self.assertEquals(modified, []) |
540 self.assertEqual(modified, []) |
541 self.assertEquals(deleted, []) |
541 self.assertEqual(deleted, []) |
542 self.execute('DELETE Personne X WHERE X eid %(x)s', {'x': eidp}) |
542 self.execute('DELETE Personne X WHERE X eid %(x)s', {'x': eidp}) |
543 self.commit() |
543 self.commit() |
544 date, modified, deleted = self.repo.entities_modified_since(('Personne',), omtime) |
544 date, modified, deleted = self.repo.entities_modified_since(('Personne',), omtime) |
545 self.assertEquals(modified, []) |
545 self.assertEqual(modified, []) |
546 self.assertEquals(deleted, [('Personne', eidp)]) |
546 self.assertEqual(deleted, [('Personne', eidp)]) |
547 |
547 |
548 def test_fulltext_container_entity(self): |
548 def test_fulltext_container_entity(self): |
549 assert self.schema.rschema('use_email').fulltext_container == 'subject' |
549 assert self.schema.rschema('use_email').fulltext_container == 'subject' |
550 req = self.request() |
550 req = self.request() |
551 toto = req.create_entity('EmailAddress', address=u'toto@logilab.fr') |
551 toto = req.create_entity('EmailAddress', address=u'toto@logilab.fr') |
552 self.commit() |
552 self.commit() |
553 rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'}) |
553 rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'}) |
554 self.assertEquals(rset.rows, []) |
554 self.assertEqual(rset.rows, []) |
555 req.user.set_relations(use_email=toto) |
555 req.user.set_relations(use_email=toto) |
556 self.commit() |
556 self.commit() |
557 rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'}) |
557 rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'}) |
558 self.assertEquals(rset.rows, [[req.user.eid]]) |
558 self.assertEqual(rset.rows, [[req.user.eid]]) |
559 req.execute('DELETE X use_email Y WHERE X login "admin", Y eid %(y)s', |
559 req.execute('DELETE X use_email Y WHERE X login "admin", Y eid %(y)s', |
560 {'y': toto.eid}) |
560 {'y': toto.eid}) |
561 self.commit() |
561 self.commit() |
562 rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'}) |
562 rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'toto'}) |
563 self.assertEquals(rset.rows, []) |
563 self.assertEqual(rset.rows, []) |
564 tutu = req.create_entity('EmailAddress', address=u'tutu@logilab.fr') |
564 tutu = req.create_entity('EmailAddress', address=u'tutu@logilab.fr') |
565 req.user.set_relations(use_email=tutu) |
565 req.user.set_relations(use_email=tutu) |
566 self.commit() |
566 self.commit() |
567 rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'tutu'}) |
567 rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'tutu'}) |
568 self.assertEquals(rset.rows, [[req.user.eid]]) |
568 self.assertEqual(rset.rows, [[req.user.eid]]) |
569 tutu.set_attributes(address=u'hip@logilab.fr') |
569 tutu.set_attributes(address=u'hip@logilab.fr') |
570 self.commit() |
570 self.commit() |
571 rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'tutu'}) |
571 rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'tutu'}) |
572 self.assertEquals(rset.rows, []) |
572 self.assertEqual(rset.rows, []) |
573 rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'hip'}) |
573 rset = req.execute('Any X WHERE X has_text %(t)s', {'t': 'hip'}) |
574 self.assertEquals(rset.rows, [[req.user.eid]]) |
574 self.assertEqual(rset.rows, [[req.user.eid]]) |
575 |
575 |
576 def test_no_uncessary_ftiindex_op(self): |
576 def test_no_uncessary_ftiindex_op(self): |
577 req = self.request() |
577 req = self.request() |
578 req.create_entity('Workflow', name=u'dummy workflow', description=u'huuuuu') |
578 req.create_entity('Workflow', name=u'dummy workflow', description=u'huuuuu') |
579 self.failIf(any(x for x in self.session.pending_operations |
579 self.failIf(any(x for x in self.session.pending_operations |
614 |
614 |
615 with self.temporary_appobjects(EcritParHook): |
615 with self.temporary_appobjects(EcritParHook): |
616 eidp = self.execute('INSERT Personne X: X nom "toto"')[0][0] |
616 eidp = self.execute('INSERT Personne X: X nom "toto"')[0][0] |
617 eidn = self.execute('INSERT Note X: X type "T"')[0][0] |
617 eidn = self.execute('INSERT Note X: X type "T"')[0][0] |
618 self.execute('SET N ecrit_par Y WHERE N type "T", Y nom "toto"') |
618 self.execute('SET N ecrit_par Y WHERE N type "T", Y nom "toto"') |
619 self.assertEquals(CALLED, [('before_add_relation', eidn, 'ecrit_par', eidp), |
619 self.assertEqual(CALLED, [('before_add_relation', eidn, 'ecrit_par', eidp), |
620 ('after_add_relation', eidn, 'ecrit_par', eidp)]) |
620 ('after_add_relation', eidn, 'ecrit_par', eidp)]) |
621 CALLED[:] = () |
621 CALLED[:] = () |
622 self.execute('DELETE N ecrit_par Y WHERE N type "T", Y nom "toto"') |
622 self.execute('DELETE N ecrit_par Y WHERE N type "T", Y nom "toto"') |
623 self.assertEquals(CALLED, [('before_delete_relation', eidn, 'ecrit_par', eidp), |
623 self.assertEqual(CALLED, [('before_delete_relation', eidn, 'ecrit_par', eidp), |
624 ('after_delete_relation', eidn, 'ecrit_par', eidp)]) |
624 ('after_delete_relation', eidn, 'ecrit_par', eidp)]) |
625 CALLED[:] = () |
625 CALLED[:] = () |
626 eidn = self.execute('INSERT Note N: N ecrit_par P WHERE P nom "toto"')[0][0] |
626 eidn = self.execute('INSERT Note N: N ecrit_par P WHERE P nom "toto"')[0][0] |
627 self.assertEquals(CALLED, [('before_add_relation', eidn, 'ecrit_par', eidp), |
627 self.assertEqual(CALLED, [('before_add_relation', eidn, 'ecrit_par', eidp), |
628 ('after_add_relation', eidn, 'ecrit_par', eidp)]) |
628 ('after_add_relation', eidn, 'ecrit_par', eidp)]) |
629 |
629 |
630 def test_unique_contraint(self): |
630 def test_unique_contraint(self): |
631 req = self.request() |
631 req = self.request() |
632 toto = req.create_entity('Personne', nom=u'toto') |
632 toto = req.create_entity('Personne', nom=u'toto') |