server/session.py
changeset 8809 9ee4d0c65ad2
parent 8808 538852f69b48
child 8810 1a25bdd49f9a
equal deleted inserted replaced
8808:538852f69b48 8809:9ee4d0c65ad2
   143 
   143 
   144 HOOKS_ALLOW_ALL = object()
   144 HOOKS_ALLOW_ALL = object()
   145 HOOKS_DENY_ALL = object()
   145 HOOKS_DENY_ALL = object()
   146 DEFAULT_SECURITY = object() # evaluated to true by design
   146 DEFAULT_SECURITY = object() # evaluated to true by design
   147 
   147 
       
   148 class SessionClosedError(RuntimeError):
       
   149     pass
       
   150 
       
   151 class CnxSetTracker(object):
       
   152     """Keep track of which transaction uses which cnxset.
       
   153 
       
   154     There should be one such object per session, plus another one for the
       
   155     internal session.
       
   156 
       
   157     Session objects are responsible for creating their CnxSetTracker object.
       
   158 
       
   159     Transactions should use :meth:`record` and :meth:`forget` to inform the
       
   160     tracker of the cnxsets they have acquired.
       
   161 
       
   162     .. automethod:: cubicweb.server.session.CnxSetTracker.record
       
   163     .. automethod:: cubicweb.server.session.CnxSetTracker.forget
       
   164 
       
   165     Sessions use the :meth:`close` and :meth:`wait` methods when closing.
       
   166 
       
   167     .. automethod:: cubicweb.server.session.CnxSetTracker.close
       
   168     .. automethod:: cubicweb.server.session.CnxSetTracker.wait
       
   169 
       
   170     This object itself is threadsafe. It also requires callers to acquire its
       
   171     lock in some situations.
       
   172     """
       
   173 
       
   174     def __init__(self):
       
   175         self._active = True
       
   176         self._condition = threading.Condition()
       
   177         self._record = {}
       
   178 
       
   179     def __enter__(self):
       
   180         self._condition.__enter__()
       
   181 
       
   182     def __exit__(self, *args):
       
   183         self._condition.__exit__(*args)
       
   184 
       
   185     def record(self, txid, cnxset):
       
   186         """Inform the tracker that a txid has acquired a cnxset
       
   187 
       
   188         This method is to be used by Transaction objects.
       
   189 
       
   190         This method fails when:
       
   191         - The txid already has a recorded cnxset.
       
   192         - The tracker is not active anymore.
       
   193 
       
   194         Notes about the caller:
       
   195         (1) It is responsible for retrieving a cnxset.
       
   196         (2) It must be prepared to release the cnxset if the
       
   197             `cnxset_tracker.record` call fails.
       
   198         (3) It should acquire the tracker lock until the very end of the operation.
       
   199         (4) However, it must take care to lock the CnxSetTracker object after having
       
   200             retrieved the cnxset to prevent deadlock.
       
   201 
       
   202         A typical usage looks like::
       
   203 
       
   204         cnxset = repo._get_cnxset() # (1)
       
   205         try:
       
   206             with cnxset_tracker: # (3) and (4)
       
   207                 cnxset_tracker.record(caller.id, cnxset)
       
   208                 # (3') operation ends when caller is in expected state only
       
   209                 caller.cnxset = cnxset
       
   210         except Exception:
       
   211             repo._free_cnxset(cnxset) # (2)
       
   212             raise
       
   213         """
       
   214         # dubious since the caller is supposed to have acquired it anyway.
       
   215         with self._condition:
       
   216             if not self._active:
       
   217                 raise SessionClosedError('Closed')
       
   218             old = self._record.get(txid)
       
   219             if old is not None:
       
   220                 raise ValueError('"%s" already have a cnx_set (%r)'
       
   221                                  % (txid, old))
       
   222             self._record[txid] = cnxset
       
   223 
       
   224     def forget(self, txid, cnxset):
       
   225         """Inform the tracker that a txid has released a cnxset
       
   226 
       
   227         This method is to be used by Transaction objects.
       
   228 
       
   229         This method fails when:
       
   230         - The cnxset for the txid does not match the recorded one.
       
   231 
       
   232         Notes about the caller:
       
   233         (1) It is responsible for releasing the cnxset.
       
   234         (2) It should acquire the tracker lock during the operation to ensure
       
   235             the internal tracker state is always accurate regarding its own state.
       
   236 
       
   237         A typical usage looks like::
       
   238 
       
   239         cnxset = caller.cnxset
       
   240         try:
       
   241             with cnxset_tracker:
       
   242                 # (2) you can not have caller.cnxset out of sync with
       
   243                 #     cnxset_tracker state while unlocked
       
   244                 caller.cnxset = None
       
   245                 cnxset_tracker.forget(caller.id, cnxset)
       
   246         finally:
       
   247             cnxset = repo._free_cnxset(cnxset) # (1)
       
   248         """
       
   249         with self._condition:
       
   250             old = self._record.get(txid, None)
       
   251             if old is not cnxset:
       
   252                 raise ValueError('recorded cnxset for "%s" mismatch: %r != %r'
       
   253                                  % (txid, old, cnxset))
       
   254             self._record.pop(txid)
       
   255             self._condition.notify_all()
       
   256 
       
   257     def close(self):
       
   258         """Marks the tracker as inactive.
       
   259 
       
   260         This method is to be used by Session objects.
       
   261 
       
   262         An inactive tracker does not accept new records anymore.
       
   263         """
       
   264         with self._condition:
       
   265             self._active = False
       
   266 
       
   267     def wait(self, timeout=10):
       
   268         """Wait for all recorded cnxsets to be released
       
   269 
       
   270         This method is to be used by Session objects.
       
   271 
       
   272         Returns a tuple of the transaction ids that remain open.
       
   273         """
       
   274         with self._condition:
       
   275             if  self._active:
       
   276                 raise RuntimeError('Cannot wait on active tracker.'
       
   277                                    ' Call tracker.close() first')
       
   278             while self._record and timeout > 0:
       
   279                 start = time()
       
   280                 self._condition.wait(timeout)
       
   281                 timeout -= time() - start
       
   282             return tuple(self._record)
       
   283 
   148 class Transaction(object):
   284 class Transaction(object):
   149     """Repository Transaction
   285     """Repository Transaction
   150 
   286 
   151     Holds all transaction related data
   287     Holds all transaction related data
   152 
   288 
   197       :attr:`read_security` and :attr:`write_security`, boolean flags telling if
   333       :attr:`read_security` and :attr:`write_security`, boolean flags telling if
   198       read/write security is currently activated.
   334       read/write security is currently activated.
   199 
   335 
   200     """
   336     """
   201 
   337 
   202     def __init__(self, txid, mode, rewriter):
   338     def __init__(self, txid, cnxset_tracker, mode, rewriter):
   203         #: transaction unique id
   339         #: transaction unique id
   204         self.transactionid = txid
   340         self.transactionid = txid
   205         #: reentrance handling
   341         #: reentrance handling
   206         self.ctx_count = 0
   342         self.ctx_count = 0
   207 
   343 
   208         #: connection handling mode
   344         #: connection handling mode
   209         self.mode = mode
   345         self.mode = mode
   210         #: connection set used to execute queries on sources
   346         #: connection set used to execute queries on sources
   211         self.cnxset = None
   347         self._cnxset = None
       
   348         #: CnxSetTracker used to report cnxset usage
       
   349         self._cnxset_tracker = cnxset_tracker
   212         #: is this transaction from a client or internal to the repo
   350         #: is this transaction from a client or internal to the repo
   213         self.running_dbapi_query = True
   351         self.running_dbapi_query = True
   214 
   352 
   215         #: dict containing arbitrary data cleared at the end of the transaction
   353         #: dict containing arbitrary data cleared at the end of the transaction
   216         self.data = {}
   354         self.data = {}
   244         #: ordered list of operations to be processed on commit/rollback
   382         #: ordered list of operations to be processed on commit/rollback
   245         self.pending_operations = []
   383         self.pending_operations = []
   246         #: (None, 'precommit', 'postcommit', 'uncommitable')
   384         #: (None, 'precommit', 'postcommit', 'uncommitable')
   247         self.commit_state = None
   385         self.commit_state = None
   248         self.pruned_hooks_cache = {}
   386         self.pruned_hooks_cache = {}
       
   387     # Connection Set Management ###############################################
       
   388     @property
       
   389     def cnxset(self):
       
   390         return self._cnxset
       
   391 
       
   392     @cnxset.setter
       
   393     def cnxset(self, new_cnxset):
       
   394         with self._cnxset_tracker:
       
   395             old_cnxset = self._cnxset
       
   396             if new_cnxset is old_cnxset:
       
   397                 return #nothing to do
       
   398             if old_cnxset is not None:
       
   399                 self._cnxset = None
       
   400                 self.ctx_count -= 1
       
   401                 self._cnxset_tracker.forget(self.transactionid, old_cnxset)
       
   402             if new_cnxset is not None:
       
   403                 self._cnxset_tracker.record(self.transactionid, new_cnxset)
       
   404                 self._cnxset = new_cnxset
       
   405                 self.ctx_count += 1
   249 
   406 
   250     # Entity cache management #################################################
   407     # Entity cache management #################################################
   251     #
   408     #
   252     # The transaction entity cache as held in tx.data it is removed at end the
   409     # The transaction entity cache as held in tx.data it is removed at end the
   253     # end of the transaction (commit and rollback)
   410     # end of the transaction (commit and rollback)
   437 
   594 
   438       :attr:`__threaddata` is a thread local storage whose `tx` attribute
   595       :attr:`__threaddata` is a thread local storage whose `tx` attribute
   439       refers to the proper instance of :class:`Transaction` according to the
   596       refers to the proper instance of :class:`Transaction` according to the
   440       transaction.
   597       transaction.
   441 
   598 
   442       :attr:`_threads_in_transaction` is a set of (thread, connections set)
       
   443       referencing threads that currently hold a connections set for the session.
       
   444     .. automethod:: cubicweb.server.session.transaction
       
   445 
       
   446     You should not have to use neither :attr:`_tx` nor :attr:`__threaddata`,
   599     You should not have to use neither :attr:`_tx` nor :attr:`__threaddata`,
   447     simply access transaction data transparently through the :attr:`_tx`
   600     simply access transaction data transparently through the :attr:`_tx`
   448     property. Also, you usually don't have to access it directly since current
   601     property. Also, you usually don't have to access it directly since current
   449     transaction's data may be accessed/modified through properties / methods:
   602     transaction's data may be accessed/modified through properties / methods:
   450 
   603 
   554         ### internals
   707         ### internals
   555         # Transaction of this section
   708         # Transaction of this section
   556         self._txs = {}
   709         self._txs = {}
   557         # Data local to the thread
   710         # Data local to the thread
   558         self.__threaddata = threading.local()
   711         self.__threaddata = threading.local()
   559         self._threads_in_transaction = set()
   712         self._cnxset_tracker = CnxSetTracker()
   560         self._closed = False
   713         self._closed = False
   561         self._lock = threading.RLock()
   714         self._lock = threading.RLock()
   562 
   715 
   563     def __unicode__(self):
   716     def __unicode__(self):
   564         return '<session %s (%s 0x%x)>' % (
   717         return '<session %s (%s 0x%x)>' % (
   571         with self._lock: # no transaction exist with the same id
   724         with self._lock: # no transaction exist with the same id
   572             try:
   725             try:
   573                 tx = self._txs[txid]
   726                 tx = self._txs[txid]
   574             except KeyError:
   727             except KeyError:
   575                 rewriter = RQLRewriter(self)
   728                 rewriter = RQLRewriter(self)
   576                 tx = Transaction(txid, self.default_mode, rewriter)
   729                 tx = Transaction(txid, self._cnxset_tracker, self.default_mode,
       
   730                                  rewriter)
   577                 self._txs[txid] = tx
   731                 self._txs[txid] = tx
   578         return tx
   732         return tx
   579 
   733 
   580     def set_tx(self, txid=None):
   734     def set_tx(self, txid=None):
   581         """set the default transaction of the current thread to <txid>
   735         """set the default transaction of the current thread to <txid>
   611     def hijack_user(self, user):
   765     def hijack_user(self, user):
   612         """return a fake request/session using specified user"""
   766         """return a fake request/session using specified user"""
   613         session = Session(user, self.repo)
   767         session = Session(user, self.repo)
   614         tx = session._tx
   768         tx = session._tx
   615         tx.cnxset = self.cnxset
   769         tx.cnxset = self.cnxset
   616         # we attributed a connections set, need to update ctx_count else it will be freed
       
   617         # while undesired
       
   618         tx.ctx_count = 1
       
   619         # share pending_operations, else operation added in the hi-jacked
   770         # share pending_operations, else operation added in the hi-jacked
   620         # session such as SendMailOp won't ever be processed
   771         # session such as SendMailOp won't ever be processed
   621         tx.pending_operations = self.pending_operations
   772         tx.pending_operations = self.pending_operations
   622         # everything in tx.data should be copied back but the entity
   773         # everything in tx.data should be copied back but the entity
   623         # type cache we don't want to avoid security pb
   774         # type cache we don't want to avoid security pb
   899             raise Exception('try to access connections set on a closed session %s' % self.id)
  1050             raise Exception('try to access connections set on a closed session %s' % self.id)
   900         return self._tx.cnxset
  1051         return self._tx.cnxset
   901 
  1052 
   902     def set_cnxset(self):
  1053     def set_cnxset(self):
   903         """the session need a connections set to execute some queries"""
  1054         """the session need a connections set to execute some queries"""
   904         with self._lock:
  1055         with self._lock: # can probably be removed
   905             if self._closed:
  1056             if self._closed:
   906                 self.free_cnxset(True)
  1057                 self.free_cnxset(True)
   907                 raise Exception('try to set connections set on a closed session %s' % self.id)
  1058                 raise Exception('try to set connections set on a closed session %s' % self.id)
   908             if self.cnxset is None:
  1059             tx = self._tx
   909                 # get connections set first to avoid race-condition
  1060             if tx.cnxset is None:
   910                 self._tx.cnxset = cnxset = self.repo._get_cnxset()
  1061                 cnxset = self.repo._get_cnxset()
   911                 self._tx.ctx_count += 1
       
   912                 try:
  1062                 try:
   913                     cnxset.cnxset_set()
  1063                     self._tx.cnxset = cnxset
   914                 except Exception:
  1064                     try:
   915                     self._tx.cnxset = None
  1065                         cnxset.cnxset_set()
       
  1066                     except:
       
  1067                         self._tx.cnxset = None
       
  1068                         raise
       
  1069                 except:
   916                     self.repo._free_cnxset(cnxset)
  1070                     self.repo._free_cnxset(cnxset)
   917                     raise
  1071                     raise
   918                 self._threads_in_transaction.add(
  1072         return tx.cnxset
   919                     (threading.currentThread(), cnxset) )
       
   920             return self._tx.cnxset
       
   921 
       
   922     def _free_thread_cnxset(self, thread, cnxset, force_close=False):
       
   923         try:
       
   924             self._threads_in_transaction.remove( (thread, cnxset) )
       
   925         except KeyError:
       
   926             # race condition on cnxset freeing (freed by commit or rollback vs
       
   927             # close)
       
   928             pass
       
   929         else:
       
   930             if force_close:
       
   931                 cnxset.reconnect()
       
   932             else:
       
   933                 cnxset.cnxset_freed()
       
   934             # free cnxset once everything is done to avoid race-condition
       
   935             self.repo._free_cnxset(cnxset)
       
   936 
  1073 
   937     def free_cnxset(self, ignoremode=False):
  1074     def free_cnxset(self, ignoremode=False):
   938         """the session is no longer using its connections set, at least for some time"""
  1075         """the session is no longer using its connections set, at least for some time"""
   939         # cnxset may be none if no operation has been done since last commit
  1076         # cnxset may be none if no operation has been done since last commit
   940         # or rollback
  1077         # or rollback
   941         cnxset = self._tx.cnxset
  1078         tx = self._tx
       
  1079         cnxset = tx.cnxset
   942         if cnxset is not None and (ignoremode or self.mode == 'read'):
  1080         if cnxset is not None and (ignoremode or self.mode == 'read'):
   943             # even in read mode, we must release the current transaction
  1081             try:
   944             self._free_thread_cnxset(threading.currentThread(), cnxset)
  1082                 tx.cnxset = None
   945             self._tx.cnxset = None
  1083             finally:
   946             self._tx.ctx_count -= 1
  1084                 cnxset.cnxset_freed()
       
  1085                 self.repo._free_cnxset(cnxset)
   947 
  1086 
   948     def _touch(self):
  1087     def _touch(self):
   949         """update latest session usage timestamp and reset mode to read"""
  1088         """update latest session usage timestamp and reset mode to read"""
   950         self.timestamp = time()
  1089         self.timestamp = time()
   951         self.local_perm_cache.clear() # XXX simply move in tx.data, no?
  1090         self.local_perm_cache.clear() # XXX simply move in tx.data, no?
  1172             if free_cnxset:
  1311             if free_cnxset:
  1173                 self.free_cnxset(ignoremode=True)
  1312                 self.free_cnxset(ignoremode=True)
  1174             self._clear_thread_data(free_cnxset)
  1313             self._clear_thread_data(free_cnxset)
  1175 
  1314 
  1176     def close(self):
  1315     def close(self):
  1177         """do not close connections set on session close, since they are shared now"""
  1316         # do not close connections set on session close, since they are shared now
       
  1317         tracker = self._cnxset_tracker
  1178         with self._lock:
  1318         with self._lock:
  1179             self._closed = True
  1319             self._closed = True
  1180         # copy since _threads_in_transaction maybe modified while waiting
  1320         tracker.close()
  1181         for thread, cnxset in self._threads_in_transaction.copy():
       
  1182             if thread is threading.currentThread():
       
  1183                 continue
       
  1184             self.info('waiting for thread %s', thread)
       
  1185             # do this loop/break instead of a simple join(10) in case thread is
       
  1186             # the main thread (in which case it will be removed from
       
  1187             # self._threads_in_transaction but still be alive...)
       
  1188             for i in xrange(10):
       
  1189                 thread.join(1)
       
  1190                 if not (thread.isAlive() and
       
  1191                         (thread, cnxset) in self._threads_in_transaction):
       
  1192                     break
       
  1193             else:
       
  1194                 self.error('thread %s still alive after 10 seconds, will close '
       
  1195                            'session anyway', thread)
       
  1196                 self._free_thread_cnxset(thread, cnxset, force_close=True)
       
  1197         self.rollback()
  1321         self.rollback()
       
  1322         self.info('waiting for open transaction of session: %s', self)
       
  1323         timeout = 10
       
  1324         pendings = tracker.wait(timeout)
       
  1325         if pendings:
       
  1326             self.error('%i transaction still alive after 10 seconds, will close '
       
  1327                        'session anyway', len(pendings))
       
  1328             for txid in pendings:
       
  1329                 tx = self._txs.get(txid)
       
  1330                 if tx is not None:
       
  1331                     # drop tx.cnxset
       
  1332                     with tracker:
       
  1333                         try:
       
  1334                             cnxset = tx.cnxset
       
  1335                             if cnxset is None:
       
  1336                                 continue
       
  1337                             tx.cnxset = None
       
  1338                         except RuntimeError:
       
  1339                             msg = 'issue while force free of cnxset in %s'
       
  1340                             self.error(msg, tx)
       
  1341                     # cnxset.reconnect() does a hard reset of the cnxset
       
  1342                     # it forces it to be freed
       
  1343                     cnxset.reconnect()
       
  1344                     self.repo._free_cnxset(cnxset)
  1198         del self.__threaddata
  1345         del self.__threaddata
  1199         del self._txs
  1346         del self._txs
  1200 
  1347 
  1201     @property
  1348     @property
  1202     def closed(self):
  1349     def closed(self):