server/sources/datafeed.py
changeset 7378 86a1ae289f05
parent 7351 ed66f236715d
child 7379 31adf834a8c6
--- a/server/sources/datafeed.py	Fri May 13 10:10:19 2011 +0200
+++ b/server/sources/datafeed.py	Fri May 13 10:10:41 2011 +0200
@@ -18,8 +18,14 @@
 """datafeed sources: copy data from an external data stream into the system
 database
 """
+
+import urllib2
+import StringIO
 from datetime import datetime, timedelta
 from base64 import b64decode
+from cookielib import CookieJar
+
+from lxml import etree
 
 from cubicweb import RegistryNotFound, ObjectNotFound, ValidationError
 from cubicweb.server.sources import AbstractSource
@@ -219,7 +225,7 @@
             self.sourceuris.pop(str(uri), None)
         return self._cw.entity_from_eid(eid, etype)
 
-    def process(self, url):
+    def process(self, url, partialcommit=True):
         """main callback: process the url"""
         raise NotImplementedError
 
@@ -237,3 +243,69 @@
 
     def notify_updated(self, entity):
         return self.stats['updated'].add(entity.eid)
+
+
+class DataFeedXMLParser(DataFeedParser):
+    """datafeed parser processing the fetched data stream as an XML document"""
+
+    def process(self, url, partialcommit=True):
+        """IDataFeedParser main entry point"""
+        error = False
+        for args in self.parse(url):
+            self.source.debug('processing %s', args)
+            try:
+                self.process_item(*args)
+                if partialcommit:
+                    # commit+set_pool instead of commit(reset_pool=False) to
+                    # give other threads a chance to get our pool
+                    self._cw.commit()
+                    self._cw.set_pool()
+            except ValidationError, exc:
+                if partialcommit:
+                    self.source.error('Skipping %s because of validation error %s' % (args, exc))
+                    self._cw.rollback()
+                    self._cw.set_pool()
+                    error = True
+                else:
+                    raise
+        return error
+
+    def parse(self, url):
+        if url.startswith('http'):
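+            # remote resource: rewrite the url if its prefix appears in
+            # HOST_MAPPING, then fetch it through the shared opener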
+            from cubicweb.sobjects.parsers import HOST_MAPPING
+            for mappedurl in HOST_MAPPING:
+                if url.startswith(mappedurl):
+                    url = url.replace(mappedurl, HOST_MAPPING[mappedurl], 1)
+                    break
+            self.source.info('GET %s', url)
+            stream = _OPENER.open(url)
+        elif url.startswith('file://'):
+            stream = open(url[7:])
+        else:
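+            # neither http nor file: assume the string is the XML content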
+            stream = StringIO.StringIO(url)
+        return self.parse_etree(etree.parse(stream).getroot())
+
+    def parse_etree(self, document):
+        """return a list of argument tuples for process_item; the default
+        treats the whole document as a single item
+        """
+        return [(document,)]
+
+    def process_item(self, *args):
+        """process one item as returned by parse_etree (to be implemented
+        in concrete parser classes)"""
+        raise NotImplementedError
+
+# use a cookie-enabled opener so that a session cookie, if any, is reused
+_OPENER = urllib2.build_opener()
+try:
+    from logilab.common import urllib2ext
+    _OPENER.add_handler(urllib2ext.HTTPGssapiAuthHandler())
+except ImportError: # python-kerberos not available
+    pass
+_OPENER.add_handler(urllib2.HTTPCookieProcessor(CookieJar()))
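

Below is a minimal sketch, not part of the changeset, of what a concrete
parser built on DataFeedXMLParser could look like. The registry id, the Card
entity type and the feed layout are hypothetical, and create_entity /
notify_updated are assumed from the session and parser APIs visible above:

    from cubicweb.server.sources.datafeed import DataFeedXMLParser

    class CardParser(DataFeedXMLParser):
        """hypothetical parser for a <cards><card title="..."/></cards> feed"""
        __regid__ = 'demo.cardparser'

        def parse_etree(self, document):
            # one argument tuple per <card> element under the root
            return [(node,) for node in document.findall('card')]

        def process_item(self, node):
            # naively create one entity per item; a real parser would first
            # look up already imported items through the source's uri mapping
            entity = self._cw.create_entity('Card', title=node.get('title'))
            self.notify_updated(entity)

Calling process('file:///path/to/feed.xml') on such a parser creates one Card
per <card> element, committing and re-acquiring the pool after each item
unless partialcommit is False.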