# HG changeset patch # User Sylvain Thénault # Date 1448372201 -3600 # Node ID addc7ba36f69c6124ece2a47a6deb4a433505591 # Parent cff2dbc33efff884e8da7c93d57304776effd85c [ldapfeed / dataimport] port ldapfeed parser to dataimport API This makes the code easier to understand and will allow deprecating the old multi-sources API, which is based on creation callbacks through repo.extid2eid. Currently with this patch, modification dates are not checked, hence entities will be systematically updated. We run the import with no hooks, because RQLObjectStore can only add entities to the system source. diff -r cff2dbc33eff -r addc7ba36f69 server/test/unittest_ldapsource.py --- a/server/test/unittest_ldapsource.py Mon Nov 23 14:34:13 2015 +0100 +++ b/server/test/unittest_ldapsource.py Tue Nov 24 14:36:41 2015 +0100 @@ -443,7 +443,6 @@ try: self.update_ldap_entry('cn=logilab,ou=Group,dc=cubicweb,dc=test', {('add', 'memberUid'): ['syt']}) - time.sleep(1.1) # timestamps precision is 1s with self.repo.internal_cnx() as cnx: self.pull(cnx) @@ -471,7 +470,6 @@ try: self.update_ldap_entry('cn=logilab,ou=Group,dc=cubicweb,dc=test', {('delete', 'memberUid'): ['adim']}) - time.sleep(1.1) # timestamps precision is 1s with self.repo.internal_cnx() as cnx: self.pull(cnx) diff -r cff2dbc33eff -r addc7ba36f69 sobjects/ldapparser.py --- a/sobjects/ldapparser.py Mon Nov 23 14:34:13 2015 +0100 +++ b/sobjects/ldapparser.py Tue Nov 24 14:36:41 2015 +0100 @@ -28,6 +28,20 @@ from cubicweb import Binary, ConfigurationError from cubicweb.server.utils import crypt_password from cubicweb.server.sources import datafeed +from cubicweb.dataimport import stores, importer + + +class UserMetaGenerator(stores.MetaGenerator): + """Specific metadata generator, used to see newly created user into their initial state. 
+ """ + @cached + def base_etype_dicts(self, entity): + entity, rels = super(UserMetaGenerator, self).base_etype_dicts(entity) + if entity.cw_etype == 'CWUser': + wf_state = self._cnx.execute('Any S WHERE ET default_workflow WF, ET name %(etype)s, ' + 'WF initial_state S', {'etype': entity.cw_etype}).one() + rels['in_state'] = wf_state.eid + return entity, rels class DataFeedLDAPAdapter(datafeed.DataFeedParser): @@ -72,28 +86,109 @@ attrs)) return {} - def _process(self, etype, sdict, raise_on_error=False): - self.debug('fetched %s %s', etype, sdict) - extid = sdict['dn'] - entity = self.extid2entity(extid, etype, - raise_on_error=raise_on_error, **sdict) - if entity is not None and not self.created_during_pull(entity): - self.notify_updated(entity) - attrs = self.ldap2cwattrs(sdict, etype) - self.update_if_necessary(entity, attrs) - if etype == 'CWUser': - self._process_email(entity, sdict) - if etype == 'CWGroup': - self._process_membership(entity, sdict) - def process(self, url, raise_on_error=False): """IDataFeedParser main entry point""" self.debug('processing ldapfeed source %s %s', self.source, self.searchfilterstr) - for userdict in self.user_source_entities_by_extid.values(): - self._process('CWUser', userdict) + self._group_members = {} + eeimporter = self.build_importer(raise_on_error) + for name in self.source.user_default_groups: + geid = self._get_group(name) + eeimporter.extid2eid[geid] = geid + entities = self.extentities_generator() + set_cwuri = importer.use_extid_as_cwuri(eeimporter.extid2eid) + eeimporter.import_entities(set_cwuri(entities)) + self.stats['created'] = eeimporter.created + self.stats['updated'] = eeimporter.updated + # handle in_group relation + for group, members in self._group_members.items(): + self._cw.execute('DELETE U in_group G WHERE G name %(g)s', {'g': group}) + if members: + members = ["'%s'" % e for e in members] + rql = 'SET U in_group G WHERE G name %%(g)s, U login IN (%s)' % ','.join(members) + self._cw.execute(rql, 
{'g': group}) + # ensure updated users are activated + for eid in eeimporter.updated: + entity = self._cw.entity_from_eid(eid) + if entity.cw_etype == 'CWUser': + self.ensure_activated(entity) + # manually set primary email if necessary, it's not handled automatically since hooks are + # deactivated + self._cw.execute('SET X primary_email E WHERE NOT X primary_email E, X use_email E, ' + 'X cw_source S, S eid %(s)s, X in_state ST, TS name "activated"', + {'s': self.source.eid}) + + def build_importer(self, raise_on_error): + """Instantiate and configure an importer""" + etypes = ('CWUser', 'EmailAddress', 'CWGroup') + extid2eid = importer.cwuri2eid(self._cw, etypes, source_eid=self.source.eid) + existing_relations = {} + for rtype in ('in_group', 'use_email', 'owned_by'): + rql = 'Any S,O WHERE S {} O, S cw_source SO, SO eid %(s)s'.format(rtype) + rset = self._cw.execute(rql, {'s': self.source.eid}) + existing_relations[rtype] = set(tuple(x) for x in rset) + return importer.ExtEntitiesImporter(self._cw.vreg.schema, self.build_store(), + extid2eid=extid2eid, + existing_relations=existing_relations, + etypes_order_hint=etypes, + import_log=self.import_log, + raise_on_error=raise_on_error) + + def build_store(self): + """Instantiate and configure a store""" + metagenerator = UserMetaGenerator(self._cw, source=self.source) + return stores.NoHookRQLObjectStore(self._cw, metagenerator) + + def extentities_generator(self): self.debug('processing ldapfeed source %s %s', self.source, self.searchgroupfilterstr) + # generate users and email addresses + for userdict in self.user_source_entities_by_extid.values(): + attrs = self.ldap2cwattrs(userdict, 'CWUser') + pwd = attrs.get('upassword') + if not pwd: + # generate a dumb password if not fetched from ldap (see + # userPassword) + pwd = crypt_password(generate_password()) + attrs['upassword'] = set([Binary(pwd)]) + extuser = importer.ExtEntity('CWUser', userdict['dn'], attrs) + extuser.values['owned_by'] = 
set([extuser.extid]) + for extemail in self._process_email(extuser, userdict): + yield extemail + groups = list(filter(None, [self._get_group(name) + for name in self.source.user_default_groups])) + if groups: + extuser.values['in_group'] = groups + yield extuser + # generate groups for groupdict in self.group_source_entities_by_extid.values(): - self._process('CWGroup', groupdict, raise_on_error=raise_on_error) + attrs = self.ldap2cwattrs(groupdict, 'CWGroup') + extgroup = importer.ExtEntity('CWGroup', groupdict['dn'], attrs) + yield extgroup + # record group membership for later insertion + members = groupdict.get(self.source.group_rev_attrs['member'], ()) + self._group_members[attrs['name']] = members + + def _process_email(self, extuser, userdict): + try: + emailaddrs = userdict.pop(self.source.user_rev_attrs['email']) + except KeyError: + return # no email for that user, nothing to do + if not isinstance(emailaddrs, list): + emailaddrs = [emailaddrs] + for emailaddr in emailaddrs: + # search for existing email first, may be coming from another source + rset = self._cw.execute('EmailAddress X WHERE X address %(addr)s', + {'addr': emailaddr}) + if not rset: + # not found, create it. first forge an external id + emailextid = userdict['dn'] + '@@' + emailaddr + extuser.values.setdefault('use_email', []).append(emailextid) + yield importer.ExtEntity('EmailAddress', emailextid, dict(address=[emailaddr])) + elif self.sourceuris: + # pop from sourceuris anyway, else email may be removed by the + # source once import is finished + emailextid = userdict['dn'] + '@@' + emailaddr + self.sourceuris.pop(emailextid, None) + # XXX else check use_email relation? 
def handle_deletion(self, config, cnx, myuris): if config['delete-entities']: @@ -108,22 +203,12 @@ wf.fire_transition_if_possible('deactivate') cnx.commit() - def update_if_necessary(self, entity, attrs): - # disable read security to allow password selection - with entity._cw.security_enabled(read=False): - entity.complete(tuple(attrs)) + def ensure_activated(self, entity): if entity.cw_etype == 'CWUser': wf = entity.cw_adapt_to('IWorkflowable') if wf.state == 'deactivated': wf.fire_transition('activate') self.info('user %s reactivated', entity.login) - mdate = attrs.get('modification_date') - if not mdate or mdate > entity.modification_date: - attrs = dict( (k, v) for k, v in attrs.items() - if v != getattr(entity, k)) - if attrs: - entity.cw_set(**attrs) - self.notify_updated(entity) def ldap2cwattrs(self, sdict, etype): """Transform dictionary of LDAP attributes to CW. @@ -145,40 +230,11 @@ 'source attribute %s has not been found in the source, ' 'please check the %s-attrs-map field and the permissions of ' 'the LDAP binding user' % (sattr, etype[2:].lower())) + if not isinstance(value, list): + value = [value] tdict[tattr] = value return tdict - def before_entity_copy(self, entity, sourceparams): - etype = entity.cw_etype - if etype == 'EmailAddress': - entity.cw_edited['address'] = sourceparams['address'] - else: - self.ldap2cwattrs(sourceparams, etype, tdict=entity.cw_edited) - if etype == 'CWUser': - pwd = entity.cw_edited.get('upassword') - if not pwd: - # generate a dumb password if not fetched from ldap (see - # userPassword) - pwd = crypt_password(generate_password()) - entity.cw_edited['upassword'] = Binary(pwd) - return entity - - def after_entity_copy(self, entity, sourceparams): - super(DataFeedLDAPAdapter, self).after_entity_copy(entity, sourceparams) - etype = entity.cw_etype - if etype == 'EmailAddress': - return - # all CWUsers must be treated before CWGroups to have the in_group relation - # set correctly in _associate_ldapusers - elif etype == 
'CWUser': - groups = list(filter(None, [self._get_group(name) - for name in self.source.user_default_groups])) - if groups: - entity.cw_set(in_group=groups) - self._process_email(entity, sourceparams) - elif etype == 'CWGroup': - self._process_membership(entity, sourceparams) - def is_deleted(self, extidplus, etype, eid): try: extid = extidplus.rsplit(b'@@', 1)[0] @@ -188,48 +244,11 @@ extid = extidplus return extid not in self.user_source_entities_by_extid - def _process_email(self, entity, userdict): - try: - emailaddrs = userdict[self.source.user_rev_attrs['email']] - except KeyError: - return # no email for that user, nothing to do - if not isinstance(emailaddrs, list): - emailaddrs = [emailaddrs] - for emailaddr in emailaddrs: - # search for existing email first, may be coming from another source - rset = self._cw.execute('EmailAddress X WHERE X address %(addr)s', - {'addr': emailaddr}) - if not rset: - # not found, create it. first forge an external id - emailextid = userdict['dn'] + '@@' + emailaddr - email = self.extid2entity(emailextid, 'EmailAddress', - address=emailaddr) - entity.cw_set(use_email=email) - elif self.sourceuris: - # pop from sourceuris anyway, else email may be removed by the - # source once import is finished - uri = userdict['dn'] + '@@' + emailaddr - self.sourceuris.pop(uri, None) - # XXX else check use_email relation? 
- - def _process_membership(self, entity, sourceparams): - """ Find existing CWUsers with the same login as the memberUids in the - CWGroup entity and create the in_group relationship """ - mdate = sourceparams.get('modification_date') - if (not mdate or mdate > entity.modification_date): - self._cw.execute('DELETE U in_group G WHERE G eid %(g)s', - {'g':entity.eid}) - members = sourceparams.get(self.source.group_rev_attrs['member']) - if members: - members = ["'%s'" % e for e in members] - rql = 'SET U in_group G WHERE G eid %%(g)s, U login IN (%s)' % ','.join(members) - self._cw.execute(rql, {'g':entity.eid, }) - @cached def _get_group(self, name): try: return self._cw.execute('Any X WHERE X is CWGroup, X name %(name)s', - {'name': name}).get_entity(0, 0) + {'name': name})[0][0] except IndexError: self.error('group %r referenced by source configuration %r does not exist', name, self.source.uri)