server/sources/datafeed.py
changeset 8430 5bee87a14bb1
parent 8429 cad2d8e03b33
child 8434 39c5bb4dcc59
equal deleted inserted replaced
8429:cad2d8e03b33 8430:5bee87a14bb1
    26 from datetime import datetime, timedelta
    26 from datetime import datetime, timedelta
    27 from base64 import b64decode
    27 from base64 import b64decode
    28 from cookielib import CookieJar
    28 from cookielib import CookieJar
    29 
    29 
    30 from lxml import etree
    30 from lxml import etree
    31 from logilab.mtconverter import xml_escape
       
    32 
    31 
    33 from cubicweb import RegistryNotFound, ObjectNotFound, ValidationError, UnknownEid
    32 from cubicweb import RegistryNotFound, ObjectNotFound, ValidationError, UnknownEid
    34 from cubicweb.server.sources import AbstractSource
    33 from cubicweb.server.sources import AbstractSource
    35 from cubicweb.appobject import AppObject
    34 from cubicweb.appobject import AppObject
    36 
    35 
    66                    'it won\'t be considered'),
    65                    'it won\'t be considered'),
    67           'group': 'datafeed-source', 'level': 2,
    66           'group': 'datafeed-source', 'level': 2,
    68           }),
    67           }),
    69         ('delete-entities',
    68         ('delete-entities',
    70          {'type' : 'yn',
    69          {'type' : 'yn',
    71           'default': True,
    70           'default': False,
    72           'help': ('Should already imported entities not found anymore on the '
    71           'help': ('Should already imported entities not found anymore on the '
    73                    'external source be deleted?'),
    72                    'external source be deleted?'),
    74           'group': 'datafeed-source', 'level': 2,
    73           'group': 'datafeed-source', 'level': 2,
    75           }),
    74           }),
    76         ('logs-lifetime',
    75         ('logs-lifetime',
    78           'default': '10d',
    77           'default': '10d',
    79           'help': ('Time before logs from datafeed imports are deleted.'),
    78           'help': ('Time before logs from datafeed imports are deleted.'),
    80           'group': 'datafeed-source', 'level': 2,
    79           'group': 'datafeed-source', 'level': 2,
    81           }),
    80           }),
    82         )
    81         )
       
    82 
    def __init__(self, repo, source_config, eid=None):
        """Initialise the source, then load its checked configuration.

        The base ``AbstractSource`` initialiser runs first. The given
        ``source_config`` is then passed through ``check_conf_dict`` with
        ``fail_if_unknown=False`` and the result is handed to
        ``update_config``.
        """
        AbstractSource.__init__(self, repo, source_config, eid)
        checked_config = self.check_conf_dict(eid, source_config,
                                              fail_if_unknown=False)
        self.update_config(None, checked_config)
    87 
    87 
   190                 return self._pull_data(session, force, raise_on_error)
   190                 return self._pull_data(session, force, raise_on_error)
   191         finally:
   191         finally:
   192             self.release_synchronization_lock(session)
   192             self.release_synchronization_lock(session)
   193 
   193 
   194     def _pull_data(self, session, force=False, raise_on_error=False):
   194     def _pull_data(self, session, force=False, raise_on_error=False):
   195         if self.config['delete-entities']:
       
   196             myuris = self.source_cwuris(session)
       
   197         else:
       
   198             myuris = None
       
   199         importlog = self.init_import_log(session)
   195         importlog = self.init_import_log(session)
       
   196         myuris = self.source_cwuris(session)
   200         parser = self._get_parser(session, sourceuris=myuris, import_log=importlog)
   197         parser = self._get_parser(session, sourceuris=myuris, import_log=importlog)
   201         if self.process_urls(parser, self.urls, raise_on_error):
   198         if self.process_urls(parser, self.urls, raise_on_error):
   202             self.warning("some error occured, don't attempt to delete entities")
   199             self.warning("some error occured, don't attempt to delete entities")
   203         elif self.config['delete-entities'] and myuris:
   200         else:
   204             byetype = {}
   201             parser.handle_deletion(self.config, session, myuris)
   205             for extid, (eid, etype) in myuris.iteritems():
       
   206                 if parser.is_deleted(extid, etype, eid):
       
   207                     byetype.setdefault(etype, []).append(str(eid))
       
   208             for etype, eids in byetype.iteritems():
       
   209                 self.warning('delete %s %s entities', len(eids), etype)
       
   210                 session.execute('DELETE %s X WHERE X eid IN (%s)'
       
   211                                 % (etype, ','.join(eids)))
       
   212         self.update_latest_retrieval(session)
   202         self.update_latest_retrieval(session)
   213         stats = parser.stats
   203         stats = parser.stats
   214         if stats.get('created'):
   204         if stats.get('created'):
   215             importlog.record_info('added %s entities' % len(stats['created']))
   205             importlog.record_info('added %s entities' % len(stats['created']))
   216         if stats.get('updated'):
   206         if stats.get('updated'):
   374         is actually deleted. Always return True by default, put more sensible
   364         is actually deleted. Always return True by default, put more sensible
   375         stuff in sub-classes.
   365         stuff in sub-classes.
   376         """
   366         """
   377         return True
   367         return True
   378 
   368 
       
   369     def handle_deletion(self, config, session, myuris):
       
   370         if config['delete-entities'] and myuris:
       
   371             byetype = {}
       
   372             for extid, (eid, etype) in myuris.iteritems():
       
   373                 if self.is_deleted(extid, etype, eid):
       
   374                     byetype.setdefault(etype, []).append(str(eid))
       
   375             for etype, eids in byetype.iteritems():
       
   376                 self.warning('delete %s %s entities', len(eids), etype)
       
   377                 session.execute('DELETE %s X WHERE X eid IN (%s)'
       
   378                                 % (etype, ','.join(eids)))
       
   379 
   379     def update_if_necessary(self, entity, attrs):
   380     def update_if_necessary(self, entity, attrs):
   380         self.notify_updated(entity)
   381         self.notify_updated(entity)
   381         entity.complete(tuple(attrs))
   382         entity.complete(tuple(attrs))
   382         # check modification date and compare attribute values to only update
   383         # check modification date and compare attribute values to only update
   383         # what's actually needed
   384         # what's actually needed