228 cnx.repo.threaded_task(sync) |
229 cnx.repo.threaded_task(sync) |
229 return {'import_log_eid': import_log.eid} |
230 return {'import_log_eid': import_log.eid} |
230 |
231 |
def _pull_data(self, cnx, force=False, raise_on_error=False, import_log_eid=None):
    """Run one import pass for this source and return the parser statistics.

    An import log is obtained through ``init_import_log``, the parser is
    looked up with ``_get_parser`` and asked to process ``self.urls``.
    Entities are only handed to the parser's ``handle_deletion`` when
    ``process_urls`` reported no error.  The import log is then written
    and the connection committed.

    ``force`` is accepted but not used by this method.

    Returns the parser's ``stats`` mapping, or an empty dict when no parser
    could be loaded (``ObjectNotFound``).
    """
    log = self.init_import_log(cnx, import_log_eid)
    known_uris = self.source_uris(cnx)
    # Fallback used when the parser cannot be loaded.
    stats = {}
    try:
        parser = self._get_parser(cnx, import_log=log, source_uris=known_uris)
    except ObjectNotFound:
        msg = 'failed to load parser for %s'
        log.record_error(msg % ('source "%s"' % self.uri))
        self.error(msg, self)
    else:
        had_errors = parser.process_urls(self.urls, raise_on_error)
        if had_errors:
            self.warning("some error occurred, don't attempt to delete entities")
        else:
            parser.handle_deletion(self.config, cnx, known_uris)
        stats = parser.stats
    self.update_latest_retrieval(cnx)
    # Report how many entities were created / updated, skipping empty sets.
    for key, template in (('created', 'added %s entities'),
                          ('updated', 'updated %s entities')):
        touched = stats.get(key)
        if touched:
            log.record_info(template % len(touched))
    log.write_log(cnx, end_timestamp=self.latest_retrieval)
    cnx.commit()
    return stats
256 |
|
def source_uris(self, cnx):
    """Return ``{decoded extid: (eid, type)}`` for rows of the ``entities``
    table whose ``asource`` column equals ``self.uri``.
    """
    query = 'SELECT extid, eid, type FROM entities WHERE asource=%(source)s'
    rows = cnx.system_sql(query, {'source': self.uri}).fetchall()
    uris = {}
    for extid, eid, etype in rows:
        uris[self.decode_extid(extid)] = (eid, etype)
    return uris
|
261 |
253 |
262 def init_import_log(self, cnx, import_log_eid=None, **kwargs): |
254 def init_import_log(self, cnx, import_log_eid=None, **kwargs): |
263 if import_log_eid is None: |
255 if import_log_eid is None: |
264 import_log = cnx.create_entity('CWDataImport', cw_import_of=self, |
256 import_log = cnx.create_entity('CWDataImport', cw_import_of=self, |
265 start_timestamp=datetime.now(tz=utc), |
257 start_timestamp=datetime.now(tz=utc), |
273 |
265 |
274 |
266 |
275 class DataFeedParser(AppObject): |
267 class DataFeedParser(AppObject): |
276 __registry__ = 'parsers' |
268 __registry__ = 'parsers' |
277 |
269 |
def __init__(self, cnx, source, import_log=None, source_uris=None):
    """Initialise the parser for `source`.

    `import_log` is stored as given.  `source_uris` defaults to a fresh
    empty dict when omitted, so instances never share a default mapping.
    ``self.stats`` starts with empty 'created', 'updated' and 'checked' sets.
    """
    super(DataFeedParser, self).__init__(cnx)
    self.source = source
    self.import_log = import_log
    self.source_uris = {} if source_uris is None else source_uris
    self.stats = dict(created=set(), updated=set(), checked=set())
286 |
275 |
287 def normalize_url(self, url): |
276 def normalize_url(self, url): |
288 """Normalize an url by looking if there is a replacement for it in |
277 """Normalize an url by looking if there is a replacement for it in |
289 `cubicweb.sobjects.URL_MAPPING`. |
278 `cubicweb.sobjects.URL_MAPPING`. |
395 is actually deleted. Always return True by default, put more sensible |
384 is actually deleted. Always return True by default, put more sensible |
396 stuff in sub-classes. |
385 stuff in sub-classes. |
397 """ |
386 """ |
398 return True |
387 return True |
399 |
388 |
def handle_deletion(self, config, cnx, source_uris):
    """Delete the entities of `source_uris` that `is_deleted` reports as gone.

    Nothing happens unless ``config['delete-entities']`` is true and
    `source_uris` is non-empty.  `source_uris` maps an extid to an
    ``(eid, etype)`` pair; eids are grouped by entity type so that one
    DELETE query is executed per type, then the connection is committed.
    """
    if not (config['delete-entities'] and source_uris):
        return
    eids_by_type = {}
    for extid, (eid, etype) in source_uris.items():
        if not self.is_deleted(extid, etype, eid):
            continue
        eids_by_type.setdefault(etype, []).append(str(eid))
    for etype, eids in eids_by_type.items():
        self.warning('delete %s %s entities', len(eids), etype)
        rql = 'DELETE %s X WHERE X eid IN (%s)' % (etype, ','.join(eids))
        cnx.execute(rql)
    cnx.commit()
|
411 |
|
412 def update_if_necessary(self, entity, attrs): |
389 def update_if_necessary(self, entity, attrs): |
413 entity.complete(tuple(attrs)) |
390 entity.complete(tuple(attrs)) |
414 # check modification date and compare attribute values to only update |
391 # check modification date and compare attribute values to only update |
415 # what's actually needed |
392 # what's actually needed |
416 self.notify_checked(entity) |
393 self.notify_checked(entity) |