# HG changeset patch
# User Sylvain Thénault
# Date 1305274241 -7200
# Node ID 86a1ae289f05d685daab69004b1e2e5e43e3db87
# Parent  d8083b2ae4d6f3d011087d846f3eae4ac4038d49
[datafeed] extract a generic DataFeedXMLParser from CWEntityXMLParser

diff -r d8083b2ae4d6 -r 86a1ae289f05 server/sources/datafeed.py
--- a/server/sources/datafeed.py	Fri May 13 10:10:19 2011 +0200
+++ b/server/sources/datafeed.py	Fri May 13 10:10:41 2011 +0200
@@ -18,8 +18,14 @@
 """datafeed sources: copy data from an external data stream into the system
 database
 """
+
+import urllib2
+import StringIO
 from datetime import datetime, timedelta
 from base64 import b64decode
+from cookielib import CookieJar
+
+from lxml import etree
 
 from cubicweb import RegistryNotFound, ObjectNotFound, ValidationError
 from cubicweb.server.sources import AbstractSource
@@ -219,7 +225,7 @@
             self.sourceuris.pop(str(uri), None)
         return self._cw.entity_from_eid(eid, etype)
 
-    def process(self, url):
+    def process(self, url, partialcommit=True):
         """main callback: process the url"""
         raise NotImplementedError
 
@@ -237,3 +243,57 @@
 
     def notify_updated(self, entity):
         return self.stats['updated'].add(entity.eid)
+
+
+class DataFeedXMLParser(DataFeedParser):
+
+    def process(self, url, partialcommit=True):
+        """IDataFeedParser main entry point"""
+        error = False
+        for args in self.parse(url):
+            try:
+                self.process_item(*args)
+                if partialcommit:
+                    # commit+set_pool instead of commit(reset_pool=False) to
+                    # give others a chance to get our pool
+                    self._cw.commit()
+                    self._cw.set_pool()
+            except ValidationError, exc:
+                if partialcommit:
+                    self.source.error('Skipping %s because of validation error %s' % (args, exc))
+                    self._cw.rollback()
+                    self._cw.set_pool()
+                    error = True
+                else:
+                    raise
+        return error
+
+    def parse(self, url):
+        if url.startswith('http'):
+            from cubicweb.sobjects.parsers import HOST_MAPPING
+            for mappedurl in HOST_MAPPING:
+                if url.startswith(mappedurl):
+                    url = url.replace(mappedurl, HOST_MAPPING[mappedurl], 1)
+                    break
+            self.source.info('GET %s', url)
+            stream = _OPENER.open(url)
+        elif url.startswith('file://'):
+            stream = open(url[7:])
+        else:
+            stream = StringIO.StringIO(url)
+        return self.parse_etree(etree.parse(stream).getroot())
+
+    def parse_etree(self, document):
+        return [(document,)]
+
+    def process_item(self, *args):
+        raise NotImplementedError
+
+# use a cookie-enabled opener to reuse the session cookie if any
+_OPENER = urllib2.build_opener()
+try:
+    from logilab.common import urllib2ext
+    _OPENER.add_handler(urllib2ext.HTTPGssapiAuthHandler())
+except ImportError: # python-kerberos not available
+    pass
+_OPENER.add_handler(urllib2.HTTPCookieProcessor(CookieJar()))
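The new DataFeedXMLParser above leaves a concrete parser with only two hooks to fill in: parse_etree(), which turns the fetched lxml tree into a list of argument tuples, and process_item(), which process() calls once per tuple, committing after each item when partialcommit is enabled. A minimal sketch of such a subclass follows; the MyFeedParser name, the 'myfeed' regid and the <item uri="..."> markup are invented for illustration and are not part of the changeset:

from cubicweb.server.sources import datafeed

class MyFeedParser(datafeed.DataFeedXMLParser):
    """hypothetical concrete parser built on the new generic class"""
    __regid__ = 'myfeed'  # invented regid, for illustration only

    def parse_etree(self, document):
        # return one argument tuple per feed item; process() expands
        # each tuple into a process_item() call
        return [(node.get('uri'), node.text)
                for node in document.findall('item')]

    def process_item(self, uri, text):
        # create or update the entity behind this item, typically via
        # self.extid2entity(); left as a stub in this sketch
        pass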
diff -r d8083b2ae4d6 -r 86a1ae289f05 sobjects/parsers.py
--- a/sobjects/parsers.py	Fri May 13 10:10:19 2011 +0200
+++ b/sobjects/parsers.py	Fri May 13 10:10:41 2011 +0200
@@ -31,14 +31,9 @@
 
 """
 
-import urllib2
-import StringIO
 import os.path as osp
-from cookielib import CookieJar
 from datetime import datetime, timedelta
 
-from lxml import etree
-
 from logilab.common.date import todate, totime
 from logilab.common.textutils import splitstrip, text_to_dict
 
@@ -72,15 +67,6 @@
         return time(seconds=int(ustr))
 DEFAULT_CONVERTERS['Interval'] = convert_interval
 
-# use a cookie enabled opener to use session cookie if any
-_OPENER = urllib2.build_opener()
-try:
-    from logilab.common import urllib2ext
-    _OPENER.add_handler(urllib2ext.HTTPGssapiAuthHandler())
-except ImportError: # python-kerberos not available
-    pass
-_OPENER.add_handler(urllib2.HTTPCookieProcessor(CookieJar()))
-
 def extract_typed_attrs(eschema, stringdict, converters=DEFAULT_CONVERTERS):
     typeddict = {}
     for rschema in eschema.subject_relations():
@@ -138,7 +124,7 @@
             raise ValidationError(eid, {rn('options', 'subject'): msg})
 
 
-class CWEntityXMLParser(datafeed.DataFeedParser):
+class CWEntityXMLParser(datafeed.DataFeedXMLParser):
     """datafeed parser for the 'xml' entity view"""
     __regid__ = 'cw.entityxml'
 
@@ -147,6 +133,8 @@
         'link-or-create': _check_linkattr_option,
         'link': _check_linkattr_option,
         }
+    parse_etree = staticmethod(_parse_entity_etree)
+
     def __init__(self, *args, **kwargs):
         super(CWEntityXMLParser, self).__init__(*args, **kwargs)
 
@@ -208,42 +196,8 @@
 
     # import handling ##########################################################
 
-    def process(self, url, partialcommit=True):
-        """IDataFeedParser main entry point"""
-        # XXX suppression support according to source configuration. If set, get
-        # all cwuri of entities from this source, and compare with newly
-        # imported ones
-        error = False
-        for item, rels in self.parse(url):
-            cwuri = item['cwuri']
-            try:
-                self.process_item(item, rels)
-                if partialcommit:
-                    # commit+set_pool instead of commit(reset_pool=False) to let
-                    # other a chance to get our pool
-                    self._cw.commit()
-                    self._cw.set_pool()
-            except ValidationError, exc:
-                if partialcommit:
-                    self.source.error('Skipping %s because of validation error %s' % (cwuri, exc))
-                    self._cw.rollback()
-                    self._cw.set_pool()
-                    error = True
-                else:
-                    raise
-        return error
-
-    def parse(self, url):
-        if not url.startswith('http'):
-            stream = StringIO.StringIO(url)
-        else:
-            for mappedurl in HOST_MAPPING:
-                if url.startswith(mappedurl):
-                    url = url.replace(mappedurl, HOST_MAPPING[mappedurl], 1)
-                    break
-            self.source.info('GET %s', url)
-            stream = _OPENER.open(url)
-        return _parse_entity_etree(etree.parse(stream).getroot())
+    # XXX suppression support according to source configuration. If set, get all
+    # cwuri of entities from this source, and compare with newly imported ones
 
     def process_item(self, item, rels):
         entity = self.extid2entity(str(item.pop('cwuri')), item.pop('cwtype'),
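A note on the parse_etree = staticmethod(_parse_entity_etree) line above: in Python 2, a plain function assigned in a class body becomes an unbound method, so self.parse_etree(root) would implicitly pass self as the function's first argument and break _parse_entity_etree's one-argument signature. Wrapping it in staticmethod() preserves the module-level signature. A toy illustration, with invented names:

def _parse(document):
    return [(document,)]

class Broken(object):
    parse_etree = _parse
    # unbound method: self.parse_etree(root) calls _parse(self, root)
    # and raises TypeError (takes exactly 1 argument, 2 given)

class Works(object):
    parse_etree = staticmethod(_parse)
    # self.parse_etree(root) calls _parse(root) as intended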