[ldaputils,ldapparser] correctly compute "deleted" users (closes #2542083)
* in ldaputils.py, removal of an unneeded method (checking existence
of an object in the ldap, but not using the configuration filters)
* in the ldapparser, memoize the (filtered) search so as to serve both
.process and .is_deleted
* add a non regression test for the 'user-filter' source config entry
+ deletion
--- a/server/ldaputils.py Tue Dec 18 11:40:51 2012 +0100
+++ b/server/ldaputils.py Tue Jan 08 15:53:22 2013 +0100
@@ -222,19 +222,6 @@
raise AuthenticationError()
return eid
- def object_exists_in_ldap(self, dn):
- cnx = self.get_connection().cnx #session.cnxset.connection(self.uri).cnx
- if cnx is None:
- self.warning('Could not establish connexion with LDAP server, assuming dn %s exists', dn)
- return True # ldap unreachable, let's not touch it
- try:
- cnx.search_s(dn, self.user_base_scope)
- except ldap.PARTIAL_RESULTS:
- self.warning('PARTIAL RESULTS for dn %s', dn)
- except ldap.NO_SUCH_OBJECT:
- return False
- return True
-
def _connect(self, user=None, userpwd=None):
protocol, hostport = self.connection_info()
self.info('connecting %s://%s as %s', protocol, hostport,
--- a/server/test/unittest_ldapuser.py Tue Dec 18 11:40:51 2012 +0100
+++ b/server/test/unittest_ldapuser.py Tue Jan 08 15:53:22 2013 +0100
@@ -113,6 +113,23 @@
stats = lfsource.pull_data(isession, force=True, raise_on_error=True)
isession.commit()
+ def test_filter_inactivate(self):
+ """ filtered out people should be deactivated, unable to authenticate """
+ source = self.session.execute('CWSource S WHERE S type="ldapfeed"').get_entity(0,0)
+ config = source.repo_source.check_config(source)
+ # filter with adim's phone number
+ config['user-filter'] = u'(%s=%s)' % ('telephoneNumber', '109')
+ source.repo_source.update_config(source, config)
+ self.commit()
+ self._pull()
+ self.assertRaises(AuthenticationError, self.repo.connect, 'syt', password='syt')
+ self.assertEqual(self.execute('Any N WHERE U login "syt", '
+ 'U in_state S, S name N').rows[0][0],
+ 'deactivated')
+ self.assertEqual(self.execute('Any N WHERE U login "adim", '
+ 'U in_state S, S name N').rows[0][0],
+ 'activated')
+
def test_delete(self):
""" delete syt, pull, check deactivation, repull,
readd syt, pull, check activation
--- a/sobjects/ldapparser.py Tue Dec 18 11:40:51 2012 +0100
+++ b/sobjects/ldapparser.py Tue Jan 08 15:53:22 2013 +0100
@@ -22,7 +22,7 @@
"""
from __future__ import with_statement
-from logilab.common.decorators import cached
+from logilab.common.decorators import cached, cachedproperty
from logilab.common.shellutils import generate_password
from cubicweb import Binary, ConfigurationError
@@ -36,15 +36,27 @@
# attributes of the cw user
non_attribute_keys = set(('email',))
+ @cachedproperty
+ def searchfilterstr(self):
+ """ ldap search string, including user-filter """
+ return '(&%s)' % ''.join(self.source.base_filters)
+
+ @cachedproperty
+ def source_entities_by_extid(self):
+ source = self.source
+ return dict((userdict['dn'], userdict)
+ for userdict in source._search(self._cw,
+ source.user_base_dn,
+ source.user_base_scope,
+ self.searchfilterstr))
+
def process(self, url, raise_on_error=False):
"""IDataFeedParser main entry point"""
- source = self.source
- searchstr = '(&%s)' % ''.join(source.base_filters)
- self.debug('processing ldapfeed stuff %s %s', source, searchstr)
- for userdict in source._search(self._cw, source.user_base_dn,
- source.user_base_scope, searchstr):
+ self.debug('processing ldapfeed source %s %s', self.source, self.searchfilterstr)
+ for userdict in self.source_entities_by_extid.itervalues():
self.warning('fetched user %s', userdict)
- entity = self.extid2entity(userdict['dn'], 'CWUser', **userdict)
+ extid = userdict['dn']
+ entity = self.extid2entity(extid, 'CWUser', **userdict)
if entity is not None and not self.created_during_pull(entity):
self.notify_updated(entity)
attrs = self.ldap2cwattrs(userdict)
@@ -121,12 +133,14 @@
entity.set_relations(in_group=groups)
self._process_email(entity, sourceparams)
- def is_deleted(self, extid, etype, eid):
+ def is_deleted(self, extidplus, etype, eid):
try:
- extid, _ = extid.rsplit('@@', 1)
+ extid, _ = extidplus.rsplit('@@', 1)
except ValueError:
- pass
- return not self.source.object_exists_in_ldap(extid)
+ # for some reason extids here tend to come in both forms, e.g:
+ # dn, dn@@Babar
+ extid = extidplus
+ return extid not in self.source_entities_by_extid
def _process_email(self, entity, userdict):
try: