[datafeed] Drop entity deletion handling in the default source / parser
author: Sylvain Thénault <sylvain.thenault@logilab.fr>
Thu, 29 Sep 2016 22:23:16 +0200
changeset 11758 3f81636a75db
parent 11757 e845746b4d3c
child 11759 7b7108eb8178
[datafeed] Drop entity deletion handling in the default source / parser. This should be handled by a specific parser, to avoid relying on "entities.asource". Reimplement it in the LDAP parser. Related to #15538288
cubicweb/server/sources/datafeed.py
cubicweb/server/test/unittest_datafeed.py
cubicweb/sobjects/ldapparser.py
--- a/cubicweb/server/sources/datafeed.py	Thu Sep 29 11:44:19 2016 +0200
+++ b/cubicweb/server/sources/datafeed.py	Thu Sep 29 22:23:16 2016 +0200
@@ -73,7 +73,8 @@
          {'type' : 'yn',
           'default': False,
           'help': ('Should already imported entities not found anymore on the '
-                   'external source be deleted?'),
+                   'external source be deleted? Handling of this parameter '
+                   "will depend on source's parser."),
           'group': 'datafeed-source', 'level': 2,
           }),
         ('logs-lifetime',
@@ -230,10 +231,8 @@
 
     def _pull_data(self, cnx, force=False, raise_on_error=False, import_log_eid=None):
         importlog = self.init_import_log(cnx, import_log_eid)
-        source_uris = self.source_uris(cnx)
         try:
-            parser = self._get_parser(cnx, import_log=importlog,
-                                      source_uris=source_uris)
+            parser = self._get_parser(cnx, import_log=importlog)
         except ObjectNotFound:
             msg = 'failed to load parser for %s'
             importlog.record_error(msg % ('source "%s"' % self.uri))
@@ -242,8 +241,6 @@
         else:
             if parser.process_urls(self.urls, raise_on_error):
                 self.warning("some error occurred, don't attempt to delete entities")
-            else:
-                parser.handle_deletion(self.config, cnx, source_uris)
             stats = parser.stats
         self.update_latest_retrieval(cnx)
         if stats.get('created'):
@@ -254,11 +251,6 @@
         cnx.commit()
         return stats
 
-    def source_uris(self, cnx):
-        sql = 'SELECT extid, eid, type FROM entities WHERE asource=%(source)s'
-        return dict((self.decode_extid(uri), (eid, type))
-                    for uri, eid, type in cnx.system_sql(sql, {'source': self.uri}).fetchall())
-
     def init_import_log(self, cnx, import_log_eid=None, **kwargs):
         if import_log_eid is None:
             import_log = cnx.create_entity('CWDataImport', cw_import_of=self,
@@ -275,13 +267,10 @@
 class DataFeedParser(AppObject):
     __registry__ = 'parsers'
 
-    def __init__(self, cnx, source, import_log=None, source_uris=None):
+    def __init__(self, cnx, source, import_log=None):
         super(DataFeedParser, self).__init__(cnx)
         self.source = source
         self.import_log = import_log
-        if source_uris is None:
-            source_uris = {}
-        self.source_uris = source_uris
         self.stats = {'created': set(), 'updated': set(), 'checked': set()}
 
     def normalize_url(self, url):
@@ -397,18 +386,6 @@
         """
         return True
 
-    def handle_deletion(self, config, cnx, source_uris):
-        if config['delete-entities'] and source_uris:
-            byetype = {}
-            for extid, (eid, etype) in source_uris.items():
-                if self.is_deleted(extid, etype, eid):
-                    byetype.setdefault(etype, []).append(str(eid))
-            for etype, eids in byetype.items():
-                self.warning('delete %s %s entities', len(eids), etype)
-                cnx.execute('DELETE %s X WHERE X eid IN (%s)'
-                            % (etype, ','.join(eids)))
-            cnx.commit()
-
     def update_if_necessary(self, entity, attrs):
         entity.complete(tuple(attrs))
         # check modification date and compare attribute values to only update
--- a/cubicweb/server/test/unittest_datafeed.py	Thu Sep 29 11:44:19 2016 +0200
+++ b/cubicweb/server/test/unittest_datafeed.py	Thu Sep 29 22:23:16 2016 +0200
@@ -1,5 +1,5 @@
 # coding: utf-8
-# copyright 2011-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# copyright 2011-2016 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
 #
 # This file is part of CubicWeb.
@@ -96,8 +96,6 @@
                 self.assertEqual(self.repo._type_source_cache[entity.eid],
                                  ('Card', b'http://www.cubicweb.org/', u'รด myfeed'))
 
-                self.assertEqual(dfsource.source_uris(cnx),
-                                 {b'http://www.cubicweb.org/': (entity.eid, 'Card')})
                 self.assertTrue(dfsource.latest_retrieval)
                 self.assertTrue(dfsource.fresh())
 
--- a/cubicweb/sobjects/ldapparser.py	Thu Sep 29 11:44:19 2016 +0200
+++ b/cubicweb/sobjects/ldapparser.py	Thu Sep 29 22:23:16 2016 +0200
@@ -86,10 +86,18 @@
                                                         attrs))
         return {}
 
+    def process_urls(self, *args, **kwargs):
+        """IDataFeedParser main entry point."""
+        self._source_uris = {}
+        self._group_members = {}
+        error = super(DataFeedLDAPAdapter, self).process_urls(*args, **kwargs)
+        if not error:
+            self.handle_deletion()
+        return error
+
     def process(self, url, raise_on_error=False):
-        """IDataFeedParser main entry point"""
+        """Called once by process_urls (several URLs are not expected with this parser)."""
         self.debug('processing ldapfeed source %s %s', self.source, self.searchfilterstr)
-        self._group_members = {}
         eeimporter = self.build_importer(raise_on_error)
         for name in self.source.user_default_groups:
             geid = self._get_group(name)
@@ -125,7 +133,10 @@
             rset = self._cw.execute('Any XURI, X WHERE X cwuri XURI, X is {0},'
                                     ' X cw_source S, S name %(source)s'.format(etype),
                                     {'source': self.source.uri})
-            extid2eid.update(dict((extid.encode('ascii'), eid) for extid, eid in rset))
+            for extid, eid in rset:
+                extid = extid.encode('ascii')
+                extid2eid[extid] = eid
+                self._source_uris[extid] = (eid, etype)
         existing_relations = {}
         for rtype in ('in_group', 'use_email', 'owned_by'):
             rql = 'Any S,O WHERE S {} O, S cw_source SO, SO eid %(s)s'.format(rtype)
@@ -154,6 +165,7 @@
                 # userPassword)
                 pwd = crypt_password(generate_password())
                 attrs['upassword'] = set([pwd])
+            self._source_uris.pop(userdict['dn'], None)
             extuser = importer.ExtEntity('CWUser', userdict['dn'].encode('ascii'), attrs)
             extuser.values['owned_by'] = set([extuser.extid])
             for extemail in self._process_email(extuser, userdict):
@@ -166,6 +178,7 @@
         # generate groups
         for groupdict in self.group_source_entities_by_extid.values():
             attrs = self.ldap2cwattrs(groupdict, 'CWGroup')
+            self._source_uris.pop(groupdict['dn'], None)
             extgroup = importer.ExtEntity('CWGroup', groupdict['dn'].encode('ascii'), attrs)
             yield extgroup
             # record group membership for later insertion
@@ -184,28 +197,20 @@
             rset = self._cw.execute('EmailAddress X WHERE X address %(addr)s',
                                     {'addr': emailaddr})
             emailextid = (userdict['dn'] + '@@' + emailaddr).encode('ascii')
+            self._source_uris.pop(emailextid, None)
             if not rset:
                 # not found, create it. first forge an external id
                 extuser.values.setdefault('use_email', []).append(emailextid)
                 yield importer.ExtEntity('EmailAddress', emailextid, dict(address=[emailaddr]))
-            elif self.source_uris:
-                # pop from source_uris anyway, else email may be removed by the
-                # source once import is finished
-                self.source_uris.pop(emailextid, None)
             # XXX else check use_email relation?
 
-    def handle_deletion(self, config, cnx, myuris):
-        if config['delete-entities']:
-            super(DataFeedLDAPAdapter, self).handle_deletion(config, cnx, myuris)
-            return
-        if myuris:
-            for extid, (eid, etype) in myuris.items():
-                if etype != 'CWUser' or not self.is_deleted(extid, etype, eid):
-                    continue
-                self.info('deactivate user %s', eid)
-                wf = cnx.entity_from_eid(eid).cw_adapt_to('IWorkflowable')
-                wf.fire_transition_if_possible('deactivate')
-        cnx.commit()
+    def handle_deletion(self):
+        for extid, (eid, etype) in self._source_uris.items():
+            if etype != 'CWUser' or not self.is_deleted(extid, etype, eid):
+                continue
+            self.info('deactivate user %s', eid)
+            wf = self._cw.entity_from_eid(eid).cw_adapt_to('IWorkflowable')
+            wf.fire_transition_if_possible('deactivate')
 
     def ensure_activated(self, entity):
         if entity.cw_etype == 'CWUser':