128 self.transactionid = txid |
128 self.transactionid = txid |
129 self.ctx_count = 0 |
129 self.ctx_count = 0 |
130 |
130 |
131 |
131 |
132 class Session(RequestSessionBase): |
132 class Session(RequestSessionBase): |
133 """tie session id, user, connections pool and other session data all |
133 """Repository user session, tie a session id, user, connections set and |
134 together |
134 other session data all together. |
|
135 |
|
136 About session storage / transactions |
|
137 ------------------------------------ |
|
138 |
|
139 Here is a description of internal session attributes. Besides :attr:`data` |
|
140 and :attr:`transaction_data`, you should not have to use attributes |
|
141 described here but higher level APIs. |
|
142 |
|
143 :attr:`data` is a dictionary containing shared data, used to communicate |
|
144 extra information between the client and the repository |
|
145 |
|
146 :attr:`_tx_data` is a dictionary of :class:`TransactionData` instances, one |
|
147 for each running transaction. The key is the transaction id. By default |
|
148 the transaction id is the thread name but it can be otherwise (per dbapi |
|
149 cursor for instance, or per thread name *from another process*). |
|
150 |
|
151 :attr:`__threaddata` is a thread local storage whose `txdata` attribute |
|
152 refers to the proper instance of :class:`TransactionData` according to the |
|
153 transaction. |
|
154 |
|
155 :attr:`_threads_in_transaction` is a set of (thread, connections set) |
|
156 referencing threads that currently hold a connections set for the session. |
|
157 |
|
158 You should not have to use either :attr:`_tx_data` or :attr:`__threaddata`, |
|
159 simply access transaction data transparently through the :attr:`_threaddata` |
|
160 property. Also, you usually don't have to access it directly since current |
|
161 transaction's data may be accessed/modified through properties / methods: |
|
162 |
|
163 :attr:`transaction_data`, similarly to :attr:`data`, is a dictionary |
|
164 containing some shared data that should be cleared at the end of the |
|
165 transaction. Hooks and operations may put arbitrary data in there, and |
|
166 this may also be used as a communication channel between the client and |
|
167 the repository. |
|
168 |
|
169 :attr:`cnxset`, the connections set to use to execute queries on sources. |
|
170 During a transaction, the connection set may be freed so that it may be |
|
171 used by another session as long as no writing is done. This means we can |
|
172 have multiple sessions with a reasonably low connections set pool size. |
|
173 |
|
174 :attr:`mode`, string telling the connections set handling mode, may be one |
|
175 of 'read' (connections set may be freed), 'write' (some write was done in |
|
176 the connections set, it can't be freed before end of the transaction), |
|
177 'transaction' (we want to keep the connections set during all the |
|
178 transaction, with or without writing) |
|
179 |
|
180 :attr:`pending_operations`, ordered list of operations to be processed on |
|
181 commit/rollback |
|
182 |
|
183 :attr:`commit_state`, describing the transaction commit state, may be one |
|
184 of None (not yet committing), 'precommit' (calling precommit event on |
|
185 operations), 'postcommit' (calling postcommit event on operations), |
|
186 'uncommitable' (some :exc:`ValidationError` or :exc:`Unauthorized` error |
|
187 has been raised during the transaction and so it must be rollbacked). |
|
188 |
|
189 :attr:`read_security` and :attr:`write_security`, boolean flags telling if |
|
190 read/write security is currently activated. |
|
191 |
|
192 :attr:`hooks_mode`, may be either `HOOKS_ALLOW_ALL` or `HOOKS_DENY_ALL`. |
|
193 |
|
194 :attr:`enabled_hook_categories`, when :attr:`hooks_mode` is |
|
195 `HOOKS_DENY_ALL`, this set contains hooks categories that are enabled. |
|
196 |
|
197 :attr:`disabled_hook_categories`, when :attr:`hooks_mode` is |
|
198 `HOOKS_ALLOW_ALL`, this set contains hooks categories that are disabled. |
|
199 |
|
200 |
|
201 :attr:`running_dbapi_query`, boolean flag telling if the executing query |
|
202 is coming from a dbapi connection or is a query from within the repository |
135 """ |
203 """ |
136 is_internal_session = False |
204 is_internal_session = False |
137 |
205 |
138 def __init__(self, user, repo, cnxprops=None, _id=None): |
206 def __init__(self, user, repo, cnxprops=None, _id=None): |
139 super(Session, self).__init__(repo.vreg) |
207 super(Session, self).__init__(repo.vreg) |
188 |
256 |
189 def hijack_user(self, user): |
257 def hijack_user(self, user): |
190 """return a fake request/session using specified user""" |
258 """return a fake request/session using specified user""" |
191 session = Session(user, self.repo) |
259 session = Session(user, self.repo) |
192 threaddata = session._threaddata |
260 threaddata = session._threaddata |
193 threaddata.pool = self.pool |
261 threaddata.cnxset = self.cnxset |
194 # we attributed a pool, need to update ctx_count else it will be freed |
262 # we attributed a connections set, need to update ctx_count else it will be freed |
195 # while undesired |
263 # while undesired |
196 threaddata.ctx_count = 1 |
264 threaddata.ctx_count = 1 |
197 # share pending_operations, else operation added in the hi-jacked |
265 # share pending_operations, else operation added in the hi-jacked |
198 # session such as SendMailOp won't ever be processed |
266 # session such as SendMailOp won't ever be processed |
199 threaddata.pending_operations = self.pending_operations |
267 threaddata.pending_operations = self.pending_operations |
333 |
401 |
334 def system_sql(self, sql, args=None, rollback_on_failure=True): |
402 def system_sql(self, sql, args=None, rollback_on_failure=True): |
335 """return a sql cursor on the system database""" |
403 """return a sql cursor on the system database""" |
336 if sql.split(None, 1)[0].upper() != 'SELECT': |
404 if sql.split(None, 1)[0].upper() != 'SELECT': |
337 self.mode = 'write' |
405 self.mode = 'write' |
338 source = self.pool.source('system') |
406 source = self.cnxset.source('system') |
339 try: |
407 try: |
340 return source.doexec(self, sql, args, rollback=rollback_on_failure) |
408 return source.doexec(self, sql, args, rollback=rollback_on_failure) |
341 except (source.OperationalError, source.InterfaceError): |
409 except (source.OperationalError, source.InterfaceError): |
342 if not rollback_on_failure: |
410 if not rollback_on_failure: |
343 raise |
411 raise |
344 source.warning("trying to reconnect") |
412 source.warning("trying to reconnect") |
345 self.pool.reconnect(source) |
413 self.cnxset.reconnect(source) |
346 return source.doexec(self, sql, args, rollback=rollback_on_failure) |
414 return source.doexec(self, sql, args, rollback=rollback_on_failure) |
347 |
415 |
348 def set_language(self, language): |
416 def set_language(self, language): |
349 """i18n configuration for translation""" |
417 """i18n configuration for translation""" |
350 language = language or self.user.property_value('ui.language') |
418 language = language or self.user.property_value('ui.language') |
604 """ |
672 """ |
605 return self.is_hook_category_activated(hook.category) |
673 return self.is_hook_category_activated(hook.category) |
606 |
674 |
607 # connection management ################################################### |
675 # connection management ################################################### |
608 |
676 |
609 def keep_pool_mode(self, mode): |
677 def keep_cnxset_mode(self, mode): |
610 """set pool_mode, e.g. how the session will keep its pool: |
678 """set `mode`, e.g. how the session will keep its connections set: |
611 |
679 |
612 * if mode == 'write', the pool is freed after each read query, but kept |
680 * if mode == 'write', the connections set is freed after each read |
613 until the transaction's end (eg commit or rollback) when a write query |
681 query, but kept until the transaction's end (eg commit or rollback) |
614 is detected (eg INSERT/SET/DELETE queries) |
682 when a write query is detected (eg INSERT/SET/DELETE queries) |
615 |
683 |
616 * if mode == 'transaction', the pool is only freed after the |
684 * if mode == 'transaction', the connections set is only freed after the |
617 transaction's end |
685 transaction's end |
618 |
686 |
619 notice that a repository has a limited set of pools, and a session has to |
687 notice that a repository has a limited set of connections sets, and a |
620 wait for a free pool to run any rql query (unless it already has a pool |
688 session has to wait for a free connections set to run any rql query |
621 set). |
689 (unless it already has one set). |
622 """ |
690 """ |
623 assert mode in ('transaction', 'write') |
691 assert mode in ('transaction', 'write') |
624 if mode == 'transaction': |
692 if mode == 'transaction': |
625 self.default_mode = 'transaction' |
693 self.default_mode = 'transaction' |
626 else: # mode == 'write' |
694 else: # mode == 'write' |
639 def set_commit_state(self, value): |
707 def set_commit_state(self, value): |
640 self._threaddata.commit_state = value |
708 self._threaddata.commit_state = value |
641 commit_state = property(get_commit_state, set_commit_state) |
709 commit_state = property(get_commit_state, set_commit_state) |
642 |
710 |
643 @property |
711 @property |
644 def pool(self): |
712 def cnxset(self): |
645 """connections pool, set according to transaction mode for each query""" |
713 """connections set, set according to transaction mode for each query""" |
646 if self._closed: |
714 if self._closed: |
647 self.reset_pool(True) |
715 self.free_cnxset(True) |
648 raise Exception('try to access pool on a closed session') |
716 raise Exception('try to access connections set on a closed session') |
649 return getattr(self._threaddata, 'pool', None) |
717 return getattr(self._threaddata, 'cnxset', None) |
650 |
718 |
651 def set_pool(self): |
719 def set_cnxset(self): |
652 """the session need a pool to execute some queries""" |
720 """the session need a connections set to execute some queries""" |
653 with self._closed_lock: |
721 with self._closed_lock: |
654 if self._closed: |
722 if self._closed: |
655 self.reset_pool(True) |
723 self.free_cnxset(True) |
656 raise Exception('try to set pool on a closed session') |
724 raise Exception('try to set connections set on a closed session') |
657 if self.pool is None: |
725 if self.cnxset is None: |
658 # get pool first to avoid race-condition |
726 # get connections set first to avoid race-condition |
659 self._threaddata.pool = pool = self.repo._get_pool() |
727 self._threaddata.cnxset = cnxset = self.repo._get_cnxset() |
660 self._threaddata.ctx_count += 1 |
728 self._threaddata.ctx_count += 1 |
661 try: |
729 try: |
662 pool.pool_set() |
730 cnxset.cnxset_set() |
663 except: |
731 except: |
664 self._threaddata.pool = None |
732 self._threaddata.cnxset = None |
665 self.repo._free_pool(pool) |
733 self.repo._free_cnxset(cnxset) |
666 raise |
734 raise |
667 self._threads_in_transaction.add( |
735 self._threads_in_transaction.add( |
668 (threading.currentThread(), pool) ) |
736 (threading.currentThread(), cnxset) ) |
669 return self._threaddata.pool |
737 return self._threaddata.cnxset |
670 |
738 |
671 def _free_thread_pool(self, thread, pool, force_close=False): |
739 def _free_thread_cnxset(self, thread, cnxset, force_close=False): |
672 try: |
740 try: |
673 self._threads_in_transaction.remove( (thread, pool) ) |
741 self._threads_in_transaction.remove( (thread, cnxset) ) |
674 except KeyError: |
742 except KeyError: |
675 # race condition on pool freeing (freed by commit or rollback vs |
743 # race condition on cnxset freeing (freed by commit or rollback vs |
676 # close) |
744 # close) |
677 pass |
745 pass |
678 else: |
746 else: |
679 if force_close: |
747 if force_close: |
680 pool.reconnect() |
748 cnxset.reconnect() |
681 else: |
749 else: |
682 pool.pool_reset() |
750 cnxset.cnxset_freed() |
683 # free pool once everything is done to avoid race-condition |
751 # free cnxset once everything is done to avoid race-condition |
684 self.repo._free_pool(pool) |
752 self.repo._free_cnxset(cnxset) |
685 |
753 |
686 def reset_pool(self, ignoremode=False): |
754 def free_cnxset(self, ignoremode=False): |
687 """the session is no longer using its pool, at least for some time""" |
755 """the session is no longer using its connections set, at least for some time""" |
688 # pool may be none if no operation has been done since last commit |
756 # cnxset may be none if no operation has been done since last commit |
689 # or rollback |
757 # or rollback |
690 pool = getattr(self._threaddata, 'pool', None) |
758 cnxset = getattr(self._threaddata, 'cnxset', None) |
691 if pool is not None and (ignoremode or self.mode == 'read'): |
759 if cnxset is not None and (ignoremode or self.mode == 'read'): |
692 # even in read mode, we must release the current transaction |
760 # even in read mode, we must release the current transaction |
693 self._free_thread_pool(threading.currentThread(), pool) |
761 self._free_thread_cnxset(threading.currentThread(), cnxset) |
694 del self._threaddata.pool |
762 del self._threaddata.cnxset |
695 self._threaddata.ctx_count -= 1 |
763 self._threaddata.ctx_count -= 1 |
696 |
764 |
697 def _touch(self): |
765 def _touch(self): |
698 """update latest session usage timestamp and reset mode to read""" |
766 """update latest session usage timestamp and reset mode to read""" |
699 self.timestamp = time() |
767 self.timestamp = time() |
779 self.timestamp = time() # update timestamp |
847 self.timestamp = time() # update timestamp |
780 rset = self._execute(self, rql, kwargs, build_descr) |
848 rset = self._execute(self, rql, kwargs, build_descr) |
781 rset.req = self |
849 rset.req = self |
782 return rset |
850 return rset |
783 |
851 |
784 def _clear_thread_data(self, reset_pool=True): |
852 def _clear_thread_data(self, free_cnxset=True): |
785 """remove everything from the thread local storage, except pool |
853 """remove everything from the thread local storage, except connections set |
786 which is explicitly removed by reset_pool, and mode which is set anyway |
854 which is explicitly removed by free_cnxset, and mode which is set anyway |
787 by _touch |
855 by _touch |
788 """ |
856 """ |
789 try: |
857 try: |
790 txstore = self.__threaddata.txdata |
858 txstore = self.__threaddata.txdata |
791 except AttributeError: |
859 except AttributeError: |
792 pass |
860 pass |
793 else: |
861 else: |
794 if reset_pool: |
862 if free_cnxset: |
795 self.reset_pool() |
863 self.free_cnxset() |
796 if txstore.ctx_count == 0: |
864 if txstore.ctx_count == 0: |
797 self._clear_thread_storage(txstore) |
865 self._clear_thread_storage(txstore) |
798 else: |
866 else: |
799 self._clear_tx_storage(txstore) |
867 self._clear_tx_storage(txstore) |
800 else: |
868 else: |
814 try: |
882 try: |
815 delattr(txstore, name) |
883 delattr(txstore, name) |
816 except AttributeError: |
884 except AttributeError: |
817 continue |
885 continue |
818 |
886 |
819 def commit(self, reset_pool=True): |
887 def commit(self, free_cnxset=True, reset_pool=None): |
820 """commit the current session's transaction""" |
888 """commit the current session's transaction""" |
821 if self.pool is None: |
889 if reset_pool is not None: |
|
890 warn('[3.13] use free_cnxset argument instead for reset_pool', |
|
891 DeprecationWarning, stacklevel=2) |
|
892 free_cnxset = reset_pool |
|
893 if self.cnxset is None: |
822 assert not self.pending_operations |
894 assert not self.pending_operations |
823 self._clear_thread_data() |
895 self._clear_thread_data() |
824 self._touch() |
896 self._touch() |
825 self.debug('commit session %s done (no db activity)', self.id) |
897 self.debug('commit session %s done (no db activity)', self.id) |
826 return |
898 return |
865 self.critical('error while reverting precommit', |
937 self.critical('error while reverting precommit', |
866 exc_info=True) |
938 exc_info=True) |
867 # XXX use slice notation since self.pending_operations is a |
939 # XXX use slice notation since self.pending_operations is a |
868 # read-only property. |
940 # read-only property. |
869 self.pending_operations[:] = processed + self.pending_operations |
941 self.pending_operations[:] = processed + self.pending_operations |
870 self.rollback(reset_pool) |
942 self.rollback(free_cnxset) |
871 raise |
943 raise |
872 self.pool.commit() |
944 self.cnxset.commit() |
873 self.commit_state = 'postcommit' |
945 self.commit_state = 'postcommit' |
874 while self.pending_operations: |
946 while self.pending_operations: |
875 operation = self.pending_operations.pop(0) |
947 operation = self.pending_operations.pop(0) |
876 operation.processed = 'postcommit' |
948 operation.processed = 'postcommit' |
877 try: |
949 try: |
881 exc_info=sys.exc_info()) |
953 exc_info=sys.exc_info()) |
882 self.debug('postcommit session %s done', self.id) |
954 self.debug('postcommit session %s done', self.id) |
883 return self.transaction_uuid(set=False) |
955 return self.transaction_uuid(set=False) |
884 finally: |
956 finally: |
885 self._touch() |
957 self._touch() |
886 if reset_pool: |
958 if free_cnxset: |
887 self.reset_pool(ignoremode=True) |
959 self.free_cnxset(ignoremode=True) |
888 self._clear_thread_data(reset_pool) |
960 self._clear_thread_data(free_cnxset) |
889 |
961 |
890 def rollback(self, reset_pool=True): |
962 def rollback(self, free_cnxset=True, reset_pool=None): |
891 """rollback the current session's transaction""" |
963 """rollback the current session's transaction""" |
892 # don't use self.pool, rollback may be called with _closed == True |
964 if reset_pool is not None: |
893 pool = getattr(self._threaddata, 'pool', None) |
965 warn('[3.13] use free_cnxset argument instead for reset_pool', |
894 if pool is None: |
966 DeprecationWarning, stacklevel=2) |
|
967 free_cnxset = reset_pool |
|
968 # don't use self.cnxset, rollback may be called with _closed == True |
|
969 cnxset = getattr(self._threaddata, 'cnxset', None) |
|
970 if cnxset is None: |
895 self._clear_thread_data() |
971 self._clear_thread_data() |
896 self._touch() |
972 self._touch() |
897 self.debug('rollback session %s done (no db activity)', self.id) |
973 self.debug('rollback session %s done (no db activity)', self.id) |
898 return |
974 return |
899 try: |
975 try: |
904 operation = self.pending_operations.pop(0) |
980 operation = self.pending_operations.pop(0) |
905 operation.handle_event('rollback_event') |
981 operation.handle_event('rollback_event') |
906 except: |
982 except: |
907 self.critical('rollback error', exc_info=sys.exc_info()) |
983 self.critical('rollback error', exc_info=sys.exc_info()) |
908 continue |
984 continue |
909 pool.rollback() |
985 cnxset.rollback() |
910 self.debug('rollback for session %s done', self.id) |
986 self.debug('rollback for session %s done', self.id) |
911 finally: |
987 finally: |
912 self._touch() |
988 self._touch() |
913 if reset_pool: |
989 if free_cnxset: |
914 self.reset_pool(ignoremode=True) |
990 self.free_cnxset(ignoremode=True) |
915 self._clear_thread_data(reset_pool) |
991 self._clear_thread_data(free_cnxset) |
916 |
992 |
917 def close(self): |
993 def close(self): |
918 """do not close pool on session close, since they are shared now""" |
994 """do not close connections set on session close, since they are shared now""" |
919 with self._closed_lock: |
995 with self._closed_lock: |
920 self._closed = True |
996 self._closed = True |
921 # copy since _threads_in_transaction may be modified while waiting |
997 # copy since _threads_in_transaction may be modified while waiting |
922 for thread, pool in self._threads_in_transaction.copy(): |
998 for thread, cnxset in self._threads_in_transaction.copy(): |
923 if thread is threading.currentThread(): |
999 if thread is threading.currentThread(): |
924 continue |
1000 continue |
925 self.info('waiting for thread %s', thread) |
1001 self.info('waiting for thread %s', thread) |
926 # do this loop/break instead of a simple join(10) in case thread is |
1002 # do this loop/break instead of a simple join(10) in case thread is |
927 # the main thread (in which case it will be removed from |
1003 # the main thread (in which case it will be removed from |
928 # self._threads_in_transaction but still be alive...) |
1004 # self._threads_in_transaction but still be alive...) |
929 for i in xrange(10): |
1005 for i in xrange(10): |
930 thread.join(1) |
1006 thread.join(1) |
931 if not (thread.isAlive() and |
1007 if not (thread.isAlive() and |
932 (thread, pool) in self._threads_in_transaction): |
1008 (thread, cnxset) in self._threads_in_transaction): |
933 break |
1009 break |
934 else: |
1010 else: |
935 self.error('thread %s still alive after 10 seconds, will close ' |
1011 self.error('thread %s still alive after 10 seconds, will close ' |
936 'session anyway', thread) |
1012 'session anyway', thread) |
937 self._free_thread_pool(thread, pool, force_close=True) |
1013 self._free_thread_cnxset(thread, cnxset, force_close=True) |
938 self.rollback() |
1014 self.rollback() |
939 del self.__threaddata |
1015 del self.__threaddata |
940 del self._tx_data |
1016 del self._tx_data |
941 |
1017 |
942 @property |
1018 @property |
1076 description.append(tuple(row_descr)) |
1151 description.append(tuple(row_descr)) |
1077 return description |
1152 return description |
1078 |
1153 |
1079 # deprecated ############################################################### |
1154 # deprecated ############################################################### |
1080 |
1155 |
|
1156 @property |
|
1157 @deprecated("[3.13] use .cnxset attribute instead of .pool") |
|
1158 def pool(self): |
|
1159 return self.cnxset |
|
1160 |
|
1161 @deprecated("[3.13] use .set_cnxset() method instead of .set_pool()") |
|
1162 def set_pool(self): |
|
1163 return self.set_cnxset() |
|
1164 |
|
1165 @deprecated("[3.13] use .free_cnxset() method instead of .reset_pool()") |
|
1166 def reset_pool(self): |
|
1167 return self.free_cnxset() |
|
1168 |
1081 @deprecated("[3.7] execute is now unsafe by default in hooks/operation. You" |
1169 @deprecated("[3.7] execute is now unsafe by default in hooks/operation. You" |
1082 " can also control security with the security_enabled context " |
1170 " can also control security with the security_enabled context " |
1083 "manager") |
1171 "manager") |
1084 def unsafe_execute(self, rql, kwargs=None, eid_key=None, build_descr=True, |
1172 def unsafe_execute(self, rql, kwargs=None, eid_key=None, build_descr=True, |
1085 propagate=False): |
1173 propagate=False): |
1141 self.user._cw = self # XXX remove when "vreg = user._cw.vreg" hack in entity.py is gone |
1229 self.user._cw = self # XXX remove when "vreg = user._cw.vreg" hack in entity.py is gone |
1142 self.cnxtype = 'inmemory' |
1230 self.cnxtype = 'inmemory' |
1143 self.disable_hook_categories('integrity') |
1231 self.disable_hook_categories('integrity') |
1144 |
1232 |
1145 @property |
1233 @property |
1146 def pool(self): |
1234 def cnxset(self): |
1147 """connections pool, set according to transaction mode for each query""" |
1235 """connections set, set according to transaction mode for each query""" |
1148 if self.repo.shutting_down: |
1236 if self.repo.shutting_down: |
1149 self.reset_pool(True) |
1237 self.free_cnxset(True) |
1150 raise Exception('repository is shutting down') |
1238 raise Exception('repository is shutting down') |
1151 return getattr(self._threaddata, 'pool', None) |
1239 return getattr(self._threaddata, 'cnxset', None) |
1152 |
1240 |
1153 |
1241 |
1154 class InternalManager(object): |
1242 class InternalManager(object): |
1155 """a manager user with all access rights used internally for task such as |
1243 """a manager user with all access rights used internally for task such as |
1156 bootstrapping the repository or creating regular users according to |
1244 bootstrapping the repository or creating regular users according to |