server/test/unittest_ldapuser.py
branch stable
changeset 9013 b4bcabf55e77
parent 9012 2cf127d4f5fd
parent 9010 1f3d4d829e63
child 9014 dfa4da8a53a0
child 9128 d988eec2d5d3
equal deleted inserted replaced
9012:2cf127d4f5fd 9013:b4bcabf55e77
     1 # copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
       
     2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
       
     3 #
       
     4 # This file is part of CubicWeb.
       
     5 #
       
     6 # CubicWeb is free software: you can redistribute it and/or modify it under the
       
     7 # terms of the GNU Lesser General Public License as published by the Free
       
     8 # Software Foundation, either version 2.1 of the License, or (at your option)
       
     9 # any later version.
       
    10 #
       
    11 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT
       
    12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
       
    13 # FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
       
    14 # details.
       
    15 #
       
    16 # You should have received a copy of the GNU Lesser General Public License along
       
    17 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
       
    18 """cubicweb.server.sources.ldapusers unit and functional tests"""
       
    19 
       
    20 import os
       
    21 import shutil
       
    22 import time
       
    23 from os.path import join, exists
       
    24 import subprocess
       
    25 import tempfile
       
    26 
       
    27 from logilab.common.testlib import TestCase, unittest_main, mock_object, Tags
       
    28 
       
    29 from cubicweb import AuthenticationError
       
    30 from cubicweb.devtools.testlib import CubicWebTC
       
    31 from cubicweb.devtools.repotest import RQLGeneratorTC
       
    32 from cubicweb.devtools.httptest import get_available_port
       
    33 from cubicweb.devtools import get_test_db_handler
       
    34 
       
    35 from cubicweb.server.sources.ldapuser import GlobTrFunc, UnknownEid, RQL2LDAPFilter
       
    36 
       
    37 CONFIG = u'user-base-dn=ou=People,dc=cubicweb,dc=test'
       
    38 URL = None
       
    39 
       
    40 def create_slapd_configuration(cls):
       
    41     global URL
       
    42     slapddir = tempfile.mkdtemp('cw-unittest-ldap')
       
    43     config = cls.config
       
    44     slapdconf = join(config.apphome, "slapd.conf")
       
    45     confin = file(join(config.apphome, "slapd.conf.in")).read()
       
    46     confstream = file(slapdconf, 'w')
       
    47     confstream.write(confin % {'apphome': config.apphome, 'testdir': slapddir})
       
    48     confstream.close()
       
    49     # fill ldap server with some data
       
    50     ldiffile = join(config.apphome, "ldap_test.ldif")
       
    51     config.info('Initing ldap database')
       
    52     cmdline = "/usr/sbin/slapadd -f %s -l %s -c" % (slapdconf, ldiffile)
       
    53     subprocess.call(cmdline, shell=True)
       
    54 
       
    55     #ldapuri = 'ldapi://' + join(basedir, "ldapi").replace('/', '%2f')
       
    56     port = get_available_port(xrange(9000, 9100))
       
    57     host = 'localhost:%s' % port
       
    58     ldapuri = 'ldap://%s' % host
       
    59     cmdline = ["/usr/sbin/slapd", "-f",  slapdconf,  "-h",  ldapuri, "-d", "0"]
       
    60     config.info('Starting slapd:', ' '.join(cmdline))
       
    61     cls.slapd_process = subprocess.Popen(cmdline)
       
    62     time.sleep(0.2)
       
    63     if cls.slapd_process.poll() is None:
       
    64         config.info('slapd started with pid %s' % cls.slapd_process.pid)
       
    65     else:
       
    66         raise EnvironmentError('Cannot start slapd with cmdline="%s" (from directory "%s")' %
       
    67                                (" ".join(cmdline), os.getcwd()))
       
    68     URL = u'ldap://%s' % host
       
    69     return slapddir
       
    70 
       
    71 def terminate_slapd(cls):
       
    72     config = cls.config
       
    73     if cls.slapd_process and cls.slapd_process.returncode is None:
       
    74         config.info('terminating slapd')
       
    75         if hasattr(cls.slapd_process, 'terminate'):
       
    76             cls.slapd_process.terminate()
       
    77         else:
       
    78             import os, signal
       
    79             os.kill(cls.slapd_process.pid, signal.SIGTERM)
       
    80         cls.slapd_process.wait()
       
    81         config.info('DONE')
       
    82 
       
    83 class LDAPTestBase(CubicWebTC):
       
    84     loglevel = 'ERROR'
       
    85 
       
    86     @classmethod
       
    87     def setUpClass(cls):
       
    88         from cubicweb.cwctl import init_cmdline_log_threshold
       
    89         init_cmdline_log_threshold(cls.config, cls.loglevel)
       
    90         cls._tmpdir = create_slapd_configuration(cls)
       
    91 
       
    92     @classmethod
       
    93     def tearDownClass(cls):
       
    94         terminate_slapd(cls)
       
    95         try:
       
    96             shutil.rmtree(cls._tmpdir)
       
    97         except:
       
    98             pass
       
    99 
       
   100 class CheckWrongGroup(LDAPTestBase):
       
   101 
       
   102     def test_wrong_group(self):
       
   103         self.session.create_entity('CWSource', name=u'ldapuser', type=u'ldapfeed', parser=u'ldapfeed',
       
   104                                    url=URL, config=CONFIG)
       
   105         self.commit()
       
   106         with self.session.repo.internal_session(safe=True) as session:
       
   107             source = self.session.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0)
       
   108             config = source.repo_source.check_config(source)
       
   109             # inject a bogus group here, along with at least a valid one
       
   110             config['user-default-group'] = ('thisgroupdoesnotexists','users')
       
   111             source.repo_source.update_config(source, config)
       
   112             session.commit(free_cnxset=False)
       
   113             # here we emitted an error log entry
       
   114             stats = source.repo_source.pull_data(session, force=True, raise_on_error=True)
       
   115             session.commit()
       
   116 
       
   117 class DeleteStuffFromLDAPFeedSourceTC(LDAPTestBase):
       
   118     test_db_id = 'ldap-feed'
       
   119 
       
   120     @classmethod
       
   121     def pre_setup_database(cls, session, config):
       
   122         session.create_entity('CWSource', name=u'ldapuser', type=u'ldapfeed', parser=u'ldapfeed',
       
   123                               url=URL, config=CONFIG)
       
   124         session.commit()
       
   125         with session.repo.internal_session(safe=True) as isession:
       
   126             lfsource = isession.repo.sources_by_uri['ldapuser']
       
   127             stats = lfsource.pull_data(isession, force=True, raise_on_error=True)
       
   128 
       
   129     def _pull(self):
       
   130         with self.session.repo.internal_session() as isession:
       
   131             lfsource = isession.repo.sources_by_uri['ldapuser']
       
   132             stats = lfsource.pull_data(isession, force=True, raise_on_error=True)
       
   133             isession.commit()
       
   134 
       
   135     def test_a_filter_inactivate(self):
       
   136         """ filtered out people should be deactivated, unable to authenticate """
       
   137         source = self.session.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0)
       
   138         config = source.repo_source.check_config(source)
       
   139         # filter with adim's phone number
       
   140         config['user-filter'] = u'(%s=%s)' % ('telephoneNumber', '109')
       
   141         source.repo_source.update_config(source, config)
       
   142         self.commit()
       
   143         self._pull()
       
   144         self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt')
       
   145         self.assertEqual(self.execute('Any N WHERE U login "syt", '
       
   146                                       'U in_state S, S name N').rows[0][0],
       
   147                          'deactivated')
       
   148         self.assertEqual(self.execute('Any N WHERE U login "adim", '
       
   149                                       'U in_state S, S name N').rows[0][0],
       
   150                          'activated')
       
   151         # unfilter, syt should be activated again
       
   152         config['user-filter'] = u''
       
   153         source.repo_source.update_config(source, config)
       
   154         self.commit()
       
   155         self._pull()
       
   156         self.assertEqual(self.execute('Any N WHERE U login "syt", '
       
   157                                       'U in_state S, S name N').rows[0][0],
       
   158                          'activated')
       
   159         self.assertEqual(self.execute('Any N WHERE U login "adim", '
       
   160                                       'U in_state S, S name N').rows[0][0],
       
   161                          'activated')
       
   162 
       
   163     def test_delete(self):
       
   164         """ delete syt, pull, check deactivation, repull,
       
   165         readd syt, pull, check activation
       
   166         """
       
   167         uri = self.repo.sources_by_uri['ldapuser'].urls[0]
       
   168         deletecmd = ("ldapdelete -H %s 'uid=syt,ou=People,dc=cubicweb,dc=test' "
       
   169                      "-v -x -D cn=admin,dc=cubicweb,dc=test -w'cw'" % uri)
       
   170         os.system(deletecmd)
       
   171         self._pull()
       
   172         self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt')
       
   173         self.assertEqual(self.execute('Any N WHERE U login "syt", '
       
   174                                       'U in_state S, S name N').rows[0][0],
       
   175                          'deactivated')
       
   176         # check that it doesn't choke
       
   177         self._pull()
       
   178         # reset the fscking ldap thing
       
   179         self.tearDownClass()
       
   180         self.setUpClass()
       
   181         self._pull()
       
   182         self.assertEqual(self.execute('Any N WHERE U login "syt", '
       
   183                                       'U in_state S, S name N').rows[0][0],
       
   184                          'activated')
       
   185         # test reactivating the user isn't enough to authenticate, as the native source
       
   186         # refuse to authenticate user from other sources
       
   187         os.system(deletecmd)
       
   188         self._pull()
       
   189         user = self.execute('CWUser U WHERE U login "syt"').get_entity(0, 0)
       
   190         user.cw_adapt_to('IWorkflowable').fire_transition('activate')
       
   191         self.commit()
       
   192         self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt')
       
   193 
       
   194 class LDAPFeedSourceTC(LDAPTestBase):
       
   195     test_db_id = 'ldap-feed'
       
   196 
       
   197     @classmethod
       
   198     def pre_setup_database(cls, session, config):
       
   199         session.create_entity('CWSource', name=u'ldapuser', type=u'ldapfeed', parser=u'ldapfeed',
       
   200                               url=URL, config=CONFIG)
       
   201         session.commit()
       
   202         isession = session.repo.internal_session(safe=True)
       
   203         lfsource = isession.repo.sources_by_uri['ldapuser']
       
   204         stats = lfsource.pull_data(isession, force=True, raise_on_error=True)
       
   205 
       
   206     def setUp(self):
       
   207         super(LDAPFeedSourceTC, self).setUp()
       
   208         # ldap source url in the database may use a different port as the one
       
   209         # just attributed
       
   210         lfsource = self.repo.sources_by_uri['ldapuser']
       
   211         lfsource.urls = [URL]
       
   212 
       
   213     def assertMetadata(self, entity):
       
   214         self.assertTrue(entity.creation_date)
       
   215         self.assertTrue(entity.modification_date)
       
   216 
       
   217     def test_authenticate(self):
       
   218         source = self.repo.sources_by_uri['ldapuser']
       
   219         self.session.set_cnxset()
       
   220         # ensure we won't be logged against
       
   221         self.assertRaises(AuthenticationError,
       
   222                           source.authenticate, self.session, 'toto', 'toto')
       
   223         self.assertTrue(source.authenticate(self.session, 'syt', 'syt'))
       
   224         self.assertTrue(self.repo.connect('syt', password='syt'))
       
   225 
       
   226     def test_base(self):
       
   227         # check a known one
       
   228         rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
       
   229         e = rset.get_entity(0, 0)
       
   230         self.assertEqual(e.login, 'syt')
       
   231         e.complete()
       
   232         self.assertMetadata(e)
       
   233         self.assertEqual(e.firstname, None)
       
   234         self.assertEqual(e.surname, None)
       
   235         self.assertEqual(e.in_group[0].name, 'users')
       
   236         self.assertEqual(e.owned_by[0].login, 'syt')
       
   237         self.assertEqual(e.created_by, ())
       
   238         self.assertEqual(e.primary_email[0].address, 'Sylvain Thenault')
       
   239         # email content should be indexed on the user
       
   240         rset = self.sexecute('CWUser X WHERE X has_text "thenault"')
       
   241         self.assertEqual(rset.rows, [[e.eid]])
       
   242 
       
   243     def test_copy_to_system_source(self):
       
   244         source = self.repo.sources_by_uri['ldapuser']
       
   245         eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
       
   246         self.sexecute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': eid})
       
   247         self.commit()
       
   248         source.reset_caches()
       
   249         rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
       
   250         self.assertEqual(len(rset), 1)
       
   251         e = rset.get_entity(0, 0)
       
   252         self.assertEqual(e.eid, eid)
       
   253         self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native',
       
   254                                                              'uri': u'system',
       
   255                                                              'use-cwuri-as-url': False},
       
   256                                                   'type': 'CWUser',
       
   257                                                   'extid': None})
       
   258         self.assertEqual(e.cw_source[0].name, 'system')
       
   259         self.assertTrue(e.creation_date)
       
   260         self.assertTrue(e.modification_date)
       
   261         source.pull_data(self.session)
       
   262         rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
       
   263         self.assertEqual(len(rset), 1)
       
   264         self.assertTrue(self.repo.system_source.authenticate(
       
   265                 self.session, 'syt', password='syt'))
       
   266 
       
   267 
       
   268 class LDAPUserSourceTC(LDAPFeedSourceTC):
       
   269     test_db_id = 'ldap-user'
       
   270     tags = CubicWebTC.tags | Tags(('ldap'))
       
   271 
       
   272     @classmethod
       
   273     def pre_setup_database(cls, session, config):
       
   274         session.create_entity('CWSource', name=u'ldapuser', type=u'ldapuser',
       
   275                               url=URL, config=CONFIG)
       
   276         session.commit()
       
   277         # XXX keep it there
       
   278         session.execute('CWUser U')
       
   279 
       
   280     def assertMetadata(self, entity):
       
   281         self.assertEqual(entity.creation_date, None)
       
   282         self.assertEqual(entity.modification_date, None)
       
   283 
       
   284     def test_synchronize(self):
       
   285         source = self.repo.sources_by_uri['ldapuser']
       
   286         source.synchronize()
       
   287 
       
   288     def test_base(self):
       
   289         # check a known one
       
   290         rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
       
   291         e = rset.get_entity(0, 0)
       
   292         self.assertEqual(e.login, 'syt')
       
   293         e.complete()
       
   294         self.assertMetadata(e)
       
   295         self.assertEqual(e.firstname, None)
       
   296         self.assertEqual(e.surname, None)
       
   297         self.assertEqual(e.in_group[0].name, 'users')
       
   298         self.assertEqual(e.owned_by[0].login, 'syt')
       
   299         self.assertEqual(e.created_by, ())
       
   300         self.assertEqual(e.primary_email[0].address, 'Sylvain Thenault')
       
   301         # email content should be indexed on the user
       
   302         rset = self.sexecute('CWUser X WHERE X has_text "thenault"')
       
   303         self.assertEqual(rset.rows, [[e.eid]])
       
   304 
       
   305     def test_not(self):
       
   306         eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
       
   307         rset = self.sexecute('CWUser X WHERE NOT X eid %s' % eid)
       
   308         self.assert_(rset)
       
   309         self.assert_(not eid in (r[0] for r in rset))
       
   310 
       
   311     def test_multiple(self):
       
   312         seid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
       
   313         aeid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'adim'})[0][0]
       
   314         rset = self.sexecute('CWUser X, Y WHERE X login %(syt)s, Y login %(adim)s',
       
   315                             {'syt': 'syt', 'adim': 'adim'})
       
   316         self.assertEqual(rset.rows, [[seid, aeid]])
       
   317         rset = self.sexecute('Any X,Y,L WHERE X login L, X login %(syt)s, Y login %(adim)s',
       
   318                             {'syt': 'syt', 'adim': 'adim'})
       
   319         self.assertEqual(rset.rows, [[seid, aeid, 'syt']])
       
   320 
       
   321     def test_in(self):
       
   322         seid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
       
   323         aeid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'adim'})[0][0]
       
   324         rset = self.sexecute('Any X,L ORDERBY L WHERE X login IN("%s", "%s"), X login L' % ('syt', 'adim'))
       
   325         self.assertEqual(rset.rows, [[aeid, 'adim'], [seid, 'syt']])
       
   326 
       
   327     def test_relations(self):
       
   328         eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
       
   329         rset = self.sexecute('Any X,E WHERE X is CWUser, X login L, X primary_email E')
       
   330         self.assert_(eid in (r[0] for r in rset))
       
   331         rset = self.sexecute('Any X,L,E WHERE X is CWUser, X login L, X primary_email E')
       
   332         self.assert_('syt' in (r[1] for r in rset))
       
   333 
       
   334     def test_count(self):
       
   335         nbusers = self.sexecute('Any COUNT(X) WHERE X is CWUser')[0][0]
       
   336         # just check this is a possible number
       
   337         self.assert_(nbusers > 1, nbusers)
       
   338         self.assert_(nbusers < 30, nbusers)
       
   339 
       
   340     def test_upper(self):
       
   341         eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
       
   342         rset = self.sexecute('Any UPPER(L) WHERE X eid %s, X login L' % eid)
       
   343         self.assertEqual(rset[0][0], 'syt'.upper())
       
   344 
       
   345     def test_unknown_attr(self):
       
   346         eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
       
   347         rset = self.sexecute('Any L,C,M WHERE X eid %s, X login L, '
       
   348                             'X creation_date C, X modification_date M' % eid)
       
   349         self.assertEqual(rset[0][0], 'syt')
       
   350         self.assertEqual(rset[0][1], None)
       
   351         self.assertEqual(rset[0][2], None)
       
   352 
       
   353     def test_sort(self):
       
   354         logins = [l for l, in self.sexecute('Any L ORDERBY L WHERE X login L')]
       
   355         self.assertEqual(logins, sorted(logins))
       
   356 
       
   357     def test_lower_sort(self):
       
   358         logins = [l for l, in self.sexecute('Any L ORDERBY lower(L) WHERE X login L')]
       
   359         self.assertEqual(logins, sorted(logins))
       
   360 
       
   361     def test_or(self):
       
   362         rset = self.sexecute('DISTINCT Any X WHERE X login %(login)s OR (X in_group G, G name "managers")',
       
   363                             {'login': 'syt'})
       
   364         self.assertEqual(len(rset), 2, rset.rows) # syt + admin
       
   365 
       
   366     def test_nonregr_set_owned_by(self):
       
   367         # test that when a user coming from ldap is triggering a transition
       
   368         # the related TrInfo has correct owner information
       
   369         self.sexecute('SET X in_group G WHERE X login %(syt)s, G name "managers"', {'syt': 'syt'})
       
   370         self.commit()
       
   371         syt = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'}).get_entity(0, 0)
       
   372         self.assertEqual([g.name for g in syt.in_group], ['managers', 'users'])
       
   373         cnx = self.login('syt', password='syt')
       
   374         cu = cnx.cursor()
       
   375         adim = cu.execute('CWUser X WHERE X login %(login)s', {'login': 'adim'}).get_entity(0, 0)
       
   376         iworkflowable = adim.cw_adapt_to('IWorkflowable')
       
   377         iworkflowable.fire_transition('deactivate')
       
   378         try:
       
   379             cnx.commit()
       
   380             adim.cw_clear_all_caches()
       
   381             self.assertEqual(adim.in_state[0].name, 'deactivated')
       
   382             trinfo = iworkflowable.latest_trinfo()
       
   383             self.assertEqual(trinfo.owned_by[0].login, 'syt')
       
   384             # select from_state to skip the user's creation TrInfo
       
   385             rset = self.sexecute('Any U ORDERBY D DESC WHERE WF wf_info_for X,'
       
   386                                 'WF creation_date D, WF from_state FS,'
       
   387                                 'WF owned_by U?, X eid %(x)s',
       
   388                                 {'x': adim.eid})
       
   389             self.assertEqual(rset.rows, [[syt.eid]])
       
   390         finally:
       
   391             # restore db state
       
   392             self.restore_connection()
       
   393             adim = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'adim'}).get_entity(0, 0)
       
   394             adim.cw_adapt_to('IWorkflowable').fire_transition('activate')
       
   395             self.sexecute('DELETE X in_group G WHERE X login %(syt)s, G name "managers"', {'syt': 'syt'})
       
   396 
       
   397     def test_same_column_names(self):
       
   398         self.sexecute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"')
       
   399 
       
   400     def test_multiple_entities_from_different_sources(self):
       
   401         req = self.request()
       
   402         self.create_user(req, 'cochon')
       
   403         self.assertTrue(self.sexecute('Any X,Y WHERE X login %(syt)s, Y login "cochon"', {'syt': 'syt'}))
       
   404 
       
   405     def test_exists1(self):
       
   406         self.session.set_cnxset()
       
   407         self.session.create_entity('CWGroup', name=u'bougloup1')
       
   408         self.session.create_entity('CWGroup', name=u'bougloup2')
       
   409         self.sexecute('SET U in_group G WHERE G name ~= "bougloup%", U login "admin"')
       
   410         self.sexecute('SET U in_group G WHERE G name = "bougloup1", U login %(syt)s', {'syt': 'syt'})
       
   411         rset = self.sexecute('Any L,SN ORDERBY L WHERE X in_state S, '
       
   412                              'S name SN, X login L, EXISTS(X in_group G, G name ~= "bougloup%")')
       
   413         self.assertEqual(rset.rows, [['admin', 'activated'], ['syt', 'activated']])
       
   414 
       
   415     def test_exists2(self):
       
   416         req = self.request()
       
   417         self.create_user(req, 'comme')
       
   418         self.create_user(req, 'cochon')
       
   419         self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"')
       
   420         rset = self.sexecute('Any GN ORDERBY GN WHERE X in_group G, G name GN, '
       
   421                              '(G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon")))')
       
   422         self.assertEqual(rset.rows, [['managers'], ['users']])
       
   423 
       
   424     def test_exists3(self):
       
   425         req = self.request()
       
   426         self.create_user(req, 'comme')
       
   427         self.create_user(req, 'cochon')
       
   428         self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"')
       
   429         self.assertTrue(self.sexecute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"'))
       
   430         self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "cochon"', {'syt': 'syt'})
       
   431         self.assertTrue(self.sexecute('Any X, Y WHERE X copain Y, X login %(syt)s, Y login "cochon"', {'syt': 'syt'}))
       
   432         rset = self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, G name "managers" '
       
   433                              'OR EXISTS(X copain T, T login in ("comme", "cochon"))')
       
   434         self.assertEqual(sorted(rset.rows), [['managers', 'admin'], ['users', 'comme'], ['users', 'syt']])
       
   435 
       
   436     def test_exists4(self):
       
   437         req = self.request()
       
   438         self.create_user(req, 'comme')
       
   439         self.create_user(req, 'cochon', groups=('users', 'guests'))
       
   440         self.create_user(req, 'billy')
       
   441         self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"')
       
   442         self.sexecute('SET X copain Y WHERE X login "cochon", Y login "cochon"')
       
   443         self.sexecute('SET X copain Y WHERE X login "comme", Y login "billy"')
       
   444         self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "billy"', {'syt': 'syt'})
       
   445         # search for group name, login where
       
   446         #   CWUser copain with "comme" or "cochon" AND same login as the copain
       
   447         # OR
       
   448         #   CWUser in_state activated AND not copain with billy
       
   449         #
       
   450         # SO we expect everybody but "comme" and "syt"
       
   451         rset= self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, '
       
   452                            'EXISTS(X copain T, T login L, T login in ("comme", "cochon")) OR '
       
   453                            'EXISTS(X in_state S, S name "activated", NOT X copain T2, T2 login "billy")')
       
   454         all = self.sexecute('Any GN, L WHERE X in_group G, X login L, G name GN')
       
   455         all.rows.remove(['users', 'comme'])
       
   456         all.rows.remove(['users', 'syt'])
       
   457         self.assertEqual(sorted(rset.rows), sorted(all.rows))
       
   458 
       
   459     def test_exists5(self):
       
   460         req = self.request()
       
   461         self.create_user(req, 'comme')
       
   462         self.create_user(req, 'cochon', groups=('users', 'guests'))
       
   463         self.create_user(req, 'billy')
       
   464         self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"')
       
   465         self.sexecute('SET X copain Y WHERE X login "cochon", Y login "cochon"')
       
   466         self.sexecute('SET X copain Y WHERE X login "comme", Y login "billy"')
       
   467         self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "cochon"', {'syt': 'syt'})
       
   468         rset= self.sexecute('Any L WHERE X login L, '
       
   469                            'EXISTS(X copain T, T login in ("comme", "cochon")) AND '
       
   470                            'NOT EXISTS(X copain T2, T2 login "billy")')
       
   471         self.assertEqual(sorted(rset.rows), [['cochon'], ['syt']])
       
   472         rset= self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, '
       
   473                            'EXISTS(X copain T, T login in ("comme", "cochon")) AND '
       
   474                            'NOT EXISTS(X copain T2, T2 login "billy")')
       
   475         self.assertEqual(sorted(rset.rows), [['guests', 'cochon'],
       
   476                                               ['users', 'cochon'],
       
   477                                               ['users', 'syt']])
       
   478 
       
   479     def test_cd_restriction(self):
       
   480         rset = self.sexecute('CWUser X WHERE X creation_date > "2009-02-01"')
       
   481         # admin/anon but no ldap user since it doesn't support creation_date
       
   482         self.assertEqual(sorted(e.login for e in rset.entities()),
       
   483                           ['admin', 'anon'])
       
   484 
       
   485     def test_union(self):
       
   486         afeids = self.sexecute('State X')
       
   487         ueids = self.sexecute('CWUser X')
       
   488         rset = self.sexecute('(Any X WHERE X is State) UNION (Any X WHERE X is CWUser)')
       
   489         self.assertEqual(sorted(r[0] for r in rset.rows),
       
   490                           sorted(r[0] for r in afeids + ueids))
       
   491 
       
   492     def _init_security_test(self):
       
   493         req = self.request()
       
   494         self.create_user(req, 'iaminguestsgrouponly', groups=('guests',))
       
   495         cnx = self.login('iaminguestsgrouponly')
       
   496         return cnx.cursor()
       
   497 
       
   498     def test_security1(self):
       
   499         cu = self._init_security_test()
       
   500         rset = cu.execute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
       
   501         self.assertEqual(rset.rows, [])
       
   502         rset = cu.execute('Any X WHERE X login "iaminguestsgrouponly"')
       
   503         self.assertEqual(len(rset.rows), 1)
       
   504 
       
   505     def test_security2(self):
       
   506         cu = self._init_security_test()
       
   507         rset = cu.execute('Any X WHERE X has_text %(syt)s', {'syt': 'syt'})
       
   508         self.assertEqual(rset.rows, [])
       
   509         rset = cu.execute('Any X WHERE X has_text "iaminguestsgrouponly"')
       
   510         self.assertEqual(len(rset.rows), 1)
       
   511 
       
   512     def test_security3(self):
       
   513         cu = self._init_security_test()
       
   514         rset = cu.execute('Any F WHERE X has_text %(syt)s, X firstname F', {'syt': 'syt'})
       
   515         self.assertEqual(rset.rows, [])
       
   516         rset = cu.execute('Any F WHERE X has_text "iaminguestsgrouponly", X firstname F')
       
   517         self.assertEqual(rset.rows, [[None]])
       
   518 
       
   519     def test_nonregr1(self):
       
   520         self.sexecute('Any X,AA ORDERBY AA DESC WHERE E eid %(x)s, E owned_by X, '
       
   521                      'X modification_date AA',
       
   522                      {'x': self.session.user.eid})
       
   523 
       
   524     def test_nonregr2(self):
       
   525         self.sexecute('Any X,L,AA WHERE E eid %(x)s, E owned_by X, '
       
   526                      'X login L, X modification_date AA',
       
   527                      {'x': self.session.user.eid})
       
   528 
       
   529     def test_nonregr3(self):
       
   530         self.sexecute('Any X,AA ORDERBY AA DESC WHERE E eid %(x)s, '
       
   531                      'X modification_date AA',
       
   532                      {'x': self.session.user.eid})
       
   533 
       
   534     def test_nonregr4(self):
       
   535         emaileid = self.sexecute('INSERT EmailAddress X: X address "toto@logilab.org"')[0][0]
       
   536         self.sexecute('Any X,AA WHERE X use_email Y, Y eid %(x)s, X modification_date AA',
       
   537                      {'x': emaileid})
       
   538 
       
   539     def test_nonregr5(self):
       
   540         # original jpl query:
       
   541         # Any X, NOW - CD, P WHERE P is Project, U interested_in P, U is CWUser,
       
   542         # U login "sthenault", X concerns P, X creation_date CD ORDERBY CD DESC LIMIT 5
       
   543         rql = ('Any X, NOW - CD, P ORDERBY CD DESC LIMIT 5 WHERE P bookmarked_by U, '
       
   544                'U login "%s", P is X, X creation_date CD') % self.session.user.login
       
   545         self.sexecute(rql, )#{'x': })
       
   546 
       
   547     def test_nonregr6(self):
       
   548         self.sexecute('Any B,U,UL GROUPBY B,U,UL WHERE B created_by U?, B is File '
       
   549                      'WITH U,UL BEING (Any U,UL WHERE ME eid %(x)s, (EXISTS(U identity ME) '
       
   550                      'OR (EXISTS(U in_group G, G name IN("managers", "staff")))) '
       
   551                      'OR (EXISTS(U in_group H, ME in_group H, NOT H name "users")), U login UL, U is CWUser)',
       
   552                      {'x': self.session.user.eid})
       
   553 
       
   554 class GlobTrFuncTC(TestCase):
       
   555 
       
   556     def test_count(self):
       
   557         trfunc = GlobTrFunc('count', 0)
       
   558         res = trfunc.apply([[1], [2], [3], [4]])
       
   559         self.assertEqual(res, [[4]])
       
   560         trfunc = GlobTrFunc('count', 1)
       
   561         res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]])
       
   562         self.assertEqual(res, [[1, 2], [2, 1], [3, 1]])
       
   563 
       
   564     def test_sum(self):
       
   565         trfunc = GlobTrFunc('sum', 0)
       
   566         res = trfunc.apply([[1], [2], [3], [4]])
       
   567         self.assertEqual(res, [[10]])
       
   568         trfunc = GlobTrFunc('sum', 1)
       
   569         res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]])
       
   570         self.assertEqual(res, [[1, 7], [2, 4], [3, 6]])
       
   571 
       
   572     def test_min(self):
       
   573         trfunc = GlobTrFunc('min', 0)
       
   574         res = trfunc.apply([[1], [2], [3], [4]])
       
   575         self.assertEqual(res, [[1]])
       
   576         trfunc = GlobTrFunc('min', 1)
       
   577         res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]])
       
   578         self.assertEqual(res, [[1, 2], [2, 4], [3, 6]])
       
   579 
       
   580     def test_max(self):
       
   581         trfunc = GlobTrFunc('max', 0)
       
   582         res = trfunc.apply([[1], [2], [3], [4]])
       
   583         self.assertEqual(res, [[4]])
       
   584         trfunc = GlobTrFunc('max', 1)
       
   585         res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]])
       
   586         self.assertEqual(res, [[1, 5], [2, 4], [3, 6]])
       
   587 
       
   588 
       
   589 class RQL2LDAPFilterTC(RQLGeneratorTC):
       
   590 
       
   591     tags = RQLGeneratorTC.tags | Tags(('ldap'))
       
   592 
       
   593     @property
       
   594     def schema(self):
       
   595         """return the application schema"""
       
   596         return self._schema
       
   597 
       
   598     def setUp(self):
       
   599         self.handler = get_test_db_handler(LDAPUserSourceTC.config)
       
   600         self.handler.build_db_cache('ldap-user', LDAPUserSourceTC.pre_setup_database)
       
   601         self.handler.restore_database('ldap-user')
       
   602         self._repo = repo = self.handler.get_repo()
       
   603         self._schema = repo.schema
       
   604         super(RQL2LDAPFilterTC, self).setUp()
       
   605         ldapsource = repo.sources[-1]
       
   606         self.cnxset = repo._get_cnxset()
       
   607         session = mock_object(cnxset=self.cnxset)
       
   608         self.o = RQL2LDAPFilter(ldapsource, session)
       
   609         self.ldapclasses = ''.join(ldapsource.base_filters)
       
   610 
       
   611     def tearDown(self):
       
   612         self._repo.turn_repo_off()
       
   613         super(RQL2LDAPFilterTC, self).tearDown()
       
   614 
       
   615     def test_base(self):
       
   616         rqlst = self._prepare('CWUser X WHERE X login "toto"').children[0]
       
   617         self.assertEqual(self.o.generate(rqlst, 'X')[1],
       
   618                           '(&%s(uid=toto))' % self.ldapclasses)
       
   619 
       
   620     def test_kwargs(self):
       
   621         rqlst = self._prepare('CWUser X WHERE X login %(x)s').children[0]
       
   622         self.o._args = {'x': "toto"}
       
   623         self.assertEqual(self.o.generate(rqlst, 'X')[1],
       
   624                           '(&%s(uid=toto))' % self.ldapclasses)
       
   625 
       
   626     def test_get_attr(self):
       
   627         rqlst = self._prepare('Any X WHERE E firstname X, E eid 12').children[0]
       
   628         self.assertRaises(UnknownEid, self.o.generate, rqlst, 'E')
       
   629 
       
   630 
       
# run the tests of this module when the file is executed as a script
if __name__ == '__main__':
    unittest_main()