cubicweb/server/test/unittest_ldapsource.py
branch3.25
changeset 12146 d540defa0591
parent 12143 a446124bcf3c
child 12151 569dce882f60
equal deleted inserted replaced
12145:752b94ed9748 12146:d540defa0591
   157         cnx.commit()
   157         cnx.commit()
   158         return cls.pull(cnx)
   158         return cls.pull(cnx)
   159 
   159 
   160     @staticmethod
   160     @staticmethod
   161     def pull(cnx):
   161     def pull(cnx):
   162         lfsource = cnx.repo.sources_by_uri['ldap']
   162         lfsource = cnx.repo.source_by_uri('ldap')
   163         stats = lfsource.pull_data(cnx, force=True, raise_on_error=True)
   163         stats = lfsource.pull_data(cnx, force=True, raise_on_error=True)
   164         cnx.commit()
   164         cnx.commit()
   165         return stats
   165         return stats
   166 
   166 
   167     def setup_database(self):
   167     def setup_database(self):
   206                 modcmd.append('%s: %s' % (key, value))
   206                 modcmd.append('%s: %s' % (key, value))
   207             modcmd.append('-')
   207             modcmd.append('-')
   208         self._ldapmodify(modcmd)
   208         self._ldapmodify(modcmd)
   209 
   209 
   210     def _ldapmodify(self, modcmd):
   210     def _ldapmodify(self, modcmd):
   211         uri = self.repo.sources_by_uri['ldap'].urls[0]
   211         uri = self.repo.source_by_uri('ldap').urls[0]
   212         updatecmd = ['ldapmodify', '-H', uri, '-v', '-x', '-D',
   212         updatecmd = ['ldapmodify', '-H', uri, '-v', '-x', '-D',
   213                      'cn=admin,dc=cubicweb,dc=test', '-w', 'cw']
   213                      'cn=admin,dc=cubicweb,dc=test', '-w', 'cw']
   214         PIPE = subprocess.PIPE
   214         PIPE = subprocess.PIPE
   215         p = subprocess.Popen(updatecmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)
   215         p = subprocess.Popen(updatecmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)
   216         p.stdin.write('\n'.join(modcmd).encode('ascii'))
   216         p.stdin.write('\n'.join(modcmd).encode('ascii'))
   245     def assertMetadata(self, entity):
   245     def assertMetadata(self, entity):
   246         self.assertTrue(entity.creation_date)
   246         self.assertTrue(entity.creation_date)
   247         self.assertTrue(entity.modification_date)
   247         self.assertTrue(entity.modification_date)
   248 
   248 
   249     def test_authenticate(self):
   249     def test_authenticate(self):
   250         source = self.repo.sources_by_uri['ldap']
   250         source = self.repo.source_by_uri('ldap')
   251         with self.admin_access.repo_cnx() as cnx:
   251         with self.admin_access.repo_cnx() as cnx:
   252             # ensure we won't be logged against
   252             # ensure we won't be logged against
   253             self.assertRaises(AuthenticationError,
   253             self.assertRaises(AuthenticationError,
   254                               source.authenticate, cnx, 'toto', 'toto')
   254                               source.authenticate, cnx, 'toto', 'toto')
   255             self.assertRaises(AuthenticationError,
   255             self.assertRaises(AuthenticationError,
   280             self.assertEqual(rset.rows, [[e.eid]])
   280             self.assertEqual(rset.rows, [[e.eid]])
   281 
   281 
   282     def test_copy_to_system_source(self):
   282     def test_copy_to_system_source(self):
   283         "make sure we can 'convert' an LDAP user into a system one"
   283         "make sure we can 'convert' an LDAP user into a system one"
   284         with self.admin_access.repo_cnx() as cnx:
   284         with self.admin_access.repo_cnx() as cnx:
   285             source = self.repo.sources_by_uri['ldap']
   285             source = self.repo.source_by_uri('ldap')
   286             eid = cnx.execute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
   286             eid = cnx.execute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
   287             cnx.execute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': eid})
   287             cnx.execute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': eid})
   288             cnx.commit()
   288             cnx.commit()
   289             rset = cnx.execute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
   289             rset = cnx.execute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
   290             self.assertEqual(len(rset), 1)
   290             self.assertEqual(len(rset), 1)
   313     A testcase for password generation on CWUser when none is imported
   313     A testcase for password generation on CWUser when none is imported
   314     """
   314     """
   315 
   315 
   316     def setup_database(self):
   316     def setup_database(self):
   317         with self.admin_access.repo_cnx() as cnx:
   317         with self.admin_access.repo_cnx() as cnx:
   318             lfsource = cnx.repo.sources_by_uri['ldap']
   318             lfsource = cnx.repo.source_by_uri('ldap')
   319             del lfsource.user_attrs['userPassword']
   319             del lfsource.user_attrs['userPassword']
   320         super(LDAPGeneratePwdTC, self).setup_database()
   320         super(LDAPGeneratePwdTC, self).setup_database()
   321 
   321 
   322     def test_no_password(self):
   322     def test_no_password(self):
   323         with self.admin_access.repo_cnx() as cnx:
   323         with self.admin_access.repo_cnx() as cnx:
   340             options = {'user-filter': '(%s=%s)' % ('telephonenumber', '109')}
   340             options = {'user-filter': '(%s=%s)' % ('telephonenumber', '109')}
   341             update_source_config(source, options)
   341             update_source_config(source, options)
   342             cnx.commit()
   342             cnx.commit()
   343         with self.repo.internal_cnx() as cnx:
   343         with self.repo.internal_cnx() as cnx:
   344             self.pull(cnx)
   344             self.pull(cnx)
   345             repo_source = self.repo.sources_by_uri['ldap']
   345             repo_source = self.repo.source_by_uri('ldap')
   346             self.assertRaises(AuthenticationError,
   346             self.assertRaises(AuthenticationError,
   347                               repo_source.authenticate, cnx, 'syt', 'syt')
   347                               repo_source.authenticate, cnx, 'syt', 'syt')
   348         with self.admin_access.repo_cnx() as cnx:
   348         with self.admin_access.repo_cnx() as cnx:
   349             self.assertEqual(cnx.execute('Any N WHERE U login "syt", '
   349             self.assertEqual(cnx.execute('Any N WHERE U login "syt", '
   350                                          'U in_state S, S name N').rows[0][0],
   350                                          'U in_state S, S name N').rows[0][0],
   372         read syt, pull, check activation
   372         read syt, pull, check activation
   373         """
   373         """
   374         self.delete_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test')
   374         self.delete_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test')
   375         with self.repo.internal_cnx() as cnx:
   375         with self.repo.internal_cnx() as cnx:
   376             self.pull(cnx)
   376             self.pull(cnx)
   377             source = self.repo.sources_by_uri['ldap']
   377             source = self.repo.source_by_uri('ldap')
   378             self.assertRaises(AuthenticationError,
   378             self.assertRaises(AuthenticationError,
   379                               source.authenticate, cnx, 'syt', 'syt')
   379                               source.authenticate, cnx, 'syt', 'syt')
   380         with self.admin_access.repo_cnx() as cnx:
   380         with self.admin_access.repo_cnx() as cnx:
   381             self.assertEqual(cnx.execute('Any N WHERE U login "syt", '
   381             self.assertEqual(cnx.execute('Any N WHERE U login "syt", '
   382                                          'U in_state S, S name N').rows[0][0],
   382                                          'U in_state S, S name N').rows[0][0],
   411 
   411 
   412     def test_reactivate_deleted(self):
   412     def test_reactivate_deleted(self):
   413         # test reactivating BY HAND the user isn't enough to
   413         # test reactivating BY HAND the user isn't enough to
   414         # authenticate, as the native source refuse to authenticate
   414         # authenticate, as the native source refuse to authenticate
   415         # user from other sources
   415         # user from other sources
   416         repo_source = self.repo.sources_by_uri['ldap']
   416         repo_source = self.repo.source_by_uri('ldap')
   417         self.delete_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test')
   417         self.delete_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test')
   418         with self.repo.internal_cnx() as cnx:
   418         with self.repo.internal_cnx() as cnx:
   419             self.pull(cnx)
   419             self.pull(cnx)
   420         with self.admin_access.repo_cnx() as cnx:
   420         with self.admin_access.repo_cnx() as cnx:
   421             # reactivate user (which source is still ldap-feed)
   421             # reactivate user (which source is still ldap-feed)