server/session.py
branchstable
changeset 1880 293fe4b49e28
parent 1660 d1030dd9730b
child 1977 606923dff11b
equal deleted inserted replaced
1879:cb3466e08d81 1880:293fe4b49e28
    63         # and the rql server
    63         # and the rql server
    64         self.data = {}
    64         self.data = {}
    65         # i18n initialization
    65         # i18n initialization
    66         self.set_language(cnxprops.lang)
    66         self.set_language(cnxprops.lang)
    67         self._threaddata = threading.local()
    67         self._threaddata = threading.local()
       
    68         self._threads_in_transaction = set()
       
    69         self._closed = False
    68 
    70 
    69     def get_mode(self):
    71     def get_mode(self):
    70         return getattr(self._threaddata, 'mode', 'read')
    72         return getattr(self._threaddata, 'mode', 'read')
    71     def set_mode(self, value):
    73     def set_mode(self, value):
    72         self._threaddata.mode = value
    74         self._threaddata.mode = value
   148         self.local_perm_cache.clear()
   150         self.local_perm_cache.clear()
   149         self._threaddata.mode = 'read'
   151         self._threaddata.mode = 'read'
   150 
   152 
   151     def set_pool(self):
   153     def set_pool(self):
   152         """the session need a pool to execute some queries"""
   154         """the session need a pool to execute some queries"""
       
   155         if self._closed:
       
   156             raise Exception('try to set pool on a closed session')
   153         if self.pool is None:
   157         if self.pool is None:
   154             self._threaddata.pool = self.repo._get_pool()
   158             self._threaddata.pool = self.repo._get_pool()
   155             try:
   159             try:
   156                 self._threaddata.pool.pool_set(self)
   160                 self._threaddata.pool.pool_set(self)
   157             except:
   161             except:
   158                 self.repo._free_pool(self.pool)
   162                 self.repo._free_pool(self.pool)
   159                 self._threaddata.pool = None
   163                 self._threaddata.pool = None
   160                 raise
   164                 raise
       
   165             self._threads_in_transaction.add(threading.currentThread())
   161         return self._threaddata.pool
   166         return self._threaddata.pool
   162 
   167 
   163     def reset_pool(self):
   168     def reset_pool(self):
   164         """the session has no longer using its pool, at least for some time
   169         """the session has no longer using its pool, at least for some time
   165         """
   170         """
   166         # pool may be none if no operation has been done since last commit
   171         # pool may be none if no operation has been done since last commit
   167         # or rollback
   172         # or rollback
   168         if self.pool is not None and self.mode == 'read':
   173         if self.pool is not None and self.mode == 'read':
   169             # even in read mode, we must release the current transaction
   174             # even in read mode, we must release the current transaction
       
   175             self._threads_in_transaction.remove(threading.currentThread())
   170             self.repo._free_pool(self.pool)
   176             self.repo._free_pool(self.pool)
   171             self.pool.pool_reset(self)
   177             self.pool.pool_reset(self)
   172             self._threaddata.pool = None
   178             self._threaddata.pool = None
   173 
   179 
   174     def system_sql(self, sql, args=None):
   180     def system_sql(self, sql, args=None):
   341             if reset_pool:
   347             if reset_pool:
   342                 self.reset_pool()
   348                 self.reset_pool()
   343 
   349 
   344     def close(self):
   350     def close(self):
   345         """do not close pool on session close, since they are shared now"""
   351         """do not close pool on session close, since they are shared now"""
       
   352         self._closed = True
       
   353         # copy since _threads_in_transaction maybe modified while waiting
       
   354         for thread in self._threads_in_transaction.copy():
       
   355             if thread is threading.currentThread():
       
   356                 continue
       
   357             self.info('waiting for thread %s', thread)
       
   358             # do this loop/break instead of a simple join(10) in case thread is
       
   359             # the main thread (in which case it will be removed from
       
   360             # self._threads_in_transaction but still be alive...)
       
   361             for i in xrange(10):
       
   362                 thread.join(1)
       
   363                 if not (thread.isAlive() and
       
   364                         thread in self._threads_in_transaction):
       
   365                     break
       
   366             else:
       
   367                 self.error('thread %s still alive after 10 seconds, will close '
       
   368                            'session anyway', thread)
   346         self.rollback()
   369         self.rollback()
   347 
   370 
   348     # transaction data/operations management ##################################
   371     # transaction data/operations management ##################################
   349 
   372 
   350     def add_query_data(self, key, value):
   373     def add_query_data(self, key, value):