diff -r 928732ec00dd -r fa44db7da2dc server/sources/datafeed.py --- a/server/sources/datafeed.py Thu Jul 17 11:08:56 2014 +0200 +++ b/server/sources/datafeed.py Fri Jul 18 17:35:25 2014 +0200 @@ -126,18 +126,18 @@ self.parser_id = source_entity.parser self.load_mapping(source_entity._cw) - def _get_parser(self, session, **kwargs): + def _get_parser(self, cnx, **kwargs): return self.repo.vreg['parsers'].select( - self.parser_id, session, source=self, **kwargs) + self.parser_id, cnx, source=self, **kwargs) - def load_mapping(self, session): + def load_mapping(self, cnx): self.mapping = {} self.mapping_idx = {} try: - parser = self._get_parser(session) + parser = self._get_parser(cnx) except (RegistryNotFound, ObjectNotFound): return # no parser yet, don't go further - self._load_mapping(session, parser=parser) + self._load_mapping(cnx, parser=parser) def add_schema_config(self, schemacfg, checkonly=False, parser=None): """added CWSourceSchemaConfig, modify mapping accordingly""" @@ -159,7 +159,7 @@ def update_latest_retrieval(self, cnx): self.latest_retrieval = datetime.utcnow() cnx.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s', - {'x': self.eid, 'date': self.latest_retrieval}) + {'x': self.eid, 'date': self.latest_retrieval}) cnx.commit() def acquire_synchronization_lock(self, cnx): @@ -178,7 +178,7 @@ def release_synchronization_lock(self, cnx): cnx.execute('SET X in_synchronization NULL WHERE X eid %(x)s', - {'x': self.eid}) + {'x': self.eid}) cnx.commit() def pull_data(self, cnx, force=False, raise_on_error=False): @@ -238,7 +238,7 @@ error = True return error - def before_entity_insertion(self, session, lid, etype, eid, sourceparams): + def before_entity_insertion(self, cnx, lid, etype, eid, sourceparams): """called by the repository when an eid has been attributed for an entity stored here but the entity has not been inserted in the system table yet. @@ -247,40 +247,40 @@ entity. """ entity = super(DataFeedSource, self).before_entity_insertion( - session, lid, etype, eid, sourceparams) + cnx, lid, etype, eid, sourceparams) entity.cw_edited['cwuri'] = lid.decode('utf-8') entity.cw_edited.set_defaults() sourceparams['parser'].before_entity_copy(entity, sourceparams) return entity - def after_entity_insertion(self, session, lid, entity, sourceparams): + def after_entity_insertion(self, cnx, lid, entity, sourceparams): """called by the repository after an entity stored here has been inserted in the system table. """ - relations = preprocess_inlined_relations(session, entity) - if session.is_hook_category_activated('integrity'): + relations = preprocess_inlined_relations(cnx, entity) + if cnx.is_hook_category_activated('integrity'): entity.cw_edited.check(creation=True) - self.repo.system_source.add_entity(session, entity) + self.repo.system_source.add_entity(cnx, entity) entity.cw_edited.saved = entity._cw_is_saved = True sourceparams['parser'].after_entity_copy(entity, sourceparams) # call hooks for inlined relations call_hooks = self.repo.hm.call_hooks if self.should_call_hooks: for attr, value in relations: - call_hooks('before_add_relation', session, + call_hooks('before_add_relation', cnx, eidfrom=entity.eid, rtype=attr, eidto=value) - call_hooks('after_add_relation', session, + call_hooks('after_add_relation', cnx, eidfrom=entity.eid, rtype=attr, eidto=value) - def source_cwuris(self, session): + def source_cwuris(self, cnx): sql = ('SELECT extid, eid, type FROM entities, cw_source_relation ' 'WHERE entities.eid=cw_source_relation.eid_from ' 'AND cw_source_relation.eid_to=%s' % self.eid) return dict((b64decode(uri), (eid, type)) - for uri, eid, type in session.system_sql(sql).fetchall()) + for uri, eid, type in cnx.system_sql(sql).fetchall()) - def init_import_log(self, session, **kwargs): - dataimport = session.create_entity('CWDataImport', cw_import_of=self, + def init_import_log(self, cnx, **kwargs): + dataimport = cnx.create_entity('CWDataImport', cw_import_of=self, start_timestamp=datetime.utcnow(), **kwargs) dataimport.init() @@ -290,8 +290,8 @@ class DataFeedParser(AppObject): __registry__ = 'parsers' - def __init__(self, session, source, sourceuris=None, import_log=None, **kwargs): - super(DataFeedParser, self).__init__(session, **kwargs) + def __init__(self, cnx, source, sourceuris=None, import_log=None, **kwargs): + super(DataFeedParser, self).__init__(cnx, **kwargs) self.source = source self.sourceuris = sourceuris self.import_log = import_log @@ -345,20 +345,20 @@ """return an entity for the given uri. May return None if it should be skipped """ - session = self._cw + cnx = self._cw # if cwsource is specified and repository has a source with the same # name, call extid2eid on that source so entity will be properly seen as # coming from this source source_uri = sourceparams.pop('cwsource', None) if source_uri is not None and source_uri != 'system': - source = session.repo.sources_by_uri.get(source_uri, self.source) + source = cnx.repo.sources_by_uri.get(source_uri, self.source) else: source = self.source sourceparams['parser'] = self if isinstance(uri, unicode): uri = uri.encode('utf-8') try: - eid = session.repo.extid2eid(source, str(uri), etype, session, + eid = cnx.repo.extid2eid(source, str(uri), etype, cnx, sourceparams=sourceparams) except ValidationError as ex: # XXX use critical so they are seen during tests. Should consider @@ -373,14 +373,14 @@ # Don't give etype to entity_from_eid so we get UnknownEid if the # entity has been removed try: - entity = session.entity_from_eid(-eid) + entity = cnx.entity_from_eid(-eid) except UnknownEid: return None self.notify_updated(entity) # avoid later update from the source's data return entity if self.sourceuris is not None: self.sourceuris.pop(str(uri), None) - return session.entity_from_eid(eid, etype) + return cnx.entity_from_eid(eid, etype) def process(self, url, raise_on_error=False): """main callback: process the url""" @@ -411,7 +411,7 @@ """ return True - def handle_deletion(self, config, session, myuris): + def handle_deletion(self, config, cnx, myuris): if config['delete-entities'] and myuris: byetype = {} for extid, (eid, etype) in myuris.iteritems(): @@ -419,10 +419,9 @@ byetype.setdefault(etype, []).append(str(eid)) for etype, eids in byetype.iteritems(): self.warning('delete %s %s entities', len(eids), etype) - session.set_cnxset() - session.execute('DELETE %s X WHERE X eid IN (%s)' - % (etype, ','.join(eids))) - session.commit() + cnx.execute('DELETE %s X WHERE X eid IN (%s)' + % (etype, ','.join(eids))) + cnx.commit() def update_if_necessary(self, entity, attrs): entity.complete(tuple(attrs)) @@ -450,13 +449,8 @@ self.import_log.record_error(str(ex)) return True error = False - # Check whether self._cw is a session or a connection - if getattr(self._cw, 'commit', None) is not None: - commit = self._cw.commit - rollback = self._cw.rollback - else: - commit = self._cw.cnx.commit - rollback = self._cw.cnx.rollback + commit = self._cw.commit + rollback = self._cw.rollback for args in parsed: try: self.process_item(*args)