server/sources/datafeed.py
changeset 7446 6fba86efdd09
parent 7444 9bb8f89fd31c
child 7447 d5705c9bbe82
equal deleted inserted replaced
7445:5331ba22c0e0 7446:6fba86efdd09
   def fresh(self):
       """Tell whether the last retrieval is recent enough to skip a pull.

       Return False when ``latest_retrieval`` is None.  Otherwise return True
       as long as the current UTC time is strictly before
       ``latest_retrieval + synchro_interval``.
       """
       if self.latest_retrieval is None:
           return False
       return datetime.utcnow() < (self.latest_retrieval + self.synchro_interval)
   128 
   128 
       
   def update_latest_retrieval(self, session):
       """Record the current UTC time as this source's latest retrieval.

       The timestamp is stored on the instance and also written through
       `session` with a SET query on the entity whose eid is ``self.eid``.
       """
       self.latest_retrieval = datetime.utcnow()
       session.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
                       {'x': self.eid, 'date': self.latest_retrieval})
       
   133 
   def pull_data(self, session, force=False, raise_on_error=False):
       """Pull data from the source's urls and return the parser's stats.

       Return an empty dict without doing anything when the source is still
       fresh, unless `force` is true.

       When the 'delete-entities' option is set, the mapping returned by
       ``source_cwuris()`` is handed to the parser; whatever it still holds
       once the urls have been processed is deleted, grouped by entity type
       (presumably the parser removes the uris it encounters from that
       mapping -- confirm in the parser).  Nothing is deleted if
       ``process_urls()`` reported an error.  The latest retrieval date is
       updated in every case.

       `raise_on_error` is passed through to ``process_urls()``.
       """
       if not force and self.fresh():
           return {}
       if self.config['delete-entities']:
           myuris = self.source_cwuris(session)
       else:
           myuris = None
       parser = self._get_parser(session, sourceuris=myuris)
       if self.process_urls(parser, self.urls, raise_on_error):
           self.warning("some error occured, don't attempt to delete entities")
       elif myuris:
           # myuris can only be non-empty when 'delete-entities' was set above
           byetype = {}
           for eid, etype in myuris.values():
               byetype.setdefault(etype, []).append(str(eid))
           self.error('delete %s entities %s', self.uri, byetype)
           # one DELETE query per entity type
           for etype, eids in byetype.items():
               session.execute('DELETE %s X WHERE X eid IN (%s)'
                               % (etype, ','.join(eids)))
       self.update_latest_retrieval(session)
       return parser.stats
       
   154 
       
   def process_urls(self, parser, urls, raise_on_error=False):
       """Feed each url to `parser` and tell whether any of them failed.

       Return True if ``parser.process()`` returned a true value or raised
       IOError for at least one url, False otherwise.  When `raise_on_error`
       is true an IOError is re-raised instead of being logged; the flag is
       also handed to ``parser.process()``.
       """
       error = False
       for url in urls:
           self.info('pulling data from %s', url)
           try:
               if parser.process(url, raise_on_error):
                   error = True
           except IOError as exc:
               if raise_on_error:
                   raise
               # log and keep going so the remaining urls are still processed
               self.error('could not pull data while processing %s: %s',
                          url, exc)
               error = True
       return error
       
   163 
   169 
   164     def before_entity_insertion(self, session, lid, etype, eid, sourceparams):
   170     def before_entity_insertion(self, session, lid, etype, eid, sourceparams):
   165         """called by the repository when an eid has been attributed for an
   171         """called by the repository when an eid has been attributed for an
   166         entity stored here but the entity has not been inserted in the system
   172         entity stored here but the entity has not been inserted in the system
   167         table yet.
   173         table yet.
   198 
   204 
   199 
   205 
   200 class DataFeedParser(AppObject):
   206 class DataFeedParser(AppObject):
   201     __registry__ = 'parsers'
   207     __registry__ = 'parsers'
   202 
   208 
   def __init__(self, session, source, sourceuris=None, **kwargs):
       """Set up a parser working for `source`.

       `session` and any extra keyword arguments are forwarded to the parent
       class initialiser.  `source` and `sourceuris` are stored as given and
       ``stats`` starts with empty 'created' and 'updated' sets.
       """
       super(DataFeedParser, self).__init__(session, **kwargs)
       self.source = source
       self.sourceuris = sourceuris
       self.stats = {'created': set(),
                     'updated': set()}
   209 
   215