[datafeed] extract a generic DataFeedXMLParser from CWEntityXMLParser

changeset   7378:86a1ae289f05
author      Sylvain Thénault <sylvain.thenault@logilab.fr>
date        Fri, 13 May 2011 10:10:41 +0200
parent      7377:d8083b2ae4d6
child       7379:31adf834a8c6
files       server/sources/datafeed.py
            sobjects/parsers.py
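
As a reading aid, not part of the patch itself: after this changeset the
parser classes are laid out roughly as sketched below, with the generic XML
machinery in server/sources/datafeed.py and only the cw.entityxml specifics
left in sobjects/parsers.py (method bodies elided or stubbed).

    # sketch of the resulting class layout (reading aid, not in the patch)
    def _parse_entity_etree(document):
        pass  # module-level helper defined in sobjects/parsers.py

    class DataFeedParser(object):                # pre-existing, datafeed.py
        def process(self, url, partialcommit=True):
            raise NotImplementedError

    class DataFeedXMLParser(DataFeedParser):     # new, datafeed.py
        def process(self, url, partialcommit=True):
            pass  # generic loop: parse(url), commit/rollback per item
        def parse(self, url):
            pass  # http / file:// / raw string -> lxml tree -> parse_etree()
        def parse_etree(self, document):
            return [(document,)]                 # hook: one argument tuple per item
        def process_item(self, *args):
            raise NotImplementedError

    class CWEntityXMLParser(DataFeedXMLParser):  # sobjects/parsers.py
        parse_etree = staticmethod(_parse_entity_etree)
        def process_item(self, item, rels):
            pass  # entity creation/update, unchanged by this patch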
--- a/server/sources/datafeed.py	Fri May 13 10:10:19 2011 +0200
+++ b/server/sources/datafeed.py	Fri May 13 10:10:41 2011 +0200
@@ -18,8 +18,14 @@
 """datafeed sources: copy data from an external data stream into the system
 database
 """
+
+import urllib2
+import StringIO
 from datetime import datetime, timedelta
 from base64 import b64decode
+from cookielib import CookieJar
+
+from lxml import etree
 
 from cubicweb import RegistryNotFound, ObjectNotFound, ValidationError
 from cubicweb.server.sources import AbstractSource
@@ -219,7 +225,7 @@
             self.sourceuris.pop(str(uri), None)
         return self._cw.entity_from_eid(eid, etype)
 
-    def process(self, url):
+    def process(self, url, partialcommit=True):
         """main callback: process the url"""
         raise NotImplementedError
 
@@ -237,3 +243,57 @@
 
     def notify_updated(self, entity):
         return self.stats['updated'].add(entity.eid)
+
+
+class DataFeedXMLParser(DataFeedParser):
+
+    def process(self, url, partialcommit=True):
+        """IDataFeedParser main entry point"""
+        error = False
+        for args in self.parse(url):
+            try:
+                self.process_item(*args)
+                if partialcommit:
+                    # commit+set_pool instead of commit(reset_pool=False) to
+                    # give other sessions a chance to get our pool
+                    self._cw.commit()
+                    self._cw.set_pool()
+            except ValidationError, exc:
+                if partialcommit:
+                    self.source.error('Skipping %s because of validation error %s' % (args, exc))
+                    self._cw.rollback()
+                    self._cw.set_pool()
+                    error = True
+                else:
+                    raise
+        return error
+
+    def parse(self, url):
+        if url.startswith('http'):
+            from cubicweb.sobjects.parsers import HOST_MAPPING
+            for mappedurl in HOST_MAPPING:
+                if url.startswith(mappedurl):
+                    url = url.replace(mappedurl, HOST_MAPPING[mappedurl], 1)
+                    break
+            self.source.info('GET %s', url)
+            stream = _OPENER.open(url)
+        elif url.startswith('file://'):
+            stream = open(url[7:])
+        else:
+            stream = StringIO.StringIO(url)
+        return self.parse_etree(etree.parse(stream).getroot())
+
+    def parse_etree(self, document):
+        return [(document,)]
+
+    def process_item(self, *args):
+        raise NotImplementedError
+
+# use a cookie-enabled opener to reuse the session cookie, if any
+_OPENER = urllib2.build_opener()
+try:
+    from logilab.common import urllib2ext
+    _OPENER.add_handler(urllib2ext.HTTPGssapiAuthHandler())
+except ImportError: # python-kerberos not available
+    pass
+_OPENER.add_handler(urllib2.HTTPCookieProcessor(CookieJar()))
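
For illustration only (nothing below is in the patch): a third-party parser
can now reuse the generic base by overriding just the two hooks. The class
name, registry id, element names and entity type are hypothetical; only
DataFeedXMLParser, extid2entity and notify_updated come from the code above.

    class CommentFeedParser(DataFeedXMLParser):
        """hypothetical parser for a feed of <comment> elements"""
        __regid__ = 'myapp.commentfeed'  # hypothetical registry id

        def parse_etree(self, document):
            # one argument tuple per <comment> element; the inherited
            # process() loop feeds each tuple to process_item()
            return [(node,) for node in document.findall('comment')]

        def process_item(self, node):
            # map the element to an entity: extid2entity and notify_updated
            # are inherited from the DataFeedParser base class
            entity = self.extid2entity(node.get('uri'), 'Comment')
            self.notify_updated(entity)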
--- a/sobjects/parsers.py	Fri May 13 10:10:19 2011 +0200
+++ b/sobjects/parsers.py	Fri May 13 10:10:41 2011 +0200
@@ -31,14 +31,9 @@
 
 """
 
-import urllib2
-import StringIO
 import os.path as osp
-from cookielib import CookieJar
 from datetime import datetime, timedelta
 
-from lxml import etree
-
 from logilab.common.date import todate, totime
 from logilab.common.textutils import splitstrip, text_to_dict
 
@@ -72,15 +67,6 @@
     return time(seconds=int(ustr))
 DEFAULT_CONVERTERS['Interval'] = convert_interval
 
-# use a cookie enabled opener to use session cookie if any
-_OPENER = urllib2.build_opener()
-try:
-    from logilab.common import urllib2ext
-    _OPENER.add_handler(urllib2ext.HTTPGssapiAuthHandler())
-except ImportError: # python-kerberos not available
-    pass
-_OPENER.add_handler(urllib2.HTTPCookieProcessor(CookieJar()))
-
 def extract_typed_attrs(eschema, stringdict, converters=DEFAULT_CONVERTERS):
     typeddict = {}
     for rschema in eschema.subject_relations():
@@ -138,7 +124,7 @@
         raise ValidationError(eid, {rn('options', 'subject'): msg})
 
 
-class CWEntityXMLParser(datafeed.DataFeedParser):
+class CWEntityXMLParser(datafeed.DataFeedXMLParser):
     """datafeed parser for the 'xml' entity view"""
     __regid__ = 'cw.entityxml'
 
@@ -147,6 +133,7 @@
         'link-or-create': _check_linkattr_option,
         'link': _check_linkattr_option,
         }
+    parse_etree = staticmethod(_parse_entity_etree)
 
     def __init__(self, *args, **kwargs):
         super(CWEntityXMLParser, self).__init__(*args, **kwargs)
@@ -208,42 +196,8 @@
 
     # import handling ##########################################################
 
-    def process(self, url, partialcommit=True):
-        """IDataFeedParser main entry point"""
-        # XXX suppression support according to source configuration. If set, get
-        # all cwuri of entities from this source, and compare with newly
-        # imported ones
-        error = False
-        for item, rels in self.parse(url):
-            cwuri = item['cwuri']
-            try:
-                self.process_item(item, rels)
-                if partialcommit:
-                    # commit+set_pool instead of commit(reset_pool=False) to let
-                    # other a chance to get our pool
-                    self._cw.commit()
-                    self._cw.set_pool()
-            except ValidationError, exc:
-                if partialcommit:
-                    self.source.error('Skipping %s because of validation error %s' % (cwuri, exc))
-                    self._cw.rollback()
-                    self._cw.set_pool()
-                    error = True
-                else:
-                    raise
-        return error
-
-    def parse(self, url):
-        if not url.startswith('http'):
-            stream = StringIO.StringIO(url)
-        else:
-            for mappedurl in HOST_MAPPING:
-                if url.startswith(mappedurl):
-                    url = url.replace(mappedurl, HOST_MAPPING[mappedurl], 1)
-                    break
-            self.source.info('GET %s', url)
-            stream = _OPENER.open(url)
-        return _parse_entity_etree(etree.parse(stream).getroot())
+    # XXX deletion support according to source configuration: if set, get all
+    # cwuri of entities from this source and compare with newly imported ones
 
     def process_item(self, item, rels):
         entity = self.extid2entity(str(item.pop('cwuri')),  item.pop('cwtype'),
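
Finally, a sketch of the contract the extracted process() method gives its
callers: with partialcommit=True (the default) each item is committed on its
own, a ValidationError rolls back and skips only the offending item, and the
method returns True when at least one item was skipped. The selection call
and variable names below are illustrative assumptions, not code from this
changeset.

    # hypothetical driver code exercising the process() contract
    parser = session.vreg['parsers'].select('cw.entityxml', session,
                                            source=source)
    if parser.process('http://example.org/feed.xml'):
        # the returned flag only says "some items were skipped"; the
        # successfully imported items have already been committed
        source.error('some items were skipped because of validation errors')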