29 from itertools import chain |
29 from itertools import chain |
30 from contextlib import contextmanager |
30 from contextlib import contextmanager |
31 from logging import getLogger |
31 from logging import getLogger |
32 import queue |
32 import queue |
33 import threading |
33 import threading |
|
34 import time |
34 |
35 |
35 from logilab.common.decorators import cached, clear_cache |
36 from logilab.common.decorators import cached, clear_cache |
36 |
37 |
37 from yams import BadSchemaDefinition |
38 from yams import BadSchemaDefinition |
38 from rql.utils import rqlvar_maker |
39 from rql.utils import rqlvar_maker |
169 pass |
170 pass |
170 |
171 |
171 |
172 |
172 class _CnxSetPool(_BaseCnxSet): |
173 class _CnxSetPool(_BaseCnxSet): |
173 |
174 |
def __init__(self, source, min_size=1, max_size=4, idle_timeout=300):
    """Set up the pool and pre-fill it with `min_size` connections sets.

    :param source: forwarded unchanged to the parent class initializer
    :param min_size: number of connections sets created up front; also the
      size under which idle eviction stops closing connections sets
    :param max_size: stored on the instance as the upper bound of the pool
    :param idle_timeout: number of seconds compared against the time
      elapsed since `self.idle` when deciding to close a spare set
    """
    super().__init__(source)
    # sizing / idle eviction settings
    self.min_size, self.max_size = min_size, max_size
    self.idle_timeout = idle_timeout
    self.idle = time.time()
    # bookkeeping: `_cnxsets` tracks every set created by the pool (guarded
    # by `lock`), `_queue` only holds the ones currently available
    self.lock = threading.Lock()
    self._cnxsets = []
    self._queue = queue.LifoQueue()
    for _ in range(min_size):
        self._queue.put_nowait(self._new_cnxset())
184 |
187 |
def _new_cnxset(self):
    """Create a connections set through the parent class and track it.

    The new set is appended to `self._cnxsets` while `self.lock` is held,
    then returned to the caller; it is *not* put in the queue here.
    """
    created = super()._new_cnxset()
    with self.lock:
        self._cnxsets += [created]
    return created
190 |
193 |
|
def _close_idle_cnxset(self):
    """Close at most one spare connections set once the pool has been idle.

    Nothing happens unless more than `self.idle_timeout` seconds separate
    the current time from `self.idle` and the pool tracks more than
    `self.min_size` connections sets. Otherwise one set is taken from the
    queue (if any is available), closed, and dropped from `self._cnxsets`.

    Any exception raised by the set's `close()` is propagated, but the set
    is removed from `self._cnxsets` first so that `size()` keeps matching
    the sets the pool can actually hand out.
    """
    idle_for = abs(time.time() - self.idle)
    if idle_for <= self.idle_timeout or self.size() <= self.min_size:
        return
    try:
        cnxset = self._queue.get_nowait()
    except queue.Empty:
        # the queue has been drained since we checked its size
        return
    try:
        cnxset.close(True)
    finally:
        # the set already left the queue: always forget it, even when
        # closing failed, otherwise it would count against max_size forever
        with self.lock:
            self._cnxsets.remove(cnxset)
|
206 |
def size(self):
    """Return how many connections sets the pool currently tracks.

    The count is read from `self._cnxsets` while `self.lock` is held.
    """
    with self.lock:
        tracked = len(self._cnxsets)
    return tracked
194 |
210 |
def qsize(self):
    """Return the size reported by the queue of available connections sets."""
    available = self._queue
    return available.qsize()
197 |
213 |
198 def get(self): |
214 def get(self): |
199 try: |
215 try: |
200 cnxset = self._queue.get_nowait() |
216 cnxset = self._queue.get_nowait() |
|
217 self._close_idle_cnxset() |
201 return cnxset |
218 return cnxset |
202 except queue.Empty: |
219 except queue.Empty: |
|
220 # reset idle time |
|
221 self.idle = time.time() |
203 if self.max_size and self.size() >= self.max_size: |
222 if self.max_size and self.size() >= self.max_size: |
204 try: |
223 try: |
205 return self._queue.get(True, timeout=5) |
224 return self._queue.get(True, timeout=5) |
206 except queue.Empty: |
225 except queue.Empty: |
207 raise Exception('no connections set available after 5 secs, probably either a ' |
226 raise Exception('no connections set available after 5 secs, probably either a ' |
211 'connections pool size)') |
230 'connections pool size)') |
212 return self._new_cnxset() |
231 return self._new_cnxset() |
213 |
232 |
def release(self, cnxset):
    """Give `cnxset` back to the pool, then run the idle eviction check.

    The set is queued without blocking before `_close_idle_cnxset()` is
    called.
    """
    self._queue.put(cnxset, block=False)
    self._close_idle_cnxset()
216 |
236 |
def __iter__(self):
    """Iterate over the connections sets tracked by the pool.

    `self._cnxsets` is copied while `self.lock` is held and the copy is
    what gets yielded. The lock is therefore released before the first
    item is produced: the consumer may call other pool methods that take
    the same (non-reentrant) lock from inside its loop, and a partially
    consumed iterator cannot keep the lock held.

    Sets added to or removed from the pool after iteration started are not
    reflected in an iterator already in progress.
    """
    with self.lock:
        snapshot = list(self._cnxsets)
    yield from snapshot