cubicweb/server/sources/datafeed.py
changeset 11138 78c8e64f3cef
parent 11129 97095348b3ee
child 11151 4259c55df3e7
equal deleted inserted replaced
11137:447a6f1e8def 11138:78c8e64f3cef
     1 # copyright 2010-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
     1 # copyright 2010-2016 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
     2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
     2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
     3 #
     3 #
     4 # This file is part of CubicWeb.
     4 # This file is part of CubicWeb.
     5 #
     5 #
     6 # CubicWeb is free software: you can redistribute it and/or modify it under the
     6 # CubicWeb is free software: you can redistribute it and/or modify it under the
    20 """
    20 """
    21 
    21 
    22 from io import BytesIO
    22 from io import BytesIO
    23 from os.path import exists
    23 from os.path import exists
    24 from datetime import datetime, timedelta
    24 from datetime import datetime, timedelta
       
    25 from functools import partial
    25 
    26 
    26 from six import text_type
    27 from six import text_type
    27 from six.moves.urllib.parse import urlparse
    28 from six.moves.urllib.parse import urlparse
    28 from six.moves.urllib.request import Request, build_opener, HTTPCookieProcessor
    29 from six.moves.urllib.request import Request, build_opener, HTTPCookieProcessor
    29 from six.moves.urllib.error import HTTPError
    30 from six.moves.urllib.error import HTTPError
   192     def release_synchronization_lock(self, cnx):
   193     def release_synchronization_lock(self, cnx):
   193         cnx.execute('SET X in_synchronization NULL WHERE X eid %(x)s',
   194         cnx.execute('SET X in_synchronization NULL WHERE X eid %(x)s',
   194                     {'x': self.eid})
   195                     {'x': self.eid})
   195         cnx.commit()
   196         cnx.commit()
   196 
   197 
   197     def pull_data(self, cnx, force=False, raise_on_error=False):
   198     def pull_data(self, cnx, force=False, raise_on_error=False, async=False):
   198         """Launch synchronization of the source if needed.
   199         """Launch synchronization of the source if needed.
   199 
   200 
   200         This method is responsible to handle commit/rollback on the given
   201         If `async` is true, the method immediately returns a dictionary containing the import log's
   201         connection.
   202         eid, and the actual synchronization is done asynchronously. If `async` is false, return some
       
   203         import statistics (e.g. number of created and updated entities).
       
   204 
       
   205         This method is responsible for handling commit/rollback on the given connection.
   202         """
   206         """
   203         if not force and self.fresh():
   207         if not force and self.fresh():
   204             return {}
   208             return {}
   205         if not self.acquire_synchronization_lock(cnx, force):
   209         if not self.acquire_synchronization_lock(cnx, force):
   206             return {}
   210             return {}
   207         try:
   211         try:
   208             return self._pull_data(cnx, force, raise_on_error)
   212             if async:
       
   213                 return self._async_pull_data(cnx, force, raise_on_error)
       
   214             else:
       
   215                 return self._pull_data(cnx, force, raise_on_error)
   209         finally:
   216         finally:
   210             cnx.rollback() # rollback first in case there is some dirty
   217             cnx.rollback()  # rollback first in case there is some dirty transaction remaining
   211                            # transaction remaining
       
   212             self.release_synchronization_lock(cnx)
   218             self.release_synchronization_lock(cnx)
   213 
   219 
   214     def _pull_data(self, cnx, force=False, raise_on_error=False):
   220     def _async_pull_data(self, cnx, force, raise_on_error):
   215         importlog = self.init_import_log(cnx)
   221         import_log = cnx.create_entity('CWDataImport', cw_import_of=self)
       
   222         cnx.commit()  # commit the import log creation before starting the synchronize task
       
   223 
       
   224         def _synchronize_source(repo, source_eid, import_log_eid):
       
   225             with repo.internal_cnx() as cnx:
       
   226                 source = repo.sources_by_eid[source_eid]
       
   227                 source._pull_data(cnx, force, raise_on_error, import_log_eid=import_log_eid)
       
   228 
       
   229         sync = partial(_synchronize_source, cnx.repo, self.eid, import_log.eid)
       
   230         cnx.repo.threaded_task(sync)
       
   231         return {'import_log_eid': import_log.eid}
       
   232 
       
   233     def _pull_data(self, cnx, force=False, raise_on_error=False, import_log_eid=None):
       
   234         importlog = self.init_import_log(cnx, import_log_eid)
   216         myuris = self.source_cwuris(cnx)
   235         myuris = self.source_cwuris(cnx)
   217         try:
   236         try:
   218             parser = self._get_parser(cnx, sourceuris=myuris, import_log=importlog)
   237             parser = self._get_parser(cnx, sourceuris=myuris, import_log=importlog)
   219         except ObjectNotFound:
   238         except ObjectNotFound:
   220             return {}
   239             return {}
   295                'WHERE entities.eid=cw_source_relation.eid_from '
   314                'WHERE entities.eid=cw_source_relation.eid_from '
   296                'AND cw_source_relation.eid_to=%s' % self.eid)
   315                'AND cw_source_relation.eid_to=%s' % self.eid)
   297         return dict((self.decode_extid(uri), (eid, type))
   316         return dict((self.decode_extid(uri), (eid, type))
   298                     for uri, eid, type in cnx.system_sql(sql).fetchall())
   317                     for uri, eid, type in cnx.system_sql(sql).fetchall())
   299 
   318 
   300     def init_import_log(self, cnx, **kwargs):
   319     def init_import_log(self, cnx, import_log_eid=None, **kwargs):
   301         dataimport = cnx.create_entity('CWDataImport', cw_import_of=self,
   320         if import_log_eid is None:
   302                                        start_timestamp=datetime.now(tz=utc),
   321             import_log = cnx.create_entity('CWDataImport', cw_import_of=self,
   303                                        **kwargs)
   322                                            start_timestamp=datetime.now(tz=utc),
   304         dataimport.init()
   323                                            **kwargs)
   305         return dataimport
   324         else:
       
   325             import_log = cnx.entity_from_eid(import_log_eid)
       
   326             import_log.cw_set(start_timestamp=datetime.now(tz=utc), **kwargs)
       
   327         cnx.commit()  # make changes visible
       
   328         import_log.init()
       
   329         return import_log
   306 
   330 
   307 
   331 
   308 class DataFeedParser(AppObject):
   332 class DataFeedParser(AppObject):
   309     __registry__ = 'parsers'
   333     __registry__ = 'parsers'
   310 
   334