--- a/cubicweb/server/repository.py Fri Jan 27 15:53:15 2017 +0100
+++ b/cubicweb/server/repository.py Fri Jan 27 17:42:16 2017 +0100
@@ -149,6 +149,46 @@
def stop(self):
pass
+class _CnxSetPool(object):
+
+ def __init__(self, source, size):
+ self._cnxsets = []
+ self._queue = queue.Queue()
+ for i in range(size):
+ cnxset = source.wrapped_connection()
+ self._cnxsets.append(cnxset)
+ self._queue.put_nowait(cnxset)
+ super(_CnxSetPool, self).__init__()
+
+ def qsize(self):
+ return self._queue.qsize()
+
+ def get(self):
+ try:
+ return self._queue.get(True, timeout=5)
+ except queue.Empty:
+ raise Exception('no connections set available after 5 secs, probably either a '
+ 'bug in code (too many uncommited/rolled back '
+ 'connections) or too much load on the server (in '
+ 'which case you can try to set a bigger '
+ 'connections pool size)')
+
+ def release(self, cnxset):
+ self._queue.put_nowait(cnxset)
+
+ def __iter__(self):
+ for cnxset in self._cnxsets:
+ yield cnxset
+
+ def close(self):
+ q = self._queue
+ while not q.empty():
+ cnxset = q.get_nowait()
+ try:
+ cnxset.close(True)
+ except Exception:
+ self.exception('error while closing %s' % cnxset)
+
class Repository(object):
"""a repository provides access to a set of persistent storages for
@@ -208,10 +248,9 @@
# reload configuration from file and could reset a manually set pool
# size.
pool_size = config['connections-pool-size']
- self._cnxsets_pool = queue.Queue()
- # 0. init a cnxset that will be used to fetch bootstrap information from
+ # 0. init a cnxset of size 1 that will be used to fetch bootstrap information from
# the database
- self._cnxsets_pool.put_nowait(self.system_source.wrapped_connection())
+ self.cnxsets = _CnxSetPool(self.system_source, 1)
# 1. set used cubes
if config.creating or not config.read_instance_schema:
config.bootstrap_cubes()
@@ -259,12 +298,8 @@
self.vreg.init_properties(self.properties())
# 4. close initialization connection set and reopen fresh ones for
# proper initialization
- self._get_cnxset().close(True)
- # list of available cnxsets (can't iterate on a Queue)
- self.cnxsets = []
- for i in range(pool_size):
- self.cnxsets.append(self.system_source.wrapped_connection())
- self._cnxsets_pool.put_nowait(self.cnxsets[-1])
+ self.cnxsets.close()
+ self.cnxsets = _CnxSetPool(self.system_source, pool_size)
# internals ###############################################################
@@ -396,19 +431,6 @@
"""start function in a separated thread"""
utils.RepoThread(func, self._running_threads).start()
- def _get_cnxset(self):
- try:
- return self._cnxsets_pool.get(True, timeout=5)
- except queue.Empty:
- raise Exception('no connections set available after 5 secs, probably either a '
- 'bug in code (too many uncommited/rolled back '
- 'connections) or too much load on the server (in '
- 'which case you can try to set a bigger '
- 'connections pool size)')
-
- def _free_cnxset(self, cnxset):
- self._cnxsets_pool.put_nowait(cnxset)
-
def shutdown(self):
"""called on server stop event to properly close opened sessions and
connections
@@ -430,13 +452,7 @@
thread.join()
self.info('thread %s finished', thread.getName())
self.close_sessions()
- while not self._cnxsets_pool.empty():
- cnxset = self._cnxsets_pool.get_nowait()
- try:
- cnxset.close(True)
- except Exception:
- self.exception('error while closing %s' % cnxset)
- continue
+ self.cnxsets.close()
hits, misses = self.querier.cache_hit, self.querier.cache_miss
try:
self.info('rql st cache hit/miss: %s/%s (%s%% hits)', hits, misses,