server/session.py
changeset 5849 9db65b381028
parent 5815 282194aa43f3
parent 5826 462435bf5457
child 5891 99024ad59223
equal deleted inserted replaced
5840:60880c81e32e 5849:9db65b381028
   561     commit_state = property(get_commit_state, set_commit_state)
   561     commit_state = property(get_commit_state, set_commit_state)
   562 
   562 
   563     @property
   563     @property
   564     def pool(self):
   564     def pool(self):
   565         """connections pool, set according to transaction mode for each query"""
   565         """connections pool, set according to transaction mode for each query"""
       
   566         if self._closed:
       
   567             self.reset_pool(True)
       
   568             raise Exception('try to access pool on a closed session')
   566         return getattr(self._threaddata, 'pool', None)
   569         return getattr(self._threaddata, 'pool', None)
   567 
   570 
   568     def set_pool(self, checkclosed=True):
   571     def set_pool(self):
   569         """the session needs a pool to execute some queries"""
   572         """the session needs a pool to execute some queries"""
   570         if checkclosed and self._closed:
   573         if self._closed:
       
   574             self.reset_pool(True)
   571             raise Exception('try to set pool on a closed session')
   575             raise Exception('try to set pool on a closed session')
   572         if self.pool is None:
   576         if self.pool is None:
   573             # get pool first to avoid race-condition
   577             # get pool first to avoid race-condition
   574             self._threaddata.pool = pool = self.repo._get_pool()
   578             self._threaddata.pool = pool = self.repo._get_pool()
   575             try:
   579             try:
   576                 pool.pool_set()
   580                 pool.pool_set()
   577             except:
   581             except:
   578                 self._threaddata.pool = None
   582                 self._threaddata.pool = None
   579                 self.repo._free_pool(pool)
   583                 self.repo._free_pool(pool)
   580                 raise
   584                 raise
   581             self._threads_in_transaction.add(threading.currentThread())
   585             self._threads_in_transaction.add(
       
   586                 (threading.currentThread(), pool) )
   582         return self._threaddata.pool
   587         return self._threaddata.pool
       
   588 
       
   589     def _free_thread_pool(self, thread, pool, force_close=False):
       
   590         try:
       
   591             self._threads_in_transaction.remove( (thread, pool) )
       
   592         except KeyError:
       
   593             # race condition on pool freeing (freed by commit or rollback vs
       
   594             # close)
       
   595             pass
       
   596         else:
       
   597             if force_close:
       
   598                 pool.reconnect()
       
   599             else:
       
   600                 pool.pool_reset()
       
   601             # free pool once everything is done to avoid race-condition
       
   602             self.repo._free_pool(pool)
   583 
   603 
   584     def reset_pool(self, ignoremode=False):
   604     def reset_pool(self, ignoremode=False):
   585         """the session is no longer using its pool, at least for some time"""
   605         """the session is no longer using its pool, at least for some time"""
   586         # pool may be none if no operation has been done since last commit
   606         # pool may be none if no operation has been done since last commit
   587         # or rollback
   607         # or rollback
   588         if self.pool is not None and (ignoremode or self.mode == 'read'):
   608         pool = getattr(self._threaddata, 'pool', None)
       
   609         if pool is not None and (ignoremode or self.mode == 'read'):
   589             # even in read mode, we must release the current transaction
   610             # even in read mode, we must release the current transaction
   590             pool = self.pool
   611             self._free_thread_pool(threading.currentThread(), pool)
   591             try:
       
   592                 self._threads_in_transaction.remove(threading.currentThread())
       
   593             except KeyError:
       
   594                 pass
       
   595             pool.pool_reset()
       
   596             del self._threaddata.pool
   612             del self._threaddata.pool
   597             # free pool once everything is done to avoid race-condition
       
   598             self.repo._free_pool(pool)
       
   599 
   613 
   600     def _touch(self):
   614     def _touch(self):
   601         """update latest session usage timestamp and reset mode to read"""
   615         """update latest session usage timestamp and reset mode to read"""
   602         self.timestamp = time()
   616         self.timestamp = time()
   603         self.local_perm_cache.clear() # XXX simply move in transaction_data, no?
   617         self.local_perm_cache.clear() # XXX simply move in transaction_data, no?
   770                 self.reset_pool(ignoremode=True)
   784                 self.reset_pool(ignoremode=True)
   771             self._clear_thread_data(reset_pool)
   785             self._clear_thread_data(reset_pool)
   772 
   786 
   773     def rollback(self, reset_pool=True):
   787     def rollback(self, reset_pool=True):
   774         """rollback the current session's transaction"""
   788         """rollback the current session's transaction"""
   775         if self.pool is None:
   789         # don't use self.pool, rollback may be called with _closed == True
       
   790         pool = getattr(self._threaddata, 'pool', None)
       
   791         if pool is None:
   776             self._clear_thread_data()
   792             self._clear_thread_data()
   777             self._touch()
   793             self._touch()
   778             self.debug('rollback session %s done (no db activity)', self.id)
   794             self.debug('rollback session %s done (no db activity)', self.id)
   779             return
   795             return
   780         try:
   796         try:
   785                         operation = self.pending_operations.pop(0)
   801                         operation = self.pending_operations.pop(0)
   786                         operation.handle_event('rollback_event')
   802                         operation.handle_event('rollback_event')
   787                     except:
   803                     except:
   788                         self.critical('rollback error', exc_info=sys.exc_info())
   804                         self.critical('rollback error', exc_info=sys.exc_info())
   789                         continue
   805                         continue
   790                 self.pool.rollback()
   806                 pool.rollback()
   791                 self.debug('rollback for session %s done', self.id)
   807                 self.debug('rollback for session %s done', self.id)
   792         finally:
   808         finally:
   793             self._touch()
   809             self._touch()
   794             if reset_pool:
   810             if reset_pool:
   795                 self.reset_pool(ignoremode=True)
   811                 self.reset_pool(ignoremode=True)
   797 
   813 
   798     def close(self):
   814     def close(self):
   799         """do not close pool on session close, since they are shared now"""
   815         """do not close pool on session close, since they are shared now"""
   800         self._closed = True
   816         self._closed = True
   801         # copy since _threads_in_transaction may be modified while waiting
   817         # copy since _threads_in_transaction may be modified while waiting
   802         for thread in self._threads_in_transaction.copy():
   818         for thread, pool in self._threads_in_transaction.copy():
   803             if thread is threading.currentThread():
   819             if thread is threading.currentThread():
   804                 continue
   820                 continue
   805             self.info('waiting for thread %s', thread)
   821             self.info('waiting for thread %s', thread)
   806             # do this loop/break instead of a simple join(10) in case thread is
   822             # do this loop/break instead of a simple join(10) in case thread is
   807             # the main thread (in which case it will be removed from
   823             # the main thread (in which case it will be removed from
   808             # self._threads_in_transaction but still be alive...)
   824             # self._threads_in_transaction but still be alive...)
   809             for i in xrange(10):
   825             for i in xrange(10):
   810                 thread.join(1)
   826                 thread.join(1)
   811                 if not (thread.isAlive() and
   827                 if not (thread.isAlive() and
   812                         thread in self._threads_in_transaction):
   828                         (thread, pool) in self._threads_in_transaction):
   813                     break
   829                     break
   814             else:
   830             else:
   815                 self.error('thread %s still alive after 10 seconds, will close '
   831                 self.error('thread %s still alive after 10 seconds, will close '
   816                            'session anyway', thread)
   832                            'session anyway', thread)
       
   833                 self._free_thread_pool(thread, pool, force_close=True)
   817         self.rollback()
   834         self.rollback()
   818         del self.__threaddata
   835         del self.__threaddata
   819         del self._tx_data
   836         del self._tx_data
   820 
   837 
   821     # transaction data/operations management ##################################
   838     # transaction data/operations management ##################################