cubicweb/server/sources/datafeed.py
changeset 11254 4f467683b8c9
parent 11253 be480b9d6ee2
child 11255 58be5fe4a232
equal deleted inserted replaced
11253:be480b9d6ee2 11254:4f467683b8c9
   233     def _pull_data(self, cnx, force=False, raise_on_error=False, import_log_eid=None):
   233     def _pull_data(self, cnx, force=False, raise_on_error=False, import_log_eid=None):
   234         importlog = self.init_import_log(cnx, import_log_eid)
   234         importlog = self.init_import_log(cnx, import_log_eid)
   235         source_uris = self.source_uris(cnx)
   235         source_uris = self.source_uris(cnx)
   236         try:
   236         try:
   237             parser = self._get_parser(cnx, import_log=importlog,
   237             parser = self._get_parser(cnx, import_log=importlog,
   238                                       source_uris=source_uris)
   238                                       source_uris=source_uris,
       
   239                                       moved_uris=self.moved_uris(cnx))
   239         except ObjectNotFound:
   240         except ObjectNotFound:
   240             return {}
   241             return {}
   241         if parser.process_urls(self.urls, raise_on_error):
   242         if parser.process_urls(self.urls, raise_on_error):
   242             self.warning("some error occurred, don't attempt to delete entities")
   243             self.warning("some error occurred, don't attempt to delete entities")
   243         else:
   244         else:
   291     def source_uris(self, cnx):
   292     def source_uris(self, cnx):
   292         sql = 'SELECT extid, eid, type FROM entities WHERE asource=%(source)s'
   293         sql = 'SELECT extid, eid, type FROM entities WHERE asource=%(source)s'
   293         return dict((self.decode_extid(uri), (eid, type))
   294         return dict((self.decode_extid(uri), (eid, type))
   294                     for uri, eid, type in cnx.system_sql(sql, {'source': self.uri}).fetchall())
   295                     for uri, eid, type in cnx.system_sql(sql, {'source': self.uri}).fetchall())
   295 
   296 
       
   297     def moved_uris(self, cnx):
       
   298         sql = 'SELECT extid FROM moved_entities'
       
   299         return set(self.decode_extid(uri) for uri, in cnx.system_sql(sql).fetchall())
       
   300 
   296     def init_import_log(self, cnx, import_log_eid=None, **kwargs):
   301     def init_import_log(self, cnx, import_log_eid=None, **kwargs):
   297         if import_log_eid is None:
   302         if import_log_eid is None:
   298             import_log = cnx.create_entity('CWDataImport', cw_import_of=self,
   303             import_log = cnx.create_entity('CWDataImport', cw_import_of=self,
   299                                            start_timestamp=datetime.now(tz=utc),
   304                                            start_timestamp=datetime.now(tz=utc),
   300                                            **kwargs)
   305                                            **kwargs)
   307 
   312 
   308 
   313 
   309 class DataFeedParser(AppObject):
   314 class DataFeedParser(AppObject):
   310     __registry__ = 'parsers'
   315     __registry__ = 'parsers'
   311 
   316 
   312     def __init__(self, cnx, source, import_log=None, source_uris=None, **kwargs):
   317     def __init__(self, cnx, source, import_log=None, source_uris=None, moved_uris=None, **kwargs):
   313         super(DataFeedParser, self).__init__(cnx, **kwargs)
   318         super(DataFeedParser, self).__init__(cnx, **kwargs)
   314         self.source = source
   319         self.source = source
   315         self.import_log = import_log
   320         self.import_log = import_log
   316         if source_uris is None:
   321         if source_uris is None:
   317             source_uris = {}
   322             source_uris = {}
   318         self.source_uris = source_uris
   323         self.source_uris = source_uris
       
   324         if moved_uris is None:
       
   325             moved_uris = ()
       
   326         self.moved_uris = moved_uris
   319         self.stats = {'created': set(), 'updated': set(), 'checked': set()}
   327         self.stats = {'created': set(), 'updated': set(), 'checked': set()}
   320 
   328 
   321     def normalize_url(self, url):
   329     def normalize_url(self, url):
   322         """Normalize an url by looking if there is a replacement for it in
   330         """Normalize an url by looking if there is a replacement for it in
   323         `cubicweb.sobjects.URL_MAPPING`.
   331         `cubicweb.sobjects.URL_MAPPING`.