server/sources/datafeed.py
changeset 6957 ffda12be2e9f
child 6972 12aa5cd81ce5
equal deleted inserted replaced
6956:b172c383dbce 6957:ffda12be2e9f
       
     1 # copyright 2010-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
       
     2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
       
     3 #
       
     4 # This file is part of CubicWeb.
       
     5 #
       
     6 # CubicWeb is free software: you can redistribute it and/or modify it under the
       
     7 # terms of the GNU Lesser General Public License as published by the Free
       
     8 # Software Foundation, either version 2.1 of the License, or (at your option)
       
     9 # any later version.
       
    10 #
       
    11 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT
       
    12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
       
    13 # FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
       
    14 # details.
       
    15 #
       
    16 # You should have received a copy of the GNU Lesser General Public License along
       
    17 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
       
    18 """datafeed sources: copy data from an external data stream into the system
       
    19 database
       
    20 """
       
    21 from datetime import datetime, timedelta
       
    22 from base64 import b64decode
       
    23 
       
    24 from cubicweb import RegistryNotFound, ObjectNotFound, ValidationError
       
    25 from cubicweb.server.sources import AbstractSource
       
    26 from cubicweb.appobject import AppObject
       
    27 
       
    28 class DataFeedSource(AbstractSource):
       
    29     copy_based_source = True
       
    30 
       
    31     options = (
       
    32         ('synchronize',
       
    33          {'type' : 'yn',
       
    34           'default': True,
       
    35           'help': ('Is the repository responsible to automatically import '
       
    36                    'content from this source? '
       
    37                    'You should say yes unless you don\'t want this behaviour '
       
    38                    'or if you use a multiple repositories setup, in which '
       
    39                    'case you should say yes on one repository, no on others.'),
       
    40           'group': 'datafeed-source', 'level': 2,
       
    41           }),
       
    42         ('synchronization-interval',
       
    43          {'type' : 'time',
       
    44           'default': '5min',
       
    45           'help': ('Interval in seconds between synchronization with the '
       
    46                    'external source (default to 5 minutes, must be >= 1 min).'),
       
    47           'group': 'datafeed-source', 'level': 2,
       
    48           }),
       
    49         ('delete-entities',
       
    50          {'type' : 'yn',
       
    51           'default': True,
       
    52           'help': ('Should already imported entities not found anymore on the '
       
    53                    'external source be deleted?'),
       
    54           'group': 'datafeed-source', 'level': 2,
       
    55           }),
       
    56 
       
    57         )
       
    58     def __init__(self, repo, source_config, eid=None):
       
    59         AbstractSource.__init__(self, repo, source_config, eid)
       
    60         self.update_config(None, self.check_conf_dict(eid, source_config))
       
    61 
       
    62     def check_config(self, source_entity):
       
    63         """check configuration of source entity"""
       
    64         typedconfig = super(DataFeedSource, self).check_config(source_entity)
       
    65         if typedconfig['synchronization-interval'] < 60:
       
    66             _ = source_entity._cw._
       
    67             msg = _('synchronization-interval must be greater than 1 minute')
       
    68             raise ValidationError(source_entity.eid, {'config': msg})
       
    69         return typedconfig
       
    70 
       
    71     def _entity_update(self, source_entity):
       
    72         source_entity.complete()
       
    73         self.parser = source_entity.parser
       
    74         self.latest_retrieval = source_entity.latest_retrieval
       
    75         self.urls = [url.strip() for url in source_entity.url.splitlines()
       
    76                      if url.strip()]
       
    77 
       
    78     def update_config(self, source_entity, typedconfig):
       
    79         """update configuration from source entity. `typedconfig` is config
       
    80         properly typed with defaults set
       
    81         """
       
    82         self.synchro_interval = timedelta(seconds=typedconfig['synchronization-interval'])
       
    83         if source_entity is not None:
       
    84             self._entity_update(source_entity)
       
    85         self.config = typedconfig
       
    86 
       
    87     def init(self, activated, source_entity):
       
    88         if activated:
       
    89             self._entity_update(source_entity)
       
    90         self.parser = source_entity.parser
       
    91         self.load_mapping(source_entity._cw)
       
    92 
       
    93     def _get_parser(self, session, **kwargs):
       
    94         return self.repo.vreg['parsers'].select(
       
    95             self.parser, session, source=self, **kwargs)
       
    96 
       
    97     def load_mapping(self, session):
       
    98         self.mapping = {}
       
    99         self.mapping_idx = {}
       
   100         try:
       
   101             parser = self._get_parser(session)
       
   102         except (RegistryNotFound, ObjectNotFound):
       
   103             return # no parser yet, don't go further
       
   104         self._load_mapping(session, parser=parser)
       
   105 
       
   106     def add_schema_config(self, schemacfg, checkonly=False, parser=None):
       
   107         """added CWSourceSchemaConfig, modify mapping accordingly"""
       
   108         if parser is None:
       
   109             parser = self._get_parser(schemacfg._cw)
       
   110         parser.add_schema_config(schemacfg, checkonly)
       
   111 
       
   112     def del_schema_config(self, schemacfg, checkonly=False, parser=None):
       
   113         """deleted CWSourceSchemaConfig, modify mapping accordingly"""
       
   114         if parser is None:
       
   115             parser = self._get_parser(schemacfg._cw)
       
   116         parser.del_schema_config(schemacfg, checkonly)
       
   117 
       
   118     def fresh(self):
       
   119         if self.latest_retrieval is None:
       
   120             return False
       
   121         return datetime.now() < (self.latest_retrieval + self.synchro_interval)
       
   122 
       
   123     def pull_data(self, session, force=False):
       
   124         if not force and self.fresh():
       
   125             return
       
   126         if self.config['delete-entities']:
       
   127             myuris = self.source_cwuris(session)
       
   128         else:
       
   129             myuris = None
       
   130         parser = self._get_parser(session, sourceuris=myuris)
       
   131         error = False
       
   132         self.info('pulling data for source %s', self.uri)
       
   133         for url in self.urls:
       
   134             try:
       
   135                 parser.process(url)
       
   136             except IOError, exc:
       
   137                 self.error('could not pull data while processing %s: %s',
       
   138                            url, exc)
       
   139                 error = True
       
   140         if error:
       
   141             self.warning("some error occured, don't attempt to delete entities")
       
   142         elif self.config['delete-entities'] and myuris:
       
   143             byetype = {}
       
   144             for eid, etype in myuris.values():
       
   145                 byetype.setdefault(etype, []).append(str(eid))
       
   146             self.error('delete %s entities %s', self.uri, byetype)
       
   147             for etype, eids in byetype.iteritems():
       
   148                 session.execute('DELETE %s X WHERE X eid IN (%s)'
       
   149                                 % (etype, ','.join(eids)))
       
   150         self.latest_retrieval = datetime.now()
       
   151         session.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
       
   152                         {'x': self.eid, 'date': self.latest_retrieval})
       
   153         return parser.stats
       
   154 
       
   155     def before_entity_insertion(self, session, lid, etype, eid, sourceparams):
       
   156         """called by the repository when an eid has been attributed for an
       
   157         entity stored here but the entity has not been inserted in the system
       
   158         table yet.
       
   159 
       
   160         This method must return the an Entity instance representation of this
       
   161         entity.
       
   162         """
       
   163         entity = super(DataFeedSource, self).before_entity_insertion(
       
   164             session, lid, etype, eid, sourceparams)
       
   165         entity.cw_edited['cwuri'] = unicode(lid)
       
   166         entity.cw_edited.set_defaults()
       
   167         sourceparams['parser'].before_entity_copy(entity, sourceparams)
       
   168         # avoid query to search full-text indexed attributes
       
   169         for attr in entity.e_schema.indexable_attributes():
       
   170             entity.cw_edited.setdefault(attr, u'')
       
   171         return entity
       
   172 
       
   173     def after_entity_insertion(self, session, lid, entity, sourceparams):
       
   174         """called by the repository after an entity stored here has been
       
   175         inserted in the system table.
       
   176         """
       
   177         if session.is_hook_category_activated('integrity'):
       
   178             entity.cw_edited.check(creation=True)
       
   179         self.repo.system_source.add_entity(session, entity)
       
   180         entity.cw_edited.saved = entity._cw_is_saved = True
       
   181         sourceparams['parser'].after_entity_copy(entity, sourceparams)
       
   182 
       
   183     def source_cwuris(self, session):
       
   184         sql = ('SELECT extid, eid, type FROM entities, cw_source_relation '
       
   185                'WHERE entities.eid=cw_source_relation.eid_from '
       
   186                'AND cw_source_relation.eid_to=%s' % self.eid)
       
   187         return dict((b64decode(uri), (eid, type))
       
   188                     for uri, eid, type in session.system_sql(sql))
       
   189 
       
   190 
       
   191 class DataFeedParser(AppObject):
       
   192     __registry__ = 'parsers'
       
   193 
       
   194     def __init__(self, session, source, sourceuris=None):
       
   195         self._cw = session
       
   196         self.source = source
       
   197         self.sourceuris = sourceuris
       
   198         self.stats = {'created': set(),
       
   199                       'updated': set()}
       
   200 
       
   201     def add_schema_config(self, schemacfg, checkonly=False):
       
   202         """added CWSourceSchemaConfig, modify mapping accordingly"""
       
   203         msg = schemacfg._cw._("this parser doesn't use a mapping")
       
   204         raise ValidationError(schemacfg.eid, {None: msg})
       
   205 
       
   206     def del_schema_config(self, schemacfg, checkonly=False):
       
   207         """deleted CWSourceSchemaConfig, modify mapping accordingly"""
       
   208         msg = schemacfg._cw._("this parser doesn't use a mapping")
       
   209         raise ValidationError(schemacfg.eid, {None: msg})
       
   210 
       
   211     def extid2entity(self, uri, etype, **sourceparams):
       
   212         sourceparams['parser'] = self
       
   213         eid = self.source.extid2eid(str(uri), etype, self._cw,
       
   214                                     sourceparams=sourceparams)
       
   215         if self.sourceuris is not None:
       
   216             self.sourceuris.pop(str(uri), None)
       
   217         return self._cw.entity_from_eid(eid, etype)
       
   218 
       
   219     def process(self, url):
       
   220         """main callback: process the url"""
       
   221         raise NotImplementedError
       
   222 
       
   223     def before_entity_copy(self, entity, sourceparams):
       
   224         raise NotImplementedError
       
   225 
       
   226     def after_entity_copy(self, entity, sourceparams):
       
   227         self.stats['created'].add(entity.eid)
       
   228 
       
   229     def created_during_pull(self, entity):
       
   230         return entity.eid in self.stats['created']
       
   231 
       
   232     def updated_during_pull(self, entity):
       
   233         return entity.eid in self.stats['updated']
       
   234 
       
   235     def notify_updated(self, entity):
       
   236         return self.stats['updated'].add(entity.eid)