--- a/sobjects/ldapparser.py Mon Nov 23 14:34:13 2015 +0100
+++ b/sobjects/ldapparser.py Tue Nov 24 14:36:41 2015 +0100
@@ -28,6 +28,20 @@
from cubicweb import Binary, ConfigurationError
from cubicweb.server.utils import crypt_password
from cubicweb.server.sources import datafeed
+from cubicweb.dataimport import stores, importer
+
+
+class UserMetaGenerator(stores.MetaGenerator):
+ """Specific metadata generator, used to see newly created user into their initial state.
+ """
+ @cached
+ def base_etype_dicts(self, entity):
+ entity, rels = super(UserMetaGenerator, self).base_etype_dicts(entity)
+ if entity.cw_etype == 'CWUser':
+ wf_state = self._cnx.execute('Any S WHERE ET default_workflow WF, ET name %(etype)s, '
+ 'WF initial_state S', {'etype': entity.cw_etype}).one()
+ rels['in_state'] = wf_state.eid
+ return entity, rels
class DataFeedLDAPAdapter(datafeed.DataFeedParser):
@@ -72,28 +86,109 @@
attrs))
return {}
- def _process(self, etype, sdict, raise_on_error=False):
- self.debug('fetched %s %s', etype, sdict)
- extid = sdict['dn']
- entity = self.extid2entity(extid, etype,
- raise_on_error=raise_on_error, **sdict)
- if entity is not None and not self.created_during_pull(entity):
- self.notify_updated(entity)
- attrs = self.ldap2cwattrs(sdict, etype)
- self.update_if_necessary(entity, attrs)
- if etype == 'CWUser':
- self._process_email(entity, sdict)
- if etype == 'CWGroup':
- self._process_membership(entity, sdict)
-
def process(self, url, raise_on_error=False):
"""IDataFeedParser main entry point"""
self.debug('processing ldapfeed source %s %s', self.source, self.searchfilterstr)
- for userdict in self.user_source_entities_by_extid.values():
- self._process('CWUser', userdict)
+ self._group_members = {}
+ eeimporter = self.build_importer(raise_on_error)
+ for name in self.source.user_default_groups:
+ geid = self._get_group(name)
+ eeimporter.extid2eid[geid] = geid
+ entities = self.extentities_generator()
+ set_cwuri = importer.use_extid_as_cwuri(eeimporter.extid2eid)
+ eeimporter.import_entities(set_cwuri(entities))
+ self.stats['created'] = eeimporter.created
+ self.stats['updated'] = eeimporter.updated
+ # handle in_group relation
+ for group, members in self._group_members.items():
+ self._cw.execute('DELETE U in_group G WHERE G name %(g)s', {'g': group})
+ if members:
+ members = ["'%s'" % e for e in members]
+ rql = 'SET U in_group G WHERE G name %%(g)s, U login IN (%s)' % ','.join(members)
+ self._cw.execute(rql, {'g': group})
+ # ensure updated users are activated
+ for eid in eeimporter.updated:
+ entity = self._cw.entity_from_eid(eid)
+ if entity.cw_etype == 'CWUser':
+ self.ensure_activated(entity)
+ # manually set primary email if necessary, it's not handled automatically since hooks are
+ # deactivated
+ self._cw.execute('SET X primary_email E WHERE NOT X primary_email E, X use_email E, '
+ 'X cw_source S, S eid %(s)s, X in_state ST, TS name "activated"',
+ {'s': self.source.eid})
+
+ def build_importer(self, raise_on_error):
+ """Instantiate and configure an importer"""
+ etypes = ('CWUser', 'EmailAddress', 'CWGroup')
+ extid2eid = importer.cwuri2eid(self._cw, etypes, source_eid=self.source.eid)
+ existing_relations = {}
+ for rtype in ('in_group', 'use_email', 'owned_by'):
+ rql = 'Any S,O WHERE S {} O, S cw_source SO, SO eid %(s)s'.format(rtype)
+ rset = self._cw.execute(rql, {'s': self.source.eid})
+ existing_relations[rtype] = set(tuple(x) for x in rset)
+ return importer.ExtEntitiesImporter(self._cw.vreg.schema, self.build_store(),
+ extid2eid=extid2eid,
+ existing_relations=existing_relations,
+ etypes_order_hint=etypes,
+ import_log=self.import_log,
+ raise_on_error=raise_on_error)
+
+ def build_store(self):
+ """Instantiate and configure a store"""
+ metagenerator = UserMetaGenerator(self._cw, source=self.source)
+ return stores.NoHookRQLObjectStore(self._cw, metagenerator)
+
+ def extentities_generator(self):
self.debug('processing ldapfeed source %s %s', self.source, self.searchgroupfilterstr)
+ # generate users and email addresses
+ for userdict in self.user_source_entities_by_extid.values():
+ attrs = self.ldap2cwattrs(userdict, 'CWUser')
+ pwd = attrs.get('upassword')
+ if not pwd:
+ # generate a dumb password if not fetched from ldap (see
+ # userPassword)
+ pwd = crypt_password(generate_password())
+ attrs['upassword'] = set([Binary(pwd)])
+ extuser = importer.ExtEntity('CWUser', userdict['dn'], attrs)
+ extuser.values['owned_by'] = set([extuser.extid])
+ for extemail in self._process_email(extuser, userdict):
+ yield extemail
+ groups = list(filter(None, [self._get_group(name)
+ for name in self.source.user_default_groups]))
+ if groups:
+ extuser.values['in_group'] = groups
+ yield extuser
+ # generate groups
for groupdict in self.group_source_entities_by_extid.values():
- self._process('CWGroup', groupdict, raise_on_error=raise_on_error)
+ attrs = self.ldap2cwattrs(groupdict, 'CWGroup')
+ extgroup = importer.ExtEntity('CWGroup', groupdict['dn'], attrs)
+ yield extgroup
+ # record group membership for later insertion
+ members = groupdict.get(self.source.group_rev_attrs['member'], ())
+ self._group_members[attrs['name']] = members
+
+ def _process_email(self, extuser, userdict):
+ try:
+ emailaddrs = userdict.pop(self.source.user_rev_attrs['email'])
+ except KeyError:
+ return # no email for that user, nothing to do
+ if not isinstance(emailaddrs, list):
+ emailaddrs = [emailaddrs]
+ for emailaddr in emailaddrs:
+ # search for existing email first, may be coming from another source
+ rset = self._cw.execute('EmailAddress X WHERE X address %(addr)s',
+ {'addr': emailaddr})
+ if not rset:
+ # not found, create it. first forge an external id
+ emailextid = userdict['dn'] + '@@' + emailaddr
+ extuser.values.setdefault('use_email', []).append(emailextid)
+ yield importer.ExtEntity('EmailAddress', emailextid, dict(address=[emailaddr]))
+ elif self.sourceuris:
+ # pop from sourceuris anyway, else email may be removed by the
+ # source once import is finished
+ emailextid = userdict['dn'] + '@@' + emailaddr
+ self.sourceuris.pop(emailextid, None)
+ # XXX else check use_email relation?
def handle_deletion(self, config, cnx, myuris):
if config['delete-entities']:
@@ -108,22 +203,12 @@
wf.fire_transition_if_possible('deactivate')
cnx.commit()
- def update_if_necessary(self, entity, attrs):
- # disable read security to allow password selection
- with entity._cw.security_enabled(read=False):
- entity.complete(tuple(attrs))
+ def ensure_activated(self, entity):
if entity.cw_etype == 'CWUser':
wf = entity.cw_adapt_to('IWorkflowable')
if wf.state == 'deactivated':
wf.fire_transition('activate')
self.info('user %s reactivated', entity.login)
- mdate = attrs.get('modification_date')
- if not mdate or mdate > entity.modification_date:
- attrs = dict( (k, v) for k, v in attrs.items()
- if v != getattr(entity, k))
- if attrs:
- entity.cw_set(**attrs)
- self.notify_updated(entity)
def ldap2cwattrs(self, sdict, etype):
"""Transform dictionary of LDAP attributes to CW.
@@ -145,40 +230,11 @@
'source attribute %s has not been found in the source, '
'please check the %s-attrs-map field and the permissions of '
'the LDAP binding user' % (sattr, etype[2:].lower()))
+ if not isinstance(value, list):
+ value = [value]
tdict[tattr] = value
return tdict
- def before_entity_copy(self, entity, sourceparams):
- etype = entity.cw_etype
- if etype == 'EmailAddress':
- entity.cw_edited['address'] = sourceparams['address']
- else:
- self.ldap2cwattrs(sourceparams, etype, tdict=entity.cw_edited)
- if etype == 'CWUser':
- pwd = entity.cw_edited.get('upassword')
- if not pwd:
- # generate a dumb password if not fetched from ldap (see
- # userPassword)
- pwd = crypt_password(generate_password())
- entity.cw_edited['upassword'] = Binary(pwd)
- return entity
-
- def after_entity_copy(self, entity, sourceparams):
- super(DataFeedLDAPAdapter, self).after_entity_copy(entity, sourceparams)
- etype = entity.cw_etype
- if etype == 'EmailAddress':
- return
- # all CWUsers must be treated before CWGroups to have the in_group relation
- # set correctly in _associate_ldapusers
- elif etype == 'CWUser':
- groups = list(filter(None, [self._get_group(name)
- for name in self.source.user_default_groups]))
- if groups:
- entity.cw_set(in_group=groups)
- self._process_email(entity, sourceparams)
- elif etype == 'CWGroup':
- self._process_membership(entity, sourceparams)
-
def is_deleted(self, extidplus, etype, eid):
try:
extid = extidplus.rsplit(b'@@', 1)[0]
@@ -188,48 +244,11 @@
extid = extidplus
return extid not in self.user_source_entities_by_extid
- def _process_email(self, entity, userdict):
- try:
- emailaddrs = userdict[self.source.user_rev_attrs['email']]
- except KeyError:
- return # no email for that user, nothing to do
- if not isinstance(emailaddrs, list):
- emailaddrs = [emailaddrs]
- for emailaddr in emailaddrs:
- # search for existing email first, may be coming from another source
- rset = self._cw.execute('EmailAddress X WHERE X address %(addr)s',
- {'addr': emailaddr})
- if not rset:
- # not found, create it. first forge an external id
- emailextid = userdict['dn'] + '@@' + emailaddr
- email = self.extid2entity(emailextid, 'EmailAddress',
- address=emailaddr)
- entity.cw_set(use_email=email)
- elif self.sourceuris:
- # pop from sourceuris anyway, else email may be removed by the
- # source once import is finished
- uri = userdict['dn'] + '@@' + emailaddr
- self.sourceuris.pop(uri, None)
- # XXX else check use_email relation?
-
- def _process_membership(self, entity, sourceparams):
- """ Find existing CWUsers with the same login as the memberUids in the
- CWGroup entity and create the in_group relationship """
- mdate = sourceparams.get('modification_date')
- if (not mdate or mdate > entity.modification_date):
- self._cw.execute('DELETE U in_group G WHERE G eid %(g)s',
- {'g':entity.eid})
- members = sourceparams.get(self.source.group_rev_attrs['member'])
- if members:
- members = ["'%s'" % e for e in members]
- rql = 'SET U in_group G WHERE G eid %%(g)s, U login IN (%s)' % ','.join(members)
- self._cw.execute(rql, {'g':entity.eid, })
-
@cached
def _get_group(self, name):
try:
return self._cw.execute('Any X WHERE X is CWGroup, X name %(name)s',
- {'name': name}).get_entity(0, 0)
+ {'name': name})[0][0]
except IndexError:
self.error('group %r referenced by source configuration %r does not exist',
name, self.source.uri)