server/sources/datafeed.py
changeset 9897 fa44db7da2dc
parent 9825 946b483bc8a1
parent 9879 21278eb03bbf
child 9990 c84ad981fc4a
equal deleted inserted replaced
9892:928732ec00dd 9897:fa44db7da2dc
   124     def init(self, activated, source_entity):
   124     def init(self, activated, source_entity):
   125         super(DataFeedSource, self).init(activated, source_entity)
   125         super(DataFeedSource, self).init(activated, source_entity)
   126         self.parser_id = source_entity.parser
   126         self.parser_id = source_entity.parser
   127         self.load_mapping(source_entity._cw)
   127         self.load_mapping(source_entity._cw)
   128 
   128 
   129     def _get_parser(self, session, **kwargs):
   129     def _get_parser(self, cnx, **kwargs):
   130         return self.repo.vreg['parsers'].select(
   130         return self.repo.vreg['parsers'].select(
   131             self.parser_id, session, source=self, **kwargs)
   131             self.parser_id, cnx, source=self, **kwargs)
   132 
   132 
   133     def load_mapping(self, session):
   133     def load_mapping(self, cnx):
   134         self.mapping = {}
   134         self.mapping = {}
   135         self.mapping_idx = {}
   135         self.mapping_idx = {}
   136         try:
   136         try:
   137             parser = self._get_parser(session)
   137             parser = self._get_parser(cnx)
   138         except (RegistryNotFound, ObjectNotFound):
   138         except (RegistryNotFound, ObjectNotFound):
   139             return # no parser yet, don't go further
   139             return # no parser yet, don't go further
   140         self._load_mapping(session, parser=parser)
   140         self._load_mapping(cnx, parser=parser)
   141 
   141 
   142     def add_schema_config(self, schemacfg, checkonly=False, parser=None):
   142     def add_schema_config(self, schemacfg, checkonly=False, parser=None):
   143         """added CWSourceSchemaConfig, modify mapping accordingly"""
   143         """added CWSourceSchemaConfig, modify mapping accordingly"""
   144         if parser is None:
   144         if parser is None:
   145             parser = self._get_parser(schemacfg._cw)
   145             parser = self._get_parser(schemacfg._cw)
   157         return datetime.utcnow() < (self.latest_retrieval + self.synchro_interval)
   157         return datetime.utcnow() < (self.latest_retrieval + self.synchro_interval)
   158 
   158 
   159     def update_latest_retrieval(self, cnx):
   159     def update_latest_retrieval(self, cnx):
   160         self.latest_retrieval = datetime.utcnow()
   160         self.latest_retrieval = datetime.utcnow()
   161         cnx.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
   161         cnx.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
   162                         {'x': self.eid, 'date': self.latest_retrieval})
   162                     {'x': self.eid, 'date': self.latest_retrieval})
   163         cnx.commit()
   163         cnx.commit()
   164 
   164 
   165     def acquire_synchronization_lock(self, cnx):
   165     def acquire_synchronization_lock(self, cnx):
   166         # XXX race condition until WHERE of SET queries is executed using
   166         # XXX race condition until WHERE of SET queries is executed using
   167         # 'SELECT FOR UPDATE'
   167         # 'SELECT FOR UPDATE'
   176         cnx.commit()
   176         cnx.commit()
   177         return True
   177         return True
   178 
   178 
   179     def release_synchronization_lock(self, cnx):
   179     def release_synchronization_lock(self, cnx):
   180         cnx.execute('SET X in_synchronization NULL WHERE X eid %(x)s',
   180         cnx.execute('SET X in_synchronization NULL WHERE X eid %(x)s',
   181                         {'x': self.eid})
   181                     {'x': self.eid})
   182         cnx.commit()
   182         cnx.commit()
   183 
   183 
   184     def pull_data(self, cnx, force=False, raise_on_error=False):
   184     def pull_data(self, cnx, force=False, raise_on_error=False):
   185         """Launch synchronization of the source if needed.
   185         """Launch synchronization of the source if needed.
   186 
   186 
   236                 self.exception('error while processing %s: %s',
   236                 self.exception('error while processing %s: %s',
   237                                url, exc)
   237                                url, exc)
   238                 error = True
   238                 error = True
   239         return error
   239         return error
   240 
   240 
   241     def before_entity_insertion(self, session, lid, etype, eid, sourceparams):
   241     def before_entity_insertion(self, cnx, lid, etype, eid, sourceparams):
   242         """called by the repository when an eid has been attributed for an
   242         """called by the repository when an eid has been attributed for an
   243         entity stored here but the entity has not been inserted in the system
   243         entity stored here but the entity has not been inserted in the system
   244         table yet.
   244         table yet.
   245 
   245 
   246         This method must return the an Entity instance representation of this
   246         This method must return the an Entity instance representation of this
   247         entity.
   247         entity.
   248         """
   248         """
   249         entity = super(DataFeedSource, self).before_entity_insertion(
   249         entity = super(DataFeedSource, self).before_entity_insertion(
   250             session, lid, etype, eid, sourceparams)
   250             cnx, lid, etype, eid, sourceparams)
   251         entity.cw_edited['cwuri'] = lid.decode('utf-8')
   251         entity.cw_edited['cwuri'] = lid.decode('utf-8')
   252         entity.cw_edited.set_defaults()
   252         entity.cw_edited.set_defaults()
   253         sourceparams['parser'].before_entity_copy(entity, sourceparams)
   253         sourceparams['parser'].before_entity_copy(entity, sourceparams)
   254         return entity
   254         return entity
   255 
   255 
   256     def after_entity_insertion(self, session, lid, entity, sourceparams):
   256     def after_entity_insertion(self, cnx, lid, entity, sourceparams):
   257         """called by the repository after an entity stored here has been
   257         """called by the repository after an entity stored here has been
   258         inserted in the system table.
   258         inserted in the system table.
   259         """
   259         """
   260         relations = preprocess_inlined_relations(session, entity)
   260         relations = preprocess_inlined_relations(cnx, entity)
   261         if session.is_hook_category_activated('integrity'):
   261         if cnx.is_hook_category_activated('integrity'):
   262             entity.cw_edited.check(creation=True)
   262             entity.cw_edited.check(creation=True)
   263         self.repo.system_source.add_entity(session, entity)
   263         self.repo.system_source.add_entity(cnx, entity)
   264         entity.cw_edited.saved = entity._cw_is_saved = True
   264         entity.cw_edited.saved = entity._cw_is_saved = True
   265         sourceparams['parser'].after_entity_copy(entity, sourceparams)
   265         sourceparams['parser'].after_entity_copy(entity, sourceparams)
   266         # call hooks for inlined relations
   266         # call hooks for inlined relations
   267         call_hooks = self.repo.hm.call_hooks
   267         call_hooks = self.repo.hm.call_hooks
   268         if self.should_call_hooks:
   268         if self.should_call_hooks:
   269             for attr, value in relations:
   269             for attr, value in relations:
   270                 call_hooks('before_add_relation', session,
   270                 call_hooks('before_add_relation', cnx,
   271                            eidfrom=entity.eid, rtype=attr, eidto=value)
   271                            eidfrom=entity.eid, rtype=attr, eidto=value)
   272                 call_hooks('after_add_relation', session,
   272                 call_hooks('after_add_relation', cnx,
   273                            eidfrom=entity.eid, rtype=attr, eidto=value)
   273                            eidfrom=entity.eid, rtype=attr, eidto=value)
   274 
   274 
   275     def source_cwuris(self, session):
   275     def source_cwuris(self, cnx):
   276         sql = ('SELECT extid, eid, type FROM entities, cw_source_relation '
   276         sql = ('SELECT extid, eid, type FROM entities, cw_source_relation '
   277                'WHERE entities.eid=cw_source_relation.eid_from '
   277                'WHERE entities.eid=cw_source_relation.eid_from '
   278                'AND cw_source_relation.eid_to=%s' % self.eid)
   278                'AND cw_source_relation.eid_to=%s' % self.eid)
   279         return dict((b64decode(uri), (eid, type))
   279         return dict((b64decode(uri), (eid, type))
   280                     for uri, eid, type in session.system_sql(sql).fetchall())
   280                     for uri, eid, type in cnx.system_sql(sql).fetchall())
   281 
   281 
   282     def init_import_log(self, session, **kwargs):
   282     def init_import_log(self, cnx, **kwargs):
   283         dataimport = session.create_entity('CWDataImport', cw_import_of=self,
   283         dataimport = cnx.create_entity('CWDataImport', cw_import_of=self,
   284                                            start_timestamp=datetime.utcnow(),
   284                                            start_timestamp=datetime.utcnow(),
   285                                            **kwargs)
   285                                            **kwargs)
   286         dataimport.init()
   286         dataimport.init()
   287         return dataimport
   287         return dataimport
   288 
   288 
   289 
   289 
   290 class DataFeedParser(AppObject):
   290 class DataFeedParser(AppObject):
   291     __registry__ = 'parsers'
   291     __registry__ = 'parsers'
   292 
   292 
   293     def __init__(self, session, source, sourceuris=None, import_log=None, **kwargs):
   293     def __init__(self, cnx, source, sourceuris=None, import_log=None, **kwargs):
   294         super(DataFeedParser, self).__init__(session, **kwargs)
   294         super(DataFeedParser, self).__init__(cnx, **kwargs)
   295         self.source = source
   295         self.source = source
   296         self.sourceuris = sourceuris
   296         self.sourceuris = sourceuris
   297         self.import_log = import_log
   297         self.import_log = import_log
   298         self.stats = {'created': set(), 'updated': set(), 'checked': set()}
   298         self.stats = {'created': set(), 'updated': set(), 'checked': set()}
   299 
   299 
   343 
   343 
   344     def extid2entity(self, uri, etype, **sourceparams):
   344     def extid2entity(self, uri, etype, **sourceparams):
   345         """return an entity for the given uri. May return None if it should be
   345         """return an entity for the given uri. May return None if it should be
   346         skipped
   346         skipped
   347         """
   347         """
   348         session = self._cw
   348         cnx = self._cw
   349         # if cwsource is specified and repository has a source with the same
   349         # if cwsource is specified and repository has a source with the same
   350         # name, call extid2eid on that source so entity will be properly seen as
   350         # name, call extid2eid on that source so entity will be properly seen as
   351         # coming from this source
   351         # coming from this source
   352         source_uri = sourceparams.pop('cwsource', None)
   352         source_uri = sourceparams.pop('cwsource', None)
   353         if source_uri is not None and source_uri != 'system':
   353         if source_uri is not None and source_uri != 'system':
   354             source = session.repo.sources_by_uri.get(source_uri, self.source)
   354             source = cnx.repo.sources_by_uri.get(source_uri, self.source)
   355         else:
   355         else:
   356             source = self.source
   356             source = self.source
   357         sourceparams['parser'] = self
   357         sourceparams['parser'] = self
   358         if isinstance(uri, unicode):
   358         if isinstance(uri, unicode):
   359             uri = uri.encode('utf-8')
   359             uri = uri.encode('utf-8')
   360         try:
   360         try:
   361             eid = session.repo.extid2eid(source, str(uri), etype, session,
   361             eid = cnx.repo.extid2eid(source, str(uri), etype, cnx,
   362                                          sourceparams=sourceparams)
   362                                          sourceparams=sourceparams)
   363         except ValidationError as ex:
   363         except ValidationError as ex:
   364             # XXX use critical so they are seen during tests. Should consider
   364             # XXX use critical so they are seen during tests. Should consider
   365             # raise_on_error instead?
   365             # raise_on_error instead?
   366             self.source.critical('error while creating %s: %s', etype, ex)
   366             self.source.critical('error while creating %s: %s', etype, ex)
   371             # entity has been moved away from its original source
   371             # entity has been moved away from its original source
   372             #
   372             #
   373             # Don't give etype to entity_from_eid so we get UnknownEid if the
   373             # Don't give etype to entity_from_eid so we get UnknownEid if the
   374             # entity has been removed
   374             # entity has been removed
   375             try:
   375             try:
   376                 entity = session.entity_from_eid(-eid)
   376                 entity = cnx.entity_from_eid(-eid)
   377             except UnknownEid:
   377             except UnknownEid:
   378                 return None
   378                 return None
   379             self.notify_updated(entity) # avoid later update from the source's data
   379             self.notify_updated(entity) # avoid later update from the source's data
   380             return entity
   380             return entity
   381         if self.sourceuris is not None:
   381         if self.sourceuris is not None:
   382             self.sourceuris.pop(str(uri), None)
   382             self.sourceuris.pop(str(uri), None)
   383         return session.entity_from_eid(eid, etype)
   383         return cnx.entity_from_eid(eid, etype)
   384 
   384 
   385     def process(self, url, raise_on_error=False):
   385     def process(self, url, raise_on_error=False):
   386         """main callback: process the url"""
   386         """main callback: process the url"""
   387         raise NotImplementedError
   387         raise NotImplementedError
   388 
   388 
   409         is actually deleted. Always return True by default, put more sensible
   409         is actually deleted. Always return True by default, put more sensible
   410         stuff in sub-classes.
   410         stuff in sub-classes.
   411         """
   411         """
   412         return True
   412         return True
   413 
   413 
   414     def handle_deletion(self, config, session, myuris):
   414     def handle_deletion(self, config, cnx, myuris):
   415         if config['delete-entities'] and myuris:
   415         if config['delete-entities'] and myuris:
   416             byetype = {}
   416             byetype = {}
   417             for extid, (eid, etype) in myuris.iteritems():
   417             for extid, (eid, etype) in myuris.iteritems():
   418                 if self.is_deleted(extid, etype, eid):
   418                 if self.is_deleted(extid, etype, eid):
   419                     byetype.setdefault(etype, []).append(str(eid))
   419                     byetype.setdefault(etype, []).append(str(eid))
   420             for etype, eids in byetype.iteritems():
   420             for etype, eids in byetype.iteritems():
   421                 self.warning('delete %s %s entities', len(eids), etype)
   421                 self.warning('delete %s %s entities', len(eids), etype)
   422                 session.set_cnxset()
   422                 cnx.execute('DELETE %s X WHERE X eid IN (%s)'
   423                 session.execute('DELETE %s X WHERE X eid IN (%s)'
   423                             % (etype, ','.join(eids)))
   424                                 % (etype, ','.join(eids)))
   424                 cnx.commit()
   425                 session.commit()
       
   426 
   425 
   427     def update_if_necessary(self, entity, attrs):
   426     def update_if_necessary(self, entity, attrs):
   428         entity.complete(tuple(attrs))
   427         entity.complete(tuple(attrs))
   429         # check modification date and compare attribute values to only update
   428         # check modification date and compare attribute values to only update
   430         # what's actually needed
   429         # what's actually needed
   448             if raise_on_error:
   447             if raise_on_error:
   449                 raise
   448                 raise
   450             self.import_log.record_error(str(ex))
   449             self.import_log.record_error(str(ex))
   451             return True
   450             return True
   452         error = False
   451         error = False
   453         # Check whether self._cw is a session or a connection
   452         commit = self._cw.commit
   454         if getattr(self._cw, 'commit', None) is not None:
   453         rollback = self._cw.rollback
   455             commit = self._cw.commit
       
   456             rollback = self._cw.rollback
       
   457         else:
       
   458             commit = self._cw.cnx.commit
       
   459             rollback = self._cw.cnx.rollback
       
   460         for args in parsed:
   454         for args in parsed:
   461             try:
   455             try:
   462                 self.process_item(*args)
   456                 self.process_item(*args)
   463                 # commit+set_cnxset instead of commit(free_cnxset=False) to let
   457                 # commit+set_cnxset instead of commit(free_cnxset=False) to let
   464                 # other a chance to get our connections set
   458                 # other a chance to get our connections set