server/sources/datafeed.py
changeset 11057 0b59724cb3f2
parent 11052 058bb3dc685f
child 11058 23eb30449fe5
equal deleted inserted replaced
11052:058bb3dc685f 11057:0b59724cb3f2
     1 # copyright 2010-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
       
     2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
       
     3 #
       
     4 # This file is part of CubicWeb.
       
     5 #
       
     6 # CubicWeb is free software: you can redistribute it and/or modify it under the
       
     7 # terms of the GNU Lesser General Public License as published by the Free
       
     8 # Software Foundation, either version 2.1 of the License, or (at your option)
       
     9 # any later version.
       
    10 #
       
    11 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT
       
    12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
       
    13 # FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
       
    14 # details.
       
    15 #
       
    16 # You should have received a copy of the GNU Lesser General Public License along
       
    17 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
       
    18 """datafeed sources: copy data from an external data stream into the system
       
    19 database
       
    20 """
       
    21 
       
    22 from io import BytesIO
       
    23 from os.path import exists
       
    24 from datetime import datetime, timedelta
       
    25 
       
    26 from six import text_type
       
    27 from six.moves.urllib.parse import urlparse
       
    28 from six.moves.urllib.request import Request, build_opener, HTTPCookieProcessor
       
    29 from six.moves.urllib.error import HTTPError
       
    30 from six.moves.http_cookiejar import CookieJar
       
    31 
       
    32 from pytz import utc
       
    33 from lxml import etree
       
    34 
       
    35 from logilab.common.deprecation import deprecated
       
    36 
       
    37 from cubicweb import RegistryNotFound, ObjectNotFound, ValidationError, UnknownEid
       
    38 from cubicweb.server.repository import preprocess_inlined_relations
       
    39 from cubicweb.server.sources import AbstractSource
       
    40 from cubicweb.appobject import AppObject
       
    41 
       
    42 
       
    43 class DataFeedSource(AbstractSource):
       
    44     use_cwuri_as_url = True
       
    45 
       
    46     options = (
       
    47         ('synchronize',
       
    48          {'type' : 'yn',
       
    49           'default': True,
       
    50           'help': ('Is the repository responsible to automatically import '
       
    51                    'content from this source? '
       
    52                    'You should say yes unless you don\'t want this behaviour '
       
    53                    'or if you use a multiple repositories setup, in which '
       
    54                    'case you should say yes on one repository, no on others.'),
       
    55           'group': 'datafeed-source', 'level': 2,
       
    56           }),
       
    57         ('synchronization-interval',
       
    58          {'type' : 'time',
       
    59           'default': '5min',
       
    60           'help': ('Interval in seconds between synchronization with the '
       
    61                    'external source (default to 5 minutes, must be >= 1 min).'),
       
    62           'group': 'datafeed-source', 'level': 2,
       
    63           }),
       
    64         ('max-lock-lifetime',
       
    65          {'type' : 'time',
       
    66           'default': '1h',
       
    67           'help': ('Maximum time allowed for a synchronization to be run. '
       
    68                    'Exceeded that time, the synchronization will be considered '
       
    69                    'as having failed and not properly released the lock, hence '
       
    70                    'it won\'t be considered'),
       
    71           'group': 'datafeed-source', 'level': 2,
       
    72           }),
       
    73         ('delete-entities',
       
    74          {'type' : 'yn',
       
    75           'default': False,
       
    76           'help': ('Should already imported entities not found anymore on the '
       
    77                    'external source be deleted?'),
       
    78           'group': 'datafeed-source', 'level': 2,
       
    79           }),
       
    80         ('logs-lifetime',
       
    81          {'type': 'time',
       
    82           'default': '10d',
       
    83           'help': ('Time before logs from datafeed imports are deleted.'),
       
    84           'group': 'datafeed-source', 'level': 2,
       
    85           }),
       
    86         ('http-timeout',
       
    87          {'type': 'time',
       
    88           'default': '1min',
       
    89           'help': ('Timeout of HTTP GET requests, when synchronizing a source.'),
       
    90           'group': 'datafeed-source', 'level': 2,
       
    91           }),
       
    92         ('use-cwuri-as-url',
       
    93          {'type': 'yn',
       
    94           'default': None, # explicitly unset
       
    95           'help': ('Use cwuri (i.e. external URL) for link to the entity '
       
    96                    'instead of its local URL.'),
       
    97           'group': 'datafeed-source', 'level': 1,
       
    98           }),
       
    99         )
       
   100 
       
   101     def check_config(self, source_entity):
       
   102         """check configuration of source entity"""
       
   103         typed_config = super(DataFeedSource, self).check_config(source_entity)
       
   104         if typed_config['synchronization-interval'] < 60:
       
   105             _ = source_entity._cw._
       
   106             msg = _('synchronization-interval must be greater than 1 minute')
       
   107             raise ValidationError(source_entity.eid, {'config': msg})
       
   108         return typed_config
       
   109 
       
   110     def _entity_update(self, source_entity):
       
   111         super(DataFeedSource, self)._entity_update(source_entity)
       
   112         self.parser_id = source_entity.parser
       
   113         self.latest_retrieval = source_entity.latest_retrieval
       
   114 
       
   115     def update_config(self, source_entity, typed_config):
       
   116         """update configuration from source entity. `typed_config` is config
       
   117         properly typed with defaults set
       
   118         """
       
   119         super(DataFeedSource, self).update_config(source_entity, typed_config)
       
   120         self.synchro_interval = timedelta(seconds=typed_config['synchronization-interval'])
       
   121         self.max_lock_lifetime = timedelta(seconds=typed_config['max-lock-lifetime'])
       
   122         self.http_timeout = typed_config['http-timeout']
       
   123         # if typed_config['use-cwuri-as-url'] is set, we have to update
       
   124         # use_cwuri_as_url attribute and public configuration dictionary
       
   125         # accordingly
       
   126         if typed_config['use-cwuri-as-url'] is not None:
       
   127             self.use_cwuri_as_url = typed_config['use-cwuri-as-url']
       
   128             self.public_config['use-cwuri-as-url'] = self.use_cwuri_as_url
       
   129 
       
   130     def init(self, activated, source_entity):
       
   131         super(DataFeedSource, self).init(activated, source_entity)
       
   132         self.parser_id = source_entity.parser
       
   133         self.load_mapping(source_entity._cw)
       
   134 
       
   135     def _get_parser(self, cnx, **kwargs):
       
   136         if self.parser_id is None:
       
   137             self.warning('No parser defined on source %r', self)
       
   138             raise ObjectNotFound()
       
   139         return self.repo.vreg['parsers'].select(
       
   140             self.parser_id, cnx, source=self, **kwargs)
       
   141 
       
   142     def load_mapping(self, cnx):
       
   143         self.mapping = {}
       
   144         self.mapping_idx = {}
       
   145         try:
       
   146             parser = self._get_parser(cnx)
       
   147         except (RegistryNotFound, ObjectNotFound):
       
   148             return # no parser yet, don't go further
       
   149         self._load_mapping(cnx, parser=parser)
       
   150 
       
   151     def add_schema_config(self, schemacfg, checkonly=False, parser=None):
       
   152         """added CWSourceSchemaConfig, modify mapping accordingly"""
       
   153         if parser is None:
       
   154             parser = self._get_parser(schemacfg._cw)
       
   155         parser.add_schema_config(schemacfg, checkonly)
       
   156 
       
   157     def del_schema_config(self, schemacfg, checkonly=False, parser=None):
       
   158         """deleted CWSourceSchemaConfig, modify mapping accordingly"""
       
   159         if parser is None:
       
   160             parser = self._get_parser(schemacfg._cw)
       
   161         parser.del_schema_config(schemacfg, checkonly)
       
   162 
       
   163     def fresh(self):
       
   164         if self.latest_retrieval is None:
       
   165             return False
       
   166         return datetime.now(tz=utc) < (self.latest_retrieval + self.synchro_interval)
       
   167 
       
   168     def update_latest_retrieval(self, cnx):
       
   169         self.latest_retrieval = datetime.now(tz=utc)
       
   170         cnx.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
       
   171                     {'x': self.eid, 'date': self.latest_retrieval})
       
   172         cnx.commit()
       
   173 
       
   174     def acquire_synchronization_lock(self, cnx):
       
   175         # XXX race condition until WHERE of SET queries is executed using
       
   176         # 'SELECT FOR UPDATE'
       
   177         now = datetime.now(tz=utc)
       
   178         if not cnx.execute(
       
   179             'SET X in_synchronization %(now)s WHERE X eid %(x)s, '
       
   180             'X in_synchronization NULL OR X in_synchronization < %(maxdt)s',
       
   181             {'x': self.eid, 'now': now, 'maxdt': now - self.max_lock_lifetime}):
       
   182             self.error('concurrent synchronization detected, skip pull')
       
   183             cnx.commit()
       
   184             return False
       
   185         cnx.commit()
       
   186         return True
       
   187 
       
   188     def release_synchronization_lock(self, cnx):
       
   189         cnx.execute('SET X in_synchronization NULL WHERE X eid %(x)s',
       
   190                     {'x': self.eid})
       
   191         cnx.commit()
       
   192 
       
   193     def pull_data(self, cnx, force=False, raise_on_error=False):
       
   194         """Launch synchronization of the source if needed.
       
   195 
       
   196         This method is responsible to handle commit/rollback on the given
       
   197         connection.
       
   198         """
       
   199         if not force and self.fresh():
       
   200             return {}
       
   201         if not self.acquire_synchronization_lock(cnx):
       
   202             return {}
       
   203         try:
       
   204             return self._pull_data(cnx, force, raise_on_error)
       
   205         finally:
       
   206             cnx.rollback() # rollback first in case there is some dirty
       
   207                            # transaction remaining
       
   208             self.release_synchronization_lock(cnx)
       
   209 
       
   210     def _pull_data(self, cnx, force=False, raise_on_error=False):
       
   211         importlog = self.init_import_log(cnx)
       
   212         myuris = self.source_cwuris(cnx)
       
   213         try:
       
   214             parser = self._get_parser(cnx, sourceuris=myuris, import_log=importlog)
       
   215         except ObjectNotFound:
       
   216             return {}
       
   217         if self.process_urls(parser, self.urls, raise_on_error):
       
   218             self.warning("some error occurred, don't attempt to delete entities")
       
   219         else:
       
   220             parser.handle_deletion(self.config, cnx, myuris)
       
   221         self.update_latest_retrieval(cnx)
       
   222         stats = parser.stats
       
   223         if stats.get('created'):
       
   224             importlog.record_info('added %s entities' % len(stats['created']))
       
   225         if stats.get('updated'):
       
   226             importlog.record_info('updated %s entities' % len(stats['updated']))
       
   227         importlog.write_log(cnx, end_timestamp=self.latest_retrieval)
       
   228         cnx.commit()
       
   229         return stats
       
   230 
       
   231     def process_urls(self, parser, urls, raise_on_error=False):
       
   232         error = False
       
   233         for url in urls:
       
   234             self.info('pulling data from %s', url)
       
   235             try:
       
   236                 if parser.process(url, raise_on_error):
       
   237                     error = True
       
   238             except IOError as exc:
       
   239                 if raise_on_error:
       
   240                     raise
       
   241                 parser.import_log.record_error(
       
   242                     'could not pull data while processing %s: %s'
       
   243                     % (url, exc))
       
   244                 error = True
       
   245             except Exception as exc:
       
   246                 if raise_on_error:
       
   247                     raise
       
   248                 self.exception('error while processing %s: %s',
       
   249                                url, exc)
       
   250                 error = True
       
   251         return error
       
   252 
       
   253     @deprecated('[3.21] use the new store API')
       
   254     def before_entity_insertion(self, cnx, lid, etype, eid, sourceparams):
       
   255         """called by the repository when an eid has been attributed for an
       
   256         entity stored here but the entity has not been inserted in the system
       
   257         table yet.
       
   258 
       
   259         This method must return the an Entity instance representation of this
       
   260         entity.
       
   261         """
       
   262         entity = super(DataFeedSource, self).before_entity_insertion(
       
   263             cnx, lid, etype, eid, sourceparams)
       
   264         entity.cw_edited['cwuri'] = lid.decode('utf-8')
       
   265         entity.cw_edited.set_defaults()
       
   266         sourceparams['parser'].before_entity_copy(entity, sourceparams)
       
   267         return entity
       
   268 
       
   269     @deprecated('[3.21] use the new store API')
       
   270     def after_entity_insertion(self, cnx, lid, entity, sourceparams):
       
   271         """called by the repository after an entity stored here has been
       
   272         inserted in the system table.
       
   273         """
       
   274         relations = preprocess_inlined_relations(cnx, entity)
       
   275         if cnx.is_hook_category_activated('integrity'):
       
   276             entity.cw_edited.check(creation=True)
       
   277         self.repo.system_source.add_entity(cnx, entity)
       
   278         entity.cw_edited.saved = entity._cw_is_saved = True
       
   279         sourceparams['parser'].after_entity_copy(entity, sourceparams)
       
   280         # call hooks for inlined relations
       
   281         call_hooks = self.repo.hm.call_hooks
       
   282         if self.should_call_hooks:
       
   283             for attr, value in relations:
       
   284                 call_hooks('before_add_relation', cnx,
       
   285                            eidfrom=entity.eid, rtype=attr, eidto=value)
       
   286                 call_hooks('after_add_relation', cnx,
       
   287                            eidfrom=entity.eid, rtype=attr, eidto=value)
       
   288 
       
   289     def source_cwuris(self, cnx):
       
   290         sql = ('SELECT extid, eid, type FROM entities, cw_source_relation '
       
   291                'WHERE entities.eid=cw_source_relation.eid_from '
       
   292                'AND cw_source_relation.eid_to=%s' % self.eid)
       
   293         return dict((self.decode_extid(uri), (eid, type))
       
   294                     for uri, eid, type in cnx.system_sql(sql).fetchall())
       
   295 
       
   296     def init_import_log(self, cnx, **kwargs):
       
   297         dataimport = cnx.create_entity('CWDataImport', cw_import_of=self,
       
   298                                        start_timestamp=datetime.now(tz=utc),
       
   299                                        **kwargs)
       
   300         dataimport.init()
       
   301         return dataimport
       
   302 
       
   303 
       
   304 class DataFeedParser(AppObject):
       
   305     __registry__ = 'parsers'
       
   306 
       
   307     def __init__(self, cnx, source, sourceuris=None, import_log=None, **kwargs):
       
   308         super(DataFeedParser, self).__init__(cnx, **kwargs)
       
   309         self.source = source
       
   310         self.sourceuris = sourceuris
       
   311         self.import_log = import_log
       
   312         self.stats = {'created': set(), 'updated': set(), 'checked': set()}
       
   313 
       
   314     def normalize_url(self, url):
       
   315         """Normalize an url by looking if there is a replacement for it in
       
   316         `cubicweb.sobjects.URL_MAPPING`.
       
   317 
       
   318         This dictionary allow to redirect from one host to another, which may be
       
   319         useful for example in case of test instance using production data, while
       
   320         you don't want to load the external source nor to hack your `/etc/hosts`
       
   321         file.
       
   322         """
       
   323         # local import mandatory, it's available after registration
       
   324         from cubicweb.sobjects import URL_MAPPING
       
   325         for mappedurl in URL_MAPPING:
       
   326             if url.startswith(mappedurl):
       
   327                 return url.replace(mappedurl, URL_MAPPING[mappedurl], 1)
       
   328         return url
       
   329 
       
   330     def retrieve_url(self, url):
       
   331         """Return stream linked by the given url:
       
   332         * HTTP urls will be normalized (see :meth:`normalize_url`)
       
   333         * handle file:// URL
       
   334         * other will be considered as plain content, useful for testing purpose
       
   335 
       
   336         For http URLs, it will try to find a cwclientlib config entry
       
   337         (if available) and use it as requester.
       
   338         """
       
   339         purl = urlparse(url)
       
   340         if purl.scheme == 'file':
       
   341             return URLLibResponseAdapter(open(url[7:]), url)
       
   342 
       
   343         url = self.normalize_url(url)
       
   344 
       
   345         # first, try to use cwclientlib if it's available and if the
       
   346         # url matches a configuration entry in ~/.config/cwclientlibrc
       
   347         try:
       
   348             from cwclientlib import cwproxy_for
       
   349             # parse url again since it has been normalized
       
   350             cnx = cwproxy_for(url)
       
   351             cnx.timeout = self.source.http_timeout
       
   352             self.source.info('Using cwclientlib for %s' % url)
       
   353             resp = cnx.get(url)
       
   354             resp.raise_for_status()
       
   355             return URLLibResponseAdapter(BytesIO(resp.text), url)
       
   356         except (ImportError, ValueError, EnvironmentError) as exc:
       
   357             # ImportError: not available
       
   358             # ValueError: no config entry found
       
   359             # EnvironmentError: no cwclientlib config file found
       
   360             self.source.debug(str(exc))
       
   361 
       
   362         # no chance with cwclientlib, fall back to former implementation
       
   363         if purl.scheme in ('http', 'https'):
       
   364             self.source.info('GET %s', url)
       
   365             req = Request(url)
       
   366             return _OPENER.open(req, timeout=self.source.http_timeout)
       
   367 
       
   368         # url is probably plain content
       
   369         return URLLibResponseAdapter(BytesIO(url.encode('ascii')), url)
       
   370 
       
   371     def add_schema_config(self, schemacfg, checkonly=False):
       
   372         """added CWSourceSchemaConfig, modify mapping accordingly"""
       
   373         msg = schemacfg._cw._("this parser doesn't use a mapping")
       
   374         raise ValidationError(schemacfg.eid, {None: msg})
       
   375 
       
   376     def del_schema_config(self, schemacfg, checkonly=False):
       
   377         """deleted CWSourceSchemaConfig, modify mapping accordingly"""
       
   378         msg = schemacfg._cw._("this parser doesn't use a mapping")
       
   379         raise ValidationError(schemacfg.eid, {None: msg})
       
   380 
       
   381     @deprecated('[3.21] use the new store API')
       
   382     def extid2entity(self, uri, etype, **sourceparams):
       
   383         """Return an entity for the given uri. May return None if it should be
       
   384         skipped.
       
   385 
       
   386         If a `raise_on_error` keyword parameter is passed, a ValidationError
       
   387         exception may be raised.
       
   388         """
       
   389         raise_on_error = sourceparams.pop('raise_on_error', False)
       
   390         cnx = self._cw
       
   391         # if cwsource is specified and repository has a source with the same
       
   392         # name, call extid2eid on that source so entity will be properly seen as
       
   393         # coming from this source
       
   394         source_uri = sourceparams.pop('cwsource', None)
       
   395         if source_uri is not None and source_uri != 'system':
       
   396             source = cnx.repo.sources_by_uri.get(source_uri, self.source)
       
   397         else:
       
   398             source = self.source
       
   399         sourceparams['parser'] = self
       
   400         if isinstance(uri, text_type):
       
   401             uri = uri.encode('utf-8')
       
   402         try:
       
   403             eid = cnx.repo.extid2eid(source, uri, etype, cnx,
       
   404                                      sourceparams=sourceparams)
       
   405         except ValidationError as ex:
       
   406             if raise_on_error:
       
   407                 raise
       
   408             self.source.critical('error while creating %s: %s', etype, ex)
       
   409             self.import_log.record_error('error while creating %s: %s'
       
   410                                          % (etype, ex))
       
   411             return None
       
   412         if eid < 0:
       
   413             # entity has been moved away from its original source
       
   414             #
       
   415             # Don't give etype to entity_from_eid so we get UnknownEid if the
       
   416             # entity has been removed
       
   417             try:
       
   418                 entity = cnx.entity_from_eid(-eid)
       
   419             except UnknownEid:
       
   420                 return None
       
   421             self.notify_updated(entity) # avoid later update from the source's data
       
   422             return entity
       
   423         if self.sourceuris is not None:
       
   424             self.sourceuris.pop(str(uri), None)
       
   425         return cnx.entity_from_eid(eid, etype)
       
   426 
       
   427     def process(self, url, raise_on_error=False):
       
   428         """main callback: process the url"""
       
   429         raise NotImplementedError
       
   430 
       
   431     @deprecated('[3.21] use the new store API')
       
   432     def before_entity_copy(self, entity, sourceparams):
       
   433         raise NotImplementedError
       
   434 
       
   435     @deprecated('[3.21] use the new store API')
       
   436     def after_entity_copy(self, entity, sourceparams):
       
   437         self.stats['created'].add(entity.eid)
       
   438 
       
   439     def created_during_pull(self, entity):
       
   440         return entity.eid in self.stats['created']
       
   441 
       
   442     def updated_during_pull(self, entity):
       
   443         return entity.eid in self.stats['updated']
       
   444 
       
   445     def notify_updated(self, entity):
       
   446         return self.stats['updated'].add(entity.eid)
       
   447 
       
   448     def notify_checked(self, entity):
       
   449         return self.stats['checked'].add(entity.eid)
       
   450 
       
   451     def is_deleted(self, extid, etype, eid):
       
   452         """return True if the entity of given external id, entity type and eid
       
   453         is actually deleted. Always return True by default, put more sensible
       
   454         stuff in sub-classes.
       
   455         """
       
   456         return True
       
   457 
       
   458     def handle_deletion(self, config, cnx, myuris):
       
   459         if config['delete-entities'] and myuris:
       
   460             byetype = {}
       
   461             for extid, (eid, etype) in myuris.items():
       
   462                 if self.is_deleted(extid, etype, eid):
       
   463                     byetype.setdefault(etype, []).append(str(eid))
       
   464             for etype, eids in byetype.items():
       
   465                 self.warning('delete %s %s entities', len(eids), etype)
       
   466                 cnx.execute('DELETE %s X WHERE X eid IN (%s)'
       
   467                             % (etype, ','.join(eids)))
       
   468             cnx.commit()
       
   469 
       
   470     def update_if_necessary(self, entity, attrs):
       
   471         entity.complete(tuple(attrs))
       
   472         # check modification date and compare attribute values to only update
       
   473         # what's actually needed
       
   474         self.notify_checked(entity)
       
   475         mdate = attrs.get('modification_date')
       
   476         if not mdate or mdate > entity.modification_date:
       
   477             attrs = dict( (k, v) for k, v in attrs.items()
       
   478                           if v != getattr(entity, k))
       
   479             if attrs:
       
   480                 entity.cw_set(**attrs)
       
   481                 self.notify_updated(entity)
       
   482 
       
   483 
       
   484 class DataFeedXMLParser(DataFeedParser):
       
   485 
       
   486     @deprecated()
       
   487     def process(self, url, raise_on_error=False):
       
   488         """IDataFeedParser main entry point"""
       
   489         try:
       
   490             parsed = self.parse(url)
       
   491         except Exception as ex:
       
   492             if raise_on_error:
       
   493                 raise
       
   494             self.import_log.record_error(str(ex))
       
   495             return True
       
   496         error = False
       
   497         commit = self._cw.commit
       
   498         rollback = self._cw.rollback
       
   499         for args in parsed:
       
   500             try:
       
   501                 self.process_item(*args, raise_on_error=raise_on_error)
       
   502                 # commit+set_cnxset instead of commit(free_cnxset=False) to let
       
   503                 # other a chance to get our connections set
       
   504                 commit()
       
   505             except ValidationError as exc:
       
   506                 if raise_on_error:
       
   507                     raise
       
   508                 self.source.error('Skipping %s because of validation error %s'
       
   509                                   % (args, exc))
       
   510                 rollback()
       
   511                 error = True
       
   512         return error
       
   513 
       
   514     def parse(self, url):
       
   515         stream = self.retrieve_url(url)
       
   516         return self.parse_etree(etree.parse(stream).getroot())
       
   517 
       
   518     def parse_etree(self, document):
       
   519         return [(document,)]
       
   520 
       
   521     def process_item(self, *args, **kwargs):
       
   522         raise NotImplementedError
       
   523 
       
   524     def is_deleted(self, extid, etype, eid):
       
   525         if extid.startswith('file://'):
       
   526             return exists(extid[7:])
       
   527 
       
   528         url = self.normalize_url(extid)
       
   529         # first, try to use cwclientlib if it's available and if the
       
   530         # url matches a configuration entry in ~/.config/cwclientlibrc
       
   531         try:
       
   532             from cwclientlib import cwproxy_for
       
   533             # parse url again since it has been normalized
       
   534             cnx = cwproxy_for(url)
       
   535             cnx.timeout = self.source.http_timeout
       
   536             self.source.info('Using cwclientlib for checking %s' % url)
       
   537             return cnx.get(url).status_code == 404
       
   538         except (ImportError, ValueError, EnvironmentError) as exc:
       
   539             # ImportError: not available
       
   540             # ValueError: no config entry found
       
   541             # EnvironmentError: no cwclientlib config file found
       
   542             self.source.debug(str(exc))
       
   543 
       
   544         # no chance with cwclientlib, fall back to former implementation
       
   545         if urlparse(url).scheme in ('http', 'https'):
       
   546             try:
       
   547                 _OPENER.open(url, timeout=self.source.http_timeout)
       
   548             except HTTPError as ex:
       
   549                 if ex.code == 404:
       
   550                     return True
       
   551         return False
       
   552 
       
   553 
       
   554 class URLLibResponseAdapter(object):
       
   555     """Thin wrapper to be used to fake a value returned by urllib2.urlopen"""
       
   556     def __init__(self, stream, url, code=200):
       
   557         self._stream = stream
       
   558         self._url = url
       
   559         self.code = code
       
   560 
       
   561     def read(self, *args):
       
   562         return self._stream.read(*args)
       
   563 
       
   564     def geturl(self):
       
   565         return self._url
       
   566 
       
   567     def getcode(self):
       
   568         return self.code
       
   569 
       
   570 
       
# Module-wide URL opener, used for the HTTP requests issued by the parsers
# above. It is cookie enabled so that a session cookie, if any, is reused
# from one request to the next.
_OPENER = build_opener()
try:
    # optional GSSAPI authentication handler: both the import and the handler
    # instantiation are covered by the ImportError guard
    from logilab.common import urllib2ext
    _OPENER.add_handler(urllib2ext.HTTPGssapiAuthHandler())
except ImportError: # python-kerberos not available
    pass
# the cookie handler is registered last, whether or not the GSSAPI one is
_OPENER.add_handler(HTTPCookieProcessor(CookieJar()))