# HG changeset patch # User Sylvain Thénault # Date 1277127146 -7200 # Node ID 0b250d72fcfae56f06e1b6bc8345021afddc2983 # Parent d970049d7cfddbd3f07053f4ad3e7144c051b81e [transaction w/ separated web/repo processes] the dbapi should explicitly specify a transaction id to avoid confusion when web server / repository run in separated processes diff -r d970049d7cfd -r 0b250d72fcfa dbapi.py --- a/dbapi.py Mon Jun 21 15:29:10 2010 +0200 +++ b/dbapi.py Mon Jun 21 15:32:26 2010 +0200 @@ -20,10 +20,11 @@ Take a look at http://www.python.org/peps/pep-0249.html (most parts of this document are reported here in docstrings) +""" -""" __docformat__ = "restructuredtext en" +from threading import currentThread from logging import getLogger from time import time, clock from itertools import count @@ -403,6 +404,9 @@ """no effect""" pass + def _txid(self): + return self.connection._txid(self) + def execute(self, rql, args=None, eid_key=None, build_descr=True): """execute a rql query, return resulting rows and their description in a :class:`~cubicweb.rset.ResultSet` object @@ -438,7 +442,8 @@ warn('[3.8] eid_key is deprecated, you can safely remove this argument', DeprecationWarning, stacklevel=2) # XXX use named argument for build_descr in case repo is < 3.8 - rset = self._repo.execute(self._sessid, rql, args, build_descr=build_descr) + rset = self._repo.execute(self._sessid, rql, args, + build_descr=build_descr, txid=self._txid()) rset.req = self.req return rset @@ -493,6 +498,9 @@ self.rollback() return False #propagate the exception + def _txid(self, cursor=None): # XXX could now handle various isolation level! + return currentThread().getName() + def request(self): return DBAPIRequest(self.vreg, DBAPISession(self)) @@ -628,7 +636,7 @@ def describe(self, eid): if self._closed is not None: raise ProgrammingError('Closed connection') - return self._repo.describe(self.sessionid, eid) + return self._repo.describe(self.sessionid, eid, txid=self._txid()) def close(self): """Close the connection now (rather than whenever __del__ is called). @@ -641,7 +649,7 @@ """ if self._closed: raise ProgrammingError('Connection is already closed') - self._repo.close(self.sessionid) + self._repo.close(self.sessionid, txid=self._txid()) del self._repo # necessary for proper garbage collection self._closed = 1 @@ -655,7 +663,7 @@ """ if not self._closed is None: raise ProgrammingError('Connection is already closed') - return self._repo.commit(self.sessionid) + return self._repo.commit(self.sessionid, txid=self._txid()) def rollback(self): """This method is optional since not all databases provide transaction @@ -668,7 +676,7 @@ """ if not self._closed is None: raise ProgrammingError('Connection is already closed') - self._repo.rollback(self.sessionid) + self._repo.rollback(self.sessionid, txid=self._txid()) def cursor(self, req=None): """Return a new Cursor Object using the connection. @@ -709,6 +717,7 @@ and set to false. """ txinfos = self._repo.undoable_transactions(self.sessionid, ueid, + txid=self._txid(), **actionfilters) if req is None: req = self.request() @@ -723,7 +732,8 @@ allowed (eg not in managers group and the transaction doesn't belong to him). """ - txinfo = self._repo.transaction_info(self.sessionid, txuuid) + txinfo = self._repo.transaction_info(self.sessionid, txuuid, + txid=self._txid()) if req is None: req = self.request() txinfo.req = req @@ -739,7 +749,8 @@ session's user is not allowed (eg not in managers group and the transaction doesn't belong to him). """ - return self._repo.transaction_actions(self.sessionid, txuuid, public) + return self._repo.transaction_actions(self.sessionid, txuuid, public, + txid=self._txid()) def undo_transaction(self, txuuid): """Undo the given transaction. Return potential restoration errors. @@ -748,4 +759,5 @@ allowed (eg not in managers group and the transaction doesn't belong to him). """ - return self._repo.undo_transaction(self.sessionid, txuuid) + return self._repo.undo_transaction(self.sessionid, txuuid, + txid=self._txid()) diff -r d970049d7cfd -r 0b250d72fcfa devtools/testlib.py --- a/devtools/testlib.py Mon Jun 21 15:29:10 2010 +0200 +++ b/devtools/testlib.py Mon Jun 21 15:32:26 2010 +0200 @@ -15,9 +15,8 @@ # # You should have received a copy of the GNU Lesser General Public License along # with CubicWeb. If not, see . -"""this module contains base classes and utilities for cubicweb tests +"""this module contains base classes and utilities for cubicweb tests""" -""" from __future__ import with_statement __docformat__ = "restructuredtext en" diff -r d970049d7cfd -r 0b250d72fcfa hooks/security.py --- a/hooks/security.py Mon Jun 21 15:29:10 2010 +0200 +++ b/hooks/security.py Mon Jun 21 15:32:26 2010 +0200 @@ -17,8 +17,8 @@ # with CubicWeb. If not, see . """Security hooks: check permissions to add/delete/update entities according to the user connected to a session +""" -""" __docformat__ = "restructuredtext en" from cubicweb import Unauthorized diff -r d970049d7cfd -r 0b250d72fcfa server/repository.py --- a/server/repository.py Mon Jun 21 15:29:10 2010 +0200 +++ b/server/repository.py Mon Jun 21 15:32:26 2010 +0200 @@ -581,7 +581,8 @@ session.commit() return session.id - def execute(self, sessionid, rqlstring, args=None, build_descr=True): + def execute(self, sessionid, rqlstring, args=None, build_descr=True, + txid=None): """execute a RQL query * rqlstring should be an unicode string or a plain ascii string @@ -589,7 +590,7 @@ * build_descr is a flag indicating if the description should be built on select queries """ - session = self._get_session(sessionid, setpool=True) + session = self._get_session(sessionid, setpool=True, txid=txid) try: try: rset = self.querier.execute(session, rqlstring, args, @@ -617,9 +618,9 @@ finally: session.reset_pool() - def describe(self, sessionid, eid): + def describe(self, sessionid, eid, txid=None): """return a tuple (type, source, extid) for the entity with id """ - session = self._get_session(sessionid, setpool=True) + session = self._get_session(sessionid, setpool=True, txid=txid) try: return self.type_and_source_from_eid(eid, session) finally: @@ -645,32 +646,36 @@ session = self._get_session(sessionid, setpool=False) session.set_shared_data(key, value, querydata) - def commit(self, sessionid): + def commit(self, sessionid, txid=None): """commit transaction for the session with the given id""" self.debug('begin commit for session %s', sessionid) try: - return self._get_session(sessionid).commit() + session = self._get_session(sessionid) + session.set_tx_data(txid) + return session.commit() except (ValidationError, Unauthorized): raise except: self.exception('unexpected error') raise - def rollback(self, sessionid): + def rollback(self, sessionid, txid=None): """commit transaction for the session with the given id""" self.debug('begin rollback for session %s', sessionid) try: - self._get_session(sessionid).rollback() + session = self._get_session(sessionid) + session.set_tx_data(txid) + session.rollback() except: self.exception('unexpected error') raise - def close(self, sessionid, checkshuttingdown=True): + def close(self, sessionid, txid=None, checkshuttingdown=True): """close the session with the given id""" - session = self._get_session(sessionid, setpool=True, + session = self._get_session(sessionid, setpool=True, txid=txid, checkshuttingdown=checkshuttingdown) # operation uncommited before close are rollbacked before hook is called - session.rollback() + session.rollback(reset_pool=False) self.hm.call_hooks('session_close', session) # commit session at this point in case write operation has been done # during `session_close` hooks @@ -701,34 +706,35 @@ for prop, value in props.items(): session.change_property(prop, value) - def undoable_transactions(self, sessionid, ueid=None, **actionfilters): + def undoable_transactions(self, sessionid, ueid=None, txid=None, + **actionfilters): """See :class:`cubicweb.dbapi.Connection.undoable_transactions`""" - session = self._get_session(sessionid, setpool=True) + session = self._get_session(sessionid, setpool=True, txid=txid) try: return self.system_source.undoable_transactions(session, ueid, **actionfilters) finally: session.reset_pool() - def transaction_info(self, sessionid, txuuid): + def transaction_info(self, sessionid, txuuid, txid=None): """See :class:`cubicweb.dbapi.Connection.transaction_info`""" - session = self._get_session(sessionid, setpool=True) + session = self._get_session(sessionid, setpool=True, txid=txid) try: return self.system_source.tx_info(session, txuuid) finally: session.reset_pool() - def transaction_actions(self, sessionid, txuuid, public=True): + def transaction_actions(self, sessionid, txuuid, public=True, txid=None): """See :class:`cubicweb.dbapi.Connection.transaction_actions`""" - session = self._get_session(sessionid, setpool=True) + session = self._get_session(sessionid, setpool=True, txid=txid) try: return self.system_source.tx_actions(session, txuuid, public) finally: session.reset_pool() - def undo_transaction(self, sessionid, txuuid): + def undo_transaction(self, sessionid, txuuid, txid=None): """See :class:`cubicweb.dbapi.Connection.undo_transaction`""" - session = self._get_session(sessionid, setpool=True) + session = self._get_session(sessionid, setpool=True, txid=txid) try: return self.system_source.undo_transaction(session, txuuid) finally: @@ -791,7 +797,8 @@ session.set_pool() return session - def _get_session(self, sessionid, setpool=False, checkshuttingdown=True): + def _get_session(self, sessionid, setpool=False, txid=None, + checkshuttingdown=True): """return the user associated to the given session identifier""" if checkshuttingdown and self._shutting_down: raise Exception('Repository is shutting down') @@ -800,6 +807,7 @@ except KeyError: raise BadConnectionId('No such session %s' % sessionid) if setpool: + session.set_tx_data(txid) # must be done before set_pool session.set_pool() return session diff -r d970049d7cfd -r 0b250d72fcfa server/session.py --- a/server/session.py Mon Jun 21 15:29:10 2010 +0200 +++ b/server/session.py Mon Jun 21 15:32:26 2010 +0200 @@ -117,6 +117,9 @@ # print INDENT + 'reset write to', self.oldwrite +class TransactionData(object): + def __init__(self, txid): + self.transactionid = txid class Session(RequestSessionBase): """tie session id, user, connections pool and other session data all @@ -148,7 +151,8 @@ # i18n initialization self.set_language(cnxprops.lang) # internals - self._threaddata = threading.local() + self._tx_data = {} + self.__threaddata = threading.local() self._threads_in_transaction = set() self._closed = False @@ -156,6 +160,23 @@ return '<%ssession %s (%s 0x%x)>' % ( self.cnxtype, unicode(self.user.login), self.id, id(self)) + def set_tx_data(self, txid=None): + if txid is None: + txid = threading.currentThread().getName() + try: + self.__threaddata.txdata = self._tx_data[txid] + except KeyError: + self.__threaddata.txdata = self._tx_data[txid] = TransactionData(txid) + + @property + def _threaddata(self): + try: + return self.__threaddata.txdata + except AttributeError: + self.set_tx_data() + return self.__threaddata.txdata + + def hijack_user(self, user): """return a fake request/session using specified user""" session = Session(user, self.repo) @@ -338,11 +359,14 @@ @property def read_security(self): """return a boolean telling if read security is activated or not""" + txstore = self._threaddata + if txstore is None: + return self.DEFAULT_SECURITY try: - return self._threaddata.read_security + return txstore.read_security except AttributeError: - self._threaddata.read_security = self.DEFAULT_SECURITY - return self._threaddata.read_security + txstore.read_security = self.DEFAULT_SECURITY + return txstore.read_security def set_read_security(self, activated): """[de]activate read security, returning the previous value set for @@ -351,8 +375,11 @@ you should usually use the `security_enabled` context manager instead of this to change security settings. """ - oldmode = self.read_security - self._threaddata.read_security = activated + txstore = self._threaddata + if txstore is None: + return self.DEFAULT_SECURITY + oldmode = getattr(txstore, 'read_security', self.DEFAULT_SECURITY) + txstore.read_security = activated # dbapi_query used to detect hooks triggered by a 'dbapi' query (eg not # issued on the session). This is tricky since we the execution model of # a (write) user query is: @@ -369,18 +396,21 @@ # else (False actually) is not perfect but should be enough # # also reset dbapi_query to true when we go back to DEFAULT_SECURITY - self._threaddata.dbapi_query = (oldmode is self.DEFAULT_SECURITY - or activated is self.DEFAULT_SECURITY) + txstore.dbapi_query = (oldmode is self.DEFAULT_SECURITY + or activated is self.DEFAULT_SECURITY) return oldmode @property def write_security(self): """return a boolean telling if write security is activated or not""" + txstore = self._threaddata + if txstore is None: + return self.DEFAULT_SECURITY try: - return self._threaddata.write_security + return txstore.write_security except: - self._threaddata.write_security = self.DEFAULT_SECURITY - return self._threaddata.write_security + txstore.write_security = self.DEFAULT_SECURITY + return txstore.write_security def set_write_security(self, activated): """[de]activate write security, returning the previous value set for @@ -389,8 +419,11 @@ you should usually use the `security_enabled` context manager instead of this to change security settings. """ - oldmode = self.write_security - self._threaddata.write_security = activated + txstore = self._threaddata + if txstore is None: + return self.DEFAULT_SECURITY + oldmode = getattr(txstore, 'write_security', self.DEFAULT_SECURITY) + txstore.write_security = activated return oldmode @property @@ -567,7 +600,6 @@ """update latest session usage timestamp and reset mode to read""" self.timestamp = time() self.local_perm_cache.clear() # XXX simply move in transaction_data, no? - self._threaddata.mode = self.default_mode # shared data handling ################################################### @@ -657,18 +689,29 @@ rset.req = self return rset - def _clear_thread_data(self): + def _clear_thread_data(self, reset_pool=True): """remove everything from the thread local storage, except pool which is explicitly removed by reset_pool, and mode which is set anyway by _touch """ - store = self._threaddata - for name in ('commit_state', 'transaction_data', 'pending_operations', - '_rewriter'): - try: - delattr(store, name) - except AttributeError: - pass + try: + txstore = self.__threaddata.txdata + except AttributeError: + pass + else: + if reset_pool: + self._tx_data.pop(txstore.transactionid, None) + try: + del self.__threaddata.txdata + except AttributeError: + pass + else: + for name in ('commit_state', 'transaction_data', + 'pending_operations', '_rewriter'): + try: + delattr(txstore, name) + except AttributeError: + continue def commit(self, reset_pool=True): """commit the current session's transaction""" @@ -680,13 +723,13 @@ return if self.commit_state: return - # by default, operations are executed with security turned off - with security_enabled(self, False, False): - # on rollback, an operation should have the following state - # information: - # - processed by the precommit/commit event or not - # - if processed, is it the failed operation - try: + # on rollback, an operation should have the following state + # information: + # - processed by the precommit/commit event or not + # - if processed, is it the failed operation + try: + # by default, operations are executed with security turned off + with security_enabled(self, False, False): for trstate in ('precommit', 'commit'): processed = [] self.commit_state = trstate @@ -730,23 +773,22 @@ exc_info=sys.exc_info()) self.info('%s session %s done', trstate, self.id) return self.transaction_uuid(set=False) - finally: - self._clear_thread_data() - self._touch() - if reset_pool: - self.reset_pool(ignoremode=True) + finally: + self._touch() + if reset_pool: + self.reset_pool(ignoremode=True) + self._clear_thread_data(reset_pool) def rollback(self, reset_pool=True): """rollback the current session's transaction""" if self.pool is None: - assert not self.pending_operations self._clear_thread_data() self._touch() self.debug('rollback session %s done (no db activity)', self.id) return - # by default, operations are executed with security turned off - with security_enabled(self, False, False): - try: + try: + # by default, operations are executed with security turned off + with security_enabled(self, False, False): while self.pending_operations: try: operation = self.pending_operations.pop(0) @@ -756,11 +798,11 @@ continue self.pool.rollback() self.debug('rollback for session %s done', self.id) - finally: - self._clear_thread_data() - self._touch() - if reset_pool: - self.reset_pool(ignoremode=True) + finally: + self._touch() + if reset_pool: + self.reset_pool(ignoremode=True) + self._clear_thread_data(reset_pool) def close(self): """do not close pool on session close, since they are shared now""" @@ -782,7 +824,8 @@ self.error('thread %s still alive after 10 seconds, will close ' 'session anyway', thread) self.rollback() - del self._threaddata + del self.__threaddata + del self._tx_data # transaction data/operations management ##################################