# HG changeset patch # User Philippe Pepiot # Date 1485535336 -3600 # Node ID ad7796dabeaa13b84305aba879691c8bc354dbce # Parent 83a921bae21c7b86e507666a09e562d8482ac4ea [repository] move cnxset pool handling to a helper class The class has responsability to handle connections pool operations in a single public attribute 'cnxsets'. On Repository _get_cnxset() and _free_cnxset() are replaced by cnxsets.get() and cnxsets.release(). Drop multiple access to private attributes and methods from outside of Repository. diff -r 83a921bae21c -r ad7796dabeaa cubicweb/server/repository.py --- a/cubicweb/server/repository.py Fri Jan 27 15:53:15 2017 +0100 +++ b/cubicweb/server/repository.py Fri Jan 27 17:42:16 2017 +0100 @@ -149,6 +149,46 @@ def stop(self): pass +class _CnxSetPool(object): + + def __init__(self, source, size): + self._cnxsets = [] + self._queue = queue.Queue() + for i in range(size): + cnxset = source.wrapped_connection() + self._cnxsets.append(cnxset) + self._queue.put_nowait(cnxset) + super(_CnxSetPool, self).__init__() + + def qsize(self): + return self._queue.qsize() + + def get(self): + try: + return self._queue.get(True, timeout=5) + except queue.Empty: + raise Exception('no connections set available after 5 secs, probably either a ' + 'bug in code (too many uncommited/rolled back ' + 'connections) or too much load on the server (in ' + 'which case you can try to set a bigger ' + 'connections pool size)') + + def release(self, cnxset): + self._queue.put_nowait(cnxset) + + def __iter__(self): + for cnxset in self._cnxsets: + yield cnxset + + def close(self): + q = self._queue + while not q.empty(): + cnxset = q.get_nowait() + try: + cnxset.close(True) + except Exception: + self.exception('error while closing %s' % cnxset) + class Repository(object): """a repository provides access to a set of persistent storages for @@ -208,10 +248,9 @@ # reload configuration from file and could reset a manually set pool # size. pool_size = config['connections-pool-size'] - self._cnxsets_pool = queue.Queue() - # 0. init a cnxset that will be used to fetch bootstrap information from + # 0. init a cnxset of size 1 that will be used to fetch bootstrap information from # the database - self._cnxsets_pool.put_nowait(self.system_source.wrapped_connection()) + self.cnxsets = _CnxSetPool(self.system_source, 1) # 1. set used cubes if config.creating or not config.read_instance_schema: config.bootstrap_cubes() @@ -259,12 +298,8 @@ self.vreg.init_properties(self.properties()) # 4. close initialization connection set and reopen fresh ones for # proper initialization - self._get_cnxset().close(True) - # list of available cnxsets (can't iterate on a Queue) - self.cnxsets = [] - for i in range(pool_size): - self.cnxsets.append(self.system_source.wrapped_connection()) - self._cnxsets_pool.put_nowait(self.cnxsets[-1]) + self.cnxsets.close() + self.cnxsets = _CnxSetPool(self.system_source, pool_size) # internals ############################################################### @@ -396,19 +431,6 @@ """start function in a separated thread""" utils.RepoThread(func, self._running_threads).start() - def _get_cnxset(self): - try: - return self._cnxsets_pool.get(True, timeout=5) - except queue.Empty: - raise Exception('no connections set available after 5 secs, probably either a ' - 'bug in code (too many uncommited/rolled back ' - 'connections) or too much load on the server (in ' - 'which case you can try to set a bigger ' - 'connections pool size)') - - def _free_cnxset(self, cnxset): - self._cnxsets_pool.put_nowait(cnxset) - def shutdown(self): """called on server stop event to properly close opened sessions and connections @@ -430,13 +452,7 @@ thread.join() self.info('thread %s finished', thread.getName()) self.close_sessions() - while not self._cnxsets_pool.empty(): - cnxset = self._cnxsets_pool.get_nowait() - try: - cnxset.close(True) - except Exception: - self.exception('error while closing %s' % cnxset) - continue + self.cnxsets.close() hits, misses = self.querier.cache_hit, self.querier.cache_miss try: self.info('rql st cache hit/miss: %s/%s (%s%% hits)', hits, misses, diff -r 83a921bae21c -r ad7796dabeaa cubicweb/server/session.py --- a/cubicweb/server/session.py Fri Jan 27 15:53:15 2017 +0100 +++ b/cubicweb/server/session.py Fri Jan 27 17:42:16 2017 +0100 @@ -376,7 +376,7 @@ def __enter__(self): assert not self._open self._open = True - self.cnxset = self.repo._get_cnxset() + self.cnxset = self.repo.cnxsets.get() if self.lang is None: self.set_language(self.user.prefered_language()) return self @@ -386,7 +386,7 @@ self.rollback() self._open = False self.cnxset.cnxset_freed() - self.repo._free_cnxset(self.cnxset) + self.repo.cnxsets.release(self.cnxset) self.cnxset = None @contextmanager diff -r 83a921bae21c -r ad7796dabeaa cubicweb/server/sources/native.py --- a/cubicweb/server/sources/native.py Fri Jan 27 15:53:15 2017 +0100 +++ b/cubicweb/server/sources/native.py Fri Jan 27 17:42:16 2017 +0100 @@ -380,7 +380,7 @@ # check full text index availibility if self.do_fti: if cnxset is None: - _cnxset = self.repo._get_cnxset() + _cnxset = self.repo.cnxsets.get() else: _cnxset = cnxset if not self.dbhelper.has_fti_table(_cnxset.cu): @@ -389,7 +389,7 @@ self.do_fti = False if cnxset is None: _cnxset.cnxset_freed() - self.repo._free_cnxset(_cnxset) + self.repo.cnxsets.release(_cnxset) def backup(self, backupfile, confirm, format='native'): """method called to create a backup of the source's data""" diff -r 83a921bae21c -r ad7796dabeaa cubicweb/sobjects/services.py --- a/cubicweb/sobjects/services.py Fri Jan 27 15:53:15 2017 +0100 +++ b/cubicweb/sobjects/services.py Fri Jan 27 17:42:16 2017 +0100 @@ -54,7 +54,7 @@ results['nb_active_threads'] = threading.activeCount() looping_tasks = repo._tasks_manager._looping_tasks results['looping_tasks'] = [(t.name, t.interval) for t in looping_tasks] - results['available_cnxsets'] = repo._cnxsets_pool.qsize() + results['available_cnxsets'] = repo.cnxsets.qsize() results['threads'] = [t.name for t in threading.enumerate()] return results