30 from cubicweb.devtools.httptest import get_available_port |
30 from cubicweb.devtools.httptest import get_available_port |
31 from cubicweb.devtools import get_test_db_handler |
31 from cubicweb.devtools import get_test_db_handler |
32 |
32 |
33 from cubicweb.server.sources.ldapuser import * |
33 from cubicweb.server.sources.ldapuser import * |
34 |
34 |
35 SYT = 'syt' |
|
36 SYT_EMAIL = 'Sylvain Thenault' |
|
37 ADIM = 'adim' |
|
38 CONFIG = u'''host=%s |
35 CONFIG = u'''host=%s |
39 user-base-dn=ou=People,dc=cubicweb,dc=test |
36 user-base-dn=ou=People,dc=cubicweb,dc=test |
40 user-scope=ONELEVEL |
37 user-scope=ONELEVEL |
41 user-classes=top,posixAccount |
38 user-classes=top,posixAccount |
42 user-login-attr=uid |
39 user-login-attr=uid |
43 user-default-group=users |
40 user-default-group=users |
44 user-attrs-map=gecos:email,uid:login |
41 user-attrs-map=gecos:email,uid:login |
45 ''' |
42 ''' |
46 |
43 |
47 |
|
48 def nopwd_authenticate(self, session, login, password): |
|
49 """used to monkey patch the source to get successful authentication without |
|
50 upassword checking |
|
51 """ |
|
52 assert login, 'no login!' |
|
53 searchfilter = [filter_format('(%s=%s)', (self.user_login_attr, login))] |
|
54 searchfilter.extend(self.base_filters) |
|
55 searchstr = '(&%s)' % ''.join(searchfilter) |
|
56 # first search the user |
|
57 try: |
|
58 user = self._search(session, self.user_base_dn, self.user_base_scope, |
|
59 searchstr)[0] |
|
60 except IndexError: |
|
61 # no such user |
|
62 raise AuthenticationError() |
|
63 # don't check upassword ! |
|
64 return self.repo.extid2eid(self, user['dn'], 'CWUser', session) |
|
65 |
44 |
66 def setUpModule(*args): |
45 def setUpModule(*args): |
67 create_slapd_configuration(LDAPUserSourceTC.config) |
46 create_slapd_configuration(LDAPUserSourceTC.config) |
68 |
47 |
69 def tearDownModule(*args): |
48 def tearDownModule(*args): |
145 source = self.repo.sources_by_uri['ldapuser'] |
115 source = self.repo.sources_by_uri['ldapuser'] |
146 source.synchronize() |
116 source.synchronize() |
147 |
117 |
148 def test_base(self): |
118 def test_base(self): |
149 # check a known one |
119 # check a known one |
150 rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': SYT}) |
120 rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'}) |
151 e = rset.get_entity(0, 0) |
121 e = rset.get_entity(0, 0) |
152 self.assertEqual(e.login, SYT) |
122 self.assertEqual(e.login, 'syt') |
153 e.complete() |
123 e.complete() |
154 self.assertEqual(e.creation_date, None) |
124 self.assertEqual(e.creation_date, None) |
155 self.assertEqual(e.modification_date, None) |
125 self.assertEqual(e.modification_date, None) |
156 self.assertEqual(e.firstname, None) |
126 self.assertEqual(e.firstname, None) |
157 self.assertEqual(e.surname, None) |
127 self.assertEqual(e.surname, None) |
158 self.assertEqual(e.in_group[0].name, 'users') |
128 self.assertEqual(e.in_group[0].name, 'users') |
159 self.assertEqual(e.owned_by[0].login, SYT) |
129 self.assertEqual(e.owned_by[0].login, 'syt') |
160 self.assertEqual(e.created_by, ()) |
130 self.assertEqual(e.created_by, ()) |
161 self.assertEqual(e.primary_email[0].address, SYT_EMAIL) |
131 self.assertEqual(e.primary_email[0].address, 'Sylvain Thenault') |
162 # email content should be indexed on the user |
132 # email content should be indexed on the user |
163 rset = self.sexecute('CWUser X WHERE X has_text "thenault"') |
133 rset = self.sexecute('CWUser X WHERE X has_text "thenault"') |
164 self.assertEqual(rset.rows, [[e.eid]]) |
134 self.assertEqual(rset.rows, [[e.eid]]) |
165 |
135 |
166 def test_not(self): |
136 def test_not(self): |
167 eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': SYT})[0][0] |
137 eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0] |
168 rset = self.sexecute('CWUser X WHERE NOT X eid %s' % eid) |
138 rset = self.sexecute('CWUser X WHERE NOT X eid %s' % eid) |
169 self.assert_(rset) |
139 self.assert_(rset) |
170 self.assert_(not eid in (r[0] for r in rset)) |
140 self.assert_(not eid in (r[0] for r in rset)) |
171 |
141 |
172 def test_multiple(self): |
142 def test_multiple(self): |
173 seid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': SYT})[0][0] |
143 seid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0] |
174 aeid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': ADIM})[0][0] |
144 aeid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'adim'})[0][0] |
175 rset = self.sexecute('CWUser X, Y WHERE X login %(syt)s, Y login %(adim)s', |
145 rset = self.sexecute('CWUser X, Y WHERE X login %(syt)s, Y login %(adim)s', |
176 {'syt': SYT, 'adim': ADIM}) |
146 {'syt': 'syt', 'adim': 'adim'}) |
177 self.assertEqual(rset.rows, [[seid, aeid]]) |
147 self.assertEqual(rset.rows, [[seid, aeid]]) |
178 rset = self.sexecute('Any X,Y,L WHERE X login L, X login %(syt)s, Y login %(adim)s', |
148 rset = self.sexecute('Any X,Y,L WHERE X login L, X login %(syt)s, Y login %(adim)s', |
179 {'syt': SYT, 'adim': ADIM}) |
149 {'syt': 'syt', 'adim': 'adim'}) |
180 self.assertEqual(rset.rows, [[seid, aeid, SYT]]) |
150 self.assertEqual(rset.rows, [[seid, aeid, 'syt']]) |
181 |
151 |
182 def test_in(self): |
152 def test_in(self): |
183 seid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': SYT})[0][0] |
153 seid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0] |
184 aeid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': ADIM})[0][0] |
154 aeid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'adim'})[0][0] |
185 rset = self.sexecute('Any X,L ORDERBY L WHERE X login IN("%s", "%s"), X login L' % (SYT, ADIM)) |
155 rset = self.sexecute('Any X,L ORDERBY L WHERE X login IN("%s", "%s"), X login L' % ('syt', 'adim')) |
186 self.assertEqual(rset.rows, [[aeid, ADIM], [seid, SYT]]) |
156 self.assertEqual(rset.rows, [[aeid, 'adim'], [seid, 'syt']]) |
187 |
157 |
188 def test_relations(self): |
158 def test_relations(self): |
189 eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': SYT})[0][0] |
159 eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0] |
190 rset = self.sexecute('Any X,E WHERE X is CWUser, X login L, X primary_email E') |
160 rset = self.sexecute('Any X,E WHERE X is CWUser, X login L, X primary_email E') |
191 self.assert_(eid in (r[0] for r in rset)) |
161 self.assert_(eid in (r[0] for r in rset)) |
192 rset = self.sexecute('Any X,L,E WHERE X is CWUser, X login L, X primary_email E') |
162 rset = self.sexecute('Any X,L,E WHERE X is CWUser, X login L, X primary_email E') |
193 self.assert_(SYT in (r[1] for r in rset)) |
163 self.assert_('syt' in (r[1] for r in rset)) |
194 |
164 |
195 def test_count(self): |
165 def test_count(self): |
196 nbusers = self.sexecute('Any COUNT(X) WHERE X is CWUser')[0][0] |
166 nbusers = self.sexecute('Any COUNT(X) WHERE X is CWUser')[0][0] |
197 # just check this is a possible number |
167 # just check this is a possible number |
198 self.assert_(nbusers > 1, nbusers) |
168 self.assert_(nbusers > 1, nbusers) |
199 self.assert_(nbusers < 30, nbusers) |
169 self.assert_(nbusers < 30, nbusers) |
200 |
170 |
201 def test_upper(self): |
171 def test_upper(self): |
202 eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': SYT})[0][0] |
172 eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0] |
203 rset = self.sexecute('Any UPPER(L) WHERE X eid %s, X login L' % eid) |
173 rset = self.sexecute('Any UPPER(L) WHERE X eid %s, X login L' % eid) |
204 self.assertEqual(rset[0][0], SYT.upper()) |
174 self.assertEqual(rset[0][0], 'syt'.upper()) |
205 |
175 |
206 def test_unknown_attr(self): |
176 def test_unknown_attr(self): |
207 eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': SYT})[0][0] |
177 eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0] |
208 rset = self.sexecute('Any L,C,M WHERE X eid %s, X login L, ' |
178 rset = self.sexecute('Any L,C,M WHERE X eid %s, X login L, ' |
209 'X creation_date C, X modification_date M' % eid) |
179 'X creation_date C, X modification_date M' % eid) |
210 self.assertEqual(rset[0][0], SYT) |
180 self.assertEqual(rset[0][0], 'syt') |
211 self.assertEqual(rset[0][1], None) |
181 self.assertEqual(rset[0][1], None) |
212 self.assertEqual(rset[0][2], None) |
182 self.assertEqual(rset[0][2], None) |
213 |
183 |
214 def test_sort(self): |
184 def test_sort(self): |
215 logins = [l for l, in self.sexecute('Any L ORDERBY L WHERE X login L')] |
185 logins = [l for l, in self.sexecute('Any L ORDERBY L WHERE X login L')] |
219 logins = [l for l, in self.sexecute('Any L ORDERBY lower(L) WHERE X login L')] |
189 logins = [l for l, in self.sexecute('Any L ORDERBY lower(L) WHERE X login L')] |
220 self.assertEqual(logins, sorted(logins)) |
190 self.assertEqual(logins, sorted(logins)) |
221 |
191 |
222 def test_or(self): |
192 def test_or(self): |
223 rset = self.sexecute('DISTINCT Any X WHERE X login %(login)s OR (X in_group G, G name "managers")', |
193 rset = self.sexecute('DISTINCT Any X WHERE X login %(login)s OR (X in_group G, G name "managers")', |
224 {'login': SYT}) |
194 {'login': 'syt'}) |
225 self.assertEqual(len(rset), 2, rset.rows) # syt + admin |
195 self.assertEqual(len(rset), 2, rset.rows) # syt + admin |
226 |
196 |
227 def test_nonregr_set_owned_by(self): |
197 def test_nonregr_set_owned_by(self): |
228 # test that when a user coming from ldap is triggering a transition |
198 # test that when a user coming from ldap is triggering a transition |
229 # the related TrInfo has correct owner information |
199 # the related TrInfo has correct owner information |
230 self.sexecute('SET X in_group G WHERE X login %(syt)s, G name "managers"', {'syt': SYT}) |
200 self.sexecute('SET X in_group G WHERE X login %(syt)s, G name "managers"', {'syt': 'syt'}) |
231 self.commit() |
201 self.commit() |
232 syt = self.sexecute('CWUser X WHERE X login %(login)s', {'login': SYT}).get_entity(0, 0) |
202 syt = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'}).get_entity(0, 0) |
233 self.assertEqual([g.name for g in syt.in_group], ['managers', 'users']) |
203 self.assertEqual([g.name for g in syt.in_group], ['managers', 'users']) |
234 self.patch_authenticate() |
204 cnx = self.login('syt', password='syt') |
235 cnx = self.login(SYT, password='dummypassword') |
|
236 cu = cnx.cursor() |
205 cu = cnx.cursor() |
237 adim = cu.execute('CWUser X WHERE X login %(login)s', {'login': ADIM}).get_entity(0, 0) |
206 adim = cu.execute('CWUser X WHERE X login %(login)s', {'login': 'adim'}).get_entity(0, 0) |
238 iworkflowable = adim.cw_adapt_to('IWorkflowable') |
207 iworkflowable = adim.cw_adapt_to('IWorkflowable') |
239 iworkflowable.fire_transition('deactivate') |
208 iworkflowable.fire_transition('deactivate') |
240 try: |
209 try: |
241 cnx.commit() |
210 cnx.commit() |
242 adim.cw_clear_all_caches() |
211 adim.cw_clear_all_caches() |
243 self.assertEqual(adim.in_state[0].name, 'deactivated') |
212 self.assertEqual(adim.in_state[0].name, 'deactivated') |
244 trinfo = iworkflowable.latest_trinfo() |
213 trinfo = iworkflowable.latest_trinfo() |
245 self.assertEqual(trinfo.owned_by[0].login, SYT) |
214 self.assertEqual(trinfo.owned_by[0].login, 'syt') |
246 # select from_state to skip the user's creation TrInfo |
215 # select from_state to skip the user's creation TrInfo |
247 rset = self.sexecute('Any U ORDERBY D DESC WHERE WF wf_info_for X,' |
216 rset = self.sexecute('Any U ORDERBY D DESC WHERE WF wf_info_for X,' |
248 'WF creation_date D, WF from_state FS,' |
217 'WF creation_date D, WF from_state FS,' |
249 'WF owned_by U?, X eid %(x)s', |
218 'WF owned_by U?, X eid %(x)s', |
250 {'x': adim.eid}) |
219 {'x': adim.eid}) |
251 self.assertEqual(rset.rows, [[syt.eid]]) |
220 self.assertEqual(rset.rows, [[syt.eid]]) |
252 finally: |
221 finally: |
253 # restore db state |
222 # restore db state |
254 self.restore_connection() |
223 self.restore_connection() |
255 adim = self.sexecute('CWUser X WHERE X login %(login)s', {'login': ADIM}).get_entity(0, 0) |
224 adim = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'adim'}).get_entity(0, 0) |
256 adim.cw_adapt_to('IWorkflowable').fire_transition('activate') |
225 adim.cw_adapt_to('IWorkflowable').fire_transition('activate') |
257 self.sexecute('DELETE X in_group G WHERE X login %(syt)s, G name "managers"', {'syt': SYT}) |
226 self.sexecute('DELETE X in_group G WHERE X login %(syt)s, G name "managers"', {'syt': 'syt'}) |
258 |
227 |
259 def test_same_column_names(self): |
228 def test_same_column_names(self): |
260 self.sexecute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"') |
229 self.sexecute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"') |
261 |
230 |
262 def test_multiple_entities_from_different_sources(self): |
231 def test_multiple_entities_from_different_sources(self): |
263 req = self.request() |
232 req = self.request() |
264 self.create_user(req, 'cochon') |
233 self.create_user(req, 'cochon') |
265 self.assertTrue(self.sexecute('Any X,Y WHERE X login %(syt)s, Y login "cochon"', {'syt': SYT})) |
234 self.assertTrue(self.sexecute('Any X,Y WHERE X login %(syt)s, Y login "cochon"', {'syt': 'syt'})) |
266 |
235 |
267 def test_exists1(self): |
236 def test_exists1(self): |
268 self.session.set_cnxset() |
237 self.session.set_cnxset() |
269 self.session.create_entity('CWGroup', name=u'bougloup1') |
238 self.session.create_entity('CWGroup', name=u'bougloup1') |
270 self.session.create_entity('CWGroup', name=u'bougloup2') |
239 self.session.create_entity('CWGroup', name=u'bougloup2') |
271 self.sexecute('SET U in_group G WHERE G name ~= "bougloup%", U login "admin"') |
240 self.sexecute('SET U in_group G WHERE G name ~= "bougloup%", U login "admin"') |
272 self.sexecute('SET U in_group G WHERE G name = "bougloup1", U login %(syt)s', {'syt': SYT}) |
241 self.sexecute('SET U in_group G WHERE G name = "bougloup1", U login %(syt)s', {'syt': 'syt'}) |
273 rset = self.sexecute('Any L,SN ORDERBY L WHERE X in_state S, ' |
242 rset = self.sexecute('Any L,SN ORDERBY L WHERE X in_state S, ' |
274 'S name SN, X login L, EXISTS(X in_group G, G name ~= "bougloup%")') |
243 'S name SN, X login L, EXISTS(X in_group G, G name ~= "bougloup%")') |
275 self.assertEqual(rset.rows, [['admin', 'activated'], [SYT, 'activated']]) |
244 self.assertEqual(rset.rows, [['admin', 'activated'], ['syt', 'activated']]) |
276 |
245 |
277 def test_exists2(self): |
246 def test_exists2(self): |
278 req = self.request() |
247 req = self.request() |
279 self.create_user(req, 'comme') |
248 self.create_user(req, 'comme') |
280 self.create_user(req, 'cochon') |
249 self.create_user(req, 'cochon') |
287 req = self.request() |
256 req = self.request() |
288 self.create_user(req, 'comme') |
257 self.create_user(req, 'comme') |
289 self.create_user(req, 'cochon') |
258 self.create_user(req, 'cochon') |
290 self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"') |
259 self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"') |
291 self.assertTrue(self.sexecute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"')) |
260 self.assertTrue(self.sexecute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"')) |
292 self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "cochon"', {'syt': SYT}) |
261 self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "cochon"', {'syt': 'syt'}) |
293 self.assertTrue(self.sexecute('Any X, Y WHERE X copain Y, X login %(syt)s, Y login "cochon"', {'syt': SYT})) |
262 self.assertTrue(self.sexecute('Any X, Y WHERE X copain Y, X login %(syt)s, Y login "cochon"', {'syt': 'syt'})) |
294 rset = self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, G name "managers" ' |
263 rset = self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, G name "managers" ' |
295 'OR EXISTS(X copain T, T login in ("comme", "cochon"))') |
264 'OR EXISTS(X copain T, T login in ("comme", "cochon"))') |
296 self.assertEqual(sorted(rset.rows), [['managers', 'admin'], ['users', 'comme'], ['users', SYT]]) |
265 self.assertEqual(sorted(rset.rows), [['managers', 'admin'], ['users', 'comme'], ['users', 'syt']]) |
297 |
266 |
298 def test_exists4(self): |
267 def test_exists4(self): |
299 req = self.request() |
268 req = self.request() |
300 self.create_user(req, 'comme') |
269 self.create_user(req, 'comme') |
301 self.create_user(req, 'cochon', groups=('users', 'guests')) |
270 self.create_user(req, 'cochon', groups=('users', 'guests')) |
302 self.create_user(req, 'billy') |
271 self.create_user(req, 'billy') |
303 self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"') |
272 self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"') |
304 self.sexecute('SET X copain Y WHERE X login "cochon", Y login "cochon"') |
273 self.sexecute('SET X copain Y WHERE X login "cochon", Y login "cochon"') |
305 self.sexecute('SET X copain Y WHERE X login "comme", Y login "billy"') |
274 self.sexecute('SET X copain Y WHERE X login "comme", Y login "billy"') |
306 self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "billy"', {'syt': SYT}) |
275 self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "billy"', {'syt': 'syt'}) |
307 # search for group name, login where |
276 # search for group name, login where |
308 # CWUser copain with "comme" or "cochon" AND same login as the copain |
277 # CWUser copain with "comme" or "cochon" AND same login as the copain |
309 # OR |
278 # OR |
310 # CWUser in_state activated AND not copain with billy |
279 # CWUser in_state activated AND not copain with billy |
311 # |
280 # |
313 rset= self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, ' |
282 rset= self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, ' |
314 'EXISTS(X copain T, T login L, T login in ("comme", "cochon")) OR ' |
283 'EXISTS(X copain T, T login L, T login in ("comme", "cochon")) OR ' |
315 'EXISTS(X in_state S, S name "activated", NOT X copain T2, T2 login "billy")') |
284 'EXISTS(X in_state S, S name "activated", NOT X copain T2, T2 login "billy")') |
316 all = self.sexecute('Any GN, L WHERE X in_group G, X login L, G name GN') |
285 all = self.sexecute('Any GN, L WHERE X in_group G, X login L, G name GN') |
317 all.rows.remove(['users', 'comme']) |
286 all.rows.remove(['users', 'comme']) |
318 all.rows.remove(['users', SYT]) |
287 all.rows.remove(['users', 'syt']) |
319 self.assertEqual(sorted(rset.rows), sorted(all.rows)) |
288 self.assertEqual(sorted(rset.rows), sorted(all.rows)) |
320 |
289 |
321 def test_exists5(self): |
290 def test_exists5(self): |
322 req = self.request() |
291 req = self.request() |
323 self.create_user(req, 'comme') |
292 self.create_user(req, 'comme') |
324 self.create_user(req, 'cochon', groups=('users', 'guests')) |
293 self.create_user(req, 'cochon', groups=('users', 'guests')) |
325 self.create_user(req, 'billy') |
294 self.create_user(req, 'billy') |
326 self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"') |
295 self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"') |
327 self.sexecute('SET X copain Y WHERE X login "cochon", Y login "cochon"') |
296 self.sexecute('SET X copain Y WHERE X login "cochon", Y login "cochon"') |
328 self.sexecute('SET X copain Y WHERE X login "comme", Y login "billy"') |
297 self.sexecute('SET X copain Y WHERE X login "comme", Y login "billy"') |
329 self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "cochon"', {'syt': SYT}) |
298 self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "cochon"', {'syt': 'syt'}) |
330 rset= self.sexecute('Any L WHERE X login L, ' |
299 rset= self.sexecute('Any L WHERE X login L, ' |
331 'EXISTS(X copain T, T login in ("comme", "cochon")) AND ' |
300 'EXISTS(X copain T, T login in ("comme", "cochon")) AND ' |
332 'NOT EXISTS(X copain T2, T2 login "billy")') |
301 'NOT EXISTS(X copain T2, T2 login "billy")') |
333 self.assertEqual(sorted(rset.rows), [['cochon'], [SYT]]) |
302 self.assertEqual(sorted(rset.rows), [['cochon'], ['syt']]) |
334 rset= self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, ' |
303 rset= self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, ' |
335 'EXISTS(X copain T, T login in ("comme", "cochon")) AND ' |
304 'EXISTS(X copain T, T login in ("comme", "cochon")) AND ' |
336 'NOT EXISTS(X copain T2, T2 login "billy")') |
305 'NOT EXISTS(X copain T2, T2 login "billy")') |
337 self.assertEqual(sorted(rset.rows), [['guests', 'cochon'], |
306 self.assertEqual(sorted(rset.rows), [['guests', 'cochon'], |
338 ['users', 'cochon'], |
307 ['users', 'cochon'], |
339 ['users', SYT]]) |
308 ['users', 'syt']]) |
340 |
309 |
341 def test_cd_restriction(self): |
310 def test_cd_restriction(self): |
342 rset = self.sexecute('CWUser X WHERE X creation_date > "2009-02-01"') |
311 rset = self.sexecute('CWUser X WHERE X creation_date > "2009-02-01"') |
343 # admin/anon but no ldap user since it doesn't support creation_date |
312 # admin/anon but no ldap user since it doesn't support creation_date |
344 self.assertEqual(sorted(e.login for e in rset.entities()), |
313 self.assertEqual(sorted(e.login for e in rset.entities()), |
357 cnx = self.login('iaminguestsgrouponly') |
326 cnx = self.login('iaminguestsgrouponly') |
358 return cnx.cursor() |
327 return cnx.cursor() |
359 |
328 |
360 def test_security1(self): |
329 def test_security1(self): |
361 cu = self._init_security_test() |
330 cu = self._init_security_test() |
362 rset = cu.execute('CWUser X WHERE X login %(login)s', {'login': SYT}) |
331 rset = cu.execute('CWUser X WHERE X login %(login)s', {'login': 'syt'}) |
363 self.assertEqual(rset.rows, []) |
332 self.assertEqual(rset.rows, []) |
364 rset = cu.execute('Any X WHERE X login "iaminguestsgrouponly"') |
333 rset = cu.execute('Any X WHERE X login "iaminguestsgrouponly"') |
365 self.assertEqual(len(rset.rows), 1) |
334 self.assertEqual(len(rset.rows), 1) |
366 |
335 |
367 def test_security2(self): |
336 def test_security2(self): |
368 cu = self._init_security_test() |
337 cu = self._init_security_test() |
369 rset = cu.execute('Any X WHERE X has_text %(syt)s', {'syt': SYT}) |
338 rset = cu.execute('Any X WHERE X has_text %(syt)s', {'syt': 'syt'}) |
370 self.assertEqual(rset.rows, []) |
339 self.assertEqual(rset.rows, []) |
371 rset = cu.execute('Any X WHERE X has_text "iaminguestsgrouponly"') |
340 rset = cu.execute('Any X WHERE X has_text "iaminguestsgrouponly"') |
372 self.assertEqual(len(rset.rows), 1) |
341 self.assertEqual(len(rset.rows), 1) |
373 |
342 |
374 def test_security3(self): |
343 def test_security3(self): |
375 cu = self._init_security_test() |
344 cu = self._init_security_test() |
376 rset = cu.execute('Any F WHERE X has_text %(syt)s, X firstname F', {'syt': SYT}) |
345 rset = cu.execute('Any F WHERE X has_text %(syt)s, X firstname F', {'syt': 'syt'}) |
377 self.assertEqual(rset.rows, []) |
346 self.assertEqual(rset.rows, []) |
378 rset = cu.execute('Any F WHERE X has_text "iaminguestsgrouponly", X firstname F') |
347 rset = cu.execute('Any F WHERE X has_text "iaminguestsgrouponly", X firstname F') |
379 self.assertEqual(rset.rows, [[None]]) |
348 self.assertEqual(rset.rows, [[None]]) |
380 |
349 |
381 def test_copy_to_system_source(self): |
350 def test_copy_to_system_source(self): |
382 source = self.repo.sources_by_uri['ldapuser'] |
351 source = self.repo.sources_by_uri['ldapuser'] |
383 eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': SYT})[0][0] |
352 eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0] |
384 self.sexecute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': eid}) |
353 self.sexecute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': eid}) |
385 self.commit() |
354 self.commit() |
386 source.reset_caches() |
355 source.reset_caches() |
387 rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': SYT}) |
356 rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'}) |
388 self.assertEqual(len(rset), 1) |
357 self.assertEqual(len(rset), 1) |
389 e = rset.get_entity(0, 0) |
358 e = rset.get_entity(0, 0) |
390 self.assertEqual(e.eid, eid) |
359 self.assertEqual(e.eid, eid) |
391 self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native', 'uri': u'system', 'use-cwuri-as-url': False}, |
360 self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native', 'uri': u'system', 'use-cwuri-as-url': False}, |
392 'type': 'CWUser', |
361 'type': 'CWUser', |