21 |
21 |
22 from datetime import timedelta, datetime |
22 from datetime import timedelta, datetime |
23 |
23 |
24 from cubicweb.server import hook |
24 from cubicweb.server import hook |
25 |
25 |
26 class ServerStartupHook(hook.Hook): |
26 class TransactionsCleanupStartupHook(hook.Hook): |
27 """task to cleanup expirated auth cookie entities""" |
27 """start task to cleanup transaction data""" |
28 __regid__ = 'cw.start-looping-tasks' |
28 __regid__ = 'cw.looping-tasks.transactions-cleanup' |
29 events = ('server_startup',) |
29 events = ('server_startup',) |
30 |
30 |
31 def __call__(self): |
31 def __call__(self): |
32 # XXX use named args and inner functions to avoid referencing globals |
32 # XXX use named args and inner functions to avoid referencing globals |
33 # which may cause reloading pb |
33 # which may cause reloading pb |
47 finally: |
47 finally: |
48 session.close() |
48 session.close() |
49 if self.repo.config['undo-enabled']: |
49 if self.repo.config['undo-enabled']: |
50 self.repo.looping_task(60*60*24, cleanup_old_transactions, |
50 self.repo.looping_task(60*60*24, cleanup_old_transactions, |
51 self.repo) |
51 self.repo) |
|
52 |
|
53 class UpdateFeedsStartupHook(hook.Hook): |
|
54 """start task to update datafeed based sources""" |
|
55 __regid__ = 'cw.looping-tasks.update-feeds' |
|
56 events = ('server_startup',) |
|
57 |
|
58 def __call__(self): |
52 def update_feeds(repo): |
59 def update_feeds(repo): |
53 # don't iter on repo.sources which doesn't include copy based |
60 # don't iter on repo.sources which doesn't include copy based |
54 # sources (the one we're looking for) |
61 # sources (the one we're looking for) |
55 for source in repo.sources_by_eid.itervalues(): |
62 for source in repo.sources_by_eid.itervalues(): |
56 if (not source.copy_based_source |
63 if (not source.copy_based_source |
64 session.exception('while trying to update feed %s', source) |
71 session.exception('while trying to update feed %s', source) |
65 finally: |
72 finally: |
66 session.close() |
73 session.close() |
67 self.repo.looping_task(60, update_feeds, self.repo) |
74 self.repo.looping_task(60, update_feeds, self.repo) |
68 |
75 |
|
76 |
|
class DataImportsCleanupStartupHook(hook.Hook):
    """start task to cleanup old data imports (ie datafeed import logs)"""
    __regid__ = 'cw.looping-tasks.dataimports-cleanup'
    events = ('server_startup',)

    def __call__(self):
        """Register `expire_dataimports` as a looping task on the repository."""
        # The repository is bound as a default argument of the inner function
        # so the task does not have to reference module globals.
        def expire_dataimports(repo=self.repo):
            """Delete CWDataImport entities older than a source's
            'logs-lifetime' setting.

            One internal session is opened, committed and closed for every
            enabled copy based source.
            """
            for source in repo.sources_by_eid.itervalues():
                # skip sources which are not copy based or not enabled
                if (not source.copy_based_source
                    or not repo.config.source_enabled(source)):
                    continue
                session = repo.internal_session()
                try:
                    # 'logs-lifetime' is interpreted as a number of seconds
                    mindate = datetime.now() - timedelta(seconds=source.config['logs-lifetime'])
                    # NOTE(review): the query is not restricted to `source`,
                    # so every pass removes imports older than `mindate`
                    # whatever source they belong to -- confirm this is
                    # intended.
                    session.execute('DELETE CWDataImport X WHERE X start_timestamp < %(time)s',
                                    {'time': mindate})
                    session.commit()
                finally:
                    # always release the internal session, even on error
                    session.close()
        # 60*60*24: presumably an interval in seconds, i.e. one run per day
        # -- TODO confirm against `looping_task`
        self.repo.looping_task(60*60*24, expire_dataimports, self.repo)