[repository] move cnxset pool handling to a helper class
The class has responsability to handle connections pool operations in a single
public attribute 'cnxsets'.
On Repository _get_cnxset() and _free_cnxset() are replaced by cnxsets.get()
and cnxsets.release().
Drop multiple access to private attributes and methods from outside of Repository.
--- a/cubicweb/server/repository.py Fri Jan 27 15:53:15 2017 +0100
+++ b/cubicweb/server/repository.py Fri Jan 27 17:42:16 2017 +0100
@@ -149,6 +149,46 @@
def stop(self):
pass
+class _CnxSetPool(object):
+
+ def __init__(self, source, size):
+ self._cnxsets = []
+ self._queue = queue.Queue()
+ for i in range(size):
+ cnxset = source.wrapped_connection()
+ self._cnxsets.append(cnxset)
+ self._queue.put_nowait(cnxset)
+ super(_CnxSetPool, self).__init__()
+
+ def qsize(self):
+ return self._queue.qsize()
+
+ def get(self):
+ try:
+ return self._queue.get(True, timeout=5)
+ except queue.Empty:
+ raise Exception('no connections set available after 5 secs, probably either a '
+ 'bug in code (too many uncommited/rolled back '
+ 'connections) or too much load on the server (in '
+ 'which case you can try to set a bigger '
+ 'connections pool size)')
+
+ def release(self, cnxset):
+ self._queue.put_nowait(cnxset)
+
+ def __iter__(self):
+ for cnxset in self._cnxsets:
+ yield cnxset
+
+ def close(self):
+ q = self._queue
+ while not q.empty():
+ cnxset = q.get_nowait()
+ try:
+ cnxset.close(True)
+ except Exception:
+ self.exception('error while closing %s' % cnxset)
+
class Repository(object):
"""a repository provides access to a set of persistent storages for
@@ -208,10 +248,9 @@
# reload configuration from file and could reset a manually set pool
# size.
pool_size = config['connections-pool-size']
- self._cnxsets_pool = queue.Queue()
- # 0. init a cnxset that will be used to fetch bootstrap information from
+ # 0. init a cnxset of size 1 that will be used to fetch bootstrap information from
# the database
- self._cnxsets_pool.put_nowait(self.system_source.wrapped_connection())
+ self.cnxsets = _CnxSetPool(self.system_source, 1)
# 1. set used cubes
if config.creating or not config.read_instance_schema:
config.bootstrap_cubes()
@@ -259,12 +298,8 @@
self.vreg.init_properties(self.properties())
# 4. close initialization connection set and reopen fresh ones for
# proper initialization
- self._get_cnxset().close(True)
- # list of available cnxsets (can't iterate on a Queue)
- self.cnxsets = []
- for i in range(pool_size):
- self.cnxsets.append(self.system_source.wrapped_connection())
- self._cnxsets_pool.put_nowait(self.cnxsets[-1])
+ self.cnxsets.close()
+ self.cnxsets = _CnxSetPool(self.system_source, pool_size)
# internals ###############################################################
@@ -396,19 +431,6 @@
"""start function in a separated thread"""
utils.RepoThread(func, self._running_threads).start()
- def _get_cnxset(self):
- try:
- return self._cnxsets_pool.get(True, timeout=5)
- except queue.Empty:
- raise Exception('no connections set available after 5 secs, probably either a '
- 'bug in code (too many uncommited/rolled back '
- 'connections) or too much load on the server (in '
- 'which case you can try to set a bigger '
- 'connections pool size)')
-
- def _free_cnxset(self, cnxset):
- self._cnxsets_pool.put_nowait(cnxset)
-
def shutdown(self):
"""called on server stop event to properly close opened sessions and
connections
@@ -430,13 +452,7 @@
thread.join()
self.info('thread %s finished', thread.getName())
self.close_sessions()
- while not self._cnxsets_pool.empty():
- cnxset = self._cnxsets_pool.get_nowait()
- try:
- cnxset.close(True)
- except Exception:
- self.exception('error while closing %s' % cnxset)
- continue
+ self.cnxsets.close()
hits, misses = self.querier.cache_hit, self.querier.cache_miss
try:
self.info('rql st cache hit/miss: %s/%s (%s%% hits)', hits, misses,
--- a/cubicweb/server/session.py Fri Jan 27 15:53:15 2017 +0100
+++ b/cubicweb/server/session.py Fri Jan 27 17:42:16 2017 +0100
@@ -376,7 +376,7 @@
def __enter__(self):
assert not self._open
self._open = True
- self.cnxset = self.repo._get_cnxset()
+ self.cnxset = self.repo.cnxsets.get()
if self.lang is None:
self.set_language(self.user.prefered_language())
return self
@@ -386,7 +386,7 @@
self.rollback()
self._open = False
self.cnxset.cnxset_freed()
- self.repo._free_cnxset(self.cnxset)
+ self.repo.cnxsets.release(self.cnxset)
self.cnxset = None
@contextmanager
--- a/cubicweb/server/sources/native.py Fri Jan 27 15:53:15 2017 +0100
+++ b/cubicweb/server/sources/native.py Fri Jan 27 17:42:16 2017 +0100
@@ -380,7 +380,7 @@
# check full text index availibility
if self.do_fti:
if cnxset is None:
- _cnxset = self.repo._get_cnxset()
+ _cnxset = self.repo.cnxsets.get()
else:
_cnxset = cnxset
if not self.dbhelper.has_fti_table(_cnxset.cu):
@@ -389,7 +389,7 @@
self.do_fti = False
if cnxset is None:
_cnxset.cnxset_freed()
- self.repo._free_cnxset(_cnxset)
+ self.repo.cnxsets.release(_cnxset)
def backup(self, backupfile, confirm, format='native'):
"""method called to create a backup of the source's data"""
--- a/cubicweb/sobjects/services.py Fri Jan 27 15:53:15 2017 +0100
+++ b/cubicweb/sobjects/services.py Fri Jan 27 17:42:16 2017 +0100
@@ -54,7 +54,7 @@
results['nb_active_threads'] = threading.activeCount()
looping_tasks = repo._tasks_manager._looping_tasks
results['looping_tasks'] = [(t.name, t.interval) for t in looping_tasks]
- results['available_cnxsets'] = repo._cnxsets_pool.qsize()
+ results['available_cnxsets'] = repo.cnxsets.qsize()
results['threads'] = [t.name for t in threading.enumerate()]
return results