cubicweb/server/sources/datafeed.py
changeset 12346 b3f45d96a179
parent 12149 649100470733
child 12508 a8c1ea390400
equal deleted inserted replaced
12345:1b3d82915321 12346:b3f45d96a179
    17 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
    17 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
    18 """datafeed sources: copy data from an external data stream into the system
    18 """datafeed sources: copy data from an external data stream into the system
    19 database
    19 database
    20 """
    20 """
    21 
    21 
       
    22 from warnings import warn
    22 from io import BytesIO
    23 from io import BytesIO
    23 from os.path import exists
    24 from os.path import exists
    24 from datetime import datetime, timedelta
    25 from datetime import datetime, timedelta
    25 from functools import partial
    26 from functools import partial
    26 
    27 
   155     def release_synchronization_lock(self, cnx):
   156     def release_synchronization_lock(self, cnx):
   156         cnx.execute('SET X in_synchronization NULL WHERE X eid %(x)s',
   157         cnx.execute('SET X in_synchronization NULL WHERE X eid %(x)s',
   157                     {'x': self.eid})
   158                     {'x': self.eid})
   158         cnx.commit()
   159         cnx.commit()
   159 
   160 
   160     def pull_data(self, cnx, force=False, raise_on_error=False, async=False):
   161     def pull_data(self, cnx, force=False, raise_on_error=False, sync=True, **kwargs):
   161         """Launch synchronization of the source if needed.
   162         """Launch synchronization of the source if needed.
   162 
   163 
   163         If `async` is true, the method return immediatly a dictionnary containing the import log's
   164         If `sync` is false, the method return immediatly a dictionnary containing the import log's
   164         eid, and the actual synchronization is done asynchronously. If `async` is false, return some
   165         eid, and the actual synchronization is done asynchronously. If `sync` is True, return some
   165         imports statistics (e.g. number of created and updated entities).
   166         imports statistics (e.g. number of created and updated entities).
   166 
   167 
   167         This method is responsible to handle commit/rollback on the given connection.
   168         This method is responsible to handle commit/rollback on the given connection.
   168         """
   169         """
   169         if not force and self.fresh():
   170         if not force and self.fresh():
   174             if force:
   175             if force:
   175                 raise
   176                 raise
   176             self.error(str(exc))
   177             self.error(str(exc))
   177             return {}
   178             return {}
   178         try:
   179         try:
   179             if async:
   180             if kwargs.get('async') is not None:
       
   181                 warn('[3.27] `async` is reserved keyword in py3.7 use `sync` param instead',
       
   182                      DeprecationWarning)
       
   183                 sync = not kwargs['async']
       
   184             if sync:
       
   185                 return self._pull_data(cnx, force, raise_on_error)
       
   186             else:
   180                 return self._async_pull_data(cnx, force, raise_on_error)
   187                 return self._async_pull_data(cnx, force, raise_on_error)
   181             else:
       
   182                 return self._pull_data(cnx, force, raise_on_error)
       
   183         finally:
   188         finally:
   184             cnx.rollback()  # rollback first in case there is some dirty transaction remaining
   189             cnx.rollback()  # rollback first in case there is some dirty transaction remaining
   185             self.release_synchronization_lock(cnx)
   190             self.release_synchronization_lock(cnx)
   186 
   191 
   187     def _async_pull_data(self, cnx, force, raise_on_error):
   192     def _async_pull_data(self, cnx, force, raise_on_error):