--- a/server/sources/datafeed.py Mon Jan 04 18:40:30 2016 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,578 +0,0 @@
-# copyright 2010-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
-# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
-#
-# This file is part of CubicWeb.
-#
-# CubicWeb is free software: you can redistribute it and/or modify it under the
-# terms of the GNU Lesser General Public License as published by the Free
-# Software Foundation, either version 2.1 of the License, or (at your option)
-# any later version.
-#
-# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
-# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
-# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
-# details.
-#
-# You should have received a copy of the GNU Lesser General Public License along
-# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.
-"""datafeed sources: copy data from an external data stream into the system
-database
-"""
-
-from io import BytesIO
-from os.path import exists
-from datetime import datetime, timedelta
-
-from six import text_type
-from six.moves.urllib.parse import urlparse
-from six.moves.urllib.request import Request, build_opener, HTTPCookieProcessor
-from six.moves.urllib.error import HTTPError
-from six.moves.http_cookiejar import CookieJar
-
-from pytz import utc
-from lxml import etree
-
-from logilab.common.deprecation import deprecated
-
-from cubicweb import RegistryNotFound, ObjectNotFound, ValidationError, UnknownEid
-from cubicweb.server.repository import preprocess_inlined_relations
-from cubicweb.server.sources import AbstractSource
-from cubicweb.appobject import AppObject
-
-
-class DataFeedSource(AbstractSource):
- use_cwuri_as_url = True
-
- options = (
- ('synchronize',
- {'type' : 'yn',
- 'default': True,
- 'help': ('Is the repository responsible to automatically import '
- 'content from this source? '
- 'You should say yes unless you don\'t want this behaviour '
- 'or if you use a multiple repositories setup, in which '
- 'case you should say yes on one repository, no on others.'),
- 'group': 'datafeed-source', 'level': 2,
- }),
- ('synchronization-interval',
- {'type' : 'time',
- 'default': '5min',
- 'help': ('Interval in seconds between synchronization with the '
- 'external source (default to 5 minutes, must be >= 1 min).'),
- 'group': 'datafeed-source', 'level': 2,
- }),
- ('max-lock-lifetime',
- {'type' : 'time',
- 'default': '1h',
- 'help': ('Maximum time allowed for a synchronization to be run. '
- 'Exceeded that time, the synchronization will be considered '
- 'as having failed and not properly released the lock, hence '
- 'it won\'t be considered'),
- 'group': 'datafeed-source', 'level': 2,
- }),
- ('delete-entities',
- {'type' : 'yn',
- 'default': False,
- 'help': ('Should already imported entities not found anymore on the '
- 'external source be deleted?'),
- 'group': 'datafeed-source', 'level': 2,
- }),
- ('logs-lifetime',
- {'type': 'time',
- 'default': '10d',
- 'help': ('Time before logs from datafeed imports are deleted.'),
- 'group': 'datafeed-source', 'level': 2,
- }),
- ('http-timeout',
- {'type': 'time',
- 'default': '1min',
- 'help': ('Timeout of HTTP GET requests, when synchronizing a source.'),
- 'group': 'datafeed-source', 'level': 2,
- }),
- ('use-cwuri-as-url',
- {'type': 'yn',
- 'default': None, # explicitly unset
- 'help': ('Use cwuri (i.e. external URL) for link to the entity '
- 'instead of its local URL.'),
- 'group': 'datafeed-source', 'level': 1,
- }),
- )
-
- def check_config(self, source_entity):
- """check configuration of source entity"""
- typed_config = super(DataFeedSource, self).check_config(source_entity)
- if typed_config['synchronization-interval'] < 60:
- _ = source_entity._cw._
- msg = _('synchronization-interval must be greater than 1 minute')
- raise ValidationError(source_entity.eid, {'config': msg})
- return typed_config
-
- def _entity_update(self, source_entity):
- super(DataFeedSource, self)._entity_update(source_entity)
- self.parser_id = source_entity.parser
- self.latest_retrieval = source_entity.latest_retrieval
-
- def update_config(self, source_entity, typed_config):
- """update configuration from source entity. `typed_config` is config
- properly typed with defaults set
- """
- super(DataFeedSource, self).update_config(source_entity, typed_config)
- self.synchro_interval = timedelta(seconds=typed_config['synchronization-interval'])
- self.max_lock_lifetime = timedelta(seconds=typed_config['max-lock-lifetime'])
- self.http_timeout = typed_config['http-timeout']
- # if typed_config['use-cwuri-as-url'] is set, we have to update
- # use_cwuri_as_url attribute and public configuration dictionary
- # accordingly
- if typed_config['use-cwuri-as-url'] is not None:
- self.use_cwuri_as_url = typed_config['use-cwuri-as-url']
- self.public_config['use-cwuri-as-url'] = self.use_cwuri_as_url
-
- def init(self, activated, source_entity):
- super(DataFeedSource, self).init(activated, source_entity)
- self.parser_id = source_entity.parser
- self.load_mapping(source_entity._cw)
-
- def _get_parser(self, cnx, **kwargs):
- if self.parser_id is None:
- self.warning('No parser defined on source %r', self)
- raise ObjectNotFound()
- return self.repo.vreg['parsers'].select(
- self.parser_id, cnx, source=self, **kwargs)
-
- def load_mapping(self, cnx):
- self.mapping = {}
- self.mapping_idx = {}
- try:
- parser = self._get_parser(cnx)
- except (RegistryNotFound, ObjectNotFound):
- return # no parser yet, don't go further
- self._load_mapping(cnx, parser=parser)
-
- def add_schema_config(self, schemacfg, checkonly=False, parser=None):
- """added CWSourceSchemaConfig, modify mapping accordingly"""
- if parser is None:
- parser = self._get_parser(schemacfg._cw)
- parser.add_schema_config(schemacfg, checkonly)
-
- def del_schema_config(self, schemacfg, checkonly=False, parser=None):
- """deleted CWSourceSchemaConfig, modify mapping accordingly"""
- if parser is None:
- parser = self._get_parser(schemacfg._cw)
- parser.del_schema_config(schemacfg, checkonly)
-
- def fresh(self):
- if self.latest_retrieval is None:
- return False
- return datetime.now(tz=utc) < (self.latest_retrieval + self.synchro_interval)
-
- def update_latest_retrieval(self, cnx):
- self.latest_retrieval = datetime.now(tz=utc)
- cnx.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
- {'x': self.eid, 'date': self.latest_retrieval})
- cnx.commit()
-
- def acquire_synchronization_lock(self, cnx):
- # XXX race condition until WHERE of SET queries is executed using
- # 'SELECT FOR UPDATE'
- now = datetime.now(tz=utc)
- if not cnx.execute(
- 'SET X in_synchronization %(now)s WHERE X eid %(x)s, '
- 'X in_synchronization NULL OR X in_synchronization < %(maxdt)s',
- {'x': self.eid, 'now': now, 'maxdt': now - self.max_lock_lifetime}):
- self.error('concurrent synchronization detected, skip pull')
- cnx.commit()
- return False
- cnx.commit()
- return True
-
- def release_synchronization_lock(self, cnx):
- cnx.execute('SET X in_synchronization NULL WHERE X eid %(x)s',
- {'x': self.eid})
- cnx.commit()
-
- def pull_data(self, cnx, force=False, raise_on_error=False):
- """Launch synchronization of the source if needed.
-
- This method is responsible to handle commit/rollback on the given
- connection.
- """
- if not force and self.fresh():
- return {}
- if not self.acquire_synchronization_lock(cnx):
- return {}
- try:
- return self._pull_data(cnx, force, raise_on_error)
- finally:
- cnx.rollback() # rollback first in case there is some dirty
- # transaction remaining
- self.release_synchronization_lock(cnx)
-
- def _pull_data(self, cnx, force=False, raise_on_error=False):
- importlog = self.init_import_log(cnx)
- myuris = self.source_cwuris(cnx)
- try:
- parser = self._get_parser(cnx, sourceuris=myuris, import_log=importlog)
- except ObjectNotFound:
- return {}
- if self.process_urls(parser, self.urls, raise_on_error):
- self.warning("some error occurred, don't attempt to delete entities")
- else:
- parser.handle_deletion(self.config, cnx, myuris)
- self.update_latest_retrieval(cnx)
- stats = parser.stats
- if stats.get('created'):
- importlog.record_info('added %s entities' % len(stats['created']))
- if stats.get('updated'):
- importlog.record_info('updated %s entities' % len(stats['updated']))
- importlog.write_log(cnx, end_timestamp=self.latest_retrieval)
- cnx.commit()
- return stats
-
- def process_urls(self, parser, urls, raise_on_error=False):
- error = False
- for url in urls:
- self.info('pulling data from %s', url)
- try:
- if parser.process(url, raise_on_error):
- error = True
- except IOError as exc:
- if raise_on_error:
- raise
- parser.import_log.record_error(
- 'could not pull data while processing %s: %s'
- % (url, exc))
- error = True
- except Exception as exc:
- if raise_on_error:
- raise
- self.exception('error while processing %s: %s',
- url, exc)
- error = True
- return error
-
- @deprecated('[3.21] use the new store API')
- def before_entity_insertion(self, cnx, lid, etype, eid, sourceparams):
- """called by the repository when an eid has been attributed for an
- entity stored here but the entity has not been inserted in the system
- table yet.
-
- This method must return the an Entity instance representation of this
- entity.
- """
- entity = super(DataFeedSource, self).before_entity_insertion(
- cnx, lid, etype, eid, sourceparams)
- entity.cw_edited['cwuri'] = lid.decode('utf-8')
- entity.cw_edited.set_defaults()
- sourceparams['parser'].before_entity_copy(entity, sourceparams)
- return entity
-
- @deprecated('[3.21] use the new store API')
- def after_entity_insertion(self, cnx, lid, entity, sourceparams):
- """called by the repository after an entity stored here has been
- inserted in the system table.
- """
- relations = preprocess_inlined_relations(cnx, entity)
- if cnx.is_hook_category_activated('integrity'):
- entity.cw_edited.check(creation=True)
- self.repo.system_source.add_entity(cnx, entity)
- entity.cw_edited.saved = entity._cw_is_saved = True
- sourceparams['parser'].after_entity_copy(entity, sourceparams)
- # call hooks for inlined relations
- call_hooks = self.repo.hm.call_hooks
- if self.should_call_hooks:
- for attr, value in relations:
- call_hooks('before_add_relation', cnx,
- eidfrom=entity.eid, rtype=attr, eidto=value)
- call_hooks('after_add_relation', cnx,
- eidfrom=entity.eid, rtype=attr, eidto=value)
-
- def source_cwuris(self, cnx):
- sql = ('SELECT extid, eid, type FROM entities, cw_source_relation '
- 'WHERE entities.eid=cw_source_relation.eid_from '
- 'AND cw_source_relation.eid_to=%s' % self.eid)
- return dict((self.decode_extid(uri), (eid, type))
- for uri, eid, type in cnx.system_sql(sql).fetchall())
-
- def init_import_log(self, cnx, **kwargs):
- dataimport = cnx.create_entity('CWDataImport', cw_import_of=self,
- start_timestamp=datetime.now(tz=utc),
- **kwargs)
- dataimport.init()
- return dataimport
-
-
-class DataFeedParser(AppObject):
- __registry__ = 'parsers'
-
- def __init__(self, cnx, source, sourceuris=None, import_log=None, **kwargs):
- super(DataFeedParser, self).__init__(cnx, **kwargs)
- self.source = source
- self.sourceuris = sourceuris
- self.import_log = import_log
- self.stats = {'created': set(), 'updated': set(), 'checked': set()}
-
- def normalize_url(self, url):
- """Normalize an url by looking if there is a replacement for it in
- `cubicweb.sobjects.URL_MAPPING`.
-
- This dictionary allow to redirect from one host to another, which may be
- useful for example in case of test instance using production data, while
- you don't want to load the external source nor to hack your `/etc/hosts`
- file.
- """
- # local import mandatory, it's available after registration
- from cubicweb.sobjects import URL_MAPPING
- for mappedurl in URL_MAPPING:
- if url.startswith(mappedurl):
- return url.replace(mappedurl, URL_MAPPING[mappedurl], 1)
- return url
-
- def retrieve_url(self, url):
- """Return stream linked by the given url:
- * HTTP urls will be normalized (see :meth:`normalize_url`)
- * handle file:// URL
- * other will be considered as plain content, useful for testing purpose
-
- For http URLs, it will try to find a cwclientlib config entry
- (if available) and use it as requester.
- """
- purl = urlparse(url)
- if purl.scheme == 'file':
- return URLLibResponseAdapter(open(url[7:]), url)
-
- url = self.normalize_url(url)
-
- # first, try to use cwclientlib if it's available and if the
- # url matches a configuration entry in ~/.config/cwclientlibrc
- try:
- from cwclientlib import cwproxy_for
- # parse url again since it has been normalized
- cnx = cwproxy_for(url)
- cnx.timeout = self.source.http_timeout
- self.source.info('Using cwclientlib for %s' % url)
- resp = cnx.get(url)
- resp.raise_for_status()
- return URLLibResponseAdapter(BytesIO(resp.text), url)
- except (ImportError, ValueError, EnvironmentError) as exc:
- # ImportError: not available
- # ValueError: no config entry found
- # EnvironmentError: no cwclientlib config file found
- self.source.debug(str(exc))
-
- # no chance with cwclientlib, fall back to former implementation
- if purl.scheme in ('http', 'https'):
- self.source.info('GET %s', url)
- req = Request(url)
- return _OPENER.open(req, timeout=self.source.http_timeout)
-
- # url is probably plain content
- return URLLibResponseAdapter(BytesIO(url.encode('ascii')), url)
-
- def add_schema_config(self, schemacfg, checkonly=False):
- """added CWSourceSchemaConfig, modify mapping accordingly"""
- msg = schemacfg._cw._("this parser doesn't use a mapping")
- raise ValidationError(schemacfg.eid, {None: msg})
-
- def del_schema_config(self, schemacfg, checkonly=False):
- """deleted CWSourceSchemaConfig, modify mapping accordingly"""
- msg = schemacfg._cw._("this parser doesn't use a mapping")
- raise ValidationError(schemacfg.eid, {None: msg})
-
- @deprecated('[3.21] use the new store API')
- def extid2entity(self, uri, etype, **sourceparams):
- """Return an entity for the given uri. May return None if it should be
- skipped.
-
- If a `raise_on_error` keyword parameter is passed, a ValidationError
- exception may be raised.
- """
- raise_on_error = sourceparams.pop('raise_on_error', False)
- cnx = self._cw
- # if cwsource is specified and repository has a source with the same
- # name, call extid2eid on that source so entity will be properly seen as
- # coming from this source
- source_uri = sourceparams.pop('cwsource', None)
- if source_uri is not None and source_uri != 'system':
- source = cnx.repo.sources_by_uri.get(source_uri, self.source)
- else:
- source = self.source
- sourceparams['parser'] = self
- if isinstance(uri, text_type):
- uri = uri.encode('utf-8')
- try:
- eid = cnx.repo.extid2eid(source, uri, etype, cnx,
- sourceparams=sourceparams)
- except ValidationError as ex:
- if raise_on_error:
- raise
- self.source.critical('error while creating %s: %s', etype, ex)
- self.import_log.record_error('error while creating %s: %s'
- % (etype, ex))
- return None
- if eid < 0:
- # entity has been moved away from its original source
- #
- # Don't give etype to entity_from_eid so we get UnknownEid if the
- # entity has been removed
- try:
- entity = cnx.entity_from_eid(-eid)
- except UnknownEid:
- return None
- self.notify_updated(entity) # avoid later update from the source's data
- return entity
- if self.sourceuris is not None:
- self.sourceuris.pop(str(uri), None)
- return cnx.entity_from_eid(eid, etype)
-
- def process(self, url, raise_on_error=False):
- """main callback: process the url"""
- raise NotImplementedError
-
- @deprecated('[3.21] use the new store API')
- def before_entity_copy(self, entity, sourceparams):
- raise NotImplementedError
-
- @deprecated('[3.21] use the new store API')
- def after_entity_copy(self, entity, sourceparams):
- self.stats['created'].add(entity.eid)
-
- def created_during_pull(self, entity):
- return entity.eid in self.stats['created']
-
- def updated_during_pull(self, entity):
- return entity.eid in self.stats['updated']
-
- def notify_updated(self, entity):
- return self.stats['updated'].add(entity.eid)
-
- def notify_checked(self, entity):
- return self.stats['checked'].add(entity.eid)
-
- def is_deleted(self, extid, etype, eid):
- """return True if the entity of given external id, entity type and eid
- is actually deleted. Always return True by default, put more sensible
- stuff in sub-classes.
- """
- return True
-
- def handle_deletion(self, config, cnx, myuris):
- if config['delete-entities'] and myuris:
- byetype = {}
- for extid, (eid, etype) in myuris.items():
- if self.is_deleted(extid, etype, eid):
- byetype.setdefault(etype, []).append(str(eid))
- for etype, eids in byetype.items():
- self.warning('delete %s %s entities', len(eids), etype)
- cnx.execute('DELETE %s X WHERE X eid IN (%s)'
- % (etype, ','.join(eids)))
- cnx.commit()
-
- def update_if_necessary(self, entity, attrs):
- entity.complete(tuple(attrs))
- # check modification date and compare attribute values to only update
- # what's actually needed
- self.notify_checked(entity)
- mdate = attrs.get('modification_date')
- if not mdate or mdate > entity.modification_date:
- attrs = dict( (k, v) for k, v in attrs.items()
- if v != getattr(entity, k))
- if attrs:
- entity.cw_set(**attrs)
- self.notify_updated(entity)
-
-
-class DataFeedXMLParser(DataFeedParser):
-
- @deprecated()
- def process(self, url, raise_on_error=False):
- """IDataFeedParser main entry point"""
- try:
- parsed = self.parse(url)
- except Exception as ex:
- if raise_on_error:
- raise
- self.import_log.record_error(str(ex))
- return True
- error = False
- commit = self._cw.commit
- rollback = self._cw.rollback
- for args in parsed:
- try:
- self.process_item(*args, raise_on_error=raise_on_error)
- # commit+set_cnxset instead of commit(free_cnxset=False) to let
- # other a chance to get our connections set
- commit()
- except ValidationError as exc:
- if raise_on_error:
- raise
- self.source.error('Skipping %s because of validation error %s'
- % (args, exc))
- rollback()
- error = True
- return error
-
- def parse(self, url):
- stream = self.retrieve_url(url)
- return self.parse_etree(etree.parse(stream).getroot())
-
- def parse_etree(self, document):
- return [(document,)]
-
- def process_item(self, *args, **kwargs):
- raise NotImplementedError
-
- def is_deleted(self, extid, etype, eid):
- if extid.startswith('file://'):
- return exists(extid[7:])
-
- url = self.normalize_url(extid)
- # first, try to use cwclientlib if it's available and if the
- # url matches a configuration entry in ~/.config/cwclientlibrc
- try:
- from cwclientlib import cwproxy_for
- # parse url again since it has been normalized
- cnx = cwproxy_for(url)
- cnx.timeout = self.source.http_timeout
- self.source.info('Using cwclientlib for checking %s' % url)
- return cnx.get(url).status_code == 404
- except (ImportError, ValueError, EnvironmentError) as exc:
- # ImportError: not available
- # ValueError: no config entry found
- # EnvironmentError: no cwclientlib config file found
- self.source.debug(str(exc))
-
- # no chance with cwclientlib, fall back to former implementation
- if urlparse(url).scheme in ('http', 'https'):
- try:
- _OPENER.open(url, timeout=self.source.http_timeout)
- except HTTPError as ex:
- if ex.code == 404:
- return True
- return False
-
-
-class URLLibResponseAdapter(object):
- """Thin wrapper to be used to fake a value returned by urllib2.urlopen"""
- def __init__(self, stream, url, code=200):
- self._stream = stream
- self._url = url
- self.code = code
-
- def read(self, *args):
- return self._stream.read(*args)
-
- def geturl(self):
- return self._url
-
- def getcode(self):
- return self.code
-
-
-# use a cookie enabled opener to use session cookie if any
-_OPENER = build_opener()
-try:
- from logilab.common import urllib2ext
- _OPENER.add_handler(urllib2ext.HTTPGssapiAuthHandler())
-except ImportError: # python-kerberos not available
- pass
-_OPENER.add_handler(HTTPCookieProcessor(CookieJar()))