cubicweb/server/repository.py
changeset 11931 ad7796dabeaa
parent 11908 7904fe436e82
child 11934 ec69125c03a8
equal deleted inserted replaced
11930:83a921bae21c 11931:ad7796dabeaa
   147         pass
   147         pass
   148 
   148 
   149     def stop(self):
   149     def stop(self):
   150         pass
   150         pass
   151 
   151 
       
   152 class _CnxSetPool(object):
       
   153 
       
   154     def __init__(self, source, size):
       
   155         self._cnxsets = []
       
   156         self._queue = queue.Queue()
       
   157         for i in range(size):
       
   158             cnxset = source.wrapped_connection()
       
   159             self._cnxsets.append(cnxset)
       
   160             self._queue.put_nowait(cnxset)
       
   161         super(_CnxSetPool, self).__init__()
       
   162 
       
   163     def qsize(self):
       
   164         return self._queue.qsize()
       
   165 
       
   166     def get(self):
       
   167         try:
       
   168             return self._queue.get(True, timeout=5)
       
   169         except queue.Empty:
       
   170             raise Exception('no connections set available after 5 secs, probably either a '
       
   171                             'bug in code (too many uncommited/rolled back '
       
   172                             'connections) or too much load on the server (in '
       
   173                             'which case you can try to set a bigger '
       
   174                             'connections pool size)')
       
   175 
       
   176     def release(self, cnxset):
       
   177         self._queue.put_nowait(cnxset)
       
   178 
       
   179     def __iter__(self):
       
   180         for cnxset in self._cnxsets:
       
   181             yield cnxset
       
   182 
       
   183     def close(self):
       
   184         q = self._queue
       
   185         while not q.empty():
       
   186             cnxset = q.get_nowait()
       
   187             try:
       
   188                 cnxset.close(True)
       
   189             except Exception:
       
   190                 self.exception('error while closing %s' % cnxset)
       
   191 
   152 
   192 
   153 class Repository(object):
   193 class Repository(object):
   154     """a repository provides access to a set of persistent storages for
   194     """a repository provides access to a set of persistent storages for
   155     entities and relations
   195     entities and relations
   156     """
   196     """
   206         config = self.config
   246         config = self.config
   207         # copy pool size here since config.init_cube() and config.load_schema()
   247         # copy pool size here since config.init_cube() and config.load_schema()
   208         # reload configuration from file and could reset a manually set pool
   248         # reload configuration from file and could reset a manually set pool
   209         # size.
   249         # size.
   210         pool_size = config['connections-pool-size']
   250         pool_size = config['connections-pool-size']
   211         self._cnxsets_pool = queue.Queue()
   251         # 0. init a cnxset of size 1 that will be used to fetch bootstrap information from
   212         # 0. init a cnxset that will be used to fetch bootstrap information from
       
   213         #    the database
   252         #    the database
   214         self._cnxsets_pool.put_nowait(self.system_source.wrapped_connection())
   253         self.cnxsets = _CnxSetPool(self.system_source, 1)
   215         # 1. set used cubes
   254         # 1. set used cubes
   216         if config.creating or not config.read_instance_schema:
   255         if config.creating or not config.read_instance_schema:
   217             config.bootstrap_cubes()
   256             config.bootstrap_cubes()
   218         else:
   257         else:
   219             self.set_schema(self.config.load_bootstrap_schema(), resetvreg=False)
   258             self.set_schema(self.config.load_bootstrap_schema(), resetvreg=False)
   257             self.init_sources_from_database()
   296             self.init_sources_from_database()
   258             if 'CWProperty' in self.schema:
   297             if 'CWProperty' in self.schema:
   259                 self.vreg.init_properties(self.properties())
   298                 self.vreg.init_properties(self.properties())
   260         # 4. close initialization connection set and reopen fresh ones for
   299         # 4. close initialization connection set and reopen fresh ones for
   261         #    proper initialization
   300         #    proper initialization
   262         self._get_cnxset().close(True)
   301         self.cnxsets.close()
   263         # list of available cnxsets (can't iterate on a Queue)
   302         self.cnxsets = _CnxSetPool(self.system_source, pool_size)
   264         self.cnxsets = []
       
   265         for i in range(pool_size):
       
   266             self.cnxsets.append(self.system_source.wrapped_connection())
       
   267             self._cnxsets_pool.put_nowait(self.cnxsets[-1])
       
   268 
   303 
   269     # internals ###############################################################
   304     # internals ###############################################################
   270 
   305 
   271     def init_sources_from_database(self):
   306     def init_sources_from_database(self):
   272         if self.config.quick_start or 'CWSource' not in self.schema:  # 3.10 migration
   307         if self.config.quick_start or 'CWSource' not in self.schema:  # 3.10 migration
   394 
   429 
   395     def threaded_task(self, func):
   430     def threaded_task(self, func):
   396         """start function in a separated thread"""
   431         """start function in a separated thread"""
   397         utils.RepoThread(func, self._running_threads).start()
   432         utils.RepoThread(func, self._running_threads).start()
   398 
   433 
   399     def _get_cnxset(self):
       
   400         try:
       
   401             return self._cnxsets_pool.get(True, timeout=5)
       
   402         except queue.Empty:
       
   403             raise Exception('no connections set available after 5 secs, probably either a '
       
   404                             'bug in code (too many uncommited/rolled back '
       
   405                             'connections) or too much load on the server (in '
       
   406                             'which case you can try to set a bigger '
       
   407                             'connections pool size)')
       
   408 
       
   409     def _free_cnxset(self, cnxset):
       
   410         self._cnxsets_pool.put_nowait(cnxset)
       
   411 
       
   412     def shutdown(self):
   434     def shutdown(self):
   413         """called on server stop event to properly close opened sessions and
   435         """called on server stop event to properly close opened sessions and
   414         connections
   436         connections
   415         """
   437         """
   416         assert not self.shutting_down, 'already shutting down'
   438         assert not self.shutting_down, 'already shutting down'
   428         for thread in self._running_threads:
   450         for thread in self._running_threads:
   429             self.info('waiting thread %s...', thread.getName())
   451             self.info('waiting thread %s...', thread.getName())
   430             thread.join()
   452             thread.join()
   431             self.info('thread %s finished', thread.getName())
   453             self.info('thread %s finished', thread.getName())
   432         self.close_sessions()
   454         self.close_sessions()
   433         while not self._cnxsets_pool.empty():
   455         self.cnxsets.close()
   434             cnxset = self._cnxsets_pool.get_nowait()
       
   435             try:
       
   436                 cnxset.close(True)
       
   437             except Exception:
       
   438                 self.exception('error while closing %s' % cnxset)
       
   439                 continue
       
   440         hits, misses = self.querier.cache_hit, self.querier.cache_miss
   456         hits, misses = self.querier.cache_hit, self.querier.cache_miss
   441         try:
   457         try:
   442             self.info('rql st cache hit/miss: %s/%s (%s%% hits)', hits, misses,
   458             self.info('rql st cache hit/miss: %s/%s (%s%% hits)', hits, misses,
   443                       (hits * 100) / (hits + misses))
   459                       (hits * 100) / (hits + misses))
   444             hits, misses = self.system_source.cache_hit, self.system_source.cache_miss
   460             hits, misses = self.system_source.cache_hit, self.system_source.cache_miss