32 from cubicweb.devtools.httptest import get_available_port |
32 from cubicweb.devtools.httptest import get_available_port |
33 from cubicweb.devtools import get_test_db_handler |
33 from cubicweb.devtools import get_test_db_handler |
34 |
34 |
35 from cubicweb.server.sources.ldapuser import GlobTrFunc, UnknownEid, RQL2LDAPFilter |
35 from cubicweb.server.sources.ldapuser import GlobTrFunc, UnknownEid, RQL2LDAPFilter |
36 |
36 |
37 CONFIG = u'user-base-dn=ou=People,dc=cubicweb,dc=test' |
37 CONFIG_LDAPFEED = CONFIG_LDAPUSER = u'''user-base-dn=ou=People,dc=cubicweb,dc=test''' |
|
38 |
38 URL = None |
39 URL = None |
39 |
40 |
40 def create_slapd_configuration(cls): |
41 def create_slapd_configuration(cls): |
41 global URL |
42 global URL |
42 slapddir = tempfile.mkdtemp('cw-unittest-ldap') |
43 slapddir = tempfile.mkdtemp('cw-unittest-ldap') |
59 cmdline = ["/usr/sbin/slapd", "-f", slapdconf, "-h", ldapuri, "-d", "0"] |
60 cmdline = ["/usr/sbin/slapd", "-f", slapdconf, "-h", ldapuri, "-d", "0"] |
60 config.info('Starting slapd:', ' '.join(cmdline)) |
61 config.info('Starting slapd:', ' '.join(cmdline)) |
61 cls.slapd_process = subprocess.Popen(cmdline) |
62 cls.slapd_process = subprocess.Popen(cmdline) |
62 time.sleep(0.2) |
63 time.sleep(0.2) |
63 if cls.slapd_process.poll() is None: |
64 if cls.slapd_process.poll() is None: |
64 config.info('slapd started with pid %s' % cls.slapd_process.pid) |
65 config.info('slapd started with pid %s', cls.slapd_process.pid) |
65 else: |
66 else: |
66 raise EnvironmentError('Cannot start slapd with cmdline="%s" (from directory "%s")' % |
67 raise EnvironmentError('Cannot start slapd with cmdline="%s" (from directory "%s")' % |
67 (" ".join(cmdline), os.getcwd())) |
68 (" ".join(cmdline), os.getcwd())) |
68 URL = u'ldap://%s' % host |
69 URL = u'ldap://%s' % host |
69 return slapddir |
70 return slapddir |
98 pass |
99 pass |
99 |
100 |
100 class CheckWrongGroup(LDAPTestBase): |
101 class CheckWrongGroup(LDAPTestBase): |
101 |
102 |
102 def test_wrong_group(self): |
103 def test_wrong_group(self): |
103 self.session.create_entity('CWSource', name=u'ldapuser', type=u'ldapfeed', parser=u'ldapfeed', |
104 self.session.create_entity('CWSource', name=u'ldapfeed', type=u'ldapfeed', parser=u'ldapfeed', |
104 url=URL, config=CONFIG) |
105 url=URL, config=CONFIG_LDAPFEED) |
105 self.commit() |
106 self.commit() |
106 with self.session.repo.internal_session(safe=True) as session: |
107 with self.session.repo.internal_session(safe=True) as session: |
107 source = self.session.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0) |
108 source = self.session.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0) |
108 config = source.repo_source.check_config(source) |
109 config = source.repo_source.check_config(source) |
109 # inject a bogus group here, along with at least a valid one |
110 # inject a bogus group here, along with at least a valid one |
112 session.commit(free_cnxset=False) |
113 session.commit(free_cnxset=False) |
113 # here we emitted an error log entry |
114 # here we emitted an error log entry |
114 stats = source.repo_source.pull_data(session, force=True, raise_on_error=True) |
115 stats = source.repo_source.pull_data(session, force=True, raise_on_error=True) |
115 session.commit() |
116 session.commit() |
116 |
117 |
|
118 |
117 class DeleteStuffFromLDAPFeedSourceTC(LDAPTestBase): |
119 class DeleteStuffFromLDAPFeedSourceTC(LDAPTestBase): |
118 test_db_id = 'ldap-feed' |
120 test_db_id = 'ldap-feed' |
119 |
121 |
120 @classmethod |
122 @classmethod |
121 def pre_setup_database(cls, session, config): |
123 def pre_setup_database(cls, session, config): |
122 session.create_entity('CWSource', name=u'ldapuser', type=u'ldapfeed', parser=u'ldapfeed', |
124 session.create_entity('CWSource', name=u'ldap', type=u'ldapfeed', parser=u'ldapfeed', |
123 url=URL, config=CONFIG) |
125 url=URL, config=CONFIG_LDAPFEED) |
124 session.commit() |
126 session.commit() |
125 with session.repo.internal_session(safe=True) as isession: |
127 with session.repo.internal_session(safe=True) as isession: |
126 lfsource = isession.repo.sources_by_uri['ldapuser'] |
128 lfsource = isession.repo.sources_by_uri['ldap'] |
127 stats = lfsource.pull_data(isession, force=True, raise_on_error=True) |
129 stats = lfsource.pull_data(isession, force=True, raise_on_error=True) |
128 |
130 |
129 def _pull(self): |
131 def _pull(self): |
130 with self.session.repo.internal_session() as isession: |
132 with self.session.repo.internal_session() as isession: |
131 lfsource = isession.repo.sources_by_uri['ldapuser'] |
133 lfsource = isession.repo.sources_by_uri['ldap'] |
132 stats = lfsource.pull_data(isession, force=True, raise_on_error=True) |
134 stats = lfsource.pull_data(isession, force=True, raise_on_error=True) |
133 isession.commit() |
135 isession.commit() |
134 |
136 |
135 def test_a_filter_inactivate(self): |
137 def test_a_filter_inactivate(self): |
136 """ filtered out people should be deactivated, unable to authenticate """ |
138 """ filtered out people should be deactivated, unable to authenticate """ |
162 |
164 |
163 def test_delete(self): |
165 def test_delete(self): |
164 """ delete syt, pull, check deactivation, repull, |
166 """ delete syt, pull, check deactivation, repull, |
165 readd syt, pull, check activation |
167 readd syt, pull, check activation |
166 """ |
168 """ |
167 uri = self.repo.sources_by_uri['ldapuser'].urls[0] |
169 uri = self.repo.sources_by_uri['ldap'].urls[0] |
168 deletecmd = ("ldapdelete -H %s 'uid=syt,ou=People,dc=cubicweb,dc=test' " |
170 deletecmd = ("ldapdelete -H %s 'uid=syt,ou=People,dc=cubicweb,dc=test' " |
169 "-v -x -D cn=admin,dc=cubicweb,dc=test -w'cw'" % uri) |
171 "-v -x -D cn=admin,dc=cubicweb,dc=test -w'cw'" % uri) |
170 os.system(deletecmd) |
172 os.system(deletecmd) |
171 self._pull() |
173 self._pull() |
172 self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt') |
174 self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt') |
189 user = self.execute('CWUser U WHERE U login "syt"').get_entity(0, 0) |
191 user = self.execute('CWUser U WHERE U login "syt"').get_entity(0, 0) |
190 user.cw_adapt_to('IWorkflowable').fire_transition('activate') |
192 user.cw_adapt_to('IWorkflowable').fire_transition('activate') |
191 self.commit() |
193 self.commit() |
192 self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt') |
194 self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt') |
193 |
195 |
|
196 |
194 class LDAPFeedSourceTC(LDAPTestBase): |
197 class LDAPFeedSourceTC(LDAPTestBase): |
195 test_db_id = 'ldap-feed' |
198 test_db_id = 'ldap-feed' |
196 |
199 |
197 @classmethod |
200 @classmethod |
198 def pre_setup_database(cls, session, config): |
201 def pre_setup_database(cls, session, config): |
199 session.create_entity('CWSource', name=u'ldapuser', type=u'ldapfeed', parser=u'ldapfeed', |
202 session.create_entity('CWSource', name=u'ldap', type=u'ldapfeed', parser=u'ldapfeed', |
200 url=URL, config=CONFIG) |
203 url=URL, config=CONFIG_LDAPFEED) |
201 session.commit() |
204 session.commit() |
202 isession = session.repo.internal_session(safe=True) |
205 isession = session.repo.internal_session(safe=True) |
203 lfsource = isession.repo.sources_by_uri['ldapuser'] |
206 lfsource = isession.repo.sources_by_uri['ldap'] |
204 stats = lfsource.pull_data(isession, force=True, raise_on_error=True) |
207 stats = lfsource.pull_data(isession, force=True, raise_on_error=True) |
205 |
208 |
206 def setUp(self): |
209 def setUp(self): |
207 super(LDAPFeedSourceTC, self).setUp() |
210 super(LDAPFeedSourceTC, self).setUp() |
208 # ldap source url in the database may use a different port as the one |
211 # ldap source url in the database may use a different port as the one |
209 # just attributed |
212 # just attributed |
210 lfsource = self.repo.sources_by_uri['ldapuser'] |
213 lfsource = self.repo.sources_by_uri['ldap'] |
211 lfsource.urls = [URL] |
214 lfsource.urls = [URL] |
212 |
215 |
213 def assertMetadata(self, entity): |
216 def assertMetadata(self, entity): |
214 self.assertTrue(entity.creation_date) |
217 self.assertTrue(entity.creation_date) |
215 self.assertTrue(entity.modification_date) |
218 self.assertTrue(entity.modification_date) |
216 |
219 |
217 def test_authenticate(self): |
220 def test_authenticate(self): |
218 source = self.repo.sources_by_uri['ldapuser'] |
221 source = self.repo.sources_by_uri['ldap'] |
219 self.session.set_cnxset() |
222 self.session.set_cnxset() |
220 # ensure we won't be logged against |
223 # ensure we won't be logged against |
221 self.assertRaises(AuthenticationError, |
224 self.assertRaises(AuthenticationError, |
222 source.authenticate, self.session, 'toto', 'toto') |
225 source.authenticate, self.session, 'toto', 'toto') |
223 self.assertTrue(source.authenticate(self.session, 'syt', 'syt')) |
226 self.assertTrue(source.authenticate(self.session, 'syt', 'syt')) |
239 # email content should be indexed on the user |
242 # email content should be indexed on the user |
240 rset = self.sexecute('CWUser X WHERE X has_text "thenault"') |
243 rset = self.sexecute('CWUser X WHERE X has_text "thenault"') |
241 self.assertEqual(rset.rows, [[e.eid]]) |
244 self.assertEqual(rset.rows, [[e.eid]]) |
242 |
245 |
243 def test_copy_to_system_source(self): |
246 def test_copy_to_system_source(self): |
244 source = self.repo.sources_by_uri['ldapuser'] |
247 source = self.repo.sources_by_uri['ldap'] |
245 eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0] |
248 eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0] |
246 self.sexecute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': eid}) |
249 self.sexecute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': eid}) |
247 self.commit() |
250 self.commit() |
248 source.reset_caches() |
251 source.reset_caches() |
249 rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'}) |
252 rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'}) |
269 test_db_id = 'ldap-user' |
272 test_db_id = 'ldap-user' |
270 tags = CubicWebTC.tags | Tags(('ldap')) |
273 tags = CubicWebTC.tags | Tags(('ldap')) |
271 |
274 |
272 @classmethod |
275 @classmethod |
273 def pre_setup_database(cls, session, config): |
276 def pre_setup_database(cls, session, config): |
274 session.create_entity('CWSource', name=u'ldapuser', type=u'ldapuser', |
277 session.create_entity('CWSource', name=u'ldap', type=u'ldapuser', |
275 url=URL, config=CONFIG) |
278 url=URL, config=CONFIG_LDAPUSER) |
276 session.commit() |
279 session.commit() |
277 # XXX keep it there |
280 # XXX keep it there |
278 session.execute('CWUser U') |
281 session.execute('CWUser U') |
279 |
282 |
280 def assertMetadata(self, entity): |
283 def assertMetadata(self, entity): |
281 self.assertEqual(entity.creation_date, None) |
284 self.assertEqual(entity.creation_date, None) |
282 self.assertEqual(entity.modification_date, None) |
285 self.assertEqual(entity.modification_date, None) |
283 |
286 |
284 def test_synchronize(self): |
287 def test_synchronize(self): |
285 source = self.repo.sources_by_uri['ldapuser'] |
288 source = self.repo.sources_by_uri['ldap'] |
286 source.synchronize() |
289 source.synchronize() |
287 |
290 |
288 def test_base(self): |
291 def test_base(self): |
289 # check a known one |
292 # check a known one |
290 rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'}) |
293 rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'}) |