560 commit_state = property(get_commit_state, set_commit_state) |
560 commit_state = property(get_commit_state, set_commit_state) |
561 |
561 |
562 @property |
562 @property |
563 def pool(self): |
563 def pool(self): |
564 """connections pool, set according to transaction mode for each query""" |
564 """connections pool, set according to transaction mode for each query""" |
|
565 if self._closed: |
|
566 self.reset_pool(True) |
|
567 raise Exception('try to access pool on a closed session') |
565 return getattr(self._threaddata, 'pool', None) |
568 return getattr(self._threaddata, 'pool', None) |
566 |
569 |
567 def set_pool(self, checkclosed=True): |
570 def set_pool(self): |
568 """the session need a pool to execute some queries""" |
571 """the session need a pool to execute some queries""" |
569 if checkclosed and self._closed: |
572 if self._closed: |
|
573 self.reset_pool(True) |
570 raise Exception('try to set pool on a closed session') |
574 raise Exception('try to set pool on a closed session') |
571 if self.pool is None: |
575 if self.pool is None: |
572 # get pool first to avoid race-condition |
576 # get pool first to avoid race-condition |
573 self._threaddata.pool = pool = self.repo._get_pool() |
577 self._threaddata.pool = pool = self.repo._get_pool() |
574 try: |
578 try: |
575 pool.pool_set() |
579 pool.pool_set() |
576 except: |
580 except: |
577 self._threaddata.pool = None |
581 self._threaddata.pool = None |
578 self.repo._free_pool(pool) |
582 self.repo._free_pool(pool) |
579 raise |
583 raise |
580 self._threads_in_transaction.add(threading.currentThread()) |
584 self._threads_in_transaction.add( |
|
585 (threading.currentThread(), pool) ) |
581 return self._threaddata.pool |
586 return self._threaddata.pool |
|
587 |
|
588 def _free_thread_pool(self, thread, pool, force_close=False): |
|
589 try: |
|
590 self._threads_in_transaction.remove( (thread, pool) ) |
|
591 except KeyError: |
|
592 # race condition on pool freeing (freed by commit or rollback vs |
|
593 # close) |
|
594 pass |
|
595 else: |
|
596 if force_close: |
|
597 pool.reconnect() |
|
598 else: |
|
599 pool.pool_reset() |
|
600 # free pool once everything is done to avoid race-condition |
|
601 self.repo._free_pool(pool) |
582 |
602 |
583 def reset_pool(self, ignoremode=False): |
603 def reset_pool(self, ignoremode=False): |
584 """the session is no longer using its pool, at least for some time""" |
604 """the session is no longer using its pool, at least for some time""" |
585 # pool may be none if no operation has been done since last commit |
605 # pool may be none if no operation has been done since last commit |
586 # or rollback |
606 # or rollback |
587 if self.pool is not None and (ignoremode or self.mode == 'read'): |
607 pool = getattr(self._threaddata, 'pool', None) |
|
608 if pool is not None and (ignoremode or self.mode == 'read'): |
588 # even in read mode, we must release the current transaction |
609 # even in read mode, we must release the current transaction |
589 pool = self.pool |
610 self._free_thread_pool(threading.currentThread(), pool) |
590 try: |
|
591 self._threads_in_transaction.remove(threading.currentThread()) |
|
592 except KeyError: |
|
593 pass |
|
594 pool.pool_reset() |
|
595 del self._threaddata.pool |
611 del self._threaddata.pool |
596 # free pool once everything is done to avoid race-condition |
|
597 self.repo._free_pool(pool) |
|
598 |
612 |
599 def _touch(self): |
613 def _touch(self): |
600 """update latest session usage timestamp and reset mode to read""" |
614 """update latest session usage timestamp and reset mode to read""" |
601 self.timestamp = time() |
615 self.timestamp = time() |
602 self.local_perm_cache.clear() # XXX simply move in transaction_data, no? |
616 self.local_perm_cache.clear() # XXX simply move in transaction_data, no? |
779 self.reset_pool(ignoremode=True) |
793 self.reset_pool(ignoremode=True) |
780 self._clear_thread_data(reset_pool) |
794 self._clear_thread_data(reset_pool) |
781 |
795 |
782 def rollback(self, reset_pool=True): |
796 def rollback(self, reset_pool=True): |
783 """rollback the current session's transaction""" |
797 """rollback the current session's transaction""" |
784 if self.pool is None: |
798 # don't use self.pool, rollback may be called with _closed == True |
|
799 pool = getattr(self._threaddata, 'pool', None) |
|
800 if pool is None: |
785 self._clear_thread_data() |
801 self._clear_thread_data() |
786 self._touch() |
802 self._touch() |
787 self.debug('rollback session %s done (no db activity)', self.id) |
803 self.debug('rollback session %s done (no db activity)', self.id) |
788 return |
804 return |
789 try: |
805 try: |
806 |
822 |
807 def close(self): |
823 def close(self): |
808 """do not close pool on session close, since they are shared now""" |
824 """do not close pool on session close, since they are shared now""" |
809 self._closed = True |
825 self._closed = True |
810 # copy since _threads_in_transaction maybe modified while waiting |
826 # copy since _threads_in_transaction maybe modified while waiting |
811 for thread in self._threads_in_transaction.copy(): |
827 for thread, pool in self._threads_in_transaction.copy(): |
812 if thread is threading.currentThread(): |
828 if thread is threading.currentThread(): |
813 continue |
829 continue |
814 self.info('waiting for thread %s', thread) |
830 self.info('waiting for thread %s', thread) |
815 # do this loop/break instead of a simple join(10) in case thread is |
831 # do this loop/break instead of a simple join(10) in case thread is |
816 # the main thread (in which case it will be removed from |
832 # the main thread (in which case it will be removed from |
817 # self._threads_in_transaction but still be alive...) |
833 # self._threads_in_transaction but still be alive...) |
818 for i in xrange(10): |
834 for i in xrange(10): |
819 thread.join(1) |
835 thread.join(1) |
820 if not (thread.isAlive() and |
836 if not (thread.isAlive() and |
821 thread in self._threads_in_transaction): |
837 (thread, pool) in self._threads_in_transaction): |
822 break |
838 break |
823 else: |
839 else: |
824 self.error('thread %s still alive after 10 seconds, will close ' |
840 self.error('thread %s still alive after 10 seconds, will close ' |
825 'session anyway', thread) |
841 'session anyway', thread) |
|
842 self._free_thread_pool(thread, pool, force_close=True) |
826 self.rollback() |
843 self.rollback() |
827 del self.__threaddata |
844 del self.__threaddata |
828 del self._tx_data |
845 del self._tx_data |
829 |
846 |
830 # transaction data/operations management ################################## |
847 # transaction data/operations management ################################## |