222 self.assertTrue(entity.creation_date) |
219 self.assertTrue(entity.creation_date) |
223 self.assertTrue(entity.modification_date) |
220 self.assertTrue(entity.modification_date) |
224 |
221 |
225 def test_authenticate(self): |
222 def test_authenticate(self): |
226 source = self.repo.sources_by_uri['ldap'] |
223 source = self.repo.sources_by_uri['ldap'] |
227 self.session.set_cnxset() |
224 with self.admin_access.repo_cnx() as cnx: |
228 # ensure we won't be logged against |
225 # ensure we won't be logged against |
229 self.assertRaises(AuthenticationError, |
226 self.assertRaises(AuthenticationError, |
230 source.authenticate, self.session, 'toto', 'toto') |
227 source.authenticate, cnx, 'toto', 'toto') |
231 self.assertTrue(source.authenticate(self.session, 'syt', 'syt')) |
228 self.assertTrue(source.authenticate(cnx, 'syt', 'syt')) |
232 self.assertTrue(self.repo.connect('syt', password='syt')) |
229 self.assertTrue(self.repo.connect('syt', password='syt')) |
233 |
230 |
234 def test_base(self): |
231 def test_base(self): |
235 # check a known one |
232 with self.admin_access.repo_cnx() as cnx: |
236 rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'}) |
233 # check a known one |
237 e = rset.get_entity(0, 0) |
234 rset = cnx.execute('CWUser X WHERE X login %(login)s', {'login': 'syt'}) |
238 self.assertEqual(e.login, 'syt') |
235 e = rset.get_entity(0, 0) |
239 e.complete() |
236 self.assertEqual(e.login, 'syt') |
240 self.assertMetadata(e) |
237 e.complete() |
241 self.assertEqual(e.firstname, None) |
238 self.assertMetadata(e) |
242 self.assertEqual(e.surname, None) |
239 self.assertEqual(e.firstname, None) |
243 self.assertIn('users', set(g.name for g in e.in_group)) |
240 self.assertEqual(e.surname, None) |
244 self.assertEqual(e.owned_by[0].login, 'syt') |
241 self.assertIn('users', set(g.name for g in e.in_group)) |
245 self.assertEqual(e.created_by, ()) |
242 self.assertEqual(e.owned_by[0].login, 'syt') |
246 addresses = [pe.address for pe in e.use_email] |
243 self.assertEqual(e.created_by, ()) |
247 addresses.sort() |
244 addresses = [pe.address for pe in e.use_email] |
248 self.assertEqual(['sylvain.thenault@logilab.fr', 'syt@logilab.fr'], |
245 addresses.sort() |
249 addresses) |
246 self.assertEqual(['sylvain.thenault@logilab.fr', 'syt@logilab.fr'], |
250 self.assertIn(e.primary_email[0].address, ['sylvain.thenault@logilab.fr', |
247 addresses) |
251 'syt@logilab.fr']) |
248 self.assertIn(e.primary_email[0].address, ['sylvain.thenault@logilab.fr', |
252 # email content should be indexed on the user |
249 'syt@logilab.fr']) |
253 rset = self.sexecute('CWUser X WHERE X has_text "thenault"') |
250 # email content should be indexed on the user |
254 self.assertEqual(rset.rows, [[e.eid]]) |
251 rset = cnx.execute('CWUser X WHERE X has_text "thenault"') |
|
252 self.assertEqual(rset.rows, [[e.eid]]) |
255 |
253 |
256 def test_copy_to_system_source(self): |
254 def test_copy_to_system_source(self): |
257 "make sure we can 'convert' an LDAP user into a system one" |
255 "make sure we can 'convert' an LDAP user into a system one" |
258 source = self.repo.sources_by_uri['ldap'] |
256 with self.admin_access.repo_cnx() as cnx: |
259 eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0] |
257 source = self.repo.sources_by_uri['ldap'] |
260 self.sexecute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': eid}) |
258 eid = cnx.execute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0] |
261 self.commit() |
259 cnx.execute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': eid}) |
262 source.reset_caches() |
260 cnx.commit() |
263 rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'}) |
261 source.reset_caches() |
264 self.assertEqual(len(rset), 1) |
262 rset = cnx.execute('CWUser X WHERE X login %(login)s', {'login': 'syt'}) |
265 e = rset.get_entity(0, 0) |
263 self.assertEqual(len(rset), 1) |
266 self.assertEqual(e.eid, eid) |
264 e = rset.get_entity(0, 0) |
267 self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native', |
265 self.assertEqual(e.eid, eid) |
268 'uri': u'system', |
266 self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native', |
269 'use-cwuri-as-url': False}, |
267 'uri': u'system', |
270 'type': 'CWUser', |
268 'use-cwuri-as-url': False}, |
271 'extid': None}) |
269 'type': 'CWUser', |
272 self.assertEqual(e.cw_source[0].name, 'system') |
270 'extid': None}) |
273 self.assertTrue(e.creation_date) |
271 self.assertEqual(e.cw_source[0].name, 'system') |
274 self.assertTrue(e.modification_date) |
272 self.assertTrue(e.creation_date) |
275 source.pull_data(self.session) |
273 self.assertTrue(e.modification_date) |
276 rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'}) |
274 source.pull_data(cnx) |
277 self.assertEqual(len(rset), 1) |
275 rset = cnx.execute('CWUser X WHERE X login %(login)s', {'login': 'syt'}) |
278 self.assertTrue(self.repo.system_source.authenticate( |
276 self.assertEqual(len(rset), 1) |
279 self.session, 'syt', password='syt')) |
277 self.assertTrue(self.repo.system_source.authenticate(cnx, 'syt', password='syt')) |
280 # make sure the pull from ldap have not "reverted" user as a ldap-feed user |
278 # make sure the pull from ldap have not "reverted" user as a ldap-feed user |
281 self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native', |
279 self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native', |
282 'uri': u'system', |
280 'uri': u'system', |
283 'use-cwuri-as-url': False}, |
281 'use-cwuri-as-url': False}, |
284 'type': 'CWUser', |
282 'type': 'CWUser', |
285 'extid': None}) |
283 'extid': None}) |
286 # and that the password stored in the system source is not empty or so |
284 # and that the password stored in the system source is not empty or so |
287 user = self.execute('CWUser U WHERE U login "syt"').get_entity(0, 0) |
285 user = cnx.execute('CWUser U WHERE U login "syt"').get_entity(0, 0) |
288 user.cw_clear_all_caches() |
286 user.cw_clear_all_caches() |
289 pwd = self.session.system_sql("SELECT cw_upassword FROM cw_cwuser WHERE cw_login='syt';").fetchall()[0][0] |
287 pwd = cnx.system_sql("SELECT cw_upassword FROM cw_cwuser WHERE cw_login='syt';").fetchall()[0][0] |
290 self.assertIsNotNone(pwd) |
288 self.assertIsNotNone(pwd) |
291 self.assertTrue(str(pwd)) |
289 self.assertTrue(str(pwd)) |
292 |
290 |
293 |
291 |
294 |
292 |
295 class LDAPFeedUserDeletionTC(LDAPFeedTestBase): |
293 class LDAPFeedUserDeletionTC(LDAPFeedTestBase): |
296 """ |
294 """ |
297 A testcase for situations where users are deleted from or |
295 A testcase for situations where users are deleted from or |
298 unavailabe in the LDAP database. |
296 unavailable in the LDAP database. |
299 """ |
297 """ |
|
298 |
300 def test_a_filter_inactivate(self): |
299 def test_a_filter_inactivate(self): |
301 """ filtered out people should be deactivated, unable to authenticate """ |
300 """ filtered out people should be deactivated, unable to authenticate """ |
302 source = self.session.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0) |
301 with self.admin_access.repo_cnx() as cnx: |
303 config = source.repo_source.check_config(source) |
302 source = cnx.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0) |
304 # filter with adim's phone number |
303 config = source.repo_source.check_config(source) |
305 config['user-filter'] = u'(%s=%s)' % ('telephoneNumber', '109') |
304 # filter with adim's phone number |
306 source.repo_source.update_config(source, config) |
305 config['user-filter'] = u'(%s=%s)' % ('telephoneNumber', '109') |
307 self.commit() |
306 source.repo_source.update_config(source, config) |
308 self.pull() |
307 cnx.commit() |
|
308 with self.repo.internal_cnx() as cnx: |
|
309 self.pull(cnx) |
309 self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt') |
310 self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt') |
310 self.assertEqual(self.execute('Any N WHERE U login "syt", ' |
311 with self.admin_access.repo_cnx() as cnx: |
311 'U in_state S, S name N').rows[0][0], |
312 self.assertEqual(cnx.execute('Any N WHERE U login "syt", ' |
312 'deactivated') |
313 'U in_state S, S name N').rows[0][0], |
313 self.assertEqual(self.execute('Any N WHERE U login "adim", ' |
314 'deactivated') |
314 'U in_state S, S name N').rows[0][0], |
315 self.assertEqual(cnx.execute('Any N WHERE U login "adim", ' |
315 'activated') |
316 'U in_state S, S name N').rows[0][0], |
316 # unfilter, syt should be activated again |
317 'activated') |
317 config['user-filter'] = u'' |
318 # unfilter, syt should be activated again |
318 source.repo_source.update_config(source, config) |
319 config['user-filter'] = u'' |
319 self.commit() |
320 source.repo_source.update_config(source, config) |
320 self.pull() |
321 cnx.commit() |
321 self.assertEqual(self.execute('Any N WHERE U login "syt", ' |
322 with self.repo.internal_cnx() as cnx: |
322 'U in_state S, S name N').rows[0][0], |
323 self.pull(cnx) |
323 'activated') |
324 with self.admin_access.repo_cnx() as cnx: |
324 self.assertEqual(self.execute('Any N WHERE U login "adim", ' |
325 self.assertEqual(cnx.execute('Any N WHERE U login "syt", ' |
325 'U in_state S, S name N').rows[0][0], |
326 'U in_state S, S name N').rows[0][0], |
326 'activated') |
327 'activated') |
|
328 self.assertEqual(cnx.execute('Any N WHERE U login "adim", ' |
|
329 'U in_state S, S name N').rows[0][0], |
|
330 'activated') |
327 |
331 |
328 def test_delete(self): |
332 def test_delete(self): |
329 """ delete syt, pull, check deactivation, repull, |
333 """ delete syt, pull, check deactivation, repull, |
330 read syt, pull, check activation |
334 read syt, pull, check activation |
331 """ |
335 """ |
332 self.delete_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test') |
336 self.delete_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test') |
333 self.pull() |
337 with self.repo.internal_cnx() as cnx: |
|
338 self.pull(cnx) |
334 self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt') |
339 self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt') |
335 self.assertEqual(self.execute('Any N WHERE U login "syt", ' |
340 with self.admin_access.repo_cnx() as cnx: |
336 'U in_state S, S name N').rows[0][0], |
341 self.assertEqual(cnx.execute('Any N WHERE U login "syt", ' |
337 'deactivated') |
342 'U in_state S, S name N').rows[0][0], |
338 # check that it doesn't choke |
343 'deactivated') |
339 self.pull() |
344 with self.repo.internal_cnx() as cnx: |
|
345 # check that it doesn't choke |
|
346 self.pull(cnx) |
340 # reinsert syt |
347 # reinsert syt |
341 self.add_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test', |
348 self.add_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test', |
342 { 'objectClass': ['OpenLDAPperson','posixAccount','top','shadowAccount'], |
349 { 'objectClass': ['OpenLDAPperson','posixAccount','top','shadowAccount'], |
343 'cn': 'Sylvain Thenault', |
350 'cn': 'Sylvain Thenault', |
344 'sn': 'Thenault', |
351 'sn': 'Thenault', |
351 'telephoneNumber': '106', |
358 'telephoneNumber': '106', |
352 'displayName': 'sthenault', |
359 'displayName': 'sthenault', |
353 'gecos': 'Sylvain Thenault', |
360 'gecos': 'Sylvain Thenault', |
354 'mail': ['sylvain.thenault@logilab.fr','syt@logilab.fr'], |
361 'mail': ['sylvain.thenault@logilab.fr','syt@logilab.fr'], |
355 'userPassword': 'syt', |
362 'userPassword': 'syt', |
356 }) |
363 }) |
357 self.pull() |
364 with self.repo.internal_cnx() as cnx: |
358 self.assertEqual(self.execute('Any N WHERE U login "syt", ' |
365 self.pull(cnx) |
359 'U in_state S, S name N').rows[0][0], |
366 with self.admin_access.repo_cnx() as cnx: |
360 'activated') |
367 self.assertEqual(cnx.execute('Any N WHERE U login "syt", ' |
|
368 'U in_state S, S name N').rows[0][0], |
|
369 'activated') |
361 |
370 |
362 def test_reactivate_deleted(self): |
371 def test_reactivate_deleted(self): |
363 # test reactivating BY HAND the user isn't enough to |
372 # test reactivating BY HAND the user isn't enough to |
364 # authenticate, as the native source refuse to authenticate |
373 # authenticate, as the native source refuse to authenticate |
365 # user from other sources |
374 # user from other sources |
366 self.delete_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test') |
375 self.delete_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test') |
367 self.pull() |
376 with self.repo.internal_cnx() as cnx: |
368 # reactivate user (which source is still ldap-feed) |
377 self.pull(cnx) |
369 user = self.execute('CWUser U WHERE U login "syt"').get_entity(0, 0) |
378 with self.admin_access.repo_cnx() as cnx: |
370 user.cw_adapt_to('IWorkflowable').fire_transition('activate') |
379 # reactivate user (which source is still ldap-feed) |
371 self.commit() |
380 user = cnx.execute('CWUser U WHERE U login "syt"').get_entity(0, 0) |
372 with self.assertRaises(AuthenticationError): |
381 user.cw_adapt_to('IWorkflowable').fire_transition('activate') |
373 self.repo.connect('syt', password='syt') |
382 cnx.commit() |
374 |
383 with self.assertRaises(AuthenticationError): |
375 # ok now let's try to make it a system user |
384 self.repo.connect('syt', password='syt') |
376 self.sexecute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': user.eid}) |
385 |
377 self.commit() |
386 # ok now let's try to make it a system user |
|
387 cnx.execute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': user.eid}) |
|
388 cnx.commit() |
378 # and that we can now authenticate again |
389 # and that we can now authenticate again |
379 self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='toto') |
390 self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='toto') |
380 self.assertTrue(self.repo.connect('syt', password='syt')) |
391 self.assertTrue(self.repo.connect('syt', password='syt')) |
381 |
392 |
|
393 |
382 class LDAPFeedGroupTC(LDAPFeedTestBase): |
394 class LDAPFeedGroupTC(LDAPFeedTestBase): |
383 """ |
395 """ |
384 A testcase for group support in ldapfeed. |
396 A testcase for group support in ldapfeed. |
385 """ |
397 """ |
386 |
398 |
387 def test_groups_exist(self): |
399 def test_groups_exist(self): |
388 rset = self.sexecute('CWGroup X WHERE X name "dir"') |
400 with self.admin_access.repo_cnx() as cnx: |
389 self.assertEqual(len(rset), 1) |
401 rset = cnx.execute('CWGroup X WHERE X name "dir"') |
390 |
402 self.assertEqual(len(rset), 1) |
391 rset = self.sexecute('CWGroup X WHERE X cw_source S, S name "ldap"') |
403 |
392 self.assertEqual(len(rset), 2) |
404 rset = cnx.execute('CWGroup X WHERE X cw_source S, S name "ldap"') |
|
405 self.assertEqual(len(rset), 2) |
393 |
406 |
394 def test_group_deleted(self): |
407 def test_group_deleted(self): |
395 rset = self.sexecute('CWGroup X WHERE X name "dir"') |
408 with self.admin_access.repo_cnx() as cnx: |
396 self.assertEqual(len(rset), 1) |
409 rset = cnx.execute('CWGroup X WHERE X name "dir"') |
|
410 self.assertEqual(len(rset), 1) |
397 |
411 |
398 def test_in_group(self): |
412 def test_in_group(self): |
399 rset = self.sexecute('CWGroup X WHERE X name %(name)s', {'name': 'dir'}) |
413 with self.admin_access.repo_cnx() as cnx: |
400 dirgroup = rset.get_entity(0, 0) |
414 rset = cnx.execute('CWGroup X WHERE X name %(name)s', {'name': 'dir'}) |
401 self.assertEqual(set(['syt', 'adim']), |
415 dirgroup = rset.get_entity(0, 0) |
402 set([u.login for u in dirgroup.reverse_in_group])) |
416 self.assertEqual(set(['syt', 'adim']), |
403 rset = self.sexecute('CWGroup X WHERE X name %(name)s', {'name': 'logilab'}) |
417 set([u.login for u in dirgroup.reverse_in_group])) |
404 logilabgroup = rset.get_entity(0, 0) |
418 rset = cnx.execute('CWGroup X WHERE X name %(name)s', {'name': 'logilab'}) |
405 self.assertEqual(set(['adim']), |
419 logilabgroup = rset.get_entity(0, 0) |
406 set([u.login for u in logilabgroup.reverse_in_group])) |
420 self.assertEqual(set(['adim']), |
|
421 set([u.login for u in logilabgroup.reverse_in_group])) |
407 |
422 |
408 def test_group_member_added(self): |
423 def test_group_member_added(self): |
409 self.pull() |
424 with self.repo.internal_cnx() as cnx: |
410 rset = self.sexecute('Any L WHERE U in_group G, G name %(name)s, U login L', |
425 self.pull(cnx) |
411 {'name': 'logilab'}) |
426 with self.admin_access.repo_cnx() as cnx: |
412 self.assertEqual(len(rset), 1) |
427 rset = cnx.execute('Any L WHERE U in_group G, G name %(name)s, U login L', |
413 self.assertEqual(rset[0][0], 'adim') |
428 {'name': 'logilab'}) |
|
429 self.assertEqual(len(rset), 1) |
|
430 self.assertEqual(rset[0][0], 'adim') |
414 |
431 |
415 try: |
432 try: |
416 self.update_ldap_entry('cn=logilab,ou=Group,dc=cubicweb,dc=test', |
433 self.update_ldap_entry('cn=logilab,ou=Group,dc=cubicweb,dc=test', |
417 {('add', 'memberUid'): ['syt']}) |
434 {('add', 'memberUid'): ['syt']}) |
418 time.sleep(1.1) # timestamps precision is 1s |
435 time.sleep(1.1) # timestamps precision is 1s |
419 self.pull() |
436 with self.repo.internal_cnx() as cnx: |
420 |
437 self.pull(cnx) |
421 rset = self.sexecute('Any L WHERE U in_group G, G name %(name)s, U login L', |
438 |
422 {'name': 'logilab'}) |
439 with self.admin_access.repo_cnx() as cnx: |
423 self.assertEqual(len(rset), 2) |
440 rset = cnx.execute('Any L WHERE U in_group G, G name %(name)s, U login L', |
424 members = set([u[0] for u in rset]) |
441 {'name': 'logilab'}) |
425 self.assertEqual(set(['adim', 'syt']), members) |
442 self.assertEqual(len(rset), 2) |
|
443 members = set([u[0] for u in rset]) |
|
444 self.assertEqual(set(['adim', 'syt']), members) |
426 |
445 |
427 finally: |
446 finally: |
428 # back to normal ldap setup |
447 # back to normal ldap setup |
429 self.tearDownClass() |
448 self.tearDownClass() |
430 self.setUpClass() |
449 self.setUpClass() |
431 |
450 |
432 def test_group_member_deleted(self): |
451 def test_group_member_deleted(self): |
433 self.pull() # ensure we are sync'ed |
452 with self.repo.internal_cnx() as cnx: |
434 rset = self.sexecute('Any L WHERE U in_group G, G name %(name)s, U login L', |
453 self.pull(cnx) # ensure we are sync'ed |
435 {'name': 'logilab'}) |
454 with self.admin_access.repo_cnx() as cnx: |
436 self.assertEqual(len(rset), 1) |
455 rset = cnx.execute('Any L WHERE U in_group G, G name %(name)s, U login L', |
437 self.assertEqual(rset[0][0], 'adim') |
456 {'name': 'logilab'}) |
|
457 self.assertEqual(len(rset), 1) |
|
458 self.assertEqual(rset[0][0], 'adim') |
438 |
459 |
439 try: |
460 try: |
440 self.update_ldap_entry('cn=logilab,ou=Group,dc=cubicweb,dc=test', |
461 self.update_ldap_entry('cn=logilab,ou=Group,dc=cubicweb,dc=test', |
441 {('delete', 'memberUid'): ['adim']}) |
462 {('delete', 'memberUid'): ['adim']}) |
442 time.sleep(1.1) # timestamps precision is 1s |
463 time.sleep(1.1) # timestamps precision is 1s |
443 self.pull() |
464 with self.repo.internal_cnx() as cnx: |
444 |
465 self.pull(cnx) |
445 rset = self.sexecute('Any L WHERE U in_group G, G name %(name)s, U login L', |
466 |
446 {'name': 'logilab'}) |
467 with self.admin_access.repo_cnx() as cnx: |
447 self.assertEqual(len(rset), 0) |
468 rset = cnx.execute('Any L WHERE U in_group G, G name %(name)s, U login L', |
|
469 {'name': 'logilab'}) |
|
470 self.assertEqual(len(rset), 0) |
448 finally: |
471 finally: |
449 # back to normal ldap setup |
472 # back to normal ldap setup |
450 self.tearDownClass() |
473 self.tearDownClass() |
451 self.setUpClass() |
474 self.setUpClass() |
452 |
475 |