diff -r e1ebf3d12098 -r 8b35a898b334 server/session.py --- a/server/session.py Wed Jun 11 15:58:01 2014 +0200 +++ b/server/session.py Wed Jun 11 15:32:07 2014 +0200 @@ -19,7 +19,6 @@ __docformat__ = "restructuredtext en" import sys -import threading from time import time from uuid import uuid4 from warnings import warn @@ -171,146 +170,6 @@ class SessionClosedError(RuntimeError): pass -class CnxSetTracker(object): - """Keep track of which connection use which cnxset. - - There should be one of these objects per session (including internal sessions). - - Session objects are responsible for creating their CnxSetTracker object. - - Connections should use the :meth:`record` and :meth:`forget` to inform the - tracker of cnxsets they have acquired. - - .. automethod:: cubicweb.server.session.CnxSetTracker.record - .. automethod:: cubicweb.server.session.CnxSetTracker.forget - - Sessions use the :meth:`close` and :meth:`wait` methods when closing. - - .. automethod:: cubicweb.server.session.CnxSetTracker.close - .. automethod:: cubicweb.server.session.CnxSetTracker.wait - - This object itself is threadsafe. It also requires caller to acquired its - lock in some situation. - """ - - def __init__(self): - self._active = True - self._condition = threading.Condition() - self._record = {} - - def __enter__(self): - return self._condition.__enter__() - - def __exit__(self, *args): - return self._condition.__exit__(*args) - - def record(self, cnxid, cnxset): - """Inform the tracker that a cnxid has acquired a cnxset - - This method is to be used by Connection objects. - - This method fails when: - - The cnxid already has a recorded cnxset. - - The tracker is not active anymore. - - Notes about the caller: - (1) It is responsible for retrieving a cnxset. - (2) It must be prepared to release the cnxset if the - `cnxsettracker.forget` call fails. - (3) It should acquire the tracker lock until the very end of the operation. - (4) However it must only lock the CnxSetTracker object after having - retrieved the cnxset to prevent deadlock. - - A typical usage look like:: - - cnxset = repo._get_cnxset() # (1) - try: - with cnxset_tracker: # (3) and (4) - cnxset_tracker.record(caller.id, cnxset) - # (3') operation ends when caller is in expected state only - caller.cnxset = cnxset - except Exception: - repo._free_cnxset(cnxset) # (2) - raise - """ - # dubious since the caller is supposed to have acquired it anyway. - with self._condition: - if not self._active: - raise SessionClosedError('Closed') - old = self._record.get(cnxid) - if old is not None: - raise ValueError('connection "%s" already has a cnx_set (%r)' - % (cnxid, old)) - self._record[cnxid] = cnxset - - def forget(self, cnxid, cnxset): - """Inform the tracker that a cnxid have release a cnxset - - This methode is to be used by Connection object. - - This method fails when: - - The cnxset for the cnxid does not match the recorded one. - - Notes about the caller: - (1) It is responsible for releasing the cnxset. - (2) It should acquire the tracker lock during the operation to ensure - the internal tracker state is always accurate regarding its own state. - - A typical usage look like:: - - cnxset = caller.cnxset - try: - with cnxset_tracker: - # (2) you can not have caller.cnxset out of sync with - # cnxset_tracker state while unlocked - caller.cnxset = None - cnxset_tracker.forget(caller.id, cnxset) - finally: - cnxset = repo._free_cnxset(cnxset) # (1) - """ - with self._condition: - old = self._record.get(cnxid, None) - if old is not cnxset: - raise ValueError('recorded cnxset for "%s" mismatch: %r != %r' - % (cnxid, old, cnxset)) - self._record.pop(cnxid) - self._condition.notify_all() - - def close(self): - """Marks the tracker as inactive. - - This method is to be used by Session objects. - - An inactive tracker does not accept new records anymore. - """ - with self._condition: - self._active = False - - def wait(self, timeout=10): - """Wait for all recorded cnxsets to be released - - This method is to be used by Session objects. - - Returns a tuple of connection ids that remain open. - """ - with self._condition: - if self._active: - raise RuntimeError('Cannot wait on active tracker.' - ' Call tracker.close() first') - while self._record and timeout > 0: - start = time() - self._condition.wait(timeout) - timeout -= time() - start - return tuple(self._record) - - -def _with_cnx_set(func): - """decorator for Connection method that ensure they run with a cnxset """ - @functools.wraps(func) - def wrapper(cnx, *args, **kwargs): - with cnx.ensure_cnx_set: - return func(cnx, *args, **kwargs) - return wrapper def _open_only(func): """decorator for Connection method that check it is open""" @@ -386,11 +245,10 @@ read/write security is currently activated. """ - + mode = 'write' is_request = False hooks_in_progress = False is_repo_in_memory = True # bw compat - mode = 'read' def __init__(self, session): # using super(Connection, self) confuse some test hack @@ -402,12 +260,6 @@ self.sessionid = session.sessionid #: reentrance handling self.ctx_count = 0 - #: count the number of entry in a context needing a cnxset - self._cnxset_count = 0 - #: Boolean for compat with the older explicite set_cnxset/free_cnx API - #: When a call set_cnxset is done, no automatic freeing will be done - #: until free_cnx is called. - self._auto_free_cnx_set = True #: server.Repository object self.repo = session.repo @@ -417,10 +269,6 @@ # other session utility self._session_timestamp = session._timestamp - #: connection set used to execute queries on sources - self._cnxset = None - #: CnxSetTracker used to report cnxset usage - self._cnxset_tracker = CnxSetTracker() # internal (root) session self.is_internal_session = isinstance(session.user, InternalManager) @@ -540,13 +388,16 @@ def __enter__(self): assert self._open is None # first opening self._open = True + self.cnxset = self.repo._get_cnxset() return self def __exit__(self, exctype=None, excvalue=None, tb=None): assert self._open # actually already open - assert self._cnxset_count == 0 - self.rollback() + self.clear() self._open = False + self.cnxset.cnxset_freed() + self.repo._free_cnxset(self.cnxset) + self.cnxset = None @contextmanager def running_hooks_ops(self): @@ -604,83 +455,23 @@ self.local_perm_cache.clear() self.rewriter = RQLRewriter(self) - # Connection Set Management ############################################### - @property - @_open_only - def cnxset(self): - return self._cnxset - - @cnxset.setter - @_open_only - def cnxset(self, new_cnxset): - with self._cnxset_tracker: - old_cnxset = self._cnxset - if new_cnxset is old_cnxset: - return #nothing to do - if old_cnxset is not None: - old_cnxset.rollback() - self._cnxset = None - self.ctx_count -= 1 - self._cnxset_tracker.forget(self.connectionid, old_cnxset) - if new_cnxset is not None: - self._cnxset_tracker.record(self.connectionid, new_cnxset) - self._cnxset = new_cnxset - self.ctx_count += 1 - - @_open_only - def _set_cnxset(self): - """the connection need a connections set to execute some queries""" - if self.cnxset is None: - cnxset = self.repo._get_cnxset() - try: - self.cnxset = cnxset - except: - self.repo._free_cnxset(cnxset) - raise - return self.cnxset - - @_open_only - def _free_cnxset(self, ignoremode=False): - """the connection is no longer using its connections set, at least for some time""" - # cnxset may be none if no operation has been done since last commit - # or rollback - cnxset = self.cnxset - if cnxset is not None and (ignoremode or self.mode == 'read'): - assert self._cnxset_count == 0 - try: - self.cnxset = None - finally: - cnxset.cnxset_freed() - self.repo._free_cnxset(cnxset) - @deprecated('[3.19] cnxset are automatically managed now.' ' stop using explicit set and free.') def set_cnxset(self): - self._auto_free_cnx_set = False - return self._set_cnxset() + pass @deprecated('[3.19] cnxset are automatically managed now.' ' stop using explicit set and free.') def free_cnxset(self, ignoremode=False): - self._auto_free_cnx_set = True - return self._free_cnxset(ignoremode=ignoremode) - + pass @property @contextmanager @_open_only + @deprecated('[3.21] a cnxset is automatically set on __enter__ call now.' + ' stop using .ensure_cnx_set') def ensure_cnx_set(self): - assert self._cnxset_count >= 0 - if self._cnxset_count == 0: - self._set_cnxset() - try: - self._cnxset_count += 1 - yield - finally: - self._cnxset_count = max(self._cnxset_count - 1, 0) - if self._cnxset_count == 0 and self._auto_free_cnx_set: - self._free_cnxset() - + yield # Entity cache management ################################################# # @@ -992,7 +783,6 @@ return self.repo.source_defs() @deprecated('[3.19] use .entity_metas(eid) instead') - @_with_cnx_set @_open_only def describe(self, eid, asdict=False): """return a tuple (type, sourceuri, extid) for the entity with id """ @@ -1003,7 +793,6 @@ return metas return etype, source, extid - @_with_cnx_set @_open_only def entity_metas(self, eid): """return a tuple (type, sourceuri, extid) for the entity with id """ @@ -1012,7 +801,6 @@ # core method ############################################################# - @_with_cnx_set @_open_only def execute(self, rql, kwargs=None, build_descr=True): """db-api like method directly linked to the querier execute method. @@ -1026,21 +814,16 @@ return rset @_open_only - def rollback(self, free_cnxset=True, reset_pool=None): + def rollback(self, free_cnxset=None, reset_pool=None): """rollback the current transaction""" - if reset_pool is not None: - warn('[3.13] use free_cnxset argument instead for reset_pool', + if free_cnxset is not None: + warn('[3.21] free_cnxset is now unneeded', DeprecationWarning, stacklevel=2) - free_cnxset = reset_pool - if self._cnxset_count != 0: - # we are inside ensure_cnx_set, don't lose it - free_cnxset = False + if reset_pool is not None: + warn('[3.13] reset_pool is now unneeded', + DeprecationWarning, stacklevel=2) cnxset = self.cnxset - if cnxset is None: - self.clear() - self._session_timestamp.touch() - self.debug('rollback transaction %s done (no db activity)', self.connectionid) - return + assert cnxset is not None try: # by default, operations are executed with security turned off with self.security_enabled(False, False): @@ -1055,26 +838,18 @@ self.debug('rollback for transaction %s done', self.connectionid) finally: self._session_timestamp.touch() - if free_cnxset: - self._free_cnxset(ignoremode=True) self.clear() @_open_only - def commit(self, free_cnxset=True, reset_pool=None): + def commit(self, free_cnxset=None, reset_pool=None): """commit the current session's transaction""" - if reset_pool is not None: - warn('[3.13] use free_cnxset argument instead for reset_pool', + if free_cnxset is not None: + warn('[3.21] free_cnxset is now unneeded', DeprecationWarning, stacklevel=2) - free_cnxset = reset_pool - if self.cnxset is None: - assert not self.pending_operations - self.clear() - self._session_timestamp.touch() - self.debug('commit transaction %s done (no db activity)', self.connectionid) - return - if self._cnxset_count != 0: - # we are inside ensure_cnx_set, don't lose it - free_cnxset = False + if reset_pool is not None: + warn('[3.13] reset_pool is now unneeded', + DeprecationWarning, stacklevel=2) + assert self.cnxset is not None cstate = self.commit_state if cstate == 'uncommitable': raise QueryError('transaction must be rolled back') @@ -1134,7 +909,7 @@ # XXX use slice notation since self.pending_operations is a # read-only property. self.pending_operations[:] = processed + self.pending_operations - self.rollback(free_cnxset) + self.rollback() raise self.cnxset.commit() self.commit_state = 'postcommit' @@ -1155,25 +930,19 @@ return self.transaction_uuid(set=False) finally: self._session_timestamp.touch() - if free_cnxset: - self._free_cnxset(ignoremode=True) self.clear() # resource accessors ###################################################### - @_with_cnx_set @_open_only def call_service(self, regid, **kwargs): self.debug('calling service %s', regid) service = self.vreg['services'].select(regid, self, **kwargs) return service.call(**kwargs) - @_with_cnx_set @_open_only def system_sql(self, sql, args=None, rollback_on_failure=True): """return a sql cursor on the system database""" - if sql.split(None, 1)[0].upper() != 'SELECT': - self.mode = 'write' source = self.repo.system_source try: return source.doexec(self, sql, args, rollback=rollback_on_failure)