server/sources/datafeed.py
changeset 7456 c54038622fc9
parent 7447 d5705c9bbe82
child 7461 cb25b62074cc
equal deleted inserted replaced
7455:694b21f0fc62 7456:c54038622fc9
    16 # You should have received a copy of the GNU Lesser General Public License along
    16 # You should have received a copy of the GNU Lesser General Public License along
    17 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
    17 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
    18 """datafeed sources: copy data from an external data stream into the system
    18 """datafeed sources: copy data from an external data stream into the system
    19 database
    19 database
    20 """
    20 """
       
    21 from __future__ import with_statement
    21 
    22 
    22 import urllib2
    23 import urllib2
    23 import StringIO
    24 import StringIO
    24 from datetime import datetime, timedelta
    25 from datetime import datetime, timedelta
    25 from base64 import b64decode
    26 from base64 import b64decode
    28 from lxml import etree
    29 from lxml import etree
    29 
    30 
    30 from cubicweb import RegistryNotFound, ObjectNotFound, ValidationError, UnknownEid
    31 from cubicweb import RegistryNotFound, ObjectNotFound, ValidationError, UnknownEid
    31 from cubicweb.server.sources import AbstractSource
    32 from cubicweb.server.sources import AbstractSource
    32 from cubicweb.appobject import AppObject
    33 from cubicweb.appobject import AppObject
       
    34 
    33 
    35 
    34 class DataFeedSource(AbstractSource):
    36 class DataFeedSource(AbstractSource):
    35     copy_based_source = True
    37     copy_based_source = True
    36 
    38 
    37     options = (
    39     options = (
   129     def update_latest_retrieval(self, session):
   131     def update_latest_retrieval(self, session):
   130         self.latest_retrieval = datetime.utcnow()
   132         self.latest_retrieval = datetime.utcnow()
   131         session.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
   133         session.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
   132                         {'x': self.eid, 'date': self.latest_retrieval})
   134                         {'x': self.eid, 'date': self.latest_retrieval})
   133 
   135 
       
   136     def acquire_synchronization_lock(self, session):
       
   137         # XXX race condition until WHERE of SET queries is executed using
       
   138         # 'SELECT FOR UPDATE'
       
   139         if not session.execute('SET X synchronizing TRUE WHERE X eid %(x)s, X synchronizing FALSE',
       
   140                                {'x': self.eid})[0][0]:
       
   141             self.error('concurrent synchronization detected, skip pull')
       
   142             session.commit(free_cnxset=False)
       
   143             return False
       
   144         session.commit(free_cnxset=False)
       
   145         return True
       
   146 
       
   147     def release_synchronization_lock(self, session):
       
   148         session.execute('SET X synchronizing FALSE WHERE X eid %(x)s',
       
   149                         {'x': self.eid})
       
   150         session.commit()
       
   151 
   134     def pull_data(self, session, force=False, raise_on_error=False):
   152     def pull_data(self, session, force=False, raise_on_error=False):
       
   153         """Launch synchronization of the source if needed.
       
   154 
       
   155         This method is responsible to handle commit/rollback on the given
       
   156         session.
       
   157         """
   135         if not force and self.fresh():
   158         if not force and self.fresh():
   136             return {}
   159             return {}
       
   160         if not self.acquire_synchronization_lock(session):
       
   161             return {}
       
   162         try:
       
   163             with session.transaction(free_cnxset=False):
       
   164                 return self._pull_data(session, force, raise_on_error)
       
   165         finally:
       
   166             self.release_synchronization_lock(session)
       
   167 
       
   168     def _pull_data(self, session, force=False, raise_on_error=False):
   137         if self.config['delete-entities']:
   169         if self.config['delete-entities']:
   138             myuris = self.source_cwuris(session)
   170             myuris = self.source_cwuris(session)
   139         else:
   171         else:
   140             myuris = None
   172             myuris = None
   141         parser = self._get_parser(session, sourceuris=myuris)
   173         parser = self._get_parser(session, sourceuris=myuris)
   270     def process(self, url, raise_on_error=False, partialcommit=True):
   302     def process(self, url, raise_on_error=False, partialcommit=True):
   271         """IDataFeedParser main entry point"""
   303         """IDataFeedParser main entry point"""
   272         try:
   304         try:
   273             parsed = self.parse(url)
   305             parsed = self.parse(url)
   274         except Exception, ex:
   306         except Exception, ex:
   275             self.source.error(ex)
   307             self.source.error(str(ex))
   276             return True
   308             return True
   277         error = False
   309         error = False
   278         for args in parsed:
   310         for args in parsed:
   279             try:
   311             try:
   280                 self.process_item(*args)
   312                 self.process_item(*args)