server/sources/datafeed.py
changeset 9879 21278eb03bbf
parent 9860 e24bf60428d3
child 9897 fa44db7da2dc
child 9975 98b4f7fa2e3a
equal deleted inserted replaced
9878:f3936f64bd98 9879:21278eb03bbf
   111     def init(self, activated, source_entity):
   111     def init(self, activated, source_entity):
   112         super(DataFeedSource, self).init(activated, source_entity)
   112         super(DataFeedSource, self).init(activated, source_entity)
   113         self.parser_id = source_entity.parser
   113         self.parser_id = source_entity.parser
   114         self.load_mapping(source_entity._cw)
   114         self.load_mapping(source_entity._cw)
   115 
   115 
   116     def _get_parser(self, session, **kwargs):
   116     def _get_parser(self, cnx, **kwargs):
   117         return self.repo.vreg['parsers'].select(
   117         return self.repo.vreg['parsers'].select(
   118             self.parser_id, session, source=self, **kwargs)
   118             self.parser_id, cnx, source=self, **kwargs)
   119 
   119 
   120     def load_mapping(self, session):
   120     def load_mapping(self, cnx):
   121         self.mapping = {}
   121         self.mapping = {}
   122         self.mapping_idx = {}
   122         self.mapping_idx = {}
   123         try:
   123         try:
   124             parser = self._get_parser(session)
   124             parser = self._get_parser(cnx)
   125         except (RegistryNotFound, ObjectNotFound):
   125         except (RegistryNotFound, ObjectNotFound):
   126             return # no parser yet, don't go further
   126             return # no parser yet, don't go further
   127         self._load_mapping(session, parser=parser)
   127         self._load_mapping(cnx, parser=parser)
   128 
   128 
   129     def add_schema_config(self, schemacfg, checkonly=False, parser=None):
   129     def add_schema_config(self, schemacfg, checkonly=False, parser=None):
   130         """added CWSourceSchemaConfig, modify mapping accordingly"""
   130         """added CWSourceSchemaConfig, modify mapping accordingly"""
   131         if parser is None:
   131         if parser is None:
   132             parser = self._get_parser(schemacfg._cw)
   132             parser = self._get_parser(schemacfg._cw)
   144         return datetime.utcnow() < (self.latest_retrieval + self.synchro_interval)
   144         return datetime.utcnow() < (self.latest_retrieval + self.synchro_interval)
   145 
   145 
   146     def update_latest_retrieval(self, cnx):
   146     def update_latest_retrieval(self, cnx):
   147         self.latest_retrieval = datetime.utcnow()
   147         self.latest_retrieval = datetime.utcnow()
   148         cnx.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
   148         cnx.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
   149                         {'x': self.eid, 'date': self.latest_retrieval})
   149                     {'x': self.eid, 'date': self.latest_retrieval})
   150         cnx.commit()
   150         cnx.commit()
   151 
   151 
   152     def acquire_synchronization_lock(self, cnx):
   152     def acquire_synchronization_lock(self, cnx):
   153         # XXX race condition until WHERE of SET queries is executed using
   153         # XXX race condition until WHERE of SET queries is executed using
   154         # 'SELECT FOR UPDATE'
   154         # 'SELECT FOR UPDATE'
   163         cnx.commit()
   163         cnx.commit()
   164         return True
   164         return True
   165 
   165 
   166     def release_synchronization_lock(self, cnx):
   166     def release_synchronization_lock(self, cnx):
   167         cnx.execute('SET X in_synchronization NULL WHERE X eid %(x)s',
   167         cnx.execute('SET X in_synchronization NULL WHERE X eid %(x)s',
   168                         {'x': self.eid})
   168                     {'x': self.eid})
   169         cnx.commit()
   169         cnx.commit()
   170 
   170 
   171     def pull_data(self, cnx, force=False, raise_on_error=False):
   171     def pull_data(self, cnx, force=False, raise_on_error=False):
   172         """Launch synchronization of the source if needed.
   172         """Launch synchronization of the source if needed.
   173 
   173 
   223                 self.exception('error while processing %s: %s',
   223                 self.exception('error while processing %s: %s',
   224                                url, exc)
   224                                url, exc)
   225                 error = True
   225                 error = True
   226         return error
   226         return error
   227 
   227 
   228     def before_entity_insertion(self, session, lid, etype, eid, sourceparams):
   228     def before_entity_insertion(self, cnx, lid, etype, eid, sourceparams):
   229         """called by the repository when an eid has been attributed for an
   229         """called by the repository when an eid has been attributed for an
   230         entity stored here but the entity has not been inserted in the system
   230         entity stored here but the entity has not been inserted in the system
   231         table yet.
   231         table yet.
   232 
   232 
   233         This method must return an Entity instance representation of this
   233         This method must return an Entity instance representation of this
   234         entity.
   234         entity.
   235         """
   235         """
   236         entity = super(DataFeedSource, self).before_entity_insertion(
   236         entity = super(DataFeedSource, self).before_entity_insertion(
   237             session, lid, etype, eid, sourceparams)
   237             cnx, lid, etype, eid, sourceparams)
   238         entity.cw_edited['cwuri'] = lid.decode('utf-8')
   238         entity.cw_edited['cwuri'] = lid.decode('utf-8')
   239         entity.cw_edited.set_defaults()
   239         entity.cw_edited.set_defaults()
   240         sourceparams['parser'].before_entity_copy(entity, sourceparams)
   240         sourceparams['parser'].before_entity_copy(entity, sourceparams)
   241         return entity
   241         return entity
   242 
   242 
   243     def after_entity_insertion(self, session, lid, entity, sourceparams):
   243     def after_entity_insertion(self, cnx, lid, entity, sourceparams):
   244         """called by the repository after an entity stored here has been
   244         """called by the repository after an entity stored here has been
   245         inserted in the system table.
   245         inserted in the system table.
   246         """
   246         """
   247         relations = preprocess_inlined_relations(session, entity)
   247         relations = preprocess_inlined_relations(cnx, entity)
   248         if session.is_hook_category_activated('integrity'):
   248         if cnx.is_hook_category_activated('integrity'):
   249             entity.cw_edited.check(creation=True)
   249             entity.cw_edited.check(creation=True)
   250         self.repo.system_source.add_entity(session, entity)
   250         self.repo.system_source.add_entity(cnx, entity)
   251         entity.cw_edited.saved = entity._cw_is_saved = True
   251         entity.cw_edited.saved = entity._cw_is_saved = True
   252         sourceparams['parser'].after_entity_copy(entity, sourceparams)
   252         sourceparams['parser'].after_entity_copy(entity, sourceparams)
   253         # call hooks for inlined relations
   253         # call hooks for inlined relations
   254         call_hooks = self.repo.hm.call_hooks
   254         call_hooks = self.repo.hm.call_hooks
   255         if self.should_call_hooks:
   255         if self.should_call_hooks:
   256             for attr, value in relations:
   256             for attr, value in relations:
   257                 call_hooks('before_add_relation', session,
   257                 call_hooks('before_add_relation', cnx,
   258                            eidfrom=entity.eid, rtype=attr, eidto=value)
   258                            eidfrom=entity.eid, rtype=attr, eidto=value)
   259                 call_hooks('after_add_relation', session,
   259                 call_hooks('after_add_relation', cnx,
   260                            eidfrom=entity.eid, rtype=attr, eidto=value)
   260                            eidfrom=entity.eid, rtype=attr, eidto=value)
   261 
   261 
   262     def source_cwuris(self, cnx):
   262     def source_cwuris(self, cnx):
   263         sql = ('SELECT extid, eid, type FROM entities, cw_source_relation '
   263         sql = ('SELECT extid, eid, type FROM entities, cw_source_relation '
   264                'WHERE entities.eid=cw_source_relation.eid_from '
   264                'WHERE entities.eid=cw_source_relation.eid_from '
   265                'AND cw_source_relation.eid_to=%s' % self.eid)
   265                'AND cw_source_relation.eid_to=%s' % self.eid)
   266         return dict((b64decode(uri), (eid, type))
   266         return dict((b64decode(uri), (eid, type))
   267                     for uri, eid, type in cnx.system_sql(sql).fetchall())
   267                     for uri, eid, type in cnx.system_sql(sql).fetchall())
   268 
   268 
   269     def init_import_log(self, session, **kwargs):
   269     def init_import_log(self, cnx, **kwargs):
   270         dataimport = session.create_entity('CWDataImport', cw_import_of=self,
   270         dataimport = cnx.create_entity('CWDataImport', cw_import_of=self,
   271                                            start_timestamp=datetime.utcnow(),
   271                                            start_timestamp=datetime.utcnow(),
   272                                            **kwargs)
   272                                            **kwargs)
   273         dataimport.init()
   273         dataimport.init()
   274         return dataimport
   274         return dataimport
   275 
   275 
   276 
   276 
   277 class DataFeedParser(AppObject):
   277 class DataFeedParser(AppObject):
   278     __registry__ = 'parsers'
   278     __registry__ = 'parsers'
   279 
   279 
   280     def __init__(self, session, source, sourceuris=None, import_log=None, **kwargs):
   280     def __init__(self, cnx, source, sourceuris=None, import_log=None, **kwargs):
   281         super(DataFeedParser, self).__init__(session, **kwargs)
   281         super(DataFeedParser, self).__init__(cnx, **kwargs)
   282         self.source = source
   282         self.source = source
   283         self.sourceuris = sourceuris
   283         self.sourceuris = sourceuris
   284         self.import_log = import_log
   284         self.import_log = import_log
   285         self.stats = {'created': set(), 'updated': set(), 'checked': set()}
   285         self.stats = {'created': set(), 'updated': set(), 'checked': set()}
   286 
   286 
   303 
   303 
   304     def extid2entity(self, uri, etype, **sourceparams):
   304     def extid2entity(self, uri, etype, **sourceparams):
   305         """return an entity for the given uri. May return None if it should be
   305         """return an entity for the given uri. May return None if it should be
   306         skipped
   306         skipped
   307         """
   307         """
   308         session = self._cw
   308         cnx = self._cw
   309         # if cwsource is specified and repository has a source with the same
   309         # if cwsource is specified and repository has a source with the same
   310         # name, call extid2eid on that source so entity will be properly seen as
   310         # name, call extid2eid on that source so entity will be properly seen as
   311         # coming from this source
   311         # coming from this source
   312         source_uri = sourceparams.pop('cwsource', None)
   312         source_uri = sourceparams.pop('cwsource', None)
   313         if source_uri is not None and source_uri != 'system':
   313         if source_uri is not None and source_uri != 'system':
   314             source = session.repo.sources_by_uri.get(source_uri, self.source)
   314             source = cnx.repo.sources_by_uri.get(source_uri, self.source)
   315         else:
   315         else:
   316             source = self.source
   316             source = self.source
   317         sourceparams['parser'] = self
   317         sourceparams['parser'] = self
   318         if isinstance(uri, unicode):
   318         if isinstance(uri, unicode):
   319             uri = uri.encode('utf-8')
   319             uri = uri.encode('utf-8')
   320         try:
   320         try:
   321             eid = session.repo.extid2eid(source, str(uri), etype, session,
   321             eid = cnx.repo.extid2eid(source, str(uri), etype, cnx,
   322                                          sourceparams=sourceparams)
   322                                          sourceparams=sourceparams)
   323         except ValidationError as ex:
   323         except ValidationError as ex:
   324             # XXX use critical so they are seen during tests. Should consider
   324             # XXX use critical so they are seen during tests. Should consider
   325             # raise_on_error instead?
   325             # raise_on_error instead?
   326             self.source.critical('error while creating %s: %s', etype, ex)
   326             self.source.critical('error while creating %s: %s', etype, ex)
   331             # entity has been moved away from its original source
   331             # entity has been moved away from its original source
   332             #
   332             #
   333             # Don't give etype to entity_from_eid so we get UnknownEid if the
   333             # Don't give etype to entity_from_eid so we get UnknownEid if the
   334             # entity has been removed
   334             # entity has been removed
   335             try:
   335             try:
   336                 entity = session.entity_from_eid(-eid)
   336                 entity = cnx.entity_from_eid(-eid)
   337             except UnknownEid:
   337             except UnknownEid:
   338                 return None
   338                 return None
   339             self.notify_updated(entity) # avoid later update from the source's data
   339             self.notify_updated(entity) # avoid later update from the source's data
   340             return entity
   340             return entity
   341         if self.sourceuris is not None:
   341         if self.sourceuris is not None:
   342             self.sourceuris.pop(str(uri), None)
   342             self.sourceuris.pop(str(uri), None)
   343         return session.entity_from_eid(eid, etype)
   343         return cnx.entity_from_eid(eid, etype)
   344 
   344 
   345     def process(self, url, raise_on_error=False):
   345     def process(self, url, raise_on_error=False):
   346         """main callback: process the url"""
   346         """main callback: process the url"""
   347         raise NotImplementedError
   347         raise NotImplementedError
   348 
   348 
   369         is actually deleted. Always return True by default, put more sensible
   369         is actually deleted. Always return True by default, put more sensible
   370         stuff in sub-classes.
   370         stuff in sub-classes.
   371         """
   371         """
   372         return True
   372         return True
   373 
   373 
   374     def handle_deletion(self, config, session, myuris):
   374     def handle_deletion(self, config, cnx, myuris):
   375         if config['delete-entities'] and myuris:
   375         if config['delete-entities'] and myuris:
   376             byetype = {}
   376             byetype = {}
   377             for extid, (eid, etype) in myuris.iteritems():
   377             for extid, (eid, etype) in myuris.iteritems():
   378                 if self.is_deleted(extid, etype, eid):
   378                 if self.is_deleted(extid, etype, eid):
   379                     byetype.setdefault(etype, []).append(str(eid))
   379                     byetype.setdefault(etype, []).append(str(eid))
   380             for etype, eids in byetype.iteritems():
   380             for etype, eids in byetype.iteritems():
   381                 self.warning('delete %s %s entities', len(eids), etype)
   381                 self.warning('delete %s %s entities', len(eids), etype)
   382                 session.set_cnxset()
   382                 cnx.execute('DELETE %s X WHERE X eid IN (%s)'
   383                 session.execute('DELETE %s X WHERE X eid IN (%s)'
   383                             % (etype, ','.join(eids)))
   384                                 % (etype, ','.join(eids)))
   384                 cnx.commit()
   385                 session.commit()
       
   386 
   385 
   387     def update_if_necessary(self, entity, attrs):
   386     def update_if_necessary(self, entity, attrs):
   388         entity.complete(tuple(attrs))
   387         entity.complete(tuple(attrs))
   389         # check modification date and compare attribute values to only update
   388         # check modification date and compare attribute values to only update
   390         # what's actually needed
   389         # what's actually needed
   408             if raise_on_error:
   407             if raise_on_error:
   409                 raise
   408                 raise
   410             self.import_log.record_error(str(ex))
   409             self.import_log.record_error(str(ex))
   411             return True
   410             return True
   412         error = False
   411         error = False
   413         # Check whether self._cw is a session or a connection
   412         commit = self._cw.commit
   414         if getattr(self._cw, 'commit', None) is not None:
   413         rollback = self._cw.rollback
   415             commit = self._cw.commit
       
   416             rollback = self._cw.rollback
       
   417         else:
       
   418             commit = self._cw.cnx.commit
       
   419             rollback = self._cw.cnx.rollback
       
   420         for args in parsed:
   414         for args in parsed:
   421             try:
   415             try:
   422                 self.process_item(*args)
   416                 self.process_item(*args)
   423                 # commit+set_cnxset instead of commit(free_cnxset=False) to give
   417                 # commit+set_cnxset instead of commit(free_cnxset=False) to give
   424                 # others a chance to get our connections set
   418                 # others a chance to get our connections set