# copyright 2010-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.
|
"""datafeed sources: copy data from an external data stream into the system
database
"""
|
21 |
|
22 from io import BytesIO |
|
23 from os.path import exists |
|
24 from datetime import datetime, timedelta |
|
25 |
|
26 from six import text_type |
|
27 from six.moves.urllib.parse import urlparse |
|
28 from six.moves.urllib.request import Request, build_opener, HTTPCookieProcessor |
|
29 from six.moves.urllib.error import HTTPError |
|
30 from six.moves.http_cookiejar import CookieJar |
|
31 |
|
32 from pytz import utc |
|
33 from lxml import etree |
|
34 |
|
35 from logilab.common.deprecation import deprecated |
|
36 |
|
37 from cubicweb import RegistryNotFound, ObjectNotFound, ValidationError, UnknownEid |
|
38 from cubicweb.server.repository import preprocess_inlined_relations |
|
39 from cubicweb.server.sources import AbstractSource |
|
40 from cubicweb.appobject import AppObject |
|
41 |
|
42 |
|
class DataFeedSource(AbstractSource):
    """Source importing data from external URLs through a parser.

    The parser is selected from the 'parsers' registry using the `parser`
    attribute of the source entity.
    """
    use_cwuri_as_url = True

    options = (
        ('synchronize',
         {'type': 'yn',
          'default': True,
          'help': ('Is the repository responsible to automatically import '
                   'content from this source? '
                   'You should say yes unless you don\'t want this behaviour '
                   'or if you use a multiple repositories setup, in which '
                   'case you should say yes on one repository, no on others.'),
          'group': 'datafeed-source', 'level': 2,
          }),
        ('synchronization-interval',
         {'type': 'time',
          'default': '5min',
          'help': ('Interval in seconds between synchronization with the '
                   'external source (default to 5 minutes, must be >= 1 min).'),
          'group': 'datafeed-source', 'level': 2,
          }),
        ('max-lock-lifetime',
         {'type': 'time',
          'default': '1h',
          'help': ('Maximum time allowed for a synchronization to be run. '
                   'Exceeded that time, the synchronization will be considered '
                   'as having failed and not properly released the lock, hence '
                   'it won\'t be considered'),
          'group': 'datafeed-source', 'level': 2,
          }),
        ('delete-entities',
         {'type': 'yn',
          'default': False,
          'help': ('Should already imported entities not found anymore on the '
                   'external source be deleted?'),
          'group': 'datafeed-source', 'level': 2,
          }),
        ('logs-lifetime',
         {'type': 'time',
          'default': '10d',
          'help': ('Time before logs from datafeed imports are deleted.'),
          'group': 'datafeed-source', 'level': 2,
          }),
        ('http-timeout',
         {'type': 'time',
          'default': '1min',
          'help': ('Timeout of HTTP GET requests, when synchronizing a source.'),
          'group': 'datafeed-source', 'level': 2,
          }),
        ('use-cwuri-as-url',
         {'type': 'yn',
          'default': None,  # explicitly unset
          'help': ('Use cwuri (i.e. external URL) for link to the entity '
                   'instead of its local URL.'),
          'group': 'datafeed-source', 'level': 1,
          }),
    )

    def check_config(self, source_entity):
        """Check the configuration of `source_entity` and return it typed.

        Raise ValidationError if 'synchronization-interval' is below 60.
        """
        typed_config = super(DataFeedSource, self).check_config(source_entity)
        if typed_config['synchronization-interval'] >= 60:
            return typed_config
        _ = source_entity._cw._
        msg = _('synchronization-interval must be greater than 1 minute')
        raise ValidationError(source_entity.eid, {'config': msg})

    def _entity_update(self, source_entity):
        """Refresh parser id and latest retrieval date from `source_entity`."""
        super(DataFeedSource, self)._entity_update(source_entity)
        self.parser_id = source_entity.parser
        self.latest_retrieval = source_entity.latest_retrieval

    def update_config(self, source_entity, typed_config):
        """Update configuration from the source entity.

        `typed_config` is the configuration properly typed, with defaults set.
        """
        super(DataFeedSource, self).update_config(source_entity, typed_config)
        interval_seconds = typed_config['synchronization-interval']
        lock_seconds = typed_config['max-lock-lifetime']
        self.synchro_interval = timedelta(seconds=interval_seconds)
        self.max_lock_lifetime = timedelta(seconds=lock_seconds)
        self.http_timeout = typed_config['http-timeout']
        # only an explicitly set 'use-cwuri-as-url' overrides the attribute
        # and is mirrored into the public configuration dictionary
        use_cwuri = typed_config['use-cwuri-as-url']
        if use_cwuri is not None:
            self.use_cwuri_as_url = use_cwuri
            self.public_config['use-cwuri-as-url'] = self.use_cwuri_as_url

    def init(self, activated, source_entity):
        """Initialize the source, then load the mapping through its parser."""
        super(DataFeedSource, self).init(activated, source_entity)
        self.parser_id = source_entity.parser
        self.load_mapping(source_entity._cw)

    def _get_parser(self, cnx, **kwargs):
        """Select the parser named `self.parser_id` in the 'parsers' registry.

        Extra keyword arguments are forwarded to the selection. Raise
        ObjectNotFound when no parser id is set on the source.
        """
        if self.parser_id is None:
            self.warning('No parser defined on source %r', self)
            raise ObjectNotFound()
        registry = self.repo.vreg['parsers']
        return registry.select(self.parser_id, cnx, source=self, **kwargs)

    def load_mapping(self, cnx):
        """Reset the mapping, then reload it if a parser can be selected."""
        self.mapping = {}
        self.mapping_idx = {}
        try:
            parser = self._get_parser(cnx)
        except (RegistryNotFound, ObjectNotFound):
            # no parser yet, don't go further
            return
        self._load_mapping(cnx, parser=parser)

    def add_schema_config(self, schemacfg, checkonly=False, parser=None):
        """A CWSourceSchemaConfig was added: let the parser update the mapping."""
        if parser is None:
            parser = self._get_parser(schemacfg._cw)
        parser.add_schema_config(schemacfg, checkonly)

    def del_schema_config(self, schemacfg, checkonly=False, parser=None):
        """A CWSourceSchemaConfig was deleted: let the parser update the mapping."""
        if parser is None:
            parser = self._get_parser(schemacfg._cw)
        parser.del_schema_config(schemacfg, checkonly)

    def fresh(self):
        """Return True if the last retrieval is more recent than the
        synchronization interval; False if nothing was ever retrieved.
        """
        if self.latest_retrieval is None:
            return False
        now = datetime.now(tz=utc)
        return now < (self.latest_retrieval + self.synchro_interval)

    def update_latest_retrieval(self, cnx):
        """Store the current UTC time as latest retrieval date and commit."""
        self.latest_retrieval = datetime.now(tz=utc)
        cnx.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
                    {'x': self.eid, 'date': self.latest_retrieval})
        cnx.commit()

    def acquire_synchronization_lock(self, cnx):
        """Try to flag the source as being synchronized.

        Return True if the flag could be set (it was unset, or older than
        `max_lock_lifetime`), False otherwise. The connection is committed in
        both cases.
        """
        # XXX race condition until WHERE of SET queries is executed using
        # 'SELECT FOR UPDATE'
        now = datetime.now(tz=utc)
        rql = ('SET X in_synchronization %(now)s WHERE X eid %(x)s, '
               'X in_synchronization NULL OR X in_synchronization < %(maxdt)s')
        args = {'x': self.eid, 'now': now, 'maxdt': now - self.max_lock_lifetime}
        acquired = bool(cnx.execute(rql, args))
        if not acquired:
            self.error('concurrent synchronization detected, skip pull')
        cnx.commit()
        return acquired

    def release_synchronization_lock(self, cnx):
        """Unset the synchronization flag of the source and commit."""
        cnx.execute('SET X in_synchronization NULL WHERE X eid %(x)s',
                    {'x': self.eid})
        cnx.commit()

    def pull_data(self, cnx, force=False, raise_on_error=False):
        """Launch synchronization of the source if needed.

        This method is responsible to handle commit/rollback on the given
        connection.

        Return the statistics of the pull, or an empty dict when the source
        is still fresh (unless `force` is true) or when the synchronization
        lock could not be acquired.
        """
        if not force and self.fresh():
            return {}
        if not self.acquire_synchronization_lock(cnx):
            return {}
        try:
            return self._pull_data(cnx, force, raise_on_error)
        finally:
            # rollback first in case there is some dirty transaction remaining
            cnx.rollback()
            self.release_synchronization_lock(cnx)

    def _pull_data(self, cnx, force=False, raise_on_error=False):
        """Run the parser on every url of the source and log the outcome.

        Return the parser statistics, or an empty dict if no parser could be
        selected. Deletion handling is skipped when an error occurred while
        processing the urls.
        """
        import_log = self.init_import_log(cnx)
        known_uris = self.source_cwuris(cnx)
        try:
            parser = self._get_parser(cnx, sourceuris=known_uris,
                                      import_log=import_log)
        except ObjectNotFound:
            return {}
        got_error = self.process_urls(parser, self.urls, raise_on_error)
        if got_error:
            self.warning("some error occurred, don't attempt to delete entities")
        else:
            parser.handle_deletion(self.config, cnx, known_uris)
        self.update_latest_retrieval(cnx)
        stats = parser.stats
        if stats.get('created'):
            import_log.record_info('added %s entities' % len(stats['created']))
        if stats.get('updated'):
            import_log.record_info('updated %s entities' % len(stats['updated']))
        import_log.write_log(cnx, end_timestamp=self.latest_retrieval)
        cnx.commit()
        return stats

    def process_urls(self, parser, urls, raise_on_error=False):
        """Feed each url to `parser.process`.

        Return True if the parser reported an error or an exception was
        caught for at least one url. When `raise_on_error` is true,
        exceptions are propagated instead of being recorded.
        """
        had_error = False
        for url in urls:
            self.info('pulling data from %s', url)
            try:
                if parser.process(url, raise_on_error):
                    had_error = True
            except IOError as exc:
                if raise_on_error:
                    raise
                parser.import_log.record_error(
                    'could not pull data while processing %s: %s'
                    % (url, exc))
                had_error = True
            except Exception as exc:
                if raise_on_error:
                    raise
                self.exception('error while processing %s: %s',
                               url, exc)
                had_error = True
        return had_error

    @deprecated('[3.21] use the new store API')
    def before_entity_insertion(self, cnx, lid, etype, eid, sourceparams):
        """called by the repository when an eid has been attributed for an
        entity stored here but the entity has not been inserted in the system
        table yet.

        This method must return an Entity instance representation of this
        entity.
        """
        entity = super(DataFeedSource, self).before_entity_insertion(
            cnx, lid, etype, eid, sourceparams)
        edited = entity.cw_edited
        edited['cwuri'] = lid.decode('utf-8')
        edited.set_defaults()
        sourceparams['parser'].before_entity_copy(entity, sourceparams)
        return entity

    @deprecated('[3.21] use the new store API')
    def after_entity_insertion(self, cnx, lid, entity, sourceparams):
        """called by the repository after an entity stored here has been
        inserted in the system table.
        """
        inlined = preprocess_inlined_relations(cnx, entity)
        if cnx.is_hook_category_activated('integrity'):
            entity.cw_edited.check(creation=True)
        self.repo.system_source.add_entity(cnx, entity)
        entity.cw_edited.saved = entity._cw_is_saved = True
        sourceparams['parser'].after_entity_copy(entity, sourceparams)
        # call hooks for inlined relations
        call_hooks = self.repo.hm.call_hooks
        if not self.should_call_hooks:
            return
        for rtype, eidto in inlined:
            call_hooks('before_add_relation', cnx,
                       eidfrom=entity.eid, rtype=rtype, eidto=eidto)
            call_hooks('after_add_relation', cnx,
                       eidfrom=entity.eid, rtype=rtype, eidto=eidto)

    def source_cwuris(self, cnx):
        """Return a dict mapping decoded external ids to (eid, type) pairs
        for the entities related to this source through `cw_source`.
        """
        sql = ('SELECT extid, eid, type FROM entities, cw_source_relation '
               'WHERE entities.eid=cw_source_relation.eid_from '
               'AND cw_source_relation.eid_to=%s' % self.eid)
        rows = cnx.system_sql(sql).fetchall()
        return {self.decode_extid(extid): (eid, etype)
                for extid, eid, etype in rows}

    def init_import_log(self, cnx, **kwargs):
        """Create and initialize the CWDataImport entity logging this import."""
        import_log = cnx.create_entity('CWDataImport', cw_import_of=self,
                                       start_timestamp=datetime.now(tz=utc),
                                       **kwargs)
        import_log.init()
        return import_log
|
302 |
|
303 |
|
class DataFeedParser(AppObject):
    """Base class for datafeed parsers, registered in the 'parsers' registry.

    Sub-classes must at least implement :meth:`process`.
    """
    __registry__ = 'parsers'

    def __init__(self, cnx, source, sourceuris=None, import_log=None, **kwargs):
        """Bind the parser to `source`.

        `sourceuris` is an optional dict of already known external ids and
        `import_log` the object on which errors are recorded.
        """
        super(DataFeedParser, self).__init__(cnx, **kwargs)
        self.source = source
        self.sourceuris = sourceuris
        self.import_log = import_log
        # eids of entities created / updated / checked during the pull
        self.stats = {'created': set(), 'updated': set(), 'checked': set()}

    def normalize_url(self, url):
        """Normalize an url by looking if there is a replacement for it in
        `cubicweb.sobjects.URL_MAPPING`.

        This dictionary allow to redirect from one host to another, which may be
        useful for example in case of test instance using production data, while
        you don't want to load the external source nor to hack your `/etc/hosts`
        file.
        """
        # local import mandatory, it's available after registration
        from cubicweb.sobjects import URL_MAPPING
        for mappedurl in URL_MAPPING:
            if url.startswith(mappedurl):
                return url.replace(mappedurl, URL_MAPPING[mappedurl], 1)
        return url

    def retrieve_url(self, url):
        """Return stream linked by the given url:
        * HTTP urls will be normalized (see :meth:`normalize_url`)
        * handle file:// URL
        * other will be considered as plain content, useful for testing purpose

        For http URLs, it will try to find a cwclientlib config entry
        (if available) and use it as requester.

        Every branch returns a stream of bytes.
        """
        purl = urlparse(url)
        if purl.scheme == 'file':
            # binary mode, so this branch yields bytes like the others
            return URLLibResponseAdapter(open(url[7:], 'rb'), url)

        url = self.normalize_url(url)

        # first, try to use cwclientlib if it's available and if the
        # url matches a configuration entry in ~/.config/cwclientlibrc
        try:
            from cwclientlib import cwproxy_for
            # parse url again since it has been normalized
            cnx = cwproxy_for(url)
            cnx.timeout = self.source.http_timeout
            self.source.info('Using cwclientlib for %s' % url)
            resp = cnx.get(url)
            resp.raise_for_status()
            # BytesIO needs the raw bytes of the body, not its decoded text
            return URLLibResponseAdapter(BytesIO(resp.content), url)
        except (ImportError, ValueError, EnvironmentError) as exc:
            # ImportError: not available
            # ValueError: no config entry found
            # EnvironmentError: no cwclientlib config file found
            self.source.debug(str(exc))

        # no chance with cwclientlib, fall back to former implementation
        # (the scheme tested here is the one of the url before normalization)
        if purl.scheme in ('http', 'https'):
            self.source.info('GET %s', url)
            req = Request(url)
            return _OPENER.open(req, timeout=self.source.http_timeout)

        # url is probably plain content
        return URLLibResponseAdapter(BytesIO(url.encode('ascii')), url)

    def add_schema_config(self, schemacfg, checkonly=False):
        """added CWSourceSchemaConfig, modify mapping accordingly

        The base parser uses no mapping, hence always raises ValidationError.
        """
        msg = schemacfg._cw._("this parser doesn't use a mapping")
        raise ValidationError(schemacfg.eid, {None: msg})

    def del_schema_config(self, schemacfg, checkonly=False):
        """deleted CWSourceSchemaConfig, modify mapping accordingly

        The base parser uses no mapping, hence always raises ValidationError.
        """
        msg = schemacfg._cw._("this parser doesn't use a mapping")
        raise ValidationError(schemacfg.eid, {None: msg})

    @deprecated('[3.21] use the new store API')
    def extid2entity(self, uri, etype, **sourceparams):
        """Return an entity for the given uri. May return None if it should be
        skipped.

        If a `raise_on_error` keyword parameter is passed, a ValidationError
        exception may be raised.
        """
        raise_on_error = sourceparams.pop('raise_on_error', False)
        cnx = self._cw
        # if cwsource is specified and repository has a source with the same
        # name, call extid2eid on that source so entity will be properly seen as
        # coming from this source
        source_uri = sourceparams.pop('cwsource', None)
        if source_uri is not None and source_uri != 'system':
            source = cnx.repo.sources_by_uri.get(source_uri, self.source)
        else:
            source = self.source
        sourceparams['parser'] = self
        if isinstance(uri, text_type):
            uri = uri.encode('utf-8')
        try:
            eid = cnx.repo.extid2eid(source, uri, etype, cnx,
                                     sourceparams=sourceparams)
        except ValidationError as ex:
            if raise_on_error:
                raise
            self.source.critical('error while creating %s: %s', etype, ex)
            self.import_log.record_error('error while creating %s: %s'
                                         % (etype, ex))
            return None
        if eid < 0:
            # entity has been moved away from its original source
            #
            # Don't give etype to entity_from_eid so we get UnknownEid if the
            # entity has been removed
            try:
                entity = cnx.entity_from_eid(-eid)
            except UnknownEid:
                return None
            self.notify_updated(entity)  # avoid later update from the source's data
            return entity
        if self.sourceuris is not None:
            # NOTE(review): `uri` is a byte string at this point; on Python 3
            # str(uri) yields "b'...'" -- confirm it matches the key type of
            # `sourceuris` before relying on this to mark the uri as seen
            self.sourceuris.pop(str(uri), None)
        return cnx.entity_from_eid(eid, etype)

    def process(self, url, raise_on_error=False):
        """main callback: process the url"""
        raise NotImplementedError

    @deprecated('[3.21] use the new store API')
    def before_entity_copy(self, entity, sourceparams):
        """Hook to implement in sub-classes using the deprecated API."""
        raise NotImplementedError

    @deprecated('[3.21] use the new store API')
    def after_entity_copy(self, entity, sourceparams):
        """Record `entity` as created during this pull."""
        self.stats['created'].add(entity.eid)

    def created_during_pull(self, entity):
        """Return True if `entity` was recorded as created during this pull."""
        return entity.eid in self.stats['created']

    def updated_during_pull(self, entity):
        """Return True if `entity` was recorded as updated during this pull."""
        return entity.eid in self.stats['updated']

    def notify_updated(self, entity):
        """Record `entity` as updated during this pull."""
        return self.stats['updated'].add(entity.eid)

    def notify_checked(self, entity):
        """Record `entity` as checked during this pull."""
        return self.stats['checked'].add(entity.eid)

    def is_deleted(self, extid, etype, eid):
        """return True if the entity of given external id, entity type and eid
        is actually deleted. Always return True by default, put more sensible
        stuff in sub-classes.
        """
        return True

    def handle_deletion(self, config, cnx, myuris):
        """Delete the entities listed in `myuris` that :meth:`is_deleted`
        reports as deleted, provided the 'delete-entities' option is set.

        `myuris` maps external ids to (eid, etype) pairs. The connection is
        committed once all deletions have been executed.
        """
        if config['delete-entities'] and myuris:
            byetype = {}
            for extid, (eid, etype) in myuris.items():
                if self.is_deleted(extid, etype, eid):
                    byetype.setdefault(etype, []).append(str(eid))
            # one DELETE query per entity type
            for etype, eids in byetype.items():
                self.warning('delete %s %s entities', len(eids), etype)
                cnx.execute('DELETE %s X WHERE X eid IN (%s)'
                            % (etype, ','.join(eids)))
            cnx.commit()

    def update_if_necessary(self, entity, attrs):
        """Set on `entity` the values of `attrs` differing from its current
        ones, and record it as checked (and as updated if anything changed).

        Nothing is updated when `attrs` holds a 'modification_date' which is
        not more recent than the entity's one.
        """
        entity.complete(tuple(attrs))
        # check modification date and compare attribute values to only update
        # what's actually needed
        self.notify_checked(entity)
        mdate = attrs.get('modification_date')
        if not mdate or mdate > entity.modification_date:
            attrs = {k: v for k, v in attrs.items()
                     if v != getattr(entity, k)}
            if attrs:
                entity.cw_set(**attrs)
                self.notify_updated(entity)
|
482 |
|
483 |
|
class DataFeedXMLParser(DataFeedParser):
    """Parser for XML feeds: the url content is parsed with lxml and each
    item returned by :meth:`parse_etree` is given to :meth:`process_item`.
    """

    @deprecated()
    def process(self, url, raise_on_error=False):
        """IDataFeedParser main entry point

        Return True if an error occurred. Each item is committed on its own;
        an item raising ValidationError is rolled back and skipped unless
        `raise_on_error` is true.
        """
        try:
            parsed = self.parse(url)
        except Exception as ex:
            if raise_on_error:
                raise
            self.import_log.record_error(str(ex))
            return True
        error = False
        commit = self._cw.commit
        rollback = self._cw.rollback
        for args in parsed:
            try:
                self.process_item(*args, raise_on_error=raise_on_error)
                # commit+set_cnxset instead of commit(free_cnxset=False) to let
                # other a chance to get our connections set
                commit()
            except ValidationError as exc:
                if raise_on_error:
                    raise
                self.source.error('Skipping %s because of validation error %s'
                                  % (args, exc))
                rollback()
                error = True
        return error

    def parse(self, url):
        """Retrieve `url` and return the items found in its XML content."""
        stream = self.retrieve_url(url)
        return self.parse_etree(etree.parse(stream).getroot())

    def parse_etree(self, document):
        """Return the list of argument tuples to give to `process_item`;
        by default a single item holding the root element.
        """
        return [(document,)]

    def process_item(self, *args, **kwargs):
        """Process one item returned by `parse_etree`; to implement in
        sub-classes.
        """
        raise NotImplementedError

    def is_deleted(self, extid, etype, eid):
        """Return True if the resource behind `extid` doesn't exist anymore.

        * file:// external ids are deleted when the file is missing
        * http(s) urls are deleted when a GET on them answers 404
        * anything else is considered as not deleted
        """
        if extid.startswith('file://'):
            # the entity is deleted when its file is *not* there anymore
            return not exists(extid[7:])

        url = self.normalize_url(extid)
        # first, try to use cwclientlib if it's available and if the
        # url matches a configuration entry in ~/.config/cwclientlibrc
        try:
            from cwclientlib import cwproxy_for
            # parse url again since it has been normalized
            cnx = cwproxy_for(url)
            cnx.timeout = self.source.http_timeout
            self.source.info('Using cwclientlib for checking %s' % url)
            return cnx.get(url).status_code == 404
        except (ImportError, ValueError, EnvironmentError) as exc:
            # ImportError: not available
            # ValueError: no config entry found
            # EnvironmentError: no cwclientlib config file found
            self.source.debug(str(exc))

        # no chance with cwclientlib, fall back to former implementation
        if urlparse(url).scheme in ('http', 'https'):
            try:
                # only the status matters: release the response right away
                _OPENER.open(url, timeout=self.source.http_timeout).close()
            except HTTPError as ex:
                if ex.code == 404:
                    return True
        return False
|
552 |
|
553 |
|
class URLLibResponseAdapter(object):
    """Thin wrapper to be used to fake a value returned by urllib2.urlopen

    `stream` is the file-like object to read from, `url` the value returned
    by :meth:`geturl` and `code` the one returned by :meth:`getcode`.
    """
    def __init__(self, stream, url, code=200):
        self._stream = stream
        self._url = url
        self.code = code

    def read(self, *args):
        """Read from the wrapped stream, forwarding any argument to it."""
        return self._stream.read(*args)

    def geturl(self):
        """Return the url given at construction time."""
        return self._url

    def getcode(self):
        """Return the status code given at construction time."""
        return self.code

    def close(self):
        """Close the wrapped stream, if it can be closed.

        Lets callers release the underlying resource (e.g. an opened file)
        the same way they would with a real urlopen response.
        """
        close = getattr(self._stream, 'close', None)
        if close is not None:
            close()
|
569 |
|
570 |
|
# use a cookie enabled opener to use session cookie if any
#
# Module-level opener used by the parsers above for their http(s) requests.
# It is built once, when this module is imported.
_OPENER = build_opener()
try:
    # optional GSSAPI authentication handler: silently skipped when the
    # import below fails
    from logilab.common import urllib2ext
    _OPENER.add_handler(urllib2ext.HTTPGssapiAuthHandler())
except ImportError:  # python-kerberos not available
    pass
# cookies are kept in a CookieJar attached to the opener
_OPENER.add_handler(HTTPCookieProcessor(CookieJar()))
|