cubicweb/server/repository.py
changeset 12962 fa0cd558d829
parent 12961 01810941a4be
child 12937 26e292e32f9c
child 12946 e49e03b590e0
equal deleted inserted replaced
12961:01810941a4be 12962:fa0cd558d829
    28 
    28 
    29 from itertools import chain
    29 from itertools import chain
    30 from contextlib import contextmanager
    30 from contextlib import contextmanager
    31 from logging import getLogger
    31 from logging import getLogger
    32 import queue
    32 import queue
       
    33 import threading
    33 
    34 
    34 from logilab.common.decorators import cached, clear_cache
    35 from logilab.common.decorators import cached, clear_cache
    35 
    36 
    36 from yams import BadSchemaDefinition
    37 from yams import BadSchemaDefinition
    37 from rql.utils import rqlvar_maker
    38 from rql.utils import rqlvar_maker
   168         pass
   169         pass
   169 
   170 
   170 
   171 
   171 class _CnxSetPool(_BaseCnxSet):
   172 class _CnxSetPool(_BaseCnxSet):
   172 
   173 
   173     def __init__(self, source, size):
   174     def __init__(self, source, min_size=1, max_size=4):
   174         super().__init__(source)
   175         super().__init__(source)
   175         self._cnxsets = []
   176         self._cnxsets = []
   176         self._queue = queue.LifoQueue()
   177         self._queue = queue.LifoQueue()
   177 
   178         self.lock = threading.Lock()
   178         for i in range(size):
   179         self.min_size = min_size
       
   180         self.max_size = max_size
       
   181 
       
   182         for i in range(min_size):
   179             self._queue.put_nowait(self._new_cnxset())
   183             self._queue.put_nowait(self._new_cnxset())
   180 
   184 
   181     def _new_cnxset(self):
   185     def _new_cnxset(self):
   182         cnxset = super()._new_cnxset()
   186         cnxset = super()._new_cnxset()
   183         self._cnxsets.append(cnxset)
   187         with self.lock:
       
   188             self._cnxsets.append(cnxset)
   184         return cnxset
   189         return cnxset
       
   190 
       
   191     def size(self):
       
   192         with self.lock:
       
   193             return len(self._cnxsets)
   185 
   194 
   186     def qsize(self):
   195     def qsize(self):
   187         return self._queue.qsize()
   196         return self._queue.qsize()
   188 
   197 
   189     def get(self):
   198     def get(self):
   190         try:
   199         try:
   191             return self._queue.get(True, timeout=5)
   200             cnxset = self._queue.get_nowait()
       
   201             return cnxset
   192         except queue.Empty:
   202         except queue.Empty:
   193             raise Exception('no connections set available after 5 secs, probably either a '
   203             if self.max_size and self.size() >= self.max_size:
   194                             'bug in code (too many uncommited/rolled back '
   204                 try:
   195                             'connections) or too much load on the server (in '
   205                     return self._queue.get(True, timeout=5)
   196                             'which case you can try to set a bigger '
   206                 except queue.Empty:
   197                             'connections pool size)')
   207                     raise Exception('no connections set available after 5 secs, probably either a '
       
   208                                     'bug in code (too many uncommited/rolled back '
       
   209                                     'connections) or too much load on the server (in '
       
   210                                     'which case you can try to set a bigger '
       
   211                                     'connections pool size)')
       
   212             return self._new_cnxset()
   198 
   213 
   199     def release(self, cnxset):
   214     def release(self, cnxset):
   200         self._queue.put_nowait(cnxset)
   215         self._queue.put_nowait(cnxset)
   201 
   216 
   202     def __iter__(self):
   217     def __iter__(self):
   203         for cnxset in self._cnxsets:
   218         with self.lock:
   204             yield cnxset
   219             for cnxset in self._cnxsets:
       
   220                 yield cnxset
   205 
   221 
   206     def close(self):
   222     def close(self):
   207         while True:
   223         while True:
   208             try:
   224             try:
   209                 cnxset = self._queue.get_nowait()
   225                 cnxset = self._queue.get_nowait()
   216 
   232 
   217 
   233 
   218 def get_cnxset(source, size):
   234 def get_cnxset(source, size):
   219     if not size:
   235     if not size:
   220         return _BaseCnxSet(source)
   236         return _BaseCnxSet(source)
   221     return _CnxSetPool(source, size)
   237     return _CnxSetPool(source, min_size=1, max_size=size)
   222 
   238 
   223 
   239 
   224 class Repository(object):
   240 class Repository(object):
   225     """a repository provides access to a set of persistent storages for
   241     """a repository provides access to a set of persistent storages for
   226     entities and relations
   242     entities and relations