server/test/unittest_ldapsource.py
changeset 8901 661b6aaac240
parent 8885 b3409c1dc012
child 8902 9edfa3660bf4
equal deleted inserted replaced
8900:010a59e12d89 8901:661b6aaac240
    32 from cubicweb.devtools.httptest import get_available_port
    32 from cubicweb.devtools.httptest import get_available_port
    33 from cubicweb.devtools import get_test_db_handler
    33 from cubicweb.devtools import get_test_db_handler
    34 
    34 
    35 from cubicweb.server.sources.ldapuser import GlobTrFunc, UnknownEid, RQL2LDAPFilter
    35 from cubicweb.server.sources.ldapuser import GlobTrFunc, UnknownEid, RQL2LDAPFilter
    36 
    36 
    37 CONFIG = u'user-base-dn=ou=People,dc=cubicweb,dc=test'
    37 CONFIG_LDAPFEED = CONFIG_LDAPUSER = u'''user-base-dn=ou=People,dc=cubicweb,dc=test'''
       
    38 
    38 URL = None
    39 URL = None
    39 
    40 
    40 def create_slapd_configuration(cls):
    41 def create_slapd_configuration(cls):
    41     global URL
    42     global URL
    42     slapddir = tempfile.mkdtemp('cw-unittest-ldap')
    43     slapddir = tempfile.mkdtemp('cw-unittest-ldap')
    59     cmdline = ["/usr/sbin/slapd", "-f",  slapdconf,  "-h",  ldapuri, "-d", "0"]
    60     cmdline = ["/usr/sbin/slapd", "-f",  slapdconf,  "-h",  ldapuri, "-d", "0"]
    60     config.info('Starting slapd:', ' '.join(cmdline))
    61     config.info('Starting slapd:', ' '.join(cmdline))
    61     cls.slapd_process = subprocess.Popen(cmdline)
    62     cls.slapd_process = subprocess.Popen(cmdline)
    62     time.sleep(0.2)
    63     time.sleep(0.2)
    63     if cls.slapd_process.poll() is None:
    64     if cls.slapd_process.poll() is None:
    64         config.info('slapd started with pid %s' % cls.slapd_process.pid)
    65         config.info('slapd started with pid %s', cls.slapd_process.pid)
    65     else:
    66     else:
    66         raise EnvironmentError('Cannot start slapd with cmdline="%s" (from directory "%s")' %
    67         raise EnvironmentError('Cannot start slapd with cmdline="%s" (from directory "%s")' %
    67                                (" ".join(cmdline), os.getcwd()))
    68                                (" ".join(cmdline), os.getcwd()))
    68     URL = u'ldap://%s' % host
    69     URL = u'ldap://%s' % host
    69     return slapddir
    70     return slapddir
    98             pass
    99             pass
    99 
   100 
   100 class CheckWrongGroup(LDAPTestBase):
   101 class CheckWrongGroup(LDAPTestBase):
   101 
   102 
   102     def test_wrong_group(self):
   103     def test_wrong_group(self):
   103         self.session.create_entity('CWSource', name=u'ldapuser', type=u'ldapfeed', parser=u'ldapfeed',
   104         self.session.create_entity('CWSource', name=u'ldapfeed', type=u'ldapfeed', parser=u'ldapfeed',
   104                                    url=URL, config=CONFIG)
   105                                    url=URL, config=CONFIG_LDAPFEED)
   105         self.commit()
   106         self.commit()
   106         with self.session.repo.internal_session(safe=True) as session:
   107         with self.session.repo.internal_session(safe=True) as session:
   107             source = self.session.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0)
   108             source = self.session.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0)
   108             config = source.repo_source.check_config(source)
   109             config = source.repo_source.check_config(source)
   109             # inject a bogus group here, along with at least a valid one
   110             # inject a bogus group here, along with at least a valid one
   112             session.commit(free_cnxset=False)
   113             session.commit(free_cnxset=False)
   113             # here we emitted an error log entry
   114             # here we emitted an error log entry
   114             stats = source.repo_source.pull_data(session, force=True, raise_on_error=True)
   115             stats = source.repo_source.pull_data(session, force=True, raise_on_error=True)
   115             session.commit()
   116             session.commit()
   116 
   117 
       
   118 
   117 class DeleteStuffFromLDAPFeedSourceTC(LDAPTestBase):
   119 class DeleteStuffFromLDAPFeedSourceTC(LDAPTestBase):
   118     test_db_id = 'ldap-feed'
   120     test_db_id = 'ldap-feed'
   119 
   121 
   120     @classmethod
   122     @classmethod
   121     def pre_setup_database(cls, session, config):
   123     def pre_setup_database(cls, session, config):
   122         session.create_entity('CWSource', name=u'ldapuser', type=u'ldapfeed', parser=u'ldapfeed',
   124         session.create_entity('CWSource', name=u'ldap', type=u'ldapfeed', parser=u'ldapfeed',
   123                               url=URL, config=CONFIG)
   125                               url=URL, config=CONFIG_LDAPFEED)
   124         session.commit()
   126         session.commit()
   125         with session.repo.internal_session(safe=True) as isession:
   127         with session.repo.internal_session(safe=True) as isession:
   126             lfsource = isession.repo.sources_by_uri['ldapuser']
   128             lfsource = isession.repo.sources_by_uri['ldap']
   127             stats = lfsource.pull_data(isession, force=True, raise_on_error=True)
   129             stats = lfsource.pull_data(isession, force=True, raise_on_error=True)
   128 
   130 
   129     def _pull(self):
   131     def _pull(self):
   130         with self.session.repo.internal_session() as isession:
   132         with self.session.repo.internal_session() as isession:
   131             lfsource = isession.repo.sources_by_uri['ldapuser']
   133             lfsource = isession.repo.sources_by_uri['ldap']
   132             stats = lfsource.pull_data(isession, force=True, raise_on_error=True)
   134             stats = lfsource.pull_data(isession, force=True, raise_on_error=True)
   133             isession.commit()
   135             isession.commit()
   134 
   136 
   135     def test_a_filter_inactivate(self):
   137     def test_a_filter_inactivate(self):
   136         """ filtered out people should be deactivated, unable to authenticate """
   138         """ filtered out people should be deactivated, unable to authenticate """
   162 
   164 
   163     def test_delete(self):
   165     def test_delete(self):
   164         """ delete syt, pull, check deactivation, repull,
   166         """ delete syt, pull, check deactivation, repull,
   165         readd syt, pull, check activation
   167         readd syt, pull, check activation
   166         """
   168         """
   167         uri = self.repo.sources_by_uri['ldapuser'].urls[0]
   169         uri = self.repo.sources_by_uri['ldap'].urls[0]
   168         deletecmd = ("ldapdelete -H %s 'uid=syt,ou=People,dc=cubicweb,dc=test' "
   170         deletecmd = ("ldapdelete -H %s 'uid=syt,ou=People,dc=cubicweb,dc=test' "
   169                      "-v -x -D cn=admin,dc=cubicweb,dc=test -w'cw'" % uri)
   171                      "-v -x -D cn=admin,dc=cubicweb,dc=test -w'cw'" % uri)
   170         os.system(deletecmd)
   172         os.system(deletecmd)
   171         self._pull()
   173         self._pull()
   172         self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt')
   174         self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt')
   189         user = self.execute('CWUser U WHERE U login "syt"').get_entity(0, 0)
   191         user = self.execute('CWUser U WHERE U login "syt"').get_entity(0, 0)
   190         user.cw_adapt_to('IWorkflowable').fire_transition('activate')
   192         user.cw_adapt_to('IWorkflowable').fire_transition('activate')
   191         self.commit()
   193         self.commit()
   192         self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt')
   194         self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt')
   193 
   195 
       
   196 
   194 class LDAPFeedSourceTC(LDAPTestBase):
   197 class LDAPFeedSourceTC(LDAPTestBase):
   195     test_db_id = 'ldap-feed'
   198     test_db_id = 'ldap-feed'
   196 
   199 
   197     @classmethod
   200     @classmethod
   198     def pre_setup_database(cls, session, config):
   201     def pre_setup_database(cls, session, config):
   199         session.create_entity('CWSource', name=u'ldapuser', type=u'ldapfeed', parser=u'ldapfeed',
   202         session.create_entity('CWSource', name=u'ldap', type=u'ldapfeed', parser=u'ldapfeed',
   200                               url=URL, config=CONFIG)
   203                               url=URL, config=CONFIG_LDAPFEED)
   201         session.commit()
   204         session.commit()
   202         isession = session.repo.internal_session(safe=True)
   205         isession = session.repo.internal_session(safe=True)
   203         lfsource = isession.repo.sources_by_uri['ldapuser']
   206         lfsource = isession.repo.sources_by_uri['ldap']
   204         stats = lfsource.pull_data(isession, force=True, raise_on_error=True)
   207         stats = lfsource.pull_data(isession, force=True, raise_on_error=True)
   205 
   208 
   206     def setUp(self):
   209     def setUp(self):
   207         super(LDAPFeedSourceTC, self).setUp()
   210         super(LDAPFeedSourceTC, self).setUp()
   208         # ldap source url in the database may use a different port as the one
   211         # ldap source url in the database may use a different port as the one
   209         # just attributed
   212         # just attributed
   210         lfsource = self.repo.sources_by_uri['ldapuser']
   213         lfsource = self.repo.sources_by_uri['ldap']
   211         lfsource.urls = [URL]
   214         lfsource.urls = [URL]
   212 
   215 
   213     def assertMetadata(self, entity):
   216     def assertMetadata(self, entity):
   214         self.assertTrue(entity.creation_date)
   217         self.assertTrue(entity.creation_date)
   215         self.assertTrue(entity.modification_date)
   218         self.assertTrue(entity.modification_date)
   216 
   219 
   217     def test_authenticate(self):
   220     def test_authenticate(self):
   218         source = self.repo.sources_by_uri['ldapuser']
   221         source = self.repo.sources_by_uri['ldap']
   219         self.session.set_cnxset()
   222         self.session.set_cnxset()
   220         # ensure we won't be logged against
   223         # ensure we won't be logged against
   221         self.assertRaises(AuthenticationError,
   224         self.assertRaises(AuthenticationError,
   222                           source.authenticate, self.session, 'toto', 'toto')
   225                           source.authenticate, self.session, 'toto', 'toto')
   223         self.assertTrue(source.authenticate(self.session, 'syt', 'syt'))
   226         self.assertTrue(source.authenticate(self.session, 'syt', 'syt'))
   239         # email content should be indexed on the user
   242         # email content should be indexed on the user
   240         rset = self.sexecute('CWUser X WHERE X has_text "thenault"')
   243         rset = self.sexecute('CWUser X WHERE X has_text "thenault"')
   241         self.assertEqual(rset.rows, [[e.eid]])
   244         self.assertEqual(rset.rows, [[e.eid]])
   242 
   245 
   243     def test_copy_to_system_source(self):
   246     def test_copy_to_system_source(self):
   244         source = self.repo.sources_by_uri['ldapuser']
   247         source = self.repo.sources_by_uri['ldap']
   245         eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
   248         eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
   246         self.sexecute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': eid})
   249         self.sexecute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': eid})
   247         self.commit()
   250         self.commit()
   248         source.reset_caches()
   251         source.reset_caches()
   249         rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
   252         rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
   269     test_db_id = 'ldap-user'
   272     test_db_id = 'ldap-user'
   270     tags = CubicWebTC.tags | Tags(('ldap'))
   273     tags = CubicWebTC.tags | Tags(('ldap'))
   271 
   274 
   272     @classmethod
   275     @classmethod
   273     def pre_setup_database(cls, session, config):
   276     def pre_setup_database(cls, session, config):
   274         session.create_entity('CWSource', name=u'ldapuser', type=u'ldapuser',
   277         session.create_entity('CWSource', name=u'ldap', type=u'ldapuser',
   275                               url=URL, config=CONFIG)
   278                               url=URL, config=CONFIG_LDAPUSER)
   276         session.commit()
   279         session.commit()
   277         # XXX keep it there
   280         # XXX keep it there
   278         session.execute('CWUser U')
   281         session.execute('CWUser U')
   279 
   282 
   280     def assertMetadata(self, entity):
   283     def assertMetadata(self, entity):
   281         self.assertEqual(entity.creation_date, None)
   284         self.assertEqual(entity.creation_date, None)
   282         self.assertEqual(entity.modification_date, None)
   285         self.assertEqual(entity.modification_date, None)
   283 
   286 
   284     def test_synchronize(self):
   287     def test_synchronize(self):
   285         source = self.repo.sources_by_uri['ldapuser']
   288         source = self.repo.sources_by_uri['ldap']
   286         source.synchronize()
   289         source.synchronize()
   287 
   290 
   288     def test_base(self):
   291     def test_base(self):
   289         # check a known one
   292         # check a known one
   290         rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
   293         rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})