312 try: |
312 try: |
313 self._threads_in_transaction.remove(threading.currentThread()) |
313 self._threads_in_transaction.remove(threading.currentThread()) |
314 except KeyError: |
314 except KeyError: |
315 pass |
315 pass |
316 pool.pool_reset() |
316 pool.pool_reset() |
317 self._threaddata.pool = None |
317 del self._threaddata.pool |
318 # free pool once everything is done to avoid race-condition |
318 # free pool once everything is done to avoid race-condition |
319 self.repo._free_pool(pool) |
319 self.repo._free_pool(pool) |
320 |
320 |
321 def _touch(self): |
321 def _touch(self): |
322 """update latest session usage timestamp and reset mode to read""" |
322 """update latest session usage timestamp and reset mode to read""" |
408 return rset |
408 return rset |
409 |
409 |
@property
def super_session(self):
    """return the child session tied to this session, building and caching
    it in the `childsession` attribute when it is not set yet
    """
    # NOTE(review): indentation was lost in the rendering this block was
    # recovered from; the nesting below (cache inside the except branch,
    # set_pool on every access) is assumed -- confirm against the repository
    try:
        csession = self.childsession
    except AttributeError:
        if isinstance(self, (ChildSession, InternalSession)):
            # such sessions act as their own super session
            csession = self
        else:
            csession = ChildSession(self)
        self.childsession = csession
    # need shared pool set
    self.set_pool(checkclosed=False)
    return csession
423 |
423 |
424 def unsafe_execute(self, rql, kwargs=None, eid_key=None, build_descr=True, |
424 def unsafe_execute(self, rql, kwargs=None, eid_key=None, build_descr=True, |
441 false |
441 false |
442 """ |
442 """ |
443 rset = self._execute(self, rql, kwargs, eid_key, build_descr) |
443 rset = self._execute(self, rql, kwargs, eid_key, build_descr) |
444 return self.decorate_rset(rset, propagate) |
444 return self.decorate_rset(rset, propagate) |
445 |
445 |
|
def _clear_thread_data(self):
    """remove per-transaction attributes from the thread local storage

    `pool` is left in place since it is explicitly removed by reset_pool,
    and `mode` since it is set anyway by _touch
    """
    store = self._threaddata
    for name in ('commit_state', 'transaction_data', 'pending_operations',
                 '_rewriter'):
        try:
            delattr(store, name)
        except AttributeError:
            # attribute is not set on the store, nothing to drop
            pass
|
458 |
446 def commit(self, reset_pool=True): |
459 def commit(self, reset_pool=True): |
447 """commit the current session's transaction""" |
460 """commit the current session's transaction""" |
448 if self.pool is None: |
461 if self.pool is None: |
449 assert not self.pending_operations |
462 assert not self.pending_operations |
450 self.transaction_data.clear() |
463 self._clear_thread_data() |
451 self._touch() |
464 self._touch() |
452 self.debug('commit session %s done (no db activity)', self.id) |
465 self.debug('commit session %s done (no db activity)', self.id) |
453 return |
466 return |
454 if self.commit_state: |
467 if self.commit_state: |
455 return |
468 return |
499 except: |
512 except: |
500 self.critical('error while %sing', trstate, |
513 self.critical('error while %sing', trstate, |
501 exc_info=sys.exc_info()) |
514 exc_info=sys.exc_info()) |
502 self.info('%s session %s done', trstate, self.id) |
515 self.info('%s session %s done', trstate, self.id) |
503 finally: |
516 finally: |
|
517 self._clear_thread_data() |
504 self._touch() |
518 self._touch() |
505 self.commit_state = None |
|
506 self.pending_operations[:] = [] |
|
507 self.transaction_data.clear() |
|
508 if reset_pool: |
519 if reset_pool: |
509 self.reset_pool(ignoremode=True) |
520 self.reset_pool(ignoremode=True) |
510 |
521 |
511 def rollback(self, reset_pool=True): |
522 def rollback(self, reset_pool=True): |
512 """rollback the current session's transaction""" |
523 """rollback the current session's transaction""" |
513 if self.pool is None: |
524 if self.pool is None: |
514 assert not self.pending_operations |
525 assert not self.pending_operations |
515 self.transaction_data.clear() |
526 self._clear_thread_data() |
516 self._touch() |
527 self._touch() |
517 self.debug('rollback session %s done (no db activity)', self.id) |
528 self.debug('rollback session %s done (no db activity)', self.id) |
518 return |
529 return |
519 try: |
530 try: |
520 while self.pending_operations: |
531 while self.pending_operations: |
525 self.critical('rollback error', exc_info=sys.exc_info()) |
536 self.critical('rollback error', exc_info=sys.exc_info()) |
526 continue |
537 continue |
527 self.pool.rollback() |
538 self.pool.rollback() |
528 self.debug('rollback for session %s done', self.id) |
539 self.debug('rollback for session %s done', self.id) |
529 finally: |
540 finally: |
|
541 self._clear_thread_data() |
530 self._touch() |
542 self._touch() |
531 self.pending_operations[:] = [] |
|
532 self.transaction_data.clear() |
|
533 if reset_pool: |
543 if reset_pool: |
534 self.reset_pool(ignoremode=True) |
544 self.reset_pool(ignoremode=True) |
535 |
545 |
536 def close(self): |
546 def close(self): |
537 """do not close pool on session close, since they are shared now""" |
547 """do not close pool on session close, since they are shared now""" |
582 |
592 |
583 # querier helpers ######################################################### |
593 # querier helpers ######################################################### |
584 |
594 |
@property
def rql_rewriter(self):
    """return the RQLRewriter stored on `_threaddata`, instantiating it on
    first access
    """
    # in thread local storage since the rewriter isn't thread safe
    try:
        return self._threaddata._rewriter
    except AttributeError:
        # no rewriter stored yet: build one bound to this session
        self._threaddata._rewriter = RQLRewriter(self)
        return self._threaddata._rewriter