two in one: #343320: Logging out while deleting a CWUser blocks the cw server / #342692: ensure transaction state when Ctrl-C or other stop signal is received stable
authorSylvain Thénault <sylvain.thenault@logilab.fr>
Wed, 20 May 2009 14:19:02 +0200
branchstable
changeset 1880 293fe4b49e28
parent 1879 cb3466e08d81
child 1881 75540944ae18
two in one: #343320: Logging out while deleting a CWUser blocks the cw server / #342692: ensure transaction state when Ctrl-C or other stop signal is received
server/repository.py
server/session.py
server/test/unittest_repository.py
--- a/server/repository.py	Wed May 20 11:55:33 2009 +0200
+++ b/server/repository.py	Wed May 20 14:19:02 2009 +0200
@@ -217,6 +217,7 @@
         self._get_pool().close(True)
         for i in xrange(config['connections-pool-size']):
             self._available_pools.put_nowait(ConnectionsPool(self.sources))
+        self._shuting_down = False
 
     # internals ###############################################################
 
@@ -349,6 +350,7 @@
         """called on server stop event to properly close opened sessions and
         connections
         """
+        self._shuting_down = True
         if isinstance(self._looping_tasks, tuple): # if tasks have been started
             for looptask in self._looping_tasks:
                 self.info('canceling task %s...', looptask.name)
@@ -618,7 +620,7 @@
         """commit transaction for the session with the given id"""
         self.debug('begin commit for session %s', sessionid)
         try:
-            self._get_session(sessionid, setpool=True).commit()
+            self._get_session(sessionid).commit()
         except (ValidationError, Unauthorized):
             raise
         except:
@@ -629,7 +631,7 @@
         """commit transaction for the session with the given id"""
         self.debug('begin rollback for session %s', sessionid)
         try:
-            self._get_session(sessionid, setpool=True).rollback()
+            self._get_session(sessionid).rollback()
         except:
             self.exception('unexpected error')
             raise
@@ -720,6 +722,8 @@
 
     def _get_session(self, sessionid, setpool=False):
         """return the user associated to the given session identifier"""
+        if self._shuting_down:
+            raise Exception('Repository is shuting down')
         try:
             session = self._sessions[sessionid]
         except KeyError:
--- a/server/session.py	Wed May 20 11:55:33 2009 +0200
+++ b/server/session.py	Wed May 20 14:19:02 2009 +0200
@@ -65,6 +65,8 @@
         # i18n initialization
         self.set_language(cnxprops.lang)
         self._threaddata = threading.local()
+        self._threads_in_transaction = set()
+        self._closed = False
 
     def get_mode(self):
         return getattr(self._threaddata, 'mode', 'read')
@@ -150,6 +152,8 @@
 
     def set_pool(self):
         """the session need a pool to execute some queries"""
+        if self._closed:
+            raise Exception('try to set pool on a closed session')
         if self.pool is None:
             self._threaddata.pool = self.repo._get_pool()
             try:
@@ -158,6 +162,7 @@
                 self.repo._free_pool(self.pool)
                 self._threaddata.pool = None
                 raise
+            self._threads_in_transaction.add(threading.currentThread())
         return self._threaddata.pool
 
     def reset_pool(self):
@@ -167,6 +172,7 @@
         # or rollback
         if self.pool is not None and self.mode == 'read':
             # even in read mode, we must release the current transaction
+            self._threads_in_transaction.remove(threading.currentThread())
             self.repo._free_pool(self.pool)
             self.pool.pool_reset(self)
             self._threaddata.pool = None
@@ -343,6 +349,23 @@
 
     def close(self):
         """do not close pool on session close, since they are shared now"""
+        self._closed = True
+        # copy since _threads_in_transaction maybe modified while waiting
+        for thread in self._threads_in_transaction.copy():
+            if thread is threading.currentThread():
+                continue
+            self.info('waiting for thread %s', thread)
+            # do this loop/break instead of a simple join(10) in case thread is
+            # the main thread (in which case it will be removed from
+            # self._threads_in_transaction but still be alive...)
+            for i in xrange(10):
+                thread.join(1)
+                if not (thread.isAlive() and
+                        thread in self._threads_in_transaction):
+                    break
+            else:
+                self.error('thread %s still alive after 10 seconds, will close '
+                           'session anyway', thread)
         self.rollback()
 
     # transaction data/operations management ##################################
--- a/server/test/unittest_repository.py	Wed May 20 11:55:33 2009 +0200
+++ b/server/test/unittest_repository.py	Wed May 20 14:19:02 2009 +0200
@@ -199,6 +199,26 @@
     def test_transaction_interleaved(self):
         self.skip('implement me')
 
+    def test_close_wait_processing_request(self):
+        repo = self.repo
+        cnxid = repo.connect(*self.default_user_password())
+        repo.execute(cnxid, 'INSERT CWUser X: X login "toto", X upassword "tutu", X in_group G WHERE G name "users"')
+        repo.commit(cnxid)
+        # close has to be in the thread due to sqlite limitations
+        def close_in_a_few_moment():
+            time.sleep(0.1)
+            repo.close(cnxid)
+        t = threading.Thread(target=close_in_a_few_moment)
+        t.start()
+        try:
+            print 'execute'
+            repo.execute(cnxid, 'DELETE CWUser X WHERE X login "toto"')
+            print 'commit'
+            repo.commit(cnxid)
+            print 'commited'
+        finally:
+            t.join()
+
     def test_initial_schema(self):
         schema = self.repo.schema
         # check order of attributes is respected
@@ -238,31 +258,29 @@
     def test_pyro(self):
         import Pyro
         Pyro.config.PYRO_MULTITHREADED = 0
-        lock = threading.Lock()
+        done = []
         # the client part has to be in the thread due to sqlite limitations
-        t = threading.Thread(target=self._pyro_client, args=(lock,))
+        t = threading.Thread(target=self._pyro_client, args=(done,))
         try:
             daemon = self.repo.pyro_register()
             t.start()
-            # connection
-            daemon.handleRequests(1.0)
-            daemon.handleRequests(1.0)
-            daemon.handleRequests(1.0)
-            # get schema
-            daemon.handleRequests(1.0)
-            # execute
-            daemon.handleRequests(1.0)
-            t.join()
+            while not done:
+                daemon.handleRequests(1.0)
+            t.join(1)
+            if t.isAlive():
+                self.fail('something went wrong, thread still alive')
         finally:
             repository.pyro_unregister(self.repo.config)
 
-    def _pyro_client(self, lock):
+    def _pyro_client(self, done):
         cnx = connect(self.repo.config.appid, u'admin', 'gingkow')
         # check we can get the schema
         schema = cnx.get_schema()
         self.assertEquals(schema.__hashmode__, None)
-        rset = cnx.cursor().execute('Any U,G WHERE U in_group G')
-
+        cu = cnx.cursor()
+        rset = cu.execute('Any U,G WHERE U in_group G')
+        cnx.close()
+        done.append(True)
 
     def test_internal_api(self):
         repo = self.repo