diff -r 538852f69b48 -r 9ee4d0c65ad2 server/session.py --- a/server/session.py Tue Mar 26 15:26:29 2013 +0100 +++ b/server/session.py Wed Mar 27 19:54:36 2013 +0100 @@ -145,6 +145,142 @@ HOOKS_DENY_ALL = object() DEFAULT_SECURITY = object() # evaluated to true by design +class SessionClosedError(RuntimeError): + pass + +class CnxSetTracker(object): + """Keep track of which transaction use which cnxset. + + There should be one of this object per session plus one another for + internal session. + + Session object are responsible of creating their CnxSetTracker object. + + Transaction should use the :meth:`record` and :meth:`forget` to inform the + tracker of cnxset they have acquired. + + .. automethod:: cubicweb.server.session.CnxSetTracker.record + .. automethod:: cubicweb.server.session.CnxSetTracker.forget + + Session use the :meth:`close` and :meth:`wait` method when closing. + + .. automethod:: cubicweb.server.session.CnxSetTracker.close + .. automethod:: cubicweb.server.session.CnxSetTracker.wait + + This object itself is threadsafe. It also requires caller to acquired its + lock in some situation. + """ + + def __init__(self): + self._active = True + self._condition = threading.Condition() + self._record = {} + + def __enter__(self): + self._condition.__enter__() + + def __exit__(self, *args): + self._condition.__exit__(*args) + + def record(self, txid, cnxset): + """Inform the tracker that a txid have acquired a cnxset + + This methode is to be used by Transaction object. + + This method fails when: + - The txid already have a recorded cnxset. + - The tracker is not active anymore. + + Notes about the caller: + (1) It is responsible for retrieving a cnxset. + (2) It must be prepared to release the cnxset if the + `cnxsettracker.forget` call fails. + (3) It should acquire the tracker lock until the very end of the operation. + (4) However It take care to lock the CnxSetTracker object after having + retrieved the cnxset to prevent deadlock. + + A typical usage look like:: + + cnxset = repo._get_cnxset() # (1) + try: + with cnxset_tracker: # (3) and (4) + cnxset_tracker.record(caller.id, cnxset) + # (3') operation ends when caller is in expected state only + caller.cnxset = cnxset + except Exception: + repo._free_cnxset(cnxset) # (2) + raise + """ + # dubious since the caller is suppose to have acquired it anyway. + with self._condition: + if not self._active: + raise SessionClosedError('Closed') + old = self._record.get(txid) + if old is not None: + raise ValueError('"%s" already have a cnx_set (%r)' + % (txid, old)) + self._record[txid] = cnxset + + def forget(self, txid, cnxset): + """Inform the tracker that a txid have release a cnxset + + This methode is to be used by Transaction object. + + This method fails when: + - The cnxset for the txid does not match the recorded one. + + Notes about the caller: + (1) It is responsible for releasing the cnxset. + (2) It should acquire the tracker lock during the operation to ensure + the internal tracker state is always accurate regarding its own state. + + A typical usage look like:: + + cnxset = caller.cnxset + try: + with cnxset_tracker: + # (2) you can not have caller.cnxset out of sync with + # cnxset_tracker state while unlocked + caller.cnxset = None + cnxset_tracker.forget(caller.id, cnxset) + finally: + cnxset = repo._free_cnxset(cnxset) # (1) + """ + with self._condition: + old = self._record.get(txid, None) + if old is not cnxset: + raise ValueError('recorded cnxset for "%s" mismatch: %r != %r' + % (txid, old, cnxset)) + self._record.pop(txid) + self._condition.notify_all() + + def close(self): + """Marks the tracker as inactive. + + This methode is to be used by Session object. + + Inactive tracker does not accept new record anymore. + """ + with self._condition: + self._active = False + + def wait(self, timeout=10): + """Wait for all recorded cnxset to be released + + This methode is to be used by Session object. + + returns a tuple of transaction id that remains open. + """ + with self._condition: + if self._active: + raise RuntimeError('Cannot wait on active tracker.' + ' Call tracker.close() first') + while self._record and timeout > 0: + start = time() + self._condition.wait(timeout) + timeout -= time() - start + return tuple(self._record) + class Transaction(object): """Repository Transaction @@ -199,7 +335,7 @@ """ - def __init__(self, txid, mode, rewriter): + def __init__(self, txid, cnxset_tracker, mode, rewriter): #: transaction unique id self.transactionid = txid #: reentrance handling @@ -208,7 +344,9 @@ #: connection handling mode self.mode = mode #: connection set used to execute queries on sources - self.cnxset = None + self._cnxset = None + #: CnxSetTracker used to report cnxset usage + self._cnxset_tracker = cnxset_tracker #: is this transaction from a client or internal to the repo self.running_dbapi_query = True @@ -246,6 +384,25 @@ #: (None, 'precommit', 'postcommit', 'uncommitable') self.commit_state = None self.pruned_hooks_cache = {} + # Connection Set Management ############################################### + @property + def cnxset(self): + return self._cnxset + + @cnxset.setter + def cnxset(self, new_cnxset): + with self._cnxset_tracker: + old_cnxset = self._cnxset + if new_cnxset is old_cnxset: + return #nothing to do + if old_cnxset is not None: + self._cnxset = None + self.ctx_count -= 1 + self._cnxset_tracker.forget(self.transactionid, old_cnxset) + if new_cnxset is not None: + self._cnxset_tracker.record(self.transactionid, new_cnxset) + self._cnxset = new_cnxset + self.ctx_count += 1 # Entity cache management ################################################# # @@ -439,10 +596,6 @@ refers to the proper instance of :class:`Transaction` according to the transaction. - :attr:`_threads_in_transaction` is a set of (thread, connections set) - referencing threads that currently hold a connections set for the session. - .. automethod:: cubicweb.server.session.transaction - You should not have to use neither :attr:`_tx` nor :attr:`__threaddata`, simply access transaction data transparently through the :attr:`_tx` property. Also, you usually don't have to access it directly since current @@ -556,7 +709,7 @@ self._txs = {} # Data local to the thread self.__threaddata = threading.local() - self._threads_in_transaction = set() + self._cnxset_tracker = CnxSetTracker() self._closed = False self._lock = threading.RLock() @@ -573,7 +726,8 @@ tx = self._txs[txid] except KeyError: rewriter = RQLRewriter(self) - tx = Transaction(txid, self.default_mode, rewriter) + tx = Transaction(txid, self._cnxset_tracker, self.default_mode, + rewriter) self._txs[txid] = tx return tx @@ -613,9 +767,6 @@ session = Session(user, self.repo) tx = session._tx tx.cnxset = self.cnxset - # we attributed a connections set, need to update ctx_count else it will be freed - # while undesired - tx.ctx_count = 1 # share pending_operations, else operation added in the hi-jacked # session such as SendMailOp won't ever be processed tx.pending_operations = self.pending_operations @@ -901,49 +1052,37 @@ def set_cnxset(self): """the session need a connections set to execute some queries""" - with self._lock: + with self._lock: # can probably be removed if self._closed: self.free_cnxset(True) raise Exception('try to set connections set on a closed session %s' % self.id) - if self.cnxset is None: - # get connections set first to avoid race-condition - self._tx.cnxset = cnxset = self.repo._get_cnxset() - self._tx.ctx_count += 1 + tx = self._tx + if tx.cnxset is None: + cnxset = self.repo._get_cnxset() try: - cnxset.cnxset_set() - except Exception: - self._tx.cnxset = None + self._tx.cnxset = cnxset + try: + cnxset.cnxset_set() + except: + self._tx.cnxset = None + raise + except: self.repo._free_cnxset(cnxset) raise - self._threads_in_transaction.add( - (threading.currentThread(), cnxset) ) - return self._tx.cnxset - - def _free_thread_cnxset(self, thread, cnxset, force_close=False): - try: - self._threads_in_transaction.remove( (thread, cnxset) ) - except KeyError: - # race condition on cnxset freeing (freed by commit or rollback vs - # close) - pass - else: - if force_close: - cnxset.reconnect() - else: - cnxset.cnxset_freed() - # free cnxset once everything is done to avoid race-condition - self.repo._free_cnxset(cnxset) + return tx.cnxset def free_cnxset(self, ignoremode=False): """the session is no longer using its connections set, at least for some time""" # cnxset may be none if no operation has been done since last commit # or rollback - cnxset = self._tx.cnxset + tx = self._tx + cnxset = tx.cnxset if cnxset is not None and (ignoremode or self.mode == 'read'): - # even in read mode, we must release the current transaction - self._free_thread_cnxset(threading.currentThread(), cnxset) - self._tx.cnxset = None - self._tx.ctx_count -= 1 + try: + tx.cnxset = None + finally: + cnxset.cnxset_freed() + self.repo._free_cnxset(cnxset) def _touch(self): """update latest session usage timestamp and reset mode to read""" @@ -1174,27 +1313,35 @@ self._clear_thread_data(free_cnxset) def close(self): - """do not close connections set on session close, since they are shared now""" + # do not close connections set on session close, since they are shared now + tracker = self._cnxset_tracker with self._lock: self._closed = True - # copy since _threads_in_transaction maybe modified while waiting - for thread, cnxset in self._threads_in_transaction.copy(): - if thread is threading.currentThread(): - continue - self.info('waiting for thread %s', thread) - # do this loop/break instead of a simple join(10) in case thread is - # the main thread (in which case it will be removed from - # self._threads_in_transaction but still be alive...) - for i in xrange(10): - thread.join(1) - if not (thread.isAlive() and - (thread, cnxset) in self._threads_in_transaction): - break - else: - self.error('thread %s still alive after 10 seconds, will close ' - 'session anyway', thread) - self._free_thread_cnxset(thread, cnxset, force_close=True) + tracker.close() self.rollback() + self.info('waiting for open transaction of session: %s', self) + timeout = 10 + pendings = tracker.wait(timeout) + if pendings: + self.error('%i transaction still alive after 10 seconds, will close ' + 'session anyway', len(pendings)) + for txid in pendings: + tx = self._txs.get(txid) + if tx is not None: + # drop tx.cnxset + with tracker: + try: + cnxset = tx.cnxset + if cnxset is None: + continue + tx.cnxset = None + except RuntimeError: + msg = 'issue while force free of cnxset in %s' + self.error(msg, tx) + # cnxset.reconnect() do an hard reset of the cnxset + # it force it to be freed + cnxset.reconnect() + self.repo._free_cnxset(cnxset) del self.__threaddata del self._txs