two in one: #343320: Logging out while deleting a CWUser blocks the cw server / #342692: ensure transaction state when Ctrl-C or other stop signal is received
--- a/server/repository.py Wed May 20 11:55:33 2009 +0200
+++ b/server/repository.py Wed May 20 14:19:02 2009 +0200
@@ -217,6 +217,7 @@
self._get_pool().close(True)
for i in xrange(config['connections-pool-size']):
self._available_pools.put_nowait(ConnectionsPool(self.sources))
+ self._shuting_down = False
# internals ###############################################################
@@ -349,6 +350,7 @@
"""called on server stop event to properly close opened sessions and
connections
"""
+ self._shuting_down = True
if isinstance(self._looping_tasks, tuple): # if tasks have been started
for looptask in self._looping_tasks:
self.info('canceling task %s...', looptask.name)
@@ -618,7 +620,7 @@
"""commit transaction for the session with the given id"""
self.debug('begin commit for session %s', sessionid)
try:
- self._get_session(sessionid, setpool=True).commit()
+ self._get_session(sessionid).commit()
except (ValidationError, Unauthorized):
raise
except:
@@ -629,7 +631,7 @@
"""commit transaction for the session with the given id"""
self.debug('begin rollback for session %s', sessionid)
try:
- self._get_session(sessionid, setpool=True).rollback()
+ self._get_session(sessionid).rollback()
except:
self.exception('unexpected error')
raise
@@ -720,6 +722,8 @@
def _get_session(self, sessionid, setpool=False):
"""return the user associated to the given session identifier"""
+ if self._shuting_down:
+ raise Exception('Repository is shuting down')
try:
session = self._sessions[sessionid]
except KeyError:
--- a/server/session.py Wed May 20 11:55:33 2009 +0200
+++ b/server/session.py Wed May 20 14:19:02 2009 +0200
@@ -65,6 +65,8 @@
# i18n initialization
self.set_language(cnxprops.lang)
self._threaddata = threading.local()
+ self._threads_in_transaction = set()
+ self._closed = False
def get_mode(self):
return getattr(self._threaddata, 'mode', 'read')
@@ -150,6 +152,8 @@
def set_pool(self):
"""the session need a pool to execute some queries"""
+ if self._closed:
+ raise Exception('try to set pool on a closed session')
if self.pool is None:
self._threaddata.pool = self.repo._get_pool()
try:
@@ -158,6 +162,7 @@
self.repo._free_pool(self.pool)
self._threaddata.pool = None
raise
+ self._threads_in_transaction.add(threading.currentThread())
return self._threaddata.pool
def reset_pool(self):
@@ -167,6 +172,7 @@
# or rollback
if self.pool is not None and self.mode == 'read':
# even in read mode, we must release the current transaction
+ self._threads_in_transaction.remove(threading.currentThread())
self.repo._free_pool(self.pool)
self.pool.pool_reset(self)
self._threaddata.pool = None
@@ -343,6 +349,23 @@
def close(self):
"""do not close pool on session close, since they are shared now"""
+ self._closed = True
+ # copy since _threads_in_transaction maybe modified while waiting
+ for thread in self._threads_in_transaction.copy():
+ if thread is threading.currentThread():
+ continue
+ self.info('waiting for thread %s', thread)
+ # do this loop/break instead of a simple join(10) in case thread is
+ # the main thread (in which case it will be removed from
+ # self._threads_in_transaction but still be alive...)
+ for i in xrange(10):
+ thread.join(1)
+ if not (thread.isAlive() and
+ thread in self._threads_in_transaction):
+ break
+ else:
+ self.error('thread %s still alive after 10 seconds, will close '
+ 'session anyway', thread)
self.rollback()
# transaction data/operations management ##################################
--- a/server/test/unittest_repository.py Wed May 20 11:55:33 2009 +0200
+++ b/server/test/unittest_repository.py Wed May 20 14:19:02 2009 +0200
@@ -199,6 +199,26 @@
def test_transaction_interleaved(self):
self.skip('implement me')
+ def test_close_wait_processing_request(self):
+ repo = self.repo
+ cnxid = repo.connect(*self.default_user_password())
+ repo.execute(cnxid, 'INSERT CWUser X: X login "toto", X upassword "tutu", X in_group G WHERE G name "users"')
+ repo.commit(cnxid)
+ # close has to be in the thread due to sqlite limitations
+ def close_in_a_few_moment():
+ time.sleep(0.1)
+ repo.close(cnxid)
+ t = threading.Thread(target=close_in_a_few_moment)
+ t.start()
+ try:
+ print 'execute'
+ repo.execute(cnxid, 'DELETE CWUser X WHERE X login "toto"')
+ print 'commit'
+ repo.commit(cnxid)
+ print 'commited'
+ finally:
+ t.join()
+
def test_initial_schema(self):
schema = self.repo.schema
# check order of attributes is respected
@@ -238,31 +258,29 @@
def test_pyro(self):
import Pyro
Pyro.config.PYRO_MULTITHREADED = 0
- lock = threading.Lock()
+ done = []
# the client part has to be in the thread due to sqlite limitations
- t = threading.Thread(target=self._pyro_client, args=(lock,))
+ t = threading.Thread(target=self._pyro_client, args=(done,))
try:
daemon = self.repo.pyro_register()
t.start()
- # connection
- daemon.handleRequests(1.0)
- daemon.handleRequests(1.0)
- daemon.handleRequests(1.0)
- # get schema
- daemon.handleRequests(1.0)
- # execute
- daemon.handleRequests(1.0)
- t.join()
+ while not done:
+ daemon.handleRequests(1.0)
+ t.join(1)
+ if t.isAlive():
+ self.fail('something went wrong, thread still alive')
finally:
repository.pyro_unregister(self.repo.config)
- def _pyro_client(self, lock):
+ def _pyro_client(self, done):
cnx = connect(self.repo.config.appid, u'admin', 'gingkow')
# check we can get the schema
schema = cnx.get_schema()
self.assertEquals(schema.__hashmode__, None)
- rset = cnx.cursor().execute('Any U,G WHERE U in_group G')
-
+ cu = cnx.cursor()
+ rset = cu.execute('Any U,G WHERE U in_group G')
+ cnx.close()
+ done.append(True)
def test_internal_api(self):
repo = self.repo