server/sources/datafeed.py
changeset 9746 81b56897a377
parent 9665 887ad08e3a61
child 9822 4a118bfd6ab4
child 9860 e24bf60428d3
equal deleted inserted replaced
9745:c013d5d76f66 9746:81b56897a377
     1 # copyright 2010-2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
     1 # copyright 2010-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
     2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
     2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
     3 #
     3 #
     4 # This file is part of CubicWeb.
     4 # This file is part of CubicWeb.
     5 #
     5 #
     6 # CubicWeb is free software: you can redistribute it and/or modify it under the
     6 # CubicWeb is free software: you can redistribute it and/or modify it under the
   141     def fresh(self):
   141     def fresh(self):
   142         if self.latest_retrieval is None:
   142         if self.latest_retrieval is None:
   143             return False
   143             return False
   144         return datetime.utcnow() < (self.latest_retrieval + self.synchro_interval)
   144         return datetime.utcnow() < (self.latest_retrieval + self.synchro_interval)
   145 
   145 
   146     def update_latest_retrieval(self, session):
   146     def update_latest_retrieval(self, cnx):
   147         self.latest_retrieval = datetime.utcnow()
   147         self.latest_retrieval = datetime.utcnow()
   148         session.set_cnxset()
   148         cnx.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
   149         session.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
       
   150                         {'x': self.eid, 'date': self.latest_retrieval})
   149                         {'x': self.eid, 'date': self.latest_retrieval})
   151         session.commit()
   150         cnx.commit()
   152 
   151 
   153     def acquire_synchronization_lock(self, session):
   152     def acquire_synchronization_lock(self, cnx):
   154         # XXX race condition until WHERE of SET queries is executed using
   153         # XXX race condition until WHERE of SET queries is executed using
   155         # 'SELECT FOR UPDATE'
   154         # 'SELECT FOR UPDATE'
   156         now = datetime.utcnow()
   155         now = datetime.utcnow()
   157         session.set_cnxset()
   156         if not cnx.execute(
   158         if not session.execute(
       
   159             'SET X in_synchronization %(now)s WHERE X eid %(x)s, '
   157             'SET X in_synchronization %(now)s WHERE X eid %(x)s, '
   160             'X in_synchronization NULL OR X in_synchronization < %(maxdt)s',
   158             'X in_synchronization NULL OR X in_synchronization < %(maxdt)s',
   161             {'x': self.eid, 'now': now, 'maxdt': now - self.max_lock_lifetime}):
   159             {'x': self.eid, 'now': now, 'maxdt': now - self.max_lock_lifetime}):
   162             self.error('concurrent synchronization detected, skip pull')
   160             self.error('concurrent synchronization detected, skip pull')
   163             session.commit()
   161             cnx.commit()
   164             return False
   162             return False
   165         session.commit()
   163         cnx.commit()
   166         return True
   164         return True
   167 
   165 
   168     def release_synchronization_lock(self, session):
   166     def release_synchronization_lock(self, cnx):
   169         session.set_cnxset()
   167         cnx.execute('SET X in_synchronization NULL WHERE X eid %(x)s',
   170         session.execute('SET X in_synchronization NULL WHERE X eid %(x)s',
       
   171                         {'x': self.eid})
   168                         {'x': self.eid})
   172         session.commit()
   169         cnx.commit()
   173 
   170 
   174     def pull_data(self, session, force=False, raise_on_error=False):
   171     def pull_data(self, cnx, force=False, raise_on_error=False):
   175         """Launch synchronization of the source if needed.
   172         """Launch synchronization of the source if needed.
   176 
   173 
   177         This method is responsible for handling commit/rollback on the given
   174         This method is responsible for handling commit/rollback on the given
   178         session.
   175         connection.
   179         """
   176         """
   180         if not force and self.fresh():
   177         if not force and self.fresh():
   181             return {}
   178             return {}
   182         if not self.acquire_synchronization_lock(session):
   179         if not self.acquire_synchronization_lock(cnx):
   183             return {}
   180             return {}
   184         try:
   181         try:
   185             with session.transaction(free_cnxset=False):
   182             return self._pull_data(cnx, force, raise_on_error)
   186                 return self._pull_data(session, force, raise_on_error)
       
   187         finally:
   183         finally:
   188             self.release_synchronization_lock(session)
   184             cnx.rollback() # rollback first in case there is some dirty
   189 
   185                            # transaction remaining
   190     def _pull_data(self, session, force=False, raise_on_error=False):
   186             self.release_synchronization_lock(cnx)
   191         importlog = self.init_import_log(session)
   187 
   192         myuris = self.source_cwuris(session)
   188     def _pull_data(self, cnx, force=False, raise_on_error=False):
   193         parser = self._get_parser(session, sourceuris=myuris, import_log=importlog)
   189         importlog = self.init_import_log(cnx)
       
   190         myuris = self.source_cwuris(cnx)
       
   191         parser = self._get_parser(cnx, sourceuris=myuris, import_log=importlog)
   194         if self.process_urls(parser, self.urls, raise_on_error):
   192         if self.process_urls(parser, self.urls, raise_on_error):
   195             self.warning("some error occurred, don't attempt to delete entities")
   193             self.warning("some error occurred, don't attempt to delete entities")
   196         else:
   194         else:
   197             parser.handle_deletion(self.config, session, myuris)
   195             parser.handle_deletion(self.config, cnx, myuris)
   198         self.update_latest_retrieval(session)
   196         self.update_latest_retrieval(cnx)
   199         stats = parser.stats
   197         stats = parser.stats
   200         if stats.get('created'):
   198         if stats.get('created'):
   201             importlog.record_info('added %s entities' % len(stats['created']))
   199             importlog.record_info('added %s entities' % len(stats['created']))
   202         if stats.get('updated'):
   200         if stats.get('updated'):
   203             importlog.record_info('updated %s entities' % len(stats['updated']))
   201             importlog.record_info('updated %s entities' % len(stats['updated']))
   204         session.set_cnxset()
   202         importlog.write_log(cnx, end_timestamp=self.latest_retrieval)
   205         importlog.write_log(session, end_timestamp=self.latest_retrieval)
   203         cnx.commit()
   206         session.commit()
       
   207         return stats
   204         return stats
   208 
   205 
   209     def process_urls(self, parser, urls, raise_on_error=False):
   206     def process_urls(self, parser, urls, raise_on_error=False):
   210         error = False
   207         error = False
   211         for url in urls:
   208         for url in urls:
   414             return True
   411             return True
   415         error = False
   412         error = False
   416         # Check whether self._cw is a session or a connection
   413         # Check whether self._cw is a session or a connection
   417         if getattr(self._cw, 'commit', None) is not None:
   414         if getattr(self._cw, 'commit', None) is not None:
   418             commit = self._cw.commit
   415             commit = self._cw.commit
   419             set_cnxset = self._cw.set_cnxset
       
   420             rollback = self._cw.rollback
   416             rollback = self._cw.rollback
   421         else:
   417         else:
   422             commit = self._cw.cnx.commit
   418             commit = self._cw.cnx.commit
   423             set_cnxset = lambda: None
       
   424             rollback = self._cw.cnx.rollback
   419             rollback = self._cw.cnx.rollback
   425         for args in parsed:
   420         for args in parsed:
   426             try:
   421             try:
   427                 self.process_item(*args)
   422                 self.process_item(*args)
   428                 # commit+set_cnxset instead of commit(free_cnxset=False) to give
   423                 # commit+set_cnxset instead of commit(free_cnxset=False) to give
   429                 # others a chance to get our connections set
   424                 # others a chance to get our connections set
   430                 commit()
   425                 commit()
   431                 set_cnxset()
       
   432             except ValidationError as exc:
   426             except ValidationError as exc:
   433                 if raise_on_error:
   427                 if raise_on_error:
   434                     raise
   428                     raise
   435                 self.source.error('Skipping %s because of validation error %s'
   429                 self.source.error('Skipping %s because of validation error %s'
   436                                   % (args, exc))
   430                                   % (args, exc))
   437                 rollback()
   431                 rollback()
   438                 set_cnxset()
       
   439                 error = True
   432                 error = True
   440         return error
   433         return error
   441 
   434 
   442     def parse(self, url):
   435     def parse(self, url):
   443         if url.startswith('http'):
   436         if url.startswith('http'):