cubicweb/server/test/unittest_ldapsource.py
branch 3.25
changeset 12143 a446124bcf3c
parent 12136 6069ee7d4824
child 12146 d540defa0591
equal deleted inserted replaced
12142:db2fc87348ab 12143:a446124bcf3c
   113 
   113 
   114 def ldapsource(cnx):
   114 def ldapsource(cnx):
   115     return cnx.find('CWSource', type=u'ldapfeed').one()
   115     return cnx.find('CWSource', type=u'ldapfeed').one()
   116 
   116 
   117 
   117 
       
   118 def update_source_config(source, options):
       
   119     config = source.dictconfig
       
   120     config.update(options)
       
   121     source.cw_set(config=u'\n'.join('%s=%s' % x for x in config.items()))
       
   122 
       
   123 
   118 class LDAPFeedTestBase(CubicWebTC):
   124 class LDAPFeedTestBase(CubicWebTC):
   119     test_db_id = 'ldap-feed'
   125     test_db_id = 'ldap-feed'
   120     loglevel = 'ERROR'
   126     loglevel = 'ERROR'
   121 
   127 
   122     @classmethod
   128     @classmethod
   220     """
   226     """
   221 
   227 
   222     def test_wrong_group(self):
   228     def test_wrong_group(self):
   223         with self.admin_access.repo_cnx() as cnx:
   229         with self.admin_access.repo_cnx() as cnx:
   224             source = ldapsource(cnx)
   230             source = ldapsource(cnx)
   225             config = source.repo_source.check_config(source)
       
   226             # inject a bogus group here, along with at least a valid one
   231             # inject a bogus group here, along with at least a valid one
   227             config['user-default-group'] = ('thisgroupdoesnotexists', 'users')
   232             options = {'user-default-group': 'thisgroupdoesnotexists,users'}
   228             source.repo_source.update_config(source, config)
   233             update_source_config(source, options)
   229             cnx.commit()
   234             cnx.commit()
   230             # here we emitted an error log entry
   235             # here we emitted an error log entry
   231             source.repo_source.pull_data(cnx, force=True, raise_on_error=True)
   236             source.repo_source.pull_data(cnx, force=True, raise_on_error=True)
   232             cnx.commit()
   237             cnx.commit()
   233 
   238 
   327     unavailable in the LDAP database.
   332     unavailable in the LDAP database.
   328     """
   333     """
   329 
   334 
   330     def test_a_filter_inactivate(self):
   335     def test_a_filter_inactivate(self):
   331         """ filtered out people should be deactivated, unable to authenticate """
   336         """ filtered out people should be deactivated, unable to authenticate """
   332         repo_source = self.repo.sources_by_uri['ldap']
       
   333         with self.admin_access.repo_cnx() as cnx:
   337         with self.admin_access.repo_cnx() as cnx:
   334             source = ldapsource(cnx)
   338             source = ldapsource(cnx)
   335             config = repo_source.check_config(source)
       
   336             # filter with adim's phone number
   339             # filter with adim's phone number
   337             config['user-filter'] = u'(%s=%s)' % ('telephoneNumber', '109')
   340             options = {'user-filter': '(%s=%s)' % ('telephonenumber', '109')}
   338             repo_source.update_config(source, config)
   341             update_source_config(source, options)
   339             cnx.commit()
   342             cnx.commit()
   340         with self.repo.internal_cnx() as cnx:
   343         with self.repo.internal_cnx() as cnx:
   341             self.pull(cnx)
   344             self.pull(cnx)
       
   345             repo_source = self.repo.sources_by_uri['ldap']
   342             self.assertRaises(AuthenticationError,
   346             self.assertRaises(AuthenticationError,
   343                               repo_source.authenticate, cnx, 'syt', 'syt')
   347                               repo_source.authenticate, cnx, 'syt', 'syt')
   344         with self.admin_access.repo_cnx() as cnx:
   348         with self.admin_access.repo_cnx() as cnx:
   345             self.assertEqual(cnx.execute('Any N WHERE U login "syt", '
   349             self.assertEqual(cnx.execute('Any N WHERE U login "syt", '
   346                                          'U in_state S, S name N').rows[0][0],
   350                                          'U in_state S, S name N').rows[0][0],
   347                              'deactivated')
   351                              'deactivated')
   348             self.assertEqual(cnx.execute('Any N WHERE U login "adim", '
   352             self.assertEqual(cnx.execute('Any N WHERE U login "adim", '
   349                                          'U in_state S, S name N').rows[0][0],
   353                                          'U in_state S, S name N').rows[0][0],
   350                              'activated')
   354                              'activated')
   351             # unfilter, syt should be activated again
   355             # unfilter, syt should be activated again
   352             config['user-filter'] = u''
   356             source = ldapsource(cnx)
   353             repo_source.update_config(source, config)
   357             options = {'user-filter': u''}
       
   358             update_source_config(source, options)
   354             cnx.commit()
   359             cnx.commit()
   355         with self.repo.internal_cnx() as cnx:
   360         with self.repo.internal_cnx() as cnx:
   356             self.pull(cnx)
   361             self.pull(cnx)
   357         with self.admin_access.repo_cnx() as cnx:
   362         with self.admin_access.repo_cnx() as cnx:
   358             self.assertEqual(cnx.execute('Any N WHERE U login "syt", '
   363             self.assertEqual(cnx.execute('Any N WHERE U login "syt", '