--- a/server/sources/datafeed.py Fri May 13 10:10:19 2011 +0200
+++ b/server/sources/datafeed.py Fri May 13 10:10:41 2011 +0200
@@ -18,8 +18,14 @@
"""datafeed sources: copy data from an external data stream into the system
database
"""
+
+import urllib2
+import StringIO
from datetime import datetime, timedelta
from base64 import b64decode
+from cookielib import CookieJar
+
+from lxml import etree
from cubicweb import RegistryNotFound, ObjectNotFound, ValidationError
from cubicweb.server.sources import AbstractSource
@@ -219,7 +225,7 @@
             self.sourceuris.pop(str(uri), None)
         return self._cw.entity_from_eid(eid, etype)

-    def process(self, url):
+    def process(self, url, partialcommit=True):
         """main callback: process the url"""
         raise NotImplementedError
@@ -237,3 +243,58 @@
     def notify_updated(self, entity):
         return self.stats['updated'].add(entity.eid)
+
+
+class DataFeedXMLParser(DataFeedParser):
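+    """base class for data feed parsers processing XML documents"""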
+
+    def process(self, url, partialcommit=True):
+        """IDataFeedParser main entry point"""
+        error = False
+        for args in self.parse(url):
+            try:
+                self.process_item(*args)
+                if partialcommit:
+                    # commit+set_pool instead of commit(reset_pool=False) to
+                    # give others a chance to get our pool
+                    self._cw.commit()
+                    self._cw.set_pool()
+            except ValidationError, exc:
+                if partialcommit:
+                    self.source.error('Skipping %s because of validation error %s' % (args, exc))
+                    self._cw.rollback()
+                    self._cw.set_pool()
+                    error = True
+                else:
+                    raise
+        return error
+
+    def parse(self, url):
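+        """fetch the document at `url` (http(s) URL, file:// URL or raw XML
+        content) and return the result of parse_etree on its root element
+        """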
+        if url.startswith('http'):
+            # not imported at module level: sobjects.parsers itself imports this module
+            from cubicweb.sobjects.parsers import HOST_MAPPING
+            for mappedurl in HOST_MAPPING:
+                if url.startswith(mappedurl):
+                    url = url.replace(mappedurl, HOST_MAPPING[mappedurl], 1)
+                    break
+            self.source.info('GET %s', url)
+            stream = _OPENER.open(url)
+        elif url.startswith('file://'):
+            stream = open(url[7:])
+        else:
+            stream = StringIO.StringIO(url)
+        return self.parse_etree(etree.parse(stream).getroot())
+
+    def parse_etree(self, document):
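+        """return an iterable of argument tuples for process_item; by default
+        the whole document is considered as a single item
+        """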
+        return [(document,)]
+
+    def process_item(self, *args):
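+        """process one item as returned by parse_etree (to be implemented in
+        concrete classes)"""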
+        raise NotImplementedError
+
+# use a cookie-enabled opener to reuse the session cookie, if any
+_OPENER = urllib2.build_opener()
+try:
+    from logilab.common import urllib2ext
+    _OPENER.add_handler(urllib2ext.HTTPGssapiAuthHandler())
+except ImportError: # python-kerberos not available
+    pass
+_OPENER.add_handler(urllib2.HTTPCookieProcessor(CookieJar()))
--- a/sobjects/parsers.py Fri May 13 10:10:19 2011 +0200
+++ b/sobjects/parsers.py Fri May 13 10:10:41 2011 +0200
@@ -31,14 +31,9 @@
"""
-import urllib2
-import StringIO
import os.path as osp
-from cookielib import CookieJar
from datetime import datetime, timedelta
-from lxml import etree
-
from logilab.common.date import todate, totime
from logilab.common.textutils import splitstrip, text_to_dict
@@ -72,15 +67,6 @@
     return time(seconds=int(ustr))
 DEFAULT_CONVERTERS['Interval'] = convert_interval

-# use a cookie enabled opener to use session cookie if any
-_OPENER = urllib2.build_opener()
-try:
-    from logilab.common import urllib2ext
-    _OPENER.add_handler(urllib2ext.HTTPGssapiAuthHandler())
-except ImportError: # python-kerberos not available
-    pass
-_OPENER.add_handler(urllib2.HTTPCookieProcessor(CookieJar()))
-
 def extract_typed_attrs(eschema, stringdict, converters=DEFAULT_CONVERTERS):
     typeddict = {}
     for rschema in eschema.subject_relations():
@@ -138,7 +124,7 @@
         raise ValidationError(eid, {rn('options', 'subject'): msg})


-class CWEntityXMLParser(datafeed.DataFeedParser):
+class CWEntityXMLParser(datafeed.DataFeedXMLParser):
     """datafeed parser for the 'xml' entity view"""
     __regid__ = 'cw.entityxml'
@@ -147,6 +133,8 @@
         'link-or-create': _check_linkattr_option,
         'link': _check_linkattr_option,
         }
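+    # the module level _parse_entity_etree function turns the XML tree into
+    # the (item, rels) tuples expected by process_item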
+    parse_etree = staticmethod(_parse_entity_etree)
+
     def __init__(self, *args, **kwargs):
         super(CWEntityXMLParser, self).__init__(*args, **kwargs)
@@ -208,42 +196,8 @@
     # import handling ##########################################################

-    def process(self, url, partialcommit=True):
-        """IDataFeedParser main entry point"""
-        # XXX suppression support according to source configuration. If set, get
-        # all cwuri of entities from this source, and compare with newly
-        # imported ones
-        error = False
-        for item, rels in self.parse(url):
-            cwuri = item['cwuri']
-            try:
-                self.process_item(item, rels)
-                if partialcommit:
-                    # commit+set_pool instead of commit(reset_pool=False) to let
-                    # other a chance to get our pool
-                    self._cw.commit()
-                    self._cw.set_pool()
-            except ValidationError, exc:
-                if partialcommit:
-                    self.source.error('Skipping %s because of validation error %s' % (cwuri, exc))
-                    self._cw.rollback()
-                    self._cw.set_pool()
-                    error = True
-                else:
-                    raise
-        return error
-
-    def parse(self, url):
-        if not url.startswith('http'):
-            stream = StringIO.StringIO(url)
-        else:
-            for mappedurl in HOST_MAPPING:
-                if url.startswith(mappedurl):
-                    url = url.replace(mappedurl, HOST_MAPPING[mappedurl], 1)
-                    break
-            self.source.info('GET %s', url)
-            stream = _OPENER.open(url)
-        return _parse_entity_etree(etree.parse(stream).getroot())
+    # XXX suppression support according to source configuration. If set, get all
+    # cwuri of entities from this source, and compare with newly imported ones

     def process_item(self, item, rels):
         entity = self.extid2entity(str(item.pop('cwuri')), item.pop('cwtype'),