cubicweb/server/test/unittest_ldapsource.py
changeset 12044 70bb46dfa87b
parent 12042 5e64a98572de
child 12045 d19f7ec36d33
equal deleted inserted replaced
12043:b8d2e6b9f548 12044:70bb46dfa87b
   324     unavailable in the LDAP database.
   324     unavailable in the LDAP database.
   325     """
   325     """
   326 
   326 
   327     def test_a_filter_inactivate(self):
   327     def test_a_filter_inactivate(self):
   328         """ filtered out people should be deactivated, unable to authenticate """
   328         """ filtered out people should be deactivated, unable to authenticate """
       
   329         repo_source = self.repo.sources_by_uri['ldap']
   329         with self.admin_access.repo_cnx() as cnx:
   330         with self.admin_access.repo_cnx() as cnx:
   330             source = cnx.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0, 0)
   331             source = cnx.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0, 0)
   331             config = source.repo_source.check_config(source)
   332             config = source.repo_source.check_config(source)
   332             # filter with adim's phone number
   333             # filter with adim's phone number
   333             config['user-filter'] = u'(%s=%s)' % ('telephoneNumber', '109')
   334             config['user-filter'] = u'(%s=%s)' % ('telephoneNumber', '109')
   334             source.repo_source.update_config(source, config)
   335             source.repo_source.update_config(source, config)
   335             cnx.commit()
   336             cnx.commit()
   336         with self.repo.internal_cnx() as cnx:
   337         with self.repo.internal_cnx() as cnx:
   337             self.pull(cnx)
   338             self.pull(cnx)
   338         self.assertRaises(AuthenticationError, self.repo.new_session, 'syt', password='syt')
   339             self.assertRaises(AuthenticationError,
       
   340                               repo_source.authenticate, cnx, 'syt', 'syt')
   339         with self.admin_access.repo_cnx() as cnx:
   341         with self.admin_access.repo_cnx() as cnx:
   340             self.assertEqual(cnx.execute('Any N WHERE U login "syt", '
   342             self.assertEqual(cnx.execute('Any N WHERE U login "syt", '
   341                                          'U in_state S, S name N').rows[0][0],
   343                                          'U in_state S, S name N').rows[0][0],
   342                              'deactivated')
   344                              'deactivated')
   343             self.assertEqual(cnx.execute('Any N WHERE U login "adim", '
   345             self.assertEqual(cnx.execute('Any N WHERE U login "adim", '
   362         read syt, pull, check activation
   364         read syt, pull, check activation
   363         """
   365         """
   364         self.delete_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test')
   366         self.delete_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test')
   365         with self.repo.internal_cnx() as cnx:
   367         with self.repo.internal_cnx() as cnx:
   366             self.pull(cnx)
   368             self.pull(cnx)
   367         self.assertRaises(AuthenticationError, self.repo.new_session, 'syt', password='syt')
   369             source = self.repo.sources_by_uri['ldap']
       
   370             self.assertRaises(AuthenticationError,
       
   371                               source.authenticate, cnx, 'syt', 'syt')
   368         with self.admin_access.repo_cnx() as cnx:
   372         with self.admin_access.repo_cnx() as cnx:
   369             self.assertEqual(cnx.execute('Any N WHERE U login "syt", '
   373             self.assertEqual(cnx.execute('Any N WHERE U login "syt", '
   370                                          'U in_state S, S name N').rows[0][0],
   374                                          'U in_state S, S name N').rows[0][0],
   371                              'deactivated')
   375                              'deactivated')
   372         with self.repo.internal_cnx() as cnx:
   376         with self.repo.internal_cnx() as cnx:
   399 
   403 
   400     def test_reactivate_deleted(self):
   404     def test_reactivate_deleted(self):
   401         # test that reactivating the user BY HAND isn't enough to
   405         # test that reactivating the user BY HAND isn't enough to
   402         # authenticate, as the native source refuses to authenticate
   406         # authenticate, as the native source refuses to authenticate
   403         # users from other sources
   407         # users from other sources
       
   408         repo_source = self.repo.sources_by_uri['ldap']
   404         self.delete_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test')
   409         self.delete_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test')
   405         with self.repo.internal_cnx() as cnx:
   410         with self.repo.internal_cnx() as cnx:
   406             self.pull(cnx)
   411             self.pull(cnx)
   407         with self.admin_access.repo_cnx() as cnx:
   412         with self.admin_access.repo_cnx() as cnx:
   408             # reactivate user (which source is still ldap-feed)
   413             # reactivate user (which source is still ldap-feed)
   409             user = cnx.execute('CWUser U WHERE U login "syt"').get_entity(0, 0)
   414             user = cnx.execute('CWUser U WHERE U login "syt"').get_entity(0, 0)
   410             user.cw_adapt_to('IWorkflowable').fire_transition('activate')
   415             user.cw_adapt_to('IWorkflowable').fire_transition('activate')
   411             cnx.commit()
   416             cnx.commit()
   412             with self.assertRaises(AuthenticationError):
   417             self.assertRaises(AuthenticationError,
   413                 self.repo.new_session('syt', password='syt')
   418                               repo_source.authenticate, cnx, 'syt', 'syt')
   414 
   419 
   415             # ok now let's try to make it a system user
   420             # ok now let's try to make it a system user
   416             cnx.execute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': user.eid})
   421             cnx.execute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': user.eid})
   417             cnx.commit()
   422             cnx.commit()
   418         # and that we can now authenticate again
   423             # and that we can now authenticate again
   419         self.assertRaises(AuthenticationError, self.repo.new_session, 'syt', password='toto')
   424             self.assertRaises(AuthenticationError,
   420         self.assertTrue(self.repo.new_session('syt', password='syt'))
   425                               repo_source.authenticate, cnx, 'syt', 'toto')
       
   426             self.assertTrue(self.repo.authenticate_user(cnx, 'syt', password='syt'))
   421 
   427 
   422 
   428 
   423 class LDAPFeedGroupTC(LDAPFeedTestBase):
   429 class LDAPFeedGroupTC(LDAPFeedTestBase):
   424     """
   430     """
   425     A testcase for group support in ldapfeed.
   431     A testcase for group support in ldapfeed.