--- a/server/sources/ldapuser.py Fri Jun 14 17:13:17 2013 +0200
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,551 +0,0 @@
-# copyright 2003-2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
-# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
-#
-# This file is part of CubicWeb.
-#
-# CubicWeb is free software: you can redistribute it and/or modify it under the
-# terms of the GNU Lesser General Public License as published by the Free
-# Software Foundation, either version 2.1 of the License, or (at your option)
-# any later version.
-#
-# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
-# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
-# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
-# details.
-#
-# You should have received a copy of the GNU Lesser General Public License along
-# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.
-"""cubicweb ldap user source
-
-this source is for now limited to a read-only CWUser source
-"""
-from __future__ import division, with_statement
-from base64 import b64decode
-
-import ldap
-from ldap.filter import escape_filter_chars
-
-from rql.nodes import Relation, VariableRef, Constant, Function
-
-import warnings
-from cubicweb import UnknownEid, RepositoryError
-from cubicweb.server import ldaputils
-from cubicweb.server.utils import cartesian_product
-from cubicweb.server.sources import (AbstractSource, TrFunc, GlobTrFunc,
- TimedCache)
-
-# search scopes
-BASE = ldap.SCOPE_BASE
-ONELEVEL = ldap.SCOPE_ONELEVEL
-SUBTREE = ldap.SCOPE_SUBTREE
-
-# map ldap protocol to their standard port
-PROTO_PORT = {'ldap': 389,
- 'ldaps': 636,
- 'ldapi': None,
- }
-
-
-# module is lazily imported
-warnings.warn('Imminent drop of ldapuser. Switch to ldapfeed now!',
- DeprecationWarning)
-
-
-class LDAPUserSource(ldaputils.LDAPSourceMixIn, AbstractSource):
- """LDAP read-only CWUser source"""
- support_entities = {'CWUser': False}
-
- options = ldaputils.LDAPSourceMixIn.options + (
-
- ('synchronization-interval',
- {'type' : 'time',
- 'default': '1d',
- 'help': 'interval between synchronization with the ldap \
-directory (default to once a day).',
- 'group': 'ldap-source', 'level': 3,
- }),
- ('cache-life-time',
- {'type' : 'time',
- 'default': '2h',
- 'help': 'life time of query cache (default to two hours).',
- 'group': 'ldap-source', 'level': 3,
- }),
-
- )
-
- def update_config(self, source_entity, typedconfig):
- """update configuration from source entity. `typedconfig` is config
- properly typed with defaults set
- """
- super(LDAPUserSource, self).update_config(source_entity, typedconfig)
- self._interval = typedconfig['synchronization-interval']
- self._cache_ttl = max(71, typedconfig['cache-life-time'])
- self.reset_caches()
- # XXX copy from datafeed source
- if source_entity is not None:
- self._entity_update(source_entity)
- self.config = typedconfig
- # /end XXX
-
- def reset_caches(self):
- """method called during test to reset potential source caches"""
- self._cache = {}
- self._query_cache = TimedCache(self._cache_ttl)
-
- def init(self, activated, source_entity):
- """method called by the repository once ready to handle request"""
- super(LDAPUserSource, self).init(activated, source_entity)
- if activated:
- self.info('ldap init')
- # set minimum period of 5min 1s (the additional second is to
- # minimize resonnance effet)
- if self.user_rev_attrs['email']:
- self.repo.looping_task(max(301, self._interval), self.synchronize)
- self.repo.looping_task(self._cache_ttl // 10,
- self._query_cache.clear_expired)
-
- def synchronize(self):
- with self.repo.internal_session() as session:
- self.pull_data(session)
-
- def pull_data(self, session, force=False, raise_on_error=False):
- """synchronize content known by this repository with content in the
- external repository
- """
- self.info('synchronizing ldap source %s', self.uri)
- ldap_emailattr = self.user_rev_attrs['email']
- assert ldap_emailattr
- execute = session.execute
- cursor = session.system_sql("SELECT eid, extid FROM entities WHERE "
- "source='%s'" % self.uri)
- for eid, b64extid in cursor.fetchall():
- extid = b64decode(b64extid)
- self.debug('ldap eid %s', eid)
- # if no result found, _search automatically delete entity information
- res = self._search(session, extid, BASE)
- self.debug('ldap search %s', res)
- if res:
- ldapemailaddr = res[0].get(ldap_emailattr)
- if ldapemailaddr:
- if isinstance(ldapemailaddr, list):
- ldapemailaddr = ldapemailaddr[0] # XXX consider only the first email in the list
- rset = execute('Any X,A WHERE '
- 'X address A, U use_email X, U eid %(u)s',
- {'u': eid})
- ldapemailaddr = unicode(ldapemailaddr)
- for emaileid, emailaddr, in rset:
- if emailaddr == ldapemailaddr:
- break
- else:
- self.debug('updating email address of user %s to %s',
- extid, ldapemailaddr)
- emailrset = execute('EmailAddress A WHERE A address %(addr)s',
- {'addr': ldapemailaddr})
- if emailrset:
- execute('SET U use_email X WHERE '
- 'X eid %(x)s, U eid %(u)s',
- {'x': emailrset[0][0], 'u': eid})
- elif rset:
- if not execute('SET X address %(addr)s WHERE '
- 'U primary_email X, U eid %(u)s',
- {'addr': ldapemailaddr, 'u': eid}):
- execute('SET X address %(addr)s WHERE '
- 'X eid %(x)s',
- {'addr': ldapemailaddr, 'x': rset[0][0]})
- else:
- # no email found, create it
- _insert_email(session, ldapemailaddr, eid)
- session.commit()
-
- def ldap_name(self, var):
- if var.stinfo['relations']:
- relname = iter(var.stinfo['relations']).next().r_type
- return self.user_rev_attrs.get(relname)
- return None
-
- def prepare_columns(self, mainvars, rqlst):
- """return two list describing how to build the final results
- from the result of an ldap search (ie a list of dictionary)
- """
- columns = []
- global_transforms = []
- for i, term in enumerate(rqlst.selection):
- if isinstance(term, Constant):
- columns.append(term)
- continue
- if isinstance(term, Function): # LOWER, UPPER, COUNT...
- var = term.get_nodes(VariableRef)[0]
- var = var.variable
- try:
- mainvar = var.stinfo['attrvar'].name
- except AttributeError: # no attrvar set
- mainvar = var.name
- assert mainvar in mainvars
- trname = term.name
- ldapname = self.ldap_name(var)
- if trname in ('COUNT', 'MIN', 'MAX', 'SUM'):
- global_transforms.append(GlobTrFunc(trname, i, ldapname))
- columns.append((mainvar, ldapname))
- continue
- if trname in ('LOWER', 'UPPER'):
- columns.append((mainvar, TrFunc(trname, i, ldapname)))
- continue
- raise NotImplementedError('no support for %s function' % trname)
- if term.name in mainvars:
- columns.append((term.name, 'dn'))
- continue
- var = term.variable
- mainvar = var.stinfo['attrvar'].name
- columns.append((mainvar, self.ldap_name(var)))
- #else:
- # # probably a bug in rql splitting if we arrive here
- # raise NotImplementedError
- return columns, global_transforms
-
- def syntax_tree_search(self, session, union,
- args=None, cachekey=None, varmap=None, debug=0):
- """return result from this source for a rql query (actually from a rql
- syntax tree and a solution dictionary mapping each used variable to a
- possible type). If cachekey is given, the query necessary to fetch the
- results (but not the results themselves) may be cached using this key.
- """
- self.debug('ldap syntax tree search')
- # XXX not handled : transform/aggregat function, join on multiple users...
- assert len(union.children) == 1, 'union not supported'
- rqlst = union.children[0]
- assert not rqlst.with_, 'subquery not supported'
- rqlkey = rqlst.as_string(kwargs=args)
- try:
- results = self._query_cache[rqlkey]
- except KeyError:
- try:
- results = self.rqlst_search(session, rqlst, args)
- self._query_cache[rqlkey] = results
- except ldap.SERVER_DOWN:
- # cant connect to server
- msg = session._("can't connect to source %s, some data may be missing")
- session.set_shared_data('sources_error', msg % self.uri, txdata=True)
- return []
- return results
-
- def rqlst_search(self, session, rqlst, args):
- mainvars = []
- for varname in rqlst.defined_vars:
- for sol in rqlst.solutions:
- if sol[varname] == 'CWUser':
- mainvars.append(varname)
- break
- assert mainvars, rqlst
- columns, globtransforms = self.prepare_columns(mainvars, rqlst)
- eidfilters = [lambda x: x > 0]
- allresults = []
- generator = RQL2LDAPFilter(self, session, args, mainvars)
- for mainvar in mainvars:
- # handle restriction
- try:
- eidfilters_, ldapfilter = generator.generate(rqlst, mainvar)
- except GotDN as ex:
- assert ex.dn, 'no dn!'
- try:
- res = [self._cache[ex.dn]]
- except KeyError:
- res = self._search(session, ex.dn, BASE)
- except UnknownEid as ex:
- # raised when we are looking for the dn of an eid which is not
- # coming from this source
- res = []
- else:
- eidfilters += eidfilters_
- res = self._search(session, self.user_base_dn,
- self.user_base_scope, ldapfilter)
- allresults.append(res)
- # 1. get eid for each dn and filter according to that eid if necessary
- for i, res in enumerate(allresults):
- filteredres = []
- for resdict in res:
- # get sure the entity exists in the system table
- eid = self.repo.extid2eid(self, resdict['dn'], 'CWUser', session)
- for eidfilter in eidfilters:
- if not eidfilter(eid):
- break
- else:
- resdict['eid'] = eid
- filteredres.append(resdict)
- allresults[i] = filteredres
- # 2. merge result for each "mainvar": cartesian product
- allresults = cartesian_product(allresults)
- # 3. build final result according to column definition
- result = []
- for rawline in allresults:
- rawline = dict(zip(mainvars, rawline))
- line = []
- for varname, ldapname in columns:
- if ldapname is None:
- value = None # no mapping available
- elif ldapname == 'dn':
- value = rawline[varname]['eid']
- elif isinstance(ldapname, Constant):
- if ldapname.type == 'Substitute':
- value = args[ldapname.value]
- else:
- value = ldapname.value
- elif isinstance(ldapname, TrFunc):
- value = ldapname.apply(rawline[varname])
- else:
- value = rawline[varname].get(ldapname)
- line.append(value)
- result.append(line)
- for trfunc in globtransforms:
- result = trfunc.apply(result)
- #print '--> ldap result', result
- return result
-
- def _process_ldap_item(self, dn, iterator):
- itemdict = super(LDAPUserSource, self)._process_ldap_item(dn, iterator)
- self._cache[dn] = itemdict
- return itemdict
-
- def _process_no_such_object(self, session, dn):
- eid = self.repo.extid2eid(self, dn, 'CWUser', session, insert=False)
- if eid:
- self.warning('deleting ldap user with eid %s and dn %s', eid, dn)
- entity = session.entity_from_eid(eid, 'CWUser')
- self.repo.delete_info(session, entity, self.uri)
- self.reset_caches()
-
- def before_entity_insertion(self, session, lid, etype, eid, sourceparams):
- """called by the repository when an eid has been attributed for an
- entity stored here but the entity has not been inserted in the system
- table yet.
-
- This method must return the an Entity instance representation of this
- entity.
- """
- self.debug('ldap before entity insertion')
- entity = super(LDAPUserSource, self).before_entity_insertion(
- session, lid, etype, eid, sourceparams)
- res = self._search(session, lid, BASE)[0]
- for attr in entity.e_schema.indexable_attributes():
- entity.cw_edited[attr] = res[self.user_rev_attrs[attr]]
- return entity
-
- def after_entity_insertion(self, session, lid, entity, sourceparams):
- """called by the repository after an entity stored here has been
- inserted in the system table.
- """
- self.debug('ldap after entity insertion')
- super(LDAPUserSource, self).after_entity_insertion(
- session, lid, entity, sourceparams)
- for group in self.user_default_groups:
- session.execute('SET X in_group G WHERE X eid %(x)s, G name %(group)s',
- {'x': entity.eid, 'group': group})
- # search for existant email first
- try:
- # lid = dn
- emailaddr = self._cache[lid][self.user_rev_attrs['email']]
- except KeyError:
- return
- if isinstance(emailaddr, list):
- emailaddr = emailaddr[0] # XXX consider only the first email in the list
- rset = session.execute('EmailAddress X WHERE X address %(addr)s',
- {'addr': emailaddr})
- if rset:
- session.execute('SET U primary_email X WHERE U eid %(u)s, X eid %(x)s',
- {'x': rset[0][0], 'u': entity.eid})
- else:
- # not found, create it
- _insert_email(session, emailaddr, entity.eid)
-
- def update_entity(self, session, entity):
- """replace an entity in the source"""
- raise RepositoryError('this source is read only')
-
- def delete_entity(self, session, entity):
- """delete an entity from the source"""
- raise RepositoryError('this source is read only')
-
-
-def _insert_email(session, emailaddr, ueid):
- session.execute('INSERT EmailAddress X: X address %(addr)s, U primary_email X '
- 'WHERE U eid %(x)s', {'addr': emailaddr, 'x': ueid})
-
-class GotDN(Exception):
- """exception used when a dn localizing the searched user has been found"""
- def __init__(self, dn):
- self.dn = dn
-
-
-class RQL2LDAPFilter(object):
- """generate an LDAP filter for a rql query"""
- def __init__(self, source, session, args=None, mainvars=()):
- self.source = source
- self.repo = source.repo
- self._ldap_attrs = source.user_rev_attrs
- self._base_filters = source.base_filters
- self._session = session
- if args is None:
- args = {}
- self._args = args
- self.mainvars = mainvars
-
- def generate(self, selection, mainvarname):
- self._filters = res = self._base_filters[:]
- self._mainvarname = mainvarname
- self._eidfilters = []
- self._done_not = set()
- restriction = selection.where
- if isinstance(restriction, Relation):
- # only a single relation, need to append result here (no AND/OR)
- filter = restriction.accept(self)
- if filter is not None:
- res.append(filter)
- elif restriction:
- restriction.accept(self)
- if len(res) > 1:
- return self._eidfilters, '(&%s)' % ''.join(res)
- return self._eidfilters, res[0]
-
- def visit_and(self, et):
- """generate filter for a AND subtree"""
- for c in et.children:
- part = c.accept(self)
- if part:
- self._filters.append(part)
-
- def visit_or(self, ou):
- """generate filter for a OR subtree"""
- res = []
- for c in ou.children:
- part = c.accept(self)
- if part:
- res.append(part)
- if res:
- if len(res) > 1:
- part = '(|%s)' % ''.join(res)
- else:
- part = res[0]
- self._filters.append(part)
-
- def visit_not(self, node):
- """generate filter for a OR subtree"""
- part = node.children[0].accept(self)
- if part:
- self._filters.append('(!(%s))'% part)
-
- def visit_relation(self, relation):
- """generate filter for a relation"""
- rtype = relation.r_type
- # don't care of type constraint statement (i.e. relation_type = 'is')
- if rtype == 'is':
- return ''
- lhs, rhs = relation.get_parts()
- # attribute relation
- if self.source.schema.rschema(rtype).final:
- # dunno what to do here, don't pretend anything else
- if lhs.name != self._mainvarname:
- if lhs.name in self.mainvars:
- # XXX check we don't have variable as rhs
- return
- raise NotImplementedError
- rhs_vars = rhs.get_nodes(VariableRef)
- if rhs_vars:
- if len(rhs_vars) > 1:
- raise NotImplementedError
- # selected variable, nothing to do here
- return
- # no variables in the RHS
- if isinstance(rhs.children[0], Function):
- res = rhs.children[0].accept(self)
- elif rtype != 'has_text':
- res = self._visit_attribute_relation(relation)
- else:
- raise NotImplementedError(relation)
- # regular relation XXX todo: in_group
- else:
- raise NotImplementedError(relation)
- return res
-
- def _visit_attribute_relation(self, relation):
- """generate filter for an attribute relation"""
- lhs, rhs = relation.get_parts()
- lhsvar = lhs.variable
- if relation.r_type == 'eid':
- # XXX hack
- # skip comparison sign
- eid = int(rhs.children[0].accept(self))
- if relation.neged(strict=True):
- self._done_not.add(relation.parent)
- self._eidfilters.append(lambda x: not x == eid)
- return
- if rhs.operator != '=':
- filter = {'>': lambda x: x > eid,
- '>=': lambda x: x >= eid,
- '<': lambda x: x < eid,
- '<=': lambda x: x <= eid,
- }[rhs.operator]
- self._eidfilters.append(filter)
- return
- dn = self.repo.eid2extid(self.source, eid, self._session)
- raise GotDN(dn)
- try:
- filter = '(%s%s)' % (self._ldap_attrs[relation.r_type],
- rhs.accept(self))
- except KeyError:
- # unsupported attribute
- self.source.warning('%s source can\'t handle relation %s, no '
- 'results will be returned from this source',
- self.source.uri, relation)
- raise UnknownEid # trick to return no result
- return filter
-
- def visit_comparison(self, cmp):
- """generate filter for a comparaison"""
- return '%s%s'% (cmp.operator, cmp.children[0].accept(self))
-
- def visit_mathexpression(self, mexpr):
- """generate filter for a mathematic expression"""
- raise NotImplementedError
-
- def visit_function(self, function):
- """generate filter name for a function"""
- if function.name == 'IN':
- return self.visit_in(function)
- raise NotImplementedError
-
- def visit_in(self, function):
- grandpapa = function.parent.parent
- ldapattr = self._ldap_attrs[grandpapa.r_type]
- res = []
- for c in function.children:
- part = c.accept(self)
- if part:
- res.append(part)
- if res:
- if len(res) > 1:
- part = '(|%s)' % ''.join('(%s=%s)' % (ldapattr, v) for v in res)
- else:
- part = '(%s=%s)' % (ldapattr, res[0])
- return part
-
- def visit_constant(self, constant):
- """generate filter name for a constant"""
- value = constant.value
- if constant.type is None:
- raise NotImplementedError
- if constant.type == 'Date':
- raise NotImplementedError
- #value = self.keyword_map[value]()
- elif constant.type == 'Substitute':
- value = self._args[constant.value]
- else:
- value = constant.value
- if isinstance(value, unicode):
- value = value.encode('utf8')
- else:
- value = str(value)
- return escape_filter_chars(value)
-
- def visit_variableref(self, variableref):
- """get the sql name for a variable reference"""
- pass
-
--- a/server/test/unittest_ldapsource.py Fri Jun 14 17:13:17 2013 +0200
+++ b/server/test/unittest_ldapsource.py Thu Jun 13 17:27:43 2013 +0200
@@ -33,7 +33,6 @@
from cubicweb.devtools.httptest import get_available_port
from cubicweb.devtools import get_test_db_handler
-from cubicweb.server.sources.ldapuser import GlobTrFunc, UnknownEid, RQL2LDAPFilter
CONFIG_LDAPFEED = u'''
user-base-dn=ou=People,dc=cubicweb,dc=test
@@ -429,373 +428,8 @@
self.setUpClass()
-class LDAPUserSourceTC(LDAPFeedTestBase):
- test_db_id = 'ldap-user'
- tags = CubicWebTC.tags | Tags(('ldap'))
-
- @classmethod
- def pre_setup_database(cls, session, config):
- session.create_entity('CWSource', name=u'ldap', type=u'ldapuser',
- url=URL, config=CONFIG_LDAPUSER)
- session.commit()
- # XXX keep it there
- session.execute('CWUser U')
-
- def assertMetadata(self, entity):
- self.assertEqual(entity.creation_date, None)
- self.assertEqual(entity.modification_date, None)
-
- def test_synchronize(self):
- source = self.repo.sources_by_uri['ldap']
- source.synchronize()
-
- def test_base(self):
- # check a known one
- rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
- e = rset.get_entity(0, 0)
- self.assertEqual(e.login, 'syt')
- e.complete()
- self.assertMetadata(e)
- self.assertEqual(e.firstname, None)
- self.assertEqual(e.surname, None)
- self.assertEqual(e.in_group[0].name, 'users')
- self.assertEqual(e.owned_by[0].login, 'syt')
- self.assertEqual(e.created_by, ())
- addresses = [pe.address for pe in e.use_email]
- addresses.sort()
- # should habe two element but ldapuser seems buggy. It's going to be dropped anyway.
- self.assertEqual(['sylvain.thenault@logilab.fr',], # 'syt@logilab.fr'],
- addresses)
- self.assertIn(e.primary_email[0].address,
- ['sylvain.thenault@logilab.fr', 'syt@logilab.fr'])
- # email content should be indexed on the user
- rset = self.sexecute('CWUser X WHERE X has_text "thenault"')
- self.assertEqual(rset.rows, [[e.eid]])
-
- def test_not(self):
- eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
- rset = self.sexecute('CWUser X WHERE NOT X eid %s' % eid)
- self.assert_(rset)
- self.assert_(not eid in (r[0] for r in rset))
-
- def test_multiple(self):
- seid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
- aeid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'adim'})[0][0]
- rset = self.sexecute('CWUser X, Y WHERE X login %(syt)s, Y login %(adim)s',
- {'syt': 'syt', 'adim': 'adim'})
- self.assertEqual(rset.rows, [[seid, aeid]])
- rset = self.sexecute('Any X,Y,L WHERE X login L, X login %(syt)s, Y login %(adim)s',
- {'syt': 'syt', 'adim': 'adim'})
- self.assertEqual(rset.rows, [[seid, aeid, 'syt']])
-
- def test_in(self):
- seid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
- aeid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'adim'})[0][0]
- rset = self.sexecute('Any X,L ORDERBY L WHERE X login IN("%s", "%s"), X login L' % ('syt', 'adim'))
- self.assertEqual(rset.rows, [[aeid, 'adim'], [seid, 'syt']])
-
- def test_relations(self):
- eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
- rset = self.sexecute('Any X,E WHERE X is CWUser, X login L, X primary_email E')
- self.assert_(eid in (r[0] for r in rset))
- rset = self.sexecute('Any X,L,E WHERE X is CWUser, X login L, X primary_email E')
- self.assert_('syt' in (r[1] for r in rset))
-
- def test_count(self):
- nbusers = self.sexecute('Any COUNT(X) WHERE X is CWUser')[0][0]
- # just check this is a possible number
- self.assert_(nbusers > 1, nbusers)
- self.assert_(nbusers < 30, nbusers)
-
- def test_upper(self):
- eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
- rset = self.sexecute('Any UPPER(L) WHERE X eid %s, X login L' % eid)
- self.assertEqual(rset[0][0], 'syt'.upper())
-
- def test_unknown_attr(self):
- eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0]
- rset = self.sexecute('Any L,C,M WHERE X eid %s, X login L, '
- 'X creation_date C, X modification_date M' % eid)
- self.assertEqual(rset[0][0], 'syt')
- self.assertEqual(rset[0][1], None)
- self.assertEqual(rset[0][2], None)
-
- def test_sort(self):
- logins = [l for l, in self.sexecute('Any L ORDERBY L WHERE X login L')]
- self.assertEqual(logins, sorted(logins))
-
- def test_lower_sort(self):
- logins = [l for l, in self.sexecute('Any L ORDERBY lower(L) WHERE X login L')]
- self.assertEqual(logins, sorted(logins))
-
- def test_or(self):
- rset = self.sexecute('DISTINCT Any X WHERE X login %(login)s OR (X in_group G, G name "managers")',
- {'login': 'syt'})
- self.assertEqual(len(rset), 2, rset.rows) # syt + admin
-
- def test_nonregr_set_owned_by(self):
- # test that when a user coming from ldap is triggering a transition
- # the related TrInfo has correct owner information
- self.sexecute('SET X in_group G WHERE X login %(syt)s, G name "managers"', {'syt': 'syt'})
- self.commit()
- syt = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'}).get_entity(0, 0)
- self.assertEqual([g.name for g in syt.in_group], ['managers', 'users'])
- cnx = self.login('syt', password='syt')
- cu = cnx.cursor()
- adim = cu.execute('CWUser X WHERE X login %(login)s', {'login': 'adim'}).get_entity(0, 0)
- iworkflowable = adim.cw_adapt_to('IWorkflowable')
- iworkflowable.fire_transition('deactivate')
- try:
- cnx.commit()
- adim.cw_clear_all_caches()
- self.assertEqual(adim.in_state[0].name, 'deactivated')
- trinfo = iworkflowable.latest_trinfo()
- self.assertEqual(trinfo.owned_by[0].login, 'syt')
- # select from_state to skip the user's creation TrInfo
- rset = self.sexecute('Any U ORDERBY D DESC WHERE WF wf_info_for X,'
- 'WF creation_date D, WF from_state FS,'
- 'WF owned_by U?, X eid %(x)s',
- {'x': adim.eid})
- self.assertEqual(rset.rows, [[syt.eid]])
- finally:
- # restore db state
- self.restore_connection()
- adim = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'adim'}).get_entity(0, 0)
- adim.cw_adapt_to('IWorkflowable').fire_transition('activate')
- self.sexecute('DELETE X in_group G WHERE X login %(syt)s, G name "managers"', {'syt': 'syt'})
-
- def test_same_column_names(self):
- self.sexecute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"')
-
- def test_multiple_entities_from_different_sources(self):
- req = self.request()
- self.create_user(req, 'cochon')
- self.assertTrue(self.sexecute('Any X,Y WHERE X login %(syt)s, Y login "cochon"', {'syt': 'syt'}))
-
- def test_exists1(self):
- self.session.set_cnxset()
- self.session.create_entity('CWGroup', name=u'bougloup1')
- self.session.create_entity('CWGroup', name=u'bougloup2')
- self.sexecute('SET U in_group G WHERE G name ~= "bougloup%", U login "admin"')
- self.sexecute('SET U in_group G WHERE G name = "bougloup1", U login %(syt)s', {'syt': 'syt'})
- rset = self.sexecute('Any L,SN ORDERBY L WHERE X in_state S, '
- 'S name SN, X login L, EXISTS(X in_group G, G name ~= "bougloup%")')
- self.assertEqual(rset.rows, [['admin', 'activated'], ['syt', 'activated']])
-
- def test_exists2(self):
- req = self.request()
- self.create_user(req, 'comme')
- self.create_user(req, 'cochon')
- self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"')
- rset = self.sexecute('Any GN ORDERBY GN WHERE X in_group G, G name GN, '
- '(G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon")))')
- self.assertEqual(rset.rows, [['managers'], ['users']])
-
- def test_exists3(self):
- req = self.request()
- self.create_user(req, 'comme')
- self.create_user(req, 'cochon')
- self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"')
- self.assertTrue(self.sexecute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"'))
- self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "cochon"', {'syt': 'syt'})
- self.assertTrue(self.sexecute('Any X, Y WHERE X copain Y, X login %(syt)s, Y login "cochon"', {'syt': 'syt'}))
- rset = self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, G name "managers" '
- 'OR EXISTS(X copain T, T login in ("comme", "cochon"))')
- self.assertEqual(sorted(rset.rows), [['managers', 'admin'], ['users', 'comme'], ['users', 'syt']])
-
- def test_exists4(self):
- req = self.request()
- self.create_user(req, 'comme')
- self.create_user(req, 'cochon', groups=('users', 'guests'))
- self.create_user(req, 'billy')
- self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"')
- self.sexecute('SET X copain Y WHERE X login "cochon", Y login "cochon"')
- self.sexecute('SET X copain Y WHERE X login "comme", Y login "billy"')
- self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "billy"', {'syt': 'syt'})
- # search for group name, login where
- # CWUser copain with "comme" or "cochon" AND same login as the copain
- # OR
- # CWUser in_state activated AND not copain with billy
- #
- # SO we expect everybody but "comme" and "syt"
- rset= self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, '
- 'EXISTS(X copain T, T login L, T login in ("comme", "cochon")) OR '
- 'EXISTS(X in_state S, S name "activated", NOT X copain T2, T2 login "billy")')
- all = self.sexecute('Any GN, L WHERE X in_group G, X login L, G name GN')
- all.rows.remove(['users', 'comme'])
- all.rows.remove(['users', 'syt'])
- self.assertEqual(sorted(rset.rows), sorted(all.rows))
-
- def test_exists5(self):
- req = self.request()
- self.create_user(req, 'comme')
- self.create_user(req, 'cochon', groups=('users', 'guests'))
- self.create_user(req, 'billy')
- self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"')
- self.sexecute('SET X copain Y WHERE X login "cochon", Y login "cochon"')
- self.sexecute('SET X copain Y WHERE X login "comme", Y login "billy"')
- self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "cochon"', {'syt': 'syt'})
- rset= self.sexecute('Any L WHERE X login L, '
- 'EXISTS(X copain T, T login in ("comme", "cochon")) AND '
- 'NOT EXISTS(X copain T2, T2 login "billy")')
- self.assertEqual(sorted(rset.rows), [['cochon'], ['syt']])
- rset= self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, '
- 'EXISTS(X copain T, T login in ("comme", "cochon")) AND '
- 'NOT EXISTS(X copain T2, T2 login "billy")')
- self.assertEqual(sorted(rset.rows), [['guests', 'cochon'],
- ['users', 'cochon'],
- ['users', 'syt']])
-
- def test_cd_restriction(self):
- rset = self.sexecute('CWUser X WHERE X creation_date > "2009-02-01"')
- # admin/anon but no ldap user since it doesn't support creation_date
- self.assertEqual(sorted(e.login for e in rset.entities()),
- ['admin', 'anon'])
-
- def test_union(self):
- afeids = self.sexecute('State X')
- ueids = self.sexecute('CWUser X')
- rset = self.sexecute('(Any X WHERE X is State) UNION (Any X WHERE X is CWUser)')
- self.assertEqual(sorted(r[0] for r in rset.rows),
- sorted(r[0] for r in afeids + ueids))
-
- def _init_security_test(self):
- req = self.request()
- self.create_user(req, 'iaminguestsgrouponly', groups=('guests',))
- cnx = self.login('iaminguestsgrouponly')
- return cnx.cursor()
-
- def test_security1(self):
- cu = self._init_security_test()
- rset = cu.execute('CWUser X WHERE X login %(login)s', {'login': 'syt'})
- self.assertEqual(rset.rows, [])
- rset = cu.execute('Any X WHERE X login "iaminguestsgrouponly"')
- self.assertEqual(len(rset.rows), 1)
-
- def test_security2(self):
- cu = self._init_security_test()
- rset = cu.execute('Any X WHERE X has_text %(syt)s', {'syt': 'syt'})
- self.assertEqual(rset.rows, [])
- rset = cu.execute('Any X WHERE X has_text "iaminguestsgrouponly"')
- self.assertEqual(len(rset.rows), 1)
-
- def test_security3(self):
- cu = self._init_security_test()
- rset = cu.execute('Any F WHERE X has_text %(syt)s, X firstname F', {'syt': 'syt'})
- self.assertEqual(rset.rows, [])
- rset = cu.execute('Any F WHERE X has_text "iaminguestsgrouponly", X firstname F')
- self.assertEqual(rset.rows, [[None]])
-
- def test_nonregr1(self):
- self.sexecute('Any X,AA ORDERBY AA DESC WHERE E eid %(x)s, E owned_by X, '
- 'X modification_date AA',
- {'x': self.session.user.eid})
-
- def test_nonregr2(self):
- self.sexecute('Any X,L,AA WHERE E eid %(x)s, E owned_by X, '
- 'X login L, X modification_date AA',
- {'x': self.session.user.eid})
-
- def test_nonregr3(self):
- self.sexecute('Any X,AA ORDERBY AA DESC WHERE E eid %(x)s, '
- 'X modification_date AA',
- {'x': self.session.user.eid})
-
- def test_nonregr4(self):
- emaileid = self.sexecute('INSERT EmailAddress X: X address "toto@logilab.org"')[0][0]
- self.sexecute('Any X,AA WHERE X use_email Y, Y eid %(x)s, X modification_date AA',
- {'x': emaileid})
-
- def test_nonregr5(self):
- # original jpl query:
- # Any X, NOW - CD, P WHERE P is Project, U interested_in P, U is CWUser,
- # U login "sthenault", X concerns P, X creation_date CD ORDERBY CD DESC LIMIT 5
- rql = ('Any X, NOW - CD, P ORDERBY CD DESC LIMIT 5 WHERE P bookmarked_by U, '
- 'U login "%s", P is X, X creation_date CD') % self.session.user.login
- self.sexecute(rql, )#{'x': })
-
- def test_nonregr6(self):
- self.sexecute('Any B,U,UL GROUPBY B,U,UL WHERE B created_by U?, B is File '
- 'WITH U,UL BEING (Any U,UL WHERE ME eid %(x)s, (EXISTS(U identity ME) '
- 'OR (EXISTS(U in_group G, G name IN("managers", "staff")))) '
- 'OR (EXISTS(U in_group H, ME in_group H, NOT H name "users")), U login UL, U is CWUser)',
- {'x': self.session.user.eid})
-
-class GlobTrFuncTC(TestCase):
-
- def test_count(self):
- trfunc = GlobTrFunc('count', 0)
- res = trfunc.apply([[1], [2], [3], [4]])
- self.assertEqual(res, [[4]])
- trfunc = GlobTrFunc('count', 1)
- res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]])
- self.assertEqual(res, [[1, 2], [2, 1], [3, 1]])
-
- def test_sum(self):
- trfunc = GlobTrFunc('sum', 0)
- res = trfunc.apply([[1], [2], [3], [4]])
- self.assertEqual(res, [[10]])
- trfunc = GlobTrFunc('sum', 1)
- res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]])
- self.assertEqual(res, [[1, 7], [2, 4], [3, 6]])
-
- def test_min(self):
- trfunc = GlobTrFunc('min', 0)
- res = trfunc.apply([[1], [2], [3], [4]])
- self.assertEqual(res, [[1]])
- trfunc = GlobTrFunc('min', 1)
- res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]])
- self.assertEqual(res, [[1, 2], [2, 4], [3, 6]])
-
- def test_max(self):
- trfunc = GlobTrFunc('max', 0)
- res = trfunc.apply([[1], [2], [3], [4]])
- self.assertEqual(res, [[4]])
- trfunc = GlobTrFunc('max', 1)
- res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]])
- self.assertEqual(res, [[1, 5], [2, 4], [3, 6]])
-class RQL2LDAPFilterTC(RQLGeneratorTC):
-
- tags = RQLGeneratorTC.tags | Tags(('ldap'))
-
- @property
- def schema(self):
- """return the application schema"""
- return self._schema
-
- def setUp(self):
- self.handler = get_test_db_handler(LDAPUserSourceTC.config)
- self.handler.build_db_cache('ldap-rqlgenerator', LDAPUserSourceTC.pre_setup_database)
- self.handler.restore_database('ldap-rqlgenerator')
- self._repo = repo = self.handler.get_repo()
- self._schema = repo.schema
- super(RQL2LDAPFilterTC, self).setUp()
- ldapsource = repo.sources[-1]
- self.cnxset = repo._get_cnxset()
- session = mock_object(cnxset=self.cnxset)
- self.o = RQL2LDAPFilter(ldapsource, session)
- self.ldapclasses = ''.join(ldapsource.base_filters)
-
- def tearDown(self):
- self._repo.turn_repo_off()
- super(RQL2LDAPFilterTC, self).tearDown()
-
- def test_base(self):
- rqlst = self._prepare('CWUser X WHERE X login "toto"').children[0]
- self.assertEqual(self.o.generate(rqlst, 'X')[1],
- '(&%s(uid=toto))' % self.ldapclasses)
-
- def test_kwargs(self):
- rqlst = self._prepare('CWUser X WHERE X login %(x)s').children[0]
- self.o._args = {'x': "toto"}
- self.assertEqual(self.o.generate(rqlst, 'X')[1],
- '(&%s(uid=toto))' % self.ldapclasses)
-
- def test_get_attr(self):
- rqlst = self._prepare('Any X WHERE E firstname X, E eid 12').children[0]
- self.assertRaises(UnknownEid, self.o.generate, rqlst, 'E')
if __name__ == '__main__':