[datafeed] extract some methods from pull_data to ease writing custom datafeed sources
author Sylvain Thénault <sylvain.thenault@logilab.fr>
Fri, 27 May 2011 11:17:02 +0200
changeset 7446 6fba86efdd09
parent 7445 5331ba22c0e0
child 7447 d5705c9bbe82
[datafeed] extract some methods from pull_data to ease writing custom datafeed sources
server/sources/datafeed.py
--- a/server/sources/datafeed.py	Fri May 27 09:56:26 2011 +0200
+++ b/server/sources/datafeed.py	Fri May 27 11:17:02 2011 +0200
@@ -126,6 +126,11 @@
             return False
         return datetime.utcnow() < (self.latest_retrieval + self.synchro_interval)
 
+    def update_latest_retrieval(self, session):
+        self.latest_retrieval = datetime.utcnow()
+        session.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
+                        {'x': self.eid, 'date': self.latest_retrieval})
+
     def pull_data(self, session, force=False, raise_on_error=False):
         if not force and self.fresh():
             return {}
@@ -134,9 +139,23 @@
         else:
             myuris = None
         parser = self._get_parser(session, sourceuris=myuris)
+        if self.process_urls(parser, self.urls, raise_on_error):
+            self.warning("some error occured, don't attempt to delete entities")
+        elif self.config['delete-entities'] and myuris:
+            byetype = {}
+            for eid, etype in myuris.values():
+                byetype.setdefault(etype, []).append(str(eid))
+            self.error('delete %s entities %s', self.uri, byetype)
+            for etype, eids in byetype.iteritems():
+                session.execute('DELETE %s X WHERE X eid IN (%s)'
+                                % (etype, ','.join(eids)))
+        self.update_latest_retrieval(session)
+        return parser.stats
+
+    def process_urls(self, parser, urls, raise_on_error=False):
         error = False
-        self.info('pulling data for source %s', self.uri)
-        for url in self.urls:
+        for url in urls:
+            self.info('pulling data from %s', url)
             try:
                 if parser.process(url, raise_on_error):
                     error = True
@@ -146,20 +165,7 @@
                 self.error('could not pull data while processing %s: %s',
                            url, exc)
                 error = True
-        if error:
-            self.warning("some error occured, don't attempt to delete entities")
-        elif self.config['delete-entities'] and myuris:
-            byetype = {}
-            for eid, etype in myuris.values():
-                byetype.setdefault(etype, []).append(str(eid))
-            self.error('delete %s entities %s', self.uri, byetype)
-            for etype, eids in byetype.iteritems():
-                session.execute('DELETE %s X WHERE X eid IN (%s)'
-                                % (etype, ','.join(eids)))
-        self.latest_retrieval = datetime.utcnow()
-        session.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
-                        {'x': self.eid, 'date': self.latest_retrieval})
-        return parser.stats
+        return error
 
     def before_entity_insertion(self, session, lid, etype, eid, sourceparams):
         """called by the repository when an eid has been attributed for an
@@ -200,8 +206,8 @@
 class DataFeedParser(AppObject):
     __registry__ = 'parsers'
 
-    def __init__(self, session, source, sourceuris=None):
-        self._cw = session
+    def __init__(self, session, source, sourceuris=None, **kwargs):
+        super(DataFeedParser, self).__init__(session, **kwargs)
         self.source = source
         self.sourceuris = sourceuris
         self.stats = {'created': set(),