53 'default': '5min', |
53 'default': '5min', |
54 'help': ('Interval in seconds between synchronization with the ' |
54 'help': ('Interval in seconds between synchronization with the ' |
55 'external source (default to 5 minutes, must be >= 1 min).'), |
55 'external source (default to 5 minutes, must be >= 1 min).'), |
56 'group': 'datafeed-source', 'level': 2, |
56 'group': 'datafeed-source', 'level': 2, |
57 }), |
57 }), |
|
58 ('max-lock-lifetime', |
|
59 {'type' : 'time', |
|
60 'default': '1h', |
|
61 'help': ('Maximum time allowed for a synchronization to be run. ' |
|
62 'Exceeded that time, the synchronization will be considered ' |
|
63 'as having failed and not properly released the lock, hence ' |
|
64 'it won\'t be considered'), |
|
65 'group': 'datafeed-source', 'level': 2, |
|
66 }), |
58 ('delete-entities', |
67 ('delete-entities', |
59 {'type' : 'yn', |
68 {'type' : 'yn', |
60 'default': True, |
69 'default': True, |
61 'help': ('Should already imported entities not found anymore on the ' |
70 'help': ('Should already imported entities not found anymore on the ' |
62 'external source be deleted?'), |
71 'external source be deleted?'), |
88 def update_config(self, source_entity, typedconfig): |
97 def update_config(self, source_entity, typedconfig): |
89 """update configuration from source entity. `typedconfig` is config |
98 """update configuration from source entity. `typedconfig` is config |
90 properly typed with defaults set |
99 properly typed with defaults set |
91 """ |
100 """ |
92 self.synchro_interval = timedelta(seconds=typedconfig['synchronization-interval']) |
101 self.synchro_interval = timedelta(seconds=typedconfig['synchronization-interval']) |
|
102 self.max_lock_lifetime = timedelta(seconds=typedconfig['max-lock-lifetime']) |
93 if source_entity is not None: |
103 if source_entity is not None: |
94 self._entity_update(source_entity) |
104 self._entity_update(source_entity) |
95 self.config = typedconfig |
105 self.config = typedconfig |
96 |
106 |
97 def init(self, activated, source_entity): |
107 def init(self, activated, source_entity): |
136 {'x': self.eid, 'date': self.latest_retrieval}) |
146 {'x': self.eid, 'date': self.latest_retrieval}) |
137 |
147 |
138 def acquire_synchronization_lock(self, session): |
148 def acquire_synchronization_lock(self, session): |
139 # XXX race condition until WHERE of SET queries is executed using |
149 # XXX race condition until WHERE of SET queries is executed using |
140 # 'SELECT FOR UPDATE' |
150 # 'SELECT FOR UPDATE' |
141 if not session.execute('SET X synchronizing TRUE WHERE X eid %(x)s, X synchronizing FALSE', |
151 now = datetime.utcnow() |
142 {'x': self.eid}): |
152 if not session.execute('SET X in_synchronizaton %(now)s WHERE X eid %(x)s, X synchronizing NULL OR X synchronizing < %(maxdt)s', |
|
153 {'x': self.eid, |
|
154 'now': now, |
|
155 'maxdt': now - self.max_lock_lifetime}): |
143 self.error('concurrent synchronization detected, skip pull') |
156 self.error('concurrent synchronization detected, skip pull') |
144 session.commit(free_cnxset=False) |
157 session.commit(free_cnxset=False) |
145 return False |
158 return False |
146 session.commit(free_cnxset=False) |
159 session.commit(free_cnxset=False) |
147 return True |
160 return True |
148 |
161 |
149 def release_synchronization_lock(self, session): |
162 def release_synchronization_lock(self, session): |
150 session.set_cnxset() |
163 session.set_cnxset() |
151 session.execute('SET X synchronizing FALSE WHERE X eid %(x)s', |
164 session.execute('SET X synchronizing None WHERE X eid %(x)s', |
152 {'x': self.eid}) |
165 {'x': self.eid}) |
153 session.commit() |
166 session.commit() |
154 |
167 |
155 def pull_data(self, session, force=False, raise_on_error=False): |
168 def pull_data(self, session, force=False, raise_on_error=False): |
156 """Launch synchronization of the source if needed. |
169 """Launch synchronization of the source if needed. |