server/test/unittest_ldapsource.py
author Pierre-Yves David <pierre-yves.david@logilab.fr>
Mon, 22 Apr 2013 17:57:46 +0200
changeset 8934 48a6e6b88b16
parent 8922 715b9eec6da9
child 8959 69a78922114b
permissions -rw-r--r--
[session] deprecate `hijack_user` method The semantic of this method is wicked and lead to very hard violation of sanity. (multiple transaction using the very same cursors). We deprecated the API to be able to drop it as soon as possible. The method was added long ago for some notification trick. It is not needed since we sent notification on ``postcommit_event``. (closes #2781782)

# copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
"""cubicweb.server.sources.ldapusers unit and functional tests"""

import os
import sys
import shutil
import time
from os.path import join, exists
import subprocess
import tempfile

from logilab.common.testlib import TestCase, unittest_main, mock_object, Tags

from cubicweb import AuthenticationError
from cubicweb.devtools.testlib import CubicWebTC
from cubicweb.devtools.repotest import RQLGeneratorTC
from cubicweb.devtools.httptest import get_available_port
from cubicweb.devtools import get_test_db_handler

from cubicweb.server.sources.ldapuser import GlobTrFunc, UnknownEid, RQL2LDAPFilter

CONFIG_LDAPFEED = u'''
user-base-dn=ou=People,dc=cubicweb,dc=test
group-base-dn=ou=Group,dc=cubicweb,dc=test
user-attrs-map=uid=login,mail=email,userPassword=upassword
group-attrs-map=cn=name,memberUid=member
'''
CONFIG_LDAPUSER = u'''
user-base-dn=ou=People,dc=cubicweb,dc=test
user-attrs-map=uid=login,mail=email,userPassword=upassword
'''

URL = None

def create_slapd_configuration(cls):
    global URL
    slapddir = tempfile.mkdtemp('cw-unittest-ldap')
    config = cls.config
    slapdconf = join(config.apphome, "slapd.conf")
    confin = file(join(config.apphome, "slapd.conf.in")).read()
    confstream = file(slapdconf, 'w')
    confstream.write(confin % {'apphome': config.apphome, 'testdir': slapddir})
    confstream.close()
    # fill ldap server with some data
    ldiffile = join(config.apphome, "ldap_test.ldif")
    config.info('Initing ldap database')
    cmdline = ['/usr/sbin/slapadd', '-f', slapdconf, '-l', ldiffile, '-c']
    PIPE = subprocess.PIPE
    slapproc = subprocess.Popen(cmdline, stdout=PIPE, stderr=PIPE)
    stdout, stderr = slapproc.communicate()
    if slapproc.returncode:
        print >> sys.stderr, ('slapadd returned with status: %s'
                              % slapproc.returncode)
        sys.stdout.write(stdout)
        sys.stderr.write(stderr)

    #ldapuri = 'ldapi://' + join(basedir, "ldapi").replace('/', '%2f')
    port = get_available_port(xrange(9000, 9100))
    host = 'localhost:%s' % port
    ldapuri = 'ldap://%s' % host
    cmdline = ["/usr/sbin/slapd", "-f",  slapdconf,  "-h",  ldapuri, "-d", "0"]
    config.info('Starting slapd:', ' '.join(cmdline))
    PIPE = subprocess.PIPE
    cls.slapd_process = subprocess.Popen(cmdline, stdout=PIPE, stderr=PIPE)
    time.sleep(0.2)
    if cls.slapd_process.poll() is None:
        config.info('slapd started with pid %s', cls.slapd_process.pid)
    else:
        raise EnvironmentError('Cannot start slapd with cmdline="%s" (from directory "%s")' %
                               (" ".join(cmdline), os.getcwd()))
    URL = u'ldap://%s' % host
    return slapddir

def terminate_slapd(cls):
    config = cls.config
    if cls.slapd_process and cls.slapd_process.returncode is None:
        config.info('terminating slapd')
        if hasattr(cls.slapd_process, 'terminate'):
            cls.slapd_process.terminate()
        else:
            import os, signal
            os.kill(cls.slapd_process.pid, signal.SIGTERM)
        stdout, stderr = cls.slapd_process.communicate()
        if cls.slapd_process.returncode:
            print >> sys.stderr, ('slapd returned with status: %s'
                                  % cls.slapd_process.returncode)
            sys.stdout.write(stdout)
            sys.stderr.write(stderr)
        config.info('DONE')


class LDAPFeedTestBase(CubicWebTC):
    test_db_id = 'ldap-feed'
    loglevel = 'ERROR'

    @classmethod
    def setUpClass(cls):
        from cubicweb.cwctl import init_cmdline_log_threshold
        init_cmdline_log_threshold(cls.config, cls.loglevel)
        cls._tmpdir = create_slapd_configuration(cls)

    @classmethod
    def tearDownClass(cls):
        terminate_slapd(cls)
        try:
            shutil.rmtree(cls._tmpdir)
        except:
            pass

    @classmethod
    def pre_setup_database(cls, session, config):
        session.create_entity('CWSource', name=u'ldap', type=u'ldapfeed', parser=u'ldapfeed',
                              url=URL, config=CONFIG_LDAPFEED)

        session.commit()
        return cls._pull(session)

    @classmethod
    def _pull(cls, session):
        with session.repo.internal_session() as isession:
            lfsource = isession.repo.sources_by_uri['ldap']
            stats = lfsource.pull_data(isession, force=True, raise_on_error=True)
            isession.commit()
            return stats

    def pull(self):
        return self._pull(self.session)

    def setup_database(self):
        if self.test_db_id == 'ldap-feed':
            with self.session.repo.internal_session(safe=True) as session:
                session.execute('DELETE Any E WHERE E cw_source S, S name "ldap"')
                session.commit()
        if self.test_db_id == 'ldap-feed':
            src = self.sexecute('CWSource S WHERE S name "ldap"').get_entity(0,0)
            src.cw_set(config=CONFIG_LDAPFEED)
        self.session.commit()
        self.pull()

    def delete_ldap_entry(self, dn):
        """
        delete an LDAP entity
        """
        modcmd = ['dn: %s'%dn, 'changetype: delete']
        self._ldapmodify(modcmd)

    def update_ldap_entry(self, dn, mods):
        """
        modify one or more attributes of an LDAP entity
        """
        modcmd = ['dn: %s'%dn, 'changetype: modify']
        for (kind, key), values in mods.iteritems():
            modcmd.append('%s: %s' % (kind, key))
            if isinstance(values, basestring):
                values = [values]
            for value in values:
                modcmd.append('%s: %s'%(key, value))
            modcmd.append('-')
        self._ldapmodify(modcmd)

    def _ldapmodify(self, modcmd):
        uri = self.repo.sources_by_uri['ldap'].urls[0]
        updatecmd = ['ldapmodify', '-H', uri, '-v', '-x', '-D',
                     'cn=admin,dc=cubicweb,dc=test', '-w', 'cw']
        PIPE = subprocess.PIPE
        p = subprocess.Popen(updatecmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)
        p.stdin.write('\n'.join(modcmd))
        p.stdin.close()
        if p.wait():
            raise RuntimeError("ldap update failed: %s"%('\n'.join(p.stderr.readlines())))

class CheckWrongGroup(LDAPFeedTestBase):
    """
    A testcase for situations where the default group for CWUser
    created from LDAP is wrongly configured.
    """

    def test_wrong_group(self):
        with self.session.repo.internal_session(safe=True) as session:
            source = self.session.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0)
            config = source.repo_source.check_config(source)
            # inject a bogus group here, along with at least a valid one
            config['user-default-group'] = ('thisgroupdoesnotexists','users')
            source.repo_source.update_config(source, config)
            session.commit(free_cnxset=False)
            # here we emitted an error log entry
            stats = source.repo_source.pull_data(session, force=True, raise_on_error=True)
            session.commit()



class LDAPFeedUserTC(LDAPFeedTestBase):
    """
    A testcase for CWUser support in ldapfeed (basic tests and authentication).
    """

    def assertMetadata(self, entity):
        self.assertTrue(entity.creation_date)
        self.assertTrue(entity.modification_date)

    def test_authenticate(self):
        source = self.repo.sources_by_uri['ldap']
        self.session.set_cnxset()
        # ensure we won't be logged against
        self.assertRaises(AuthenticationError,
                          source.authenticate, self.session, 'toto', 'toto')
        self.assertTrue(source.authenticate(self.session, 'syt', 'syt'))
        self.assertTrue(self.repo.connect('syt', password='syt'))

    def test_base(self):
        # check a known one
        rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
        e = rset.get_entity(0, 0)
        self.assertEqual(e.login, 'syt')
        e.complete()
        self.assertMetadata(e)
        self.assertEqual(e.firstname, None)
        self.assertEqual(e.surname, None)
        self.assertTrue('users' in [g.name for g in e.in_group])
        self.assertEqual(e.owned_by[0].login, 'syt')
        self.assertEqual(e.created_by, ())
        addresses = [pe.address for pe in e.use_email]
        addresses.sort()
        self.assertEqual(['sylvain.thenault@logilab.fr', 'syt@logilab.fr'],
                         addresses)
        self.assertIn(e.primary_email[0].address, ['sylvain.thenault@logilab.fr',
                                                   'syt@logilab.fr'])
        # email content should be indexed on the user
        rset = self.sexecute('CWUser X WHERE X has_text "thenault"')
        self.assertEqual(rset.rows, [[e.eid]])

    def test_copy_to_system_source(self):
        "make sure we can 'convert' an LDAP user into a system one"
        source = self.repo.sources_by_uri['ldap']
        eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
        self.sexecute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': eid})
        self.commit()
        source.reset_caches()
        rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
        self.assertEqual(len(rset), 1)
        e = rset.get_entity(0, 0)
        self.assertEqual(e.eid, eid)
        self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native',
                                                             'uri': u'system',
                                                             'use-cwuri-as-url': False},
                                                  'type': 'CWUser',
                                                  'extid': None})
        self.assertEqual(e.cw_source[0].name, 'system')
        self.assertTrue(e.creation_date)
        self.assertTrue(e.modification_date)
        source.pull_data(self.session)
        rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
        self.assertEqual(len(rset), 1)
        self.assertTrue(self.repo.system_source.authenticate(
                self.session, 'syt', password='syt'))
        # make sure the pull from ldap have not "reverted" user as a ldap-feed user
        self.assertEqual(e.cw_metainformation(), {'source': {'type': u'native',
                                                             'uri': u'system',
                                                             'use-cwuri-as-url': False},
                                                  'type': 'CWUser',
                                                  'extid': None})
        # and that the password stored in the system source is not empty or so
        user = self.execute('CWUser U WHERE U login "syt"').get_entity(0, 0)
        user.cw_clear_all_caches()
        pwd = self.session.system_sql("SELECT cw_upassword FROM cw_cwuser WHERE cw_login='syt';").fetchall()[0][0]
        self.assertIsNotNone(pwd)
        self.assertTrue(str(pwd))



class LDAPFeedUserDeletionTC(LDAPFeedTestBase):
    """
    A testcase for situations where users are deleted from or
    unavailabe in the LDAP database.
    """
    def test_a_filter_inactivate(self):
        """ filtered out people should be deactivated, unable to authenticate """
        source = self.session.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0)
        config = source.repo_source.check_config(source)
        # filter with adim's phone number
        config['user-filter'] = u'(%s=%s)' % ('telephoneNumber', '109')
        source.repo_source.update_config(source, config)
        self.commit()
        self.pull()
        self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt')
        self.assertEqual(self.execute('Any N WHERE U login "syt", '
                                      'U in_state S, S name N').rows[0][0],
                         'deactivated')
        self.assertEqual(self.execute('Any N WHERE U login "adim", '
                                      'U in_state S, S name N').rows[0][0],
                         'activated')
        # unfilter, syt should be activated again
        config['user-filter'] = u''
        source.repo_source.update_config(source, config)
        self.commit()
        self.pull()
        self.assertEqual(self.execute('Any N WHERE U login "syt", '
                                      'U in_state S, S name N').rows[0][0],
                         'activated')
        self.assertEqual(self.execute('Any N WHERE U login "adim", '
                                      'U in_state S, S name N').rows[0][0],
                         'activated')

    def test_delete(self):
        """ delete syt, pull, check deactivation, repull,
        read syt, pull, check activation
        """
        self.delete_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test')
        self.pull()
        self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt')
        self.assertEqual(self.execute('Any N WHERE U login "syt", '
                                      'U in_state S, S name N').rows[0][0],
                         'deactivated')
        # check that it doesn't choke
        self.pull()
        # reset the ldap database
        self.tearDownClass()
        self.setUpClass()
        self.pull()
        self.assertEqual(self.execute('Any N WHERE U login "syt", '
                                      'U in_state S, S name N').rows[0][0],
                         'activated')

    def test_reactivate_deleted(self):
        # test reactivating BY HAND the user isn't enough to
        # authenticate, as the native source refuse to authenticate
        # user from other sources
        self.delete_ldap_entry('uid=syt,ou=People,dc=cubicweb,dc=test')
        self.pull()
        # reactivate user (which source is still ldap-feed)
        user = self.execute('CWUser U WHERE U login "syt"').get_entity(0, 0)
        user.cw_adapt_to('IWorkflowable').fire_transition('activate')
        self.commit()
        with self.assertRaises(AuthenticationError):
            self.repo.connect('syt', password='syt')

        # ok now let's try to make it a system user
        self.sexecute('SET X cw_source S WHERE X eid %(x)s, S name "system"', {'x': user.eid})
        self.commit()
        # and that we can now authenticate again
        self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='toto')
        self.assertTrue(self.repo.connect('syt', password='syt'))

class LDAPFeedGroupTC(LDAPFeedTestBase):
    """
    A testcase for group support in ldapfeed.
    """

    def test_groups_exist(self):
        rset = self.sexecute('CWGroup X WHERE X name "dir"')
        self.assertEqual(len(rset), 1)

        rset = self.sexecute('CWGroup X WHERE X cw_source S, S name "ldap"')
        self.assertEqual(len(rset), 2)

    def test_group_deleted(self):
        rset = self.sexecute('CWGroup X WHERE X name "dir"')
        self.assertEqual(len(rset), 1)

    def test_in_group(self):
        rset = self.sexecute('CWGroup X WHERE X name %(name)s', {'name': 'dir'})
        dirgroup = rset.get_entity(0, 0)
        self.assertEqual(set(['syt', 'adim']),
                         set([u.login for u in dirgroup.reverse_in_group]))
        rset = self.sexecute('CWGroup X WHERE X name %(name)s', {'name': 'logilab'})
        logilabgroup = rset.get_entity(0, 0)
        self.assertEqual(set(['adim']),
                         set([u.login for u in logilabgroup.reverse_in_group]))

    def test_group_member_added(self):
        self.pull()
        rset = self.sexecute('Any L WHERE U in_group G, G name %(name)s, U login L',
                             {'name': 'logilab'})
        self.assertEqual(len(rset), 1)
        self.assertEqual(rset[0][0], 'adim')

        try:
            self.update_ldap_entry('cn=logilab,ou=Group,dc=cubicweb,dc=test',
                                   {('add', 'memberUid'): ['syt']})
            time.sleep(1.1) # timestamps precision is 1s
            self.pull()

            rset = self.sexecute('Any L WHERE U in_group G, G name %(name)s, U login L',
                                 {'name': 'logilab'})
            self.assertEqual(len(rset), 2)
            self.assertEqual(rset[0][0], 'adim')
            self.assertEqual(rset[1][0], 'syt')

        finally:
            # back to normal ldap setup
            self.tearDownClass()
            self.setUpClass()

    def test_group_member_deleted(self):
        self.pull() # ensure we are sync'ed
        rset = self.sexecute('Any L WHERE U in_group G, G name %(name)s, U login L',
                             {'name': 'logilab'})
        self.assertEqual(len(rset), 1)
        self.assertEqual(rset[0][0], 'adim')

        try:
            self.update_ldap_entry('cn=logilab,ou=Group,dc=cubicweb,dc=test',
                                   {('delete', 'memberUid'): ['adim']})
            time.sleep(1.1) # timestamps precision is 1s
            self.pull()

            rset = self.sexecute('Any L WHERE U in_group G, G name %(name)s, U login L',
                                 {'name': 'logilab'})
            self.assertEqual(len(rset), 0)
        finally:
            # back to normal ldap setup
            self.tearDownClass()
            self.setUpClass()


class LDAPUserSourceTC(LDAPFeedTestBase):
    test_db_id = 'ldap-user'
    tags = CubicWebTC.tags | Tags(('ldap'))

    @classmethod
    def pre_setup_database(cls, session, config):
        session.create_entity('CWSource', name=u'ldap', type=u'ldapuser',
                              url=URL, config=CONFIG_LDAPUSER)
        session.commit()
        # XXX keep it there
        session.execute('CWUser U')

    def assertMetadata(self, entity):
        self.assertEqual(entity.creation_date, None)
        self.assertEqual(entity.modification_date, None)

    def test_synchronize(self):
        source = self.repo.sources_by_uri['ldap']
        source.synchronize()

    def test_base(self):
        # check a known one
        rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
        e = rset.get_entity(0, 0)
        self.assertEqual(e.login, 'syt')
        e.complete()
        self.assertMetadata(e)
        self.assertEqual(e.firstname, None)
        self.assertEqual(e.surname, None)
        self.assertEqual(e.in_group[0].name, 'users')
        self.assertEqual(e.owned_by[0].login, 'syt')
        self.assertEqual(e.created_by, ())
        addresses = [pe.address for pe in e.use_email]
        addresses.sort()
        # should habe two element but ldapuser seems buggy. It's going to be dropped anyway.
        self.assertEqual(['sylvain.thenault@logilab.fr',], # 'syt@logilab.fr'],
                         addresses)
        self.assertIn(e.primary_email[0].address,
                      ['sylvain.thenault@logilab.fr', 'syt@logilab.fr'])
        # email content should be indexed on the user
        rset = self.sexecute('CWUser X WHERE X has_text "thenault"')
        self.assertEqual(rset.rows, [[e.eid]])

    def test_not(self):
        eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
        rset = self.sexecute('CWUser X WHERE NOT X eid %s' % eid)
        self.assert_(rset)
        self.assert_(not eid in (r[0] for r in rset))

    def test_multiple(self):
        seid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
        aeid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'adim'})[0][0]
        rset = self.sexecute('CWUser X, Y WHERE X login %(syt)s, Y login %(adim)s',
                            {'syt': 'syt', 'adim': 'adim'})
        self.assertEqual(rset.rows, [[seid, aeid]])
        rset = self.sexecute('Any X,Y,L WHERE X login L, X login %(syt)s, Y login %(adim)s',
                            {'syt': 'syt', 'adim': 'adim'})
        self.assertEqual(rset.rows, [[seid, aeid, 'syt']])

    def test_in(self):
        seid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
        aeid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'adim'})[0][0]
        rset = self.sexecute('Any X,L ORDERBY L WHERE X login IN("%s", "%s"), X login L' % ('syt', 'adim'))
        self.assertEqual(rset.rows, [[aeid, 'adim'], [seid, 'syt']])

    def test_relations(self):
        eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
        rset = self.sexecute('Any X,E WHERE X is CWUser, X login L, X primary_email E')
        self.assert_(eid in (r[0] for r in rset))
        rset = self.sexecute('Any X,L,E WHERE X is CWUser, X login L, X primary_email E')
        self.assert_('syt' in (r[1] for r in rset))

    def test_count(self):
        nbusers = self.sexecute('Any COUNT(X) WHERE X is CWUser')[0][0]
        # just check this is a possible number
        self.assert_(nbusers > 1, nbusers)
        self.assert_(nbusers < 30, nbusers)

    def test_upper(self):
        eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
        rset = self.sexecute('Any UPPER(L) WHERE X eid %s, X login L' % eid)
        self.assertEqual(rset[0][0], 'syt'.upper())

    def test_unknown_attr(self):
        eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
        rset = self.sexecute('Any L,C,M WHERE X eid %s, X login L, '
                            'X creation_date C, X modification_date M' % eid)
        self.assertEqual(rset[0][0], 'syt')
        self.assertEqual(rset[0][1], None)
        self.assertEqual(rset[0][2], None)

    def test_sort(self):
        logins = [l for l, in self.sexecute('Any L ORDERBY L WHERE X login L')]
        self.assertEqual(logins, sorted(logins))

    def test_lower_sort(self):
        logins = [l for l, in self.sexecute('Any L ORDERBY lower(L) WHERE X login L')]
        self.assertEqual(logins, sorted(logins))

    def test_or(self):
        rset = self.sexecute('DISTINCT Any X WHERE X login %(login)s OR (X in_group G, G name "managers")',
                            {'login': 'syt'})
        self.assertEqual(len(rset), 2, rset.rows) # syt + admin

    def test_nonregr_set_owned_by(self):
        # test that when a user coming from ldap is triggering a transition
        # the related TrInfo has correct owner information
        self.sexecute('SET X in_group G WHERE X login %(syt)s, G name "managers"', {'syt': 'syt'})
        self.commit()
        syt = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'}).get_entity(0, 0)
        self.assertEqual([g.name for g in syt.in_group], ['managers', 'users'])
        cnx = self.login('syt', password='syt')
        cu = cnx.cursor()
        adim = cu.execute('CWUser X WHERE X login %(login)s', {'login': 'adim'}).get_entity(0, 0)
        iworkflowable = adim.cw_adapt_to('IWorkflowable')
        iworkflowable.fire_transition('deactivate')
        try:
            cnx.commit()
            adim.cw_clear_all_caches()
            self.assertEqual(adim.in_state[0].name, 'deactivated')
            trinfo = iworkflowable.latest_trinfo()
            self.assertEqual(trinfo.owned_by[0].login, 'syt')
            # select from_state to skip the user's creation TrInfo
            rset = self.sexecute('Any U ORDERBY D DESC WHERE WF wf_info_for X,'
                                'WF creation_date D, WF from_state FS,'
                                'WF owned_by U?, X eid %(x)s',
                                {'x': adim.eid})
            self.assertEqual(rset.rows, [[syt.eid]])
        finally:
            # restore db state
            self.restore_connection()
            adim = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'adim'}).get_entity(0, 0)
            adim.cw_adapt_to('IWorkflowable').fire_transition('activate')
            self.sexecute('DELETE X in_group G WHERE X login %(syt)s, G name "managers"', {'syt': 'syt'})

    def test_same_column_names(self):
        self.sexecute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"')

    def test_multiple_entities_from_different_sources(self):
        req = self.request()
        self.create_user(req, 'cochon')
        self.assertTrue(self.sexecute('Any X,Y WHERE X login %(syt)s, Y login "cochon"', {'syt': 'syt'}))

    def test_exists1(self):
        self.session.set_cnxset()
        self.session.create_entity('CWGroup', name=u'bougloup1')
        self.session.create_entity('CWGroup', name=u'bougloup2')
        self.sexecute('SET U in_group G WHERE G name ~= "bougloup%", U login "admin"')
        self.sexecute('SET U in_group G WHERE G name = "bougloup1", U login %(syt)s', {'syt': 'syt'})
        rset = self.sexecute('Any L,SN ORDERBY L WHERE X in_state S, '
                             'S name SN, X login L, EXISTS(X in_group G, G name ~= "bougloup%")')
        self.assertEqual(rset.rows, [['admin', 'activated'], ['syt', 'activated']])

    def test_exists2(self):
        req = self.request()
        self.create_user(req, 'comme')
        self.create_user(req, 'cochon')
        self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"')
        rset = self.sexecute('Any GN ORDERBY GN WHERE X in_group G, G name GN, '
                             '(G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon")))')
        self.assertEqual(rset.rows, [['managers'], ['users']])

    def test_exists3(self):
        req = self.request()
        self.create_user(req, 'comme')
        self.create_user(req, 'cochon')
        self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"')
        self.assertTrue(self.sexecute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"'))
        self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "cochon"', {'syt': 'syt'})
        self.assertTrue(self.sexecute('Any X, Y WHERE X copain Y, X login %(syt)s, Y login "cochon"', {'syt': 'syt'}))
        rset = self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, G name "managers" '
                             'OR EXISTS(X copain T, T login in ("comme", "cochon"))')
        self.assertEqual(sorted(rset.rows), [['managers', 'admin'], ['users', 'comme'], ['users', 'syt']])

    def test_exists4(self):
        req = self.request()
        self.create_user(req, 'comme')
        self.create_user(req, 'cochon', groups=('users', 'guests'))
        self.create_user(req, 'billy')
        self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"')
        self.sexecute('SET X copain Y WHERE X login "cochon", Y login "cochon"')
        self.sexecute('SET X copain Y WHERE X login "comme", Y login "billy"')
        self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "billy"', {'syt': 'syt'})
        # search for group name, login where
        #   CWUser copain with "comme" or "cochon" AND same login as the copain
        # OR
        #   CWUser in_state activated AND not copain with billy
        #
        # SO we expect everybody but "comme" and "syt"
        rset= self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, '
                           'EXISTS(X copain T, T login L, T login in ("comme", "cochon")) OR '
                           'EXISTS(X in_state S, S name "activated", NOT X copain T2, T2 login "billy")')
        all = self.sexecute('Any GN, L WHERE X in_group G, X login L, G name GN')
        all.rows.remove(['users', 'comme'])
        all.rows.remove(['users', 'syt'])
        self.assertEqual(sorted(rset.rows), sorted(all.rows))

    def test_exists5(self):
        req = self.request()
        self.create_user(req, 'comme')
        self.create_user(req, 'cochon', groups=('users', 'guests'))
        self.create_user(req, 'billy')
        self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"')
        self.sexecute('SET X copain Y WHERE X login "cochon", Y login "cochon"')
        self.sexecute('SET X copain Y WHERE X login "comme", Y login "billy"')
        self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "cochon"', {'syt': 'syt'})
        rset= self.sexecute('Any L WHERE X login L, '
                           'EXISTS(X copain T, T login in ("comme", "cochon")) AND '
                           'NOT EXISTS(X copain T2, T2 login "billy")')
        self.assertEqual(sorted(rset.rows), [['cochon'], ['syt']])
        rset= self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, '
                           'EXISTS(X copain T, T login in ("comme", "cochon")) AND '
                           'NOT EXISTS(X copain T2, T2 login "billy")')
        self.assertEqual(sorted(rset.rows), [['guests', 'cochon'],
                                              ['users', 'cochon'],
                                              ['users', 'syt']])

    def test_cd_restriction(self):
        rset = self.sexecute('CWUser X WHERE X creation_date > "2009-02-01"')
        # admin/anon but no ldap user since it doesn't support creation_date
        self.assertEqual(sorted(e.login for e in rset.entities()),
                          ['admin', 'anon'])

    def test_union(self):
        afeids = self.sexecute('State X')
        ueids = self.sexecute('CWUser X')
        rset = self.sexecute('(Any X WHERE X is State) UNION (Any X WHERE X is CWUser)')
        self.assertEqual(sorted(r[0] for r in rset.rows),
                          sorted(r[0] for r in afeids + ueids))

    def _init_security_test(self):
        req = self.request()
        self.create_user(req, 'iaminguestsgrouponly', groups=('guests',))
        cnx = self.login('iaminguestsgrouponly')
        return cnx.cursor()

    def test_security1(self):
        cu = self._init_security_test()
        rset = cu.execute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
        self.assertEqual(rset.rows, [])
        rset = cu.execute('Any X WHERE X login "iaminguestsgrouponly"')
        self.assertEqual(len(rset.rows), 1)

    def test_security2(self):
        cu = self._init_security_test()
        rset = cu.execute('Any X WHERE X has_text %(syt)s', {'syt': 'syt'})
        self.assertEqual(rset.rows, [])
        rset = cu.execute('Any X WHERE X has_text "iaminguestsgrouponly"')
        self.assertEqual(len(rset.rows), 1)

    def test_security3(self):
        cu = self._init_security_test()
        rset = cu.execute('Any F WHERE X has_text %(syt)s, X firstname F', {'syt': 'syt'})
        self.assertEqual(rset.rows, [])
        rset = cu.execute('Any F WHERE X has_text "iaminguestsgrouponly", X firstname F')
        self.assertEqual(rset.rows, [[None]])

    def test_nonregr1(self):
        self.sexecute('Any X,AA ORDERBY AA DESC WHERE E eid %(x)s, E owned_by X, '
                     'X modification_date AA',
                     {'x': self.session.user.eid})

    def test_nonregr2(self):
        self.sexecute('Any X,L,AA WHERE E eid %(x)s, E owned_by X, '
                     'X login L, X modification_date AA',
                     {'x': self.session.user.eid})

    def test_nonregr3(self):
        self.sexecute('Any X,AA ORDERBY AA DESC WHERE E eid %(x)s, '
                     'X modification_date AA',
                     {'x': self.session.user.eid})

    def test_nonregr4(self):
        emaileid = self.sexecute('INSERT EmailAddress X: X address "toto@logilab.org"')[0][0]
        self.sexecute('Any X,AA WHERE X use_email Y, Y eid %(x)s, X modification_date AA',
                     {'x': emaileid})

    def test_nonregr5(self):
        # original jpl query:
        # Any X, NOW - CD, P WHERE P is Project, U interested_in P, U is CWUser,
        # U login "sthenault", X concerns P, X creation_date CD ORDERBY CD DESC LIMIT 5
        rql = ('Any X, NOW - CD, P ORDERBY CD DESC LIMIT 5 WHERE P bookmarked_by U, '
               'U login "%s", P is X, X creation_date CD') % self.session.user.login
        self.sexecute(rql, )#{'x': })

    def test_nonregr6(self):
        self.sexecute('Any B,U,UL GROUPBY B,U,UL WHERE B created_by U?, B is File '
                     'WITH U,UL BEING (Any U,UL WHERE ME eid %(x)s, (EXISTS(U identity ME) '
                     'OR (EXISTS(U in_group G, G name IN("managers", "staff")))) '
                     'OR (EXISTS(U in_group H, ME in_group H, NOT H name "users")), U login UL, U is CWUser)',
                     {'x': self.session.user.eid})

class GlobTrFuncTC(TestCase):

    def test_count(self):
        trfunc = GlobTrFunc('count', 0)
        res = trfunc.apply([[1], [2], [3], [4]])
        self.assertEqual(res, [[4]])
        trfunc = GlobTrFunc('count', 1)
        res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]])
        self.assertEqual(res, [[1, 2], [2, 1], [3, 1]])

    def test_sum(self):
        trfunc = GlobTrFunc('sum', 0)
        res = trfunc.apply([[1], [2], [3], [4]])
        self.assertEqual(res, [[10]])
        trfunc = GlobTrFunc('sum', 1)
        res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]])
        self.assertEqual(res, [[1, 7], [2, 4], [3, 6]])

    def test_min(self):
        trfunc = GlobTrFunc('min', 0)
        res = trfunc.apply([[1], [2], [3], [4]])
        self.assertEqual(res, [[1]])
        trfunc = GlobTrFunc('min', 1)
        res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]])
        self.assertEqual(res, [[1, 2], [2, 4], [3, 6]])

    def test_max(self):
        trfunc = GlobTrFunc('max', 0)
        res = trfunc.apply([[1], [2], [3], [4]])
        self.assertEqual(res, [[4]])
        trfunc = GlobTrFunc('max', 1)
        res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]])
        self.assertEqual(res, [[1, 5], [2, 4], [3, 6]])


class RQL2LDAPFilterTC(RQLGeneratorTC):

    tags = RQLGeneratorTC.tags | Tags(('ldap'))

    @property
    def schema(self):
        """return the application schema"""
        return self._schema

    def setUp(self):
        self.handler = get_test_db_handler(LDAPUserSourceTC.config)
        self.handler.build_db_cache('ldap-rqlgenerator', LDAPUserSourceTC.pre_setup_database)
        self.handler.restore_database('ldap-rqlgenerator')
        self._repo = repo = self.handler.get_repo()
        self._schema = repo.schema
        super(RQL2LDAPFilterTC, self).setUp()
        ldapsource = repo.sources[-1]
        self.cnxset = repo._get_cnxset()
        session = mock_object(cnxset=self.cnxset)
        self.o = RQL2LDAPFilter(ldapsource, session)
        self.ldapclasses = ''.join(ldapsource.base_filters)

    def tearDown(self):
        self._repo.turn_repo_off()
        super(RQL2LDAPFilterTC, self).tearDown()

    def test_base(self):
        rqlst = self._prepare('CWUser X WHERE X login "toto"').children[0]
        self.assertEqual(self.o.generate(rqlst, 'X')[1],
                          '(&%s(uid=toto))' % self.ldapclasses)

    def test_kwargs(self):
        rqlst = self._prepare('CWUser X WHERE X login %(x)s').children[0]
        self.o._args = {'x': "toto"}
        self.assertEqual(self.o.generate(rqlst, 'X')[1],
                          '(&%s(uid=toto))' % self.ldapclasses)

    def test_get_attr(self):
        rqlst = self._prepare('Any X WHERE E firstname X, E eid 12').children[0]
        self.assertRaises(UnknownEid, self.o.generate, rqlst, 'E')


if __name__ == '__main__':
    unittest_main()