16 # You should have received a copy of the GNU Lesser General Public License along |
16 # You should have received a copy of the GNU Lesser General Public License along |
17 # with CubicWeb. If not, see <http://www.gnu.org/licenses/>. |
17 # with CubicWeb. If not, see <http://www.gnu.org/licenses/>. |
18 """datafeed sources: copy data from an external data stream into the system |
18 """datafeed sources: copy data from an external data stream into the system |
19 database |
19 database |
20 """ |
20 """ |
|
21 from __future__ import with_statement |
21 |
22 |
22 import urllib2 |
23 import urllib2 |
23 import StringIO |
24 import StringIO |
24 from datetime import datetime, timedelta |
25 from datetime import datetime, timedelta |
25 from base64 import b64decode |
26 from base64 import b64decode |
28 from lxml import etree |
29 from lxml import etree |
29 |
30 |
30 from cubicweb import RegistryNotFound, ObjectNotFound, ValidationError, UnknownEid |
31 from cubicweb import RegistryNotFound, ObjectNotFound, ValidationError, UnknownEid |
31 from cubicweb.server.sources import AbstractSource |
32 from cubicweb.server.sources import AbstractSource |
32 from cubicweb.appobject import AppObject |
33 from cubicweb.appobject import AppObject |
|
34 |
33 |
35 |
34 class DataFeedSource(AbstractSource): |
36 class DataFeedSource(AbstractSource): |
35 copy_based_source = True |
37 copy_based_source = True |
36 |
38 |
37 options = ( |
39 options = ( |
129 def update_latest_retrieval(self, session): |
131 def update_latest_retrieval(self, session): |
130 self.latest_retrieval = datetime.utcnow() |
132 self.latest_retrieval = datetime.utcnow() |
131 session.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s', |
133 session.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s', |
132 {'x': self.eid, 'date': self.latest_retrieval}) |
134 {'x': self.eid, 'date': self.latest_retrieval}) |
133 |
135 |
|
136 def acquire_synchronization_lock(self, session): |
|
137 # XXX race condition until WHERE of SET queries is executed using |
|
138 # 'SELECT FOR UPDATE' |
|
139 if not session.execute('SET X synchronizing TRUE WHERE X eid %(x)s, X synchronizing FALSE', |
|
140 {'x': self.eid})[0][0]: |
|
141 self.error('concurrent synchronization detected, skip pull') |
|
142 session.commit(free_cnxset=False) |
|
143 return False |
|
144 session.commit(free_cnxset=False) |
|
145 return True |
|
146 |
|
147 def release_synchronization_lock(self, session): |
|
148 session.execute('SET X synchronizing FALSE WHERE X eid %(x)s', |
|
149 {'x': self.eid}) |
|
150 session.commit() |
|
151 |
134 def pull_data(self, session, force=False, raise_on_error=False): |
152 def pull_data(self, session, force=False, raise_on_error=False): |
|
153 """Launch synchronization of the source if needed. |
|
154 |
|
155 This method is responsible to handle commit/rollback on the given |
|
156 session. |
|
157 """ |
135 if not force and self.fresh(): |
158 if not force and self.fresh(): |
136 return {} |
159 return {} |
|
160 if not self.acquire_synchronization_lock(session): |
|
161 return {} |
|
162 try: |
|
163 with session.transaction(free_cnxset=False): |
|
164 return self._pull_data(session, force, raise_on_error) |
|
165 finally: |
|
166 self.release_synchronization_lock(session) |
|
167 |
|
168 def _pull_data(self, session, force=False, raise_on_error=False): |
137 if self.config['delete-entities']: |
169 if self.config['delete-entities']: |
138 myuris = self.source_cwuris(session) |
170 myuris = self.source_cwuris(session) |
139 else: |
171 else: |
140 myuris = None |
172 myuris = None |
141 parser = self._get_parser(session, sourceuris=myuris) |
173 parser = self._get_parser(session, sourceuris=myuris) |
270 def process(self, url, raise_on_error=False, partialcommit=True): |
302 def process(self, url, raise_on_error=False, partialcommit=True): |
271 """IDataFeedParser main entry point""" |
303 """IDataFeedParser main entry point""" |
272 try: |
304 try: |
273 parsed = self.parse(url) |
305 parsed = self.parse(url) |
274 except Exception, ex: |
306 except Exception, ex: |
275 self.source.error(ex) |
307 self.source.error(str(ex)) |
276 return True |
308 return True |
277 error = False |
309 error = False |
278 for args in parsed: |
310 for args in parsed: |
279 try: |
311 try: |
280 self.process_item(*args) |
312 self.process_item(*args) |