23 from os.path import abspath, join, exists |
23 from os.path import abspath, join, exists |
24 import subprocess |
24 import subprocess |
25 from socket import socket, error as socketerror |
25 from socket import socket, error as socketerror |
26 |
26 |
27 from logilab.common.testlib import TestCase, unittest_main, mock_object, Tags |
27 from logilab.common.testlib import TestCase, unittest_main, mock_object, Tags |
|
28 |
|
29 from cubicweb import AuthenticationError |
28 from cubicweb.devtools.testlib import CubicWebTC |
30 from cubicweb.devtools.testlib import CubicWebTC |
29 from cubicweb.devtools.repotest import RQLGeneratorTC |
31 from cubicweb.devtools.repotest import RQLGeneratorTC |
30 from cubicweb.devtools.httptest import get_available_port |
32 from cubicweb.devtools.httptest import get_available_port |
31 from cubicweb.devtools import get_test_db_handler |
33 from cubicweb.devtools import get_test_db_handler |
32 |
34 |
33 from cubicweb.server.sources.ldapuser import * |
35 from cubicweb.server.sources.ldapuser import * |
34 |
36 |
# Configuration text given to the CWSource entities created by the test
# classes of this module (passed as ``config=CONFIG``): one ``key=value``
# option per line.
CONFIG = u'''user-base-dn=ou=People,dc=cubicweb,dc=test
user-scope=ONELEVEL
user-classes=top,posixAccount
user-login-attr=uid
user-default-group=users
user-attrs-map=gecos:email,uid:login
'''

# URL given to the LDAP sources (``url=URL``).  It is declared ``global`` in
# create_slapd_configuration, which presumably assigns the real value once
# the test server is configured -- TODO confirm.
URL = None
44 |
45 |
def setUpModule(*args):
    """Module-level fixture run before the tests of this module.

    Hands the configuration object of ``LDAPUserSourceTC`` to
    ``create_slapd_configuration``.  Positional arguments are accepted and
    ignored.
    """
    test_config = LDAPUserSourceTC.config
    create_slapd_configuration(test_config)
47 |
48 |
def tearDownModule(*args):
    """Module-level fixture run after the tests of this module.

    Delegates to ``terminate_slapd``.  Positional arguments are accepted and
    ignored.
    """
    terminate_slapd()
50 |
51 |
51 def create_slapd_configuration(config): |
52 def create_slapd_configuration(config): |
52 global slapd_process, CONFIG |
53 global slapd_process, URL |
53 basedir = join(config.apphome, "ldapdb") |
54 basedir = join(config.apphome, "ldapdb") |
54 slapdconf = join(config.apphome, "slapd.conf") |
55 slapdconf = join(config.apphome, "slapd.conf") |
55 confin = file(join(config.apphome, "slapd.conf.in")).read() |
56 confin = file(join(config.apphome, "slapd.conf.in")).read() |
56 confstream = file(slapdconf, 'w') |
57 confstream = file(slapdconf, 'w') |
57 confstream.write(confin % {'apphome': config.apphome}) |
58 confstream.write(confin % {'apphome': config.apphome}) |
91 os.kill(slapd_process.pid, signal.SIGTERM) |
92 os.kill(slapd_process.pid, signal.SIGTERM) |
92 slapd_process.wait() |
93 slapd_process.wait() |
93 print "DONE" |
94 print "DONE" |
94 del slapd_process |
95 del slapd_process |
95 |
96 |
96 class LDAPUserSourceTC(CubicWebTC): |
97 |
97 test_db_id = 'ldap-user' |
98 |
98 tags = CubicWebTC.tags | Tags(('ldap')) |
99 |
|
class LDAPFeedSourceTC(CubicWebTC):
    """Tests against an LDAP directory declared as an ``ldapfeed`` source.

    The source is registered under the name ``ldapuser`` and its data is
    pulled once while the test database is being built.
    """

    test_db_id = 'ldap-feed'

    @classmethod
    def pre_setup_database(cls, session, config):
        """Create the ``ldapuser`` CWSource entity, commit, then pull its data.

        :param session: session used to create and commit the source entity
        :param config: test configuration (not used by this implementation)
        """
        session.create_entity('CWSource', name=u'ldapuser', type=u'ldapfeed',
                              parser=u'ldapfeed', url=URL, config=CONFIG)
        session.commit()
        # NOTE(review): this internal session is never closed explicitly --
        # confirm whether the repository expects the caller to close it.
        internal = session.repo.internal_session()
        feed = internal.repo.sources_by_uri['ldapuser']
        feed.pull_data(internal, force=True, raise_on_error=True)

    def setUp(self):
        """Run the base set-up, then point the source at the current URL."""
        super(LDAPFeedSourceTC, self).setUp()
        # the ldap source url stored in the database may use a different
        # port than the one just attributed
        feed = self.repo.sources_by_uri['ldapuser']
        feed.urls = [URL]

    def assertMetadata(self, entity):
        """Check that both date metadata attributes of `entity` are set."""
        for date in (entity.creation_date, entity.modification_date):
            self.assertTrue(date)

    def test_authenticate(self):
        # authenticating with unknown credentials must be rejected
        ldap = self.repo.sources_by_uri['ldapuser']
        self.session.set_cnxset()
        self.assertRaises(AuthenticationError,
                          ldap.authenticate, self.session, 'toto', 'toto')

    def test_base(self):
        # check a known one
        found = self.sexecute('CWUser X WHERE X login %(login)s',
                              {'login': 'syt'})
        user = found.get_entity(0, 0)
        self.assertEqual(user.login, 'syt')
        user.complete()
        self.assertMetadata(user)
        self.assertEqual(user.firstname, None)
        self.assertEqual(user.surname, None)
        self.assertEqual(user.in_group[0].name, 'users')
        self.assertEqual(user.owned_by[0].login, 'syt')
        self.assertEqual(user.created_by, ())
        self.assertEqual(user.primary_email[0].address, 'Sylvain Thenault')
        # email content should be indexed on the user
        found = self.sexecute('CWUser X WHERE X has_text "thenault"')
        self.assertEqual(found.rows, [[user.eid]])

    def test_copy_to_system_source(self):
        # move the user to the system source, then check it is still unique
        # and now described as a system-source entity
        login_rql = 'CWUser X WHERE X login %(login)s'
        login_args = {'login': 'syt'}
        ldap = self.repo.sources_by_uri['ldapuser']
        eid = self.sexecute(login_rql, login_args)[0][0]
        self.sexecute('SET X cw_source S WHERE X eid %(x)s, S name "system"',
                      {'x': eid})
        self.commit()
        ldap.reset_caches()
        found = self.sexecute(login_rql, login_args)
        self.assertEqual(len(found), 1)
        user = found.get_entity(0, 0)
        self.assertEqual(user.eid, eid)
        expected_meta = {
            'source': {'type': u'native',
                       'uri': u'system',
                       'use-cwuri-as-url': False},
            'type': 'CWUser',
            'extid': None,
        }
        self.assertEqual(user.cw_metainformation(), expected_meta)
        self.assertEqual(user.cw_source[0].name, 'system')
        self.assertTrue(user.creation_date)
        self.assertTrue(user.modification_date)
        # XXX test some password has been set
        # pulling again must not create a second user with the same login
        ldap.pull_data(self.session)
        found = self.sexecute(login_rql, login_args)
        self.assertEqual(len(found), 1)
|
166 |
|
167 |
|
168 class LDAPUserSourceTC(LDAPFeedSourceTC): |
|
169 test_db_id = 'ldap-user' |
|
170 tags = CubicWebTC.tags | Tags(('ldap')) |
|
171 |
|
172 @classmethod |
|
173 def pre_setup_database(cls, session, config): |
|
174 session.create_entity('CWSource', name=u'ldapuser', type=u'ldapuser', |
|
175 url=URL, config=CONFIG) |
|
176 session.commit() |
|
177 # XXX keep it there |
|
178 session.execute('CWUser U') |
|
179 |
|
180 def assertMetadata(self, entity): |
|
181 self.assertEqual(entity.creation_date, None) |
|
182 self.assertEqual(entity.modification_date, None) |
|
183 |
|
184 def test_synchronize(self): |
|
185 source = self.repo.sources_by_uri['ldapuser'] |
|
186 source.synchronize() |
|
187 |
|
188 def test_base(self): |
|
189 # check a known one |
|
190 rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'}) |
|
191 e = rset.get_entity(0, 0) |
|
192 self.assertEqual(e.login, 'syt') |
|
193 e.complete() |
|
194 self.assertMetadata(e) |
126 self.assertEqual(e.firstname, None) |
195 self.assertEqual(e.firstname, None) |
127 self.assertEqual(e.surname, None) |
196 self.assertEqual(e.surname, None) |
128 self.assertEqual(e.in_group[0].name, 'users') |
197 self.assertEqual(e.in_group[0].name, 'users') |
129 self.assertEqual(e.owned_by[0].login, 'syt') |
198 self.assertEqual(e.owned_by[0].login, 'syt') |
130 self.assertEqual(e.created_by, ()) |
199 self.assertEqual(e.created_by, ()) |
345 rset = cu.execute('Any F WHERE X has_text %(syt)s, X firstname F', {'syt': 'syt'}) |
414 rset = cu.execute('Any F WHERE X has_text %(syt)s, X firstname F', {'syt': 'syt'}) |
346 self.assertEqual(rset.rows, []) |
415 self.assertEqual(rset.rows, []) |
347 rset = cu.execute('Any F WHERE X has_text "iaminguestsgrouponly", X firstname F') |
416 rset = cu.execute('Any F WHERE X has_text "iaminguestsgrouponly", X firstname F') |
348 self.assertEqual(rset.rows, [[None]]) |
417 self.assertEqual(rset.rows, [[None]]) |
349 |
418 |
350 def test_copy_to_system_source(self): |
|
351 source = self.repo.sources_by_uri['ldapuser'] |
|
352 eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0] |
|
353 self.sexecute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': eid}) |
|
354 self.commit() |
|
355 source.reset_caches() |
|
356 rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'}) |
|
357 self.assertEqual(len(rset), 1) |
|
358 e = rset.get_entity(0, 0) |
|
359 self.assertEqual(e.eid, eid) |
|
360 self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native', 'uri': u'system', 'use-cwuri-as-url': False}, |
|
361 'type': 'CWUser', |
|
362 'extid': None}) |
|
363 self.assertEqual(e.cw_source[0].name, 'system') |
|
364 self.assertTrue(e.creation_date) |
|
365 self.assertTrue(e.modification_date) |
|
366 # XXX test some password has been set |
|
367 source.synchronize() |
|
368 rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'}) |
|
369 self.assertEqual(len(rset), 1) |
|
370 |
|
371 def test_nonregr1(self): |
419 def test_nonregr1(self): |
372 self.sexecute('Any X,AA ORDERBY AA DESC WHERE E eid %(x)s, E owned_by X, ' |
420 self.sexecute('Any X,AA ORDERBY AA DESC WHERE E eid %(x)s, E owned_by X, ' |
373 'X modification_date AA', |
421 'X modification_date AA', |
374 {'x': self.session.user.eid}) |
422 {'x': self.session.user.eid}) |
375 |
423 |
401 'WITH U,UL BEING (Any U,UL WHERE ME eid %(x)s, (EXISTS(U identity ME) ' |
449 'WITH U,UL BEING (Any U,UL WHERE ME eid %(x)s, (EXISTS(U identity ME) ' |
402 'OR (EXISTS(U in_group G, G name IN("managers", "staff")))) ' |
450 'OR (EXISTS(U in_group G, G name IN("managers", "staff")))) ' |
403 'OR (EXISTS(U in_group H, ME in_group H, NOT H name "users")), U login UL, U is CWUser)', |
451 'OR (EXISTS(U in_group H, ME in_group H, NOT H name "users")), U login UL, U is CWUser)', |
404 {'x': self.session.user.eid}) |
452 {'x': self.session.user.eid}) |
405 |
453 |
406 |
|
407 class GlobTrFuncTC(TestCase): |
454 class GlobTrFuncTC(TestCase): |
408 |
455 |
409 def test_count(self): |
456 def test_count(self): |
410 trfunc = GlobTrFunc('count', 0) |
457 trfunc = GlobTrFunc('count', 0) |
411 res = trfunc.apply([[1], [2], [3], [4]]) |
458 res = trfunc.apply([[1], [2], [3], [4]]) |
435 res = trfunc.apply([[1], [2], [3], [4]]) |
482 res = trfunc.apply([[1], [2], [3], [4]]) |
436 self.assertEqual(res, [[4]]) |
483 self.assertEqual(res, [[4]]) |
437 trfunc = GlobTrFunc('max', 1) |
484 trfunc = GlobTrFunc('max', 1) |
438 res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]]) |
485 res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]]) |
439 self.assertEqual(res, [[1, 5], [2, 4], [3, 6]]) |
486 self.assertEqual(res, [[1, 5], [2, 4], [3, 6]]) |
|
487 |
440 |
488 |
441 class RQL2LDAPFilterTC(RQLGeneratorTC): |
489 class RQL2LDAPFilterTC(RQLGeneratorTC): |
442 |
490 |
443 tags = RQLGeneratorTC.tags | Tags(('ldap')) |
491 tags = RQLGeneratorTC.tags | Tags(('ldap')) |
444 |
492 |