17 # with CubicWeb. If not, see <http://www.gnu.org/licenses/>. |
17 # with CubicWeb. If not, see <http://www.gnu.org/licenses/>. |
18 """datafeed sources: copy data from an external data stream into the system |
18 """datafeed sources: copy data from an external data stream into the system |
19 database |
19 database |
20 """ |
20 """ |
21 |
21 |
|
22 from warnings import warn |
22 from io import BytesIO |
23 from io import BytesIO |
23 from os.path import exists |
24 from os.path import exists |
24 from datetime import datetime, timedelta |
25 from datetime import datetime, timedelta |
25 from functools import partial |
26 from functools import partial |
26 |
27 |
def release_synchronization_lock(self, cnx):
    """Release the synchronization lock held on this source.

    Resets the ``in_synchronization`` attribute of the entity whose eid is
    ``self.eid`` to NULL, then commits ``cnx`` so the reset is persisted
    right away.

    :param cnx: connection used to execute the RQL query and to commit.
    """
    # The query and the commit must each run exactly once: the commit ends
    # the current transaction on ``cnx``.
    cnx.execute('SET X in_synchronization NULL WHERE X eid %(x)s',
                {'x': self.eid})
    cnx.commit()
159 |
160 |
160 def pull_data(self, cnx, force=False, raise_on_error=False, async=False): |
161 def pull_data(self, cnx, force=False, raise_on_error=False, sync=True, **kwargs): |
161 """Launch synchronization of the source if needed. |
162 """Launch synchronization of the source if needed. |
162 |
163 |
163 If `async` is true, the method returns immediately a dictionary containing the import log's |
164 If `sync` is false, the method returns immediately a dictionary containing the import log's |
164 eid, and the actual synchronization is done asynchronously. If `async` is false, return some |
165 eid, and the actual synchronization is done asynchronously. If `sync` is True, return some |
165 import statistics (e.g. number of created and updated entities). |
166 import statistics (e.g. number of created and updated entities). |
166 |
167 |
167 This method is responsible for handling commit/rollback on the given connection. |
168 This method is responsible for handling commit/rollback on the given connection. |
168 """ |
169 """ |
169 if not force and self.fresh(): |
170 if not force and self.fresh(): |
174 if force: |
175 if force: |
175 raise |
176 raise |
176 self.error(str(exc)) |
177 self.error(str(exc)) |
177 return {} |
178 return {} |
178 try: |
179 try: |
179 if async: |
180 if kwargs.get('async') is not None: |
|
181 warn('[3.27] `async` is reserved keyword in py3.7 use `sync` param instead', |
|
182 DeprecationWarning) |
|
183 sync = not kwargs['async'] |
|
184 if sync: |
|
185 return self._pull_data(cnx, force, raise_on_error) |
|
186 else: |
180 return self._async_pull_data(cnx, force, raise_on_error) |
187 return self._async_pull_data(cnx, force, raise_on_error) |
181 else: |
|
182 return self._pull_data(cnx, force, raise_on_error) |
|
183 finally: |
188 finally: |
184 cnx.rollback() # rollback first in case there is some dirty transaction remaining |
189 cnx.rollback() # rollback first in case there is some dirty transaction remaining |
185 self.release_synchronization_lock(cnx) |
190 self.release_synchronization_lock(cnx) |
186 |
191 |
187 def _async_pull_data(self, cnx, force, raise_on_error): |
192 def _async_pull_data(self, cnx, force, raise_on_error): |