--- a/server/session.py Wed Jun 11 15:58:01 2014 +0200
+++ b/server/session.py Wed Jun 11 15:32:07 2014 +0200
@@ -19,7 +19,6 @@
__docformat__ = "restructuredtext en"
import sys
-import threading
from time import time
from uuid import uuid4
from warnings import warn
@@ -171,146 +170,6 @@
class SessionClosedError(RuntimeError):
pass
-class CnxSetTracker(object):
- """Keep track of which connection use which cnxset.
-
- There should be one of these objects per session (including internal sessions).
-
- Session objects are responsible for creating their CnxSetTracker object.
-
- Connections should use the :meth:`record` and :meth:`forget` to inform the
- tracker of cnxsets they have acquired.
-
- .. automethod:: cubicweb.server.session.CnxSetTracker.record
- .. automethod:: cubicweb.server.session.CnxSetTracker.forget
-
- Sessions use the :meth:`close` and :meth:`wait` methods when closing.
-
- .. automethod:: cubicweb.server.session.CnxSetTracker.close
- .. automethod:: cubicweb.server.session.CnxSetTracker.wait
-
- This object itself is threadsafe. It also requires caller to acquired its
- lock in some situation.
- """
-
- def __init__(self):
- self._active = True
- self._condition = threading.Condition()
- self._record = {}
-
- def __enter__(self):
- return self._condition.__enter__()
-
- def __exit__(self, *args):
- return self._condition.__exit__(*args)
-
- def record(self, cnxid, cnxset):
- """Inform the tracker that a cnxid has acquired a cnxset
-
- This method is to be used by Connection objects.
-
- This method fails when:
- - The cnxid already has a recorded cnxset.
- - The tracker is not active anymore.
-
- Notes about the caller:
- (1) It is responsible for retrieving a cnxset.
- (2) It must be prepared to release the cnxset if the
- `cnxsettracker.forget` call fails.
- (3) It should acquire the tracker lock until the very end of the operation.
- (4) However it must only lock the CnxSetTracker object after having
- retrieved the cnxset to prevent deadlock.
-
- A typical usage look like::
-
- cnxset = repo._get_cnxset() # (1)
- try:
- with cnxset_tracker: # (3) and (4)
- cnxset_tracker.record(caller.id, cnxset)
- # (3') operation ends when caller is in expected state only
- caller.cnxset = cnxset
- except Exception:
- repo._free_cnxset(cnxset) # (2)
- raise
- """
- # dubious since the caller is supposed to have acquired it anyway.
- with self._condition:
- if not self._active:
- raise SessionClosedError('Closed')
- old = self._record.get(cnxid)
- if old is not None:
- raise ValueError('connection "%s" already has a cnx_set (%r)'
- % (cnxid, old))
- self._record[cnxid] = cnxset
-
- def forget(self, cnxid, cnxset):
- """Inform the tracker that a cnxid have release a cnxset
-
- This methode is to be used by Connection object.
-
- This method fails when:
- - The cnxset for the cnxid does not match the recorded one.
-
- Notes about the caller:
- (1) It is responsible for releasing the cnxset.
- (2) It should acquire the tracker lock during the operation to ensure
- the internal tracker state is always accurate regarding its own state.
-
- A typical usage look like::
-
- cnxset = caller.cnxset
- try:
- with cnxset_tracker:
- # (2) you can not have caller.cnxset out of sync with
- # cnxset_tracker state while unlocked
- caller.cnxset = None
- cnxset_tracker.forget(caller.id, cnxset)
- finally:
- cnxset = repo._free_cnxset(cnxset) # (1)
- """
- with self._condition:
- old = self._record.get(cnxid, None)
- if old is not cnxset:
- raise ValueError('recorded cnxset for "%s" mismatch: %r != %r'
- % (cnxid, old, cnxset))
- self._record.pop(cnxid)
- self._condition.notify_all()
-
- def close(self):
- """Marks the tracker as inactive.
-
- This method is to be used by Session objects.
-
- An inactive tracker does not accept new records anymore.
- """
- with self._condition:
- self._active = False
-
- def wait(self, timeout=10):
- """Wait for all recorded cnxsets to be released
-
- This method is to be used by Session objects.
-
- Returns a tuple of connection ids that remain open.
- """
- with self._condition:
- if self._active:
- raise RuntimeError('Cannot wait on active tracker.'
- ' Call tracker.close() first')
- while self._record and timeout > 0:
- start = time()
- self._condition.wait(timeout)
- timeout -= time() - start
- return tuple(self._record)
-
-
-def _with_cnx_set(func):
- """decorator for Connection method that ensure they run with a cnxset """
- @functools.wraps(func)
- def wrapper(cnx, *args, **kwargs):
- with cnx.ensure_cnx_set:
- return func(cnx, *args, **kwargs)
- return wrapper
def _open_only(func):
"""decorator for Connection method that check it is open"""
@@ -386,11 +245,10 @@
read/write security is currently activated.
"""
-
+ mode = 'write'
is_request = False
hooks_in_progress = False
is_repo_in_memory = True # bw compat
- mode = 'read'
def __init__(self, session):
# using super(Connection, self) confuse some test hack
@@ -402,12 +260,6 @@
self.sessionid = session.sessionid
#: reentrance handling
self.ctx_count = 0
- #: count the number of entry in a context needing a cnxset
- self._cnxset_count = 0
- #: Boolean for compat with the older explicite set_cnxset/free_cnx API
- #: When a call set_cnxset is done, no automatic freeing will be done
- #: until free_cnx is called.
- self._auto_free_cnx_set = True
#: server.Repository object
self.repo = session.repo
@@ -417,10 +269,6 @@
# other session utility
self._session_timestamp = session._timestamp
- #: connection set used to execute queries on sources
- self._cnxset = None
- #: CnxSetTracker used to report cnxset usage
- self._cnxset_tracker = CnxSetTracker()
# internal (root) session
self.is_internal_session = isinstance(session.user, InternalManager)
@@ -540,13 +388,16 @@
def __enter__(self):
assert self._open is None # first opening
self._open = True
+ self.cnxset = self.repo._get_cnxset()
return self
def __exit__(self, exctype=None, excvalue=None, tb=None):
assert self._open # actually already open
- assert self._cnxset_count == 0
- self.rollback()
+ self.clear()
self._open = False
+ self.cnxset.cnxset_freed()
+ self.repo._free_cnxset(self.cnxset)
+ self.cnxset = None
@contextmanager
def running_hooks_ops(self):
@@ -604,83 +455,23 @@
self.local_perm_cache.clear()
self.rewriter = RQLRewriter(self)
- # Connection Set Management ###############################################
- @property
- @_open_only
- def cnxset(self):
- return self._cnxset
-
- @cnxset.setter
- @_open_only
- def cnxset(self, new_cnxset):
- with self._cnxset_tracker:
- old_cnxset = self._cnxset
- if new_cnxset is old_cnxset:
- return #nothing to do
- if old_cnxset is not None:
- old_cnxset.rollback()
- self._cnxset = None
- self.ctx_count -= 1
- self._cnxset_tracker.forget(self.connectionid, old_cnxset)
- if new_cnxset is not None:
- self._cnxset_tracker.record(self.connectionid, new_cnxset)
- self._cnxset = new_cnxset
- self.ctx_count += 1
-
- @_open_only
- def _set_cnxset(self):
- """the connection need a connections set to execute some queries"""
- if self.cnxset is None:
- cnxset = self.repo._get_cnxset()
- try:
- self.cnxset = cnxset
- except:
- self.repo._free_cnxset(cnxset)
- raise
- return self.cnxset
-
- @_open_only
- def _free_cnxset(self, ignoremode=False):
- """the connection is no longer using its connections set, at least for some time"""
- # cnxset may be none if no operation has been done since last commit
- # or rollback
- cnxset = self.cnxset
- if cnxset is not None and (ignoremode or self.mode == 'read'):
- assert self._cnxset_count == 0
- try:
- self.cnxset = None
- finally:
- cnxset.cnxset_freed()
- self.repo._free_cnxset(cnxset)
-
@deprecated('[3.19] cnxset are automatically managed now.'
' stop using explicit set and free.')
def set_cnxset(self):
- self._auto_free_cnx_set = False
- return self._set_cnxset()
+ pass
@deprecated('[3.19] cnxset are automatically managed now.'
' stop using explicit set and free.')
def free_cnxset(self, ignoremode=False):
- self._auto_free_cnx_set = True
- return self._free_cnxset(ignoremode=ignoremode)
-
+ pass
@property
@contextmanager
@_open_only
+ @deprecated('[3.21] a cnxset is automatically set on __enter__ call now.'
+ ' stop using .ensure_cnx_set')
def ensure_cnx_set(self):
- assert self._cnxset_count >= 0
- if self._cnxset_count == 0:
- self._set_cnxset()
- try:
- self._cnxset_count += 1
- yield
- finally:
- self._cnxset_count = max(self._cnxset_count - 1, 0)
- if self._cnxset_count == 0 and self._auto_free_cnx_set:
- self._free_cnxset()
-
+ yield
# Entity cache management #################################################
#
@@ -992,7 +783,6 @@
return self.repo.source_defs()
@deprecated('[3.19] use .entity_metas(eid) instead')
- @_with_cnx_set
@_open_only
def describe(self, eid, asdict=False):
"""return a tuple (type, sourceuri, extid) for the entity with id <eid>"""
@@ -1003,7 +793,6 @@
return metas
return etype, source, extid
- @_with_cnx_set
@_open_only
def entity_metas(self, eid):
"""return a tuple (type, sourceuri, extid) for the entity with id <eid>"""
@@ -1012,7 +801,6 @@
# core method #############################################################
- @_with_cnx_set
@_open_only
def execute(self, rql, kwargs=None, build_descr=True):
"""db-api like method directly linked to the querier execute method.
@@ -1026,21 +814,16 @@
return rset
@_open_only
- def rollback(self, free_cnxset=True, reset_pool=None):
+ def rollback(self, free_cnxset=None, reset_pool=None):
"""rollback the current transaction"""
- if reset_pool is not None:
- warn('[3.13] use free_cnxset argument instead for reset_pool',
+ if free_cnxset is not None:
+ warn('[3.21] free_cnxset is now unneeded',
DeprecationWarning, stacklevel=2)
- free_cnxset = reset_pool
- if self._cnxset_count != 0:
- # we are inside ensure_cnx_set, don't lose it
- free_cnxset = False
+ if reset_pool is not None:
+ warn('[3.13] reset_pool is now unneeded',
+ DeprecationWarning, stacklevel=2)
cnxset = self.cnxset
- if cnxset is None:
- self.clear()
- self._session_timestamp.touch()
- self.debug('rollback transaction %s done (no db activity)', self.connectionid)
- return
+ assert cnxset is not None
try:
# by default, operations are executed with security turned off
with self.security_enabled(False, False):
@@ -1055,26 +838,18 @@
self.debug('rollback for transaction %s done', self.connectionid)
finally:
self._session_timestamp.touch()
- if free_cnxset:
- self._free_cnxset(ignoremode=True)
self.clear()
@_open_only
- def commit(self, free_cnxset=True, reset_pool=None):
+ def commit(self, free_cnxset=None, reset_pool=None):
"""commit the current session's transaction"""
- if reset_pool is not None:
- warn('[3.13] use free_cnxset argument instead for reset_pool',
+ if free_cnxset is not None:
+ warn('[3.21] free_cnxset is now unneeded',
DeprecationWarning, stacklevel=2)
- free_cnxset = reset_pool
- if self.cnxset is None:
- assert not self.pending_operations
- self.clear()
- self._session_timestamp.touch()
- self.debug('commit transaction %s done (no db activity)', self.connectionid)
- return
- if self._cnxset_count != 0:
- # we are inside ensure_cnx_set, don't lose it
- free_cnxset = False
+ if reset_pool is not None:
+ warn('[3.13] reset_pool is now unneeded',
+ DeprecationWarning, stacklevel=2)
+ assert self.cnxset is not None
cstate = self.commit_state
if cstate == 'uncommitable':
raise QueryError('transaction must be rolled back')
@@ -1134,7 +909,7 @@
# XXX use slice notation since self.pending_operations is a
# read-only property.
self.pending_operations[:] = processed + self.pending_operations
- self.rollback(free_cnxset)
+ self.rollback()
raise
self.cnxset.commit()
self.commit_state = 'postcommit'
@@ -1155,25 +930,19 @@
return self.transaction_uuid(set=False)
finally:
self._session_timestamp.touch()
- if free_cnxset:
- self._free_cnxset(ignoremode=True)
self.clear()
# resource accessors ######################################################
- @_with_cnx_set
@_open_only
def call_service(self, regid, **kwargs):
self.debug('calling service %s', regid)
service = self.vreg['services'].select(regid, self, **kwargs)
return service.call(**kwargs)
- @_with_cnx_set
@_open_only
def system_sql(self, sql, args=None, rollback_on_failure=True):
"""return a sql cursor on the system database"""
- if sql.split(None, 1)[0].upper() != 'SELECT':
- self.mode = 'write'
source = self.repo.system_source
try:
return source.doexec(self, sql, args, rollback=rollback_on_failure)