# HG changeset patch # User Sylvain Thénault # Date 1306487822 -7200 # Node ID 6fba86efdd09e7bb12f54c044bbdf6ba09d5bdd0 # Parent 5331ba22c0e043ff61195c7f5388007b551b6d04 [datafeed] extract some methods from pull_data to ease writing custom datafeed sources diff -r 5331ba22c0e0 -r 6fba86efdd09 server/sources/datafeed.py --- a/server/sources/datafeed.py Fri May 27 09:56:26 2011 +0200 +++ b/server/sources/datafeed.py Fri May 27 11:17:02 2011 +0200 @@ -126,6 +126,11 @@ return False return datetime.utcnow() < (self.latest_retrieval + self.synchro_interval) + def update_latest_retrieval(self, session): + self.latest_retrieval = datetime.utcnow() + session.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s', + {'x': self.eid, 'date': self.latest_retrieval}) + def pull_data(self, session, force=False, raise_on_error=False): if not force and self.fresh(): return {} @@ -134,9 +139,23 @@ else: myuris = None parser = self._get_parser(session, sourceuris=myuris) + if self.process_urls(parser, self.urls, raise_on_error): + self.warning("some error occured, don't attempt to delete entities") + elif self.config['delete-entities'] and myuris: + byetype = {} + for eid, etype in myuris.values(): + byetype.setdefault(etype, []).append(str(eid)) + self.error('delete %s entities %s', self.uri, byetype) + for etype, eids in byetype.iteritems(): + session.execute('DELETE %s X WHERE X eid IN (%s)' + % (etype, ','.join(eids))) + self.update_latest_retrieval(session) + return parser.stats + + def process_urls(self, parser, urls, raise_on_error=False): error = False - self.info('pulling data for source %s', self.uri) - for url in self.urls: + for url in urls: + self.info('pulling data from %s', url) try: if parser.process(url, raise_on_error): error = True @@ -146,20 +165,7 @@ self.error('could not pull data while processing %s: %s', url, exc) error = True - if error: - self.warning("some error occured, don't attempt to delete entities") - elif self.config['delete-entities'] and myuris: - byetype = {} - for eid, etype in myuris.values(): - byetype.setdefault(etype, []).append(str(eid)) - self.error('delete %s entities %s', self.uri, byetype) - for etype, eids in byetype.iteritems(): - session.execute('DELETE %s X WHERE X eid IN (%s)' - % (etype, ','.join(eids))) - self.latest_retrieval = datetime.utcnow() - session.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s', - {'x': self.eid, 'date': self.latest_retrieval}) - return parser.stats + return error def before_entity_insertion(self, session, lid, etype, eid, sourceparams): """called by the repository when an eid has been attributed for an @@ -200,8 +206,8 @@ class DataFeedParser(AppObject): __registry__ = 'parsers' - def __init__(self, session, source, sourceuris=None): - self._cw = session + def __init__(self, session, source, sourceuris=None, **kwargs): + super(DataFeedParser, self).__init__(session, **kwargs) self.source = source self.sourceuris = sourceuris self.stats = {'created': set(),