143 |
143 |
144 HOOKS_ALLOW_ALL = object() |
144 HOOKS_ALLOW_ALL = object() |
145 HOOKS_DENY_ALL = object() |
145 HOOKS_DENY_ALL = object() |
146 DEFAULT_SECURITY = object() # evaluated to true by design |
146 DEFAULT_SECURITY = object() # evaluated to true by design |
147 |
147 |
|
class SessionClosedError(RuntimeError):
    """Signal that a session (or its cnxset tracker) has been closed.

    Raised when some work is requested from an object that no longer
    accepts it because it has been closed.
    """
|
150 |
|
class CnxSetTracker(object):
    """Keep track of which transaction uses which cnxset.

    There should be one such object per session, plus another one for the
    internal session.

    Session objects are responsible for creating their CnxSetTracker object.

    Transactions should use :meth:`record` and :meth:`forget` to inform the
    tracker of the cnxsets they have acquired.

    .. automethod:: cubicweb.server.session.CnxSetTracker.record
    .. automethod:: cubicweb.server.session.CnxSetTracker.forget

    Sessions use the :meth:`close` and :meth:`wait` methods when closing.

    .. automethod:: cubicweb.server.session.CnxSetTracker.close
    .. automethod:: cubicweb.server.session.CnxSetTracker.wait

    This object itself is threadsafe. It also requires callers to acquire its
    lock in some situations; the tracker is a context manager for that purpose
    (``with tracker: ...``).
    """

    #: private marker meaning "no cnxset recorded for this txid"; needed
    #: because ``None`` cannot be told apart from a missing entry otherwise
    _MISSING = object()

    def __init__(self):
        # turned to False by close(); record() refuses new entries afterwards
        self._active = True
        # lock protecting the attributes of this object; wait() sleeps on it
        # and forget() wakes the sleepers up
        self._condition = threading.Condition()
        # mapping: txid -> cnxset currently held by that transaction
        self._record = {}

    def __enter__(self):
        """Acquire the tracker lock and return the tracker itself."""
        self._condition.__enter__()
        return self

    def __exit__(self, *args):
        """Release the tracker lock."""
        return self._condition.__exit__(*args)

    def record(self, txid, cnxset):
        """Inform the tracker that a txid has acquired a cnxset

        This method is to be used by Transaction objects.

        This method fails when:
        - The txid already has a recorded cnxset (:exc:`ValueError`).
        - The tracker is not active anymore (:exc:`SessionClosedError`).

        Notes about the caller:
        (1) It is responsible for retrieving a cnxset.
        (2) It must be prepared to release the cnxset if the
            `cnxsettracker.record` call fails.
        (3) It should hold the tracker lock until the very end of the
            operation.
        (4) However it must take care to lock the CnxSetTracker object after
            having retrieved the cnxset, to prevent deadlock.

        A typical usage looks like::

            cnxset = repo._get_cnxset() # (1)
            try:
                with cnxset_tracker: # (3) and (4)
                    cnxset_tracker.record(caller.id, cnxset)
                    # (3') operation ends when caller is in expected state only
                    caller.cnxset = cnxset
            except Exception:
                repo._free_cnxset(cnxset) # (2)
                raise
        """
        # dubious since the caller is supposed to have acquired it anyway.
        with self._condition:
            if not self._active:
                raise SessionClosedError('Closed')
            old = self._record.get(txid)
            if old is not None:
                raise ValueError('"%s" already have a cnx_set (%r)'
                                 % (txid, old))
            self._record[txid] = cnxset

    def forget(self, txid, cnxset):
        """Inform the tracker that a txid has released a cnxset

        This method is to be used by Transaction objects.

        This method fails (:exc:`ValueError`) when:
        - The cnxset for the txid does not match the recorded one; this
          includes the case where nothing is recorded for the txid.

        Notes about the caller:
        (1) It is responsible for releasing the cnxset.
        (2) It should hold the tracker lock during the operation to ensure
            the internal tracker state is always accurate regarding its own
            state.

        A typical usage looks like::

            cnxset = caller.cnxset
            try:
                with cnxset_tracker:
                    # (2) you can not have caller.cnxset out of sync with
                    #     cnxset_tracker state while unlocked
                    caller.cnxset = None
                    cnxset_tracker.forget(caller.id, cnxset)
            finally:
                cnxset = repo._free_cnxset(cnxset) # (1)
        """
        with self._condition:
            # a sentinel default keeps an unknown txid from "matching" a
            # cnxset of None and then failing with KeyError on removal
            old = self._record.get(txid, self._MISSING)
            if old is not cnxset:
                if old is self._MISSING:
                    old = None  # keep the historical wording of the message
                raise ValueError('recorded cnxset for "%s" mismatch: %r != %r'
                                 % (txid, old, cnxset))
            del self._record[txid]
            # wake up any thread blocked in wait()
            self._condition.notify_all()

    def close(self):
        """Mark the tracker as inactive.

        This method is to be used by Session objects.

        An inactive tracker does not accept new records anymore.
        """
        with self._condition:
            self._active = False

    def wait(self, timeout=10):
        """Wait for all recorded cnxsets to be released

        This method is to be used by Session objects.

        `timeout` is the maximum total time to wait, in seconds.

        Returns a tuple of the transaction ids that remain open (empty when
        everything was released in time). Raises :exc:`RuntimeError` if the
        tracker has not been closed first.
        """
        with self._condition:
            if self._active:
                raise RuntimeError('Cannot wait on active tracker.'
                                   ' Call tracker.close() first')
            # every forget() notifies the condition, so charge the time
            # actually slept against the remaining budget and re-check
            while self._record and timeout > 0:
                start = time()
                self._condition.wait(timeout)
                timeout -= time() - start
            return tuple(self._record)
|
283 |
148 class Transaction(object): |
284 class Transaction(object): |
149 """Repository Transaction |
285 """Repository Transaction |
150 |
286 |
151 Holds all transaction related data |
287 Holds all transaction related data |
152 |
288 |
197 :attr:`read_security` and :attr:`write_security`, boolean flags telling if |
333 :attr:`read_security` and :attr:`write_security`, boolean flags telling if |
198 read/write security is currently activated. |
334 read/write security is currently activated. |
199 |
335 |
200 """ |
336 """ |
201 |
337 |
202 def __init__(self, txid, mode, rewriter): |
338 def __init__(self, txid, cnxset_tracker, mode, rewriter): |
203 #: transaction unique id |
339 #: transaction unique id |
204 self.transactionid = txid |
340 self.transactionid = txid |
205 #: reentrance handling |
341 #: reentrance handling |
206 self.ctx_count = 0 |
342 self.ctx_count = 0 |
207 |
343 |
208 #: connection handling mode |
344 #: connection handling mode |
209 self.mode = mode |
345 self.mode = mode |
210 #: connection set used to execute queries on sources |
346 #: connection set used to execute queries on sources |
211 self.cnxset = None |
347 self._cnxset = None |
|
348 #: CnxSetTracker used to report cnxset usage |
|
349 self._cnxset_tracker = cnxset_tracker |
212 #: is this transaction from a client or internal to the repo |
350 #: is this transaction from a client or internal to the repo |
213 self.running_dbapi_query = True |
351 self.running_dbapi_query = True |
214 |
352 |
215 #: dict containing arbitrary data cleared at the end of the transaction |
353 #: dict containing arbitrary data cleared at the end of the transaction |
216 self.data = {} |
354 self.data = {} |
244 #: ordered list of operations to be processed on commit/rollback |
382 #: ordered list of operations to be processed on commit/rollback |
245 self.pending_operations = [] |
383 self.pending_operations = [] |
246 #: (None, 'precommit', 'postcommit', 'uncommitable') |
384 #: (None, 'precommit', 'postcommit', 'uncommitable') |
247 self.commit_state = None |
385 self.commit_state = None |
248 self.pruned_hooks_cache = {} |
386 self.pruned_hooks_cache = {} |
|
387 # Connection Set Management ############################################### |
|
@property
def cnxset(self):
    """Connection set currently attached to this transaction (or None)."""
    return self._cnxset

@cnxset.setter
def cnxset(self, new_cnxset):
    """Attach `new_cnxset` to the transaction, reporting to the tracker.

    The whole swap runs with the cnxset tracker lock held. The previous
    cnxset, if any, is forgotten by the tracker and the new one, if any,
    is recorded; `ctx_count` is decremented / incremented accordingly.
    Assigning the cnxset that is already attached is a no-op.
    """
    tracker = self._cnxset_tracker
    with tracker:
        previous = self._cnxset
        if previous is new_cnxset:
            return  # nothing to do
        if previous is not None:
            # detach locally first, then tell the tracker about it
            self._cnxset = None
            self.ctx_count -= 1
            tracker.forget(self.transactionid, previous)
        if new_cnxset is not None:
            # only attach once the tracker accepted the record
            tracker.record(self.transactionid, new_cnxset)
            self._cnxset = new_cnxset
            self.ctx_count += 1
249 |
406 |
250 # Entity cache management ################################################# |
407 # Entity cache management ################################################# |
251 # |
408 # |
252 # The transaction entity cache as held in tx.data it is removed at end the |
409 # The transaction entity cache as held in tx.data it is removed at end the |
253 # end of the transaction (commit and rollback) |
410 # end of the transaction (commit and rollback) |
437 |
594 |
438 :attr:`__threaddata` is a thread local storage whose `tx` attribute |
595 :attr:`__threaddata` is a thread local storage whose `tx` attribute |
439 refers to the proper instance of :class:`Transaction` according to the |
596 refers to the proper instance of :class:`Transaction` according to the |
440 transaction. |
597 transaction. |
441 |
598 |
442 :attr:`_threads_in_transaction` is a set of (thread, connections set) |
|
443 referencing threads that currently hold a connections set for the session. |
|
444 .. automethod:: cubicweb.server.session.transaction |
|
445 |
|
446 You should not have to use neither :attr:`_tx` nor :attr:`__threaddata`, |
599 You should not have to use neither :attr:`_tx` nor :attr:`__threaddata`, |
447 simply access transaction data transparently through the :attr:`_tx` |
600 simply access transaction data transparently through the :attr:`_tx` |
448 property. Also, you usually don't have to access it directly since current |
601 property. Also, you usually don't have to access it directly since current |
449 transaction's data may be accessed/modified through properties / methods: |
602 transaction's data may be accessed/modified through properties / methods: |
450 |
603 |
611 def hijack_user(self, user): |
765 def hijack_user(self, user): |
612 """return a fake request/session using specified user""" |
766 """return a fake request/session using specified user""" |
613 session = Session(user, self.repo) |
767 session = Session(user, self.repo) |
614 tx = session._tx |
768 tx = session._tx |
615 tx.cnxset = self.cnxset |
769 tx.cnxset = self.cnxset |
616 # we attributed a connections set, need to update ctx_count else it will be freed |
|
617 # while undesired |
|
618 tx.ctx_count = 1 |
|
619 # share pending_operations, else operation added in the hi-jacked |
770 # share pending_operations, else operation added in the hi-jacked |
620 # session such as SendMailOp won't ever be processed |
771 # session such as SendMailOp won't ever be processed |
621 tx.pending_operations = self.pending_operations |
772 tx.pending_operations = self.pending_operations |
622 # everything in tx.data should be copied back but the entity |
773 # everything in tx.data should be copied back but the entity |
623 # type cache we don't want to avoid security pb |
774 # type cache we don't want to avoid security pb |
899 raise Exception('try to access connections set on a closed session %s' % self.id) |
1050 raise Exception('try to access connections set on a closed session %s' % self.id) |
900 return self._tx.cnxset |
1051 return self._tx.cnxset |
901 |
1052 |
def set_cnxset(self):
    """Make sure the current transaction holds a connections set.

    The session needs a connections set to execute some queries. When the
    transaction has none, one is fetched from the repository and attached
    to it. Returns the transaction's connections set.

    Raises an Exception (after freeing any held cnxset) when the session
    is closed.
    """
    with self._lock:  # can probably be removed
        if self._closed:
            self.free_cnxset(True)
            raise Exception('try to set connections set on a closed session %s' % self.id)
        tx = self._tx
        if tx.cnxset is not None:
            return tx.cnxset
        acquired = self.repo._get_cnxset()
        # the bare excepts below are cleanup-then-reraise handlers: whatever
        # goes wrong, undo the attachment and hand the cnxset back
        try:
            self._tx.cnxset = acquired
            try:
                acquired.cnxset_set()
            except:
                self._tx.cnxset = None
                raise
        except:
            self.repo._free_cnxset(acquired)
            raise
        return tx.cnxset
|
921 |
|
922 def _free_thread_cnxset(self, thread, cnxset, force_close=False): |
|
923 try: |
|
924 self._threads_in_transaction.remove( (thread, cnxset) ) |
|
925 except KeyError: |
|
926 # race condition on cnxset freeing (freed by commit or rollback vs |
|
927 # close) |
|
928 pass |
|
929 else: |
|
930 if force_close: |
|
931 cnxset.reconnect() |
|
932 else: |
|
933 cnxset.cnxset_freed() |
|
934 # free cnxset once everything is done to avoid race-condition |
|
935 self.repo._free_cnxset(cnxset) |
|
936 |
1073 |
def free_cnxset(self, ignoremode=False):
    """Tell that the session no longer uses its connections set, at least
    for some time.

    The cnxset is only released when the session mode is 'read', unless
    `ignoremode` is true. Releasing detaches it from the current
    transaction, then calls its `cnxset_freed()` and gives it back to the
    repository, even if detaching raised.
    """
    tx = self._tx
    held = tx.cnxset
    # cnxset may be none if no operation has been done since last commit
    # or rollback
    if held is None:
        return
    if not (ignoremode or self.mode == 'read'):
        return
    try:
        tx.cnxset = None
    finally:
        held.cnxset_freed()
        self.repo._free_cnxset(held)
947 |
1086 |
948 def _touch(self): |
1087 def _touch(self): |
949 """update latest session usage timestamp and reset mode to read""" |
1088 """update latest session usage timestamp and reset mode to read""" |
950 self.timestamp = time() |
1089 self.timestamp = time() |
951 self.local_perm_cache.clear() # XXX simply move in tx.data, no? |
1090 self.local_perm_cache.clear() # XXX simply move in tx.data, no? |
1172 if free_cnxset: |
1311 if free_cnxset: |
1173 self.free_cnxset(ignoremode=True) |
1312 self.free_cnxset(ignoremode=True) |
1174 self._clear_thread_data(free_cnxset) |
1313 self._clear_thread_data(free_cnxset) |
1175 |
1314 |
def close(self):
    """Close the session and reclaim the cnxsets of its transactions.

    The session is flagged as closed, its cnxset tracker is closed so that
    no new cnxset can be recorded, and the current transaction is rolled
    back. The tracker is then given 10 seconds to see every recorded
    cnxset released; cnxsets still held after that are forcibly detached
    from their transaction, reset and given back to the repository.
    """
    # do not close connections set on session close, since they are shared now
    tracker = self._cnxset_tracker
    # NOTE(review): only the flag flip is assumed to run under the session
    # lock -- confirm against the file's real indentation
    with self._lock:
        self._closed = True
    tracker.close()
    self.rollback()
    self.info('waiting for open transaction of session: %s', self)
    timeout = 10
    pendings = tracker.wait(timeout)
    if pendings:
        self.error('%i transaction still alive after 10 seconds, will close '
                   'session anyway', len(pendings))
        for txid in pendings:
            tx = self._txs.get(txid)
            if tx is None:
                continue
            # drop tx.cnxset
            with tracker:
                # read outside the try block so the forced release below
                # always acts on this transaction's own cnxset
                cnxset = tx.cnxset
                if cnxset is None:
                    continue
                try:
                    tx.cnxset = None
                except (RuntimeError, ValueError):
                    # the tracker reports an inconsistent record with
                    # ValueError; log it and force the release anyway
                    msg = 'issue while force free of cnxset in %s'
                    self.error(msg, tx)
                # cnxset.reconnect() does a hard reset of the cnxset,
                # which forces it to be freed
                cnxset.reconnect()
                self.repo._free_cnxset(cnxset)
    del self.__threaddata
    del self._txs
1200 |
1347 |
1201 @property |
1348 @property |
1202 def closed(self): |
1349 def closed(self): |