server/sources/datafeed.py
changeset 7995 9a9f35ef418c
parent 7950 99ae8c883ad3
child 8068 72210779ff6d
equal deleted inserted replaced
7994:af3fb709c061 7995:9a9f35ef418c
    25 from datetime import datetime, timedelta
    25 from datetime import datetime, timedelta
    26 from base64 import b64decode
    26 from base64 import b64decode
    27 from cookielib import CookieJar
    27 from cookielib import CookieJar
    28 
    28 
    29 from lxml import etree
    29 from lxml import etree
       
    30 from logilab.mtconverter import xml_escape
    30 
    31 
    31 from cubicweb import RegistryNotFound, ObjectNotFound, ValidationError, UnknownEid
    32 from cubicweb import RegistryNotFound, ObjectNotFound, ValidationError, UnknownEid
    32 from cubicweb.server.sources import AbstractSource
    33 from cubicweb.server.sources import AbstractSource
    33 from cubicweb.appobject import AppObject
    34 from cubicweb.appobject import AppObject
    34 
    35 
    69           'default': True,
    70           'default': True,
    70           'help': ('Should already imported entities not found anymore on the '
    71           'help': ('Should already imported entities not found anymore on the '
    71                    'external source be deleted?'),
    72                    'external source be deleted?'),
    72           'group': 'datafeed-source', 'level': 2,
    73           'group': 'datafeed-source', 'level': 2,
    73           }),
    74           }),
    74 
    75         ('logs-lifetime',
       
    76          {'type': 'time',
       
    77           'default': '10d',
       
    78           'help': ('Time before logs from datafeed imports are deleted.'),
       
    79           'group': 'datafeed-source', 'level': 2,
       
    80           }),
    75         )
    81         )
    76     def __init__(self, repo, source_config, eid=None):
    82     def __init__(self, repo, source_config, eid=None):
    77         AbstractSource.__init__(self, repo, source_config, eid)
    83         AbstractSource.__init__(self, repo, source_config, eid)
    78         self.update_config(None, self.check_conf_dict(eid, source_config,
    84         self.update_config(None, self.check_conf_dict(eid, source_config,
    79                                                       fail_if_unknown=False))
    85                                                       fail_if_unknown=False))
   186     def _pull_data(self, session, force=False, raise_on_error=False):
   192     def _pull_data(self, session, force=False, raise_on_error=False):
   187         if self.config['delete-entities']:
   193         if self.config['delete-entities']:
   188             myuris = self.source_cwuris(session)
   194             myuris = self.source_cwuris(session)
   189         else:
   195         else:
   190             myuris = None
   196             myuris = None
   191         parser = self._get_parser(session, sourceuris=myuris)
   197         importlog = self.init_import_log(session)
       
   198         parser = self._get_parser(session, sourceuris=myuris, import_log=importlog)
   192         if self.process_urls(parser, self.urls, raise_on_error):
   199         if self.process_urls(parser, self.urls, raise_on_error):
   193             self.warning("some error occured, don't attempt to delete entities")
   200             self.warning("some error occured, don't attempt to delete entities")
   194         elif self.config['delete-entities'] and myuris:
   201         elif self.config['delete-entities'] and myuris:
   195             byetype = {}
   202             byetype = {}
   196             for eid, etype in myuris.values():
   203             for eid, etype in myuris.values():
   198             self.error('delete %s entities %s', self.uri, byetype)
   205             self.error('delete %s entities %s', self.uri, byetype)
   199             for etype, eids in byetype.iteritems():
   206             for etype, eids in byetype.iteritems():
   200                 session.execute('DELETE %s X WHERE X eid IN (%s)'
   207                 session.execute('DELETE %s X WHERE X eid IN (%s)'
   201                                 % (etype, ','.join(eids)))
   208                                 % (etype, ','.join(eids)))
   202         self.update_latest_retrieval(session)
   209         self.update_latest_retrieval(session)
   203         return parser.stats
   210         stats = parser.stats
       
   211         if stats.get('created'):
       
   212             importlog.record_info('added %s entities' % len(stats['created']))
       
   213         if stats.get('updated'):
       
   214             importlog.record_info('updated %s entities' % len(stats['updated']))
       
   215         importlog.write_log(session, end_timestamp=self.latest_retrieval)
       
   216         return stats
   204 
   217 
   205     def process_urls(self, parser, urls, raise_on_error=False):
   218     def process_urls(self, parser, urls, raise_on_error=False):
   206         error = False
   219         error = False
   207         for url in urls:
   220         for url in urls:
   208             self.info('pulling data from %s', url)
   221             self.info('pulling data from %s', url)
   253                'WHERE entities.eid=cw_source_relation.eid_from '
   266                'WHERE entities.eid=cw_source_relation.eid_from '
   254                'AND cw_source_relation.eid_to=%s' % self.eid)
   267                'AND cw_source_relation.eid_to=%s' % self.eid)
   255         return dict((b64decode(uri), (eid, type))
   268         return dict((b64decode(uri), (eid, type))
   256                     for uri, eid, type in session.system_sql(sql))
   269                     for uri, eid, type in session.system_sql(sql))
   257 
   270 
       
   271     def init_import_log(self, session, **kwargs):
       
   272         dataimport = session.create_entity('CWDataImport', cw_import_of=self,
       
   273                                            start_timestamp=datetime.utcnow(),
       
   274                                            **kwargs)
       
   275         return dataimport
   258 
   276 
   259 class DataFeedParser(AppObject):
   277 class DataFeedParser(AppObject):
   260     __registry__ = 'parsers'
   278     __registry__ = 'parsers'
   261 
   279 
   262     def __init__(self, session, source, sourceuris=None, **kwargs):
   280     def __init__(self, session, source, sourceuris=None, import_log=None, **kwargs):
   263         super(DataFeedParser, self).__init__(session, **kwargs)
   281         super(DataFeedParser, self).__init__(session, **kwargs)
   264         self.source = source
   282         self.source = source
   265         self.sourceuris = sourceuris
   283         self.sourceuris = sourceuris
       
   284         self.import_log = import_log
   266         self.stats = {'created': set(),
   285         self.stats = {'created': set(),
   267                       'updated': set()}
   286                       'updated': set()}
   268 
   287 
   269     def add_schema_config(self, schemacfg, checkonly=False):
   288     def add_schema_config(self, schemacfg, checkonly=False):
   270         """added CWSourceSchemaConfig, modify mapping accordingly"""
   289         """added CWSourceSchemaConfig, modify mapping accordingly"""