[connection] transparent cnx_set handling
authorPierre-Yves David <pierre-yves.david@logilab.fr>
Wed, 26 Jun 2013 11:21:39 +0200
changeset 9098 5467fb901931
parent 9097 39be1e548270
child 9099 b7f7aa1b1123
[connection] transparent cnx_set handling Connection object while take cares of there cnxset themself (as dbapi connection does). The ``set_cnxset`` and ``free_cnxset`` operation are still available for backward compatibility purpose. The ``_auto_free_cnx_set`` is introduced to handle mixed usage. A new context manager ``connection.ensure_cnx_set`` is added for code that access ``cnx.cnxset`` directly and are not wrapped in any specific ``Connection`` method. A ``_with_cnx_set`` decorator is used on all Connection method that need a cnxset.
devtools/repotest.py
repoapi.py
server/session.py
server/sources/native.py
--- a/devtools/repotest.py	Thu Jun 27 10:44:40 2013 +0200
+++ b/devtools/repotest.py	Wed Jun 26 11:21:39 2013 +0200
@@ -324,7 +324,8 @@
                 select.solutions.sort()
         else:
             rqlst.solutions.sort()
-        return self.o.plan_factory(rqlst, kwargs, self.session)
+        with self.session.ensure_cnx_set:
+            return self.o.plan_factory(rqlst, kwargs, self.session)
 
 
 # monkey patch some methods to get predicatable results #######################
--- a/repoapi.py	Thu Jun 27 10:44:40 2013 +0200
+++ b/repoapi.py	Wed Jun 26 11:21:39 2013 +0200
@@ -189,11 +189,7 @@
         session object"""
         if not self._open:
             raise ProgrammingError('Closed connection %s' % self._cnxid)
-        self._cnx.set_cnxset()
-        try:
-            yield self._cnx
-        finally:
-            self._cnx.free_cnxset()
+        yield self._cnx
 
     # Main Connection purpose in life #########################################
 
--- a/server/session.py	Thu Jun 27 10:44:40 2013 +0200
+++ b/server/session.py	Wed Jun 26 11:21:39 2013 +0200
@@ -24,6 +24,8 @@
 from uuid import uuid4
 from warnings import warn
 import json
+import functools
+from contextlib import contextmanager
 
 from logilab.common.deprecation import deprecated
 from logilab.common.textutils import unormalize
@@ -360,6 +362,16 @@
                 timeout -= time() - start
             return tuple(self._record)
 
+
+def _with_cnx_set(func):
+    """decorator for Connection method that ensure they run with a cnxset """
+    @functools.wraps(func)
+    def wrapper(cnx, *args, **kwargs):
+        with cnx.ensure_cnx_set:
+            return func(cnx, *args, **kwargs)
+    return wrapper
+
+
 class Connection(RequestSessionBase):
     """Repository Connection
 
@@ -426,6 +438,12 @@
         self.connectionid = cnxid
         #: reentrance handling
         self.ctx_count = 0
+        #: count the number of entry in a context needing a cnxset
+        self._cnxset_count = 0
+        #: Boolean for compat with the older explicite set_cnxset/free_cnx API
+        #: When a call set_cnxset is done, no automatic freeing will be done
+        #: until free_cnx is called.
+        self._auto_free_cnx_set = True
 
         #: server.Repository object
         self.repo = session.repo
@@ -541,7 +559,7 @@
                 self._cnxset = new_cnxset
                 self.ctx_count += 1
 
-    def set_cnxset(self):
+    def _set_cnxset(self):
         """the connection need a connections set to execute some queries"""
         if self.cnxset is None:
             cnxset = self.repo._get_cnxset()
@@ -557,7 +575,7 @@
                 raise
         return self.cnxset
 
-    def free_cnxset(self, ignoremode=False):
+    def _free_cnxset(self, ignoremode=False):
         """the connection is no longer using its connections set, at least for some time"""
         # cnxset may be none if no operation has been done since last commit
         # or rollback
@@ -569,6 +587,29 @@
                 cnxset.cnxset_freed()
                 self.repo._free_cnxset(cnxset)
 
+    def set_cnxset(self):
+        self._auto_free_cnx_set = False
+        return self._set_cnxset()
+
+    def free_cnxset(self, ignoremode=False):
+        self._auto_free_cnx_set = True
+        return self._free_cnxset(ignoremode=ignoremode)
+
+
+    @property
+    @contextmanager
+    def ensure_cnx_set(self):
+        assert self._cnxset_count >= 0
+        if self._cnxset_count == 0:
+            self._set_cnxset()
+        try:
+            self._cnxset_count += 1
+            yield
+        finally:
+            self._cnxset_count = max(self._cnxset_count - 1, 0)
+            if self._cnxset_count == 0 and self._auto_free_cnx_set:
+                self._free_cnxset()
+
 
     # Entity cache management #################################################
     #
@@ -869,6 +910,7 @@
     def source_defs(self):
         return self.repo.source_defs()
 
+    @_with_cnx_set
     def describe(self, eid, asdict=False):
         """return a tuple (type, sourceuri, extid) for the entity with id <eid>"""
         metas = self.repo.type_and_source_from_eid(eid, self)
@@ -877,13 +919,14 @@
        # XXX :-1 for cw compat, use asdict=True for full information
         return metas[:-1]
 
-
+    @_with_cnx_set
     def source_from_eid(self, eid):
         """return the source where the entity with id <eid> is located"""
         return self.repo.source_from_eid(eid, self)
 
     # core method #############################################################
 
+    @_with_cnx_set
     def execute(self, rql, kwargs=None, eid_key=None, build_descr=True):
         """db-api like method directly linked to the querier execute method.
 
@@ -1018,24 +1061,22 @@
 
     # resource accessors ######################################################
 
+    @_with_cnx_set
     def call_service(self, regid, **kwargs):
         json.dumps(kwargs) # This line ensure that people use serialisable
                            # argument for call service. this is very important
                            # to enforce that from start to make sure RPC
                            # version is available.
         self.info('calling service %s', regid)
-        self.set_cnxset()
-        try:
-            service = self.vreg['services'].select(regid, self, **kwargs)
-            result = service.call(**kwargs)
-            json.dumps(result) # This line ensure that service have serialisable
-                               # output. this is very important to enforce that
-                               # from start to make sure RPC version is
-                               # available.
-            return result
-        finally:
-            self.free_cnxset()
+        service = self.vreg['services'].select(regid, self, **kwargs)
+        result = service.call(**kwargs)
+        json.dumps(result) # This line ensure that service have serialisable
+                           # output. this is very important to enforce that
+                           # from start to make sure RPC version is
+                           # available.
+        return result
 
+    @_with_cnx_set
     def system_sql(self, sql, args=None, rollback_on_failure=True):
         """return a sql cursor on the system database"""
         if sql.split(None, 1)[0].upper() != 'SELECT':
@@ -1385,6 +1426,7 @@
                 raise SessionClosedError('try to set connections set on a closed session %s' % self.id)
             return self._cnx.set_cnxset()
     free_cnxset = cnx_meth('free_cnxset')
+    ensure_cnx_set = cnx_attr('ensure_cnx_set')
 
     def _touch(self):
         """update latest session usage timestamp and reset mode to read"""
--- a/server/sources/native.py	Thu Jun 27 10:44:40 2013 +0200
+++ b/server/sources/native.py	Wed Jun 26 11:21:39 2013 +0200
@@ -977,34 +977,35 @@
 
     def add_info(self, session, entity, source, extid, complete):
         """add type and source info for an eid into the system table"""
-        # begin by inserting eid/type/source/extid into the entities table
-        if extid is not None:
-            assert isinstance(extid, str)
-            extid = b64encode(extid)
-        uri = 'system' if source.copy_based_source else source.uri
-        attrs = {'type': entity.cw_etype, 'eid': entity.eid, 'extid': extid,
-                 'source': uri, 'asource': source.uri, 'mtime': datetime.utcnow()}
-        self._handle_insert_entity_sql(session, self.sqlgen.insert('entities', attrs), attrs)
-        # insert core relations: is, is_instance_of and cw_source
-        try:
-            self._handle_is_relation_sql(session, 'INSERT INTO is_relation(eid_from,eid_to) VALUES (%s,%s)',
-                                         (entity.eid, eschema_eid(session, entity.e_schema)))
-        except IndexError:
-            # during schema serialization, skip
-            pass
-        else:
-            for eschema in entity.e_schema.ancestors() + [entity.e_schema]:
-                self._handle_is_relation_sql(session,
-                                             'INSERT INTO is_instance_of_relation(eid_from,eid_to) VALUES (%s,%s)',
-                                             (entity.eid, eschema_eid(session, eschema)))
-        if 'CWSource' in self.schema and source.eid is not None: # else, cw < 3.10
-            self._handle_is_relation_sql(session, 'INSERT INTO cw_source_relation(eid_from,eid_to) VALUES (%s,%s)',
-                                         (entity.eid, source.eid))
-        # now we can update the full text index
-        if self.do_fti and self.need_fti_indexation(entity.cw_etype):
-            if complete:
-                entity.complete(entity.e_schema.indexable_attributes())
-            self.index_entity(session, entity=entity)
+        with session.ensure_cnx_set:
+            # begin by inserting eid/type/source/extid into the entities table
+            if extid is not None:
+                assert isinstance(extid, str)
+                extid = b64encode(extid)
+            uri = 'system' if source.copy_based_source else source.uri
+            attrs = {'type': entity.cw_etype, 'eid': entity.eid, 'extid': extid,
+                     'source': uri, 'asource': source.uri, 'mtime': datetime.utcnow()}
+            self._handle_insert_entity_sql(session, self.sqlgen.insert('entities', attrs), attrs)
+            # insert core relations: is, is_instance_of and cw_source
+            try:
+                self._handle_is_relation_sql(session, 'INSERT INTO is_relation(eid_from,eid_to) VALUES (%s,%s)',
+                                             (entity.eid, eschema_eid(session, entity.e_schema)))
+            except IndexError:
+                # during schema serialization, skip
+                pass
+            else:
+                for eschema in entity.e_schema.ancestors() + [entity.e_schema]:
+                    self._handle_is_relation_sql(session,
+                                                 'INSERT INTO is_instance_of_relation(eid_from,eid_to) VALUES (%s,%s)',
+                                                 (entity.eid, eschema_eid(session, eschema)))
+            if 'CWSource' in self.schema and source.eid is not None: # else, cw < 3.10
+                self._handle_is_relation_sql(session, 'INSERT INTO cw_source_relation(eid_from,eid_to) VALUES (%s,%s)',
+                                             (entity.eid, source.eid))
+            # now we can update the full text index
+            if self.do_fti and self.need_fti_indexation(entity.cw_etype):
+                if complete:
+                    entity.complete(entity.e_schema.indexable_attributes())
+                self.index_entity(session, entity=entity)
 
     def update_info(self, session, entity, need_fti_update):
         """mark entity as being modified, fulltext reindex if needed"""
@@ -1227,17 +1228,19 @@
         raise `NoSuchTransaction` if there is no such transaction of if the
         session's user isn't allowed to see it.
         """
-        restr = {'tx_uuid': txuuid}
-        sql = self.sqlgen.select('transactions', restr, ('tx_time', 'tx_user'))
-        cu = self.doexec(session, sql, restr)
-        try:
-            time, ueid = cu.fetchone()
-        except TypeError:
-            raise tx.NoSuchTransaction(txuuid)
-        if not (session.user.is_in_group('managers')
-                or session.user.eid == ueid):
-            raise tx.NoSuchTransaction(txuuid)
-        return time, ueid
+        with session.ensure_cnx_set:
+            restr = {'tx_uuid': txuuid}
+            sql = self.sqlgen.select('transactions', restr,
+                                     ('tx_time', 'tx_user'))
+            cu = self.doexec(session, sql, restr)
+            try:
+                time, ueid = cu.fetchone()
+            except TypeError:
+                raise tx.NoSuchTransaction(txuuid)
+            if not (session.user.is_in_group('managers')
+                    or session.user.eid == ueid):
+                raise tx.NoSuchTransaction(txuuid)
+            return time, ueid
 
     def _reedit_entity(self, entity, changes, err):
         session = entity._cw