# HG changeset patch # User Pierre-Yves David # Date 1371137263 -7200 # Node ID 65b8236e1bb4431263215727955ab21817119fdc # Parent dfa4da8a53a08c02c94c9759bb3b04813b286c4e [sources] drop support for ldapuser source (closes #2936496) The ldapfeed source is a replacement for ldapuser. Use it instead. diff -r dfa4da8a53a0 -r 65b8236e1bb4 doc/4.0.rst --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/doc/4.0.rst Thu Jun 13 17:27:43 2013 +0200 @@ -0,0 +1,8 @@ +What's new in CubicWeb 4.0? +============================ + +Deprecated Code Drops +---------------------- + +* The ldapuser source has been dropped. ldapfeed is the only official source + remaining for ldap. diff -r dfa4da8a53a0 -r 65b8236e1bb4 server/__init__.py --- a/server/__init__.py Fri Jun 14 17:13:17 2013 +0200 +++ b/server/__init__.py Thu Jun 13 17:27:43 2013 +0200 @@ -305,7 +305,6 @@ SOURCE_TYPES = {'native': LazyObject('cubicweb.server.sources.native', 'NativeSQLSource'), 'datafeed': LazyObject('cubicweb.server.sources.datafeed', 'DataFeedSource'), 'ldapfeed': LazyObject('cubicweb.server.sources.ldapfeed', 'LDAPFeedSource'), - 'ldapuser': LazyObject('cubicweb.server.sources.ldapuser', 'LDAPUserSource'), 'pyrorql': LazyObject('cubicweb.server.sources.pyrorql', 'PyroRQLSource'), 'zmqrql': LazyObject('cubicweb.server.sources.zmqrql', 'ZMQRQLSource'), } diff -r dfa4da8a53a0 -r 65b8236e1bb4 server/sources/ldapuser.py --- a/server/sources/ldapuser.py Fri Jun 14 17:13:17 2013 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,551 +0,0 @@ -# copyright 2003-2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved. -# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr -# -# This file is part of CubicWeb. -# -# CubicWeb is free software: you can redistribute it and/or modify it under the -# terms of the GNU Lesser General Public License as published by the Free -# Software Foundation, either version 2.1 of the License, or (at your option) -# any later version. -# -# CubicWeb is distributed in the hope that it will be useful, but WITHOUT -# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS -# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more -# details. -# -# You should have received a copy of the GNU Lesser General Public License along -# with CubicWeb. If not, see . -"""cubicweb ldap user source - -this source is for now limited to a read-only CWUser source -""" -from __future__ import division, with_statement -from base64 import b64decode - -import ldap -from ldap.filter import escape_filter_chars - -from rql.nodes import Relation, VariableRef, Constant, Function - -import warnings -from cubicweb import UnknownEid, RepositoryError -from cubicweb.server import ldaputils -from cubicweb.server.utils import cartesian_product -from cubicweb.server.sources import (AbstractSource, TrFunc, GlobTrFunc, - TimedCache) - -# search scopes -BASE = ldap.SCOPE_BASE -ONELEVEL = ldap.SCOPE_ONELEVEL -SUBTREE = ldap.SCOPE_SUBTREE - -# map ldap protocol to their standard port -PROTO_PORT = {'ldap': 389, - 'ldaps': 636, - 'ldapi': None, - } - - -# module is lazily imported -warnings.warn('Imminent drop of ldapuser. Switch to ldapfeed now!', - DeprecationWarning) - - -class LDAPUserSource(ldaputils.LDAPSourceMixIn, AbstractSource): - """LDAP read-only CWUser source""" - support_entities = {'CWUser': False} - - options = ldaputils.LDAPSourceMixIn.options + ( - - ('synchronization-interval', - {'type' : 'time', - 'default': '1d', - 'help': 'interval between synchronization with the ldap \ -directory (default to once a day).', - 'group': 'ldap-source', 'level': 3, - }), - ('cache-life-time', - {'type' : 'time', - 'default': '2h', - 'help': 'life time of query cache (default to two hours).', - 'group': 'ldap-source', 'level': 3, - }), - - ) - - def update_config(self, source_entity, typedconfig): - """update configuration from source entity. `typedconfig` is config - properly typed with defaults set - """ - super(LDAPUserSource, self).update_config(source_entity, typedconfig) - self._interval = typedconfig['synchronization-interval'] - self._cache_ttl = max(71, typedconfig['cache-life-time']) - self.reset_caches() - # XXX copy from datafeed source - if source_entity is not None: - self._entity_update(source_entity) - self.config = typedconfig - # /end XXX - - def reset_caches(self): - """method called during test to reset potential source caches""" - self._cache = {} - self._query_cache = TimedCache(self._cache_ttl) - - def init(self, activated, source_entity): - """method called by the repository once ready to handle request""" - super(LDAPUserSource, self).init(activated, source_entity) - if activated: - self.info('ldap init') - # set minimum period of 5min 1s (the additional second is to - # minimize resonnance effet) - if self.user_rev_attrs['email']: - self.repo.looping_task(max(301, self._interval), self.synchronize) - self.repo.looping_task(self._cache_ttl // 10, - self._query_cache.clear_expired) - - def synchronize(self): - with self.repo.internal_session() as session: - self.pull_data(session) - - def pull_data(self, session, force=False, raise_on_error=False): - """synchronize content known by this repository with content in the - external repository - """ - self.info('synchronizing ldap source %s', self.uri) - ldap_emailattr = self.user_rev_attrs['email'] - assert ldap_emailattr - execute = session.execute - cursor = session.system_sql("SELECT eid, extid FROM entities WHERE " - "source='%s'" % self.uri) - for eid, b64extid in cursor.fetchall(): - extid = b64decode(b64extid) - self.debug('ldap eid %s', eid) - # if no result found, _search automatically delete entity information - res = self._search(session, extid, BASE) - self.debug('ldap search %s', res) - if res: - ldapemailaddr = res[0].get(ldap_emailattr) - if ldapemailaddr: - if isinstance(ldapemailaddr, list): - ldapemailaddr = ldapemailaddr[0] # XXX consider only the first email in the list - rset = execute('Any X,A WHERE ' - 'X address A, U use_email X, U eid %(u)s', - {'u': eid}) - ldapemailaddr = unicode(ldapemailaddr) - for emaileid, emailaddr, in rset: - if emailaddr == ldapemailaddr: - break - else: - self.debug('updating email address of user %s to %s', - extid, ldapemailaddr) - emailrset = execute('EmailAddress A WHERE A address %(addr)s', - {'addr': ldapemailaddr}) - if emailrset: - execute('SET U use_email X WHERE ' - 'X eid %(x)s, U eid %(u)s', - {'x': emailrset[0][0], 'u': eid}) - elif rset: - if not execute('SET X address %(addr)s WHERE ' - 'U primary_email X, U eid %(u)s', - {'addr': ldapemailaddr, 'u': eid}): - execute('SET X address %(addr)s WHERE ' - 'X eid %(x)s', - {'addr': ldapemailaddr, 'x': rset[0][0]}) - else: - # no email found, create it - _insert_email(session, ldapemailaddr, eid) - session.commit() - - def ldap_name(self, var): - if var.stinfo['relations']: - relname = iter(var.stinfo['relations']).next().r_type - return self.user_rev_attrs.get(relname) - return None - - def prepare_columns(self, mainvars, rqlst): - """return two list describing how to build the final results - from the result of an ldap search (ie a list of dictionary) - """ - columns = [] - global_transforms = [] - for i, term in enumerate(rqlst.selection): - if isinstance(term, Constant): - columns.append(term) - continue - if isinstance(term, Function): # LOWER, UPPER, COUNT... - var = term.get_nodes(VariableRef)[0] - var = var.variable - try: - mainvar = var.stinfo['attrvar'].name - except AttributeError: # no attrvar set - mainvar = var.name - assert mainvar in mainvars - trname = term.name - ldapname = self.ldap_name(var) - if trname in ('COUNT', 'MIN', 'MAX', 'SUM'): - global_transforms.append(GlobTrFunc(trname, i, ldapname)) - columns.append((mainvar, ldapname)) - continue - if trname in ('LOWER', 'UPPER'): - columns.append((mainvar, TrFunc(trname, i, ldapname))) - continue - raise NotImplementedError('no support for %s function' % trname) - if term.name in mainvars: - columns.append((term.name, 'dn')) - continue - var = term.variable - mainvar = var.stinfo['attrvar'].name - columns.append((mainvar, self.ldap_name(var))) - #else: - # # probably a bug in rql splitting if we arrive here - # raise NotImplementedError - return columns, global_transforms - - def syntax_tree_search(self, session, union, - args=None, cachekey=None, varmap=None, debug=0): - """return result from this source for a rql query (actually from a rql - syntax tree and a solution dictionary mapping each used variable to a - possible type). If cachekey is given, the query necessary to fetch the - results (but not the results themselves) may be cached using this key. - """ - self.debug('ldap syntax tree search') - # XXX not handled : transform/aggregat function, join on multiple users... - assert len(union.children) == 1, 'union not supported' - rqlst = union.children[0] - assert not rqlst.with_, 'subquery not supported' - rqlkey = rqlst.as_string(kwargs=args) - try: - results = self._query_cache[rqlkey] - except KeyError: - try: - results = self.rqlst_search(session, rqlst, args) - self._query_cache[rqlkey] = results - except ldap.SERVER_DOWN: - # cant connect to server - msg = session._("can't connect to source %s, some data may be missing") - session.set_shared_data('sources_error', msg % self.uri, txdata=True) - return [] - return results - - def rqlst_search(self, session, rqlst, args): - mainvars = [] - for varname in rqlst.defined_vars: - for sol in rqlst.solutions: - if sol[varname] == 'CWUser': - mainvars.append(varname) - break - assert mainvars, rqlst - columns, globtransforms = self.prepare_columns(mainvars, rqlst) - eidfilters = [lambda x: x > 0] - allresults = [] - generator = RQL2LDAPFilter(self, session, args, mainvars) - for mainvar in mainvars: - # handle restriction - try: - eidfilters_, ldapfilter = generator.generate(rqlst, mainvar) - except GotDN as ex: - assert ex.dn, 'no dn!' - try: - res = [self._cache[ex.dn]] - except KeyError: - res = self._search(session, ex.dn, BASE) - except UnknownEid as ex: - # raised when we are looking for the dn of an eid which is not - # coming from this source - res = [] - else: - eidfilters += eidfilters_ - res = self._search(session, self.user_base_dn, - self.user_base_scope, ldapfilter) - allresults.append(res) - # 1. get eid for each dn and filter according to that eid if necessary - for i, res in enumerate(allresults): - filteredres = [] - for resdict in res: - # get sure the entity exists in the system table - eid = self.repo.extid2eid(self, resdict['dn'], 'CWUser', session) - for eidfilter in eidfilters: - if not eidfilter(eid): - break - else: - resdict['eid'] = eid - filteredres.append(resdict) - allresults[i] = filteredres - # 2. merge result for each "mainvar": cartesian product - allresults = cartesian_product(allresults) - # 3. build final result according to column definition - result = [] - for rawline in allresults: - rawline = dict(zip(mainvars, rawline)) - line = [] - for varname, ldapname in columns: - if ldapname is None: - value = None # no mapping available - elif ldapname == 'dn': - value = rawline[varname]['eid'] - elif isinstance(ldapname, Constant): - if ldapname.type == 'Substitute': - value = args[ldapname.value] - else: - value = ldapname.value - elif isinstance(ldapname, TrFunc): - value = ldapname.apply(rawline[varname]) - else: - value = rawline[varname].get(ldapname) - line.append(value) - result.append(line) - for trfunc in globtransforms: - result = trfunc.apply(result) - #print '--> ldap result', result - return result - - def _process_ldap_item(self, dn, iterator): - itemdict = super(LDAPUserSource, self)._process_ldap_item(dn, iterator) - self._cache[dn] = itemdict - return itemdict - - def _process_no_such_object(self, session, dn): - eid = self.repo.extid2eid(self, dn, 'CWUser', session, insert=False) - if eid: - self.warning('deleting ldap user with eid %s and dn %s', eid, dn) - entity = session.entity_from_eid(eid, 'CWUser') - self.repo.delete_info(session, entity, self.uri) - self.reset_caches() - - def before_entity_insertion(self, session, lid, etype, eid, sourceparams): - """called by the repository when an eid has been attributed for an - entity stored here but the entity has not been inserted in the system - table yet. - - This method must return the an Entity instance representation of this - entity. - """ - self.debug('ldap before entity insertion') - entity = super(LDAPUserSource, self).before_entity_insertion( - session, lid, etype, eid, sourceparams) - res = self._search(session, lid, BASE)[0] - for attr in entity.e_schema.indexable_attributes(): - entity.cw_edited[attr] = res[self.user_rev_attrs[attr]] - return entity - - def after_entity_insertion(self, session, lid, entity, sourceparams): - """called by the repository after an entity stored here has been - inserted in the system table. - """ - self.debug('ldap after entity insertion') - super(LDAPUserSource, self).after_entity_insertion( - session, lid, entity, sourceparams) - for group in self.user_default_groups: - session.execute('SET X in_group G WHERE X eid %(x)s, G name %(group)s', - {'x': entity.eid, 'group': group}) - # search for existant email first - try: - # lid = dn - emailaddr = self._cache[lid][self.user_rev_attrs['email']] - except KeyError: - return - if isinstance(emailaddr, list): - emailaddr = emailaddr[0] # XXX consider only the first email in the list - rset = session.execute('EmailAddress X WHERE X address %(addr)s', - {'addr': emailaddr}) - if rset: - session.execute('SET U primary_email X WHERE U eid %(u)s, X eid %(x)s', - {'x': rset[0][0], 'u': entity.eid}) - else: - # not found, create it - _insert_email(session, emailaddr, entity.eid) - - def update_entity(self, session, entity): - """replace an entity in the source""" - raise RepositoryError('this source is read only') - - def delete_entity(self, session, entity): - """delete an entity from the source""" - raise RepositoryError('this source is read only') - - -def _insert_email(session, emailaddr, ueid): - session.execute('INSERT EmailAddress X: X address %(addr)s, U primary_email X ' - 'WHERE U eid %(x)s', {'addr': emailaddr, 'x': ueid}) - -class GotDN(Exception): - """exception used when a dn localizing the searched user has been found""" - def __init__(self, dn): - self.dn = dn - - -class RQL2LDAPFilter(object): - """generate an LDAP filter for a rql query""" - def __init__(self, source, session, args=None, mainvars=()): - self.source = source - self.repo = source.repo - self._ldap_attrs = source.user_rev_attrs - self._base_filters = source.base_filters - self._session = session - if args is None: - args = {} - self._args = args - self.mainvars = mainvars - - def generate(self, selection, mainvarname): - self._filters = res = self._base_filters[:] - self._mainvarname = mainvarname - self._eidfilters = [] - self._done_not = set() - restriction = selection.where - if isinstance(restriction, Relation): - # only a single relation, need to append result here (no AND/OR) - filter = restriction.accept(self) - if filter is not None: - res.append(filter) - elif restriction: - restriction.accept(self) - if len(res) > 1: - return self._eidfilters, '(&%s)' % ''.join(res) - return self._eidfilters, res[0] - - def visit_and(self, et): - """generate filter for a AND subtree""" - for c in et.children: - part = c.accept(self) - if part: - self._filters.append(part) - - def visit_or(self, ou): - """generate filter for a OR subtree""" - res = [] - for c in ou.children: - part = c.accept(self) - if part: - res.append(part) - if res: - if len(res) > 1: - part = '(|%s)' % ''.join(res) - else: - part = res[0] - self._filters.append(part) - - def visit_not(self, node): - """generate filter for a OR subtree""" - part = node.children[0].accept(self) - if part: - self._filters.append('(!(%s))'% part) - - def visit_relation(self, relation): - """generate filter for a relation""" - rtype = relation.r_type - # don't care of type constraint statement (i.e. relation_type = 'is') - if rtype == 'is': - return '' - lhs, rhs = relation.get_parts() - # attribute relation - if self.source.schema.rschema(rtype).final: - # dunno what to do here, don't pretend anything else - if lhs.name != self._mainvarname: - if lhs.name in self.mainvars: - # XXX check we don't have variable as rhs - return - raise NotImplementedError - rhs_vars = rhs.get_nodes(VariableRef) - if rhs_vars: - if len(rhs_vars) > 1: - raise NotImplementedError - # selected variable, nothing to do here - return - # no variables in the RHS - if isinstance(rhs.children[0], Function): - res = rhs.children[0].accept(self) - elif rtype != 'has_text': - res = self._visit_attribute_relation(relation) - else: - raise NotImplementedError(relation) - # regular relation XXX todo: in_group - else: - raise NotImplementedError(relation) - return res - - def _visit_attribute_relation(self, relation): - """generate filter for an attribute relation""" - lhs, rhs = relation.get_parts() - lhsvar = lhs.variable - if relation.r_type == 'eid': - # XXX hack - # skip comparison sign - eid = int(rhs.children[0].accept(self)) - if relation.neged(strict=True): - self._done_not.add(relation.parent) - self._eidfilters.append(lambda x: not x == eid) - return - if rhs.operator != '=': - filter = {'>': lambda x: x > eid, - '>=': lambda x: x >= eid, - '<': lambda x: x < eid, - '<=': lambda x: x <= eid, - }[rhs.operator] - self._eidfilters.append(filter) - return - dn = self.repo.eid2extid(self.source, eid, self._session) - raise GotDN(dn) - try: - filter = '(%s%s)' % (self._ldap_attrs[relation.r_type], - rhs.accept(self)) - except KeyError: - # unsupported attribute - self.source.warning('%s source can\'t handle relation %s, no ' - 'results will be returned from this source', - self.source.uri, relation) - raise UnknownEid # trick to return no result - return filter - - def visit_comparison(self, cmp): - """generate filter for a comparaison""" - return '%s%s'% (cmp.operator, cmp.children[0].accept(self)) - - def visit_mathexpression(self, mexpr): - """generate filter for a mathematic expression""" - raise NotImplementedError - - def visit_function(self, function): - """generate filter name for a function""" - if function.name == 'IN': - return self.visit_in(function) - raise NotImplementedError - - def visit_in(self, function): - grandpapa = function.parent.parent - ldapattr = self._ldap_attrs[grandpapa.r_type] - res = [] - for c in function.children: - part = c.accept(self) - if part: - res.append(part) - if res: - if len(res) > 1: - part = '(|%s)' % ''.join('(%s=%s)' % (ldapattr, v) for v in res) - else: - part = '(%s=%s)' % (ldapattr, res[0]) - return part - - def visit_constant(self, constant): - """generate filter name for a constant""" - value = constant.value - if constant.type is None: - raise NotImplementedError - if constant.type == 'Date': - raise NotImplementedError - #value = self.keyword_map[value]() - elif constant.type == 'Substitute': - value = self._args[constant.value] - else: - value = constant.value - if isinstance(value, unicode): - value = value.encode('utf8') - else: - value = str(value) - return escape_filter_chars(value) - - def visit_variableref(self, variableref): - """get the sql name for a variable reference""" - pass - diff -r dfa4da8a53a0 -r 65b8236e1bb4 server/test/unittest_ldapsource.py --- a/server/test/unittest_ldapsource.py Fri Jun 14 17:13:17 2013 +0200 +++ b/server/test/unittest_ldapsource.py Thu Jun 13 17:27:43 2013 +0200 @@ -33,7 +33,6 @@ from cubicweb.devtools.httptest import get_available_port from cubicweb.devtools import get_test_db_handler -from cubicweb.server.sources.ldapuser import GlobTrFunc, UnknownEid, RQL2LDAPFilter CONFIG_LDAPFEED = u''' user-base-dn=ou=People,dc=cubicweb,dc=test @@ -429,373 +428,8 @@ self.setUpClass() -class LDAPUserSourceTC(LDAPFeedTestBase): - test_db_id = 'ldap-user' - tags = CubicWebTC.tags | Tags(('ldap')) - - @classmethod - def pre_setup_database(cls, session, config): - session.create_entity('CWSource', name=u'ldap', type=u'ldapuser', - url=URL, config=CONFIG_LDAPUSER) - session.commit() - # XXX keep it there - session.execute('CWUser U') - - def assertMetadata(self, entity): - self.assertEqual(entity.creation_date, None) - self.assertEqual(entity.modification_date, None) - - def test_synchronize(self): - source = self.repo.sources_by_uri['ldap'] - source.synchronize() - - def test_base(self): - # check a known one - rset = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'}) - e = rset.get_entity(0, 0) - self.assertEqual(e.login, 'syt') - e.complete() - self.assertMetadata(e) - self.assertEqual(e.firstname, None) - self.assertEqual(e.surname, None) - self.assertEqual(e.in_group[0].name, 'users') - self.assertEqual(e.owned_by[0].login, 'syt') - self.assertEqual(e.created_by, ()) - addresses = [pe.address for pe in e.use_email] - addresses.sort() - # should habe two element but ldapuser seems buggy. It's going to be dropped anyway. - self.assertEqual(['sylvain.thenault@logilab.fr',], # 'syt@logilab.fr'], - addresses) - self.assertIn(e.primary_email[0].address, - ['sylvain.thenault@logilab.fr', 'syt@logilab.fr']) - # email content should be indexed on the user - rset = self.sexecute('CWUser X WHERE X has_text "thenault"') - self.assertEqual(rset.rows, [[e.eid]]) - - def test_not(self): - eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0] - rset = self.sexecute('CWUser X WHERE NOT X eid %s' % eid) - self.assert_(rset) - self.assert_(not eid in (r[0] for r in rset)) - - def test_multiple(self): - seid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0] - aeid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'adim'})[0][0] - rset = self.sexecute('CWUser X, Y WHERE X login %(syt)s, Y login %(adim)s', - {'syt': 'syt', 'adim': 'adim'}) - self.assertEqual(rset.rows, [[seid, aeid]]) - rset = self.sexecute('Any X,Y,L WHERE X login L, X login %(syt)s, Y login %(adim)s', - {'syt': 'syt', 'adim': 'adim'}) - self.assertEqual(rset.rows, [[seid, aeid, 'syt']]) - - def test_in(self): - seid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0] - aeid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'adim'})[0][0] - rset = self.sexecute('Any X,L ORDERBY L WHERE X login IN("%s", "%s"), X login L' % ('syt', 'adim')) - self.assertEqual(rset.rows, [[aeid, 'adim'], [seid, 'syt']]) - - def test_relations(self): - eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0] - rset = self.sexecute('Any X,E WHERE X is CWUser, X login L, X primary_email E') - self.assert_(eid in (r[0] for r in rset)) - rset = self.sexecute('Any X,L,E WHERE X is CWUser, X login L, X primary_email E') - self.assert_('syt' in (r[1] for r in rset)) - - def test_count(self): - nbusers = self.sexecute('Any COUNT(X) WHERE X is CWUser')[0][0] - # just check this is a possible number - self.assert_(nbusers > 1, nbusers) - self.assert_(nbusers < 30, nbusers) - - def test_upper(self): - eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0] - rset = self.sexecute('Any UPPER(L) WHERE X eid %s, X login L' % eid) - self.assertEqual(rset[0][0], 'syt'.upper()) - - def test_unknown_attr(self): - eid = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'})[0][0] - rset = self.sexecute('Any L,C,M WHERE X eid %s, X login L, ' - 'X creation_date C, X modification_date M' % eid) - self.assertEqual(rset[0][0], 'syt') - self.assertEqual(rset[0][1], None) - self.assertEqual(rset[0][2], None) - - def test_sort(self): - logins = [l for l, in self.sexecute('Any L ORDERBY L WHERE X login L')] - self.assertEqual(logins, sorted(logins)) - - def test_lower_sort(self): - logins = [l for l, in self.sexecute('Any L ORDERBY lower(L) WHERE X login L')] - self.assertEqual(logins, sorted(logins)) - - def test_or(self): - rset = self.sexecute('DISTINCT Any X WHERE X login %(login)s OR (X in_group G, G name "managers")', - {'login': 'syt'}) - self.assertEqual(len(rset), 2, rset.rows) # syt + admin - - def test_nonregr_set_owned_by(self): - # test that when a user coming from ldap is triggering a transition - # the related TrInfo has correct owner information - self.sexecute('SET X in_group G WHERE X login %(syt)s, G name "managers"', {'syt': 'syt'}) - self.commit() - syt = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'syt'}).get_entity(0, 0) - self.assertEqual([g.name for g in syt.in_group], ['managers', 'users']) - cnx = self.login('syt', password='syt') - cu = cnx.cursor() - adim = cu.execute('CWUser X WHERE X login %(login)s', {'login': 'adim'}).get_entity(0, 0) - iworkflowable = adim.cw_adapt_to('IWorkflowable') - iworkflowable.fire_transition('deactivate') - try: - cnx.commit() - adim.cw_clear_all_caches() - self.assertEqual(adim.in_state[0].name, 'deactivated') - trinfo = iworkflowable.latest_trinfo() - self.assertEqual(trinfo.owned_by[0].login, 'syt') - # select from_state to skip the user's creation TrInfo - rset = self.sexecute('Any U ORDERBY D DESC WHERE WF wf_info_for X,' - 'WF creation_date D, WF from_state FS,' - 'WF owned_by U?, X eid %(x)s', - {'x': adim.eid}) - self.assertEqual(rset.rows, [[syt.eid]]) - finally: - # restore db state - self.restore_connection() - adim = self.sexecute('CWUser X WHERE X login %(login)s', {'login': 'adim'}).get_entity(0, 0) - adim.cw_adapt_to('IWorkflowable').fire_transition('activate') - self.sexecute('DELETE X in_group G WHERE X login %(syt)s, G name "managers"', {'syt': 'syt'}) - - def test_same_column_names(self): - self.sexecute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"') - - def test_multiple_entities_from_different_sources(self): - req = self.request() - self.create_user(req, 'cochon') - self.assertTrue(self.sexecute('Any X,Y WHERE X login %(syt)s, Y login "cochon"', {'syt': 'syt'})) - - def test_exists1(self): - self.session.set_cnxset() - self.session.create_entity('CWGroup', name=u'bougloup1') - self.session.create_entity('CWGroup', name=u'bougloup2') - self.sexecute('SET U in_group G WHERE G name ~= "bougloup%", U login "admin"') - self.sexecute('SET U in_group G WHERE G name = "bougloup1", U login %(syt)s', {'syt': 'syt'}) - rset = self.sexecute('Any L,SN ORDERBY L WHERE X in_state S, ' - 'S name SN, X login L, EXISTS(X in_group G, G name ~= "bougloup%")') - self.assertEqual(rset.rows, [['admin', 'activated'], ['syt', 'activated']]) - - def test_exists2(self): - req = self.request() - self.create_user(req, 'comme') - self.create_user(req, 'cochon') - self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"') - rset = self.sexecute('Any GN ORDERBY GN WHERE X in_group G, G name GN, ' - '(G name "managers" OR EXISTS(X copain T, T login in ("comme", "cochon")))') - self.assertEqual(rset.rows, [['managers'], ['users']]) - - def test_exists3(self): - req = self.request() - self.create_user(req, 'comme') - self.create_user(req, 'cochon') - self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"') - self.assertTrue(self.sexecute('Any X, Y WHERE X copain Y, X login "comme", Y login "cochon"')) - self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "cochon"', {'syt': 'syt'}) - self.assertTrue(self.sexecute('Any X, Y WHERE X copain Y, X login %(syt)s, Y login "cochon"', {'syt': 'syt'})) - rset = self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, G name "managers" ' - 'OR EXISTS(X copain T, T login in ("comme", "cochon"))') - self.assertEqual(sorted(rset.rows), [['managers', 'admin'], ['users', 'comme'], ['users', 'syt']]) - - def test_exists4(self): - req = self.request() - self.create_user(req, 'comme') - self.create_user(req, 'cochon', groups=('users', 'guests')) - self.create_user(req, 'billy') - self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"') - self.sexecute('SET X copain Y WHERE X login "cochon", Y login "cochon"') - self.sexecute('SET X copain Y WHERE X login "comme", Y login "billy"') - self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "billy"', {'syt': 'syt'}) - # search for group name, login where - # CWUser copain with "comme" or "cochon" AND same login as the copain - # OR - # CWUser in_state activated AND not copain with billy - # - # SO we expect everybody but "comme" and "syt" - rset= self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, ' - 'EXISTS(X copain T, T login L, T login in ("comme", "cochon")) OR ' - 'EXISTS(X in_state S, S name "activated", NOT X copain T2, T2 login "billy")') - all = self.sexecute('Any GN, L WHERE X in_group G, X login L, G name GN') - all.rows.remove(['users', 'comme']) - all.rows.remove(['users', 'syt']) - self.assertEqual(sorted(rset.rows), sorted(all.rows)) - - def test_exists5(self): - req = self.request() - self.create_user(req, 'comme') - self.create_user(req, 'cochon', groups=('users', 'guests')) - self.create_user(req, 'billy') - self.sexecute('SET X copain Y WHERE X login "comme", Y login "cochon"') - self.sexecute('SET X copain Y WHERE X login "cochon", Y login "cochon"') - self.sexecute('SET X copain Y WHERE X login "comme", Y login "billy"') - self.sexecute('SET X copain Y WHERE X login %(syt)s, Y login "cochon"', {'syt': 'syt'}) - rset= self.sexecute('Any L WHERE X login L, ' - 'EXISTS(X copain T, T login in ("comme", "cochon")) AND ' - 'NOT EXISTS(X copain T2, T2 login "billy")') - self.assertEqual(sorted(rset.rows), [['cochon'], ['syt']]) - rset= self.sexecute('Any GN,L WHERE X in_group G, X login L, G name GN, ' - 'EXISTS(X copain T, T login in ("comme", "cochon")) AND ' - 'NOT EXISTS(X copain T2, T2 login "billy")') - self.assertEqual(sorted(rset.rows), [['guests', 'cochon'], - ['users', 'cochon'], - ['users', 'syt']]) - - def test_cd_restriction(self): - rset = self.sexecute('CWUser X WHERE X creation_date > "2009-02-01"') - # admin/anon but no ldap user since it doesn't support creation_date - self.assertEqual(sorted(e.login for e in rset.entities()), - ['admin', 'anon']) - - def test_union(self): - afeids = self.sexecute('State X') - ueids = self.sexecute('CWUser X') - rset = self.sexecute('(Any X WHERE X is State) UNION (Any X WHERE X is CWUser)') - self.assertEqual(sorted(r[0] for r in rset.rows), - sorted(r[0] for r in afeids + ueids)) - - def _init_security_test(self): - req = self.request() - self.create_user(req, 'iaminguestsgrouponly', groups=('guests',)) - cnx = self.login('iaminguestsgrouponly') - return cnx.cursor() - - def test_security1(self): - cu = self._init_security_test() - rset = cu.execute('CWUser X WHERE X login %(login)s', {'login': 'syt'}) - self.assertEqual(rset.rows, []) - rset = cu.execute('Any X WHERE X login "iaminguestsgrouponly"') - self.assertEqual(len(rset.rows), 1) - - def test_security2(self): - cu = self._init_security_test() - rset = cu.execute('Any X WHERE X has_text %(syt)s', {'syt': 'syt'}) - self.assertEqual(rset.rows, []) - rset = cu.execute('Any X WHERE X has_text "iaminguestsgrouponly"') - self.assertEqual(len(rset.rows), 1) - - def test_security3(self): - cu = self._init_security_test() - rset = cu.execute('Any F WHERE X has_text %(syt)s, X firstname F', {'syt': 'syt'}) - self.assertEqual(rset.rows, []) - rset = cu.execute('Any F WHERE X has_text "iaminguestsgrouponly", X firstname F') - self.assertEqual(rset.rows, [[None]]) - - def test_nonregr1(self): - self.sexecute('Any X,AA ORDERBY AA DESC WHERE E eid %(x)s, E owned_by X, ' - 'X modification_date AA', - {'x': self.session.user.eid}) - - def test_nonregr2(self): - self.sexecute('Any X,L,AA WHERE E eid %(x)s, E owned_by X, ' - 'X login L, X modification_date AA', - {'x': self.session.user.eid}) - - def test_nonregr3(self): - self.sexecute('Any X,AA ORDERBY AA DESC WHERE E eid %(x)s, ' - 'X modification_date AA', - {'x': self.session.user.eid}) - - def test_nonregr4(self): - emaileid = self.sexecute('INSERT EmailAddress X: X address "toto@logilab.org"')[0][0] - self.sexecute('Any X,AA WHERE X use_email Y, Y eid %(x)s, X modification_date AA', - {'x': emaileid}) - - def test_nonregr5(self): - # original jpl query: - # Any X, NOW - CD, P WHERE P is Project, U interested_in P, U is CWUser, - # U login "sthenault", X concerns P, X creation_date CD ORDERBY CD DESC LIMIT 5 - rql = ('Any X, NOW - CD, P ORDERBY CD DESC LIMIT 5 WHERE P bookmarked_by U, ' - 'U login "%s", P is X, X creation_date CD') % self.session.user.login - self.sexecute(rql, )#{'x': }) - - def test_nonregr6(self): - self.sexecute('Any B,U,UL GROUPBY B,U,UL WHERE B created_by U?, B is File ' - 'WITH U,UL BEING (Any U,UL WHERE ME eid %(x)s, (EXISTS(U identity ME) ' - 'OR (EXISTS(U in_group G, G name IN("managers", "staff")))) ' - 'OR (EXISTS(U in_group H, ME in_group H, NOT H name "users")), U login UL, U is CWUser)', - {'x': self.session.user.eid}) - -class GlobTrFuncTC(TestCase): - - def test_count(self): - trfunc = GlobTrFunc('count', 0) - res = trfunc.apply([[1], [2], [3], [4]]) - self.assertEqual(res, [[4]]) - trfunc = GlobTrFunc('count', 1) - res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]]) - self.assertEqual(res, [[1, 2], [2, 1], [3, 1]]) - - def test_sum(self): - trfunc = GlobTrFunc('sum', 0) - res = trfunc.apply([[1], [2], [3], [4]]) - self.assertEqual(res, [[10]]) - trfunc = GlobTrFunc('sum', 1) - res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]]) - self.assertEqual(res, [[1, 7], [2, 4], [3, 6]]) - - def test_min(self): - trfunc = GlobTrFunc('min', 0) - res = trfunc.apply([[1], [2], [3], [4]]) - self.assertEqual(res, [[1]]) - trfunc = GlobTrFunc('min', 1) - res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]]) - self.assertEqual(res, [[1, 2], [2, 4], [3, 6]]) - - def test_max(self): - trfunc = GlobTrFunc('max', 0) - res = trfunc.apply([[1], [2], [3], [4]]) - self.assertEqual(res, [[4]]) - trfunc = GlobTrFunc('max', 1) - res = trfunc.apply([[1, 2], [2, 4], [3, 6], [1, 5]]) - self.assertEqual(res, [[1, 5], [2, 4], [3, 6]]) -class RQL2LDAPFilterTC(RQLGeneratorTC): - - tags = RQLGeneratorTC.tags | Tags(('ldap')) - - @property - def schema(self): - """return the application schema""" - return self._schema - - def setUp(self): - self.handler = get_test_db_handler(LDAPUserSourceTC.config) - self.handler.build_db_cache('ldap-rqlgenerator', LDAPUserSourceTC.pre_setup_database) - self.handler.restore_database('ldap-rqlgenerator') - self._repo = repo = self.handler.get_repo() - self._schema = repo.schema - super(RQL2LDAPFilterTC, self).setUp() - ldapsource = repo.sources[-1] - self.cnxset = repo._get_cnxset() - session = mock_object(cnxset=self.cnxset) - self.o = RQL2LDAPFilter(ldapsource, session) - self.ldapclasses = ''.join(ldapsource.base_filters) - - def tearDown(self): - self._repo.turn_repo_off() - super(RQL2LDAPFilterTC, self).tearDown() - - def test_base(self): - rqlst = self._prepare('CWUser X WHERE X login "toto"').children[0] - self.assertEqual(self.o.generate(rqlst, 'X')[1], - '(&%s(uid=toto))' % self.ldapclasses) - - def test_kwargs(self): - rqlst = self._prepare('CWUser X WHERE X login %(x)s').children[0] - self.o._args = {'x': "toto"} - self.assertEqual(self.o.generate(rqlst, 'X')[1], - '(&%s(uid=toto))' % self.ldapclasses) - - def test_get_attr(self): - rqlst = self._prepare('Any X WHERE E firstname X, E eid 12').children[0] - self.assertRaises(UnknownEid, self.o.generate, rqlst, 'E') if __name__ == '__main__': diff -r dfa4da8a53a0 -r 65b8236e1bb4 web/views/cwuser.py --- a/web/views/cwuser.py Fri Jun 14 17:13:17 2013 +0200 +++ b/web/views/cwuser.py Thu Jun 13 17:27:43 2013 +0200 @@ -189,8 +189,8 @@ __select__ = StartupView.__select__ & match_user_groups('managers') cache_max_age = 0 # disable caching # XXX one could wish to display for instance only user's firstname/surname - # for non managers but filtering out NULL cause crash with an ldapuser - # source. + # for non managers but filtering out NULL caused crash with an ldapuser + # source. The ldapuser source has been dropped and this code can be updated. rql = ('Any U,US,F,S,U,UAA,UDS, L,UAA,USN,UDSN ORDERBY L WHERE U is CWUser, ' 'U login L, U firstname F, U surname S, ' 'U in_state US, US name USN, ' diff -r dfa4da8a53a0 -r 65b8236e1bb4 web/views/sessions.py --- a/web/views/sessions.py Fri Jun 14 17:13:17 2013 +0200 +++ b/web/views/sessions.py Thu Jun 13 17:27:43 2013 +0200 @@ -100,8 +100,6 @@ def _update_last_login_time(self, req): # XXX should properly detect missing permission / non writeable source # and avoid "except (RepositoryError, Unauthorized)" below - if req.user.cw_metainformation()['source']['type'] == 'ldapuser': - return try: req.execute('SET X last_login_time NOW WHERE X eid %(x)s', {'x' : req.user.eid})