server/sources/datafeed.py
changeset 8187 981f6e487788
parent 8069 4341fb713b14
child 8188 1867e252e487
equal deleted inserted replaced
8186:341c57b39dc9 8187:981f6e487788
     1 # copyright 2010-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
     1 # copyright 2010-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
     2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
     2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
     3 #
     3 #
     4 # This file is part of CubicWeb.
     4 # This file is part of CubicWeb.
     5 #
     5 #
     6 # CubicWeb is free software: you can redistribute it and/or modify it under the
     6 # CubicWeb is free software: you can redistribute it and/or modify it under the
    20 """
    20 """
    21 from __future__ import with_statement
    21 from __future__ import with_statement
    22 
    22 
    23 import urllib2
    23 import urllib2
    24 import StringIO
    24 import StringIO
       
    25 from os.path import exists
    25 from datetime import datetime, timedelta
    26 from datetime import datetime, timedelta
    26 from base64 import b64decode
    27 from base64 import b64decode
    27 from cookielib import CookieJar
    28 from cookielib import CookieJar
    28 
    29 
    29 from lxml import etree
    30 from lxml import etree
   197         importlog = self.init_import_log(session)
   198         importlog = self.init_import_log(session)
   198         parser = self._get_parser(session, sourceuris=myuris, import_log=importlog)
   199         parser = self._get_parser(session, sourceuris=myuris, import_log=importlog)
   199         if self.process_urls(parser, self.urls, raise_on_error):
   200         if self.process_urls(parser, self.urls, raise_on_error):
   200             self.warning("some error occured, don't attempt to delete entities")
   201             self.warning("some error occured, don't attempt to delete entities")
   201         elif self.config['delete-entities'] and myuris:
   202         elif self.config['delete-entities'] and myuris:
   202             byetype = {}
   203             for extid, (eid, etype) in myuris.iteritems():
   203             for eid, etype in myuris.values():
   204                 if parser.is_deleted(extid, etype, eid):
   204                 byetype.setdefault(etype, []).append(str(eid))
   205                     byetype.setdefault(etype, []).append(str(eid))
   205             self.error('delete %s entities %s', self.uri, byetype)
       
   206             for etype, eids in byetype.iteritems():
   206             for etype, eids in byetype.iteritems():
       
   207                 self.warning('delete %s %s entities', len(eids), etype)
   207                 session.execute('DELETE %s X WHERE X eid IN (%s)'
   208                 session.execute('DELETE %s X WHERE X eid IN (%s)'
   208                                 % (etype, ','.join(eids)))
   209                                 % (etype, ','.join(eids)))
   209         self.update_latest_retrieval(session)
   210         self.update_latest_retrieval(session)
   210         stats = parser.stats
   211         stats = parser.stats
   211         if stats.get('created'):
   212         if stats.get('created'):
   274                                            start_timestamp=datetime.utcnow(),
   275                                            start_timestamp=datetime.utcnow(),
   275                                            **kwargs)
   276                                            **kwargs)
   276         dataimport.init()
   277         dataimport.init()
   277         return dataimport
   278         return dataimport
   278 
   279 
       
   280 
   279 class DataFeedParser(AppObject):
   281 class DataFeedParser(AppObject):
   280     __registry__ = 'parsers'
   282     __registry__ = 'parsers'
   281 
   283 
   282     def __init__(self, session, source, sourceuris=None, import_log=None, **kwargs):
   284     def __init__(self, session, source, sourceuris=None, import_log=None, **kwargs):
   283         super(DataFeedParser, self).__init__(session, **kwargs)
   285         super(DataFeedParser, self).__init__(session, **kwargs)
   284         self.source = source
   286         self.source = source
   285         self.sourceuris = sourceuris
   287         self.sourceuris = sourceuris
   286         self.import_log = import_log
   288         self.import_log = import_log
   287         self.stats = {'created': set(),
   289         self.stats = {'created': set(),
   288                       'updated': set()}
   290                       'updated': set()}
       
   291 
       
   292     def normalize_url(self, url):
       
   293         from cubicweb.sobjects.parsers import URL_MAPPING
       
   294         for mappedurl in URL_MAPPING:
       
   295             if url.startswith(mappedurl):
       
   296                 return url.replace(mappedurl, URL_MAPPING[mappedurl], 1)
       
   297         return url
   289 
   298 
   290     def add_schema_config(self, schemacfg, checkonly=False):
   299     def add_schema_config(self, schemacfg, checkonly=False):
   291         """added CWSourceSchemaConfig, modify mapping accordingly"""
   300         """added CWSourceSchemaConfig, modify mapping accordingly"""
   292         msg = schemacfg._cw._("this parser doesn't use a mapping")
   301         msg = schemacfg._cw._("this parser doesn't use a mapping")
   293         raise ValidationError(schemacfg.eid, {None: msg})
   302         raise ValidationError(schemacfg.eid, {None: msg})
   356         return entity.eid in self.stats['updated']
   365         return entity.eid in self.stats['updated']
   357 
   366 
   358     def notify_updated(self, entity):
   367     def notify_updated(self, entity):
   359         return self.stats['updated'].add(entity.eid)
   368         return self.stats['updated'].add(entity.eid)
   360 
   369 
       
   370     def is_deleted(self, extid, etype, eid):
       
   371         """return True if the entity of given external id, entity type and eid
       
   372         is actually deleted. Always return True by default, put more sensible
       
   373         stuff in sub-classes.
       
   374         """
       
   375         return True
   361 
   376 
   362 class DataFeedXMLParser(DataFeedParser):
   377 class DataFeedXMLParser(DataFeedParser):
   363 
   378 
   364     def process(self, url, raise_on_error=False, partialcommit=True):
   379     def process(self, url, raise_on_error=False, partialcommit=True):
   365         """IDataFeedParser main entry point"""
   380         """IDataFeedParser main entry point"""
   391                     raise
   406                     raise
   392         return error
   407         return error
   393 
   408 
   394     def parse(self, url):
   409     def parse(self, url):
   395         if url.startswith('http'):
   410         if url.startswith('http'):
   396             from cubicweb.sobjects.parsers import URL_MAPPING
   411             url = self.normalize_url(url)
   397             for mappedurl in URL_MAPPING:
       
   398                 if url.startswith(mappedurl):
       
   399                     url = url.replace(mappedurl, URL_MAPPING[mappedurl], 1)
       
   400                     break
       
   401             self.source.info('GET %s', url)
   412             self.source.info('GET %s', url)
   402             stream = _OPENER.open(url)
   413             stream = _OPENER.open(url)
   403         elif url.startswith('file://'):
   414         elif url.startswith('file://'):
   404             stream = open(url[7:])
   415             stream = open(url[7:])
   405         else:
   416         else:
   409     def parse_etree(self, document):
   420     def parse_etree(self, document):
   410         return [(document,)]
   421         return [(document,)]
   411 
   422 
   412     def process_item(self, *args):
   423     def process_item(self, *args):
   413         raise NotImplementedError
   424         raise NotImplementedError
       
   425 
       
   426     def is_deleted(self, extid, etype, eid):
       
   427         if extid.startswith('http'):
       
   428             try:
       
   429                 _OPENER.open(self.normalize_url(extid)) # XXX HTTP HEAD request
       
   430             except urllib2.HTTPError, ex:
       
   431                 if ex.code == 404:
       
   432                     return True
       
   433         elif extid.startswith('file://'):
       
   434             return exists(extid[7:])
       
   435         return False
   414 
   436 
   415 # use a cookie enabled opener to use session cookie if any
   437 # use a cookie enabled opener to use session cookie if any
   416 _OPENER = urllib2.build_opener()
   438 _OPENER = urllib2.build_opener()
   417 try:
   439 try:
   418     from logilab.common import urllib2ext
   440     from logilab.common import urllib2ext