[session] use a dedicated class to track cnxset
authorPierre-Yves David <pierre-yves.david@logilab.fr>
Wed, 27 Mar 2013 19:54:36 +0100
changeset 8809 9ee4d0c65ad2
parent 8808 538852f69b48
child 8810 1a25bdd49f9a
[session] use a dedicated class to track cnxset We introduce a new CnxSetTracker to track `cnxset` used by Transaction and allows to wait for them. This new class does not use Thread ID not thread joining to work. This allows to use multiple transaction per thread and a transaction in multiple thread. The class itself is totally threadsafe by the Transaction is still not thread safe. The old _threads_in_transaction attribute is dropped in favor of a new logic based on this object. The registration of cnxset used is not done by the Transaction itself. tx.cnset is a property handling the Consistency of its value with the CnxSetTracker instance. Note: The CnxSetTracker instance only track transaction id, not transaction itself, So not reference cycle are created.
--- a/devtools/repotest.py	Tue Mar 26 15:26:29 2013 +0100
+++ b/devtools/repotest.py	Wed Mar 27 19:54:36 2013 +0100
@@ -311,7 +311,8 @@
             del self.repo.sources_by_uri[source.uri]
         for session in self._dumb_sessions:
-            session._tx.cnxset = None
+            if session._tx.cnxset is not None:
+                session._tx.cnxset = None
     def _prepare_plan(self, rql, kwargs=None):
--- a/server/session.py	Tue Mar 26 15:26:29 2013 +0100
+++ b/server/session.py	Wed Mar 27 19:54:36 2013 +0100
@@ -145,6 +145,142 @@
 HOOKS_DENY_ALL = object()
 DEFAULT_SECURITY = object() # evaluated to true by design
+class SessionClosedError(RuntimeError):
+    pass
+class CnxSetTracker(object):
+    """Keep track of which transaction use which cnxset.
+    There should be one of this object per session plus one another for
+    internal session.
+    Session object are responsible of creating their CnxSetTracker object.
+    Transaction should use the :meth:`record` and :meth:`forget` to inform the
+    tracker of cnxset they have acquired.
+    .. automethod:: cubicweb.server.session.CnxSetTracker.record
+    .. automethod:: cubicweb.server.session.CnxSetTracker.forget
+    Session use the :meth:`close` and :meth:`wait` method when closing.
+    .. automethod:: cubicweb.server.session.CnxSetTracker.close
+    .. automethod:: cubicweb.server.session.CnxSetTracker.wait
+    This object itself is threadsafe. It also requires caller to acquired its
+    lock in some situation.
+    """
+    def __init__(self):
+        self._active = True
+        self._condition = threading.Condition()
+        self._record = {}
+    def __enter__(self):
+        self._condition.__enter__()
+    def __exit__(self, *args):
+        self._condition.__exit__(*args)
+    def record(self, txid, cnxset):
+        """Inform the tracker that a txid have acquired a cnxset
+        This methode is to be used by Transaction object.
+        This method fails when:
+        - The txid already have a recorded cnxset.
+        - The tracker is not active anymore.
+        Notes about the caller:
+        (1) It is responsible for retrieving a cnxset.
+        (2) It must be prepared to release the cnxset if the
+            `cnxsettracker.forget` call fails.
+        (3) It should acquire the tracker lock until the very end of the operation.
+        (4) However It take care to lock the CnxSetTracker object after having
+            retrieved the cnxset to prevent deadlock.
+        A typical usage look like::
+        cnxset = repo._get_cnxset() # (1)
+        try:
+            with cnxset_tracker: # (3) and (4)
+                cnxset_tracker.record(caller.id, cnxset)
+                # (3') operation ends when caller is in expected state only
+                caller.cnxset = cnxset
+        except Exception:
+            repo._free_cnxset(cnxset) # (2)
+            raise
+        """
+        # dubious since the caller is suppose to have acquired it anyway.
+        with self._condition:
+            if not self._active:
+                raise SessionClosedError('Closed')
+            old = self._record.get(txid)
+            if old is not None:
+                raise ValueError('"%s" already have a cnx_set (%r)'
+                                 % (txid, old))
+            self._record[txid] = cnxset
+    def forget(self, txid, cnxset):
+        """Inform the tracker that a txid have release a cnxset
+        This methode is to be used by Transaction object.
+        This method fails when:
+        - The cnxset for the txid does not match the recorded one.
+        Notes about the caller:
+        (1) It is responsible for releasing the cnxset.
+        (2) It should acquire the tracker lock during the operation to ensure
+            the internal tracker state is always accurate regarding its own state.
+        A typical usage look like::
+        cnxset = caller.cnxset
+        try:
+            with cnxset_tracker:
+                # (2) you can not have caller.cnxset out of sync with
+                #     cnxset_tracker state while unlocked
+                caller.cnxset = None
+                cnxset_tracker.forget(caller.id, cnxset)
+        finally:
+            cnxset = repo._free_cnxset(cnxset) # (1)
+        """
+        with self._condition:
+            old = self._record.get(txid, None)
+            if old is not cnxset:
+                raise ValueError('recorded cnxset for "%s" mismatch: %r != %r'
+                                 % (txid, old, cnxset))
+            self._record.pop(txid)
+            self._condition.notify_all()
+    def close(self):
+        """Marks the tracker as inactive.
+        This methode is to be used by Session object.
+        Inactive tracker does not accept new record anymore.
+        """
+        with self._condition:
+            self._active = False
+    def wait(self, timeout=10):
+        """Wait for all recorded cnxset to be released
+        This methode is to be used by Session object.
+        returns a tuple of transaction id that remains open.
+        """
+        with self._condition:
+            if  self._active:
+                raise RuntimeError('Cannot wait on active tracker.'
+                                   ' Call tracker.close() first')
+            while self._record and timeout > 0:
+                start = time()
+                self._condition.wait(timeout)
+                timeout -= time() - start
+            return tuple(self._record)
 class Transaction(object):
     """Repository Transaction
@@ -199,7 +335,7 @@
-    def __init__(self, txid, mode, rewriter):
+    def __init__(self, txid, cnxset_tracker, mode, rewriter):
         #: transaction unique id
         self.transactionid = txid
         #: reentrance handling
@@ -208,7 +344,9 @@
         #: connection handling mode
         self.mode = mode
         #: connection set used to execute queries on sources
-        self.cnxset = None
+        self._cnxset = None
+        #: CnxSetTracker used to report cnxset usage
+        self._cnxset_tracker = cnxset_tracker
         #: is this transaction from a client or internal to the repo
         self.running_dbapi_query = True
@@ -246,6 +384,25 @@
         #: (None, 'precommit', 'postcommit', 'uncommitable')
         self.commit_state = None
         self.pruned_hooks_cache = {}
+    # Connection Set Management ###############################################
+    @property
+    def cnxset(self):
+        return self._cnxset
+    @cnxset.setter
+    def cnxset(self, new_cnxset):
+        with self._cnxset_tracker:
+            old_cnxset = self._cnxset
+            if new_cnxset is old_cnxset:
+                return #nothing to do
+            if old_cnxset is not None:
+                self._cnxset = None
+                self.ctx_count -= 1
+                self._cnxset_tracker.forget(self.transactionid, old_cnxset)
+            if new_cnxset is not None:
+                self._cnxset_tracker.record(self.transactionid, new_cnxset)
+                self._cnxset = new_cnxset
+                self.ctx_count += 1
     # Entity cache management #################################################
@@ -439,10 +596,6 @@
       refers to the proper instance of :class:`Transaction` according to the
-      :attr:`_threads_in_transaction` is a set of (thread, connections set)
-      referencing threads that currently hold a connections set for the session.
-    .. automethod:: cubicweb.server.session.transaction
     You should not have to use neither :attr:`_tx` nor :attr:`__threaddata`,
     simply access transaction data transparently through the :attr:`_tx`
     property. Also, you usually don't have to access it directly since current
@@ -556,7 +709,7 @@
         self._txs = {}
         # Data local to the thread
         self.__threaddata = threading.local()
-        self._threads_in_transaction = set()
+        self._cnxset_tracker = CnxSetTracker()
         self._closed = False
         self._lock = threading.RLock()
@@ -573,7 +726,8 @@
                 tx = self._txs[txid]
             except KeyError:
                 rewriter = RQLRewriter(self)
-                tx = Transaction(txid, self.default_mode, rewriter)
+                tx = Transaction(txid, self._cnxset_tracker, self.default_mode,
+                                 rewriter)
                 self._txs[txid] = tx
         return tx
@@ -613,9 +767,6 @@
         session = Session(user, self.repo)
         tx = session._tx
         tx.cnxset = self.cnxset
-        # we attributed a connections set, need to update ctx_count else it will be freed
-        # while undesired
-        tx.ctx_count = 1
         # share pending_operations, else operation added in the hi-jacked
         # session such as SendMailOp won't ever be processed
         tx.pending_operations = self.pending_operations
@@ -901,49 +1052,37 @@
     def set_cnxset(self):
         """the session need a connections set to execute some queries"""
-        with self._lock:
+        with self._lock: # can probably be removed
             if self._closed:
                 raise Exception('try to set connections set on a closed session %s' % self.id)
-            if self.cnxset is None:
-                # get connections set first to avoid race-condition
-                self._tx.cnxset = cnxset = self.repo._get_cnxset()
-                self._tx.ctx_count += 1
+            tx = self._tx
+            if tx.cnxset is None:
+                cnxset = self.repo._get_cnxset()
-                    cnxset.cnxset_set()
-                except Exception:
-                    self._tx.cnxset = None
+                    self._tx.cnxset = cnxset
+                    try:
+                        cnxset.cnxset_set()
+                    except:
+                        self._tx.cnxset = None
+                        raise
+                except:
-                self._threads_in_transaction.add(
-                    (threading.currentThread(), cnxset) )
-            return self._tx.cnxset
-    def _free_thread_cnxset(self, thread, cnxset, force_close=False):
-        try:
-            self._threads_in_transaction.remove( (thread, cnxset) )
-        except KeyError:
-            # race condition on cnxset freeing (freed by commit or rollback vs
-            # close)
-            pass
-        else:
-            if force_close:
-                cnxset.reconnect()
-            else:
-                cnxset.cnxset_freed()
-            # free cnxset once everything is done to avoid race-condition
-            self.repo._free_cnxset(cnxset)
+        return tx.cnxset
     def free_cnxset(self, ignoremode=False):
         """the session is no longer using its connections set, at least for some time"""
         # cnxset may be none if no operation has been done since last commit
         # or rollback
-        cnxset = self._tx.cnxset
+        tx = self._tx
+        cnxset = tx.cnxset
         if cnxset is not None and (ignoremode or self.mode == 'read'):
-            # even in read mode, we must release the current transaction
-            self._free_thread_cnxset(threading.currentThread(), cnxset)
-            self._tx.cnxset = None
-            self._tx.ctx_count -= 1
+            try:
+                tx.cnxset = None
+            finally:
+                cnxset.cnxset_freed()
+                self.repo._free_cnxset(cnxset)
     def _touch(self):
         """update latest session usage timestamp and reset mode to read"""
@@ -1174,27 +1313,35 @@
     def close(self):
-        """do not close connections set on session close, since they are shared now"""
+        # do not close connections set on session close, since they are shared now
+        tracker = self._cnxset_tracker
         with self._lock:
             self._closed = True
-        # copy since _threads_in_transaction maybe modified while waiting
-        for thread, cnxset in self._threads_in_transaction.copy():
-            if thread is threading.currentThread():
-                continue
-            self.info('waiting for thread %s', thread)
-            # do this loop/break instead of a simple join(10) in case thread is
-            # the main thread (in which case it will be removed from
-            # self._threads_in_transaction but still be alive...)
-            for i in xrange(10):
-                thread.join(1)
-                if not (thread.isAlive() and
-                        (thread, cnxset) in self._threads_in_transaction):
-                    break
-            else:
-                self.error('thread %s still alive after 10 seconds, will close '
-                           'session anyway', thread)
-                self._free_thread_cnxset(thread, cnxset, force_close=True)
+        tracker.close()
+        self.info('waiting for open transaction of session: %s', self)
+        timeout = 10
+        pendings = tracker.wait(timeout)
+        if pendings:
+            self.error('%i transaction still alive after 10 seconds, will close '
+                       'session anyway', len(pendings))
+            for txid in pendings:
+                tx = self._txs.get(txid)
+                if tx is not None:
+                    # drop tx.cnxset
+                    with tracker:
+                        try:
+                            cnxset = tx.cnxset
+                            if cnxset is None:
+                                continue
+                            tx.cnxset = None
+                        except RuntimeError:
+                            msg = 'issue while force free of cnxset in %s'
+                            self.error(msg, tx)
+                    # cnxset.reconnect() do an hard reset of the cnxset
+                    # it force it to be freed
+                    cnxset.reconnect()
+                    self.repo._free_cnxset(cnxset)
         del self.__threaddata
         del self._txs