cubicweb/server/repository.py
changeset 12963 dd9e98b25213
parent 12962 fa0cd558d829
child 12947 27323f7200fc
equal deleted inserted replaced
12962:fa0cd558d829 12963:dd9e98b25213
    29 from itertools import chain
    29 from itertools import chain
    30 from contextlib import contextmanager
    30 from contextlib import contextmanager
    31 from logging import getLogger
    31 from logging import getLogger
    32 import queue
    32 import queue
    33 import threading
    33 import threading
       
    34 import time
    34 
    35 
    35 from logilab.common.decorators import cached, clear_cache
    36 from logilab.common.decorators import cached, clear_cache
    36 
    37 
    37 from yams import BadSchemaDefinition
    38 from yams import BadSchemaDefinition
    38 from rql.utils import rqlvar_maker
    39 from rql.utils import rqlvar_maker
   169         pass
   170         pass
   170 
   171 
   171 
   172 
   172 class _CnxSetPool(_BaseCnxSet):
   173 class _CnxSetPool(_BaseCnxSet):
   173 
   174 
   174     def __init__(self, source, min_size=1, max_size=4):
   175     def __init__(self, source, min_size=1, max_size=4, idle_timeout=300):
   175         super().__init__(source)
   176         super().__init__(source)
   176         self._cnxsets = []
   177         self._cnxsets = []
   177         self._queue = queue.LifoQueue()
   178         self._queue = queue.LifoQueue()
   178         self.lock = threading.Lock()
   179         self.lock = threading.Lock()
   179         self.min_size = min_size
   180         self.min_size = min_size
   180         self.max_size = max_size
   181         self.max_size = max_size
       
   182         self.idle = time.time()
       
   183         self.idle_timeout = idle_timeout
   181 
   184 
   182         for i in range(min_size):
   185         for i in range(min_size):
   183             self._queue.put_nowait(self._new_cnxset())
   186             self._queue.put_nowait(self._new_cnxset())
   184 
   187 
   185     def _new_cnxset(self):
   188     def _new_cnxset(self):
   186         cnxset = super()._new_cnxset()
   189         cnxset = super()._new_cnxset()
   187         with self.lock:
   190         with self.lock:
   188             self._cnxsets.append(cnxset)
   191             self._cnxsets.append(cnxset)
   189         return cnxset
   192         return cnxset
   190 
   193 
       
   194     def _close_idle_cnxset(self):
       
   195         # close connections not being used since idle_timeout
       
   196         if abs(time.time() - self.idle) > self.idle_timeout and self.size() > self.min_size:
       
   197             try:
       
   198                 cnxset = self._queue.get_nowait()
       
   199             except queue.Empty:
       
   200                 # the queue has been used since we checked it size
       
   201                 pass
       
   202             else:
       
   203                 cnxset.close(True)
       
   204                 with self.lock:
       
   205                     self._cnxsets.remove(cnxset)
       
   206 
   191     def size(self):
   207     def size(self):
   192         with self.lock:
   208         with self.lock:
   193             return len(self._cnxsets)
   209             return len(self._cnxsets)
   194 
   210 
   195     def qsize(self):
   211     def qsize(self):
   196         return self._queue.qsize()
   212         return self._queue.qsize()
   197 
   213 
   198     def get(self):
   214     def get(self):
   199         try:
   215         try:
   200             cnxset = self._queue.get_nowait()
   216             cnxset = self._queue.get_nowait()
       
   217             self._close_idle_cnxset()
   201             return cnxset
   218             return cnxset
   202         except queue.Empty:
   219         except queue.Empty:
       
   220             # reset idle time
       
   221             self.idle = time.time()
   203             if self.max_size and self.size() >= self.max_size:
   222             if self.max_size and self.size() >= self.max_size:
   204                 try:
   223                 try:
   205                     return self._queue.get(True, timeout=5)
   224                     return self._queue.get(True, timeout=5)
   206                 except queue.Empty:
   225                 except queue.Empty:
   207                     raise Exception('no connections set available after 5 secs, probably either a '
   226                     raise Exception('no connections set available after 5 secs, probably either a '
   211                                     'connections pool size)')
   230                                     'connections pool size)')
   212             return self._new_cnxset()
   231             return self._new_cnxset()
   213 
   232 
   214     def release(self, cnxset):
   233     def release(self, cnxset):
   215         self._queue.put_nowait(cnxset)
   234         self._queue.put_nowait(cnxset)
       
   235         self._close_idle_cnxset()
   216 
   236 
   217     def __iter__(self):
   237     def __iter__(self):
   218         with self.lock:
   238         with self.lock:
   219             for cnxset in self._cnxsets:
   239             for cnxset in self._cnxsets:
   220                 yield cnxset
   240                 yield cnxset