37 from itertools import chain |
37 from itertools import chain |
38 from os.path import join |
38 from os.path import join |
39 from datetime import datetime |
39 from datetime import datetime |
40 from time import time, localtime, strftime |
40 from time import time, localtime, strftime |
41 |
41 |
42 from logilab.common.decorators import cached |
42 from logilab.common.decorators import cached, clear_cache |
43 from logilab.common.compat import any |
43 from logilab.common.compat import any |
44 from logilab.common import flatten |
44 from logilab.common import flatten |
45 |
45 |
46 from yams import BadSchemaDefinition |
46 from yams import BadSchemaDefinition |
47 from yams.schema import role_name |
47 from yams.schema import role_name |
120 # list of running threads |
120 # list of running threads |
121 self._running_threads = [] |
121 self._running_threads = [] |
122 # initial schema, should be built or replaced later |
122 # initial schema, should be built or replaced later |
123 self.schema = schema.CubicWebSchema(config.appid) |
123 self.schema = schema.CubicWebSchema(config.appid) |
124 self.vreg.schema = self.schema # until actual schema is loaded... |
124 self.vreg.schema = self.schema # until actual schema is loaded... |
|
125 # shutdown flag |
|
126 self.shutting_down = False |
|
127 # sources (additional sources info in the system database) |
|
128 self.system_source = self.get_source('native', 'system', |
|
129 config.sources()['system']) |
|
130 self.sources = [self.system_source] |
|
131 self.sources_by_uri = {'system': self.system_source} |
125 # querier helper, need to be created after sources initialization |
132 # querier helper, need to be created after sources initialization |
126 self.querier = querier.QuerierHelper(self, self.schema) |
133 self.querier = querier.QuerierHelper(self, self.schema) |
127 # sources |
|
128 self.sources = [] |
|
129 self.sources_by_uri = {} |
|
130 # shutdown flag |
|
131 self.shutting_down = False |
|
132 # FIXME: store additional sources info in the system database ? |
|
133 # FIXME: sources should be ordered (add_entity priority) |
|
134 for uri, source_config in config.sources().items(): |
|
135 if uri == 'admin': |
|
136 # not an actual source |
|
137 continue |
|
138 source = self.get_source(uri, source_config) |
|
139 self.sources_by_uri[uri] = source |
|
140 if config.source_enabled(uri): |
|
141 self.sources.append(source) |
|
142 self.system_source = self.sources_by_uri['system'] |
|
143 # ensure system source is the first one |
|
144 self.sources.remove(self.system_source) |
|
145 self.sources.insert(0, self.system_source) |
|
146 # cache eid -> type / source |
134 # cache eid -> type / source |
147 self._type_source_cache = {} |
135 self._type_source_cache = {} |
148 # cache (extid, source uri) -> eid |
136 # cache (extid, source uri) -> eid |
149 self._extid_cache = {} |
137 self._extid_cache = {} |
150 # open some connections pools |
138 # open some connections pools |
192 # test start: use the file system schema (quicker) |
180 # test start: use the file system schema (quicker) |
193 self.warning("set fs instance'schema") |
181 self.warning("set fs instance'schema") |
194 config.bootstrap_cubes() |
182 config.bootstrap_cubes() |
195 self.set_schema(config.load_schema()) |
183 self.set_schema(config.load_schema()) |
196 if not config.creating: |
184 if not config.creating: |
|
185 self.init_sources_from_database() |
197 if 'CWProperty' in self.schema: |
186 if 'CWProperty' in self.schema: |
198 self.vreg.init_properties(self.properties()) |
187 self.vreg.init_properties(self.properties()) |
199 # call source's init method to complete their initialisation if |
188 # call source's init method to complete their initialisation if |
200 # needed (for instance looking for persistent configuration using an |
189 # needed (for instance looking for persistent configuration using an |
201 # internal session, which is not possible until pools have been |
190 # internal session, which is not possible until pools have been |
208 for source in self.sources: |
197 for source in self.sources: |
209 source.init_creating() |
198 source.init_creating() |
210 # close initialization pool and reopen fresh ones for proper |
199 # close initialization pool and reopen fresh ones for proper |
211 # initialization now that we know cubes |
200 # initialization now that we know cubes |
212 self._get_pool().close(True) |
201 self._get_pool().close(True) |
213 # list of available pools (we can't iterated on Queue instance) |
202 # list of available pools (we can't iterate on Queue instance) |
214 self.pools = [] |
203 self.pools = [] |
215 for i in xrange(config['connections-pool-size']): |
204 for i in xrange(config['connections-pool-size']): |
216 self.pools.append(pool.ConnectionsPool(self.sources)) |
205 self.pools.append(pool.ConnectionsPool(self.sources)) |
217 self._available_pools.put_nowait(self.pools[-1]) |
206 self._available_pools.put_nowait(self.pools[-1]) |
218 if config.quick_start: |
207 if config.quick_start: |
219 config.init_cubes(self.get_cubes()) |
208 config.init_cubes(self.get_cubes()) |
220 self.hm = hook.HooksManager(self.vreg) |
209 self.hm = hook.HooksManager(self.vreg) |
221 |
210 |
222 # internals ############################################################### |
211 # internals ############################################################### |
223 |
212 |
224 def get_source(self, uri, source_config): |
def init_sources_from_database(self):
    """Register the sources described by CWSource entities.

    ``self.sources_by_eid`` is reset to an empty dict first.  When the
    schema has no 'CWSource' entity type (instance not yet migrated to
    3.10) nothing else is done.

    Otherwise an internal session is opened and every CWSource entity is
    read:

    * the entity named 'system' only gives its eid to the already existing
      ``self.system_source``, which is recorded in ``sources_by_eid``;
    * any other entity is passed to ``self.add_source`` with
      ``add_to_pools=False``.

    The internal session is closed whether or not an error occurred.
    """
    self.sources_by_eid = {}
    if 'CWSource' not in self.schema:
        # 3.10 migration
        return
    session = self.internal_session()
    try:
        # FIXME: sources should be ordered (add_entity priority)
        rset = session.execute(
            'Any S, SN, SA, SC WHERE S is CWSource, '
            'S name SN, S type SA, S config SC')
        for sourceent in rset.entities():
            if sourceent.name == 'system':
                # the system source object already exists, only its eid
                # is missing
                self.system_source.eid = sourceent.eid
                self.sources_by_eid[sourceent.eid] = self.system_source
                continue
            self.add_source(sourceent, add_to_pools=False)
    finally:
        session.close()
|
231 |
|
def _clear_planning_caches(self):
    """Invalidate the cached values this object keeps under a fixed set of names.

    ``clear_cache`` is called on ``self`` once for each of: source_defs,
    is_multi_sources_relation, can_cross_relation and rel_type_sources.
    """
    cached_names = ('source_defs', 'is_multi_sources_relation',
                    'can_cross_relation', 'rel_type_sources')
    for name in cached_names:
        clear_cache(self, name)
|
236 |
|
# add_source(sourceent, add_to_pools=True): register a new source.
#
# Builds a source object with self.get_source() from the entity's `type`,
# `name` and `host_config` attributes, copies the entity's eid onto it, and
# records it in both self.sources_by_eid (keyed by eid) and
# self.sources_by_uri (keyed by the entity's name).  When
# self.config.source_enabled(source) is true the source is also appended to
# self.sources.
#
# NOTE(review): indentation is lost in this rendering, so the exact nesting
# of the remaining statements cannot be confirmed from here: the
# querier.set_planner() call, the `if add_to_pools:` loop calling
# add_source(source) on each entry of self.pools, and the final
# self._clear_planning_caches() call -- verify against the real file.
# NOTE(review): the loop variable `pool` has the same name as the `pool`
# module referenced elsewhere in this file (pool.ConnectionsPool).
237 def add_source(self, sourceent, add_to_pools=True): |

238 source = self.get_source(sourceent.type, sourceent.name, |

239 sourceent.host_config) |

240 source.eid = sourceent.eid |

241 self.sources_by_eid[sourceent.eid] = source |

242 self.sources_by_uri[sourceent.name] = source |

243 if self.config.source_enabled(source): |

244 self.sources.append(source) |

245 self.querier.set_planner() |

246 if add_to_pools: |

247 for pool in self.pools: |

248 pool.add_source(source) |

249 self._clear_planning_caches() |
|
250 |
|
# remove_source(uri): unregister the source known under `uri`.
#
# Pops the source from self.sources_by_uri (no default is given to pop(),
# so an unknown uri presumably raises KeyError -- confirm sources_by_uri is
# a plain dict) and deletes its entry from self.sources_by_eid using the
# source's eid.  When self.config.source_enabled(source) is true the source
# is also removed from self.sources.
#
# NOTE(review): indentation is lost in this rendering, so the exact nesting
# of the remaining statements cannot be confirmed from here: the
# querier.set_planner() call, the loop calling remove_source(source) on
# each entry of self.pools, and the final self._clear_planning_caches()
# call -- verify against the real file.
251 def remove_source(self, uri): |

252 source = self.sources_by_uri.pop(uri) |

253 del self.sources_by_eid[source.eid] |

254 if self.config.source_enabled(source): |

255 self.sources.remove(source) |

256 self.querier.set_planner() |

257 for pool in self.pools: |

258 pool.remove_source(source) |

259 self._clear_planning_caches() |
|
260 |
|
# get_source(type, uri, source_config): build a source object.
#
# Writes `uri` and `type` into the passed-in `source_config` mapping (the
# caller's object is modified in place) and returns the result of
# sources.get_source(type, source_config, self).
#
# NOTE(review): the parameter named `type` hides the builtin of the same
# name inside this method.
# NOTE(review): the rows numbered 225 and 226 below appear to be the body of
# the earlier revision of this method (the one taking only `uri` and
# `source_config`), interleaved here by the diff rendering.
261 def get_source(self, type, uri, source_config): |

262 # set uri and type in source config so it's available through |

263 # source_defs() |
225 source_config['uri'] = uri |
264 source_config['uri'] = uri |
226 return sources.get_source(source_config, self.schema, self) |
265 source_config['type'] = type |

266 return sources.get_source(type, source_config, self) |
227 |
267 |
228 def set_schema(self, schema, resetvreg=True, rebuildinfered=True): |
268 def set_schema(self, schema, resetvreg=True, rebuildinfered=True): |
229 if rebuildinfered: |
269 if rebuildinfered: |
230 schema.rebuild_infered_relations() |
270 schema.rebuild_infered_relations() |
231 self.info('set schema %s %#x', schema.name, id(schema)) |
271 self.info('set schema %s %#x', schema.name, id(schema)) |
523 """Return a dictionary containing source uris as keys and a |
563 """Return a dictionary containing source uris as keys and a |
524 dictionary describing each source as values. |
564 dictionary describing each source as values. |
525 |
565 |
526 This is a public method, not requiring a session id. |
566 This is a public method, not requiring a session id. |
527 """ |
567 """ |
528 sources = self.config.sources().copy() |
568 sources = {} |
529 # remove manager information |
|
530 sources.pop('admin', None) |
|
531 # remove sensitive information |
569 # remove sensitive information |
532 for uri, sourcedef in sources.iteritems(): |
570 for uri, source in self.sources_by_uri.iteritems(): |
533 sourcedef = sourcedef.copy() |
571 sources[uri] = source.cfg |
534 self.sources_by_uri[uri].remove_sensitive_information(sourcedef) |
|
535 sources[uri] = sourcedef |
|
536 return sources |
572 return sources |
537 |
573 |
538 def properties(self): |
574 def properties(self): |
539 """Return a result set containing system wide properties. |
575 """Return a result set containing system wide properties. |
540 |
576 |
1014 # XXX call add_info with complete=False ? |
1050 # XXX call add_info with complete=False ? |
1015 self.add_info(session, entity, source, extid) |
1051 self.add_info(session, entity, source, extid) |
1016 source.after_entity_insertion(session, extid, entity) |
1052 source.after_entity_insertion(session, extid, entity) |
1017 if source.should_call_hooks: |
1053 if source.should_call_hooks: |
1018 self.hm.call_hooks('after_add_entity', session, entity=entity) |
1054 self.hm.call_hooks('after_add_entity', session, entity=entity) |
1019 else: |
|
1020 # minimal meta-data |
|
1021 session.execute('SET X is E WHERE X eid %(x)s, E name %(name)s', |
|
1022 {'x': entity.eid, 'name': entity.__regid__}) |
|
1023 session.commit(reset_pool) |
1055 session.commit(reset_pool) |
1024 return eid |
1056 return eid |
1025 except: |
1057 except: |
1026 session.rollback(reset_pool) |
1058 session.rollback(reset_pool) |
1027 raise |
1059 raise |