server/repository.py
changeset 6427 c8a5ac2d1eaa
parent 6426 541659c39f6a
child 6464 11f9fbf6a645
equal deleted inserted replaced
6426:541659c39f6a 6427:c8a5ac2d1eaa
    37 from itertools import chain
    37 from itertools import chain
    38 from os.path import join
    38 from os.path import join
    39 from datetime import datetime
    39 from datetime import datetime
    40 from time import time, localtime, strftime
    40 from time import time, localtime, strftime
    41 
    41 
    42 from logilab.common.decorators import cached
    42 from logilab.common.decorators import cached, clear_cache
    43 from logilab.common.compat import any
    43 from logilab.common.compat import any
    44 from logilab.common import flatten
    44 from logilab.common import flatten
    45 
    45 
    46 from yams import BadSchemaDefinition
    46 from yams import BadSchemaDefinition
    47 from yams.schema import role_name
    47 from yams.schema import role_name
   120         # list of running threads
   120         # list of running threads
   121         self._running_threads = []
   121         self._running_threads = []
   122         # initial schema, should be built or replaced later
   122         # initial schema, should be built or replaced later
   123         self.schema = schema.CubicWebSchema(config.appid)
   123         self.schema = schema.CubicWebSchema(config.appid)
   124         self.vreg.schema = self.schema # until actual schema is loaded...
   124         self.vreg.schema = self.schema # until actual schema is loaded...
       
   125         # shutdown flag
       
   126         self.shutting_down = False
       
   127         # sources (additional sources info in the system database)
       
   128         self.system_source = self.get_source('native', 'system',
       
   129                                              config.sources()['system'])
       
   130         self.sources = [self.system_source]
       
   131         self.sources_by_uri = {'system': self.system_source}
   125         # querier helper, need to be created after sources initialization
   132         # querier helper, need to be created after sources initialization
   126         self.querier = querier.QuerierHelper(self, self.schema)
   133         self.querier = querier.QuerierHelper(self, self.schema)
   127         # sources
       
   128         self.sources = []
       
   129         self.sources_by_uri = {}
       
   130         # shutdown flag
       
   131         self.shutting_down = False
       
   132         # FIXME: store additional sources info in the system database ?
       
   133         # FIXME: sources should be ordered (add_entity priority)
       
   134         for uri, source_config in config.sources().items():
       
   135             if uri == 'admin':
       
   136                 # not an actual source
       
   137                 continue
       
   138             source = self.get_source(uri, source_config)
       
   139             self.sources_by_uri[uri] = source
       
   140             if config.source_enabled(uri):
       
   141                 self.sources.append(source)
       
   142         self.system_source = self.sources_by_uri['system']
       
   143         # ensure system source is the first one
       
   144         self.sources.remove(self.system_source)
       
   145         self.sources.insert(0, self.system_source)
       
   146         # cache eid -> type / source
   134         # cache eid -> type / source
   147         self._type_source_cache = {}
   135         self._type_source_cache = {}
   148         # cache (extid, source uri) -> eid
   136         # cache (extid, source uri) -> eid
   149         self._extid_cache = {}
   137         self._extid_cache = {}
   150         # open some connections pools
   138         # open some connections pools
   192             # test start: use the file system schema (quicker)
   180             # test start: use the file system schema (quicker)
   193             self.warning("set fs instance'schema")
   181             self.warning("set fs instance'schema")
   194             config.bootstrap_cubes()
   182             config.bootstrap_cubes()
   195             self.set_schema(config.load_schema())
   183             self.set_schema(config.load_schema())
   196         if not config.creating:
   184         if not config.creating:
       
   185             self.init_sources_from_database()
   197             if 'CWProperty' in self.schema:
   186             if 'CWProperty' in self.schema:
   198                 self.vreg.init_properties(self.properties())
   187                 self.vreg.init_properties(self.properties())
   199             # call source's init method to complete their initialisation if
   188             # call source's init method to complete their initialisation if
   200             # needed (for instance looking for persistent configuration using an
   189             # needed (for instance looking for persistent configuration using an
   201             # internal session, which is not possible until pools have been
   190             # internal session, which is not possible until pools have been
   208             for source in self.sources:
   197             for source in self.sources:
   209                 source.init_creating()
   198                 source.init_creating()
   210         # close initialization pool and reopen fresh ones for proper
   199         # close initialization pool and reopen fresh ones for proper
   211         # initialization now that we know cubes
   200         # initialization now that we know cubes
   212         self._get_pool().close(True)
   201         self._get_pool().close(True)
   213         # list of available pools (we can't iterated on Queue instance)
   202         # list of available pools (we can't iterate on Queue instance)
   214         self.pools = []
   203         self.pools = []
   215         for i in xrange(config['connections-pool-size']):
   204         for i in xrange(config['connections-pool-size']):
   216             self.pools.append(pool.ConnectionsPool(self.sources))
   205             self.pools.append(pool.ConnectionsPool(self.sources))
   217             self._available_pools.put_nowait(self.pools[-1])
   206             self._available_pools.put_nowait(self.pools[-1])
   218         if config.quick_start:
   207         if config.quick_start:
   219             config.init_cubes(self.get_cubes())
   208             config.init_cubes(self.get_cubes())
   220         self.hm = hook.HooksManager(self.vreg)
   209         self.hm = hook.HooksManager(self.vreg)
   221 
   210 
   222     # internals ###############################################################
   211     # internals ###############################################################
   223 
   212 
   224     def get_source(self, uri, source_config):
   213     def init_sources_from_database(self):
       
   214         self.sources_by_eid = {}
       
   215         if not 'CWSource' in self.schema:
       
   216             # 3.10 migration
       
   217             return
       
   218         session = self.internal_session()
       
   219         try:
       
   220             # FIXME: sources should be ordered (add_entity priority)
       
   221             for sourceent in session.execute(
       
   222                 'Any S, SN, SA, SC WHERE S is CWSource, '
       
   223                 'S name SN, S type SA, S config SC').entities():
       
   224                 if sourceent.name == 'system':
       
   225                     self.system_source.eid = sourceent.eid
       
   226                     self.sources_by_eid[sourceent.eid] = self.system_source
       
   227                     continue
       
   228                 self.add_source(sourceent, add_to_pools=False)
       
   229         finally:
       
   230             session.close()
       
   231 
       
   232     def _clear_planning_caches(self):
       
   233         for cache in ('source_defs', 'is_multi_sources_relation',
       
   234                       'can_cross_relation', 'rel_type_sources'):
       
   235             clear_cache(self, cache)
       
   236 
       
   237     def add_source(self, sourceent, add_to_pools=True):
       
   238         source = self.get_source(sourceent.type, sourceent.name,
       
   239                                  sourceent.host_config)
       
   240         source.eid = sourceent.eid
       
   241         self.sources_by_eid[sourceent.eid] = source
       
   242         self.sources_by_uri[sourceent.name] = source
       
   243         if self.config.source_enabled(source):
       
   244             self.sources.append(source)
       
   245             self.querier.set_planner()
       
   246             if add_to_pools:
       
   247                 for pool in self.pools:
       
   248                     pool.add_source(source)
       
   249         self._clear_planning_caches()
       
   250 
       
   251     def remove_source(self, uri):
       
   252         source = self.sources_by_uri.pop(uri)
       
   253         del self.sources_by_eid[source.eid]
       
   254         if self.config.source_enabled(source):
       
   255             self.sources.remove(source)
       
   256             self.querier.set_planner()
       
   257             for pool in self.pools:
       
   258                 pool.remove_source(source)
       
   259         self._clear_planning_caches()
       
   260 
       
   261     def get_source(self, type, uri, source_config):
       
   262         # set uri and type in source config so it's available through
       
   263         # source_defs()
   225         source_config['uri'] = uri
   264         source_config['uri'] = uri
   226         return sources.get_source(source_config, self.schema, self)
   265         source_config['type'] = type
       
   266         return sources.get_source(type, source_config, self)
   227 
   267 
   228     def set_schema(self, schema, resetvreg=True, rebuildinfered=True):
   268     def set_schema(self, schema, resetvreg=True, rebuildinfered=True):
   229         if rebuildinfered:
   269         if rebuildinfered:
   230             schema.rebuild_infered_relations()
   270             schema.rebuild_infered_relations()
   231         self.info('set schema %s %#x', schema.name, id(schema))
   271         self.info('set schema %s %#x', schema.name, id(schema))
   523         """Return a dictionary containing source uris as keys and a
   563         """Return a dictionary containing source uris as keys and a
   524         dictionary describing each source as value.
   564         dictionary describing each source as value.
   525 
   565 
   526         This is a public method, not requiring a session id.
   566         This is a public method, not requiring a session id.
   527         """
   567         """
   528         sources = self.config.sources().copy()
   568         sources = {}
   529         # remove manager information
       
   530         sources.pop('admin', None)
       
   531         # remove sensitive information
   569         # remove sensitive information
   532         for uri, sourcedef in sources.iteritems():
   570         for uri, source in self.sources_by_uri.iteritems():
   533             sourcedef = sourcedef.copy()
   571             sources[uri] = source.cfg
   534             self.sources_by_uri[uri].remove_sensitive_information(sourcedef)
       
   535             sources[uri] = sourcedef
       
   536         return sources
   572         return sources
   537 
   573 
   538     def properties(self):
   574     def properties(self):
   539         """Return a result set containing system wide properties.
   575         """Return a result set containing system wide properties.
   540 
   576 
  1014             # XXX call add_info with complete=False ?
  1050             # XXX call add_info with complete=False ?
  1015             self.add_info(session, entity, source, extid)
  1051             self.add_info(session, entity, source, extid)
  1016             source.after_entity_insertion(session, extid, entity)
  1052             source.after_entity_insertion(session, extid, entity)
  1017             if source.should_call_hooks:
  1053             if source.should_call_hooks:
  1018                 self.hm.call_hooks('after_add_entity', session, entity=entity)
  1054                 self.hm.call_hooks('after_add_entity', session, entity=entity)
  1019             else:
       
  1020                 # minimal meta-data
       
  1021                 session.execute('SET X is E WHERE X eid %(x)s, E name %(name)s',
       
  1022                                 {'x': entity.eid, 'name': entity.__regid__})
       
  1023             session.commit(reset_pool)
  1055             session.commit(reset_pool)
  1024             return eid
  1056             return eid
  1025         except:
  1057         except:
  1026             session.rollback(reset_pool)
  1058             session.rollback(reset_pool)
  1027             raise
  1059             raise