server/session.py
branch stable
changeset 5826 462435bf5457
parent 5813 0b250d72fcfa
child 5849 9db65b381028
child 5906 d40ced753291
equal deleted inserted replaced
5825:2daf1ac79d5c 5826:462435bf5457
   560     commit_state = property(get_commit_state, set_commit_state)
   560     commit_state = property(get_commit_state, set_commit_state)
   561 
   561 
   562     @property
   562     @property
   563     def pool(self):
   563     def pool(self):
   564         """connections pool, set according to transaction mode for each query"""
   564         """connections pool, set according to transaction mode for each query"""
       
   565         if self._closed:
       
   566             self.reset_pool(True)
       
   567             raise Exception('try to access pool on a closed session')
   565         return getattr(self._threaddata, 'pool', None)
   568         return getattr(self._threaddata, 'pool', None)
   566 
   569 
   567     def set_pool(self, checkclosed=True):
   570     def set_pool(self):
   568         """the session need a pool to execute some queries"""
   571         """the session need a pool to execute some queries"""
   569         if checkclosed and self._closed:
   572         if self._closed:
       
   573             self.reset_pool(True)
   570             raise Exception('try to set pool on a closed session')
   574             raise Exception('try to set pool on a closed session')
   571         if self.pool is None:
   575         if self.pool is None:
   572             # get pool first to avoid race-condition
   576             # get pool first to avoid race-condition
   573             self._threaddata.pool = pool = self.repo._get_pool()
   577             self._threaddata.pool = pool = self.repo._get_pool()
   574             try:
   578             try:
   575                 pool.pool_set()
   579                 pool.pool_set()
   576             except:
   580             except:
   577                 self._threaddata.pool = None
   581                 self._threaddata.pool = None
   578                 self.repo._free_pool(pool)
   582                 self.repo._free_pool(pool)
   579                 raise
   583                 raise
   580             self._threads_in_transaction.add(threading.currentThread())
   584             self._threads_in_transaction.add(
       
   585                 (threading.currentThread(), pool) )
   581         return self._threaddata.pool
   586         return self._threaddata.pool
       
   587 
       
   588     def _free_thread_pool(self, thread, pool, force_close=False):
       
   589         try:
       
   590             self._threads_in_transaction.remove( (thread, pool) )
       
   591         except KeyError:
       
   592             # race condition on pool freeing (freed by commit or rollback vs
       
   593             # close)
       
   594             pass
       
   595         else:
       
   596             if force_close:
       
   597                 pool.reconnect()
       
   598             else:
       
   599                 pool.pool_reset()
       
   600             # free pool once everything is done to avoid race-condition
       
   601             self.repo._free_pool(pool)
   582 
   602 
   583     def reset_pool(self, ignoremode=False):
   603     def reset_pool(self, ignoremode=False):
   584         """the session is no longer using its pool, at least for some time"""
   604         """the session is no longer using its pool, at least for some time"""
   585         # pool may be none if no operation has been done since last commit
   605         # pool may be none if no operation has been done since last commit
   586         # or rollback
   606         # or rollback
   587         if self.pool is not None and (ignoremode or self.mode == 'read'):
   607         pool = getattr(self._threaddata, 'pool', None)
       
   608         if pool is not None and (ignoremode or self.mode == 'read'):
   588             # even in read mode, we must release the current transaction
   609             # even in read mode, we must release the current transaction
   589             pool = self.pool
   610             self._free_thread_pool(threading.currentThread(), pool)
   590             try:
       
   591                 self._threads_in_transaction.remove(threading.currentThread())
       
   592             except KeyError:
       
   593                 pass
       
   594             pool.pool_reset()
       
   595             del self._threaddata.pool
   611             del self._threaddata.pool
   596             # free pool once everything is done to avoid race-condition
       
   597             self.repo._free_pool(pool)
       
   598 
   612 
   599     def _touch(self):
   613     def _touch(self):
   600         """update latest session usage timestamp and reset mode to read"""
   614         """update latest session usage timestamp and reset mode to read"""
   601         self.timestamp = time()
   615         self.timestamp = time()
   602         self.local_perm_cache.clear() # XXX simply move in transaction_data, no?
   616         self.local_perm_cache.clear() # XXX simply move in transaction_data, no?
   779                 self.reset_pool(ignoremode=True)
   793                 self.reset_pool(ignoremode=True)
   780             self._clear_thread_data(reset_pool)
   794             self._clear_thread_data(reset_pool)
   781 
   795 
   782     def rollback(self, reset_pool=True):
   796     def rollback(self, reset_pool=True):
   783         """rollback the current session's transaction"""
   797         """rollback the current session's transaction"""
   784         if self.pool is None:
   798         # don't use self.pool, rollback may be called with _closed == True
       
   799         pool = getattr(self._threaddata, 'pool', None)
       
   800         if pool is None:
   785             self._clear_thread_data()
   801             self._clear_thread_data()
   786             self._touch()
   802             self._touch()
   787             self.debug('rollback session %s done (no db activity)', self.id)
   803             self.debug('rollback session %s done (no db activity)', self.id)
   788             return
   804             return
   789         try:
   805         try:
   794                         operation = self.pending_operations.pop(0)
   810                         operation = self.pending_operations.pop(0)
   795                         operation.handle_event('rollback_event')
   811                         operation.handle_event('rollback_event')
   796                     except:
   812                     except:
   797                         self.critical('rollback error', exc_info=sys.exc_info())
   813                         self.critical('rollback error', exc_info=sys.exc_info())
   798                         continue
   814                         continue
   799                 self.pool.rollback()
   815                 pool.rollback()
   800                 self.debug('rollback for session %s done', self.id)
   816                 self.debug('rollback for session %s done', self.id)
   801         finally:
   817         finally:
   802             self._touch()
   818             self._touch()
   803             if reset_pool:
   819             if reset_pool:
   804                 self.reset_pool(ignoremode=True)
   820                 self.reset_pool(ignoremode=True)
   806 
   822 
   807     def close(self):
   823     def close(self):
   808         """do not close pool on session close, since they are shared now"""
   824         """do not close pool on session close, since they are shared now"""
   809         self._closed = True
   825         self._closed = True
   810         # copy since _threads_in_transaction maybe modified while waiting
   826         # copy since _threads_in_transaction maybe modified while waiting
   811         for thread in self._threads_in_transaction.copy():
   827         for thread, pool in self._threads_in_transaction.copy():
   812             if thread is threading.currentThread():
   828             if thread is threading.currentThread():
   813                 continue
   829                 continue
   814             self.info('waiting for thread %s', thread)
   830             self.info('waiting for thread %s', thread)
   815             # do this loop/break instead of a simple join(10) in case thread is
   831             # do this loop/break instead of a simple join(10) in case thread is
   816             # the main thread (in which case it will be removed from
   832             # the main thread (in which case it will be removed from
   817             # self._threads_in_transaction but still be alive...)
   833             # self._threads_in_transaction but still be alive...)
   818             for i in xrange(10):
   834             for i in xrange(10):
   819                 thread.join(1)
   835                 thread.join(1)
   820                 if not (thread.isAlive() and
   836                 if not (thread.isAlive() and
   821                         thread in self._threads_in_transaction):
   837                         (thread, pool) in self._threads_in_transaction):
   822                     break
   838                     break
   823             else:
   839             else:
   824                 self.error('thread %s still alive after 10 seconds, will close '
   840                 self.error('thread %s still alive after 10 seconds, will close '
   825                            'session anyway', thread)
   841                            'session anyway', thread)
       
   842                 self._free_thread_pool(thread, pool, force_close=True)
   826         self.rollback()
   843         self.rollback()
   827         del self.__threaddata
   844         del self.__threaddata
   828         del self._tx_data
   845         del self._tx_data
   829 
   846 
   830     # transaction data/operations management ##################################
   847     # transaction data/operations management ##################################