77 self.assertEquals(e.in_group[0].name, 'users') |
77 self.assertEquals(e.in_group[0].name, 'users') |
78 self.assertEquals(e.owned_by[0].login, 'syt') |
78 self.assertEquals(e.owned_by[0].login, 'syt') |
79 self.assertEquals(e.created_by, []) |
79 self.assertEquals(e.created_by, []) |
80 self.assertEquals(e.primary_email[0].address, 'Sylvain Thenault') |
80 self.assertEquals(e.primary_email[0].address, 'Sylvain Thenault') |
81 # email content should be indexed on the user |
81 # email content should be indexed on the user |
82 rset = self.execute('EUser X WHERE X has_text "thenault"') |
82 rset = self.execute('CWUser X WHERE X has_text "thenault"') |
83 self.assertEquals(rset.rows, [[e.eid]]) |
83 self.assertEquals(rset.rows, [[e.eid]]) |
84 |
84 |
85 def test_not(self): |
85 def test_not(self): |
86 eid = self.execute('EUser X WHERE X login "syt"')[0][0] |
86 eid = self.execute('CWUser X WHERE X login "syt"')[0][0] |
87 rset = self.execute('EUser X WHERE NOT X eid %s' % eid) |
87 rset = self.execute('CWUser X WHERE NOT X eid %s' % eid) |
88 self.assert_(rset) |
88 self.assert_(rset) |
89 self.assert_(not eid in (r[0] for r in rset)) |
89 self.assert_(not eid in (r[0] for r in rset)) |
90 |
90 |
91 def test_multiple(self): |
91 def test_multiple(self): |
92 seid = self.execute('EUser X WHERE X login "syt"')[0][0] |
92 seid = self.execute('CWUser X WHERE X login "syt"')[0][0] |
93 aeid = self.execute('EUser X WHERE X login "adim"')[0][0] |
93 aeid = self.execute('CWUser X WHERE X login "adim"')[0][0] |
94 rset = self.execute('EUser X, Y WHERE X login "syt", Y login "adim"') |
94 rset = self.execute('CWUser X, Y WHERE X login "syt", Y login "adim"') |
95 self.assertEquals(rset.rows, [[seid, aeid]]) |
95 self.assertEquals(rset.rows, [[seid, aeid]]) |
96 rset = self.execute('Any X,Y,L WHERE X login L, X login "syt", Y login "adim"') |
96 rset = self.execute('Any X,Y,L WHERE X login L, X login "syt", Y login "adim"') |
97 self.assertEquals(rset.rows, [[seid, aeid, 'syt']]) |
97 self.assertEquals(rset.rows, [[seid, aeid, 'syt']]) |
98 |
98 |
99 def test_in(self): |
99 def test_in(self): |
100 seid = self.execute('EUser X WHERE X login "syt"')[0][0] |
100 seid = self.execute('CWUser X WHERE X login "syt"')[0][0] |
101 aeid = self.execute('EUser X WHERE X login "adim"')[0][0] |
101 aeid = self.execute('CWUser X WHERE X login "adim"')[0][0] |
102 rset = self.execute('Any X,L ORDERBY L WHERE X login IN("syt", "adim"), X login L') |
102 rset = self.execute('Any X,L ORDERBY L WHERE X login IN("syt", "adim"), X login L') |
103 self.assertEquals(rset.rows, [[aeid, 'adim'], [seid, 'syt']]) |
103 self.assertEquals(rset.rows, [[aeid, 'adim'], [seid, 'syt']]) |
104 |
104 |
105 def test_relations(self): |
105 def test_relations(self): |
106 eid = self.execute('EUser X WHERE X login "syt"')[0][0] |
106 eid = self.execute('CWUser X WHERE X login "syt"')[0][0] |
107 rset = self.execute('Any X,E WHERE X is EUser, X login L, X primary_email E') |
107 rset = self.execute('Any X,E WHERE X is CWUser, X login L, X primary_email E') |
108 self.assert_(eid in (r[0] for r in rset)) |
108 self.assert_(eid in (r[0] for r in rset)) |
109 rset = self.execute('Any X,L,E WHERE X is EUser, X login L, X primary_email E') |
109 rset = self.execute('Any X,L,E WHERE X is CWUser, X login L, X primary_email E') |
110 self.assert_('syt' in (r[1] for r in rset)) |
110 self.assert_('syt' in (r[1] for r in rset)) |
111 |
111 |
112 def test_count(self): |
112 def test_count(self): |
113 nbusers = self.execute('Any COUNT(X) WHERE X is EUser')[0][0] |
113 nbusers = self.execute('Any COUNT(X) WHERE X is CWUser')[0][0] |
114 # just check this is a possible number |
114 # just check this is a possible number |
115 self.assert_(nbusers > 1, nbusers) |
115 self.assert_(nbusers > 1, nbusers) |
116 self.assert_(nbusers < 30, nbusers) |
116 self.assert_(nbusers < 30, nbusers) |
117 |
117 |
118 def test_upper(self): |
118 def test_upper(self): |
119 eid = self.execute('EUser X WHERE X login "syt"')[0][0] |
119 eid = self.execute('CWUser X WHERE X login "syt"')[0][0] |
120 rset = self.execute('Any UPPER(L) WHERE X eid %s, X login L' % eid) |
120 rset = self.execute('Any UPPER(L) WHERE X eid %s, X login L' % eid) |
121 self.assertEquals(rset[0][0], 'SYT') |
121 self.assertEquals(rset[0][0], 'SYT') |
122 |
122 |
123 def test_unknown_attr(self): |
123 def test_unknown_attr(self): |
124 eid = self.execute('EUser X WHERE X login "syt"')[0][0] |
124 eid = self.execute('CWUser X WHERE X login "syt"')[0][0] |
125 rset = self.execute('Any L,C,M WHERE X eid %s, X login L, ' |
125 rset = self.execute('Any L,C,M WHERE X eid %s, X login L, ' |
126 'X creation_date C, X modification_date M' % eid) |
126 'X creation_date C, X modification_date M' % eid) |
127 self.assertEquals(rset[0][0], 'syt') |
127 self.assertEquals(rset[0][0], 'syt') |
128 self.assertEquals(rset[0][1], None) |
128 self.assertEquals(rset[0][1], None) |
129 self.assertEquals(rset[0][2], None) |
129 self.assertEquals(rset[0][2], None) |
143 def test_nonregr_set_owned_by(self): |
143 def test_nonregr_set_owned_by(self): |
144 # test that when a user coming from ldap is triggering a transition |
144 # test that when a user coming from ldap is triggering a transition |
145 # the related TrInfo has correct owner information |
145 # the related TrInfo has correct owner information |
146 self.execute('SET X in_group G WHERE X login "syt", G name "managers"') |
146 self.execute('SET X in_group G WHERE X login "syt", G name "managers"') |
147 self.commit() |
147 self.commit() |
148 syt = self.execute('EUser X WHERE X login "syt"').get_entity(0, 0) |
148 syt = self.execute('CWUser X WHERE X login "syt"').get_entity(0, 0) |
149 self.assertEquals([g.name for g in syt.in_group], ['managers', 'users']) |
149 self.assertEquals([g.name for g in syt.in_group], ['managers', 'users']) |
150 self.patch_authenticate() |
150 self.patch_authenticate() |
151 cnx = self.login('syt', 'dummypassword') |
151 cnx = self.login('syt', 'dummypassword') |
152 cu = cnx.cursor() |
152 cu = cnx.cursor() |
153 cu.execute('SET X in_state S WHERE X login "alf", S name "deactivated"') |
153 cu.execute('SET X in_state S WHERE X login "alf", S name "deactivated"') |
154 try: |
154 try: |
155 cnx.commit() |
155 cnx.commit() |
156 alf = self.execute('EUser X WHERE X login "alf"').get_entity(0, 0) |
156 alf = self.execute('CWUser X WHERE X login "alf"').get_entity(0, 0) |
157 self.assertEquals(alf.in_state[0].name, 'deactivated') |
157 self.assertEquals(alf.in_state[0].name, 'deactivated') |
158 trinfo = alf.latest_trinfo() |
158 trinfo = alf.latest_trinfo() |
159 self.assertEquals(trinfo.owned_by[0].login, 'syt') |
159 self.assertEquals(trinfo.owned_by[0].login, 'syt') |
160 # select from_state to skip the user's creation TrInfo |
160 # select from_state to skip the user's creation TrInfo |
161 rset = self.execute('Any U ORDERBY D DESC WHERE WF wf_info_for X,' |
161 rset = self.execute('Any U ORDERBY D DESC WHERE WF wf_info_for X,' |
175 def test_multiple_entities_from_different_sources(self): |
175 def test_multiple_entities_from_different_sources(self): |
176 self.create_user('cochon') |
176 self.create_user('cochon') |
177 self.failUnless(self.execute('Any X,Y WHERE X login "syt", Y login "cochon"')) |
177 self.failUnless(self.execute('Any X,Y WHERE X login "syt", Y login "cochon"')) |
178 |
178 |
179 def test_exists1(self): |
179 def test_exists1(self): |
180 self.add_entity('EGroup', name=u'bougloup1') |
180 self.add_entity('CWGroup', name=u'bougloup1') |
181 self.add_entity('EGroup', name=u'bougloup2') |
181 self.add_entity('CWGroup', name=u'bougloup2') |
182 self.execute('SET U in_group G WHERE G name ~= "bougloup%", U login "admin"') |
182 self.execute('SET U in_group G WHERE G name ~= "bougloup%", U login "admin"') |
183 self.execute('SET U in_group G WHERE G name = "bougloup1", U login "syt"') |
183 self.execute('SET U in_group G WHERE G name = "bougloup1", U login "syt"') |
184 rset = self.execute('Any L,SN ORDERBY L WHERE X in_state S, S name SN, X login L, EXISTS(X in_group G, G name ~= "bougloup%")') |
184 rset = self.execute('Any L,SN ORDERBY L WHERE X in_state S, S name SN, X login L, EXISTS(X in_group G, G name ~= "bougloup%")') |
185 self.assertEquals(rset.rows, [['admin', 'activated'], ['syt', 'activated']]) |
185 self.assertEquals(rset.rows, [['admin', 'activated'], ['syt', 'activated']]) |
186 |
186 |
208 self.execute('SET X copain Y WHERE X login "comme", Y login "cochon"') |
208 self.execute('SET X copain Y WHERE X login "comme", Y login "cochon"') |
209 self.execute('SET X copain Y WHERE X login "cochon", Y login "cochon"') |
209 self.execute('SET X copain Y WHERE X login "cochon", Y login "cochon"') |
210 self.execute('SET X copain Y WHERE X login "comme", Y login "billy"') |
210 self.execute('SET X copain Y WHERE X login "comme", Y login "billy"') |
211 self.execute('SET X copain Y WHERE X login "syt", Y login "billy"') |
211 self.execute('SET X copain Y WHERE X login "syt", Y login "billy"') |
212 # search for group name, login where |
212 # search for group name, login where |
213 # EUser copain with "comme" or "cochon" AND same login as the copain |
213 # CWUser copain with "comme" or "cochon" AND same login as the copain |
214 # OR |
214 # OR |
215 # EUser in_state activated AND not copain with billy |
215 # CWUser in_state activated AND not copain with billy |
216 # |
216 # |
217 # SO we expect everybody but "comme" and "syt" |
217 # SO we expect everybody but "comme" and "syt" |
218 rset= self.execute('Any GN,L WHERE X in_group G, X login L, G name GN, ' |
218 rset= self.execute('Any GN,L WHERE X in_group G, X login L, G name GN, ' |
219 'EXISTS(X copain T, T login L, T login in ("comme", "cochon")) OR ' |
219 'EXISTS(X copain T, T login L, T login in ("comme", "cochon")) OR ' |
220 'EXISTS(X in_state S, S name "activated", NOT X copain T2, T2 login "billy")') |
220 'EXISTS(X in_state S, S name "activated", NOT X copain T2, T2 login "billy")') |
241 self.assertEquals(sorted(rset.rows), [['guests', 'cochon'], |
241 self.assertEquals(sorted(rset.rows), [['guests', 'cochon'], |
242 ['users', 'cochon'], |
242 ['users', 'cochon'], |
243 ['users', 'syt']]) |
243 ['users', 'syt']]) |
244 |
244 |
245 def test_cd_restriction(self): |
245 def test_cd_restriction(self): |
246 rset = self.execute('EUser X WHERE X creation_date > "2009-02-01"') |
246 rset = self.execute('CWUser X WHERE X creation_date > "2009-02-01"') |
247 self.assertEquals(len(rset), 2) # admin/anon but no ldap user since it doesn't support creation_date |
247 self.assertEquals(len(rset), 2) # admin/anon but no ldap user since it doesn't support creation_date |
248 |
248 |
249 def test_union(self): |
249 def test_union(self): |
250 afeids = self.execute('State X') |
250 afeids = self.execute('State X') |
251 ueids = self.execute('EUser X') |
251 ueids = self.execute('CWUser X') |
252 rset = self.execute('(Any X WHERE X is State) UNION (Any X WHERE X is EUser)') |
252 rset = self.execute('(Any X WHERE X is State) UNION (Any X WHERE X is CWUser)') |
253 self.assertEquals(sorted(r[0] for r in rset.rows), |
253 self.assertEquals(sorted(r[0] for r in rset.rows), |
254 sorted(r[0] for r in afeids + ueids)) |
254 sorted(r[0] for r in afeids + ueids)) |
255 |
255 |
256 def _init_security_test(self): |
256 def _init_security_test(self): |
257 self.create_user('iaminguestsgrouponly', groups=('guests',)) |
257 self.create_user('iaminguestsgrouponly', groups=('guests',)) |
299 self.execute('Any X,AA WHERE X use_email Y, Y eid %(x)s, X modification_date AA', |
299 self.execute('Any X,AA WHERE X use_email Y, Y eid %(x)s, X modification_date AA', |
300 {'x': emaileid}) |
300 {'x': emaileid}) |
301 |
301 |
302 def test_nonregr5(self): |
302 def test_nonregr5(self): |
303 # original jpl query: |
303 # original jpl query: |
304 # Any X, NOW - CD, P WHERE P is Project, U interested_in P, U is EUser, U login "sthenault", X concerns P, X creation_date CD ORDERBY CD DESC LIMIT 5 |
304 # Any X, NOW - CD, P WHERE P is Project, U interested_in P, U is CWUser, U login "sthenault", X concerns P, X creation_date CD ORDERBY CD DESC LIMIT 5 |
305 rql = 'Any X, NOW - CD, P ORDERBY CD DESC LIMIT 5 WHERE P bookmarked_by U, U login "%s", P is X, X creation_date CD' % self.session.user.login |
305 rql = 'Any X, NOW - CD, P ORDERBY CD DESC LIMIT 5 WHERE P bookmarked_by U, U login "%s", P is X, X creation_date CD' % self.session.user.login |
306 self.execute(rql, )#{'x': }) |
306 self.execute(rql, )#{'x': }) |
307 |
307 |
308 def test_nonregr6(self): |
308 def test_nonregr6(self): |
309 self.execute('Any B,U,UL GROUPBY B,U,UL WHERE B created_by U?, B is File ' |
309 self.execute('Any B,U,UL GROUPBY B,U,UL WHERE B created_by U?, B is File ' |
310 'WITH U,UL BEING (Any U,UL WHERE ME eid %(x)s, (EXISTS(U identity ME) ' |
310 'WITH U,UL BEING (Any U,UL WHERE ME eid %(x)s, (EXISTS(U identity ME) ' |
311 'OR (EXISTS(U in_group G, G name IN("managers", "staff")))) ' |
311 'OR (EXISTS(U in_group G, G name IN("managers", "staff")))) ' |
312 'OR (EXISTS(U in_group H, ME in_group H, NOT H name "users")), U login UL, U is EUser)', |
312 'OR (EXISTS(U in_group H, ME in_group H, NOT H name "users")), U login UL, U is CWUser)', |
313 {'x': self.session.user.eid}) |
313 {'x': self.session.user.eid}) |
314 |
314 |
315 |
315 |
316 class GlobTrFuncTC(TestCase): |
316 class GlobTrFuncTC(TestCase): |
317 |
317 |
361 def tearDown(self): |
361 def tearDown(self): |
362 repo._free_pool(self.pool) |
362 repo._free_pool(self.pool) |
363 RQLGeneratorTC.tearDown(self) |
363 RQLGeneratorTC.tearDown(self) |
364 |
364 |
365 def test_base(self): |
365 def test_base(self): |
366 rqlst = self._prepare('EUser X WHERE X login "toto"').children[0] |
366 rqlst = self._prepare('CWUser X WHERE X login "toto"').children[0] |
367 self.assertEquals(self.o.generate(rqlst, 'X')[1], |
367 self.assertEquals(self.o.generate(rqlst, 'X')[1], |
368 '(&(objectClass=top)(objectClass=posixAccount)(uid=toto))') |
368 '(&(objectClass=top)(objectClass=posixAccount)(uid=toto))') |
369 |
369 |
370 def test_kwargs(self): |
370 def test_kwargs(self): |
371 rqlst = self._prepare('EUser X WHERE X login %(x)s').children[0] |
371 rqlst = self._prepare('CWUser X WHERE X login %(x)s').children[0] |
372 self.o._args = {'x': "toto"} |
372 self.o._args = {'x': "toto"} |
373 self.assertEquals(self.o.generate(rqlst, 'X')[1], |
373 self.assertEquals(self.o.generate(rqlst, 'X')[1], |
374 '(&(objectClass=top)(objectClass=posixAccount)(uid=toto))') |
374 '(&(objectClass=top)(objectClass=posixAccount)(uid=toto))') |
375 |
375 |
376 def test_get_attr(self): |
376 def test_get_attr(self): |