cubicweb/server/sources/datafeed.py
changeset 11057 0b59724cb3f2
parent 11042 079b32f4cd0d
child 11125 e717da3dc164
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/cubicweb/server/sources/datafeed.py	Sat Jan 16 13:48:51 2016 +0100
@@ -0,0 +1,578 @@
+# copyright 2010-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
+#
+# This file is part of CubicWeb.
+#
+# CubicWeb is free software: you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation, either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License along
+# with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
+"""datafeed sources: copy data from an external data stream into the system
+database
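+
+A datafeed source combines a :class:`DataFeedSource`, which handles
+scheduling, locking and import logging, with a :class:`DataFeedParser`
+(looked up in the 'parsers' registry), which actually reads and imports
+the data.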
+"""
+
+from io import BytesIO
+from os.path import exists
+from datetime import datetime, timedelta
+
+from six import text_type
+from six.moves.urllib.parse import urlparse
+from six.moves.urllib.request import Request, build_opener, HTTPCookieProcessor
+from six.moves.urllib.error import HTTPError
+from six.moves.http_cookiejar import CookieJar
+
+from pytz import utc
+from lxml import etree
+
+from logilab.common.deprecation import deprecated
+
+from cubicweb import RegistryNotFound, ObjectNotFound, ValidationError, UnknownEid
+from cubicweb.server.repository import preprocess_inlined_relations
+from cubicweb.server.sources import AbstractSource
+from cubicweb.appobject import AppObject
+
+
+class DataFeedSource(AbstractSource):
+    use_cwuri_as_url = True
+
+    options = (
+        ('synchronize',
+         {'type' : 'yn',
+          'default': True,
+          'help': ('Is the repository responsible for automatically importing '
+                   'content from this source? '
+                   'You should say yes unless you don\'t want this behaviour, '
+                   'or if you use a multiple-repositories setup, in which case '
+                   'you should say yes on one repository and no on the others.'),
+          'group': 'datafeed-source', 'level': 2,
+          }),
+        ('synchronization-interval',
+         {'type' : 'time',
+          'default': '5min',
+          'help': ('Interval in seconds between synchronizations with the '
+                   'external source (defaults to 5 minutes, must be >= 1 min).'),
+          'group': 'datafeed-source', 'level': 2,
+          }),
+        ('max-lock-lifetime',
+         {'type' : 'time',
+          'default': '1h',
+          'help': ('Maximum time allowed for a synchronization to run. '
+                   'Past that time, the synchronization is considered to have '
+                   'failed without properly releasing the lock, so the lock is '
+                   'treated as stale and may be reacquired.'),
+          'group': 'datafeed-source', 'level': 2,
+          }),
+        ('delete-entities',
+         {'type' : 'yn',
+          'default': False,
+          'help': ('Should entities previously imported but no longer found '
+                   'on the external source be deleted?'),
+          'group': 'datafeed-source', 'level': 2,
+          }),
+        ('logs-lifetime',
+         {'type': 'time',
+          'default': '10d',
+          'help': ('Time before logs from datafeed imports are deleted.'),
+          'group': 'datafeed-source', 'level': 2,
+          }),
+        ('http-timeout',
+         {'type': 'time',
+          'default': '1min',
+          'help': ('Timeout of HTTP GET requests when synchronizing a source.'),
+          'group': 'datafeed-source', 'level': 2,
+          }),
+        ('use-cwuri-as-url',
+         {'type': 'yn',
+          'default': None, # explicitly unset
+          'help': ('Use cwuri (i.e. external URL) for links to the entity '
+                   'instead of its local URL.'),
+          'group': 'datafeed-source', 'level': 1,
+          }),
+        )
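+
+    # These options are edited through the source's CWSource entity
+    # configuration; a chunk might look like this (sketch, hypothetical
+    # values):
+    #
+    #   synchronize=yes
+    #   synchronization-interval=30min
+    #   delete-entities=no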
+
+    def check_config(self, source_entity):
+        """check configuration of source entity"""
+        typed_config = super(DataFeedSource, self).check_config(source_entity)
+        if typed_config['synchronization-interval'] < 60:
+            _ = source_entity._cw._
+            msg = _('synchronization-interval must be at least 1 minute')
+            raise ValidationError(source_entity.eid, {'config': msg})
+        return typed_config
+
+    def _entity_update(self, source_entity):
+        super(DataFeedSource, self)._entity_update(source_entity)
+        self.parser_id = source_entity.parser
+        self.latest_retrieval = source_entity.latest_retrieval
+
+    def update_config(self, source_entity, typed_config):
+        """update configuration from source entity. `typed_config` is config
+        properly typed with defaults set
+        """
+        super(DataFeedSource, self).update_config(source_entity, typed_config)
+        self.synchro_interval = timedelta(seconds=typed_config['synchronization-interval'])
+        self.max_lock_lifetime = timedelta(seconds=typed_config['max-lock-lifetime'])
+        self.http_timeout = typed_config['http-timeout']
+        # if typed_config['use-cwuri-as-url'] is set, we have to update
+        # use_cwuri_as_url attribute and public configuration dictionary
+        # accordingly
+        if typed_config['use-cwuri-as-url'] is not None:
+            self.use_cwuri_as_url = typed_config['use-cwuri-as-url']
+            self.public_config['use-cwuri-as-url'] = self.use_cwuri_as_url
+
+    def init(self, activated, source_entity):
+        super(DataFeedSource, self).init(activated, source_entity)
+        self.parser_id = source_entity.parser
+        self.load_mapping(source_entity._cw)
+
+    def _get_parser(self, cnx, **kwargs):
+        if self.parser_id is None:
+            self.warning('No parser defined on source %r', self)
+            raise ObjectNotFound()
+        return self.repo.vreg['parsers'].select(
+            self.parser_id, cnx, source=self, **kwargs)
+
+    def load_mapping(self, cnx):
+        self.mapping = {}
+        self.mapping_idx = {}
+        try:
+            parser = self._get_parser(cnx)
+        except (RegistryNotFound, ObjectNotFound):
+            return # no parser yet, don't go further
+        self._load_mapping(cnx, parser=parser)
+
+    def add_schema_config(self, schemacfg, checkonly=False, parser=None):
+        """added CWSourceSchemaConfig, modify mapping accordingly"""
+        if parser is None:
+            parser = self._get_parser(schemacfg._cw)
+        parser.add_schema_config(schemacfg, checkonly)
+
+    def del_schema_config(self, schemacfg, checkonly=False, parser=None):
+        """deleted CWSourceSchemaConfig, modify mapping accordingly"""
+        if parser is None:
+            parser = self._get_parser(schemacfg._cw)
+        parser.del_schema_config(schemacfg, checkonly)
+
+    def fresh(self):
+        if self.latest_retrieval is None:
+            return False
+        return datetime.now(tz=utc) < (self.latest_retrieval + self.synchro_interval)
+
+    def update_latest_retrieval(self, cnx):
+        self.latest_retrieval = datetime.now(tz=utc)
+        cnx.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
+                    {'x': self.eid, 'date': self.latest_retrieval})
+        cnx.commit()
+
+    def acquire_synchronization_lock(self, cnx):
+        # XXX race condition until WHERE of SET queries is executed using
+        # 'SELECT FOR UPDATE'
+        now = datetime.now(tz=utc)
+        if not cnx.execute(
+            'SET X in_synchronization %(now)s WHERE X eid %(x)s, '
+            'X in_synchronization NULL OR X in_synchronization < %(maxdt)s',
+            {'x': self.eid, 'now': now, 'maxdt': now - self.max_lock_lifetime}):
+            self.error('concurrent synchronization detected, skip pull')
+            cnx.commit()
+            return False
+        cnx.commit()
+        return True
+
+    def release_synchronization_lock(self, cnx):
+        cnx.execute('SET X in_synchronization NULL WHERE X eid %(x)s',
+                    {'x': self.eid})
+        cnx.commit()
+
+    def pull_data(self, cnx, force=False, raise_on_error=False):
+        """Launch synchronization of the source if needed.
+
+        This method is responsible for handling commit/rollback on the given
+        connection.
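+
+        A typical call, assuming `source` is this source instance and `cnx`
+        an internal connection (sketch)::
+
+            stats = source.pull_data(cnx, force=True)
+            # `stats` is the parser's stats dict, with 'created', 'updated'
+            # and 'checked' sets of eids ({} if the source was fresh or
+            # locked)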
+        """
+        if not force and self.fresh():
+            return {}
+        if not self.acquire_synchronization_lock(cnx):
+            return {}
+        try:
+            return self._pull_data(cnx, force, raise_on_error)
+        finally:
+            cnx.rollback() # rollback first in case there is some dirty
+                           # transaction remaining
+            self.release_synchronization_lock(cnx)
+
+    def _pull_data(self, cnx, force=False, raise_on_error=False):
+        importlog = self.init_import_log(cnx)
+        myuris = self.source_cwuris(cnx)
+        try:
+            parser = self._get_parser(cnx, sourceuris=myuris, import_log=importlog)
+        except ObjectNotFound:
+            return {}
+        if self.process_urls(parser, self.urls, raise_on_error):
+            self.warning("some error occurred, don't attempt to delete entities")
+        else:
+            parser.handle_deletion(self.config, cnx, myuris)
+        self.update_latest_retrieval(cnx)
+        stats = parser.stats
+        if stats.get('created'):
+            importlog.record_info('added %s entities' % len(stats['created']))
+        if stats.get('updated'):
+            importlog.record_info('updated %s entities' % len(stats['updated']))
+        importlog.write_log(cnx, end_timestamp=self.latest_retrieval)
+        cnx.commit()
+        return stats
+
+    def process_urls(self, parser, urls, raise_on_error=False):
+        error = False
+        for url in urls:
+            self.info('pulling data from %s', url)
+            try:
+                if parser.process(url, raise_on_error):
+                    error = True
+            except IOError as exc:
+                if raise_on_error:
+                    raise
+                parser.import_log.record_error(
+                    'could not pull data while processing %s: %s'
+                    % (url, exc))
+                error = True
+            except Exception as exc:
+                if raise_on_error:
+                    raise
+                self.exception('error while processing %s: %s',
+                               url, exc)
+                error = True
+        return error
+
+    @deprecated('[3.21] use the new store API')
+    def before_entity_insertion(self, cnx, lid, etype, eid, sourceparams):
+        """called by the repository when an eid has been attributed for an
+        entity stored here but the entity has not been inserted in the system
+        table yet.
+
+        This method must return an Entity instance representing this
+        entity.
+        """
+        entity = super(DataFeedSource, self).before_entity_insertion(
+            cnx, lid, etype, eid, sourceparams)
+        entity.cw_edited['cwuri'] = lid.decode('utf-8')
+        entity.cw_edited.set_defaults()
+        sourceparams['parser'].before_entity_copy(entity, sourceparams)
+        return entity
+
+    @deprecated('[3.21] use the new store API')
+    def after_entity_insertion(self, cnx, lid, entity, sourceparams):
+        """called by the repository after an entity stored here has been
+        inserted in the system table.
+        """
+        relations = preprocess_inlined_relations(cnx, entity)
+        if cnx.is_hook_category_activated('integrity'):
+            entity.cw_edited.check(creation=True)
+        self.repo.system_source.add_entity(cnx, entity)
+        entity.cw_edited.saved = entity._cw_is_saved = True
+        sourceparams['parser'].after_entity_copy(entity, sourceparams)
+        # call hooks for inlined relations
+        call_hooks = self.repo.hm.call_hooks
+        if self.should_call_hooks:
+            for attr, value in relations:
+                call_hooks('before_add_relation', cnx,
+                           eidfrom=entity.eid, rtype=attr, eidto=value)
+                call_hooks('after_add_relation', cnx,
+                           eidfrom=entity.eid, rtype=attr, eidto=value)
+
+    def source_cwuris(self, cnx):
+        sql = ('SELECT extid, eid, type FROM entities, cw_source_relation '
+               'WHERE entities.eid=cw_source_relation.eid_from '
+               'AND cw_source_relation.eid_to=%s' % self.eid)
+        return dict((self.decode_extid(uri), (eid, type))
+                    for uri, eid, type in cnx.system_sql(sql).fetchall())
+
+    def init_import_log(self, cnx, **kwargs):
+        dataimport = cnx.create_entity('CWDataImport', cw_import_of=self,
+                                       start_timestamp=datetime.now(tz=utc),
+                                       **kwargs)
+        dataimport.init()
+        return dataimport
+
+
+class DataFeedParser(AppObject):
+    __registry__ = 'parsers'
+
+    def __init__(self, cnx, source, sourceuris=None, import_log=None, **kwargs):
+        super(DataFeedParser, self).__init__(cnx, **kwargs)
+        self.source = source
+        self.sourceuris = sourceuris
+        self.import_log = import_log
+        self.stats = {'created': set(), 'updated': set(), 'checked': set()}
+
+    def normalize_url(self, url):
+        """Normalize an url by looking if there is a replacement for it in
+        `cubicweb.sobjects.URL_MAPPING`.
+
+        This dictionary allow to redirect from one host to another, which may be
+        useful for example in case of test instance using production data, while
+        you don't want to load the external source nor to hack your `/etc/hosts`
+        file.
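+
+        For instance, with a (hypothetical) mapping::
+
+            {'http://prod.example.org': 'http://localhost:8080'}
+
+        ``http://prod.example.org/data.xml`` is rewritten to
+        ``http://localhost:8080/data.xml``.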
+        """
+        # local import mandatory, it's available after registration
+        from cubicweb.sobjects import URL_MAPPING
+        for mappedurl in URL_MAPPING:
+            if url.startswith(mappedurl):
+                return url.replace(mappedurl, URL_MAPPING[mappedurl], 1)
+        return url
+
+    def retrieve_url(self, url):
+        """Return stream linked by the given url:
+        * HTTP urls will be normalized (see :meth:`normalize_url`)
+        * handle file:// URL
+        * other will be considered as plain content, useful for testing purpose
+
+        For http URLs, it will try to find a cwclientlib config entry
+        (if available) and use it as requester.
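+
+        For instance (sketch, hypothetical urls)::
+
+            parser.retrieve_url('file:///tmp/feed.xml')     # local file
+            parser.retrieve_url('http://example.org/feed')  # HTTP GET
+            parser.retrieve_url('<root/>')                  # raw content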
+        """
+        purl = urlparse(url)
+        if purl.scheme == 'file':
+            # open in binary mode since consumers expect a byte stream
+            return URLLibResponseAdapter(open(url[7:], 'rb'), url)
+
+        url = self.normalize_url(url)
+
+        # first, try to use cwclientlib if it's available and if the
+        # url matches a configuration entry in ~/.config/cwclientlibrc
+        try:
+            from cwclientlib import cwproxy_for
+            # the url has been normalized above, so it matches config entries
+            cnx = cwproxy_for(url)
+            cnx.timeout = self.source.http_timeout
+            self.source.info('Using cwclientlib for %s', url)
+            resp = cnx.get(url)
+            resp.raise_for_status()
+            # use resp.content (bytes), not resp.text (decoded text), since
+            # BytesIO requires bytes
+            return URLLibResponseAdapter(BytesIO(resp.content), url)
+        except (ImportError, ValueError, EnvironmentError) as exc:
+            # ImportError: not available
+            # ValueError: no config entry found
+            # EnvironmentError: no cwclientlib config file found
+            self.source.debug(str(exc))
+
+        # no chance with cwclientlib, fall back to former implementation
+        if purl.scheme in ('http', 'https'):
+            self.source.info('GET %s', url)
+            req = Request(url)
+            return _OPENER.open(req, timeout=self.source.http_timeout)
+
+        # url is probably plain content
+        return URLLibResponseAdapter(BytesIO(url.encode('ascii')), url)
+
+    def add_schema_config(self, schemacfg, checkonly=False):
+        """added CWSourceSchemaConfig, modify mapping accordingly"""
+        msg = schemacfg._cw._("this parser doesn't use a mapping")
+        raise ValidationError(schemacfg.eid, {None: msg})
+
+    def del_schema_config(self, schemacfg, checkonly=False):
+        """deleted CWSourceSchemaConfig, modify mapping accordingly"""
+        msg = schemacfg._cw._("this parser doesn't use a mapping")
+        raise ValidationError(schemacfg.eid, {None: msg})
+
+    @deprecated('[3.21] use the new store API')
+    def extid2entity(self, uri, etype, **sourceparams):
+        """Return an entity for the given uri. May return None if it should be
+        skipped.
+
+        If the `raise_on_error` keyword parameter is true, ValidationError
+        exceptions are propagated instead of being logged.
+        """
+        raise_on_error = sourceparams.pop('raise_on_error', False)
+        cnx = self._cw
+        # if cwsource is specified and the repository has a source with the
+        # same name, call extid2eid on that source so the entity will be
+        # properly seen as coming from it
+        source_uri = sourceparams.pop('cwsource', None)
+        if source_uri is not None and source_uri != 'system':
+            source = cnx.repo.sources_by_uri.get(source_uri, self.source)
+        else:
+            source = self.source
+        sourceparams['parser'] = self
+        if isinstance(uri, text_type):
+            uri = uri.encode('utf-8')
+        try:
+            eid = cnx.repo.extid2eid(source, uri, etype, cnx,
+                                     sourceparams=sourceparams)
+        except ValidationError as ex:
+            if raise_on_error:
+                raise
+            self.source.critical('error while creating %s: %s', etype, ex)
+            self.import_log.record_error('error while creating %s: %s'
+                                         % (etype, ex))
+            return None
+        if eid < 0:
+            # entity has been moved away from its original source
+            #
+            # Don't give etype to entity_from_eid so we get UnknownEid if the
+            # entity has been removed
+            try:
+                entity = cnx.entity_from_eid(-eid)
+            except UnknownEid:
+                return None
+            self.notify_updated(entity) # avoid later update from the source's data
+            return entity
+        if self.sourceuris is not None:
+            # `uri` has been encoded to bytes above; pop that key directly
+            # (str(uri) would yield "b'...'" under Python 3)
+            self.sourceuris.pop(uri, None)
+        return cnx.entity_from_eid(eid, etype)
+
+    def process(self, url, raise_on_error=False):
+        """main callback: process the url"""
+        raise NotImplementedError
+
+    @deprecated('[3.21] use the new store API')
+    def before_entity_copy(self, entity, sourceparams):
+        raise NotImplementedError
+
+    @deprecated('[3.21] use the new store API')
+    def after_entity_copy(self, entity, sourceparams):
+        self.stats['created'].add(entity.eid)
+
+    def created_during_pull(self, entity):
+        return entity.eid in self.stats['created']
+
+    def updated_during_pull(self, entity):
+        return entity.eid in self.stats['updated']
+
+    def notify_updated(self, entity):
+        return self.stats['updated'].add(entity.eid)
+
+    def notify_checked(self, entity):
+        return self.stats['checked'].add(entity.eid)
+
+    def is_deleted(self, extid, etype, eid):
+        """return True if the entity of given external id, entity type and eid
+        is actually deleted. Always return True by default, put more sensible
+        stuff in sub-classes.
+        """
+        return True
+
+    def handle_deletion(self, config, cnx, myuris):
+        if config['delete-entities'] and myuris:
+            byetype = {}
+            for extid, (eid, etype) in myuris.items():
+                if self.is_deleted(extid, etype, eid):
+                    byetype.setdefault(etype, []).append(str(eid))
+            for etype, eids in byetype.items():
+                self.warning('delete %s %s entities', len(eids), etype)
+                cnx.execute('DELETE %s X WHERE X eid IN (%s)'
+                            % (etype, ','.join(eids)))
+            cnx.commit()
+
+    def update_if_necessary(self, entity, attrs):
+        entity.complete(tuple(attrs))
+        # check modification date and compare attribute values to only update
+        # what's actually needed
+        self.notify_checked(entity)
+        mdate = attrs.get('modification_date')
+        if not mdate or mdate > entity.modification_date:
+            attrs = dict((k, v) for k, v in attrs.items()
+                         if v != getattr(entity, k))
+            if attrs:
+                entity.cw_set(**attrs)
+                self.notify_updated(entity)
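+
+    # For instance (hypothetical values): if entity.name is already u'foo',
+    # update_if_necessary(entity, {'name': u'foo'}) only records the entity
+    # as 'checked', while {'name': u'bar'} calls cw_set() and records it as
+    # 'updated'.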
+
+
+class DataFeedXMLParser(DataFeedParser):
+
+    @deprecated()
+    def process(self, url, raise_on_error=False):
+        """IDataFeedParser main entry point"""
+        try:
+            parsed = self.parse(url)
+        except Exception as ex:
+            if raise_on_error:
+                raise
+            self.import_log.record_error(str(ex))
+            return True
+        error = False
+        commit = self._cw.commit
+        rollback = self._cw.rollback
+        for args in parsed:
+            try:
+                self.process_item(*args, raise_on_error=raise_on_error)
+                # commit after each item to give other threads a chance to
+                # grab our connection set
+                commit()
+            except ValidationError as exc:
+                if raise_on_error:
+                    raise
+                self.source.error('Skipping %s because of validation error %s',
+                                  args, exc)
+                rollback()
+                error = True
+        return error
+
+    def parse(self, url):
+        stream = self.retrieve_url(url)
+        return self.parse_etree(etree.parse(stream).getroot())
+
+    def parse_etree(self, document):
+        return [(document,)]
+
+    def process_item(self, *args, **kwargs):
+        raise NotImplementedError
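+
+    # A concrete parser might look like this (sketch; 'Thing' and the XML
+    # element/attribute names are hypothetical):
+    #
+    #   class ThingParser(DataFeedXMLParser):
+    #       __regid__ = 'thing-parser'
+    #
+    #       def parse_etree(self, document):
+    #           # one tuple of process_item() arguments per <thing> element
+    #           return [(node,) for node in document.iter('thing')]
+    #
+    #       def process_item(self, node, raise_on_error=False):
+    #           self.extid2entity(node.get('uri'), 'Thing',
+    #                             raise_on_error=raise_on_error)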
+
+    def is_deleted(self, extid, etype, eid):
+        if extid.startswith('file://'):
+            # deleted if the file doesn't exist anymore
+            return not exists(extid[7:])
+
+        url = self.normalize_url(extid)
+        # first, try to use cwclientlib if it's available and if the
+        # url matches a configuration entry in ~/.config/cwclientlibrc
+        try:
+            from cwclientlib import cwproxy_for
+            # parse url again since it has been normalized
+            cnx = cwproxy_for(url)
+            cnx.timeout = self.source.http_timeout
+            self.source.info('Using cwclientlib for checking %s', url)
+            return cnx.get(url).status_code == 404
+        except (ImportError, ValueError, EnvironmentError) as exc:
+            # ImportError: not available
+            # ValueError: no config entry found
+            # EnvironmentError: no cwclientlib config file found
+            self.source.debug(str(exc))
+
+        # no chance with cwclientlib, fall back to former implementation
+        if urlparse(url).scheme in ('http', 'https'):
+            try:
+                _OPENER.open(url, timeout=self.source.http_timeout)
+            except HTTPError as ex:
+                if ex.code == 404:
+                    return True
+        return False
+
+
+class URLLibResponseAdapter(object):
+    """Thin wrapper to be used to fake a value returned by urllib2.urlopen"""
+    def __init__(self, stream, url, code=200):
+        self._stream = stream
+        self._url = url
+        self.code = code
+
+    def read(self, *args):
+        return self._stream.read(*args)
+
+    def geturl(self):
+        return self._url
+
+    def getcode(self):
+        return self.code
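+
+# For instance (sketch): wrap in-memory bytes so callers expecting a
+# urllib2-style response keep working:
+#
+#   resp = URLLibResponseAdapter(BytesIO(b'<root/>'), 'http://example.org')
+#   resp.read()     # -> b'<root/>'
+#   resp.getcode()  # -> 200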
+
+
+# use a cookie-enabled opener so that session cookies, if any, are sent back
+_OPENER = build_opener()
+try:
+    from logilab.common import urllib2ext
+    _OPENER.add_handler(urllib2ext.HTTPGssapiAuthHandler())
+except ImportError: # python-kerberos not available
+    pass
+_OPENER.add_handler(HTTPCookieProcessor(CookieJar()))