[ldapfeed / dataimport] port ldapfeed parser to dataimport API
authorSylvain Thénault <sylvain.thenault@logilab.fr>
Tue, 24 Nov 2015 14:36:41 +0100
changeset 10912 addc7ba36f69
parent 10911 cff2dbc33eff
child 10913 5d7f17054ae6
[ldapfeed / dataimport] port ldapfeed parser to dataimport API This makes the code easier to understand and will allow to deprecate the old multi-sources api, based on creation callback through repo.extid2eid. Currently with this patch, modification dates are not checked, hence entities will be systematically updated. We run the import with no hooks, because RQLObjectStore can only add entities to the system source.
server/test/unittest_ldapsource.py
sobjects/ldapparser.py
--- a/server/test/unittest_ldapsource.py	Mon Nov 23 14:34:13 2015 +0100
+++ b/server/test/unittest_ldapsource.py	Tue Nov 24 14:36:41 2015 +0100
@@ -443,7 +443,6 @@
         try:
             self.update_ldap_entry('cn=logilab,ou=Group,dc=cubicweb,dc=test',
                                    {('add', 'memberUid'): ['syt']})
-            time.sleep(1.1) # timestamps precision is 1s
             with self.repo.internal_cnx() as cnx:
                 self.pull(cnx)
 
@@ -471,7 +470,6 @@
         try:
             self.update_ldap_entry('cn=logilab,ou=Group,dc=cubicweb,dc=test',
                                    {('delete', 'memberUid'): ['adim']})
-            time.sleep(1.1) # timestamps precision is 1s
             with self.repo.internal_cnx() as cnx:
                 self.pull(cnx)
 
--- a/sobjects/ldapparser.py	Mon Nov 23 14:34:13 2015 +0100
+++ b/sobjects/ldapparser.py	Tue Nov 24 14:36:41 2015 +0100
@@ -28,6 +28,20 @@
 from cubicweb import Binary, ConfigurationError
 from cubicweb.server.utils import crypt_password
 from cubicweb.server.sources import datafeed
+from cubicweb.dataimport import stores, importer
+
+
+class UserMetaGenerator(stores.MetaGenerator):
+    """Specific metadata generator, used to see newly created user into their initial state.
+    """
+    @cached
+    def base_etype_dicts(self, entity):
+        entity, rels = super(UserMetaGenerator, self).base_etype_dicts(entity)
+        if entity.cw_etype == 'CWUser':
+            wf_state = self._cnx.execute('Any S WHERE ET default_workflow WF, ET name %(etype)s, '
+                                         'WF initial_state S', {'etype': entity.cw_etype}).one()
+            rels['in_state'] = wf_state.eid
+        return entity, rels
 
 
 class DataFeedLDAPAdapter(datafeed.DataFeedParser):
@@ -72,28 +86,109 @@
                                                         attrs))
         return {}
 
-    def _process(self, etype, sdict, raise_on_error=False):
-        self.debug('fetched %s %s', etype, sdict)
-        extid = sdict['dn']
-        entity = self.extid2entity(extid, etype,
-                                   raise_on_error=raise_on_error, **sdict)
-        if entity is not None and not self.created_during_pull(entity):
-            self.notify_updated(entity)
-            attrs = self.ldap2cwattrs(sdict, etype)
-            self.update_if_necessary(entity, attrs)
-            if etype == 'CWUser':
-                self._process_email(entity, sdict)
-            if etype == 'CWGroup':
-                self._process_membership(entity, sdict)
-
     def process(self, url, raise_on_error=False):
         """IDataFeedParser main entry point"""
         self.debug('processing ldapfeed source %s %s', self.source, self.searchfilterstr)
-        for userdict in self.user_source_entities_by_extid.values():
-            self._process('CWUser', userdict)
+        self._group_members = {}
+        eeimporter = self.build_importer(raise_on_error)
+        for name in self.source.user_default_groups:
+            geid = self._get_group(name)
+            eeimporter.extid2eid[geid] = geid
+        entities = self.extentities_generator()
+        set_cwuri = importer.use_extid_as_cwuri(eeimporter.extid2eid)
+        eeimporter.import_entities(set_cwuri(entities))
+        self.stats['created'] = eeimporter.created
+        self.stats['updated'] = eeimporter.updated
+        # handle in_group relation
+        for group, members in self._group_members.items():
+            self._cw.execute('DELETE U in_group G WHERE G name %(g)s', {'g': group})
+            if members:
+                members = ["'%s'" % e for e in members]
+                rql = 'SET U in_group G WHERE G name %%(g)s, U login IN (%s)' % ','.join(members)
+                self._cw.execute(rql, {'g': group})
+        # ensure updated users are activated
+        for eid in eeimporter.updated:
+            entity = self._cw.entity_from_eid(eid)
+            if entity.cw_etype == 'CWUser':
+                self.ensure_activated(entity)
+        # manually set primary email if necessary, it's not handled automatically since hooks are
+        # deactivated
+        self._cw.execute('SET X primary_email E WHERE NOT X primary_email E, X use_email E, '
+                         'X cw_source S, S eid %(s)s, X in_state ST, TS name "activated"',
+                         {'s': self.source.eid})
+
+    def build_importer(self, raise_on_error):
+        """Instantiate and configure an importer"""
+        etypes = ('CWUser', 'EmailAddress', 'CWGroup')
+        extid2eid = importer.cwuri2eid(self._cw, etypes, source_eid=self.source.eid)
+        existing_relations = {}
+        for rtype in ('in_group', 'use_email', 'owned_by'):
+            rql = 'Any S,O WHERE S {} O, S cw_source SO, SO eid %(s)s'.format(rtype)
+            rset = self._cw.execute(rql, {'s': self.source.eid})
+            existing_relations[rtype] = set(tuple(x) for x in rset)
+        return importer.ExtEntitiesImporter(self._cw.vreg.schema, self.build_store(),
+                                            extid2eid=extid2eid,
+                                            existing_relations=existing_relations,
+                                            etypes_order_hint=etypes,
+                                            import_log=self.import_log,
+                                            raise_on_error=raise_on_error)
+
+    def build_store(self):
+        """Instantiate and configure a store"""
+        metagenerator = UserMetaGenerator(self._cw, source=self.source)
+        return stores.NoHookRQLObjectStore(self._cw, metagenerator)
+
+    def extentities_generator(self):
         self.debug('processing ldapfeed source %s %s', self.source, self.searchgroupfilterstr)
+        # generate users and email addresses
+        for userdict in self.user_source_entities_by_extid.values():
+            attrs = self.ldap2cwattrs(userdict, 'CWUser')
+            pwd = attrs.get('upassword')
+            if not pwd:
+                # generate a dumb password if not fetched from ldap (see
+                # userPassword)
+                pwd = crypt_password(generate_password())
+                attrs['upassword'] = set([Binary(pwd)])
+            extuser = importer.ExtEntity('CWUser', userdict['dn'], attrs)
+            extuser.values['owned_by'] = set([extuser.extid])
+            for extemail in self._process_email(extuser, userdict):
+                yield extemail
+            groups = list(filter(None, [self._get_group(name)
+                                        for name in self.source.user_default_groups]))
+            if groups:
+                extuser.values['in_group'] = groups
+            yield extuser
+        # generate groups
         for groupdict in self.group_source_entities_by_extid.values():
-            self._process('CWGroup', groupdict, raise_on_error=raise_on_error)
+            attrs = self.ldap2cwattrs(groupdict, 'CWGroup')
+            extgroup = importer.ExtEntity('CWGroup', groupdict['dn'], attrs)
+            yield extgroup
+            # record group membership for later insertion
+            members = groupdict.get(self.source.group_rev_attrs['member'], ())
+            self._group_members[attrs['name']] = members
+
+    def _process_email(self, extuser, userdict):
+        try:
+            emailaddrs = userdict.pop(self.source.user_rev_attrs['email'])
+        except KeyError:
+            return  # no email for that user, nothing to do
+        if not isinstance(emailaddrs, list):
+            emailaddrs = [emailaddrs]
+        for emailaddr in emailaddrs:
+            # search for existing email first, may be coming from another source
+            rset = self._cw.execute('EmailAddress X WHERE X address %(addr)s',
+                                    {'addr': emailaddr})
+            if not rset:
+                # not found, create it. first forge an external id
+                emailextid = userdict['dn'] + '@@' + emailaddr
+                extuser.values.setdefault('use_email', []).append(emailextid)
+                yield importer.ExtEntity('EmailAddress', emailextid, dict(address=[emailaddr]))
+            elif self.sourceuris:
+                # pop from sourceuris anyway, else email may be removed by the
+                # source once import is finished
+                emailextid = userdict['dn'] + '@@' + emailaddr
+                self.sourceuris.pop(emailextid, None)
+            # XXX else check use_email relation?
 
     def handle_deletion(self, config, cnx, myuris):
         if config['delete-entities']:
@@ -108,22 +203,12 @@
                 wf.fire_transition_if_possible('deactivate')
         cnx.commit()
 
-    def update_if_necessary(self, entity, attrs):
-        # disable read security to allow password selection
-        with entity._cw.security_enabled(read=False):
-            entity.complete(tuple(attrs))
+    def ensure_activated(self, entity):
         if entity.cw_etype == 'CWUser':
             wf = entity.cw_adapt_to('IWorkflowable')
             if wf.state == 'deactivated':
                 wf.fire_transition('activate')
                 self.info('user %s reactivated', entity.login)
-        mdate = attrs.get('modification_date')
-        if not mdate or mdate > entity.modification_date:
-            attrs = dict( (k, v) for k, v in attrs.items()
-                          if v != getattr(entity, k))
-            if attrs:
-                entity.cw_set(**attrs)
-                self.notify_updated(entity)
 
     def ldap2cwattrs(self, sdict, etype):
         """Transform dictionary of LDAP attributes to CW.
@@ -145,40 +230,11 @@
                         'source attribute %s has not been found in the source, '
                         'please check the %s-attrs-map field and the permissions of '
                         'the LDAP binding user' % (sattr, etype[2:].lower()))
+                if not isinstance(value, list):
+                    value = [value]
                 tdict[tattr] = value
         return tdict
 
-    def before_entity_copy(self, entity, sourceparams):
-        etype = entity.cw_etype
-        if etype == 'EmailAddress':
-            entity.cw_edited['address'] = sourceparams['address']
-        else:
-            self.ldap2cwattrs(sourceparams, etype, tdict=entity.cw_edited)
-            if etype == 'CWUser':
-                pwd = entity.cw_edited.get('upassword')
-                if not pwd:
-                    # generate a dumb password if not fetched from ldap (see
-                    # userPassword)
-                    pwd = crypt_password(generate_password())
-                    entity.cw_edited['upassword'] = Binary(pwd)
-        return entity
-
-    def after_entity_copy(self, entity, sourceparams):
-        super(DataFeedLDAPAdapter, self).after_entity_copy(entity, sourceparams)
-        etype = entity.cw_etype
-        if etype == 'EmailAddress':
-            return
-        # all CWUsers must be treated before CWGroups to have the in_group relation
-        # set correctly in _associate_ldapusers
-        elif etype == 'CWUser':
-            groups = list(filter(None, [self._get_group(name)
-                                        for name in self.source.user_default_groups]))
-            if groups:
-                entity.cw_set(in_group=groups)
-            self._process_email(entity, sourceparams)
-        elif etype == 'CWGroup':
-            self._process_membership(entity, sourceparams)
-
     def is_deleted(self, extidplus, etype, eid):
         try:
             extid = extidplus.rsplit(b'@@', 1)[0]
@@ -188,48 +244,11 @@
             extid = extidplus
         return extid not in self.user_source_entities_by_extid
 
-    def _process_email(self, entity, userdict):
-        try:
-            emailaddrs = userdict[self.source.user_rev_attrs['email']]
-        except KeyError:
-            return # no email for that user, nothing to do
-        if not isinstance(emailaddrs, list):
-            emailaddrs = [emailaddrs]
-        for emailaddr in emailaddrs:
-            # search for existing email first, may be coming from another source
-            rset = self._cw.execute('EmailAddress X WHERE X address %(addr)s',
-                                   {'addr': emailaddr})
-            if not rset:
-                # not found, create it. first forge an external id
-                emailextid = userdict['dn'] + '@@' + emailaddr
-                email = self.extid2entity(emailextid, 'EmailAddress',
-                                          address=emailaddr)
-                entity.cw_set(use_email=email)
-            elif self.sourceuris:
-                # pop from sourceuris anyway, else email may be removed by the
-                # source once import is finished
-                uri = userdict['dn'] + '@@' + emailaddr
-                self.sourceuris.pop(uri, None)
-            # XXX else check use_email relation?
-
-    def _process_membership(self, entity, sourceparams):
-        """ Find existing CWUsers with the same login as the memberUids in the
-        CWGroup entity and create the in_group relationship """
-        mdate = sourceparams.get('modification_date')
-        if (not mdate or mdate > entity.modification_date):
-            self._cw.execute('DELETE U in_group G WHERE G eid %(g)s',
-                             {'g':entity.eid})
-            members = sourceparams.get(self.source.group_rev_attrs['member'])
-            if members:
-                members = ["'%s'" % e for e in members]
-                rql = 'SET U in_group G WHERE G eid %%(g)s, U login IN (%s)' % ','.join(members)
-                self._cw.execute(rql, {'g':entity.eid,  })
-
     @cached
     def _get_group(self, name):
         try:
             return self._cw.execute('Any X WHERE X is CWGroup, X name %(name)s',
-                                    {'name': name}).get_entity(0, 0)
+                                    {'name': name})[0][0]
         except IndexError:
             self.error('group %r referenced by source configuration %r does not exist',
                        name, self.source.uri)