server/session.py
changeset 8809 9ee4d0c65ad2
parent 8808 538852f69b48
child 8810 1a25bdd49f9a
--- a/server/session.py	Tue Mar 26 15:26:29 2013 +0100
+++ b/server/session.py	Wed Mar 27 19:54:36 2013 +0100
@@ -145,6 +145,142 @@
 HOOKS_DENY_ALL = object()
 DEFAULT_SECURITY = object() # evaluated to true by design
 
+class SessionClosedError(RuntimeError):
+    pass
+
+class CnxSetTracker(object):
+    """Keep track of which transaction use which cnxset.
+
+    There should be one of this object per session plus one another for
+    internal session.
+
+    Session object are responsible of creating their CnxSetTracker object.
+
+    Transaction should use the :meth:`record` and :meth:`forget` to inform the
+    tracker of cnxset they have acquired.
+
+    .. automethod:: cubicweb.server.session.CnxSetTracker.record
+    .. automethod:: cubicweb.server.session.CnxSetTracker.forget
+
+    Session use the :meth:`close` and :meth:`wait` method when closing.
+
+    .. automethod:: cubicweb.server.session.CnxSetTracker.close
+    .. automethod:: cubicweb.server.session.CnxSetTracker.wait
+
+    This object itself is threadsafe. It also requires caller to acquired its
+    lock in some situation.
+    """
+
+    def __init__(self):
+        self._active = True
+        self._condition = threading.Condition()
+        self._record = {}
+
+    def __enter__(self):
+        self._condition.__enter__()
+
+    def __exit__(self, *args):
+        self._condition.__exit__(*args)
+
+    def record(self, txid, cnxset):
+        """Inform the tracker that a txid have acquired a cnxset
+
+        This methode is to be used by Transaction object.
+
+        This method fails when:
+        - The txid already have a recorded cnxset.
+        - The tracker is not active anymore.
+
+        Notes about the caller:
+        (1) It is responsible for retrieving a cnxset.
+        (2) It must be prepared to release the cnxset if the
+            `cnxsettracker.forget` call fails.
+        (3) It should acquire the tracker lock until the very end of the operation.
+        (4) However It take care to lock the CnxSetTracker object after having
+            retrieved the cnxset to prevent deadlock.
+
+        A typical usage look like::
+
+        cnxset = repo._get_cnxset() # (1)
+        try:
+            with cnxset_tracker: # (3) and (4)
+                cnxset_tracker.record(caller.id, cnxset)
+                # (3') operation ends when caller is in expected state only
+                caller.cnxset = cnxset
+        except Exception:
+            repo._free_cnxset(cnxset) # (2)
+            raise
+        """
+        # dubious since the caller is suppose to have acquired it anyway.
+        with self._condition:
+            if not self._active:
+                raise SessionClosedError('Closed')
+            old = self._record.get(txid)
+            if old is not None:
+                raise ValueError('"%s" already have a cnx_set (%r)'
+                                 % (txid, old))
+            self._record[txid] = cnxset
+
+    def forget(self, txid, cnxset):
+        """Inform the tracker that a txid have release a cnxset
+
+        This methode is to be used by Transaction object.
+
+        This method fails when:
+        - The cnxset for the txid does not match the recorded one.
+
+        Notes about the caller:
+        (1) It is responsible for releasing the cnxset.
+        (2) It should acquire the tracker lock during the operation to ensure
+            the internal tracker state is always accurate regarding its own state.
+
+        A typical usage look like::
+
+        cnxset = caller.cnxset
+        try:
+            with cnxset_tracker:
+                # (2) you can not have caller.cnxset out of sync with
+                #     cnxset_tracker state while unlocked
+                caller.cnxset = None
+                cnxset_tracker.forget(caller.id, cnxset)
+        finally:
+            cnxset = repo._free_cnxset(cnxset) # (1)
+        """
+        with self._condition:
+            old = self._record.get(txid, None)
+            if old is not cnxset:
+                raise ValueError('recorded cnxset for "%s" mismatch: %r != %r'
+                                 % (txid, old, cnxset))
+            self._record.pop(txid)
+            self._condition.notify_all()
+
+    def close(self):
+        """Marks the tracker as inactive.
+
+        This methode is to be used by Session object.
+
+        Inactive tracker does not accept new record anymore.
+        """
+        with self._condition:
+            self._active = False
+
+    def wait(self, timeout=10):
+        """Wait for all recorded cnxset to be released
+
+        This methode is to be used by Session object.
+
+        returns a tuple of transaction id that remains open.
+        """
+        with self._condition:
+            if  self._active:
+                raise RuntimeError('Cannot wait on active tracker.'
+                                   ' Call tracker.close() first')
+            while self._record and timeout > 0:
+                start = time()
+                self._condition.wait(timeout)
+                timeout -= time() - start
+            return tuple(self._record)
+
 class Transaction(object):
     """Repository Transaction
 
@@ -199,7 +335,7 @@
 
     """
 
-    def __init__(self, txid, mode, rewriter):
+    def __init__(self, txid, cnxset_tracker, mode, rewriter):
         #: transaction unique id
         self.transactionid = txid
         #: reentrance handling
@@ -208,7 +344,9 @@
         #: connection handling mode
         self.mode = mode
         #: connection set used to execute queries on sources
-        self.cnxset = None
+        self._cnxset = None
+        #: CnxSetTracker used to report cnxset usage
+        self._cnxset_tracker = cnxset_tracker
         #: is this transaction from a client or internal to the repo
         self.running_dbapi_query = True
 
@@ -246,6 +384,25 @@
         #: (None, 'precommit', 'postcommit', 'uncommitable')
         self.commit_state = None
         self.pruned_hooks_cache = {}
+    # Connection Set Management ###############################################
+    @property
+    def cnxset(self):
+        return self._cnxset
+
+    @cnxset.setter
+    def cnxset(self, new_cnxset):
+        with self._cnxset_tracker:
+            old_cnxset = self._cnxset
+            if new_cnxset is old_cnxset:
+                return #nothing to do
+            if old_cnxset is not None:
+                self._cnxset = None
+                self.ctx_count -= 1
+                self._cnxset_tracker.forget(self.transactionid, old_cnxset)
+            if new_cnxset is not None:
+                self._cnxset_tracker.record(self.transactionid, new_cnxset)
+                self._cnxset = new_cnxset
+                self.ctx_count += 1
 
     # Entity cache management #################################################
     #
@@ -439,10 +596,6 @@
       refers to the proper instance of :class:`Transaction` according to the
       transaction.
 
-      :attr:`_threads_in_transaction` is a set of (thread, connections set)
-      referencing threads that currently hold a connections set for the session.
-    .. automethod:: cubicweb.server.session.transaction
-
     You should not have to use neither :attr:`_tx` nor :attr:`__threaddata`,
     simply access transaction data transparently through the :attr:`_tx`
     property. Also, you usually don't have to access it directly since current
@@ -556,7 +709,7 @@
         self._txs = {}
         # Data local to the thread
         self.__threaddata = threading.local()
-        self._threads_in_transaction = set()
+        self._cnxset_tracker = CnxSetTracker()
         self._closed = False
         self._lock = threading.RLock()
 
@@ -573,7 +726,8 @@
                 tx = self._txs[txid]
             except KeyError:
                 rewriter = RQLRewriter(self)
-                tx = Transaction(txid, self.default_mode, rewriter)
+                tx = Transaction(txid, self._cnxset_tracker, self.default_mode,
+                                 rewriter)
                 self._txs[txid] = tx
         return tx
 
@@ -613,9 +767,6 @@
         session = Session(user, self.repo)
         tx = session._tx
         tx.cnxset = self.cnxset
-        # we attributed a connections set, need to update ctx_count else it will be freed
-        # while undesired
-        tx.ctx_count = 1
         # share pending_operations, else operation added in the hi-jacked
         # session such as SendMailOp won't ever be processed
         tx.pending_operations = self.pending_operations
@@ -901,49 +1052,37 @@
 
     def set_cnxset(self):
         """the session need a connections set to execute some queries"""
-        with self._lock:
+        with self._lock: # can probably be removed
             if self._closed:
                 self.free_cnxset(True)
                 raise Exception('try to set connections set on a closed session %s' % self.id)
-            if self.cnxset is None:
-                # get connections set first to avoid race-condition
-                self._tx.cnxset = cnxset = self.repo._get_cnxset()
-                self._tx.ctx_count += 1
+            tx = self._tx
+            if tx.cnxset is None:
+                cnxset = self.repo._get_cnxset()
                 try:
-                    cnxset.cnxset_set()
-                except Exception:
-                    self._tx.cnxset = None
+                    self._tx.cnxset = cnxset
+                    try:
+                        cnxset.cnxset_set()
+                    except:
+                        self._tx.cnxset = None
+                        raise
+                except:
                     self.repo._free_cnxset(cnxset)
                     raise
-                self._threads_in_transaction.add(
-                    (threading.currentThread(), cnxset) )
-            return self._tx.cnxset
-
-    def _free_thread_cnxset(self, thread, cnxset, force_close=False):
-        try:
-            self._threads_in_transaction.remove( (thread, cnxset) )
-        except KeyError:
-            # race condition on cnxset freeing (freed by commit or rollback vs
-            # close)
-            pass
-        else:
-            if force_close:
-                cnxset.reconnect()
-            else:
-                cnxset.cnxset_freed()
-            # free cnxset once everything is done to avoid race-condition
-            self.repo._free_cnxset(cnxset)
+        return tx.cnxset
 
     def free_cnxset(self, ignoremode=False):
         """the session is no longer using its connections set, at least for some time"""
         # cnxset may be none if no operation has been done since last commit
         # or rollback
-        cnxset = self._tx.cnxset
+        tx = self._tx
+        cnxset = tx.cnxset
         if cnxset is not None and (ignoremode or self.mode == 'read'):
-            # even in read mode, we must release the current transaction
-            self._free_thread_cnxset(threading.currentThread(), cnxset)
-            self._tx.cnxset = None
-            self._tx.ctx_count -= 1
+            try:
+                tx.cnxset = None
+            finally:
+                cnxset.cnxset_freed()
+                self.repo._free_cnxset(cnxset)
 
     def _touch(self):
         """update latest session usage timestamp and reset mode to read"""
@@ -1174,27 +1313,35 @@
             self._clear_thread_data(free_cnxset)
 
     def close(self):
-        """do not close connections set on session close, since they are shared now"""
+        # do not close connections set on session close, since they are shared now
+        tracker = self._cnxset_tracker
         with self._lock:
             self._closed = True
-        # copy since _threads_in_transaction maybe modified while waiting
-        for thread, cnxset in self._threads_in_transaction.copy():
-            if thread is threading.currentThread():
-                continue
-            self.info('waiting for thread %s', thread)
-            # do this loop/break instead of a simple join(10) in case thread is
-            # the main thread (in which case it will be removed from
-            # self._threads_in_transaction but still be alive...)
-            for i in xrange(10):
-                thread.join(1)
-                if not (thread.isAlive() and
-                        (thread, cnxset) in self._threads_in_transaction):
-                    break
-            else:
-                self.error('thread %s still alive after 10 seconds, will close '
-                           'session anyway', thread)
-                self._free_thread_cnxset(thread, cnxset, force_close=True)
+        tracker.close()
         self.rollback()
+        self.info('waiting for open transaction of session: %s', self)
+        timeout = 10
+        pendings = tracker.wait(timeout)
+        if pendings:
+            self.error('%i transaction still alive after 10 seconds, will close '
+                       'session anyway', len(pendings))
+            for txid in pendings:
+                tx = self._txs.get(txid)
+                if tx is not None:
+                    # drop tx.cnxset
+                    with tracker:
+                        try:
+                            cnxset = tx.cnxset
+                            if cnxset is None:
+                                continue
+                            tx.cnxset = None
+                        except RuntimeError:
+                            msg = 'issue while force free of cnxset in %s'
+                            self.error(msg, tx)
+                    # cnxset.reconnect() do an hard reset of the cnxset
+                    # it force it to be freed
+                    cnxset.reconnect()
+                    self.repo._free_cnxset(cnxset)
         del self.__threaddata
         del self._txs