cubicweb/server/sources/datafeed.py
changeset 11758 3f81636a75db
parent 11757 e845746b4d3c
child 11767 432f87a63057
equal deleted inserted replaced
11757:e845746b4d3c 11758:3f81636a75db
    71           }),
    71           }),
    72         ('delete-entities',
    72         ('delete-entities',
    73          {'type' : 'yn',
    73          {'type' : 'yn',
    74           'default': False,
    74           'default': False,
    75           'help': ('Should already imported entities not found anymore on the '
    75           'help': ('Should already imported entities not found anymore on the '
    76                    'external source be deleted?'),
    76                    'external source be deleted? Handling of this parameter '
       
    77                    "will depend on source's parser."),
    77           'group': 'datafeed-source', 'level': 2,
    78           'group': 'datafeed-source', 'level': 2,
    78           }),
    79           }),
    79         ('logs-lifetime',
    80         ('logs-lifetime',
    80          {'type': 'time',
    81          {'type': 'time',
    81           'default': '10d',
    82           'default': '10d',
   228         cnx.repo.threaded_task(sync)
   229         cnx.repo.threaded_task(sync)
   229         return {'import_log_eid': import_log.eid}
   230         return {'import_log_eid': import_log.eid}
   230 
   231 
   231     def _pull_data(self, cnx, force=False, raise_on_error=False, import_log_eid=None):
   232     def _pull_data(self, cnx, force=False, raise_on_error=False, import_log_eid=None):
   232         importlog = self.init_import_log(cnx, import_log_eid)
   233         importlog = self.init_import_log(cnx, import_log_eid)
   233         source_uris = self.source_uris(cnx)
   234         try:
   234         try:
   235             parser = self._get_parser(cnx, import_log=importlog)
   235             parser = self._get_parser(cnx, import_log=importlog,
       
   236                                       source_uris=source_uris)
       
   237         except ObjectNotFound:
   236         except ObjectNotFound:
   238             msg = 'failed to load parser for %s'
   237             msg = 'failed to load parser for %s'
   239             importlog.record_error(msg % ('source "%s"' % self.uri))
   238             importlog.record_error(msg % ('source "%s"' % self.uri))
   240             self.error(msg, self)
   239             self.error(msg, self)
   241             stats = {}
   240             stats = {}
   242         else:
   241         else:
   243             if parser.process_urls(self.urls, raise_on_error):
   242             if parser.process_urls(self.urls, raise_on_error):
   244                 self.warning("some error occurred, don't attempt to delete entities")
   243                 self.warning("some error occurred, don't attempt to delete entities")
   245             else:
       
   246                 parser.handle_deletion(self.config, cnx, source_uris)
       
   247             stats = parser.stats
   244             stats = parser.stats
   248         self.update_latest_retrieval(cnx)
   245         self.update_latest_retrieval(cnx)
   249         if stats.get('created'):
   246         if stats.get('created'):
   250             importlog.record_info('added %s entities' % len(stats['created']))
   247             importlog.record_info('added %s entities' % len(stats['created']))
   251         if stats.get('updated'):
   248         if stats.get('updated'):
   252             importlog.record_info('updated %s entities' % len(stats['updated']))
   249             importlog.record_info('updated %s entities' % len(stats['updated']))
   253         importlog.write_log(cnx, end_timestamp=self.latest_retrieval)
   250         importlog.write_log(cnx, end_timestamp=self.latest_retrieval)
   254         cnx.commit()
   251         cnx.commit()
   255         return stats
   252         return stats
   256 
       
   257     def source_uris(self, cnx):
       
   258         sql = 'SELECT extid, eid, type FROM entities WHERE asource=%(source)s'
       
   259         return dict((self.decode_extid(uri), (eid, type))
       
   260                     for uri, eid, type in cnx.system_sql(sql, {'source': self.uri}).fetchall())
       
   261 
   253 
   262     def init_import_log(self, cnx, import_log_eid=None, **kwargs):
   254     def init_import_log(self, cnx, import_log_eid=None, **kwargs):
   263         if import_log_eid is None:
   255         if import_log_eid is None:
   264             import_log = cnx.create_entity('CWDataImport', cw_import_of=self,
   256             import_log = cnx.create_entity('CWDataImport', cw_import_of=self,
   265                                            start_timestamp=datetime.now(tz=utc),
   257                                            start_timestamp=datetime.now(tz=utc),
   273 
   265 
   274 
   266 
   275 class DataFeedParser(AppObject):
   267 class DataFeedParser(AppObject):
   276     __registry__ = 'parsers'
   268     __registry__ = 'parsers'
   277 
   269 
   278     def __init__(self, cnx, source, import_log=None, source_uris=None):
   270     def __init__(self, cnx, source, import_log=None):
   279         super(DataFeedParser, self).__init__(cnx)
   271         super(DataFeedParser, self).__init__(cnx)
   280         self.source = source
   272         self.source = source
   281         self.import_log = import_log
   273         self.import_log = import_log
   282         if source_uris is None:
       
   283             source_uris = {}
       
   284         self.source_uris = source_uris
       
   285         self.stats = {'created': set(), 'updated': set(), 'checked': set()}
   274         self.stats = {'created': set(), 'updated': set(), 'checked': set()}
   286 
   275 
   287     def normalize_url(self, url):
   276     def normalize_url(self, url):
   288         """Normalize an url by looking if there is a replacement for it in
   277         """Normalize an url by looking if there is a replacement for it in
   289         `cubicweb.sobjects.URL_MAPPING`.
   278         `cubicweb.sobjects.URL_MAPPING`.
   395         is actually deleted. Always return True by default, put more sensible
   384         is actually deleted. Always return True by default, put more sensible
   396         stuff in sub-classes.
   385         stuff in sub-classes.
   397         """
   386         """
   398         return True
   387         return True
   399 
   388 
   400     def handle_deletion(self, config, cnx, source_uris):
       
   401         if config['delete-entities'] and source_uris:
       
   402             byetype = {}
       
   403             for extid, (eid, etype) in source_uris.items():
       
   404                 if self.is_deleted(extid, etype, eid):
       
   405                     byetype.setdefault(etype, []).append(str(eid))
       
   406             for etype, eids in byetype.items():
       
   407                 self.warning('delete %s %s entities', len(eids), etype)
       
   408                 cnx.execute('DELETE %s X WHERE X eid IN (%s)'
       
   409                             % (etype, ','.join(eids)))
       
   410             cnx.commit()
       
   411 
       
   412     def update_if_necessary(self, entity, attrs):
   389     def update_if_necessary(self, entity, attrs):
   413         entity.complete(tuple(attrs))
   390         entity.complete(tuple(attrs))
   414         # check modification date and compare attribute values to only update
   391         # check modification date and compare attribute values to only update
   415         # what's actually needed
   392         # what's actually needed
   416         self.notify_checked(entity)
   393         self.notify_checked(entity)