147 pass |
147 pass |
148 |
148 |
def stop(self):
    """Do nothing and return None: this implementation has nothing to stop."""
    return None
151 |
151 |
|
class _CnxSetPool(object):
    """Fixed-size pool of connection sets opened on a single source.

    ``size`` connection sets are created eagerly by calling
    ``source.wrapped_connection()``.  Each of them is remembered in a list
    (so the pool can be iterated whatever is currently checked out) and
    queued until it is handed out by :meth:`get`.
    """

    def __init__(self, source, size):
        """Open ``size`` connection sets on ``source`` and queue them all.

        :param source: object exposing a ``wrapped_connection()`` factory
        :param size: number of connection sets to open
        """
        self._cnxsets = []
        self._queue = queue.Queue()
        for _ in range(size):
            cnxset = source.wrapped_connection()
            self._cnxsets.append(cnxset)
            self._queue.put_nowait(cnxset)
        super(_CnxSetPool, self).__init__()

    def qsize(self):
        """Return the number of connection sets currently waiting in the queue."""
        return self._queue.qsize()

    def get(self):
        """Take a connection set out of the pool, waiting up to 5 seconds.

        :raises Exception: if no connection set became available in time
        """
        try:
            return self._queue.get(True, timeout=5)
        except queue.Empty:
            raise Exception('no connections set available after 5 secs, probably either a '
                            'bug in code (too many uncommited/rolled back '
                            'connections) or too much load on the server (in '
                            'which case you can try to set a bigger '
                            'connections pool size)')

    def release(self, cnxset):
        """Put ``cnxset`` back in the queue of available connection sets."""
        self._queue.put_nowait(cnxset)

    def __iter__(self):
        """Iterate over every connection set created by this pool."""
        return iter(self._cnxsets)

    def close(self):
        """Close every connection set currently waiting in the queue.

        Connection sets that are checked out when this is called are not in
        the queue and are left untouched.  A failure to close one connection
        set is logged and does not prevent the others from being closed.
        """
        while True:
            try:
                # EAFP: testing `empty()` and then calling `get_nowait()` is
                # racy if another thread takes the last item in between.
                cnxset = self._queue.get_nowait()
            except queue.Empty:
                break
            try:
                cnxset.close(True)
            except Exception:
                self._log_close_failure(cnxset)

    def _log_close_failure(self, cnxset):
        """Log, with the current traceback, that closing ``cnxset`` failed.

        This class defines no ``exception`` method of its own.  Use one if it
        has been attached to the class or instance; otherwise fall back to
        the standard ``logging`` module, so that reporting the error can
        never raise AttributeError and interrupt :meth:`close`.
        """
        log_exception = getattr(self, 'exception', None)
        if log_exception is None:
            import logging
            log_exception = logging.getLogger(__name__).exception
        # lazy %-args: formatting cannot fail on tuple-like connection sets
        log_exception('error while closing %s', cnxset)
|
191 |
152 |
192 |
153 class Repository(object): |
193 class Repository(object): |
154 """a repository provides access to a set of persistent storages for |
194 """a repository provides access to a set of persistent storages for |
155 entities and relations |
195 entities and relations |
156 """ |
196 """ |
206 config = self.config |
246 config = self.config |
207 # copy pool size here since config.init_cube() and config.load_schema() |
247 # copy pool size here since config.init_cube() and config.load_schema() |
208 # reload configuration from file and could reset a manually set pool |
248 # reload configuration from file and could reset a manually set pool |
209 # size. |
249 # size. |
210 pool_size = config['connections-pool-size'] |
250 pool_size = config['connections-pool-size'] |
211 self._cnxsets_pool = queue.Queue() |
251 # 0. init a cnxset of size 1 that will be used to fetch bootstrap information from |
212 # 0. init a cnxset that will be used to fetch bootstrap information from |
|
213 # the database |
252 # the database |
214 self._cnxsets_pool.put_nowait(self.system_source.wrapped_connection()) |
253 self.cnxsets = _CnxSetPool(self.system_source, 1) |
215 # 1. set used cubes |
254 # 1. set used cubes |
216 if config.creating or not config.read_instance_schema: |
255 if config.creating or not config.read_instance_schema: |
217 config.bootstrap_cubes() |
256 config.bootstrap_cubes() |
218 else: |
257 else: |
219 self.set_schema(self.config.load_bootstrap_schema(), resetvreg=False) |
258 self.set_schema(self.config.load_bootstrap_schema(), resetvreg=False) |
257 self.init_sources_from_database() |
296 self.init_sources_from_database() |
258 if 'CWProperty' in self.schema: |
297 if 'CWProperty' in self.schema: |
259 self.vreg.init_properties(self.properties()) |
298 self.vreg.init_properties(self.properties()) |
260 # 4. close initialization connection set and reopen fresh ones for |
299 # 4. close initialization connection set and reopen fresh ones for |
261 # proper initialization |
300 # proper initialization |
262 self._get_cnxset().close(True) |
301 self.cnxsets.close() |
263 # list of available cnxsets (can't iterate on a Queue) |
302 self.cnxsets = _CnxSetPool(self.system_source, pool_size) |
264 self.cnxsets = [] |
|
265 for i in range(pool_size): |
|
266 self.cnxsets.append(self.system_source.wrapped_connection()) |
|
267 self._cnxsets_pool.put_nowait(self.cnxsets[-1]) |
|
268 |
303 |
269 # internals ############################################################### |
304 # internals ############################################################### |
270 |
305 |
271 def init_sources_from_database(self): |
306 def init_sources_from_database(self): |
272 if self.config.quick_start or 'CWSource' not in self.schema: # 3.10 migration |
307 if self.config.quick_start or 'CWSource' not in self.schema: # 3.10 migration |
394 |
429 |
def threaded_task(self, func):
    """Start a ``utils.RepoThread`` built from ``func`` and
    ``self._running_threads``.
    """
    worker = utils.RepoThread(func, self._running_threads)
    worker.start()
398 |
433 |
def _get_cnxset(self):
    """Take a connection set from ``self._cnxsets_pool``, blocking for at
    most 5 seconds.

    Raises a plain ``Exception`` if the pool stayed empty for the whole
    timeout.
    """
    pool = self._cnxsets_pool
    try:
        cnxset = pool.get(True, timeout=5)
    except queue.Empty:
        message = ('no connections set available after 5 secs, probably either a '
                   'bug in code (too many uncommited/rolled back '
                   'connections) or too much load on the server (in '
                   'which case you can try to set a bigger '
                   'connections pool size)')
        raise Exception(message)
    return cnxset
|
408 |
|
def _free_cnxset(self, cnxset):
    """Put ``cnxset`` back into ``self._cnxsets_pool`` without blocking."""
    pool = self._cnxsets_pool
    pool.put_nowait(cnxset)
|
411 |
|
412 def shutdown(self): |
434 def shutdown(self): |
413 """called on server stop event to properly close opened sessions and |
435 """called on server stop event to properly close opened sessions and |
414 connections |
436 connections |
415 """ |
437 """ |
416 assert not self.shutting_down, 'already shutting down' |
438 assert not self.shutting_down, 'already shutting down' |
428 for thread in self._running_threads: |
450 for thread in self._running_threads: |
429 self.info('waiting thread %s...', thread.getName()) |
451 self.info('waiting thread %s...', thread.getName()) |
430 thread.join() |
452 thread.join() |
431 self.info('thread %s finished', thread.getName()) |
453 self.info('thread %s finished', thread.getName()) |
432 self.close_sessions() |
454 self.close_sessions() |
433 while not self._cnxsets_pool.empty(): |
455 self.cnxsets.close() |
434 cnxset = self._cnxsets_pool.get_nowait() |
|
435 try: |
|
436 cnxset.close(True) |
|
437 except Exception: |
|
438 self.exception('error while closing %s' % cnxset) |
|
439 continue |
|
440 hits, misses = self.querier.cache_hit, self.querier.cache_miss |
456 hits, misses = self.querier.cache_hit, self.querier.cache_miss |
441 try: |
457 try: |
442 self.info('rql st cache hit/miss: %s/%s (%s%% hits)', hits, misses, |
458 self.info('rql st cache hit/miss: %s/%s (%s%% hits)', hits, misses, |
443 (hits * 100) / (hits + misses)) |
459 (hits * 100) / (hits + misses)) |
444 hits, misses = self.system_source.cache_hit, self.system_source.cache_miss |
460 hits, misses = self.system_source.cache_hit, self.system_source.cache_miss |