cubicweb/server/repository.py
changeset 11931 ad7796dabeaa
parent 11908 7904fe436e82
child 11934 ec69125c03a8
--- a/cubicweb/server/repository.py	Fri Jan 27 15:53:15 2017 +0100
+++ b/cubicweb/server/repository.py	Fri Jan 27 17:42:16 2017 +0100
@@ -149,6 +149,46 @@
     def stop(self):
         pass
 
+class _CnxSetPool(object):
+
+    def __init__(self, source, size):
+        self._cnxsets = []
+        self._queue = queue.Queue()
+        for i in range(size):
+            cnxset = source.wrapped_connection()
+            self._cnxsets.append(cnxset)
+            self._queue.put_nowait(cnxset)
+        super(_CnxSetPool, self).__init__()
+
+    def qsize(self):
+        return self._queue.qsize()
+
+    def get(self):
+        try:
+            return self._queue.get(True, timeout=5)
+        except queue.Empty:
+            raise Exception('no connections set available after 5 secs, probably either a '
+                            'bug in code (too many uncommited/rolled back '
+                            'connections) or too much load on the server (in '
+                            'which case you can try to set a bigger '
+                            'connections pool size)')
+
+    def release(self, cnxset):
+        self._queue.put_nowait(cnxset)
+
+    def __iter__(self):
+        for cnxset in self._cnxsets:
+            yield cnxset
+
+    def close(self):
+        q = self._queue
+        while not q.empty():
+            cnxset = q.get_nowait()
+            try:
+                cnxset.close(True)
+            except Exception:
+                self.exception('error while closing %s' % cnxset)
+
 
 class Repository(object):
     """a repository provides access to a set of persistent storages for
@@ -208,10 +248,9 @@
         # reload configuration from file and could reset a manually set pool
         # size.
         pool_size = config['connections-pool-size']
-        self._cnxsets_pool = queue.Queue()
-        # 0. init a cnxset that will be used to fetch bootstrap information from
+        # 0. init a cnxset of size 1 that will be used to fetch bootstrap information from
         #    the database
-        self._cnxsets_pool.put_nowait(self.system_source.wrapped_connection())
+        self.cnxsets = _CnxSetPool(self.system_source, 1)
         # 1. set used cubes
         if config.creating or not config.read_instance_schema:
             config.bootstrap_cubes()
@@ -259,12 +298,8 @@
                 self.vreg.init_properties(self.properties())
         # 4. close initialization connection set and reopen fresh ones for
         #    proper initialization
-        self._get_cnxset().close(True)
-        # list of available cnxsets (can't iterate on a Queue)
-        self.cnxsets = []
-        for i in range(pool_size):
-            self.cnxsets.append(self.system_source.wrapped_connection())
-            self._cnxsets_pool.put_nowait(self.cnxsets[-1])
+        self.cnxsets.close()
+        self.cnxsets = _CnxSetPool(self.system_source, pool_size)
 
     # internals ###############################################################
 
@@ -396,19 +431,6 @@
         """start function in a separated thread"""
         utils.RepoThread(func, self._running_threads).start()
 
-    def _get_cnxset(self):
-        try:
-            return self._cnxsets_pool.get(True, timeout=5)
-        except queue.Empty:
-            raise Exception('no connections set available after 5 secs, probably either a '
-                            'bug in code (too many uncommited/rolled back '
-                            'connections) or too much load on the server (in '
-                            'which case you can try to set a bigger '
-                            'connections pool size)')
-
-    def _free_cnxset(self, cnxset):
-        self._cnxsets_pool.put_nowait(cnxset)
-
     def shutdown(self):
         """called on server stop event to properly close opened sessions and
         connections
@@ -430,13 +452,7 @@
             thread.join()
             self.info('thread %s finished', thread.getName())
         self.close_sessions()
-        while not self._cnxsets_pool.empty():
-            cnxset = self._cnxsets_pool.get_nowait()
-            try:
-                cnxset.close(True)
-            except Exception:
-                self.exception('error while closing %s' % cnxset)
-                continue
+        self.cnxsets.close()
         hits, misses = self.querier.cache_hit, self.querier.cache_miss
         try:
             self.info('rql st cache hit/miss: %s/%s (%s%% hits)', hits, misses,