[repository] move cnxset pool handling to a helper class
authorPhilippe Pepiot <philippe.pepiot@logilab.fr>
Fri, 27 Jan 2017 17:42:16 +0100
changeset 11931 ad7796dabeaa
parent 11930 83a921bae21c
child 11932 7b2247098f58
[repository] move cnxset pool handling to a helper class The class has responsability to handle connections pool operations in a single public attribute 'cnxsets'. On Repository _get_cnxset() and _free_cnxset() are replaced by cnxsets.get() and cnxsets.release(). Drop multiple access to private attributes and methods from outside of Repository.
cubicweb/server/repository.py
cubicweb/server/session.py
cubicweb/server/sources/native.py
cubicweb/sobjects/services.py
--- a/cubicweb/server/repository.py	Fri Jan 27 15:53:15 2017 +0100
+++ b/cubicweb/server/repository.py	Fri Jan 27 17:42:16 2017 +0100
@@ -149,6 +149,46 @@
     def stop(self):
         pass
 
+class _CnxSetPool(object):
+
+    def __init__(self, source, size):
+        self._cnxsets = []
+        self._queue = queue.Queue()
+        for i in range(size):
+            cnxset = source.wrapped_connection()
+            self._cnxsets.append(cnxset)
+            self._queue.put_nowait(cnxset)
+        super(_CnxSetPool, self).__init__()
+
+    def qsize(self):
+        return self._queue.qsize()
+
+    def get(self):
+        try:
+            return self._queue.get(True, timeout=5)
+        except queue.Empty:
+            raise Exception('no connections set available after 5 secs, probably either a '
+                            'bug in code (too many uncommited/rolled back '
+                            'connections) or too much load on the server (in '
+                            'which case you can try to set a bigger '
+                            'connections pool size)')
+
+    def release(self, cnxset):
+        self._queue.put_nowait(cnxset)
+
+    def __iter__(self):
+        for cnxset in self._cnxsets:
+            yield cnxset
+
+    def close(self):
+        q = self._queue
+        while not q.empty():
+            cnxset = q.get_nowait()
+            try:
+                cnxset.close(True)
+            except Exception:
+                self.exception('error while closing %s' % cnxset)
+
 
 class Repository(object):
     """a repository provides access to a set of persistent storages for
@@ -208,10 +248,9 @@
         # reload configuration from file and could reset a manually set pool
         # size.
         pool_size = config['connections-pool-size']
-        self._cnxsets_pool = queue.Queue()
-        # 0. init a cnxset that will be used to fetch bootstrap information from
+        # 0. init a cnxset of size 1 that will be used to fetch bootstrap information from
         #    the database
-        self._cnxsets_pool.put_nowait(self.system_source.wrapped_connection())
+        self.cnxsets = _CnxSetPool(self.system_source, 1)
         # 1. set used cubes
         if config.creating or not config.read_instance_schema:
             config.bootstrap_cubes()
@@ -259,12 +298,8 @@
                 self.vreg.init_properties(self.properties())
         # 4. close initialization connection set and reopen fresh ones for
         #    proper initialization
-        self._get_cnxset().close(True)
-        # list of available cnxsets (can't iterate on a Queue)
-        self.cnxsets = []
-        for i in range(pool_size):
-            self.cnxsets.append(self.system_source.wrapped_connection())
-            self._cnxsets_pool.put_nowait(self.cnxsets[-1])
+        self.cnxsets.close()
+        self.cnxsets = _CnxSetPool(self.system_source, pool_size)
 
     # internals ###############################################################
 
@@ -396,19 +431,6 @@
         """start function in a separated thread"""
         utils.RepoThread(func, self._running_threads).start()
 
-    def _get_cnxset(self):
-        try:
-            return self._cnxsets_pool.get(True, timeout=5)
-        except queue.Empty:
-            raise Exception('no connections set available after 5 secs, probably either a '
-                            'bug in code (too many uncommited/rolled back '
-                            'connections) or too much load on the server (in '
-                            'which case you can try to set a bigger '
-                            'connections pool size)')
-
-    def _free_cnxset(self, cnxset):
-        self._cnxsets_pool.put_nowait(cnxset)
-
     def shutdown(self):
         """called on server stop event to properly close opened sessions and
         connections
@@ -430,13 +452,7 @@
             thread.join()
             self.info('thread %s finished', thread.getName())
         self.close_sessions()
-        while not self._cnxsets_pool.empty():
-            cnxset = self._cnxsets_pool.get_nowait()
-            try:
-                cnxset.close(True)
-            except Exception:
-                self.exception('error while closing %s' % cnxset)
-                continue
+        self.cnxsets.close()
         hits, misses = self.querier.cache_hit, self.querier.cache_miss
         try:
             self.info('rql st cache hit/miss: %s/%s (%s%% hits)', hits, misses,
--- a/cubicweb/server/session.py	Fri Jan 27 15:53:15 2017 +0100
+++ b/cubicweb/server/session.py	Fri Jan 27 17:42:16 2017 +0100
@@ -376,7 +376,7 @@
     def __enter__(self):
         assert not self._open
         self._open = True
-        self.cnxset = self.repo._get_cnxset()
+        self.cnxset = self.repo.cnxsets.get()
         if self.lang is None:
             self.set_language(self.user.prefered_language())
         return self
@@ -386,7 +386,7 @@
         self.rollback()
         self._open = False
         self.cnxset.cnxset_freed()
-        self.repo._free_cnxset(self.cnxset)
+        self.repo.cnxsets.release(self.cnxset)
         self.cnxset = None
 
     @contextmanager
--- a/cubicweb/server/sources/native.py	Fri Jan 27 15:53:15 2017 +0100
+++ b/cubicweb/server/sources/native.py	Fri Jan 27 17:42:16 2017 +0100
@@ -380,7 +380,7 @@
         # check full text index availibility
         if self.do_fti:
             if cnxset is None:
-                _cnxset = self.repo._get_cnxset()
+                _cnxset = self.repo.cnxsets.get()
             else:
                 _cnxset = cnxset
             if not self.dbhelper.has_fti_table(_cnxset.cu):
@@ -389,7 +389,7 @@
                 self.do_fti = False
             if cnxset is None:
                 _cnxset.cnxset_freed()
-                self.repo._free_cnxset(_cnxset)
+                self.repo.cnxsets.release(_cnxset)
 
     def backup(self, backupfile, confirm, format='native'):
         """method called to create a backup of the source's data"""
--- a/cubicweb/sobjects/services.py	Fri Jan 27 15:53:15 2017 +0100
+++ b/cubicweb/sobjects/services.py	Fri Jan 27 17:42:16 2017 +0100
@@ -54,7 +54,7 @@
         results['nb_active_threads'] = threading.activeCount()
         looping_tasks = repo._tasks_manager._looping_tasks
         results['looping_tasks'] = [(t.name, t.interval) for t in looping_tasks]
-        results['available_cnxsets'] = repo._cnxsets_pool.qsize()
+        results['available_cnxsets'] = repo.cnxsets.qsize()
         results['threads'] = [t.name for t in threading.enumerate()]
         return results