server/test/unittest_ldapuser.py
changeset 360 600dd2fe8b40
child 975 0928daea04e9
equal deleted inserted replaced
359:164307023401 360:600dd2fe8b40
       
     1 """cubicweb.server.sources.ldapusers unit and functional tests"""
       
     2 
       
     3 from logilab.common.testlib import TestCase, unittest_main, mock_object
       
     4 from cubicweb.devtools import init_test_database, TestServerConfiguration
       
     5 from cubicweb.devtools.apptest import RepositoryBasedTC
       
     6 from cubicweb.devtools.repotest import RQLGeneratorTC
       
     7 
       
     8 from cubicweb.server.sources.ldapuser import *
       
     9 
       
    10 def nopwd_authenticate(self, session, login, upassword):
       
    11     """used to monkey patch the source to get successful authentication without
       
    12     upassword checking
       
    13     """
       
    14     assert login, 'no login!'
       
    15     searchfilter = [filter_format('(%s=%s)', (self.user_login_attr, login))]
       
    16     searchfilter.extend([filter_format('(%s=%s)', ('objectClass', o))
       
    17                          for o in self.user_classes])
       
    18     searchstr = '(&%s)' % ''.join(searchfilter)
       
    19     # first search the user
       
    20     try:
       
    21         user = self._search(session, self.user_base_dn, self.user_base_scope,
       
    22                             searchstr)[0]
       
    23     except IndexError:
       
    24         # no such user
       
    25         raise AuthenticationError()
       
    26     # don't check upassword !
       
    27     return self.extid2eid(user['dn'], 'EUser', session)
       
    28 
       
    29 
       
    30 
       
    31 config = TestServerConfiguration('data')
       
    32 config.sources_file = lambda : 'data/sourcesldap'
       
    33 repo, cnx = init_test_database('sqlite', config=config)
       
    34 
       
    35 class LDAPUserSourceTC(RepositoryBasedTC):
       
    36     repo = repo
       
    37         
       
    38     def patch_authenticate(self):
       
    39         self._orig_authenticate = LDAPUserSource.authenticate
       
    40         LDAPUserSource.authenticate = nopwd_authenticate
       
    41 
       
    42     def setUp(self):
       
    43         self._prepare()
       
    44         # XXX: need this first query else we get 'database is locked' from 
       
    45         # sqlite since it doesn't support multiple connections on the same
       
    46         # database
       
    47         # so doing, ldap inserted users don't get removed between each test
       
    48         rset = self.execute('EUser X')
       
    49         self.commit()
       
    50         # check we get some users from ldap
       
    51         self.assert_(len(rset) > 1)
       
    52         self.maxeid = self.execute('Any MAX(X)')[0][0]
       
    53         
       
    54     def tearDown(self):
       
    55         if hasattr(self, '_orig_authenticate'):
       
    56             LDAPUserSource.authenticate = self._orig_authenticate
       
    57         RepositoryBasedTC.tearDown(self)
       
    58             
       
    59     def test_authenticate(self):
       
    60         source = self.repo.sources_by_uri['ldapuser']
       
    61         self.assertRaises(AuthenticationError,
       
    62                           source.authenticate, self.session, 'toto', 'toto')
       
    63         
       
    64     def test_synchronize(self):
       
    65         source = self.repo.sources_by_uri['ldapuser']
       
    66         source.synchronize()
       
    67         
       
    68     def test_base(self):
       
    69         # check a known one
       
    70         e = self.execute('EUser X WHERE X login "syt"').get_entity(0, 0)
       
    71         self.assertEquals(e.login, 'syt')
       
    72         e.complete()
       
    73         self.assertEquals(e.creation_date, None)
       
    74         self.assertEquals(e.modification_date, None)
       
    75         self.assertEquals(e.firstname, None)
       
    76         self.assertEquals(e.surname, None)
       
    77         self.assertEquals(e.in_group[0].name, 'users')
       
    78         self.assertEquals(e.owned_by[0].login, 'syt')
       
    79         self.assertEquals(e.created_by, [])
       
    80         self.assertEquals(e.primary_email[0].address, 'Sylvain Thenault')
       
    81         # email content should be indexed on the user
       
    82         rset = self.execute('EUser X WHERE X has_text "thenault"')
       
    83         self.assertEquals(rset.rows, [[e.eid]])
       
    84 
       
    85     def test_not(self):
       
    86         eid = self.execute('EUser X WHERE X login "syt"')[0][0]
       
    87         rset = self.execute('EUser X WHERE NOT X eid %s' % eid)
       
    88         self.assert_(rset)
       
    89         self.assert_(not eid in (r[0] for r in rset))
       
    90 
       
    91     def test_multiple(self):
       
    92         seid = self.execute('EUser X WHERE X login "syt"')[0][0]
       
    93         aeid = self.execute('EUser X WHERE X login "adim"')[0][0]
       
    94         rset = self.execute('EUser X, Y WHERE X login "syt", Y login "adim"')
       
    95         self.assertEquals(rset.rows, [[seid, aeid]])
       
    96         rset = self.execute('Any X,Y,L WHERE X login L, X login "syt", Y login "adim"')
       
    97         self.assertEquals(rset.rows, [[seid, aeid, 'syt']])
       
    98 
       
    99     def test_in(self):
       
   100         seid = self.execute('EUser X WHERE X login "syt"')[0][0]
       
   101         aeid = self.execute('EUser X WHERE X login "adim"')[0][0]
       
   102         rset = self.execute('Any X,L ORDERBY L WHERE X login IN("syt", "adim"), X login L')
       
   103         self.assertEquals(rset.rows, [[aeid, 'adim'], [seid, 'syt']])
       
   104 
       
   105     def test_relations(self):
       
   106         eid = self.execute('EUser X WHERE X login "syt"')[0][0]
       
   107         rset = self.execute('Any X,E WHERE X is EUser, X login L, X primary_email E')
       
   108         self.assert_(eid in (r[0] for r in rset))
       
   109         rset = self.execute('Any X,L,E WHERE X is EUser, X login L, X primary_email E')
       
   110         self.assert_('syt' in (r[1] for r in rset))
       
   111 
       
   112     def test_count(self):
       
   113         nbusers = self.execute('Any COUNT(X) WHERE X is EUser')[0][0]
       
   114         # just check this is a possible number
       
   115         self.assert_(nbusers > 1, nbusers)
       
   116         self.assert_(nbusers < 30, nbusers)
       
   117 
       
   118     def test_upper(self):
       
   119         eid = self.execute('EUser X WHERE X login "syt"')[0][0]
       
   120         rset = self.execute('Any UPPER(L) WHERE X eid %s, X login L' % eid)
       
   121         self.assertEquals(rset[0][0], 'SYT')
       
   122 
       
   123     def test_unknown_attr(self):
       
   124         eid = self.execute('EUser X WHERE X login "syt"')[0][0]
       
   125         rset = self.execute('Any L,C,M WHERE X eid %s, X login L, '
       
   126                             'X creation_date C, X modification_date M' % eid)
       
   127         self.assertEquals(rset[0][0], 'syt')
       
   128         self.assertEquals(rset[0][1], None)
       
   129         self.assertEquals(rset[0][2], None)
       
   130 
       
   131     def test_sort(self):
       
   132         logins = [l for l, in self.execute('Any L ORDERBY L WHERE X login L')]
       
   133         self.assertEquals(logins, sorted(logins))
       
   134 
       
   135     def test_lower_sort(self):
       
   136         logins = [l for l, in self.execute('Any L ORDERBY lower(L) WHERE X login L')]
       
   137         self.assertEquals(logins, sorted(logins))
       
   138 
       
   139     def test_or(self):
       
   140         rset = self.execute('DISTINCT Any X WHERE X login "syt" OR (X in_group G, G name "managers")')
       
   141         self.assertEquals(len(rset), 2, rset.rows) # syt + admin
       
   142         
       
   143     def test_nonregr_set_owned_by(self):
       
   144         # test that when a user coming from ldap is triggering a transition
       
   145         # the related TrInfo has correct owner information
       
   146         self.execute('SET X in_group G WHERE X login "syt", G name "managers"')
       
   147         self.commit()
       
   148         syt = self.execute('EUser X WHERE X login "syt"').get_entity(0, 0)
       
   149         self.assertEquals([g.name for g in syt.in_group], ['managers', 'users'])
       
   150         self.patch_authenticate()
       
   151         cnx = self.login('syt', 'dummypassword')
       
   152         cu = cnx.cursor()
       
   153         cu.execute('SET X in_state S WHERE X login "alf", S name "deactivated"')
       
   154         try:
       
   155             cnx.commit()
       
   156             alf = self.execute('EUser X WHERE X login "alf"').get_entity(0, 0)
       
   157             self.assertEquals(alf.in_state[0].name, 'deactivated')
       
   158             trinfo = alf.latest_trinfo()
       
   159             self.assertEquals(trinfo.owned_by[0].login, 'syt')
       
   160             # select from_state to skip the user's creation TrInfo
       
   161             rset = self.execute('Any U ORDERBY D DESC WHERE WF wf_info_for X,'
       
   162                                 'WF creation_date D, WF from_state FS,'
       
   163                                 'WF owned_by U?, X eid %(x)s',
       
   164                                 {'x': alf.eid}, 'x')
       
   165             self.assertEquals(rset.rows, [[syt.eid]])
       
   166         finally:
       
   167             # restore db state
       
   168             self.restore_connection()
       
   169             self.execute('SET X in_state S WHERE X login "alf", S name "activated"')
       
   170             self.execute('DELETE X in_group G WHERE X login "syt", G name "managers"')
       
   171 
       
   172     def test_same_column_names(self):
       
   173         self.execute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"')
       
   174 
       
   175     def test_multiple_entities_from_different_sources(self):
       
   176         self.create_user('cochon')
       
   177         self.failUnless(self.execute('Any X,Y WHERE X login "syt", Y login "cochon"'))
       
   178         
       
   179     def test_exists1(self):
       
   180         self.add_entity('EGroup', name=u'bougloup1')
       
   181         self.add_entity('EGroup', name=u'bougloup2')
       
   182         self.execute('SET U in_group G WHERE G name ~= "bougloup%", U login "admin"')
       
   183         self.execute('SET U in_group G WHERE G name = "bougloup1", U login "syt"')
       
   184         rset = self.execute('Any L,SN ORDERBY L WHERE X in_state S, S name SN, X login L, EXISTS(X in_group G, G name ~= "bougloup%")')
       
   185         self.assertEquals(rset.rows, [['admin', 'activated'], ['syt', 'activated']])
       
   186 
       
   187     def test_exists2(self):
       
   188         self.create_user('comme')
       
   189         self.create_user('cochon')
       
   190         self.execute('SET X copain Y WHERE X login "comme", Y login "cochon"')
       
   191         rset = self.execute('Any GN ORDERBY GN WHERE X in_group G, G name GN, (G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon")))')
       
   192         self.assertEquals(rset.rows, [['managers'], ['users']])
       
   193 
       
   194     def test_exists3(self):
       
   195         self.create_user('comme')
       
   196         self.create_user('cochon')
       
   197         self.execute('SET X copain Y WHERE X login "comme", Y login "cochon"')
       
   198         self.failUnless(self.execute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"'))
       
   199         self.execute('SET X copain Y WHERE X login "syt", Y login "cochon"')
       
   200         self.failUnless(self.execute('Any X, Y WHERE X copain Y, X login "syt", Y login "cochon"'))
       
   201         rset = self.execute('Any GN,L WHERE X in_group G, X login L, G name GN, G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon"))')
       
   202         self.assertEquals(sorted(rset.rows), [['managers', 'admin'], ['users', 'comme'], ['users', 'syt']])
       
   203 
       
   204     def test_exists4(self):
       
   205         self.create_user('comme')
       
   206         self.create_user('cochon', groups=('users', 'guests'))
       
   207         self.create_user('billy')
       
   208         self.execute('SET X copain Y WHERE X login "comme", Y login "cochon"')
       
   209         self.execute('SET X copain Y WHERE X login "cochon", Y login "cochon"')
       
   210         self.execute('SET X copain Y WHERE X login "comme", Y login "billy"')
       
   211         self.execute('SET X copain Y WHERE X login "syt", Y login "billy"')
       
   212         # search for group name, login where
       
   213         #   EUser copain with "comme" or "cochon" AND same login as the copain
       
   214         # OR
       
   215         #   EUser in_state activated AND not copain with billy
       
   216         #
       
   217         # SO we expect everybody but "comme" and "syt"
       
   218         rset= self.execute('Any GN,L WHERE X in_group G, X login L, G name GN, '
       
   219                            'EXISTS(X copain T, T login L, T login in ("comme", "cochon")) OR '
       
   220                            'EXISTS(X in_state S, S name "activated", NOT X copain T2, T2 login "billy")')
       
   221         all = self.execute('Any GN, L WHERE X in_group G, X login L, G name GN')
       
   222         all.rows.remove(['users', 'comme'])
       
   223         all.rows.remove(['users', 'syt'])
       
   224         self.assertEquals(sorted(rset.rows), sorted(all.rows))
       
   225 
       
   226     def test_exists5(self):
       
   227         self.create_user('comme')
       
   228         self.create_user('cochon', groups=('users', 'guests'))
       
   229         self.create_user('billy')
       
   230         self.execute('SET X copain Y WHERE X login "comme", Y login "cochon"')
       
   231         self.execute('SET X copain Y WHERE X login "cochon", Y login "cochon"')
       
   232         self.execute('SET X copain Y WHERE X login "comme", Y login "billy"')
       
   233         self.execute('SET X copain Y WHERE X login "syt", Y login "cochon"')
       
   234         rset= self.execute('Any L WHERE X login L, '
       
   235                            'EXISTS(X copain T, T login in ("comme", "cochon")) AND '
       
   236                            'NOT EXISTS(X copain T2, T2 login "billy")')
       
   237         self.assertEquals(sorted(rset.rows), [['cochon'], ['syt']])
       
   238         rset= self.execute('Any GN,L WHERE X in_group G, X login L, G name GN, '
       
   239                            'EXISTS(X copain T, T login in ("comme", "cochon")) AND '
       
   240                            'NOT EXISTS(X copain T2, T2 login "billy")')
       
   241         self.assertEquals(sorted(rset.rows), [['guests', 'cochon'],
       
   242                                               ['users', 'cochon'],
       
   243                                               ['users', 'syt']])
       
   244         
       
   245 
       
   246     def test_union(self):
       
   247         afeids = self.execute('State X')
       
   248         ueids = self.execute('EUser X')
       
   249         rset = self.execute('(Any X WHERE X is State) UNION (Any X WHERE X is EUser)')
       
   250         self.assertEquals(sorted(r[0] for r in rset.rows),
       
   251                           sorted(r[0] for r in afeids + ueids))
       
   252 
       
   253     def _init_security_test(self):
       
   254         self.create_user('iaminguestsgrouponly', groups=('guests',))
       
   255         cnx = self.login('iaminguestsgrouponly')
       
   256         return cnx.cursor()
       
   257     
       
   258     def test_security1(self):
       
   259         cu = self._init_security_test()
       
   260         rset = cu.execute('Any X WHERE X login "syt"')
       
   261         self.assertEquals(rset.rows, [])
       
   262         rset = cu.execute('Any X WHERE X login "iaminguestsgrouponly"')
       
   263         self.assertEquals(len(rset.rows), 1)
       
   264     
       
   265     def test_security2(self):
       
   266         cu = self._init_security_test()
       
   267         rset = cu.execute('Any X WHERE X has_text "syt"')
       
   268         self.assertEquals(rset.rows, [])
       
   269         rset = cu.execute('Any X WHERE X has_text "iaminguestsgrouponly"')
       
   270         self.assertEquals(len(rset.rows), 1)
       
   271     
       
   272     def test_security3(self):
       
   273         cu = self._init_security_test()
       
   274         rset = cu.execute('Any F WHERE X has_text "syt", X firstname F')
       
   275         self.assertEquals(rset.rows, [])
       
   276         rset = cu.execute('Any F WHERE X has_text "iaminguestsgrouponly", X firstname F')
       
   277         self.assertEquals(rset.rows, [[None]])
       
   278 
       
   279     def test_nonregr_1(self):
       
   280         self.execute('Any X,AA ORDERBY AA DESC WHERE E eid %(x)s, E owned_by X, '
       
   281                      'X modification_date AA',
       
   282                      {'x': cnx.user(self.session).eid})
       
   283 
       
   284     def test_nonregr_2(self):
       
   285         self.execute('Any X,L,AA WHERE E eid %(x)s, E owned_by X, '
       
   286                      'X login L, X modification_date AA',
       
   287                      {'x': cnx.user(self.session).eid})
       
   288 
       
   289     def test_nonregr_3(self):
       
   290         self.execute('Any X,AA ORDERBY AA DESC WHERE E eid %(x)s, '
       
   291                      'X modification_date AA',
       
   292                      {'x': cnx.user(self.session).eid})
       
   293 
       
   294     def test_nonregr_4(self):
       
   295         emaileid = self.execute('INSERT EmailAddress X: X address "toto@logilab.org"')[0][0]
       
   296         self.execute('Any X,AA WHERE X use_email Y, Y eid %(x)s, X modification_date AA',
       
   297                      {'x': emaileid})
       
   298         
       
   299     def test_nonregr_5(self):
       
   300         # original jpl query:
       
   301         # Any X, NOW - CD, P WHERE P is Project, U interested_in P, U is EUser, U login "sthenault", X concerns P, X creation_date CD ORDERBY CD DESC LIMIT 5
       
   302         rql = 'Any X, NOW - CD, P ORDERBY CD DESC LIMIT 5 WHERE P bookmarked_by U, U login "%s", P is X, X creation_date CD' % self.session.user.login
       
   303         self.execute(rql, )#{'x': })
       
   304         
       
   305 
       
   306 class GlobTrFuncTC(TestCase):
       
   307 
       
   308     def test_count(self):
       
   309         trfunc = GlobTrFunc('count', 0)
       
   310         res = trfunc.apply([[1], [2], [3], [4]])
       
   311         self.assertEquals(res, [[4]])
       
   312         trfunc = GlobTrFunc('count', 1)
       
   313         res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]])
       
   314         self.assertEquals(res, [[1, 2], [2, 1], [3, 1]])
       
   315 
       
   316     def test_sum(self):
       
   317         trfunc = GlobTrFunc('sum', 0)
       
   318         res = trfunc.apply([[1], [2], [3], [4]])
       
   319         self.assertEquals(res, [[10]])
       
   320         trfunc = GlobTrFunc('sum', 1)
       
   321         res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]])
       
   322         self.assertEquals(res, [[1, 7], [2, 4], [3, 6]])
       
   323 
       
   324     def test_min(self):
       
   325         trfunc = GlobTrFunc('min', 0)
       
   326         res = trfunc.apply([[1], [2], [3], [4]])
       
   327         self.assertEquals(res, [[1]])
       
   328         trfunc = GlobTrFunc('min', 1)
       
   329         res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]])
       
   330         self.assertEquals(res, [[1, 2], [2, 4], [3, 6]])
       
   331 
       
   332     def test_max(self):
       
   333         trfunc = GlobTrFunc('max', 0)
       
   334         res = trfunc.apply([[1], [2], [3], [4]])
       
   335         self.assertEquals(res, [[4]])
       
   336         trfunc = GlobTrFunc('max', 1)
       
   337         res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]])
       
   338         self.assertEquals(res, [[1, 5], [2, 4], [3, 6]])
       
   339 
       
   340 
       
   341 class RQL2LDAPFilterTC(RQLGeneratorTC):
       
   342     schema = repo.schema
       
   343     
       
   344     def setUp(self):
       
   345         RQLGeneratorTC.setUp(self)
       
   346         ldapsource = repo.sources[-1]
       
   347         self.pool = repo._get_pool()
       
   348         session = mock_object(pool=self.pool)
       
   349         self.o = RQL2LDAPFilter(ldapsource, session)
       
   350         
       
   351     def tearDown(self):
       
   352         repo._free_pool(self.pool)
       
   353         RQLGeneratorTC.tearDown(self)
       
   354         
       
   355     def test_base(self):
       
   356         rqlst = self._prepare('EUser X WHERE X login "toto"').children[0]
       
   357         self.assertEquals(self.o.generate(rqlst, 'X')[1],
       
   358                           '(&(objectClass=top)(objectClass=posixAccount)(uid=toto))')
       
   359         
       
   360     def test_kwargs(self):
       
   361         rqlst = self._prepare('EUser X WHERE X login %(x)s').children[0]
       
   362         self.o._args = {'x': "toto"}
       
   363         self.assertEquals(self.o.generate(rqlst, 'X')[1],
       
   364                           '(&(objectClass=top)(objectClass=posixAccount)(uid=toto))')
       
   365         
       
   366     def test_get_attr(self):
       
   367         rqlst = self._prepare('Any X WHERE E firstname X, E eid 12').children[0]
       
   368         self.assertRaises(UnknownEid, self.o.generate, rqlst, 'E')
       
   369         
       
   370         
       
   371 if __name__ == '__main__':
       
   372     unittest_main()