141 |
141 |
142 def stop(self): |
142 def stop(self): |
143 pass |
143 pass |
144 |
144 |
145 |
145 |
146 class _CnxSetPool: |
146 class _BaseCnxSet: |
|
147 |
|
148 def __init__(self, source): |
|
149 self._source = source |
|
150 |
|
151 def qsize(self): |
|
152 return None |
|
153 |
|
154 def get(self): |
|
155 return self._source.wrapped_connection() |
|
156 |
|
157 def release(self, cnxset): |
|
158 cnxset.close(True) |
|
159 |
|
160 def __iter__(self): |
|
161 return |
|
162 yield |
|
163 |
|
164 def close(self): |
|
165 pass |
|
166 |
|
167 |
|
168 class _CnxSetPool(_BaseCnxSet): |
147 |
169 |
148 def __init__(self, source, size): |
170 def __init__(self, source, size): |
|
171 super().__init__(source) |
149 self._cnxsets = [] |
172 self._cnxsets = [] |
150 |
173 self._queue = queue.Queue() |
151 if size is not None: |
174 |
152 self._queue = queue.Queue() |
175 for i in range(size): |
153 |
176 cnxset = source.wrapped_connection() |
154 for i in range(size): |
177 self._cnxsets.append(cnxset) |
155 cnxset = source.wrapped_connection() |
178 self._queue.put_nowait(cnxset) |
156 self._cnxsets.append(cnxset) |
|
157 self._queue.put_nowait(cnxset) |
|
158 |
|
159 else: |
|
160 self._queue = None |
|
161 self._source = source |
|
162 |
179 |
163 def qsize(self): |
180 def qsize(self): |
164 if self._queue is None: |
|
165 return None |
|
166 |
|
167 return self._queue.qsize() |
181 return self._queue.qsize() |
168 |
182 |
169 def get(self): |
183 def get(self): |
170 if self._queue is None: |
|
171 return self._source.wrapped_connection() |
|
172 |
|
173 try: |
184 try: |
174 return self._queue.get(True, timeout=5) |
185 return self._queue.get(True, timeout=5) |
175 except queue.Empty: |
186 except queue.Empty: |
176 raise Exception('no connections set available after 5 secs, probably either a ' |
187 raise Exception('no connections set available after 5 secs, probably either a ' |
177 'bug in code (too many uncommited/rolled back ' |
188 'bug in code (too many uncommited/rolled back ' |
178 'connections) or too much load on the server (in ' |
189 'connections) or too much load on the server (in ' |
179 'which case you can try to set a bigger ' |
190 'which case you can try to set a bigger ' |
180 'connections pool size)') |
191 'connections pool size)') |
181 |
192 |
182 def release(self, cnxset): |
193 def release(self, cnxset): |
183 if self._queue is None: |
194 self._queue.put_nowait(cnxset) |
184 cnxset.close(True) |
|
185 else: |
|
186 self._queue.put_nowait(cnxset) |
|
187 |
195 |
188 def __iter__(self): |
196 def __iter__(self): |
189 for cnxset in self._cnxsets: |
197 for cnxset in self._cnxsets: |
190 yield cnxset |
198 yield cnxset |
191 |
199 |
192 def close(self): |
200 def close(self): |
193 # XXX we don't close the connection when there is no queue? |
201 while not self._queue.empty(): |
194 if self._queue is not None: |
202 cnxset = self._queue.get_nowait() |
195 while not self._queue.empty(): |
203 |
196 cnxset = self._queue.get_nowait() |
204 try: |
197 |
205 cnxset.close(True) |
198 try: |
206 except Exception as e: |
199 cnxset.close(True) |
207 self.exception('error while closing %s, error: %s' % (cnxset, e)) |
200 except Exception as e: |
208 |
201 self.exception('error while closing %s, error: %s' % (cnxset, e)) |
209 |
|
210 def get_cnxset(source, size): |
|
211 if not size: |
|
212 return _BaseCnxSet(source) |
|
213 return _CnxSetPool(source, size) |
202 |
214 |
203 |
215 |
204 class Repository(object): |
216 class Repository(object): |
205 """a repository provides access to a set of persistent storages for |
217 """a repository provides access to a set of persistent storages for |
206 entities and relations |
218 entities and relations |
244 pool_size, min_pool_size = config['connections-pool-size'], 1 |
256 pool_size, min_pool_size = config['connections-pool-size'], 1 |
245 else: |
257 else: |
246 pool_size = min_pool_size = None |
258 pool_size = min_pool_size = None |
247 # 0. init a cnxset that will be used to fetch bootstrap information from |
259 # 0. init a cnxset that will be used to fetch bootstrap information from |
248 # the database |
260 # the database |
249 self.cnxsets = _CnxSetPool(self.system_source, min_pool_size) |
261 self.cnxsets = get_cnxset(self.system_source, min_pool_size) |
250 # 1. set used cubes |
262 # 1. set used cubes |
251 if config.creating or not config.read_instance_schema: |
263 if config.creating or not config.read_instance_schema: |
252 config.bootstrap_cubes() |
264 config.bootstrap_cubes() |
253 else: |
265 else: |
254 self.set_schema(self.config.load_bootstrap_schema(), resetvreg=False) |
266 self.set_schema(self.config.load_bootstrap_schema(), resetvreg=False) |
293 if 'CWProperty' in self.schema: |
305 if 'CWProperty' in self.schema: |
294 self.vreg.init_properties(self.properties()) |
306 self.vreg.init_properties(self.properties()) |
295 # 4. close initialization connection set and reopen fresh ones for |
307 # 4. close initialization connection set and reopen fresh ones for |
296 # proper initialization |
308 # proper initialization |
297 self.cnxsets.close() |
309 self.cnxsets.close() |
298 self.cnxsets = _CnxSetPool(self.system_source, pool_size) |
310 self.cnxsets = get_cnxset(self.system_source, pool_size) |
299 # 5. call instance level initialisation hooks |
311 # 5. call instance level initialisation hooks |
300 self.hm.call_hooks('server_startup', repo=self) |
312 self.hm.call_hooks('server_startup', repo=self) |
301 |
313 |
302 def source_by_uri(self, uri): |
314 def source_by_uri(self, uri): |
303 with self.internal_cnx() as cnx: |
315 with self.internal_cnx() as cnx: |