20 """ |
20 """ |
21 |
21 |
22 from io import BytesIO |
22 from io import BytesIO |
23 from os.path import exists |
23 from os.path import exists |
24 from datetime import datetime, timedelta |
24 from datetime import datetime, timedelta |
|
25 from functools import partial |
25 |
26 |
26 from six import text_type |
27 from six import text_type |
27 from six.moves.urllib.parse import urlparse |
28 from six.moves.urllib.parse import urlparse |
28 from six.moves.urllib.request import Request, build_opener, HTTPCookieProcessor |
29 from six.moves.urllib.request import Request, build_opener, HTTPCookieProcessor |
29 from six.moves.urllib.error import HTTPError |
30 from six.moves.urllib.error import HTTPError |
def release_synchronization_lock(self, cnx):
    """Clear this source's ``in_synchronization`` marker and commit.

    Counterpart of ``acquire_synchronization_lock``; called once a pull is
    finished so another synchronization may start.
    """
    lock_args = {'x': self.eid}
    cnx.execute('SET X in_synchronization NULL WHERE X eid %(x)s', lock_args)
    cnx.commit()
def pull_data(self, cnx, force=False, raise_on_error=False, async_=False):
    """Launch synchronization of the source if needed.

    If `async_` is true, return immediately a dictionary containing the
    import log's eid, and perform the actual synchronization asynchronously.
    If `async_` is false, return some import statistics (e.g. number of
    created and updated entities).

    This method is responsible for handling commit/rollback on the given
    connection.

    NOTE: the parameter was renamed from ``async`` to ``async_`` because
    ``async`` is a reserved keyword in Python 3.7+, which made the previous
    signature a SyntaxError (positional callers are unaffected).
    """
    # a fresh source needs no work unless the caller insists
    if not force and self.fresh():
        return {}
    # never run two synchronizations of the same source concurrently
    if not self.acquire_synchronization_lock(cnx, force):
        return {}
    try:
        if async_:
            return self._async_pull_data(cnx, force, raise_on_error)
        else:
            return self._pull_data(cnx, force, raise_on_error)
    finally:
        cnx.rollback()  # rollback first in case there is some dirty transaction remaining
        self.release_synchronization_lock(cnx)
214 def _pull_data(self, cnx, force=False, raise_on_error=False): |
220 def _async_pull_data(self, cnx, force, raise_on_error): |
215 importlog = self.init_import_log(cnx) |
221 import_log = cnx.create_entity('CWDataImport', cw_import_of=self) |
|
222 cnx.commit() # commit the import log creation before starting the synchronize task |
|
223 |
|
224 def _synchronize_source(repo, source_eid, import_log_eid): |
|
225 with repo.internal_cnx() as cnx: |
|
226 source = repo.sources_by_eid[source_eid] |
|
227 source._pull_data(cnx, force, raise_on_error, import_log_eid=import_log_eid) |
|
228 |
|
229 sync = partial(_synchronize_source, cnx.repo, self.eid, import_log.eid) |
|
230 cnx.repo.threaded_task(sync) |
|
231 return {'import_log_eid': import_log.eid} |
|
232 |
|
233 def _pull_data(self, cnx, force=False, raise_on_error=False, import_log_eid=None): |
|
234 importlog = self.init_import_log(cnx, import_log_eid) |
216 myuris = self.source_cwuris(cnx) |
235 myuris = self.source_cwuris(cnx) |
217 try: |
236 try: |
218 parser = self._get_parser(cnx, sourceuris=myuris, import_log=importlog) |
237 parser = self._get_parser(cnx, sourceuris=myuris, import_log=importlog) |
219 except ObjectNotFound: |
238 except ObjectNotFound: |
220 return {} |
239 return {} |
295 'WHERE entities.eid=cw_source_relation.eid_from ' |
314 'WHERE entities.eid=cw_source_relation.eid_from ' |
296 'AND cw_source_relation.eid_to=%s' % self.eid) |
315 'AND cw_source_relation.eid_to=%s' % self.eid) |
297 return dict((self.decode_extid(uri), (eid, type)) |
316 return dict((self.decode_extid(uri), (eid, type)) |
298 for uri, eid, type in cnx.system_sql(sql).fetchall()) |
317 for uri, eid, type in cnx.system_sql(sql).fetchall()) |
299 |
318 |
def init_import_log(self, cnx, import_log_eid=None, **kwargs):
    """Return the CWDataImport entity tracking a synchronization run.

    When `import_log_eid` is None, a fresh CWDataImport is created.
    Otherwise the already-existing entity (created beforehand by the
    asynchronous pull path) is reused: its start timestamp is reset and the
    change committed so it is visible to other connections.
    """
    if import_log_eid is not None:
        import_log = cnx.entity_from_eid(import_log_eid)
        import_log.cw_set(start_timestamp=datetime.now(tz=utc), **kwargs)
        cnx.commit()  # make changes visible
    else:
        import_log = cnx.create_entity(
            'CWDataImport', cw_import_of=self,
            start_timestamp=datetime.now(tz=utc), **kwargs)
    import_log.init()
    return import_log
306 |
330 |
307 |
331 |
308 class DataFeedParser(AppObject): |
332 class DataFeedParser(AppObject): |
309 __registry__ = 'parsers' |
333 __registry__ = 'parsers' |
310 |
334 |