diff -r d8083b2ae4d6 -r 86a1ae289f05 server/sources/datafeed.py
--- a/server/sources/datafeed.py	Fri May 13 10:10:19 2011 +0200
+++ b/server/sources/datafeed.py	Fri May 13 10:10:41 2011 +0200
@@ -18,8 +18,14 @@
 """datafeed sources: copy data from an external data stream into the system
 database
 """
+
+import urllib2
+import StringIO
 from datetime import datetime, timedelta
 from base64 import b64decode
+from cookielib import CookieJar
+
+from lxml import etree
 
 from cubicweb import RegistryNotFound, ObjectNotFound, ValidationError
 from cubicweb.server.sources import AbstractSource
@@ -219,7 +225,7 @@
         self.sourceuris.pop(str(uri), None)
         return self._cw.entity_from_eid(eid, etype)
 
-    def process(self, url):
+    def process(self, url, partialcommit=True):
         """main callback: process the url"""
         raise NotImplementedError
 
@@ -237,3 +243,65 @@
 
     def notify_updated(self, entity):
         return self.stats['updated'].add(entity.eid)
+
+
+class DataFeedXMLParser(DataFeedParser):
+
+    def process(self, url, partialcommit=True):
+        """IDataFeedParser main entry point: fetch and parse url, then feed
+        each parsed item to process_item(). When partialcommit is true,
+        commit after each item so one invalid item doesn't abort the run.
+        Return True if some item was skipped because of a validation error.
+        """
+        error = False
+        for args in self.parse(url):
+            try:
+                self.process_item(*args)
+                if partialcommit:
+                    # commit+set_pool instead of commit(reset_pool=False) to
+                    # give others a chance to get our pool
+                    self._cw.commit()
+                    self._cw.set_pool()
+            except ValidationError, exc:
+                if partialcommit:
+                    self.source.error('Skipping %s because of validation error %s',
+                                      args, exc)
+                    self._cw.rollback()
+                    self._cw.set_pool()
+                    error = True
+                else:
+                    raise
+        return error
+
+    def parse(self, url):
+        """open url (http, file:// or raw XML string) and return parsed items"""
+        if url.startswith('http'):
+            from cubicweb.sobjects.parsers import HOST_MAPPING
+            for mappedurl in HOST_MAPPING:
+                if url.startswith(mappedurl):
+                    url = url.replace(mappedurl, HOST_MAPPING[mappedurl], 1)
+                    break
+            self.source.info('GET %s', url)
+            stream = _OPENER.open(url)
+        elif url.startswith('file://'):
+            stream = open(url[7:])
+        else:
+            # not a URL: treat the string itself as the XML content
+            stream = StringIO.StringIO(url)
+        return self.parse_etree(etree.parse(stream).getroot())
+
+    def parse_etree(self, document):
+        """default: yield the whole document as a single item"""
+        return [(document,)]
+
+    def process_item(self, *args):
+        raise NotImplementedError
+
+# use a cookie enabled opener to use session cookie if any
+_OPENER = urllib2.build_opener()
+try:
+    from logilab.common import urllib2ext
+    _OPENER.add_handler(urllib2ext.HTTPGssapiAuthHandler())
+except ImportError: # python-kerberos not available
+    pass
+_OPENER.add_handler(urllib2.HTTPCookieProcessor(CookieJar()))