server/test/unittest_ldapuser.py
changeset 2773 b2530e3e0afb
parent 1977 606923dff11b
child 2968 0e3460341023
equal deleted inserted replaced
2767:58c519e5a31f 2773:b2530e3e0afb
     5 :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
     5 :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
     6 :license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses
     6 :license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses
     7 """
     7 """
     8 
     8 
     9 from logilab.common.testlib import TestCase, unittest_main, mock_object
     9 from logilab.common.testlib import TestCase, unittest_main, mock_object
    10 from cubicweb.devtools import init_test_database, TestServerConfiguration
    10 from cubicweb.devtools import TestServerConfiguration
    11 from cubicweb.devtools.apptest import RepositoryBasedTC
    11 from cubicweb.devtools.testlib import CubicWebTC
    12 from cubicweb.devtools.repotest import RQLGeneratorTC
    12 from cubicweb.devtools.repotest import RQLGeneratorTC
    13 
    13 
    14 from cubicweb.server.sources.ldapuser import *
    14 from cubicweb.server.sources.ldapuser import *
    15 
    15 
    16 def nopwd_authenticate(self, session, login, upassword):
    16 def nopwd_authenticate(self, session, login, upassword):
    32     # don't check upassword !
    32     # don't check upassword !
    33     return self.extid2eid(user['dn'], 'CWUser', session)
    33     return self.extid2eid(user['dn'], 'CWUser', session)
    34 
    34 
    35 
    35 
    36 
    36 
    37 config = TestServerConfiguration('data')
    37 class LDAPUserSourceTC(CubicWebTC):
    38 config.sources_file = lambda : 'data/sourcesldap'
    38     config = TestServerConfiguration('data')
    39 repo, cnx = init_test_database('sqlite', config=config)
    39     config.sources_file = lambda : 'data/sourcesldap'
    40 
       
    41 class LDAPUserSourceTC(RepositoryBasedTC):
       
    42     repo, cnx = repo, cnx
       
    43 
    40 
    44     def patch_authenticate(self):
    41     def patch_authenticate(self):
    45         self._orig_authenticate = LDAPUserSource.authenticate
    42         self._orig_authenticate = LDAPUserSource.authenticate
    46         LDAPUserSource.authenticate = nopwd_authenticate
    43         LDAPUserSource.authenticate = nopwd_authenticate
    47 
    44 
    48     def setUp(self):
    45     def setup_database(self):
    49         self._prepare()
       
    50         # XXX: need this first query else we get 'database is locked' from
    46         # XXX: need this first query else we get 'database is locked' from
    51         # sqlite since it doesn't support multiple connections on the same
    47         # sqlite since it doesn't support multiple connections on the same
    52         # database
    48         # database
    53         # so doing, ldap inserted users don't get removed between each test
    49         # so doing, ldap inserted users don't get removed between each test
    54         rset = self.execute('CWUser X')
    50         rset = self.sexecute('CWUser X')
    55         self.commit()
       
    56         # check we get some users from ldap
    51         # check we get some users from ldap
    57         self.assert_(len(rset) > 1)
    52         self.assert_(len(rset) > 1)
    58         self.maxeid = self.execute('Any MAX(X)')[0][0]
       
    59 
    53 
    60     def tearDown(self):
    54     def tearDown(self):
    61         if hasattr(self, '_orig_authenticate'):
    55         if hasattr(self, '_orig_authenticate'):
    62             LDAPUserSource.authenticate = self._orig_authenticate
    56             LDAPUserSource.authenticate = self._orig_authenticate
    63         RepositoryBasedTC.tearDown(self)
    57         CubicWebTC.tearDown(self)
    64 
    58 
    65     def test_authenticate(self):
    59     def test_authenticate(self):
    66         source = self.repo.sources_by_uri['ldapuser']
    60         source = self.repo.sources_by_uri['ldapuser']
       
    61         self.session.set_pool()
    67         self.assertRaises(AuthenticationError,
    62         self.assertRaises(AuthenticationError,
    68                           source.authenticate, self.session, 'toto', 'toto')
    63                           source.authenticate, self.session, 'toto', 'toto')
    69 
    64 
    70     def test_synchronize(self):
    65     def test_synchronize(self):
    71         source = self.repo.sources_by_uri['ldapuser']
    66         source = self.repo.sources_by_uri['ldapuser']
    72         source.synchronize()
    67         source.synchronize()
    73 
    68 
    74     def test_base(self):
    69     def test_base(self):
    75         # check a known one
    70         # check a known one
    76         e = self.execute('CWUser X WHERE X login "syt"').get_entity(0, 0)
    71         e = self.sexecute('CWUser X WHERE X login "syt"').get_entity(0, 0)
    77         self.assertEquals(e.login, 'syt')
    72         self.assertEquals(e.login, 'syt')
    78         e.complete()
    73         e.complete()
    79         self.assertEquals(e.creation_date, None)
    74         self.assertEquals(e.creation_date, None)
    80         self.assertEquals(e.modification_date, None)
    75         self.assertEquals(e.modification_date, None)
    81         self.assertEquals(e.firstname, None)
    76         self.assertEquals(e.firstname, None)
    83         self.assertEquals(e.in_group[0].name, 'users')
    78         self.assertEquals(e.in_group[0].name, 'users')
    84         self.assertEquals(e.owned_by[0].login, 'syt')
    79         self.assertEquals(e.owned_by[0].login, 'syt')
    85         self.assertEquals(e.created_by, [])
    80         self.assertEquals(e.created_by, [])
    86         self.assertEquals(e.primary_email[0].address, 'Sylvain Thenault')
    81         self.assertEquals(e.primary_email[0].address, 'Sylvain Thenault')
    87         # email content should be indexed on the user
    82         # email content should be indexed on the user
    88         rset = self.execute('CWUser X WHERE X has_text "thenault"')
    83         rset = self.sexecute('CWUser X WHERE X has_text "thenault"')
    89         self.assertEquals(rset.rows, [[e.eid]])
    84         self.assertEquals(rset.rows, [[e.eid]])
    90 
    85 
    91     def test_not(self):
    86     def test_not(self):
    92         eid = self.execute('CWUser X WHERE X login "syt"')[0][0]
    87         eid = self.sexecute('CWUser X WHERE X login "syt"')[0][0]
    93         rset = self.execute('CWUser X WHERE NOT X eid %s' % eid)
    88         rset = self.sexecute('CWUser X WHERE NOT X eid %s' % eid)
    94         self.assert_(rset)
    89         self.assert_(rset)
    95         self.assert_(not eid in (r[0] for r in rset))
    90         self.assert_(not eid in (r[0] for r in rset))
    96 
    91 
    97     def test_multiple(self):
    92     def test_multiple(self):
    98         seid = self.execute('CWUser X WHERE X login "syt"')[0][0]
    93         seid = self.sexecute('CWUser X WHERE X login "syt"')[0][0]
    99         aeid = self.execute('CWUser X WHERE X login "adim"')[0][0]
    94         aeid = self.sexecute('CWUser X WHERE X login "adim"')[0][0]
   100         rset = self.execute('CWUser X, Y WHERE X login "syt", Y login "adim"')
    95         rset = self.sexecute('CWUser X, Y WHERE X login "syt", Y login "adim"')
   101         self.assertEquals(rset.rows, [[seid, aeid]])
    96         self.assertEquals(rset.rows, [[seid, aeid]])
   102         rset = self.execute('Any X,Y,L WHERE X login L, X login "syt", Y login "adim"')
    97         rset = self.sexecute('Any X,Y,L WHERE X login L, X login "syt", Y login "adim"')
   103         self.assertEquals(rset.rows, [[seid, aeid, 'syt']])
    98         self.assertEquals(rset.rows, [[seid, aeid, 'syt']])
   104 
    99 
   105     def test_in(self):
   100     def test_in(self):
   106         seid = self.execute('CWUser X WHERE X login "syt"')[0][0]
   101         seid = self.sexecute('CWUser X WHERE X login "syt"')[0][0]
   107         aeid = self.execute('CWUser X WHERE X login "adim"')[0][0]
   102         aeid = self.sexecute('CWUser X WHERE X login "adim"')[0][0]
   108         rset = self.execute('Any X,L ORDERBY L WHERE X login IN("syt", "adim"), X login L')
   103         rset = self.sexecute('Any X,L ORDERBY L WHERE X login IN("syt", "adim"), X login L')
   109         self.assertEquals(rset.rows, [[aeid, 'adim'], [seid, 'syt']])
   104         self.assertEquals(rset.rows, [[aeid, 'adim'], [seid, 'syt']])
   110 
   105 
   111     def test_relations(self):
   106     def test_relations(self):
   112         eid = self.execute('CWUser X WHERE X login "syt"')[0][0]
   107         eid = self.sexecute('CWUser X WHERE X login "syt"')[0][0]
   113         rset = self.execute('Any X,E WHERE X is CWUser, X login L, X primary_email E')
   108         rset = self.sexecute('Any X,E WHERE X is CWUser, X login L, X primary_email E')
   114         self.assert_(eid in (r[0] for r in rset))
   109         self.assert_(eid in (r[0] for r in rset))
   115         rset = self.execute('Any X,L,E WHERE X is CWUser, X login L, X primary_email E')
   110         rset = self.sexecute('Any X,L,E WHERE X is CWUser, X login L, X primary_email E')
   116         self.assert_('syt' in (r[1] for r in rset))
   111         self.assert_('syt' in (r[1] for r in rset))
   117 
   112 
   118     def test_count(self):
   113     def test_count(self):
   119         nbusers = self.execute('Any COUNT(X) WHERE X is CWUser')[0][0]
   114         nbusers = self.sexecute('Any COUNT(X) WHERE X is CWUser')[0][0]
   120         # just check this is a possible number
   115         # just check this is a possible number
   121         self.assert_(nbusers > 1, nbusers)
   116         self.assert_(nbusers > 1, nbusers)
   122         self.assert_(nbusers < 30, nbusers)
   117         self.assert_(nbusers < 30, nbusers)
   123 
   118 
   124     def test_upper(self):
   119     def test_upper(self):
   125         eid = self.execute('CWUser X WHERE X login "syt"')[0][0]
   120         eid = self.sexecute('CWUser X WHERE X login "syt"')[0][0]
   126         rset = self.execute('Any UPPER(L) WHERE X eid %s, X login L' % eid)
   121         rset = self.sexecute('Any UPPER(L) WHERE X eid %s, X login L' % eid)
   127         self.assertEquals(rset[0][0], 'SYT')
   122         self.assertEquals(rset[0][0], 'SYT')
   128 
   123 
   129     def test_unknown_attr(self):
   124     def test_unknown_attr(self):
   130         eid = self.execute('CWUser X WHERE X login "syt"')[0][0]
   125         eid = self.sexecute('CWUser X WHERE X login "syt"')[0][0]
   131         rset = self.execute('Any L,C,M WHERE X eid %s, X login L, '
   126         rset = self.sexecute('Any L,C,M WHERE X eid %s, X login L, '
   132                             'X creation_date C, X modification_date M' % eid)
   127                             'X creation_date C, X modification_date M' % eid)
   133         self.assertEquals(rset[0][0], 'syt')
   128         self.assertEquals(rset[0][0], 'syt')
   134         self.assertEquals(rset[0][1], None)
   129         self.assertEquals(rset[0][1], None)
   135         self.assertEquals(rset[0][2], None)
   130         self.assertEquals(rset[0][2], None)
   136 
   131 
   137     def test_sort(self):
   132     def test_sort(self):
   138         logins = [l for l, in self.execute('Any L ORDERBY L WHERE X login L')]
   133         logins = [l for l, in self.sexecute('Any L ORDERBY L WHERE X login L')]
   139         self.assertEquals(logins, sorted(logins))
   134         self.assertEquals(logins, sorted(logins))
   140 
   135 
   141     def test_lower_sort(self):
   136     def test_lower_sort(self):
   142         logins = [l for l, in self.execute('Any L ORDERBY lower(L) WHERE X login L')]
   137         logins = [l for l, in self.sexecute('Any L ORDERBY lower(L) WHERE X login L')]
   143         self.assertEquals(logins, sorted(logins))
   138         self.assertEquals(logins, sorted(logins))
   144 
   139 
   145     def test_or(self):
   140     def test_or(self):
   146         rset = self.execute('DISTINCT Any X WHERE X login "syt" OR (X in_group G, G name "managers")')
   141         rset = self.sexecute('DISTINCT Any X WHERE X login "syt" OR (X in_group G, G name "managers")')
   147         self.assertEquals(len(rset), 2, rset.rows) # syt + admin
   142         self.assertEquals(len(rset), 2, rset.rows) # syt + admin
   148 
   143 
   149     def test_nonregr_set_owned_by(self):
   144     def test_nonregr_set_owned_by(self):
   150         # test that when a user coming from ldap is triggering a transition
   145         # test that when a user coming from ldap is triggering a transition
   151         # the related TrInfo has correct owner information
   146         # the related TrInfo has correct owner information
   152         self.execute('SET X in_group G WHERE X login "syt", G name "managers"')
   147         self.sexecute('SET X in_group G WHERE X login "syt", G name "managers"')
   153         self.commit()
   148         self.commit()
   154         syt = self.execute('CWUser X WHERE X login "syt"').get_entity(0, 0)
   149         syt = self.sexecute('CWUser X WHERE X login "syt"').get_entity(0, 0)
   155         self.assertEquals([g.name for g in syt.in_group], ['managers', 'users'])
   150         self.assertEquals([g.name for g in syt.in_group], ['managers', 'users'])
   156         self.patch_authenticate()
   151         self.patch_authenticate()
   157         cnx = self.login('syt', 'dummypassword')
   152         cnx = self.login('syt', 'dummypassword')
   158         cu = cnx.cursor()
   153         cu = cnx.cursor()
   159         cu.execute('SET X in_state S WHERE X login "alf", S name "deactivated"')
   154         cu.execute('SET X in_state S WHERE X login "alf", S name "deactivated"')
   160         try:
   155         try:
   161             cnx.commit()
   156             cnx.commit()
   162             alf = self.execute('CWUser X WHERE X login "alf"').get_entity(0, 0)
   157             alf = self.sexecute('CWUser X WHERE X login "alf"').get_entity(0, 0)
   163             self.assertEquals(alf.in_state[0].name, 'deactivated')
   158             self.assertEquals(alf.in_state[0].name, 'deactivated')
   164             trinfo = alf.latest_trinfo()
   159             trinfo = alf.latest_trinfo()
   165             self.assertEquals(trinfo.owned_by[0].login, 'syt')
   160             self.assertEquals(trinfo.owned_by[0].login, 'syt')
   166             # select from_state to skip the user's creation TrInfo
   161             # select from_state to skip the user's creation TrInfo
   167             rset = self.execute('Any U ORDERBY D DESC WHERE WF wf_info_for X,'
   162             rset = self.sexecute('Any U ORDERBY D DESC WHERE WF wf_info_for X,'
   168                                 'WF creation_date D, WF from_state FS,'
   163                                 'WF creation_date D, WF from_state FS,'
   169                                 'WF owned_by U?, X eid %(x)s',
   164                                 'WF owned_by U?, X eid %(x)s',
   170                                 {'x': alf.eid}, 'x')
   165                                 {'x': alf.eid}, 'x')
   171             self.assertEquals(rset.rows, [[syt.eid]])
   166             self.assertEquals(rset.rows, [[syt.eid]])
   172         finally:
   167         finally:
   173             # restore db state
   168             # restore db state
   174             self.restore_connection()
   169             self.restore_connection()
   175             self.execute('SET X in_state S WHERE X login "alf", S name "activated"')
   170             self.sexecute('SET X in_state S WHERE X login "alf", S name "activated"')
   176             self.execute('DELETE X in_group G WHERE X login "syt", G name "managers"')
   171             self.sexecute('DELETE X in_group G WHERE X login "syt", G name "managers"')
   177 
   172 
   178     def test_same_column_names(self):
   173     def test_same_column_names(self):
   179         self.execute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"')
   174         self.sexecute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"')
   180 
   175 
   181     def test_multiple_entities_from_different_sources(self):
   176     def test_multiple_entities_from_different_sources(self):
   182         self.create_user('cochon')
   177         self.create_user('cochon', req=self.session)
   183         self.failUnless(self.execute('Any X,Y WHERE X login "syt", Y login "cochon"'))
   178         self.failUnless(self.sexecute('Any X,Y WHERE X login "syt", Y login "cochon"'))
   184 
   179 
   185     def test_exists1(self):
   180     def test_exists1(self):
   186         self.add_entity('CWGroup', name=u'bougloup1')
   181         self.add_entity('CWGroup', name=u'bougloup1', req=self.session)
   187         self.add_entity('CWGroup', name=u'bougloup2')
   182         self.add_entity('CWGroup', name=u'bougloup2', req=self.session)
   188         self.execute('SET U in_group G WHERE G name ~= "bougloup%", U login "admin"')
   183         self.sexecute('SET U in_group G WHERE G name ~= "bougloup%", U login "admin"')
   189         self.execute('SET U in_group G WHERE G name = "bougloup1", U login "syt"')
   184         self.sexecute('SET U in_group G WHERE G name = "bougloup1", U login "syt"')
   190         rset = self.execute('Any L,SN ORDERBY L WHERE X in_state S, S name SN, X login L, EXISTS(X in_group G, G name ~= "bougloup%")')
   185         rset = self.sexecute('Any L,SN ORDERBY L WHERE X in_state S, S name SN, X login L, EXISTS(X in_group G, G name ~= "bougloup%")')
   191         self.assertEquals(rset.rows, [['admin', 'activated'], ['syt', 'activated']])
   186         self.assertEquals(rset.rows, [['admin', 'activated'], ['syt', 'activated']])
   192 
   187 
   193     def test_exists2(self):
   188     def test_exists2(self):
   194         self.create_user('comme')
   189         self.create_user('comme', req=self.session)
   195         self.create_user('cochon')
   190         self.create_user('cochon', req=self.session)
   196         self.execute('SET X copain Y WHERE X login "comme", Y login "cochon"')
   191         self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"')
   197         rset = self.execute('Any GN ORDERBY GN WHERE X in_group G, G name GN, (G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon")))')
   192         rset = self.sexecute('Any GN ORDERBY GN WHERE X in_group G, G name GN, (G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon")))')
   198         self.assertEquals(rset.rows, [['managers'], ['users']])
   193         self.assertEquals(rset.rows, [['managers'], ['users']])
   199 
   194 
   200     def test_exists3(self):
   195     def test_exists3(self):
   201         self.create_user('comme')
   196         self.create_user('comme', req=self.session)
   202         self.create_user('cochon')
   197         self.create_user('cochon', req=self.session)
   203         self.execute('SET X copain Y WHERE X login "comme", Y login "cochon"')
   198         self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"')
   204         self.failUnless(self.execute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"'))
   199         self.failUnless(self.sexecute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"'))
   205         self.execute('SET X copain Y WHERE X login "syt", Y login "cochon"')
   200         self.sexecute('SET X copain Y WHERE X login "syt", Y login "cochon"')
   206         self.failUnless(self.execute('Any X, Y WHERE X copain Y, X login "syt", Y login "cochon"'))
   201         self.failUnless(self.sexecute('Any X, Y WHERE X copain Y, X login "syt", Y login "cochon"'))
   207         rset = self.execute('Any GN,L WHERE X in_group G, X login L, G name GN, G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon"))')
   202         rset = self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon"))')
   208         self.assertEquals(sorted(rset.rows), [['managers', 'admin'], ['users', 'comme'], ['users', 'syt']])
   203         self.assertEquals(sorted(rset.rows), [['managers', 'admin'], ['users', 'comme'], ['users', 'syt']])
   209 
   204 
   210     def test_exists4(self):
   205     def test_exists4(self):
   211         self.create_user('comme')
   206         self.create_user('comme', req=self.session)
   212         self.create_user('cochon', groups=('users', 'guests'))
   207         self.create_user('cochon', groups=('users', 'guests'), req=self.session)
   213         self.create_user('billy')
   208         self.create_user('billy', req=self.session)
   214         self.execute('SET X copain Y WHERE X login "comme", Y login "cochon"')
   209         self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"')
   215         self.execute('SET X copain Y WHERE X login "cochon", Y login "cochon"')
   210         self.sexecute('SET X copain Y WHERE X login "cochon", Y login "cochon"')
   216         self.execute('SET X copain Y WHERE X login "comme", Y login "billy"')
   211         self.sexecute('SET X copain Y WHERE X login "comme", Y login "billy"')
   217         self.execute('SET X copain Y WHERE X login "syt", Y login "billy"')
   212         self.sexecute('SET X copain Y WHERE X login "syt", Y login "billy"')
   218         # search for group name, login where
   213         # search for group name, login where
   219         #   CWUser copain with "comme" or "cochon" AND same login as the copain
   214         #   CWUser copain with "comme" or "cochon" AND same login as the copain
   220         # OR
   215         # OR
   221         #   CWUser in_state activated AND not copain with billy
   216         #   CWUser in_state activated AND not copain with billy
   222         #
   217         #
   223         # SO we expect everybody but "comme" and "syt"
   218         # SO we expect everybody but "comme" and "syt"
   224         rset= self.execute('Any GN,L WHERE X in_group G, X login L, G name GN, '
   219         rset= self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, '
   225                            'EXISTS(X copain T, T login L, T login in ("comme", "cochon")) OR '
   220                            'EXISTS(X copain T, T login L, T login in ("comme", "cochon")) OR '
   226                            'EXISTS(X in_state S, S name "activated", NOT X copain T2, T2 login "billy")')
   221                            'EXISTS(X in_state S, S name "activated", NOT X copain T2, T2 login "billy")')
   227         all = self.execute('Any GN, L WHERE X in_group G, X login L, G name GN')
   222         all = self.sexecute('Any GN, L WHERE X in_group G, X login L, G name GN')
   228         all.rows.remove(['users', 'comme'])
   223         all.rows.remove(['users', 'comme'])
   229         all.rows.remove(['users', 'syt'])
   224         all.rows.remove(['users', 'syt'])
   230         self.assertEquals(sorted(rset.rows), sorted(all.rows))
   225         self.assertEquals(sorted(rset.rows), sorted(all.rows))
   231 
   226 
   232     def test_exists5(self):
   227     def test_exists5(self):
   233         self.create_user('comme')
   228         self.create_user('comme', req=self.session)
   234         self.create_user('cochon', groups=('users', 'guests'))
   229         self.create_user('cochon', groups=('users', 'guests'), req=self.session)
   235         self.create_user('billy')
   230         self.create_user('billy', req=self.session)
   236         self.execute('SET X copain Y WHERE X login "comme", Y login "cochon"')
   231         self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"')
   237         self.execute('SET X copain Y WHERE X login "cochon", Y login "cochon"')
   232         self.sexecute('SET X copain Y WHERE X login "cochon", Y login "cochon"')
   238         self.execute('SET X copain Y WHERE X login "comme", Y login "billy"')
   233         self.sexecute('SET X copain Y WHERE X login "comme", Y login "billy"')
   239         self.execute('SET X copain Y WHERE X login "syt", Y login "cochon"')
   234         self.sexecute('SET X copain Y WHERE X login "syt", Y login "cochon"')
   240         rset= self.execute('Any L WHERE X login L, '
   235         rset= self.sexecute('Any L WHERE X login L, '
   241                            'EXISTS(X copain T, T login in ("comme", "cochon")) AND '
   236                            'EXISTS(X copain T, T login in ("comme", "cochon")) AND '
   242                            'NOT EXISTS(X copain T2, T2 login "billy")')
   237                            'NOT EXISTS(X copain T2, T2 login "billy")')
   243         self.assertEquals(sorted(rset.rows), [['cochon'], ['syt']])
   238         self.assertEquals(sorted(rset.rows), [['cochon'], ['syt']])
   244         rset= self.execute('Any GN,L WHERE X in_group G, X login L, G name GN, '
   239         rset= self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, '
   245                            'EXISTS(X copain T, T login in ("comme", "cochon")) AND '
   240                            'EXISTS(X copain T, T login in ("comme", "cochon")) AND '
   246                            'NOT EXISTS(X copain T2, T2 login "billy")')
   241                            'NOT EXISTS(X copain T2, T2 login "billy")')
   247         self.assertEquals(sorted(rset.rows), [['guests', 'cochon'],
   242         self.assertEquals(sorted(rset.rows), [['guests', 'cochon'],
   248                                               ['users', 'cochon'],
   243                                               ['users', 'cochon'],
   249                                               ['users', 'syt']])
   244                                               ['users', 'syt']])
   250 
   245 
   251     def test_cd_restriction(self):
   246     def test_cd_restriction(self):
   252         rset = self.execute('CWUser X WHERE X creation_date > "2009-02-01"')
   247         rset = self.sexecute('CWUser X WHERE X creation_date > "2009-02-01"')
   253         self.assertEquals(len(rset), 2) # admin/anon but no ldap user since it doesn't support creation_date
   248         self.assertEquals(len(rset), 2) # admin/anon but no ldap user since it doesn't support creation_date
   254 
   249 
   255     def test_union(self):
   250     def test_union(self):
   256         afeids = self.execute('State X')
   251         afeids = self.sexecute('State X')
   257         ueids = self.execute('CWUser X')
   252         ueids = self.sexecute('CWUser X')
   258         rset = self.execute('(Any X WHERE X is State) UNION (Any X WHERE X is CWUser)')
   253         rset = self.sexecute('(Any X WHERE X is State) UNION (Any X WHERE X is CWUser)')
   259         self.assertEquals(sorted(r[0] for r in rset.rows),
   254         self.assertEquals(sorted(r[0] for r in rset.rows),
   260                           sorted(r[0] for r in afeids + ueids))
   255                           sorted(r[0] for r in afeids + ueids))
   261 
   256 
   262     def _init_security_test(self):
   257     def _init_security_test(self):
   263         self.create_user('iaminguestsgrouponly', groups=('guests',))
   258         self.create_user('iaminguestsgrouponly', groups=('guests',), req=self.session)
   264         cnx = self.login('iaminguestsgrouponly')
   259         cnx = self.login('iaminguestsgrouponly')
   265         return cnx.cursor()
   260         return cnx.cursor()
   266 
   261 
   267     def test_security1(self):
   262     def test_security1(self):
   268         cu = self._init_security_test()
   263         cu = self._init_security_test()
   284         self.assertEquals(rset.rows, [])
   279         self.assertEquals(rset.rows, [])
   285         rset = cu.execute('Any F WHERE X has_text "iaminguestsgrouponly", X firstname F')
   280         rset = cu.execute('Any F WHERE X has_text "iaminguestsgrouponly", X firstname F')
   286         self.assertEquals(rset.rows, [[None]])
   281         self.assertEquals(rset.rows, [[None]])
   287 
   282 
   288     def test_nonregr1(self):
   283     def test_nonregr1(self):
   289         self.execute('Any X,AA ORDERBY AA DESC WHERE E eid %(x)s, E owned_by X, '
   284         self.sexecute('Any X,AA ORDERBY AA DESC WHERE E eid %(x)s, E owned_by X, '
   290                      'X modification_date AA',
   285                      'X modification_date AA',
   291                      {'x': cnx.user(self.session).eid})
   286                      {'x': self.session.user.eid})
   292 
   287 
   293     def test_nonregr2(self):
   288     def test_nonregr2(self):
   294         self.execute('Any X,L,AA WHERE E eid %(x)s, E owned_by X, '
   289         self.sexecute('Any X,L,AA WHERE E eid %(x)s, E owned_by X, '
   295                      'X login L, X modification_date AA',
   290                      'X login L, X modification_date AA',
   296                      {'x': cnx.user(self.session).eid})
   291                      {'x': self.session.user.eid})
   297 
   292 
   298     def test_nonregr3(self):
   293     def test_nonregr3(self):
   299         self.execute('Any X,AA ORDERBY AA DESC WHERE E eid %(x)s, '
   294         self.sexecute('Any X,AA ORDERBY AA DESC WHERE E eid %(x)s, '
   300                      'X modification_date AA',
   295                      'X modification_date AA',
   301                      {'x': cnx.user(self.session).eid})
   296                      {'x': self.session.user.eid})
   302 
   297 
   303     def test_nonregr4(self):
   298     def test_nonregr4(self):
   304         emaileid = self.execute('INSERT EmailAddress X: X address "toto@logilab.org"')[0][0]
   299         emaileid = self.sexecute('INSERT EmailAddress X: X address "toto@logilab.org"')[0][0]
   305         self.execute('Any X,AA WHERE X use_email Y, Y eid %(x)s, X modification_date AA',
   300         self.sexecute('Any X,AA WHERE X use_email Y, Y eid %(x)s, X modification_date AA',
   306                      {'x': emaileid})
   301                      {'x': emaileid})
   307 
   302 
   308     def test_nonregr5(self):
   303     def test_nonregr5(self):
   309         # original jpl query:
   304         # original jpl query:
   310         # Any X, NOW - CD, P WHERE P is Project, U interested_in P, U is CWUser, U login "sthenault", X concerns P, X creation_date CD ORDERBY CD DESC LIMIT 5
   305         # Any X, NOW - CD, P WHERE P is Project, U interested_in P, U is CWUser, U login "sthenault", X concerns P, X creation_date CD ORDERBY CD DESC LIMIT 5
   311         rql = 'Any X, NOW - CD, P ORDERBY CD DESC LIMIT 5 WHERE P bookmarked_by U, U login "%s", P is X, X creation_date CD' % self.session.user.login
   306         rql = 'Any X, NOW - CD, P ORDERBY CD DESC LIMIT 5 WHERE P bookmarked_by U, U login "%s", P is X, X creation_date CD' % self.session.user.login
   312         self.execute(rql, )#{'x': })
   307         self.sexecute(rql, )#{'x': })
   313 
   308 
   314     def test_nonregr6(self):
   309     def test_nonregr6(self):
   315         self.execute('Any B,U,UL GROUPBY B,U,UL WHERE B created_by U?, B is File '
   310         self.sexecute('Any B,U,UL GROUPBY B,U,UL WHERE B created_by U?, B is File '
   316                      'WITH U,UL BEING (Any U,UL WHERE ME eid %(x)s, (EXISTS(U identity ME) '
   311                      'WITH U,UL BEING (Any U,UL WHERE ME eid %(x)s, (EXISTS(U identity ME) '
   317                      'OR (EXISTS(U in_group G, G name IN("managers", "staff")))) '
   312                      'OR (EXISTS(U in_group G, G name IN("managers", "staff")))) '
   318                      'OR (EXISTS(U in_group H, ME in_group H, NOT H name "users")), U login UL, U is CWUser)',
   313                      'OR (EXISTS(U in_group H, ME in_group H, NOT H name "users")), U login UL, U is CWUser)',
   319                      {'x': self.session.user.eid})
   314                      {'x': self.session.user.eid})
   320 
   315 
   351         self.assertEquals(res, [[4]])
   346         self.assertEquals(res, [[4]])
   352         trfunc = GlobTrFunc('max', 1)
   347         trfunc = GlobTrFunc('max', 1)
   353         res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]])
   348         res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]])
   354         self.assertEquals(res, [[1, 5], [2, 4], [3, 6]])
   349         self.assertEquals(res, [[1, 5], [2, 4], [3, 6]])
   355 
   350 
       
   351 # XXX
       
   352 LDAPUserSourceTC._init_repo()
       
   353 repo = LDAPUserSourceTC.repo
   356 
   354 
   357 class RQL2LDAPFilterTC(RQLGeneratorTC):
   355 class RQL2LDAPFilterTC(RQLGeneratorTC):
   358     schema = repo.schema
   356     schema = repo.schema
   359 
   357 
   360     def setUp(self):
   358     def setUp(self):