equal
deleted
inserted
replaced
123 # querier helper, need to be created after sources initialization |
123 # querier helper, need to be created after sources initialization |
124 self.querier = querier.QuerierHelper(self, self.schema) |
124 self.querier = querier.QuerierHelper(self, self.schema) |
125 # sources |
125 # sources |
126 self.sources = [] |
126 self.sources = [] |
127 self.sources_by_uri = {} |
127 self.sources_by_uri = {} |
|
128 # shutdown flag |
|
129 self.shutting_down = False |
128 # FIXME: store additional sources info in the system database ? |
130 # FIXME: store additional sources info in the system database ? |
129 # FIXME: sources should be ordered (add_entity priority) |
131 # FIXME: sources should be ordered (add_entity priority) |
130 for uri, source_config in config.sources().items(): |
132 for uri, source_config in config.sources().items(): |
131 if uri == 'admin': |
133 if uri == 'admin': |
132 # not an actual source |
134 # not an actual source |
213 # list of available pools (we can't iterated on Queue instance) |
215 # list of available pools (we can't iterated on Queue instance) |
214 self.pools = [] |
216 self.pools = [] |
215 for i in xrange(config['connections-pool-size']): |
217 for i in xrange(config['connections-pool-size']): |
216 self.pools.append(pool.ConnectionsPool(self.sources)) |
218 self.pools.append(pool.ConnectionsPool(self.sources)) |
217 self._available_pools.put_nowait(self.pools[-1]) |
219 self._available_pools.put_nowait(self.pools[-1]) |
218 self._shutting_down = False |
|
219 if config.quick_start: |
220 if config.quick_start: |
220 config.init_cubes(self.get_cubes()) |
221 config.init_cubes(self.get_cubes()) |
221 self.hm = hook.HooksManager(self.vreg) |
222 self.hm = hook.HooksManager(self.vreg) |
222 |
223 |
223 # internals ############################################################### |
224 # internals ############################################################### |
321 threading.currentThread()) |
322 threading.currentThread()) |
322 def shutdown(self): |
323 def shutdown(self): |
323 """called on server stop event to properly close opened sessions and |
324 """called on server stop event to properly close opened sessions and |
324 connections |
325 connections |
325 """ |
326 """ |
326 assert not self._shutting_down, 'already shutting down' |
327 assert not self.shutting_down, 'already shutting down' |
327 self._shutting_down = True |
328 self.shutting_down = True |
328 self.system_source.shutdown() |
329 self.system_source.shutdown() |
329 if isinstance(self._looping_tasks, tuple): # if tasks have been started |
330 if isinstance(self._looping_tasks, tuple): # if tasks have been started |
330 for looptask in self._looping_tasks: |
331 for looptask in self._looping_tasks: |
331 self.info('canceling task %s...', looptask.name) |
332 self.info('canceling task %s...', looptask.name) |
332 looptask.cancel() |
333 looptask.cancel() |
798 return session |
799 return session |
799 |
800 |
800 def _get_session(self, sessionid, setpool=False, txid=None, |
801 def _get_session(self, sessionid, setpool=False, txid=None, |
801 checkshuttingdown=True): |
802 checkshuttingdown=True): |
802 """return the user associated to the given session identifier""" |
803 """return the user associated to the given session identifier""" |
803 if checkshuttingdown and self._shutting_down: |
804 if checkshuttingdown and self.shutting_down: |
804 raise Exception('Repository is shutting down') |
805 raise Exception('Repository is shutting down') |
805 try: |
806 try: |
806 session = self._sessions[sessionid] |
807 session = self._sessions[sessionid] |
807 except KeyError: |
808 except KeyError: |
808 raise BadConnectionId('No such session %s' % sessionid) |
809 raise BadConnectionId('No such session %s' % sessionid) |