561 commit_state = property(get_commit_state, set_commit_state) |
561 commit_state = property(get_commit_state, set_commit_state) |
562 |
562 |
563 @property |
563 @property |
564 def pool(self): |
564 def pool(self): |
565 """connections pool, set according to transaction mode for each query""" |
565 """connections pool, set according to transaction mode for each query""" |
|
566 if self._closed: |
|
567 self.reset_pool(True) |
|
568 raise Exception('try to access pool on a closed session') |
566 return getattr(self._threaddata, 'pool', None) |
569 return getattr(self._threaddata, 'pool', None) |
567 |
570 |
568 def set_pool(self, checkclosed=True): |
571 def set_pool(self): |
569 """the session need a pool to execute some queries""" |
572 """the session need a pool to execute some queries""" |
570 if checkclosed and self._closed: |
573 if self._closed: |
|
574 self.reset_pool(True) |
571 raise Exception('try to set pool on a closed session') |
575 raise Exception('try to set pool on a closed session') |
572 if self.pool is None: |
576 if self.pool is None: |
573 # get pool first to avoid race-condition |
577 # get pool first to avoid race-condition |
574 self._threaddata.pool = pool = self.repo._get_pool() |
578 self._threaddata.pool = pool = self.repo._get_pool() |
575 try: |
579 try: |
576 pool.pool_set() |
580 pool.pool_set() |
577 except: |
581 except: |
578 self._threaddata.pool = None |
582 self._threaddata.pool = None |
579 self.repo._free_pool(pool) |
583 self.repo._free_pool(pool) |
580 raise |
584 raise |
581 self._threads_in_transaction.add(threading.currentThread()) |
585 self._threads_in_transaction.add( |
|
586 (threading.currentThread(), pool) ) |
582 return self._threaddata.pool |
587 return self._threaddata.pool |
|
588 |
|
589 def _free_thread_pool(self, thread, pool, force_close=False): |
|
590 try: |
|
591 self._threads_in_transaction.remove( (thread, pool) ) |
|
592 except KeyError: |
|
593 # race condition on pool freeing (freed by commit or rollback vs |
|
594 # close) |
|
595 pass |
|
596 else: |
|
597 if force_close: |
|
598 pool.reconnect() |
|
599 else: |
|
600 pool.pool_reset() |
|
601 # free pool once everything is done to avoid race-condition |
|
602 self.repo._free_pool(pool) |
583 |
603 |
584 def reset_pool(self, ignoremode=False): |
604 def reset_pool(self, ignoremode=False): |
585 """the session is no longer using its pool, at least for some time""" |
605 """the session is no longer using its pool, at least for some time""" |
586 # pool may be none if no operation has been done since last commit |
606 # pool may be none if no operation has been done since last commit |
587 # or rollback |
607 # or rollback |
588 if self.pool is not None and (ignoremode or self.mode == 'read'): |
608 pool = getattr(self._threaddata, 'pool', None) |
|
609 if pool is not None and (ignoremode or self.mode == 'read'): |
589 # even in read mode, we must release the current transaction |
610 # even in read mode, we must release the current transaction |
590 pool = self.pool |
611 self._free_thread_pool(threading.currentThread(), pool) |
591 try: |
|
592 self._threads_in_transaction.remove(threading.currentThread()) |
|
593 except KeyError: |
|
594 pass |
|
595 pool.pool_reset() |
|
596 del self._threaddata.pool |
612 del self._threaddata.pool |
597 # free pool once everything is done to avoid race-condition |
|
598 self.repo._free_pool(pool) |
|
599 |
613 |
600 def _touch(self): |
614 def _touch(self): |
601 """update latest session usage timestamp and reset mode to read""" |
615 """update latest session usage timestamp and reset mode to read""" |
602 self.timestamp = time() |
616 self.timestamp = time() |
603 self.local_perm_cache.clear() # XXX simply move in transaction_data, no? |
617 self.local_perm_cache.clear() # XXX simply move in transaction_data, no? |
770 self.reset_pool(ignoremode=True) |
784 self.reset_pool(ignoremode=True) |
771 self._clear_thread_data(reset_pool) |
785 self._clear_thread_data(reset_pool) |
772 |
786 |
773 def rollback(self, reset_pool=True): |
787 def rollback(self, reset_pool=True): |
774 """rollback the current session's transaction""" |
788 """rollback the current session's transaction""" |
775 if self.pool is None: |
789 # don't use self.pool, rollback may be called with _closed == True |
|
790 pool = getattr(self._threaddata, 'pool', None) |
|
791 if pool is None: |
776 self._clear_thread_data() |
792 self._clear_thread_data() |
777 self._touch() |
793 self._touch() |
778 self.debug('rollback session %s done (no db activity)', self.id) |
794 self.debug('rollback session %s done (no db activity)', self.id) |
779 return |
795 return |
780 try: |
796 try: |
797 |
813 |
798 def close(self): |
814 def close(self): |
799 """do not close pool on session close, since they are shared now""" |
815 """do not close pool on session close, since they are shared now""" |
800 self._closed = True |
816 self._closed = True |
801 # copy since _threads_in_transaction maybe modified while waiting |
817 # copy since _threads_in_transaction maybe modified while waiting |
802 for thread in self._threads_in_transaction.copy(): |
818 for thread, pool in self._threads_in_transaction.copy(): |
803 if thread is threading.currentThread(): |
819 if thread is threading.currentThread(): |
804 continue |
820 continue |
805 self.info('waiting for thread %s', thread) |
821 self.info('waiting for thread %s', thread) |
806 # do this loop/break instead of a simple join(10) in case thread is |
822 # do this loop/break instead of a simple join(10) in case thread is |
807 # the main thread (in which case it will be removed from |
823 # the main thread (in which case it will be removed from |
808 # self._threads_in_transaction but still be alive...) |
824 # self._threads_in_transaction but still be alive...) |
809 for i in xrange(10): |
825 for i in xrange(10): |
810 thread.join(1) |
826 thread.join(1) |
811 if not (thread.isAlive() and |
827 if not (thread.isAlive() and |
812 thread in self._threads_in_transaction): |
828 (thread, pool) in self._threads_in_transaction): |
813 break |
829 break |
814 else: |
830 else: |
815 self.error('thread %s still alive after 10 seconds, will close ' |
831 self.error('thread %s still alive after 10 seconds, will close ' |
816 'session anyway', thread) |
832 'session anyway', thread) |
|
833 self._free_thread_pool(thread, pool, force_close=True) |
817 self.rollback() |
834 self.rollback() |
818 del self.__threaddata |
835 del self.__threaddata |
819 del self._tx_data |
836 del self._tx_data |
820 |
837 |
821 # transaction data/operations management ################################## |
838 # transaction data/operations management ################################## |