|
1 """cubicweb.server.sources.ldapusers unit and functional tests""" |
|
2 |
|
3 from logilab.common.testlib import TestCase, unittest_main, mock_object |
|
4 from cubicweb.devtools import init_test_database, TestServerConfiguration |
|
5 from cubicweb.devtools.apptest import RepositoryBasedTC |
|
6 from cubicweb.devtools.repotest import RQLGeneratorTC |
|
7 |
|
8 from cubicweb.server.sources.ldapuser import * |
|
9 |
|
10 def nopwd_authenticate(self, session, login, upassword): |
|
11 """used to monkey patch the source to get successful authentication without |
|
12 upassword checking |
|
13 """ |
|
14 assert login, 'no login!' |
|
15 searchfilter = [filter_format('(%s=%s)', (self.user_login_attr, login))] |
|
16 searchfilter.extend([filter_format('(%s=%s)', ('objectClass', o)) |
|
17 for o in self.user_classes]) |
|
18 searchstr = '(&%s)' % ''.join(searchfilter) |
|
19 # first search the user |
|
20 try: |
|
21 user = self._search(session, self.user_base_dn, self.user_base_scope, |
|
22 searchstr)[0] |
|
23 except IndexError: |
|
24 # no such user |
|
25 raise AuthenticationError() |
|
26 # don't check upassword ! |
|
27 return self.extid2eid(user['dn'], 'EUser', session) |
|
28 |
|
29 |
|
30 |
|
31 config = TestServerConfiguration('data') |
|
32 config.sources_file = lambda : 'data/sourcesldap' |
|
33 repo, cnx = init_test_database('sqlite', config=config) |
|
34 |
|
35 class LDAPUserSourceTC(RepositoryBasedTC): |
|
36 repo = repo |
|
37 |
|
38 def patch_authenticate(self): |
|
39 self._orig_authenticate = LDAPUserSource.authenticate |
|
40 LDAPUserSource.authenticate = nopwd_authenticate |
|
41 |
|
42 def setUp(self): |
|
43 self._prepare() |
|
44 # XXX: need this first query else we get 'database is locked' from |
|
45 # sqlite since it doesn't support multiple connections on the same |
|
46 # database |
|
47 # so doing, ldap inserted users don't get removed between each test |
|
48 rset = self.execute('EUser X') |
|
49 self.commit() |
|
50 # check we get some users from ldap |
|
51 self.assert_(len(rset) > 1) |
|
52 self.maxeid = self.execute('Any MAX(X)')[0][0] |
|
53 |
|
54 def tearDown(self): |
|
55 if hasattr(self, '_orig_authenticate'): |
|
56 LDAPUserSource.authenticate = self._orig_authenticate |
|
57 RepositoryBasedTC.tearDown(self) |
|
58 |
|
59 def test_authenticate(self): |
|
60 source = self.repo.sources_by_uri['ldapuser'] |
|
61 self.assertRaises(AuthenticationError, |
|
62 source.authenticate, self.session, 'toto', 'toto') |
|
63 |
|
64 def test_synchronize(self): |
|
65 source = self.repo.sources_by_uri['ldapuser'] |
|
66 source.synchronize() |
|
67 |
|
68 def test_base(self): |
|
69 # check a known one |
|
70 e = self.execute('EUser X WHERE X login "syt"').get_entity(0, 0) |
|
71 self.assertEquals(e.login, 'syt') |
|
72 e.complete() |
|
73 self.assertEquals(e.creation_date, None) |
|
74 self.assertEquals(e.modification_date, None) |
|
75 self.assertEquals(e.firstname, None) |
|
76 self.assertEquals(e.surname, None) |
|
77 self.assertEquals(e.in_group[0].name, 'users') |
|
78 self.assertEquals(e.owned_by[0].login, 'syt') |
|
79 self.assertEquals(e.created_by, []) |
|
80 self.assertEquals(e.primary_email[0].address, 'Sylvain Thenault') |
|
81 # email content should be indexed on the user |
|
82 rset = self.execute('EUser X WHERE X has_text "thenault"') |
|
83 self.assertEquals(rset.rows, [[e.eid]]) |
|
84 |
|
85 def test_not(self): |
|
86 eid = self.execute('EUser X WHERE X login "syt"')[0][0] |
|
87 rset = self.execute('EUser X WHERE NOT X eid %s' % eid) |
|
88 self.assert_(rset) |
|
89 self.assert_(not eid in (r[0] for r in rset)) |
|
90 |
|
91 def test_multiple(self): |
|
92 seid = self.execute('EUser X WHERE X login "syt"')[0][0] |
|
93 aeid = self.execute('EUser X WHERE X login "adim"')[0][0] |
|
94 rset = self.execute('EUser X, Y WHERE X login "syt", Y login "adim"') |
|
95 self.assertEquals(rset.rows, [[seid, aeid]]) |
|
96 rset = self.execute('Any X,Y,L WHERE X login L, X login "syt", Y login "adim"') |
|
97 self.assertEquals(rset.rows, [[seid, aeid, 'syt']]) |
|
98 |
|
99 def test_in(self): |
|
100 seid = self.execute('EUser X WHERE X login "syt"')[0][0] |
|
101 aeid = self.execute('EUser X WHERE X login "adim"')[0][0] |
|
102 rset = self.execute('Any X,L ORDERBY L WHERE X login IN("syt", "adim"), X login L') |
|
103 self.assertEquals(rset.rows, [[aeid, 'adim'], [seid, 'syt']]) |
|
104 |
|
105 def test_relations(self): |
|
106 eid = self.execute('EUser X WHERE X login "syt"')[0][0] |
|
107 rset = self.execute('Any X,E WHERE X is EUser, X login L, X primary_email E') |
|
108 self.assert_(eid in (r[0] for r in rset)) |
|
109 rset = self.execute('Any X,L,E WHERE X is EUser, X login L, X primary_email E') |
|
110 self.assert_('syt' in (r[1] for r in rset)) |
|
111 |
|
112 def test_count(self): |
|
113 nbusers = self.execute('Any COUNT(X) WHERE X is EUser')[0][0] |
|
114 # just check this is a possible number |
|
115 self.assert_(nbusers > 1, nbusers) |
|
116 self.assert_(nbusers < 30, nbusers) |
|
117 |
|
118 def test_upper(self): |
|
119 eid = self.execute('EUser X WHERE X login "syt"')[0][0] |
|
120 rset = self.execute('Any UPPER(L) WHERE X eid %s, X login L' % eid) |
|
121 self.assertEquals(rset[0][0], 'SYT') |
|
122 |
|
123 def test_unknown_attr(self): |
|
124 eid = self.execute('EUser X WHERE X login "syt"')[0][0] |
|
125 rset = self.execute('Any L,C,M WHERE X eid %s, X login L, ' |
|
126 'X creation_date C, X modification_date M' % eid) |
|
127 self.assertEquals(rset[0][0], 'syt') |
|
128 self.assertEquals(rset[0][1], None) |
|
129 self.assertEquals(rset[0][2], None) |
|
130 |
|
131 def test_sort(self): |
|
132 logins = [l for l, in self.execute('Any L ORDERBY L WHERE X login L')] |
|
133 self.assertEquals(logins, sorted(logins)) |
|
134 |
|
135 def test_lower_sort(self): |
|
136 logins = [l for l, in self.execute('Any L ORDERBY lower(L) WHERE X login L')] |
|
137 self.assertEquals(logins, sorted(logins)) |
|
138 |
|
139 def test_or(self): |
|
140 rset = self.execute('DISTINCT Any X WHERE X login "syt" OR (X in_group G, G name "managers")') |
|
141 self.assertEquals(len(rset), 2, rset.rows) # syt + admin |
|
142 |
|
143 def test_nonregr_set_owned_by(self): |
|
144 # test that when a user coming from ldap is triggering a transition |
|
145 # the related TrInfo has correct owner information |
|
146 self.execute('SET X in_group G WHERE X login "syt", G name "managers"') |
|
147 self.commit() |
|
148 syt = self.execute('EUser X WHERE X login "syt"').get_entity(0, 0) |
|
149 self.assertEquals([g.name for g in syt.in_group], ['managers', 'users']) |
|
150 self.patch_authenticate() |
|
151 cnx = self.login('syt', 'dummypassword') |
|
152 cu = cnx.cursor() |
|
153 cu.execute('SET X in_state S WHERE X login "alf", S name "deactivated"') |
|
154 try: |
|
155 cnx.commit() |
|
156 alf = self.execute('EUser X WHERE X login "alf"').get_entity(0, 0) |
|
157 self.assertEquals(alf.in_state[0].name, 'deactivated') |
|
158 trinfo = alf.latest_trinfo() |
|
159 self.assertEquals(trinfo.owned_by[0].login, 'syt') |
|
160 # select from_state to skip the user's creation TrInfo |
|
161 rset = self.execute('Any U ORDERBY D DESC WHERE WF wf_info_for X,' |
|
162 'WF creation_date D, WF from_state FS,' |
|
163 'WF owned_by U?, X eid %(x)s', |
|
164 {'x': alf.eid}, 'x') |
|
165 self.assertEquals(rset.rows, [[syt.eid]]) |
|
166 finally: |
|
167 # restore db state |
|
168 self.restore_connection() |
|
169 self.execute('SET X in_state S WHERE X login "alf", S name "activated"') |
|
170 self.execute('DELETE X in_group G WHERE X login "syt", G name "managers"') |
|
171 |
|
172 def test_same_column_names(self): |
|
173 self.execute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"') |
|
174 |
|
175 def test_multiple_entities_from_different_sources(self): |
|
176 self.create_user('cochon') |
|
177 self.failUnless(self.execute('Any X,Y WHERE X login "syt", Y login "cochon"')) |
|
178 |
|
179 def test_exists1(self): |
|
180 self.add_entity('EGroup', name=u'bougloup1') |
|
181 self.add_entity('EGroup', name=u'bougloup2') |
|
182 self.execute('SET U in_group G WHERE G name ~= "bougloup%", U login "admin"') |
|
183 self.execute('SET U in_group G WHERE G name = "bougloup1", U login "syt"') |
|
184 rset = self.execute('Any L,SN ORDERBY L WHERE X in_state S, S name SN, X login L, EXISTS(X in_group G, G name ~= "bougloup%")') |
|
185 self.assertEquals(rset.rows, [['admin', 'activated'], ['syt', 'activated']]) |
|
186 |
|
187 def test_exists2(self): |
|
188 self.create_user('comme') |
|
189 self.create_user('cochon') |
|
190 self.execute('SET X copain Y WHERE X login "comme", Y login "cochon"') |
|
191 rset = self.execute('Any GN ORDERBY GN WHERE X in_group G, G name GN, (G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon")))') |
|
192 self.assertEquals(rset.rows, [['managers'], ['users']]) |
|
193 |
|
194 def test_exists3(self): |
|
195 self.create_user('comme') |
|
196 self.create_user('cochon') |
|
197 self.execute('SET X copain Y WHERE X login "comme", Y login "cochon"') |
|
198 self.failUnless(self.execute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"')) |
|
199 self.execute('SET X copain Y WHERE X login "syt", Y login "cochon"') |
|
200 self.failUnless(self.execute('Any X, Y WHERE X copain Y, X login "syt", Y login "cochon"')) |
|
201 rset = self.execute('Any GN,L WHERE X in_group G, X login L, G name GN, G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon"))') |
|
202 self.assertEquals(sorted(rset.rows), [['managers', 'admin'], ['users', 'comme'], ['users', 'syt']]) |
|
203 |
|
204 def test_exists4(self): |
|
205 self.create_user('comme') |
|
206 self.create_user('cochon', groups=('users', 'guests')) |
|
207 self.create_user('billy') |
|
208 self.execute('SET X copain Y WHERE X login "comme", Y login "cochon"') |
|
209 self.execute('SET X copain Y WHERE X login "cochon", Y login "cochon"') |
|
210 self.execute('SET X copain Y WHERE X login "comme", Y login "billy"') |
|
211 self.execute('SET X copain Y WHERE X login "syt", Y login "billy"') |
|
212 # search for group name, login where |
|
213 # EUser copain with "comme" or "cochon" AND same login as the copain |
|
214 # OR |
|
215 # EUser in_state activated AND not copain with billy |
|
216 # |
|
217 # SO we expect everybody but "comme" and "syt" |
|
218 rset= self.execute('Any GN,L WHERE X in_group G, X login L, G name GN, ' |
|
219 'EXISTS(X copain T, T login L, T login in ("comme", "cochon")) OR ' |
|
220 'EXISTS(X in_state S, S name "activated", NOT X copain T2, T2 login "billy")') |
|
221 all = self.execute('Any GN, L WHERE X in_group G, X login L, G name GN') |
|
222 all.rows.remove(['users', 'comme']) |
|
223 all.rows.remove(['users', 'syt']) |
|
224 self.assertEquals(sorted(rset.rows), sorted(all.rows)) |
|
225 |
|
226 def test_exists5(self): |
|
227 self.create_user('comme') |
|
228 self.create_user('cochon', groups=('users', 'guests')) |
|
229 self.create_user('billy') |
|
230 self.execute('SET X copain Y WHERE X login "comme", Y login "cochon"') |
|
231 self.execute('SET X copain Y WHERE X login "cochon", Y login "cochon"') |
|
232 self.execute('SET X copain Y WHERE X login "comme", Y login "billy"') |
|
233 self.execute('SET X copain Y WHERE X login "syt", Y login "cochon"') |
|
234 rset= self.execute('Any L WHERE X login L, ' |
|
235 'EXISTS(X copain T, T login in ("comme", "cochon")) AND ' |
|
236 'NOT EXISTS(X copain T2, T2 login "billy")') |
|
237 self.assertEquals(sorted(rset.rows), [['cochon'], ['syt']]) |
|
238 rset= self.execute('Any GN,L WHERE X in_group G, X login L, G name GN, ' |
|
239 'EXISTS(X copain T, T login in ("comme", "cochon")) AND ' |
|
240 'NOT EXISTS(X copain T2, T2 login "billy")') |
|
241 self.assertEquals(sorted(rset.rows), [['guests', 'cochon'], |
|
242 ['users', 'cochon'], |
|
243 ['users', 'syt']]) |
|
244 |
|
245 |
|
246 def test_union(self): |
|
247 afeids = self.execute('State X') |
|
248 ueids = self.execute('EUser X') |
|
249 rset = self.execute('(Any X WHERE X is State) UNION (Any X WHERE X is EUser)') |
|
250 self.assertEquals(sorted(r[0] for r in rset.rows), |
|
251 sorted(r[0] for r in afeids + ueids)) |
|
252 |
|
253 def _init_security_test(self): |
|
254 self.create_user('iaminguestsgrouponly', groups=('guests',)) |
|
255 cnx = self.login('iaminguestsgrouponly') |
|
256 return cnx.cursor() |
|
257 |
|
258 def test_security1(self): |
|
259 cu = self._init_security_test() |
|
260 rset = cu.execute('Any X WHERE X login "syt"') |
|
261 self.assertEquals(rset.rows, []) |
|
262 rset = cu.execute('Any X WHERE X login "iaminguestsgrouponly"') |
|
263 self.assertEquals(len(rset.rows), 1) |
|
264 |
|
265 def test_security2(self): |
|
266 cu = self._init_security_test() |
|
267 rset = cu.execute('Any X WHERE X has_text "syt"') |
|
268 self.assertEquals(rset.rows, []) |
|
269 rset = cu.execute('Any X WHERE X has_text "iaminguestsgrouponly"') |
|
270 self.assertEquals(len(rset.rows), 1) |
|
271 |
|
272 def test_security3(self): |
|
273 cu = self._init_security_test() |
|
274 rset = cu.execute('Any F WHERE X has_text "syt", X firstname F') |
|
275 self.assertEquals(rset.rows, []) |
|
276 rset = cu.execute('Any F WHERE X has_text "iaminguestsgrouponly", X firstname F') |
|
277 self.assertEquals(rset.rows, [[None]]) |
|
278 |
|
279 def test_nonregr_1(self): |
|
280 self.execute('Any X,AA ORDERBY AA DESC WHERE E eid %(x)s, E owned_by X, ' |
|
281 'X modification_date AA', |
|
282 {'x': cnx.user(self.session).eid}) |
|
283 |
|
284 def test_nonregr_2(self): |
|
285 self.execute('Any X,L,AA WHERE E eid %(x)s, E owned_by X, ' |
|
286 'X login L, X modification_date AA', |
|
287 {'x': cnx.user(self.session).eid}) |
|
288 |
|
289 def test_nonregr_3(self): |
|
290 self.execute('Any X,AA ORDERBY AA DESC WHERE E eid %(x)s, ' |
|
291 'X modification_date AA', |
|
292 {'x': cnx.user(self.session).eid}) |
|
293 |
|
294 def test_nonregr_4(self): |
|
295 emaileid = self.execute('INSERT EmailAddress X: X address "toto@logilab.org"')[0][0] |
|
296 self.execute('Any X,AA WHERE X use_email Y, Y eid %(x)s, X modification_date AA', |
|
297 {'x': emaileid}) |
|
298 |
|
299 def test_nonregr_5(self): |
|
300 # original jpl query: |
|
301 # Any X, NOW - CD, P WHERE P is Project, U interested_in P, U is EUser, U login "sthenault", X concerns P, X creation_date CD ORDERBY CD DESC LIMIT 5 |
|
302 rql = 'Any X, NOW - CD, P ORDERBY CD DESC LIMIT 5 WHERE P bookmarked_by U, U login "%s", P is X, X creation_date CD' % self.session.user.login |
|
303 self.execute(rql, )#{'x': }) |
|
304 |
|
305 |
|
306 class GlobTrFuncTC(TestCase): |
|
307 |
|
308 def test_count(self): |
|
309 trfunc = GlobTrFunc('count', 0) |
|
310 res = trfunc.apply([[1], [2], [3], [4]]) |
|
311 self.assertEquals(res, [[4]]) |
|
312 trfunc = GlobTrFunc('count', 1) |
|
313 res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]]) |
|
314 self.assertEquals(res, [[1, 2], [2, 1], [3, 1]]) |
|
315 |
|
316 def test_sum(self): |
|
317 trfunc = GlobTrFunc('sum', 0) |
|
318 res = trfunc.apply([[1], [2], [3], [4]]) |
|
319 self.assertEquals(res, [[10]]) |
|
320 trfunc = GlobTrFunc('sum', 1) |
|
321 res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]]) |
|
322 self.assertEquals(res, [[1, 7], [2, 4], [3, 6]]) |
|
323 |
|
324 def test_min(self): |
|
325 trfunc = GlobTrFunc('min', 0) |
|
326 res = trfunc.apply([[1], [2], [3], [4]]) |
|
327 self.assertEquals(res, [[1]]) |
|
328 trfunc = GlobTrFunc('min', 1) |
|
329 res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]]) |
|
330 self.assertEquals(res, [[1, 2], [2, 4], [3, 6]]) |
|
331 |
|
332 def test_max(self): |
|
333 trfunc = GlobTrFunc('max', 0) |
|
334 res = trfunc.apply([[1], [2], [3], [4]]) |
|
335 self.assertEquals(res, [[4]]) |
|
336 trfunc = GlobTrFunc('max', 1) |
|
337 res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]]) |
|
338 self.assertEquals(res, [[1, 5], [2, 4], [3, 6]]) |
|
339 |
|
340 |
|
341 class RQL2LDAPFilterTC(RQLGeneratorTC): |
|
342 schema = repo.schema |
|
343 |
|
344 def setUp(self): |
|
345 RQLGeneratorTC.setUp(self) |
|
346 ldapsource = repo.sources[-1] |
|
347 self.pool = repo._get_pool() |
|
348 session = mock_object(pool=self.pool) |
|
349 self.o = RQL2LDAPFilter(ldapsource, session) |
|
350 |
|
351 def tearDown(self): |
|
352 repo._free_pool(self.pool) |
|
353 RQLGeneratorTC.tearDown(self) |
|
354 |
|
355 def test_base(self): |
|
356 rqlst = self._prepare('EUser X WHERE X login "toto"').children[0] |
|
357 self.assertEquals(self.o.generate(rqlst, 'X')[1], |
|
358 '(&(objectClass=top)(objectClass=posixAccount)(uid=toto))') |
|
359 |
|
360 def test_kwargs(self): |
|
361 rqlst = self._prepare('EUser X WHERE X login %(x)s').children[0] |
|
362 self.o._args = {'x': "toto"} |
|
363 self.assertEquals(self.o.generate(rqlst, 'X')[1], |
|
364 '(&(objectClass=top)(objectClass=posixAccount)(uid=toto))') |
|
365 |
|
366 def test_get_attr(self): |
|
367 rqlst = self._prepare('Any X WHERE E firstname X, E eid 12').children[0] |
|
368 self.assertRaises(UnknownEid, self.o.generate, rqlst, 'E') |
|
369 |
|
370 |
|
371 if __name__ == '__main__': |
|
372 unittest_main() |