cubicweb/server/repository.py
changeset 12958 3667f6df1ec3
parent 12807 fb0936668535
child 12933 c74f378958be
equal deleted inserted replaced
12957:0c973204033a 12958:3667f6df1ec3
   141 
   141 
   142     def stop(self):
   142     def stop(self):
   143         pass
   143         pass
   144 
   144 
   145 
   145 
   146 class _CnxSetPool:
   146 class _BaseCnxSet:
       
   147 
       
   148     def __init__(self, source):
       
   149         self._source = source
       
   150 
       
   151     def qsize(self):
       
   152         return None
       
   153 
       
   154     def get(self):
       
   155         return self._source.wrapped_connection()
       
   156 
       
   157     def release(self, cnxset):
       
   158         cnxset.close(True)
       
   159 
       
   160     def __iter__(self):
       
   161         return
       
   162         yield
       
   163 
       
   164     def close(self):
       
   165         pass
       
   166 
       
   167 
       
   168 class _CnxSetPool(_BaseCnxSet):
   147 
   169 
   148     def __init__(self, source, size):
   170     def __init__(self, source, size):
       
   171         super().__init__(source)
   149         self._cnxsets = []
   172         self._cnxsets = []
   150 
   173         self._queue = queue.Queue()
   151         if size is not None:
   174 
   152             self._queue = queue.Queue()
   175         for i in range(size):
   153 
   176             cnxset = source.wrapped_connection()
   154             for i in range(size):
   177             self._cnxsets.append(cnxset)
   155                 cnxset = source.wrapped_connection()
   178             self._queue.put_nowait(cnxset)
   156                 self._cnxsets.append(cnxset)
       
   157                 self._queue.put_nowait(cnxset)
       
   158 
       
   159         else:
       
   160             self._queue = None
       
   161             self._source = source
       
   162 
   179 
   163     def qsize(self):
   180     def qsize(self):
   164         if self._queue is None:
       
   165             return None
       
   166 
       
   167         return self._queue.qsize()
   181         return self._queue.qsize()
   168 
   182 
   169     def get(self):
   183     def get(self):
   170         if self._queue is None:
       
   171             return self._source.wrapped_connection()
       
   172 
       
   173         try:
   184         try:
   174             return self._queue.get(True, timeout=5)
   185             return self._queue.get(True, timeout=5)
   175         except queue.Empty:
   186         except queue.Empty:
   176             raise Exception('no connections set available after 5 secs, probably either a '
   187             raise Exception('no connections set available after 5 secs, probably either a '
   177                             'bug in code (too many uncommited/rolled back '
   188                             'bug in code (too many uncommited/rolled back '
   178                             'connections) or too much load on the server (in '
   189                             'connections) or too much load on the server (in '
   179                             'which case you can try to set a bigger '
   190                             'which case you can try to set a bigger '
   180                             'connections pool size)')
   191                             'connections pool size)')
   181 
   192 
   182     def release(self, cnxset):
   193     def release(self, cnxset):
   183         if self._queue is None:
   194         self._queue.put_nowait(cnxset)
   184             cnxset.close(True)
       
   185         else:
       
   186             self._queue.put_nowait(cnxset)
       
   187 
   195 
   188     def __iter__(self):
   196     def __iter__(self):
   189         for cnxset in self._cnxsets:
   197         for cnxset in self._cnxsets:
   190             yield cnxset
   198             yield cnxset
   191 
   199 
   192     def close(self):
   200     def close(self):
   193         # XXX we don't close the connection when there is no queue?
   201         while not self._queue.empty():
   194         if self._queue is not None:
   202             cnxset = self._queue.get_nowait()
   195             while not self._queue.empty():
   203 
   196                 cnxset = self._queue.get_nowait()
   204             try:
   197 
   205                 cnxset.close(True)
   198                 try:
   206             except Exception as e:
   199                     cnxset.close(True)
   207                 self.exception('error while closing %s, error: %s' % (cnxset, e))
   200                 except Exception as e:
   208 
   201                     self.exception('error while closing %s, error: %s' % (cnxset, e))
   209 
       
   210 def get_cnxset(source, size):
       
   211     if not size:
       
   212         return _BaseCnxSet(source)
       
   213     return _CnxSetPool(source, size)
   202 
   214 
   203 
   215 
   204 class Repository(object):
   216 class Repository(object):
   205     """a repository provides access to a set of persistent storages for
   217     """a repository provides access to a set of persistent storages for
   206     entities and relations
   218     entities and relations
   244             pool_size, min_pool_size = config['connections-pool-size'], 1
   256             pool_size, min_pool_size = config['connections-pool-size'], 1
   245         else:
   257         else:
   246             pool_size = min_pool_size = None
   258             pool_size = min_pool_size = None
   247         # 0. init a cnxset that will be used to fetch bootstrap information from
   259         # 0. init a cnxset that will be used to fetch bootstrap information from
   248         #    the database
   260         #    the database
   249         self.cnxsets = _CnxSetPool(self.system_source, min_pool_size)
   261         self.cnxsets = get_cnxset(self.system_source, min_pool_size)
   250         # 1. set used cubes
   262         # 1. set used cubes
   251         if config.creating or not config.read_instance_schema:
   263         if config.creating or not config.read_instance_schema:
   252             config.bootstrap_cubes()
   264             config.bootstrap_cubes()
   253         else:
   265         else:
   254             self.set_schema(self.config.load_bootstrap_schema(), resetvreg=False)
   266             self.set_schema(self.config.load_bootstrap_schema(), resetvreg=False)
   293             if 'CWProperty' in self.schema:
   305             if 'CWProperty' in self.schema:
   294                 self.vreg.init_properties(self.properties())
   306                 self.vreg.init_properties(self.properties())
   295         # 4. close initialization connection set and reopen fresh ones for
   307         # 4. close initialization connection set and reopen fresh ones for
   296         #    proper initialization
   308         #    proper initialization
   297         self.cnxsets.close()
   309         self.cnxsets.close()
   298         self.cnxsets = _CnxSetPool(self.system_source, pool_size)
   310         self.cnxsets = get_cnxset(self.system_source, pool_size)
   299         # 5. call instance level initialisation hooks
   311         # 5. call instance level initialisation hooks
   300         self.hm.call_hooks('server_startup', repo=self)
   312         self.hm.call_hooks('server_startup', repo=self)
   301 
   313 
   302     def source_by_uri(self, uri):
   314     def source_by_uri(self, uri):
   303         with self.internal_cnx() as cnx:
   315         with self.internal_cnx() as cnx: