206 modcmd.append('%s: %s' % (key, value)) |
206 modcmd.append('%s: %s' % (key, value)) |
207 modcmd.append('-') |
207 modcmd.append('-') |
208 self._ldapmodify(modcmd) |
208 self._ldapmodify(modcmd) |
209 |
209 |
210 def _ldapmodify(self, modcmd): |
210 def _ldapmodify(self, modcmd): |
211 uri = self.repo.sources_by_uri['ldap'].urls[0] |
211 uri = self.repo.source_by_uri('ldap').urls[0] |
212 updatecmd = ['ldapmodify', '-H', uri, '-v', '-x', '-D', |
212 updatecmd = ['ldapmodify', '-H', uri, '-v', '-x', '-D', |
213 'cn=admin,dc=cubicweb,dc=test', '-w', 'cw'] |
213 'cn=admin,dc=cubicweb,dc=test', '-w', 'cw'] |
214 PIPE = subprocess.PIPE |
214 PIPE = subprocess.PIPE |
215 p = subprocess.Popen(updatecmd, stdin=PIPE, stdout=PIPE, stderr=PIPE) |
215 p = subprocess.Popen(updatecmd, stdin=PIPE, stdout=PIPE, stderr=PIPE) |
216 p.stdin.write('\n'.join(modcmd).encode('ascii')) |
216 p.stdin.write('\n'.join(modcmd).encode('ascii')) |
245 def assertMetadata(self, entity): |
245 def assertMetadata(self, entity): |
246 self.assertTrue(entity.creation_date) |
246 self.assertTrue(entity.creation_date) |
247 self.assertTrue(entity.modification_date) |
247 self.assertTrue(entity.modification_date) |
248 |
248 |
249 def test_authenticate(self): |
249 def test_authenticate(self): |
250 source = self.repo.sources_by_uri['ldap'] |
250 source = self.repo.source_by_uri('ldap') |
251 with self.admin_access.repo_cnx() as cnx: |
251 with self.admin_access.repo_cnx() as cnx: |
252 # ensure we won't be logged against |
252 # ensure we won't be logged against |
253 self.assertRaises(AuthenticationError, |
253 self.assertRaises(AuthenticationError, |
254 source.authenticate, cnx, 'toto', 'toto') |
254 source.authenticate, cnx, 'toto', 'toto') |
255 self.assertRaises(AuthenticationError, |
255 self.assertRaises(AuthenticationError, |
280 self.assertEqual(rset.rows, [[e.eid]]) |
280 self.assertEqual(rset.rows, [[e.eid]]) |
281 |
281 |
282 def test_copy_to_system_source(self): |
282 def test_copy_to_system_source(self): |
283 "make sure we can 'convert' an LDAP user into a system one" |
283 "make sure we can 'convert' an LDAP user into a system one" |
284 with self.admin_access.repo_cnx() as cnx: |
284 with self.admin_access.repo_cnx() as cnx: |
285 source = self.repo.sources_by_uri['ldap'] |
285 source = self.repo.source_by_uri('ldap') |
286 eid = cnx.execute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0] |
286 eid = cnx.execute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0] |
287 cnx.execute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': eid}) |
287 cnx.execute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': eid}) |
288 cnx.commit() |
288 cnx.commit() |
289 rset = cnx.execute('CWUser X WHERE X login %(login)s', {'login': 'syt'}) |
289 rset = cnx.execute('CWUser X WHERE X login %(login)s', {'login': 'syt'}) |
290 self.assertEqual(len(rset), 1) |
290 self.assertEqual(len(rset), 1) |
313 A testcase for password generation on CWUser when none is imported |
313 A testcase for password generation on CWUser when none is imported |
314 """ |
314 """ |
315 |
315 |
316 def setup_database(self): |
316 def setup_database(self): |
317 with self.admin_access.repo_cnx() as cnx: |
317 with self.admin_access.repo_cnx() as cnx: |
318 lfsource = cnx.repo.sources_by_uri['ldap'] |
318 lfsource = cnx.repo.source_by_uri('ldap') |
319 del lfsource.user_attrs['userPassword'] |
319 del lfsource.user_attrs['userPassword'] |
320 super(LDAPGeneratePwdTC, self).setup_database() |
320 super(LDAPGeneratePwdTC, self).setup_database() |
321 |
321 |
322 def test_no_password(self): |
322 def test_no_password(self): |
323 with self.admin_access.repo_cnx() as cnx: |
323 with self.admin_access.repo_cnx() as cnx: |
340 options = {'user-filter': '(%s=%s)' % ('telephonenumber', '109')} |
340 options = {'user-filter': '(%s=%s)' % ('telephonenumber', '109')} |
341 update_source_config(source, options) |
341 update_source_config(source, options) |
342 cnx.commit() |
342 cnx.commit() |
343 with self.repo.internal_cnx() as cnx: |
343 with self.repo.internal_cnx() as cnx: |
344 self.pull(cnx) |
344 self.pull(cnx) |
345 repo_source = self.repo.sources_by_uri['ldap'] |
345 repo_source = self.repo.source_by_uri('ldap') |
346 self.assertRaises(AuthenticationError, |
346 self.assertRaises(AuthenticationError, |
347 repo_source.authenticate, cnx, 'syt', 'syt') |
347 repo_source.authenticate, cnx, 'syt', 'syt') |
348 with self.admin_access.repo_cnx() as cnx: |
348 with self.admin_access.repo_cnx() as cnx: |
349 self.assertEqual(cnx.execute('Any N WHERE U login "syt", ' |
349 self.assertEqual(cnx.execute('Any N WHERE U login "syt", ' |
350 'U in_state S, S name N').rows[0][0], |
350 'U in_state S, S name N').rows[0][0], |
372 read syt, pull, check activation |
372 read syt, pull, check activation |
373 """ |
373 """ |
374 self.delete_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test') |
374 self.delete_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test') |
375 with self.repo.internal_cnx() as cnx: |
375 with self.repo.internal_cnx() as cnx: |
376 self.pull(cnx) |
376 self.pull(cnx) |
377 source = self.repo.sources_by_uri['ldap'] |
377 source = self.repo.source_by_uri('ldap') |
378 self.assertRaises(AuthenticationError, |
378 self.assertRaises(AuthenticationError, |
379 source.authenticate, cnx, 'syt', 'syt') |
379 source.authenticate, cnx, 'syt', 'syt') |
380 with self.admin_access.repo_cnx() as cnx: |
380 with self.admin_access.repo_cnx() as cnx: |
381 self.assertEqual(cnx.execute('Any N WHERE U login "syt", ' |
381 self.assertEqual(cnx.execute('Any N WHERE U login "syt", ' |
382 'U in_state S, S name N').rows[0][0], |
382 'U in_state S, S name N').rows[0][0], |
411 |
411 |
412 def test_reactivate_deleted(self): |
412 def test_reactivate_deleted(self): |
413 # test reactivating BY HAND the user isn't enough to |
413 # test reactivating BY HAND the user isn't enough to |
414 # authenticate, as the native source refuse to authenticate |
414 # authenticate, as the native source refuse to authenticate |
415 # user from other sources |
415 # user from other sources |
416 repo_source = self.repo.sources_by_uri['ldap'] |
416 repo_source = self.repo.source_by_uri('ldap') |
417 self.delete_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test') |
417 self.delete_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test') |
418 with self.repo.internal_cnx() as cnx: |
418 with self.repo.internal_cnx() as cnx: |
419 self.pull(cnx) |
419 self.pull(cnx) |
420 with self.admin_access.repo_cnx() as cnx: |
420 with self.admin_access.repo_cnx() as cnx: |
421 # reactivate user (which source is still ldap-feed) |
421 # reactivate user (which source is still ldap-feed) |