sobjects/ldapparser.py
changeset 10912 addc7ba36f69
parent 10910 5ba4de264be4
child 11037 621ad47c7c86
--- a/sobjects/ldapparser.py	Mon Nov 23 14:34:13 2015 +0100
+++ b/sobjects/ldapparser.py	Tue Nov 24 14:36:41 2015 +0100
@@ -28,6 +28,20 @@
 from cubicweb import Binary, ConfigurationError
 from cubicweb.server.utils import crypt_password
 from cubicweb.server.sources import datafeed
+from cubicweb.dataimport import stores, importer
+
+
+class UserMetaGenerator(stores.MetaGenerator):
+    """Specific metadata generator, used to see newly created user into their initial state.
+    """
+    @cached
+    def base_etype_dicts(self, entity):
+        entity, rels = super(UserMetaGenerator, self).base_etype_dicts(entity)
+        if entity.cw_etype == 'CWUser':
+            wf_state = self._cnx.execute('Any S WHERE ET default_workflow WF, ET name %(etype)s, '
+                                         'WF initial_state S', {'etype': entity.cw_etype}).one()
+            rels['in_state'] = wf_state.eid
+        return entity, rels
 
 
 class DataFeedLDAPAdapter(datafeed.DataFeedParser):
@@ -72,28 +86,109 @@
                                                         attrs))
         return {}
 
-    def _process(self, etype, sdict, raise_on_error=False):
-        self.debug('fetched %s %s', etype, sdict)
-        extid = sdict['dn']
-        entity = self.extid2entity(extid, etype,
-                                   raise_on_error=raise_on_error, **sdict)
-        if entity is not None and not self.created_during_pull(entity):
-            self.notify_updated(entity)
-            attrs = self.ldap2cwattrs(sdict, etype)
-            self.update_if_necessary(entity, attrs)
-            if etype == 'CWUser':
-                self._process_email(entity, sdict)
-            if etype == 'CWGroup':
-                self._process_membership(entity, sdict)
-
     def process(self, url, raise_on_error=False):
         """IDataFeedParser main entry point"""
         self.debug('processing ldapfeed source %s %s', self.source, self.searchfilterstr)
-        for userdict in self.user_source_entities_by_extid.values():
-            self._process('CWUser', userdict)
+        self._group_members = {}
+        eeimporter = self.build_importer(raise_on_error)
+        for name in self.source.user_default_groups:
+            geid = self._get_group(name)
+            eeimporter.extid2eid[geid] = geid
+        entities = self.extentities_generator()
+        set_cwuri = importer.use_extid_as_cwuri(eeimporter.extid2eid)
+        eeimporter.import_entities(set_cwuri(entities))
+        self.stats['created'] = eeimporter.created
+        self.stats['updated'] = eeimporter.updated
+        # handle in_group relation
+        for group, members in self._group_members.items():
+            self._cw.execute('DELETE U in_group G WHERE G name %(g)s', {'g': group})
+            if members:
+                members = ["'%s'" % e for e in members]
+                rql = 'SET U in_group G WHERE G name %%(g)s, U login IN (%s)' % ','.join(members)
+                self._cw.execute(rql, {'g': group})
+        # ensure updated users are activated
+        for eid in eeimporter.updated:
+            entity = self._cw.entity_from_eid(eid)
+            if entity.cw_etype == 'CWUser':
+                self.ensure_activated(entity)
+        # manually set primary email if necessary, it's not handled automatically since hooks are
+        # deactivated
+        self._cw.execute('SET X primary_email E WHERE NOT X primary_email E, X use_email E, '
+                         'X cw_source S, S eid %(s)s, X in_state ST, TS name "activated"',
+                         {'s': self.source.eid})
+
+    def build_importer(self, raise_on_error):
+        """Instantiate and configure an importer"""
+        etypes = ('CWUser', 'EmailAddress', 'CWGroup')
+        extid2eid = importer.cwuri2eid(self._cw, etypes, source_eid=self.source.eid)
+        existing_relations = {}
+        for rtype in ('in_group', 'use_email', 'owned_by'):
+            rql = 'Any S,O WHERE S {} O, S cw_source SO, SO eid %(s)s'.format(rtype)
+            rset = self._cw.execute(rql, {'s': self.source.eid})
+            existing_relations[rtype] = set(tuple(x) for x in rset)
+        return importer.ExtEntitiesImporter(self._cw.vreg.schema, self.build_store(),
+                                            extid2eid=extid2eid,
+                                            existing_relations=existing_relations,
+                                            etypes_order_hint=etypes,
+                                            import_log=self.import_log,
+                                            raise_on_error=raise_on_error)
+
+    def build_store(self):
+        """Instantiate and configure a store"""
+        metagenerator = UserMetaGenerator(self._cw, source=self.source)
+        return stores.NoHookRQLObjectStore(self._cw, metagenerator)
+
+    def extentities_generator(self):
         self.debug('processing ldapfeed source %s %s', self.source, self.searchgroupfilterstr)
+        # generate users and email addresses
+        for userdict in self.user_source_entities_by_extid.values():
+            attrs = self.ldap2cwattrs(userdict, 'CWUser')
+            pwd = attrs.get('upassword')
+            if not pwd:
+                # generate a dumb password if not fetched from ldap (see
+                # userPassword)
+                pwd = crypt_password(generate_password())
+                attrs['upassword'] = set([Binary(pwd)])
+            extuser = importer.ExtEntity('CWUser', userdict['dn'], attrs)
+            extuser.values['owned_by'] = set([extuser.extid])
+            for extemail in self._process_email(extuser, userdict):
+                yield extemail
+            groups = list(filter(None, [self._get_group(name)
+                                        for name in self.source.user_default_groups]))
+            if groups:
+                extuser.values['in_group'] = groups
+            yield extuser
+        # generate groups
         for groupdict in self.group_source_entities_by_extid.values():
-            self._process('CWGroup', groupdict, raise_on_error=raise_on_error)
+            attrs = self.ldap2cwattrs(groupdict, 'CWGroup')
+            extgroup = importer.ExtEntity('CWGroup', groupdict['dn'], attrs)
+            yield extgroup
+            # record group membership for later insertion
+            members = groupdict.get(self.source.group_rev_attrs['member'], ())
+            self._group_members[attrs['name']] = members
+
+    def _process_email(self, extuser, userdict):
+        try:
+            emailaddrs = userdict.pop(self.source.user_rev_attrs['email'])
+        except KeyError:
+            return  # no email for that user, nothing to do
+        if not isinstance(emailaddrs, list):
+            emailaddrs = [emailaddrs]
+        for emailaddr in emailaddrs:
+            # search for existing email first, may be coming from another source
+            rset = self._cw.execute('EmailAddress X WHERE X address %(addr)s',
+                                    {'addr': emailaddr})
+            if not rset:
+                # not found, create it. first forge an external id
+                emailextid = userdict['dn'] + '@@' + emailaddr
+                extuser.values.setdefault('use_email', []).append(emailextid)
+                yield importer.ExtEntity('EmailAddress', emailextid, dict(address=[emailaddr]))
+            elif self.sourceuris:
+                # pop from sourceuris anyway, else email may be removed by the
+                # source once import is finished
+                emailextid = userdict['dn'] + '@@' + emailaddr
+                self.sourceuris.pop(emailextid, None)
+            # XXX else check use_email relation?
 
     def handle_deletion(self, config, cnx, myuris):
         if config['delete-entities']:
@@ -108,22 +203,12 @@
                 wf.fire_transition_if_possible('deactivate')
         cnx.commit()
 
-    def update_if_necessary(self, entity, attrs):
-        # disable read security to allow password selection
-        with entity._cw.security_enabled(read=False):
-            entity.complete(tuple(attrs))
+    def ensure_activated(self, entity):
         if entity.cw_etype == 'CWUser':
             wf = entity.cw_adapt_to('IWorkflowable')
             if wf.state == 'deactivated':
                 wf.fire_transition('activate')
                 self.info('user %s reactivated', entity.login)
-        mdate = attrs.get('modification_date')
-        if not mdate or mdate > entity.modification_date:
-            attrs = dict( (k, v) for k, v in attrs.items()
-                          if v != getattr(entity, k))
-            if attrs:
-                entity.cw_set(**attrs)
-                self.notify_updated(entity)
 
     def ldap2cwattrs(self, sdict, etype):
         """Transform dictionary of LDAP attributes to CW.
@@ -145,40 +230,11 @@
                         'source attribute %s has not been found in the source, '
                         'please check the %s-attrs-map field and the permissions of '
                         'the LDAP binding user' % (sattr, etype[2:].lower()))
+                if not isinstance(value, list):
+                    value = [value]
                 tdict[tattr] = value
         return tdict
 
-    def before_entity_copy(self, entity, sourceparams):
-        etype = entity.cw_etype
-        if etype == 'EmailAddress':
-            entity.cw_edited['address'] = sourceparams['address']
-        else:
-            self.ldap2cwattrs(sourceparams, etype, tdict=entity.cw_edited)
-            if etype == 'CWUser':
-                pwd = entity.cw_edited.get('upassword')
-                if not pwd:
-                    # generate a dumb password if not fetched from ldap (see
-                    # userPassword)
-                    pwd = crypt_password(generate_password())
-                    entity.cw_edited['upassword'] = Binary(pwd)
-        return entity
-
-    def after_entity_copy(self, entity, sourceparams):
-        super(DataFeedLDAPAdapter, self).after_entity_copy(entity, sourceparams)
-        etype = entity.cw_etype
-        if etype == 'EmailAddress':
-            return
-        # all CWUsers must be treated before CWGroups to have the in_group relation
-        # set correctly in _associate_ldapusers
-        elif etype == 'CWUser':
-            groups = list(filter(None, [self._get_group(name)
-                                        for name in self.source.user_default_groups]))
-            if groups:
-                entity.cw_set(in_group=groups)
-            self._process_email(entity, sourceparams)
-        elif etype == 'CWGroup':
-            self._process_membership(entity, sourceparams)
-
     def is_deleted(self, extidplus, etype, eid):
         try:
             extid = extidplus.rsplit(b'@@', 1)[0]
@@ -188,48 +244,11 @@
             extid = extidplus
         return extid not in self.user_source_entities_by_extid
 
-    def _process_email(self, entity, userdict):
-        try:
-            emailaddrs = userdict[self.source.user_rev_attrs['email']]
-        except KeyError:
-            return # no email for that user, nothing to do
-        if not isinstance(emailaddrs, list):
-            emailaddrs = [emailaddrs]
-        for emailaddr in emailaddrs:
-            # search for existing email first, may be coming from another source
-            rset = self._cw.execute('EmailAddress X WHERE X address %(addr)s',
-                                   {'addr': emailaddr})
-            if not rset:
-                # not found, create it. first forge an external id
-                emailextid = userdict['dn'] + '@@' + emailaddr
-                email = self.extid2entity(emailextid, 'EmailAddress',
-                                          address=emailaddr)
-                entity.cw_set(use_email=email)
-            elif self.sourceuris:
-                # pop from sourceuris anyway, else email may be removed by the
-                # source once import is finished
-                uri = userdict['dn'] + '@@' + emailaddr
-                self.sourceuris.pop(uri, None)
-            # XXX else check use_email relation?
-
-    def _process_membership(self, entity, sourceparams):
-        """ Find existing CWUsers with the same login as the memberUids in the
-        CWGroup entity and create the in_group relationship """
-        mdate = sourceparams.get('modification_date')
-        if (not mdate or mdate > entity.modification_date):
-            self._cw.execute('DELETE U in_group G WHERE G eid %(g)s',
-                             {'g':entity.eid})
-            members = sourceparams.get(self.source.group_rev_attrs['member'])
-            if members:
-                members = ["'%s'" % e for e in members]
-                rql = 'SET U in_group G WHERE G eid %%(g)s, U login IN (%s)' % ','.join(members)
-                self._cw.execute(rql, {'g':entity.eid,  })
-
     @cached
     def _get_group(self, name):
         try:
             return self._cw.execute('Any X WHERE X is CWGroup, X name %(name)s',
-                                    {'name': name}).get_entity(0, 0)
+                                    {'name': name})[0][0]
         except IndexError:
             self.error('group %r referenced by source configuration %r does not exist',
                        name, self.source.uri)