28 |
28 |
29 from itertools import chain |
29 from itertools import chain |
30 from contextlib import contextmanager |
30 from contextlib import contextmanager |
31 from logging import getLogger |
31 from logging import getLogger |
32 import queue |
32 import queue |
|
33 import threading |
33 |
34 |
34 from logilab.common.decorators import cached, clear_cache |
35 from logilab.common.decorators import cached, clear_cache |
35 |
36 |
36 from yams import BadSchemaDefinition |
37 from yams import BadSchemaDefinition |
37 from rql.utils import rqlvar_maker |
38 from rql.utils import rqlvar_maker |
168 pass |
169 pass |
169 |
170 |
170 |
171 |
171 class _CnxSetPool(_BaseCnxSet): |
172 class _CnxSetPool(_BaseCnxSet): |
172 |
173 |
def __init__(self, source, min_size=1, max_size=4):
    """Set up a pool of connections sets for `source`.

    :param source: passed unchanged to the parent class initializer.
    :param min_size: number of connections sets created eagerly and
        placed in the idle queue.
    :param max_size: upper bound checked by `get()` before it creates an
        additional connections set; a falsy value disables that check.
    """
    super().__init__(source)
    self.min_size = min_size
    self.max_size = max_size
    # guards every access to `self._cnxsets`
    self.lock = threading.Lock()
    # every connections set created through `_new_cnxset()`
    self._cnxsets = []
    # idle connections sets; LIFO, so the most recently released one is
    # handed out first
    self._queue = queue.LifoQueue()

    for _ in range(min_size):
        self._queue.put_nowait(self._new_cnxset())
180 |
184 |
def _new_cnxset(self):
    """Create a connections set through the parent class and record it.

    The new set is appended to `self._cnxsets` while holding `self.lock`;
    it is NOT put in the idle queue, the caller decides what to do with it.
    """
    created = super()._new_cnxset()
    with self.lock:
        self._cnxsets.append(created)
    return created
|
190 |
|
def size(self):
    """Return the number of connections sets tracked in `self._cnxsets`.

    The list is read while holding `self.lock`.
    """
    with self.lock:
        total = len(self._cnxsets)
    return total
185 |
194 |
def qsize(self):
    """Return how many connections sets are waiting in the idle queue.

    This is the value reported by the queue's own `qsize()`.
    """
    idle = self._queue
    return idle.qsize()
188 |
197 |
def get(self):
    """Hand out a connections set, creating a new one when allowed.

    Order of attempts:

    1. take an idle set from the queue without blocking;
    2. if none is idle and the pool is below `max_size` (or `max_size` is
       falsy), create a new set with `_new_cnxset()`;
    3. otherwise wait up to 5 seconds for a set to be released.

    :raises Exception: when the pool is at `max_size` and nothing is
        released within 5 seconds.
    """
    try:
        return self._queue.get_nowait()
    except queue.Empty:
        # NOTE(review): the capacity check and the creation below are not
        # one atomic step, so concurrent callers could presumably both pass
        # the check and push the pool past `max_size` -- confirm whether a
        # hard limit is required.
        at_capacity = bool(self.max_size) and self.size() >= self.max_size
        if not at_capacity:
            return self._new_cnxset()
        try:
            return self._queue.get(block=True, timeout=5)
        except queue.Empty:
            raise Exception('no connections set available after 5 secs, probably either a '
                            'bug in code (too many uncommited/rolled back '
                            'connections) or too much load on the server (in '
                            'which case you can try to set a bigger '
                            'connections pool size)')
198 |
213 |
def release(self, cnxset):
    """Put `cnxset` back in the idle queue without blocking."""
    idle = self._queue
    idle.put_nowait(cnxset)
201 |
216 |
202 def __iter__(self): |
217 def __iter__(self): |
203 for cnxset in self._cnxsets: |
218 with self.lock: |
204 yield cnxset |
219 for cnxset in self._cnxsets: |
|
220 yield cnxset |
205 |
221 |
206 def close(self): |
222 def close(self): |
207 while True: |
223 while True: |
208 try: |
224 try: |
209 cnxset = self._queue.get_nowait() |
225 cnxset = self._queue.get_nowait() |
216 |
232 |
217 |
233 |
def get_cnxset(source, size):
    """Build the connections-set provider for `source`.

    :param source: forwarded to the class that gets instantiated.
    :param size: when truthy, a `_CnxSetPool` is returned that starts with
        one connections set and uses `size` as its `max_size`; when falsy,
        a plain `_BaseCnxSet` is returned instead.
    """
    if size:
        return _CnxSetPool(source, min_size=1, max_size=size)
    return _BaseCnxSet(source)
222 |
238 |
223 |
239 |
224 class Repository(object): |
240 class Repository(object): |
225 """a repository provides access to a set of persistent storages for |
241 """a repository provides access to a set of persistent storages for |
226 entities and relations |
242 entities and relations |