--- a/server/session.py Tue Mar 26 15:26:29 2013 +0100
+++ b/server/session.py Wed Mar 27 19:54:36 2013 +0100
@@ -145,6 +145,142 @@
HOOKS_DENY_ALL = object()
DEFAULT_SECURITY = object() # evaluated to true by design
+class SessionClosedError(RuntimeError):
+ pass
+
+class CnxSetTracker(object):
+ """Keep track of which transaction use which cnxset.
+
+ There should be one of this object per session plus one another for
+ internal session.
+
+ Session object are responsible of creating their CnxSetTracker object.
+
+ Transaction should use the :meth:`record` and :meth:`forget` to inform the
+ tracker of cnxset they have acquired.
+
+ .. automethod:: cubicweb.server.session.CnxSetTracker.record
+ .. automethod:: cubicweb.server.session.CnxSetTracker.forget
+
+ Session use the :meth:`close` and :meth:`wait` method when closing.
+
+ .. automethod:: cubicweb.server.session.CnxSetTracker.close
+ .. automethod:: cubicweb.server.session.CnxSetTracker.wait
+
+ This object itself is threadsafe. It also requires caller to acquired its
+ lock in some situation.
+ """
+
+ def __init__(self):
+ self._active = True
+ self._condition = threading.Condition()
+ self._record = {}
+
+ def __enter__(self):
+ self._condition.__enter__()
+
+ def __exit__(self, *args):
+ self._condition.__exit__(*args)
+
+ def record(self, txid, cnxset):
+ """Inform the tracker that a txid have acquired a cnxset
+
+ This methode is to be used by Transaction object.
+
+ This method fails when:
+ - The txid already have a recorded cnxset.
+ - The tracker is not active anymore.
+
+ Notes about the caller:
+ (1) It is responsible for retrieving a cnxset.
+ (2) It must be prepared to release the cnxset if the
+ `cnxsettracker.forget` call fails.
+ (3) It should acquire the tracker lock until the very end of the operation.
+ (4) However It take care to lock the CnxSetTracker object after having
+ retrieved the cnxset to prevent deadlock.
+
+ A typical usage look like::
+
+ cnxset = repo._get_cnxset() # (1)
+ try:
+ with cnxset_tracker: # (3) and (4)
+ cnxset_tracker.record(caller.id, cnxset)
+ # (3') operation ends when caller is in expected state only
+ caller.cnxset = cnxset
+ except Exception:
+ repo._free_cnxset(cnxset) # (2)
+ raise
+ """
+ # dubious since the caller is suppose to have acquired it anyway.
+ with self._condition:
+ if not self._active:
+ raise SessionClosedError('Closed')
+ old = self._record.get(txid)
+ if old is not None:
+ raise ValueError('"%s" already have a cnx_set (%r)'
+ % (txid, old))
+ self._record[txid] = cnxset
+
+ def forget(self, txid, cnxset):
+ """Inform the tracker that a txid have release a cnxset
+
+ This methode is to be used by Transaction object.
+
+ This method fails when:
+ - The cnxset for the txid does not match the recorded one.
+
+ Notes about the caller:
+ (1) It is responsible for releasing the cnxset.
+ (2) It should acquire the tracker lock during the operation to ensure
+ the internal tracker state is always accurate regarding its own state.
+
+ A typical usage look like::
+
+ cnxset = caller.cnxset
+ try:
+ with cnxset_tracker:
+ # (2) you can not have caller.cnxset out of sync with
+ # cnxset_tracker state while unlocked
+ caller.cnxset = None
+ cnxset_tracker.forget(caller.id, cnxset)
+ finally:
+ cnxset = repo._free_cnxset(cnxset) # (1)
+ """
+ with self._condition:
+ old = self._record.get(txid, None)
+ if old is not cnxset:
+ raise ValueError('recorded cnxset for "%s" mismatch: %r != %r'
+ % (txid, old, cnxset))
+ self._record.pop(txid)
+ self._condition.notify_all()
+
+ def close(self):
+ """Marks the tracker as inactive.
+
+ This methode is to be used by Session object.
+
+ Inactive tracker does not accept new record anymore.
+ """
+ with self._condition:
+ self._active = False
+
+ def wait(self, timeout=10):
+ """Wait for all recorded cnxset to be released
+
+ This methode is to be used by Session object.
+
+ returns a tuple of transaction id that remains open.
+ """
+ with self._condition:
+ if self._active:
+ raise RuntimeError('Cannot wait on active tracker.'
+ ' Call tracker.close() first')
+ while self._record and timeout > 0:
+ start = time()
+ self._condition.wait(timeout)
+ timeout -= time() - start
+ return tuple(self._record)
+
class Transaction(object):
"""Repository Transaction
@@ -199,7 +335,7 @@
"""
- def __init__(self, txid, mode, rewriter):
+ def __init__(self, txid, cnxset_tracker, mode, rewriter):
#: transaction unique id
self.transactionid = txid
#: reentrance handling
@@ -208,7 +344,9 @@
#: connection handling mode
self.mode = mode
#: connection set used to execute queries on sources
- self.cnxset = None
+ self._cnxset = None
+ #: CnxSetTracker used to report cnxset usage
+ self._cnxset_tracker = cnxset_tracker
#: is this transaction from a client or internal to the repo
self.running_dbapi_query = True
@@ -246,6 +384,25 @@
#: (None, 'precommit', 'postcommit', 'uncommitable')
self.commit_state = None
self.pruned_hooks_cache = {}
+ # Connection Set Management ###############################################
+ @property
+ def cnxset(self):
+ return self._cnxset
+
+ @cnxset.setter
+ def cnxset(self, new_cnxset):
+ with self._cnxset_tracker:
+ old_cnxset = self._cnxset
+ if new_cnxset is old_cnxset:
+ return #nothing to do
+ if old_cnxset is not None:
+ self._cnxset = None
+ self.ctx_count -= 1
+ self._cnxset_tracker.forget(self.transactionid, old_cnxset)
+ if new_cnxset is not None:
+ self._cnxset_tracker.record(self.transactionid, new_cnxset)
+ self._cnxset = new_cnxset
+ self.ctx_count += 1
# Entity cache management #################################################
#
@@ -439,10 +596,6 @@
refers to the proper instance of :class:`Transaction` according to the
transaction.
- :attr:`_threads_in_transaction` is a set of (thread, connections set)
- referencing threads that currently hold a connections set for the session.
- .. automethod:: cubicweb.server.session.transaction
-
You should not have to use neither :attr:`_tx` nor :attr:`__threaddata`,
simply access transaction data transparently through the :attr:`_tx`
property. Also, you usually don't have to access it directly since current
@@ -556,7 +709,7 @@
self._txs = {}
# Data local to the thread
self.__threaddata = threading.local()
- self._threads_in_transaction = set()
+ self._cnxset_tracker = CnxSetTracker()
self._closed = False
self._lock = threading.RLock()
@@ -573,7 +726,8 @@
tx = self._txs[txid]
except KeyError:
rewriter = RQLRewriter(self)
- tx = Transaction(txid, self.default_mode, rewriter)
+ tx = Transaction(txid, self._cnxset_tracker, self.default_mode,
+ rewriter)
self._txs[txid] = tx
return tx
@@ -613,9 +767,6 @@
session = Session(user, self.repo)
tx = session._tx
tx.cnxset = self.cnxset
- # we attributed a connections set, need to update ctx_count else it will be freed
- # while undesired
- tx.ctx_count = 1
# share pending_operations, else operation added in the hi-jacked
# session such as SendMailOp won't ever be processed
tx.pending_operations = self.pending_operations
@@ -901,49 +1052,37 @@
def set_cnxset(self):
"""the session need a connections set to execute some queries"""
- with self._lock:
+ with self._lock: # can probably be removed
if self._closed:
self.free_cnxset(True)
raise Exception('try to set connections set on a closed session %s' % self.id)
- if self.cnxset is None:
- # get connections set first to avoid race-condition
- self._tx.cnxset = cnxset = self.repo._get_cnxset()
- self._tx.ctx_count += 1
+ tx = self._tx
+ if tx.cnxset is None:
+ cnxset = self.repo._get_cnxset()
try:
- cnxset.cnxset_set()
- except Exception:
- self._tx.cnxset = None
+ self._tx.cnxset = cnxset
+ try:
+ cnxset.cnxset_set()
+ except:
+ self._tx.cnxset = None
+ raise
+ except:
self.repo._free_cnxset(cnxset)
raise
- self._threads_in_transaction.add(
- (threading.currentThread(), cnxset) )
- return self._tx.cnxset
-
- def _free_thread_cnxset(self, thread, cnxset, force_close=False):
- try:
- self._threads_in_transaction.remove( (thread, cnxset) )
- except KeyError:
- # race condition on cnxset freeing (freed by commit or rollback vs
- # close)
- pass
- else:
- if force_close:
- cnxset.reconnect()
- else:
- cnxset.cnxset_freed()
- # free cnxset once everything is done to avoid race-condition
- self.repo._free_cnxset(cnxset)
+ return tx.cnxset
def free_cnxset(self, ignoremode=False):
"""the session is no longer using its connections set, at least for some time"""
# cnxset may be none if no operation has been done since last commit
# or rollback
- cnxset = self._tx.cnxset
+ tx = self._tx
+ cnxset = tx.cnxset
if cnxset is not None and (ignoremode or self.mode == 'read'):
- # even in read mode, we must release the current transaction
- self._free_thread_cnxset(threading.currentThread(), cnxset)
- self._tx.cnxset = None
- self._tx.ctx_count -= 1
+ try:
+ tx.cnxset = None
+ finally:
+ cnxset.cnxset_freed()
+ self.repo._free_cnxset(cnxset)
def _touch(self):
"""update latest session usage timestamp and reset mode to read"""
@@ -1174,27 +1313,35 @@
self._clear_thread_data(free_cnxset)
def close(self):
- """do not close connections set on session close, since they are shared now"""
+ # do not close connections set on session close, since they are shared now
+ tracker = self._cnxset_tracker
with self._lock:
self._closed = True
- # copy since _threads_in_transaction maybe modified while waiting
- for thread, cnxset in self._threads_in_transaction.copy():
- if thread is threading.currentThread():
- continue
- self.info('waiting for thread %s', thread)
- # do this loop/break instead of a simple join(10) in case thread is
- # the main thread (in which case it will be removed from
- # self._threads_in_transaction but still be alive...)
- for i in xrange(10):
- thread.join(1)
- if not (thread.isAlive() and
- (thread, cnxset) in self._threads_in_transaction):
- break
- else:
- self.error('thread %s still alive after 10 seconds, will close '
- 'session anyway', thread)
- self._free_thread_cnxset(thread, cnxset, force_close=True)
+ tracker.close()
self.rollback()
+ self.info('waiting for open transaction of session: %s', self)
+ timeout = 10
+ pendings = tracker.wait(timeout)
+ if pendings:
+ self.error('%i transaction still alive after 10 seconds, will close '
+ 'session anyway', len(pendings))
+ for txid in pendings:
+ tx = self._txs.get(txid)
+ if tx is not None:
+ # drop tx.cnxset
+ with tracker:
+ try:
+ cnxset = tx.cnxset
+ if cnxset is None:
+ continue
+ tx.cnxset = None
+ except RuntimeError:
+ msg = 'issue while force free of cnxset in %s'
+ self.error(msg, tx)
+ # cnxset.reconnect() do an hard reset of the cnxset
+ # it force it to be freed
+ cnxset.reconnect()
+ self.repo._free_cnxset(cnxset)
del self.__threaddata
del self._txs