53 'default': '5min', |
53 'default': '5min', |
54 'help': ('Interval in seconds between synchronization with the ' |
54 'help': ('Interval in seconds between synchronization with the ' |
55 'external source (default to 5 minutes, must be >= 1 min).'), |
55 'external source (default to 5 minutes, must be >= 1 min).'), |
56 'group': 'datafeed-source', 'level': 2, |
56 'group': 'datafeed-source', 'level': 2, |
57 }), |
57 }), |
|
58 ('max-lock-lifetime', |
|
59 {'type' : 'time', |
|
60 'default': '1h', |
|
61 'help': ('Maximum time allowed for a synchronization to be run. ' |
|
62 'Exceeded that time, the synchronization will be considered ' |
|
63 'as having failed and not properly released the lock, hence ' |
|
64 'it won\'t be considered'), |
|
65 'group': 'datafeed-source', 'level': 2, |
|
66 }), |
58 ('delete-entities', |
67 ('delete-entities', |
59 {'type' : 'yn', |
68 {'type' : 'yn', |
60 'default': True, |
69 'default': True, |
61 'help': ('Should already imported entities not found anymore on the ' |
70 'help': ('Should already imported entities not found anymore on the ' |
62 'external source be deleted?'), |
71 'external source be deleted?'), |
def update_config(self, source_entity, typedconfig):
    """Update configuration from the source entity.

    :param source_entity: entity describing this source, or None; when
        given it is forwarded to ``self._entity_update``.
    :param typedconfig: configuration mapping, properly typed with
        defaults set. The 'synchronization-interval' and
        'max-lock-lifetime' entries are read as numbers of seconds.
    """
    # Keep both durations as timedelta so they can be combined directly
    # with datetime values.
    self.synchro_interval = timedelta(
        seconds=typedconfig['synchronization-interval'])
    self.max_lock_lifetime = timedelta(
        seconds=typedconfig['max-lock-lifetime'])
    if source_entity is not None:
        self._entity_update(source_entity)
    self.config = typedconfig
98 |
108 |
99 def init(self, activated, source_entity): |
109 def init(self, activated, source_entity): |
138 {'x': self.eid, 'date': self.latest_retrieval}) |
148 {'x': self.eid, 'date': self.latest_retrieval}) |
139 |
149 |
def acquire_synchronization_lock(self, session):
    """Try to take the synchronization lock for this source.

    The lock is a timestamp stored on the source entity: it is taken when
    the attribute is unset, or when the stored timestamp is older than
    ``self.max_lock_lifetime`` (stale lock left by a failed run).

    :param session: session used to execute the RQL query and commit.
    :return: True when the lock was acquired, False when another
        synchronization appears to hold it (an error is logged).
    """
    # XXX race condition until WHERE of SET queries is executed using
    # 'SELECT FOR UPDATE'
    now = datetime.utcnow()
    # NOTE(review): the attribute name `in_synchronization` is taken from
    # the SET clause with its typo fixed, and is now used consistently in
    # the WHERE clause as well -- confirm it matches the source schema.
    # NOTE(review): relies on ',' binding looser than OR in RQL, i.e.
    # eid AND (lock unset OR lock stale) -- confirm.
    acquired = bool(session.execute(
        'SET X in_synchronization %(now)s WHERE X eid %(x)s, '
        'X in_synchronization NULL OR X in_synchronization < %(maxdt)s',
        {'x': self.eid,
         'now': now,
         'maxdt': now - self.max_lock_lifetime}))
    if not acquired:
        self.error('concurrent synchronization detected, skip pull')
    # Commit in both cases, keeping the connections set attached.
    session.commit(free_cnxset=False)
    return acquired
150 |
163 |
def release_synchronization_lock(self, session):
    """Release the synchronization lock held on this source.

    Clears the lock timestamp on the source entity and commits.

    :param session: session used to execute the RQL query and commit.
    """
    session.set_cnxset()
    # RQL spells the null value NULL (not Python's None).
    # NOTE(review): attribute name `in_synchronization` must be the one
    # set when acquiring the lock -- confirm against the source schema.
    session.execute('SET X in_synchronization NULL WHERE X eid %(x)s',
                    {'x': self.eid})
    session.commit()
156 |
169 |
157 def pull_data(self, session, force=False, raise_on_error=False): |
170 def pull_data(self, session, force=False, raise_on_error=False): |
158 """Launch synchronization of the source if needed. |
171 """Launch synchronization of the source if needed. |