[datafeed sources] finish the session -> cnx switch
authorAurelien Campeas <aurelien.campeas@logilab.fr>
Wed, 11 Jun 2014 16:54:31 +0200
changeset 9879 21278eb03bbf
parent 9878 f3936f64bd98
child 9880 9adf36ce805e
[datafeed sources] finish the session -> cnx switch Related to #3933480.
server/sources/datafeed.py
sobjects/ldapparser.py
sobjects/test/unittest_cwxmlparser.py
--- a/server/sources/datafeed.py	Fri Jun 06 15:56:24 2014 +0200
+++ b/server/sources/datafeed.py	Wed Jun 11 16:54:31 2014 +0200
@@ -113,18 +113,18 @@
         self.parser_id = source_entity.parser
         self.load_mapping(source_entity._cw)
 
-    def _get_parser(self, session, **kwargs):
+    def _get_parser(self, cnx, **kwargs):
         return self.repo.vreg['parsers'].select(
-            self.parser_id, session, source=self, **kwargs)
+            self.parser_id, cnx, source=self, **kwargs)
 
-    def load_mapping(self, session):
+    def load_mapping(self, cnx):
         self.mapping = {}
         self.mapping_idx = {}
         try:
-            parser = self._get_parser(session)
+            parser = self._get_parser(cnx)
         except (RegistryNotFound, ObjectNotFound):
             return # no parser yet, don't go further
-        self._load_mapping(session, parser=parser)
+        self._load_mapping(cnx, parser=parser)
 
     def add_schema_config(self, schemacfg, checkonly=False, parser=None):
         """added CWSourceSchemaConfig, modify mapping accordingly"""
@@ -146,7 +146,7 @@
     def update_latest_retrieval(self, cnx):
         self.latest_retrieval = datetime.utcnow()
         cnx.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
-                        {'x': self.eid, 'date': self.latest_retrieval})
+                    {'x': self.eid, 'date': self.latest_retrieval})
         cnx.commit()
 
     def acquire_synchronization_lock(self, cnx):
@@ -165,7 +165,7 @@
 
     def release_synchronization_lock(self, cnx):
         cnx.execute('SET X in_synchronization NULL WHERE X eid %(x)s',
-                        {'x': self.eid})
+                    {'x': self.eid})
         cnx.commit()
 
     def pull_data(self, cnx, force=False, raise_on_error=False):
@@ -225,7 +225,7 @@
                 error = True
         return error
 
-    def before_entity_insertion(self, session, lid, etype, eid, sourceparams):
+    def before_entity_insertion(self, cnx, lid, etype, eid, sourceparams):
         """called by the repository when an eid has been attributed for an
         entity stored here but the entity has not been inserted in the system
         table yet.
@@ -234,29 +234,29 @@
         entity.
         """
         entity = super(DataFeedSource, self).before_entity_insertion(
-            session, lid, etype, eid, sourceparams)
+            cnx, lid, etype, eid, sourceparams)
         entity.cw_edited['cwuri'] = lid.decode('utf-8')
         entity.cw_edited.set_defaults()
         sourceparams['parser'].before_entity_copy(entity, sourceparams)
         return entity
 
-    def after_entity_insertion(self, session, lid, entity, sourceparams):
+    def after_entity_insertion(self, cnx, lid, entity, sourceparams):
         """called by the repository after an entity stored here has been
         inserted in the system table.
         """
-        relations = preprocess_inlined_relations(session, entity)
-        if session.is_hook_category_activated('integrity'):
+        relations = preprocess_inlined_relations(cnx, entity)
+        if cnx.is_hook_category_activated('integrity'):
             entity.cw_edited.check(creation=True)
-        self.repo.system_source.add_entity(session, entity)
+        self.repo.system_source.add_entity(cnx, entity)
         entity.cw_edited.saved = entity._cw_is_saved = True
         sourceparams['parser'].after_entity_copy(entity, sourceparams)
         # call hooks for inlined relations
         call_hooks = self.repo.hm.call_hooks
         if self.should_call_hooks:
             for attr, value in relations:
-                call_hooks('before_add_relation', session,
+                call_hooks('before_add_relation', cnx,
                            eidfrom=entity.eid, rtype=attr, eidto=value)
-                call_hooks('after_add_relation', session,
+                call_hooks('after_add_relation', cnx,
                            eidfrom=entity.eid, rtype=attr, eidto=value)
 
     def source_cwuris(self, cnx):
@@ -266,8 +266,8 @@
         return dict((b64decode(uri), (eid, type))
                     for uri, eid, type in cnx.system_sql(sql).fetchall())
 
-    def init_import_log(self, session, **kwargs):
-        dataimport = session.create_entity('CWDataImport', cw_import_of=self,
+    def init_import_log(self, cnx, **kwargs):
+        dataimport = cnx.create_entity('CWDataImport', cw_import_of=self,
                                            start_timestamp=datetime.utcnow(),
                                            **kwargs)
         dataimport.init()
@@ -277,8 +277,8 @@
 class DataFeedParser(AppObject):
     __registry__ = 'parsers'
 
-    def __init__(self, session, source, sourceuris=None, import_log=None, **kwargs):
-        super(DataFeedParser, self).__init__(session, **kwargs)
+    def __init__(self, cnx, source, sourceuris=None, import_log=None, **kwargs):
+        super(DataFeedParser, self).__init__(cnx, **kwargs)
         self.source = source
         self.sourceuris = sourceuris
         self.import_log = import_log
@@ -305,20 +305,20 @@
         """return an entity for the given uri. May return None if it should be
         skipped
         """
-        session = self._cw
+        cnx = self._cw
         # if cwsource is specified and repository has a source with the same
         # name, call extid2eid on that source so entity will be properly seen as
         # coming from this source
         source_uri = sourceparams.pop('cwsource', None)
         if source_uri is not None and source_uri != 'system':
-            source = session.repo.sources_by_uri.get(source_uri, self.source)
+            source = cnx.repo.sources_by_uri.get(source_uri, self.source)
         else:
             source = self.source
         sourceparams['parser'] = self
         if isinstance(uri, unicode):
             uri = uri.encode('utf-8')
         try:
-            eid = session.repo.extid2eid(source, str(uri), etype, session,
+            eid = cnx.repo.extid2eid(source, str(uri), etype, cnx,
                                          sourceparams=sourceparams)
         except ValidationError as ex:
             # XXX use critical so they are seen during tests. Should consider
@@ -333,14 +333,14 @@
             # Don't give etype to entity_from_eid so we get UnknownEid if the
             # entity has been removed
             try:
-                entity = session.entity_from_eid(-eid)
+                entity = cnx.entity_from_eid(-eid)
             except UnknownEid:
                 return None
             self.notify_updated(entity) # avoid later update from the source's data
             return entity
         if self.sourceuris is not None:
             self.sourceuris.pop(str(uri), None)
-        return session.entity_from_eid(eid, etype)
+        return cnx.entity_from_eid(eid, etype)
 
     def process(self, url, raise_on_error=False):
         """main callback: process the url"""
@@ -371,7 +371,7 @@
         """
         return True
 
-    def handle_deletion(self, config, session, myuris):
+    def handle_deletion(self, config, cnx, myuris):
         if config['delete-entities'] and myuris:
             byetype = {}
             for extid, (eid, etype) in myuris.iteritems():
@@ -379,10 +379,9 @@
                     byetype.setdefault(etype, []).append(str(eid))
             for etype, eids in byetype.iteritems():
                 self.warning('delete %s %s entities', len(eids), etype)
-                session.set_cnxset()
-                session.execute('DELETE %s X WHERE X eid IN (%s)'
-                                % (etype, ','.join(eids)))
-                session.commit()
+                cnx.execute('DELETE %s X WHERE X eid IN (%s)'
+                            % (etype, ','.join(eids)))
+                cnx.commit()
 
     def update_if_necessary(self, entity, attrs):
         entity.complete(tuple(attrs))
@@ -410,13 +409,8 @@
             self.import_log.record_error(str(ex))
             return True
         error = False
-        # Check whether self._cw is a session or a connection
-        if getattr(self._cw, 'commit', None) is not None:
-            commit = self._cw.commit
-            rollback = self._cw.rollback
-        else:
-            commit = self._cw.cnx.commit
-            rollback = self._cw.cnx.rollback
+        commit = self._cw.commit
+        rollback = self._cw.rollback
         for args in parsed:
             try:
                 self.process_item(*args)
--- a/sobjects/ldapparser.py	Fri Jun 06 15:56:24 2014 +0200
+++ b/sobjects/ldapparser.py	Wed Jun 11 16:54:31 2014 +0200
@@ -92,9 +92,9 @@
         for groupdict in self.group_source_entities_by_extid.itervalues():
             self._process('CWGroup', groupdict)
 
-    def handle_deletion(self, config, session, myuris):
+    def handle_deletion(self, config, cnx, myuris):
         if config['delete-entities']:
-            super(DataFeedLDAPAdapter, self).handle_deletion(config, session, myuris)
+            super(DataFeedLDAPAdapter, self).handle_deletion(config, cnx, myuris)
             return
         if myuris:
             byetype = {}
@@ -107,9 +107,9 @@
                     continue
                 self.info('deactivate %s %s entities', len(eids), etype)
                 for eid in eids:
-                    wf = session.entity_from_eid(eid).cw_adapt_to('IWorkflowable')
+                    wf = cnx.entity_from_eid(eid).cw_adapt_to('IWorkflowable')
                     wf.fire_transition_if_possible('deactivate')
-        session.commit(free_cnxset=False)
+        cnx.commit()
 
     def update_if_necessary(self, entity, attrs):
         # disable read security to allow password selection
--- a/sobjects/test/unittest_cwxmlparser.py	Fri Jun 06 15:56:24 2014 +0200
+++ b/sobjects/test/unittest_cwxmlparser.py	Wed Jun 11 16:54:31 2014 +0200
@@ -132,13 +132,14 @@
     REMOVE THE DATABASE TEMPLATE else it won't be considered
     """
     test_db_id = 'xmlparser'
+
     @classmethod
-    def pre_setup_database(cls, session, config):
-        myfeed = session.create_entity('CWSource', name=u'myfeed', type=u'datafeed',
+    def pre_setup_database(cls, cnx, config):
+        myfeed = cnx.create_entity('CWSource', name=u'myfeed', type=u'datafeed',
                                    parser=u'cw.entityxml', url=BASEXML)
-        myotherfeed = session.create_entity('CWSource', name=u'myotherfeed', type=u'datafeed',
-                                            parser=u'cw.entityxml', url=OTHERXML)
-        session.commit()
+        myotherfeed = cnx.create_entity('CWSource', name=u'myotherfeed', type=u'datafeed',
+                                        parser=u'cw.entityxml', url=OTHERXML)
+        cnx.commit()
         myfeed.init_mapping([(('CWUser', 'use_email', '*'),
                               u'role=subject\naction=copy'),
                              (('CWUser', 'in_group', '*'),
@@ -153,7 +154,8 @@
                                   (('CWUser', 'in_state', '*'),
                                    u'role=subject\naction=link\nlinkattr=name'),
                                   ])
-        session.create_entity('Tag', name=u'hop')
+        cnx.create_entity('Tag', name=u'hop')
+        cnx.commit()
 
     def test_complete_url(self):
         dfsource = self.repo.sources_by_uri['myfeed']