# HG changeset patch # User Sylvain Thénault # Date 1475180596 -7200 # Node ID 3f81636a75dbed6b7c86431d19de85281585a585 # Parent e845746b4d3c0a4cddbf0408e1c7882919c49718 [datafeed] Drop entity deletion handling in the default source / parser This should be handled by specific parser to avoid using "entities.asource". Reimplement it in the ldap parser. Related to #15538288 diff -r e845746b4d3c -r 3f81636a75db cubicweb/server/sources/datafeed.py --- a/cubicweb/server/sources/datafeed.py Thu Sep 29 11:44:19 2016 +0200 +++ b/cubicweb/server/sources/datafeed.py Thu Sep 29 22:23:16 2016 +0200 @@ -73,7 +73,8 @@ {'type' : 'yn', 'default': False, 'help': ('Should already imported entities not found anymore on the ' - 'external source be deleted?'), + 'external source be deleted? Handling of this parameter ' + "will depend on source's parser."), 'group': 'datafeed-source', 'level': 2, }), ('logs-lifetime', @@ -230,10 +231,8 @@ def _pull_data(self, cnx, force=False, raise_on_error=False, import_log_eid=None): importlog = self.init_import_log(cnx, import_log_eid) - source_uris = self.source_uris(cnx) try: - parser = self._get_parser(cnx, import_log=importlog, - source_uris=source_uris) + parser = self._get_parser(cnx, import_log=importlog) except ObjectNotFound: msg = 'failed to load parser for %s' importlog.record_error(msg % ('source "%s"' % self.uri)) @@ -242,8 +241,6 @@ else: if parser.process_urls(self.urls, raise_on_error): self.warning("some error occurred, don't attempt to delete entities") - else: - parser.handle_deletion(self.config, cnx, source_uris) stats = parser.stats self.update_latest_retrieval(cnx) if stats.get('created'): @@ -254,11 +251,6 @@ cnx.commit() return stats - def source_uris(self, cnx): - sql = 'SELECT extid, eid, type FROM entities WHERE asource=%(source)s' - return dict((self.decode_extid(uri), (eid, type)) - for uri, eid, type in cnx.system_sql(sql, {'source': self.uri}).fetchall()) - def init_import_log(self, cnx, import_log_eid=None, **kwargs): if import_log_eid is None: import_log = cnx.create_entity('CWDataImport', cw_import_of=self, @@ -275,13 +267,10 @@ class DataFeedParser(AppObject): __registry__ = 'parsers' - def __init__(self, cnx, source, import_log=None, source_uris=None): + def __init__(self, cnx, source, import_log=None): super(DataFeedParser, self).__init__(cnx) self.source = source self.import_log = import_log - if source_uris is None: - source_uris = {} - self.source_uris = source_uris self.stats = {'created': set(), 'updated': set(), 'checked': set()} def normalize_url(self, url): @@ -397,18 +386,6 @@ """ return True - def handle_deletion(self, config, cnx, source_uris): - if config['delete-entities'] and source_uris: - byetype = {} - for extid, (eid, etype) in source_uris.items(): - if self.is_deleted(extid, etype, eid): - byetype.setdefault(etype, []).append(str(eid)) - for etype, eids in byetype.items(): - self.warning('delete %s %s entities', len(eids), etype) - cnx.execute('DELETE %s X WHERE X eid IN (%s)' - % (etype, ','.join(eids))) - cnx.commit() - def update_if_necessary(self, entity, attrs): entity.complete(tuple(attrs)) # check modification date and compare attribute values to only update diff -r e845746b4d3c -r 3f81636a75db cubicweb/server/test/unittest_datafeed.py --- a/cubicweb/server/test/unittest_datafeed.py Thu Sep 29 11:44:19 2016 +0200 +++ b/cubicweb/server/test/unittest_datafeed.py Thu Sep 29 22:23:16 2016 +0200 @@ -1,5 +1,5 @@ # coding: utf-8 -# copyright 2011-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved. +# copyright 2011-2016 LOGILAB S.A. (Paris, FRANCE), all rights reserved. # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr # # This file is part of CubicWeb. @@ -96,8 +96,6 @@ self.assertEqual(self.repo._type_source_cache[entity.eid], ('Card', b'http://www.cubicweb.org/', u'ô myfeed')) - self.assertEqual(dfsource.source_uris(cnx), - {b'http://www.cubicweb.org/': (entity.eid, 'Card')}) self.assertTrue(dfsource.latest_retrieval) self.assertTrue(dfsource.fresh()) diff -r e845746b4d3c -r 3f81636a75db cubicweb/sobjects/ldapparser.py --- a/cubicweb/sobjects/ldapparser.py Thu Sep 29 11:44:19 2016 +0200 +++ b/cubicweb/sobjects/ldapparser.py Thu Sep 29 22:23:16 2016 +0200 @@ -86,10 +86,18 @@ attrs)) return {} + def process_urls(self, *args, **kwargs): + """IDataFeedParser main entry point.""" + self._source_uris = {} + self._group_members = {} + error = super(DataFeedLDAPAdapter, self).process_urls(*args, **kwargs) + if not error: + self.handle_deletion() + return error + def process(self, url, raise_on_error=False): - """IDataFeedParser main entry point""" + """Called once by process_urls (several URL are not expected with this parser).""" self.debug('processing ldapfeed source %s %s', self.source, self.searchfilterstr) - self._group_members = {} eeimporter = self.build_importer(raise_on_error) for name in self.source.user_default_groups: geid = self._get_group(name) @@ -125,7 +133,10 @@ rset = self._cw.execute('Any XURI, X WHERE X cwuri XURI, X is {0},' ' X cw_source S, S name %(source)s'.format(etype), {'source': self.source.uri}) - extid2eid.update(dict((extid.encode('ascii'), eid) for extid, eid in rset)) + for extid, eid in rset: + extid = extid.encode('ascii') + extid2eid[extid] = eid + self._source_uris[extid] = (eid, etype) existing_relations = {} for rtype in ('in_group', 'use_email', 'owned_by'): rql = 'Any S,O WHERE S {} O, S cw_source SO, SO eid %(s)s'.format(rtype) @@ -154,6 +165,7 @@ # userPassword) pwd = crypt_password(generate_password()) attrs['upassword'] = set([pwd]) + self._source_uris.pop(userdict['dn'], None) extuser = importer.ExtEntity('CWUser', userdict['dn'].encode('ascii'), attrs) extuser.values['owned_by'] = set([extuser.extid]) for extemail in self._process_email(extuser, userdict): @@ -166,6 +178,7 @@ # generate groups for groupdict in self.group_source_entities_by_extid.values(): attrs = self.ldap2cwattrs(groupdict, 'CWGroup') + self._source_uris.pop(groupdict['dn'], None) extgroup = importer.ExtEntity('CWGroup', groupdict['dn'].encode('ascii'), attrs) yield extgroup # record group membership for later insertion @@ -184,28 +197,20 @@ rset = self._cw.execute('EmailAddress X WHERE X address %(addr)s', {'addr': emailaddr}) emailextid = (userdict['dn'] + '@@' + emailaddr).encode('ascii') + self._source_uris.pop(emailextid, None) if not rset: # not found, create it. first forge an external id extuser.values.setdefault('use_email', []).append(emailextid) yield importer.ExtEntity('EmailAddress', emailextid, dict(address=[emailaddr])) - elif self.source_uris: - # pop from source_uris anyway, else email may be removed by the - # source once import is finished - self.source_uris.pop(emailextid, None) # XXX else check use_email relation? - def handle_deletion(self, config, cnx, myuris): - if config['delete-entities']: - super(DataFeedLDAPAdapter, self).handle_deletion(config, cnx, myuris) - return - if myuris: - for extid, (eid, etype) in myuris.items(): - if etype != 'CWUser' or not self.is_deleted(extid, etype, eid): - continue - self.info('deactivate user %s', eid) - wf = cnx.entity_from_eid(eid).cw_adapt_to('IWorkflowable') - wf.fire_transition_if_possible('deactivate') - cnx.commit() + def handle_deletion(self): + for extid, (eid, etype) in self._source_uris.items(): + if etype != 'CWUser' or not self.is_deleted(extid, etype, eid): + continue + self.info('deactivate user %s', eid) + wf = self._cw.entity_from_eid(eid).cw_adapt_to('IWorkflowable') + wf.fire_transition_if_possible('deactivate') def ensure_activated(self, entity): if entity.cw_etype == 'CWUser':