57 or not repo.config.source_enabled(source) |
57 or not repo.config.source_enabled(source) |
58 or not source.config['synchronize']): |
58 or not source.config['synchronize']): |
59 continue |
59 continue |
60 session = repo.internal_session(safe=True) |
60 session = repo.internal_session(safe=True) |
61 try: |
61 try: |
62 stats = source.pull_data(session) |
62 source.pull_data(session) |
63 if stats.get('created'): |
|
64 source.info('added %s entities', len(stats['created'])) |
|
65 if stats.get('updated'): |
|
66 source.info('updated %s entities', len(stats['updated'])) |
|
67 except Exception, exc: |
63 except Exception, exc: |
68 session.exception('while trying to update feed %s', source) |
64 session.exception('while trying to update feed %s', source) |
69 finally: |
65 finally: |
70 session.close() |
66 session.close() |
71 self.repo.looping_task(60, update_feeds, self.repo) |
67 self.repo.looping_task(60, update_feeds, self.repo) |
|
68 |
|
69 def expire_dataimports(repo=self.repo): |
|
70 for source in repo.sources_by_eid.itervalues(): |
|
71 if (not source.copy_based_source |
|
72 or not repo.config.source_enabled(source)): |
|
73 continue |
|
74 session = repo.internal_session() |
|
75 try: |
|
76 mindate = datetime.now() - timedelta(seconds=source.config['logs-lifetime']) |
|
77 session.execute('DELETE CWDataImport X WHERE X start_timestamp < %(time)s', {'time': mindate}) |
|
78 session.commit() |
|
79 finally: |
|
80 session.close() |
|
81 self.repo.looping_task(60*60*24, expire_dataimports, self.repo) |