# HG changeset patch # User Sylvain Thénault # Date 1242821942 -7200 # Node ID 293fe4b49e288a94f850aa372a70d10c3c2b6bae # Parent cb3466e08d819b15d24944210173a72d4b229bf1 two in one: #343320: Logging out while deleting a CWUser blocks the cw server / #342692: ensure transaction state when Ctrl-C or other stop signal is received diff -r cb3466e08d81 -r 293fe4b49e28 server/repository.py --- a/server/repository.py Wed May 20 11:55:33 2009 +0200 +++ b/server/repository.py Wed May 20 14:19:02 2009 +0200 @@ -217,6 +217,7 @@ self._get_pool().close(True) for i in xrange(config['connections-pool-size']): self._available_pools.put_nowait(ConnectionsPool(self.sources)) + self._shuting_down = False # internals ############################################################### @@ -349,6 +350,7 @@ """called on server stop event to properly close opened sessions and connections """ + self._shuting_down = True if isinstance(self._looping_tasks, tuple): # if tasks have been started for looptask in self._looping_tasks: self.info('canceling task %s...', looptask.name) @@ -618,7 +620,7 @@ """commit transaction for the session with the given id""" self.debug('begin commit for session %s', sessionid) try: - self._get_session(sessionid, setpool=True).commit() + self._get_session(sessionid).commit() except (ValidationError, Unauthorized): raise except: @@ -629,7 +631,7 @@ """commit transaction for the session with the given id""" self.debug('begin rollback for session %s', sessionid) try: - self._get_session(sessionid, setpool=True).rollback() + self._get_session(sessionid).rollback() except: self.exception('unexpected error') raise @@ -720,6 +722,8 @@ def _get_session(self, sessionid, setpool=False): """return the user associated to the given session identifier""" + if self._shuting_down: + raise Exception('Repository is shuting down') try: session = self._sessions[sessionid] except KeyError: diff -r cb3466e08d81 -r 293fe4b49e28 server/session.py --- a/server/session.py Wed May 20 11:55:33 2009 +0200 +++ b/server/session.py Wed May 20 14:19:02 2009 +0200 @@ -65,6 +65,8 @@ # i18n initialization self.set_language(cnxprops.lang) self._threaddata = threading.local() + self._threads_in_transaction = set() + self._closed = False def get_mode(self): return getattr(self._threaddata, 'mode', 'read') @@ -150,6 +152,8 @@ def set_pool(self): """the session need a pool to execute some queries""" + if self._closed: + raise Exception('try to set pool on a closed session') if self.pool is None: self._threaddata.pool = self.repo._get_pool() try: @@ -158,6 +162,7 @@ self.repo._free_pool(self.pool) self._threaddata.pool = None raise + self._threads_in_transaction.add(threading.currentThread()) return self._threaddata.pool def reset_pool(self): @@ -167,6 +172,7 @@ # or rollback if self.pool is not None and self.mode == 'read': # even in read mode, we must release the current transaction + self._threads_in_transaction.remove(threading.currentThread()) self.repo._free_pool(self.pool) self.pool.pool_reset(self) self._threaddata.pool = None @@ -343,6 +349,23 @@ def close(self): """do not close pool on session close, since they are shared now""" + self._closed = True + # copy since _threads_in_transaction maybe modified while waiting + for thread in self._threads_in_transaction.copy(): + if thread is threading.currentThread(): + continue + self.info('waiting for thread %s', thread) + # do this loop/break instead of a simple join(10) in case thread is + # the main thread (in which case it will be removed from + # self._threads_in_transaction but still be alive...) + for i in xrange(10): + thread.join(1) + if not (thread.isAlive() and + thread in self._threads_in_transaction): + break + else: + self.error('thread %s still alive after 10 seconds, will close ' + 'session anyway', thread) self.rollback() # transaction data/operations management ################################## diff -r cb3466e08d81 -r 293fe4b49e28 server/test/unittest_repository.py --- a/server/test/unittest_repository.py Wed May 20 11:55:33 2009 +0200 +++ b/server/test/unittest_repository.py Wed May 20 14:19:02 2009 +0200 @@ -199,6 +199,26 @@ def test_transaction_interleaved(self): self.skip('implement me') + def test_close_wait_processing_request(self): + repo = self.repo + cnxid = repo.connect(*self.default_user_password()) + repo.execute(cnxid, 'INSERT CWUser X: X login "toto", X upassword "tutu", X in_group G WHERE G name "users"') + repo.commit(cnxid) + # close has to be in the thread due to sqlite limitations + def close_in_a_few_moment(): + time.sleep(0.1) + repo.close(cnxid) + t = threading.Thread(target=close_in_a_few_moment) + t.start() + try: + print 'execute' + repo.execute(cnxid, 'DELETE CWUser X WHERE X login "toto"') + print 'commit' + repo.commit(cnxid) + print 'commited' + finally: + t.join() + def test_initial_schema(self): schema = self.repo.schema # check order of attributes is respected @@ -238,31 +258,29 @@ def test_pyro(self): import Pyro Pyro.config.PYRO_MULTITHREADED = 0 - lock = threading.Lock() + done = [] # the client part has to be in the thread due to sqlite limitations - t = threading.Thread(target=self._pyro_client, args=(lock,)) + t = threading.Thread(target=self._pyro_client, args=(done,)) try: daemon = self.repo.pyro_register() t.start() - # connection - daemon.handleRequests(1.0) - daemon.handleRequests(1.0) - daemon.handleRequests(1.0) - # get schema - daemon.handleRequests(1.0) - # execute - daemon.handleRequests(1.0) - t.join() + while not done: + daemon.handleRequests(1.0) + t.join(1) + if t.isAlive(): + self.fail('something went wrong, thread still alive') finally: repository.pyro_unregister(self.repo.config) - def _pyro_client(self, lock): + def _pyro_client(self, done): cnx = connect(self.repo.config.appid, u'admin', 'gingkow') # check we can get the schema schema = cnx.get_schema() self.assertEquals(schema.__hashmode__, None) - rset = cnx.cursor().execute('Any U,G WHERE U in_group G') - + cu = cnx.cursor() + rset = cu.execute('Any U,G WHERE U in_group G') + cnx.close() + done.append(True) def test_internal_api(self): repo = self.repo