server/sources/datafeed.py
changeset 9897 fa44db7da2dc
parent 9825 946b483bc8a1
parent 9879 21278eb03bbf
child 9990 c84ad981fc4a
--- a/server/sources/datafeed.py	Thu Jul 17 11:08:56 2014 +0200
+++ b/server/sources/datafeed.py	Fri Jul 18 17:35:25 2014 +0200
@@ -126,18 +126,18 @@
         self.parser_id = source_entity.parser
         self.load_mapping(source_entity._cw)
 
-    def _get_parser(self, session, **kwargs):
+    def _get_parser(self, cnx, **kwargs):
         return self.repo.vreg['parsers'].select(
-            self.parser_id, session, source=self, **kwargs)
+            self.parser_id, cnx, source=self, **kwargs)
 
-    def load_mapping(self, session):
+    def load_mapping(self, cnx):
         self.mapping = {}
         self.mapping_idx = {}
         try:
-            parser = self._get_parser(session)
+            parser = self._get_parser(cnx)
         except (RegistryNotFound, ObjectNotFound):
             return # no parser yet, don't go further
-        self._load_mapping(session, parser=parser)
+        self._load_mapping(cnx, parser=parser)
 
     def add_schema_config(self, schemacfg, checkonly=False, parser=None):
         """added CWSourceSchemaConfig, modify mapping accordingly"""
@@ -159,7 +159,7 @@
     def update_latest_retrieval(self, cnx):
         self.latest_retrieval = datetime.utcnow()
         cnx.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
-                        {'x': self.eid, 'date': self.latest_retrieval})
+                    {'x': self.eid, 'date': self.latest_retrieval})
         cnx.commit()
 
     def acquire_synchronization_lock(self, cnx):
@@ -178,7 +178,7 @@
 
     def release_synchronization_lock(self, cnx):
         cnx.execute('SET X in_synchronization NULL WHERE X eid %(x)s',
-                        {'x': self.eid})
+                    {'x': self.eid})
         cnx.commit()
 
     def pull_data(self, cnx, force=False, raise_on_error=False):
@@ -238,7 +238,7 @@
                 error = True
         return error
 
-    def before_entity_insertion(self, session, lid, etype, eid, sourceparams):
+    def before_entity_insertion(self, cnx, lid, etype, eid, sourceparams):
         """called by the repository when an eid has been attributed for an
         entity stored here but the entity has not been inserted in the system
         table yet.
@@ -247,40 +247,40 @@
         entity.
         """
         entity = super(DataFeedSource, self).before_entity_insertion(
-            session, lid, etype, eid, sourceparams)
+            cnx, lid, etype, eid, sourceparams)
         entity.cw_edited['cwuri'] = lid.decode('utf-8')
         entity.cw_edited.set_defaults()
         sourceparams['parser'].before_entity_copy(entity, sourceparams)
         return entity
 
-    def after_entity_insertion(self, session, lid, entity, sourceparams):
+    def after_entity_insertion(self, cnx, lid, entity, sourceparams):
         """called by the repository after an entity stored here has been
         inserted in the system table.
         """
-        relations = preprocess_inlined_relations(session, entity)
-        if session.is_hook_category_activated('integrity'):
+        relations = preprocess_inlined_relations(cnx, entity)
+        if cnx.is_hook_category_activated('integrity'):
             entity.cw_edited.check(creation=True)
-        self.repo.system_source.add_entity(session, entity)
+        self.repo.system_source.add_entity(cnx, entity)
         entity.cw_edited.saved = entity._cw_is_saved = True
         sourceparams['parser'].after_entity_copy(entity, sourceparams)
         # call hooks for inlined relations
         call_hooks = self.repo.hm.call_hooks
         if self.should_call_hooks:
             for attr, value in relations:
-                call_hooks('before_add_relation', session,
+                call_hooks('before_add_relation', cnx,
                            eidfrom=entity.eid, rtype=attr, eidto=value)
-                call_hooks('after_add_relation', session,
+                call_hooks('after_add_relation', cnx,
                            eidfrom=entity.eid, rtype=attr, eidto=value)
 
-    def source_cwuris(self, session):
+    def source_cwuris(self, cnx):
         sql = ('SELECT extid, eid, type FROM entities, cw_source_relation '
                'WHERE entities.eid=cw_source_relation.eid_from '
                'AND cw_source_relation.eid_to=%s' % self.eid)
         return dict((b64decode(uri), (eid, type))
-                    for uri, eid, type in session.system_sql(sql).fetchall())
+                    for uri, eid, type in cnx.system_sql(sql).fetchall())
 
-    def init_import_log(self, session, **kwargs):
-        dataimport = session.create_entity('CWDataImport', cw_import_of=self,
+    def init_import_log(self, cnx, **kwargs):
+        dataimport = cnx.create_entity('CWDataImport', cw_import_of=self,
                                            start_timestamp=datetime.utcnow(),
                                            **kwargs)
         dataimport.init()
@@ -290,8 +290,8 @@
 class DataFeedParser(AppObject):
     __registry__ = 'parsers'
 
-    def __init__(self, session, source, sourceuris=None, import_log=None, **kwargs):
-        super(DataFeedParser, self).__init__(session, **kwargs)
+    def __init__(self, cnx, source, sourceuris=None, import_log=None, **kwargs):
+        super(DataFeedParser, self).__init__(cnx, **kwargs)
         self.source = source
         self.sourceuris = sourceuris
         self.import_log = import_log
@@ -345,20 +345,20 @@
         """return an entity for the given uri. May return None if it should be
         skipped
         """
-        session = self._cw
+        cnx = self._cw
         # if cwsource is specified and repository has a source with the same
         # name, call extid2eid on that source so entity will be properly seen as
         # coming from this source
         source_uri = sourceparams.pop('cwsource', None)
         if source_uri is not None and source_uri != 'system':
-            source = session.repo.sources_by_uri.get(source_uri, self.source)
+            source = cnx.repo.sources_by_uri.get(source_uri, self.source)
         else:
             source = self.source
         sourceparams['parser'] = self
         if isinstance(uri, unicode):
             uri = uri.encode('utf-8')
         try:
-            eid = session.repo.extid2eid(source, str(uri), etype, session,
+            eid = cnx.repo.extid2eid(source, str(uri), etype, cnx,
                                          sourceparams=sourceparams)
         except ValidationError as ex:
             # XXX use critical so they are seen during tests. Should consider
@@ -373,14 +373,14 @@
             # Don't give etype to entity_from_eid so we get UnknownEid if the
             # entity has been removed
             try:
-                entity = session.entity_from_eid(-eid)
+                entity = cnx.entity_from_eid(-eid)
             except UnknownEid:
                 return None
             self.notify_updated(entity) # avoid later update from the source's data
             return entity
         if self.sourceuris is not None:
             self.sourceuris.pop(str(uri), None)
-        return session.entity_from_eid(eid, etype)
+        return cnx.entity_from_eid(eid, etype)
 
     def process(self, url, raise_on_error=False):
         """main callback: process the url"""
@@ -411,7 +411,7 @@
         """
         return True
 
-    def handle_deletion(self, config, session, myuris):
+    def handle_deletion(self, config, cnx, myuris):
         if config['delete-entities'] and myuris:
             byetype = {}
             for extid, (eid, etype) in myuris.iteritems():
@@ -419,10 +419,9 @@
                     byetype.setdefault(etype, []).append(str(eid))
             for etype, eids in byetype.iteritems():
                 self.warning('delete %s %s entities', len(eids), etype)
-                session.set_cnxset()
-                session.execute('DELETE %s X WHERE X eid IN (%s)'
-                                % (etype, ','.join(eids)))
-                session.commit()
+                cnx.execute('DELETE %s X WHERE X eid IN (%s)'
+                            % (etype, ','.join(eids)))
+                cnx.commit()
 
     def update_if_necessary(self, entity, attrs):
         entity.complete(tuple(attrs))
@@ -450,13 +449,8 @@
             self.import_log.record_error(str(ex))
             return True
         error = False
-        # Check whether self._cw is a session or a connection
-        if getattr(self._cw, 'commit', None) is not None:
-            commit = self._cw.commit
-            rollback = self._cw.rollback
-        else:
-            commit = self._cw.cnx.commit
-            rollback = self._cw.cnx.rollback
+        commit = self._cw.commit
+        rollback = self._cw.rollback
         for args in parsed:
             try:
                 self.process_item(*args)