server/sources/datafeed.py
changeset 7378 86a1ae289f05
parent 7351 ed66f236715d
child 7379 31adf834a8c6
equal deleted inserted replaced
7377:d8083b2ae4d6 7378:86a1ae289f05
    16 # You should have received a copy of the GNU Lesser General Public License along
    16 # You should have received a copy of the GNU Lesser General Public License along
    17 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
    17 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
    18 """datafeed sources: copy data from an external data stream into the system
    18 """datafeed sources: copy data from an external data stream into the system
    19 database
    19 database
    20 """
    20 """
       
    21 
       
    22 import urllib2
       
    23 import StringIO
    21 from datetime import datetime, timedelta
    24 from datetime import datetime, timedelta
    22 from base64 import b64decode
    25 from base64 import b64decode
       
    26 from cookielib import CookieJar
       
    27 
       
    28 from lxml import etree
    23 
    29 
    24 from cubicweb import RegistryNotFound, ObjectNotFound, ValidationError
    30 from cubicweb import RegistryNotFound, ObjectNotFound, ValidationError
    25 from cubicweb.server.sources import AbstractSource
    31 from cubicweb.server.sources import AbstractSource
    26 from cubicweb.appobject import AppObject
    32 from cubicweb.appobject import AppObject
    27 
    33 
   217                                     sourceparams=sourceparams)
   223                                     sourceparams=sourceparams)
   218         if self.sourceuris is not None:
   224         if self.sourceuris is not None:
   219             self.sourceuris.pop(str(uri), None)
   225             self.sourceuris.pop(str(uri), None)
   220         return self._cw.entity_from_eid(eid, etype)
   226         return self._cw.entity_from_eid(eid, etype)
   221 
   227 
    def process(self, url, partialcommit=True):
        """main callback: process the url

        Abstract: concrete parsers (e.g. DataFeedXMLParser) must override
        this to pull data from `url` into the system database.

        :param url: the url (or raw data) identifying the stream to import
        :param partialcommit: when true, implementations should commit after
            each imported item, so a single bad item doesn't abort the
            whole pull (see DataFeedXMLParser.process)
        :raise NotImplementedError: always, on this base implementation
        """
        raise NotImplementedError
   225 
   231 
    def before_entity_copy(self, entity, sourceparams):
        """abstract hook, must be overridden by concrete parsers

        NOTE(review): presumably called just before attributes of a copied
        external entity are set on `entity`, with `sourceparams` carrying
        parser-specific data -- caller not visible here, confirm.

        :raise NotImplementedError: always, on this base implementation
        """
        raise NotImplementedError
   235     def updated_during_pull(self, entity):
   241     def updated_during_pull(self, entity):
   236         return entity.eid in self.stats['updated']
   242         return entity.eid in self.stats['updated']
   237 
   243 
   238     def notify_updated(self, entity):
   244     def notify_updated(self, entity):
   239         return self.stats['updated'].add(entity.eid)
   245         return self.stats['updated'].add(entity.eid)
       
   246 
       
   247 
       
   248 class DataFeedXMLParser(DataFeedParser):
       
   249 
       
   250     def process(self, url, partialcommit=True):
       
   251         """IDataFeedParser main entry point"""
       
   252         error = False
       
   253         for args in self.parse(url):
       
   254             print args
       
   255             try:
       
   256                 self.process_item(*args)
       
   257                 if partialcommit:
       
   258                     # commit+set_pool instead of commit(reset_pool=False) to let
       
   259                     # other a chance to get our pool
       
   260                     self._cw.commit()
       
   261                     self._cw.set_pool()
       
   262             except ValidationError, exc:
       
   263                 if partialcommit:
       
   264                     self.source.error('Skipping %s because of validation error %s' % (args, exc))
       
   265                     self._cw.rollback()
       
   266                     self._cw.set_pool()
       
   267                     error = True
       
   268                 else:
       
   269                     raise
       
   270         return error
       
   271 
       
   272     def parse(self, url):
       
   273         if url.startswith('http'):
       
   274             from cubicweb.sobjects.parsers import HOST_MAPPING
       
   275             for mappedurl in HOST_MAPPING:
       
   276                 if url.startswith(mappedurl):
       
   277                     url = url.replace(mappedurl, HOST_MAPPING[mappedurl], 1)
       
   278                     break
       
   279             self.source.info('GET %s', url)
       
   280             stream = _OPENER.open(url)
       
   281         elif url.startswith('file://'):
       
   282             stream = open(url[7:])
       
   283         else:
       
   284             stream = StringIO.StringIO(url)
       
   285         return self.parse_etree(etree.parse(stream).getroot())
       
   286 
       
   287     def parse_etree(self, document):
       
   288         return [(document,)]
       
   289 
       
   290     def process_item(self, *args):
       
   291         raise NotImplementedError
       
   292 
       
   293 # use a cookie enabled opener to use session cookie if any
       
   294 _OPENER = urllib2.build_opener()
       
   295 try:
       
   296     from logilab.common import urllib2ext
       
   297     _OPENER.add_handler(urllib2ext.HTTPGssapiAuthHandler())
       
   298 except ImportError: # python-kerberos not available
       
   299     pass
       
   300 _OPENER.add_handler(urllib2.HTTPCookieProcessor(CookieJar()))