341 assert self.cleanup_session_time > 0 |
341 assert self.cleanup_session_time > 0 |
342 cleanup_session_interval = min(60*60, self.cleanup_session_time / 3) |
342 cleanup_session_interval = min(60*60, self.cleanup_session_time / 3) |
343 self.looping_task(cleanup_session_interval, self.clean_sessions) |
343 self.looping_task(cleanup_session_interval, self.clean_sessions) |
344 assert isinstance(self._looping_tasks, list), 'already started' |
344 assert isinstance(self._looping_tasks, list), 'already started' |
345 for i, (interval, func, args) in enumerate(self._looping_tasks): |
345 for i, (interval, func, args) in enumerate(self._looping_tasks): |
346 self._looping_tasks[i] = task = utils.LoopTask(interval, func, args) |
346 self._looping_tasks[i] = task = utils.LoopTask(self, interval, func, args) |
347 self.info('starting task %s with interval %.2fs', task.name, |
347 self.info('starting task %s with interval %.2fs', task.name, |
348 interval) |
348 interval) |
349 task.start() |
349 task.start() |
350 # ensure no tasks will be further added |
350 # ensure no tasks will be further added |
351 self._looping_tasks = tuple(self._looping_tasks) |
351 self._looping_tasks = tuple(self._looping_tasks) |
410 self.close_sessions() |
410 self.close_sessions() |
411 while not self._cnxsets_pool.empty(): |
411 while not self._cnxsets_pool.empty(): |
412 cnxset = self._cnxsets_pool.get_nowait() |
412 cnxset = self._cnxsets_pool.get_nowait() |
413 try: |
413 try: |
414 cnxset.close(True) |
414 cnxset.close(True) |
415 except: |
415 except Exception: |
416 self.exception('error while closing %s' % cnxset) |
416 self.exception('error while closing %s' % cnxset) |
417 continue |
417 continue |
418 if self.pyro_registered: |
418 if self.pyro_registered: |
419 if self._use_pyrons(): |
419 if self._use_pyrons(): |
420 pyro_unregister(self.config) |
420 pyro_unregister(self.config) |
789 session = self._get_session(sessionid) |
789 session = self._get_session(sessionid) |
790 session.set_tx_data(txid) |
790 session.set_tx_data(txid) |
791 return session.commit() |
791 return session.commit() |
792 except (ValidationError, Unauthorized): |
792 except (ValidationError, Unauthorized): |
793 raise |
793 raise |
794 except: |
794 except Exception: |
795 self.exception('unexpected error') |
795 self.exception('unexpected error') |
796 raise |
796 raise |
797 |
797 |
def rollback(self, sessionid, txid=None):
    """Roll back the transaction for the session with the given id.

    The session is looked up with ``self._get_session(sessionid)``, bound
    to ``txid`` through ``session.set_tx_data(txid)`` and then rolled back.

    :param sessionid: identifier passed to ``self._get_session``
    :param txid: transaction identifier forwarded to
      ``session.set_tx_data`` (``None`` by default)
    :raises Exception: any error raised while fetching the session or
      rolling back is logged with ``self.exception`` and re-raised
    """
    self.debug('begin rollback for session %s', sessionid)
    try:
        session = self._get_session(sessionid)
        session.set_tx_data(txid)
        session.rollback()
    except Exception:
        # Only genuine errors are logged; KeyboardInterrupt / SystemExit
        # derive from BaseException and propagate without being reported
        # as an "unexpected error".
        self.exception('unexpected error')
        raise
808 |
808 |
809 def close(self, sessionid, txid=None, checkshuttingdown=True): |
809 def close(self, sessionid, txid=None, checkshuttingdown=True): |
810 """close the session with the given id""" |
810 """close the session with the given id""" |
def close_sessions(self):
    """Close every opened session.

    Each session id found in ``self._sessions`` is closed with
    ``self.close(sessionid, checkshuttingdown=False)``.  A failure on one
    session is logged with ``self.exception`` and does not prevent the
    remaining sessions from being closed.
    """
    # Iterate over a snapshot of the ids: close() presumably removes the
    # session from self._sessions (TODO confirm), and mutating a dict while
    # iterating a live key view raises RuntimeError on Python 3.
    for sessionid in list(self._sessions):
        try:
            self.close(sessionid, checkshuttingdown=False)
        except Exception:  # XXX BaseException?
            self.exception('error while closing session %s' % sessionid)
910 |
910 |
911 def clean_sessions(self): |
911 def clean_sessions(self): |
912 """close sessions not used since an amount of time specified in the |
912 """close sessions not used since an amount of time specified in the |
913 configuration |
913 configuration |