--- a/server/sources/ldapuser.py Mon Feb 07 18:19:36 2011 +0100
+++ b/server/sources/ldapuser.py Mon Feb 07 18:19:39 2011 +0100
@@ -34,15 +34,13 @@
from __future__ import division
from base64 import b64decode
-from logilab.common.textutils import splitstrip
-from rql.nodes import Relation, VariableRef, Constant, Function
-
import ldap
from ldap.ldapobject import ReconnectLDAPObject
from ldap.filter import filter_format, escape_filter_chars
from ldapurl import LDAPUrl
-from logilab.common.configuration import time_validator
+from rql.nodes import Relation, VariableRef, Constant, Function
+
from cubicweb import AuthenticationError, UnknownEid, RepositoryError
from cubicweb.server.utils import cartesian_product
from cubicweb.server.sources import (AbstractSource, TrFunc, GlobTrFunc,
@@ -168,44 +166,36 @@
)
- def __init__(self, repo, source_config, *args, **kwargs):
- AbstractSource.__init__(self, repo, source_config, *args, **kwargs)
- self.host = source_config['host']
- self.protocol = source_config.get('protocol', 'ldap')
- self.authmode = source_config.get('auth-mode', 'simple')
+ def __init__(self, repo, source_config, eid=None):
+ AbstractSource.__init__(self, repo, source_config, eid)
+ self.update_config(None, self.check_conf_dict(eid, source_config))
+ self._conn = None
+
+ def update_config(self, source_entity, typedconfig):
+ """update configuration from source entity. `typedconfig` is config
+ properly typed with defaults set
+ """
+ self.host = typedconfig['host']
+ self.protocol = typedconfig['protocol']
+ self.authmode = typedconfig['auth-mode']
self._authenticate = getattr(self, '_auth_%s' % self.authmode)
- self.cnx_dn = source_config.get('data-cnx-dn') or ''
- self.cnx_pwd = source_config.get('data-cnx-password') or ''
- self.user_base_scope = globals()[source_config['user-scope']]
- self.user_base_dn = str(source_config['user-base-dn'])
- self.user_base_scope = globals()[source_config['user-scope']]
- self.user_classes = splitstrip(source_config['user-classes'])
- self.user_login_attr = source_config['user-login-attr']
- self.user_default_groups = splitstrip(source_config['user-default-group'])
- self.user_attrs = dict(v.split(':', 1) for v in splitstrip(source_config['user-attrs-map']))
- self.user_filter = source_config.get('user-filter')
+ self.cnx_dn = typedconfig['data-cnx-dn']
+ self.cnx_pwd = typedconfig['data-cnx-password']
+ self.user_base_dn = str(typedconfig['user-base-dn'])
+ self.user_base_scope = globals()[typedconfig['user-scope']]
+ self.user_login_attr = typedconfig['user-login-attr']
+ self.user_default_groups = typedconfig['user-default-group']
+ self.user_attrs = typedconfig['user-attrs-map']
self.user_rev_attrs = {'eid': 'dn'}
for ldapattr, cwattr in self.user_attrs.items():
self.user_rev_attrs[cwattr] = ldapattr
- self.base_filters = self._make_base_filters()
- self._conn = None
- self._cache = {}
- # ttlm is in minutes!
- self._cache_ttl = time_validator(None, None,
- source_config.get('cache-life-time', 2*60*60))
- self._cache_ttl = max(71, self._cache_ttl)
- self._query_cache = TimedCache(self._cache_ttl)
- # interval is in seconds !
- self._interval = time_validator(None, None,
- source_config.get('synchronization-interval',
- 24*60*60))
-
- def _make_base_filters(self):
- filters = [filter_format('(%s=%s)', ('objectClass', o))
- for o in self.user_classes]
- if self.user_filter:
- filters += [self.user_filter]
- return filters
+ self.base_filters = [filter_format('(%s=%s)', ('objectClass', o))
+ for o in typedconfig['user-classes']]
+ if typedconfig['user-filter']:
+ self.base_filters.append(typedconfig['user-filter'])
+ self._interval = typedconfig['synchronization-interval']
+ self._cache_ttl = max(71, typedconfig['cache-life-time'])
+ self.reset_caches()
def reset_caches(self):
"""method called during test to reset potential source caches"""
@@ -300,7 +290,7 @@
# we really really don't want that
raise AuthenticationError()
searchfilter = [filter_format('(%s=%s)', (self.user_login_attr, login))]
- searchfilter.extend(self._make_base_filters())
+ searchfilter.extend(self.base_filters)
searchstr = '(&%s)' % ''.join(searchfilter)
# first search the user
try: