233 def _pull_data(self, cnx, force=False, raise_on_error=False, import_log_eid=None): |
233 def _pull_data(self, cnx, force=False, raise_on_error=False, import_log_eid=None): |
234 importlog = self.init_import_log(cnx, import_log_eid) |
234 importlog = self.init_import_log(cnx, import_log_eid) |
235 source_uris = self.source_uris(cnx) |
235 source_uris = self.source_uris(cnx) |
236 try: |
236 try: |
237 parser = self._get_parser(cnx, import_log=importlog, |
237 parser = self._get_parser(cnx, import_log=importlog, |
238 source_uris=source_uris) |
238 source_uris=source_uris, |
|
239 moved_uris=self.moved_uris(cnx)) |
239 except ObjectNotFound: |
240 except ObjectNotFound: |
240 return {} |
241 return {} |
241 if parser.process_urls(self.urls, raise_on_error): |
242 if parser.process_urls(self.urls, raise_on_error): |
242 self.warning("some error occurred, don't attempt to delete entities") |
243 self.warning("some error occurred, don't attempt to delete entities") |
243 else: |
244 else: |
def source_uris(self, cnx):
    """Return a mapping from decoded external URI to ``(eid, type)`` for
    every entity whose authority source is this source.

    :param cnx: connection providing ``system_sql`` access to the
        ``entities`` system table.
    :return: dict mapping ``self.decode_extid(extid)`` to an
        ``(eid, entity type name)`` tuple.
    """
    sql = 'SELECT extid, eid, type FROM entities WHERE asource=%(source)s'
    # dict comprehension; `etype` avoids shadowing the `type` builtin
    return {self.decode_extid(extid): (eid, etype)
            for extid, eid, etype in cnx.system_sql(
                sql, {'source': self.uri}).fetchall()}
|
def moved_uris(self, cnx):
    """Return the set of decoded external URIs recorded in the
    ``moved_entities`` system table.

    :param cnx: connection providing ``system_sql`` access.
    :return: set of values produced by ``self.decode_extid``.
    """
    sql = 'SELECT extid FROM moved_entities'
    # set comprehension instead of set(genexp); rows are 1-tuples,
    # hence the `extid,` unpacking
    return {self.decode_extid(extid)
            for extid, in cnx.system_sql(sql).fetchall()}
296 def init_import_log(self, cnx, import_log_eid=None, **kwargs): |
301 def init_import_log(self, cnx, import_log_eid=None, **kwargs): |
297 if import_log_eid is None: |
302 if import_log_eid is None: |
298 import_log = cnx.create_entity('CWDataImport', cw_import_of=self, |
303 import_log = cnx.create_entity('CWDataImport', cw_import_of=self, |
299 start_timestamp=datetime.now(tz=utc), |
304 start_timestamp=datetime.now(tz=utc), |
300 **kwargs) |
305 **kwargs) |
307 |
312 |
308 |
313 |
309 class DataFeedParser(AppObject): |
314 class DataFeedParser(AppObject): |
310 __registry__ = 'parsers' |
315 __registry__ = 'parsers' |
311 |
316 |
def __init__(self, cnx, source, import_log=None, source_uris=None,
             moved_uris=None, **kwargs):
    """Initialize the parser.

    :param cnx: connection handed to the base class constructor.
    :param source: the datafeed source this parser works for.
    :param import_log: optional import log entity.
    :param source_uris: optional mapping of already-imported external
        URIs (defaults to an empty dict).
    :param moved_uris: optional collection of external URIs of moved
        entities (defaults to an empty tuple).
    """
    super(DataFeedParser, self).__init__(cnx, **kwargs)
    self.source = source
    self.import_log = import_log
    # substitute empty containers for unspecified collections
    self.source_uris = {} if source_uris is None else source_uris
    self.moved_uris = () if moved_uris is None else moved_uris
    # per-run bookkeeping of touched entity eids
    self.stats = {'created': set(), 'updated': set(), 'checked': set()}
320 |
328 |
321 def normalize_url(self, url): |
329 def normalize_url(self, url): |
322 """Normalize an url by looking if there is a replacement for it in |
330 """Normalize an url by looking if there is a replacement for it in |
323 `cubicweb.sobjects.URL_MAPPING`. |
331 `cubicweb.sobjects.URL_MAPPING`. |