server/repository.py
changeset 8320 cd2d332b3063
parent 8280 17c588eca3c2
child 8321 b5d5a5630649
equal deleted inserted replaced
8318:e8a2fd7d9606 8320:cd2d332b3063
   118             session.execute('DELETE X %s Y WHERE Y eid %%(y)s, '
   118             session.execute('DELETE X %s Y WHERE Y eid %%(y)s, '
   119                             'NOT X eid %%(x)s' % rtype,
   119                             'NOT X eid %%(x)s' % rtype,
   120                             {'x': eidfrom, 'y': eidto})
   120                             {'x': eidfrom, 'y': eidto})
   121 
   121 
   122 
   122 
       
   123 
   123 class NullEventBus(object):
   124 class NullEventBus(object):
   124     def publish(self, msg):
   125     def publish(self, msg):
   125         pass
   126         pass
   126 
   127 
   127     def add_subscription(self, topic, callback):
   128     def add_subscription(self, topic, callback):
   144     def __init__(self, config, vreg=None):
   145     def __init__(self, config, vreg=None):
   145         self.config = config
   146         self.config = config
   146         if vreg is None:
   147         if vreg is None:
   147             vreg = cwvreg.CWRegistryStore(config)
   148             vreg = cwvreg.CWRegistryStore(config)
   148         self.vreg = vreg
   149         self.vreg = vreg
       
   150         self._tasks_manager = utils.TasksManager()
       
   151 
   149         self.pyro_registered = False
   152         self.pyro_registered = False
   150         self.pyro_uri = None
   153         self.pyro_uri = None
   151         self.app_instances_bus = NullEventBus()
   154         self.app_instances_bus = NullEventBus()
   152         self.info('starting repository from %s', self.config.apphome)
   155         self.info('starting repository from %s', self.config.apphome)
   153         # dictionary of opened sessions
   156         # dictionary of opened sessions
   154         self._sessions = {}
   157         self._sessions = {}
       
   158 
       
   159 
   155         # list of functions to be called at regular interval
   160         # list of functions to be called at regular interval
   156         self._looping_tasks = []
       
   157         # list of running threads
   161         # list of running threads
   158         self._running_threads = []
   162         self._running_threads = []
   159         # initial schema, should be build or replaced latter
   163         # initial schema, should be build or replaced latter
   160         self.schema = schema.CubicWebSchema(config.appid)
   164         self.schema = schema.CubicWebSchema(config.appid)
   161         self.vreg.schema = self.schema # until actual schema is loaded...
   165         self.vreg.schema = self.schema # until actual schema is loaded...
   357             # register a task to cleanup expired session
   361             # register a task to cleanup expired session
   358             self.cleanup_session_time = self.config['cleanup-session-time'] or 60 * 60 * 24
   362             self.cleanup_session_time = self.config['cleanup-session-time'] or 60 * 60 * 24
   359             assert self.cleanup_session_time > 0
   363             assert self.cleanup_session_time > 0
   360             cleanup_session_interval = min(60*60, self.cleanup_session_time / 3)
   364             cleanup_session_interval = min(60*60, self.cleanup_session_time / 3)
   361             self.looping_task(cleanup_session_interval, self.clean_sessions)
   365             self.looping_task(cleanup_session_interval, self.clean_sessions)
   362         assert isinstance(self._looping_tasks, list), 'already started'
   366         self._tasks_manager.start()
   363         for i, (interval, func, args) in enumerate(self._looping_tasks):
       
   364             self._looping_tasks[i] = task = utils.LoopTask(self, interval, func, args)
       
   365             self.info('starting task %s with interval %.2fs', task.name,
       
   366                       interval)
       
   367             task.start()
       
   368         # ensure no tasks will be further added
       
   369         self._looping_tasks = tuple(self._looping_tasks)
       
   370 
   367 
   371     def looping_task(self, interval, func, *args):
   368     def looping_task(self, interval, func, *args):
   372         """register a function to be called every `interval` seconds.
   369         """register a function to be called every `interval` seconds.
   373 
   370 
   374         looping tasks can only be registered during repository initialization,
   371         looping tasks can only be registered during repository initialization,
   375         once done this method will fail.
   372         once done this method will fail.
   376         """
   373         """
   377         try:
   374         self._tasks_manager.add_looping_task(interval, func, *args)
   378             self._looping_tasks.append( (interval, func, args) )
       
   379         except AttributeError:
       
   380             raise RuntimeError("can't add looping task once the repository is started")
       
   381 
   375 
   382     def threaded_task(self, func):
   376     def threaded_task(self, func):
   383         """start function in a separated thread"""
   377         """start function in a separated thread"""
   384         t = utils.RepoThread(func, self._running_threads)
   378         utils.RepoThread(func, self._running_threads).start()
   385         t.start()
       
   386 
   379 
   387     #@locked
   380     #@locked
   388     def _get_cnxset(self):
   381     def _get_cnxset(self):
   389         try:
   382         try:
   390             return self._cnxsets_pool.get(True, timeout=5)
   383             return self._cnxsets_pool.get(True, timeout=5)
   410         connections
   403         connections
   411         """
   404         """
   412         assert not self.shutting_down, 'already shutting down'
   405         assert not self.shutting_down, 'already shutting down'
   413         self.shutting_down = True
   406         self.shutting_down = True
   414         self.system_source.shutdown()
   407         self.system_source.shutdown()
   415         if isinstance(self._looping_tasks, tuple): # if tasks have been started
   408         self._tasks_manager.stop()
   416             for looptask in self._looping_tasks:
       
   417                 self.info('canceling task %s...', looptask.name)
       
   418                 looptask.cancel()
       
   419                 looptask.join()
       
   420                 self.info('task %s finished', looptask.name)
       
   421         for thread in self._running_threads:
   409         for thread in self._running_threads:
   422             self.info('waiting thread %s...', thread.getName())
   410             self.info('waiting thread %s...', thread.getName())
   423             thread.join()
   411             thread.join()
   424             self.info('thread %s finished', thread.getName())
   412             self.info('thread %s finished', thread.getName())
   425         if not (self.config.creating or self.config.repairing
   413         if not (self.config.creating or self.config.repairing
   515         results['type_source_cache_size'] = len(self._type_source_cache)
   503         results['type_source_cache_size'] = len(self._type_source_cache)
   516         results['extid_cache_size'] = len(self._extid_cache)
   504         results['extid_cache_size'] = len(self._extid_cache)
   517         results['sql_no_cache'] = self.system_source.no_cache
   505         results['sql_no_cache'] = self.system_source.no_cache
   518         results['nb_open_sessions'] = len(self._sessions)
   506         results['nb_open_sessions'] = len(self._sessions)
   519         results['nb_active_threads'] = threading.activeCount()
   507         results['nb_active_threads'] = threading.activeCount()
   520         results['looping_tasks'] = ', '.join(str(t) for t in self._looping_tasks)
   508         looping_tasks = self._tasks_manager._looping_tasks
       
   509         results['looping_tasks'] = ', '.join(str(t) for t in looping_tasks)
   521         results['available_cnxsets'] = self._cnxsets_pool.qsize()
   510         results['available_cnxsets'] = self._cnxsets_pool.qsize()
   522         results['threads'] = ', '.join(sorted(str(t) for t in threading.enumerate()))
   511         results['threads'] = ', '.join(sorted(str(t) for t in threading.enumerate()))
   523         return results
   512         return results
   524 
   513 
   525     def gc_stats(self, nmax=20):
   514     def gc_stats(self, nmax=20):