31 from cubicweb import (CW_SOFTWARE_ROOT, CW_MIGRATION_MAP, CW_EVENT_MANAGER, |
31 from cubicweb import (CW_SOFTWARE_ROOT, CW_MIGRATION_MAP, CW_EVENT_MANAGER, |
32 UnknownEid, AuthenticationError, ExecutionError, |
32 UnknownEid, AuthenticationError, ExecutionError, |
33 ETypeNotSupportedBySources, RTypeNotSupportedBySources, |
33 ETypeNotSupportedBySources, RTypeNotSupportedBySources, |
34 BadConnectionId, Unauthorized, ValidationError, |
34 BadConnectionId, Unauthorized, ValidationError, |
35 typed_eid) |
35 typed_eid) |
36 from cubicweb.cwvreg import CubicWebVRegistry |
36 from cubicweb import cwvreg, schema, server |
37 from cubicweb.schema import VIRTUAL_RTYPES, CubicWebSchema |
37 from cubicweb.server import utils, hook, pool, querier, sources |
38 from cubicweb import server |
|
39 from cubicweb.server.utils import RepoThread, LoopTask |
|
40 from cubicweb.server.pool import ConnectionsPool, LateOperation, SingleLastOperation |
|
41 from cubicweb.server.session import Session, InternalSession |
38 from cubicweb.server.session import Session, InternalSession |
42 from cubicweb.server.querier import QuerierHelper |
39 |
43 from cubicweb.server.sources import get_source |
40 |
44 from cubicweb.server.hookhelper import rproperty |
41 class CleanupEidTypeCacheOp(hook.SingleLastOperation): |
45 |
|
46 |
|
47 class CleanupEidTypeCacheOp(SingleLastOperation): |
|
48 """on rollback of a insert query or commit of delete query, we have to |
42 """on rollback of a insert query or commit of delete query, we have to |
49 clear repository's cache from no more valid entries |
43 clear repository's cache from no more valid entries |
50 |
44 |
51 NOTE: querier's rqlst/solutions cache may have been polluted too with |
45 NOTE: querier's rqlst/solutions cache may have been polluted too with |
52 queries such as Any X WHERE X eid 32 if 32 has been rollbacked however |
46 queries such as Any X WHERE X eid 32 if 32 has been rollbacked however |
72 self.repo.clear_caches(self.session.transaction_data['neweids']) |
66 self.repo.clear_caches(self.session.transaction_data['neweids']) |
73 except KeyError: |
67 except KeyError: |
74 pass |
68 pass |
75 |
69 |
76 |
70 |
77 class FTIndexEntityOp(LateOperation): |
71 class FTIndexEntityOp(hook.LateOperation): |
78 """operation to delay entity full text indexation to commit |
72 """operation to delay entity full text indexation to commit |
79 |
73 |
80 since fti indexing may trigger discovery of other entities, it should be |
74 since fti indexing may trigger discovery of other entities, it should be |
81 triggered on precommit, not commit, and this should be done after other |
75 triggered on precommit, not commit, and this should be done after other |
82 precommit operation which may add relations to the entity |
76 precommit operation which may add relations to the entity |
105 """ |
99 """ |
106 # skip delete queries (only?) if session is an internal session. This is |
100 # skip delete queries (only?) if session is an internal session. This is |
107 # hooks responsability to ensure they do not violate relation's cardinality |
101 # hooks responsability to ensure they do not violate relation's cardinality |
108 if session.is_super_session: |
102 if session.is_super_session: |
109 return |
103 return |
110 card = rproperty(session, rtype, eidfrom, eidto, 'cardinality') |
104 card = session.schema_rproperty(rtype, eidfrom, eidto, 'cardinality') |
111 # one may be tented to check for neweids but this may cause more than one |
105 # one may be tented to check for neweids but this may cause more than one |
112 # relation even with '1?' cardinality if thoses relations are added in the |
106 # relation even with '1?' cardinality if thoses relations are added in the |
113 # same transaction where the entity is being created. This never occurs from |
107 # same transaction where the entity is being created. This never occurs from |
114 # the web interface but may occurs during test or dbapi connection (though |
108 # the web interface but may occurs during test or dbapi connection (though |
115 # not expected for this). So: don't do it, we pretend to ensure repository |
109 # not expected for this). So: don't do it, we pretend to ensure repository |
134 """ |
128 """ |
135 |
129 |
136 def __init__(self, config, vreg=None, debug=False): |
130 def __init__(self, config, vreg=None, debug=False): |
137 self.config = config |
131 self.config = config |
138 if vreg is None: |
132 if vreg is None: |
139 vreg = CubicWebVRegistry(config, debug) |
133 vreg = cwvreg.CubicWebVRegistry(config, debug) |
140 self.vreg = vreg |
134 self.vreg = vreg |
141 self.pyro_registered = False |
135 self.pyro_registered = False |
142 self.info('starting repository from %s', self.config.apphome) |
136 self.info('starting repository from %s', self.config.apphome) |
143 # dictionary of opened sessions |
137 # dictionary of opened sessions |
144 self._sessions = {} |
138 self._sessions = {} |
145 # list of functions to be called at regular interval |
139 # list of functions to be called at regular interval |
146 self._looping_tasks = [] |
140 self._looping_tasks = [] |
147 # list of running threads |
141 # list of running threads |
148 self._running_threads = [] |
142 self._running_threads = [] |
149 # initial schema, should be build or replaced latter |
143 # initial schema, should be build or replaced latter |
150 self.schema = CubicWebSchema(config.appid) |
144 self.schema = schema.CubicWebSchema(config.appid) |
151 # querier helper, need to be created after sources initialization |
145 # querier helper, need to be created after sources initialization |
152 self.querier = QuerierHelper(self, self.schema) |
146 self.querier = querier.QuerierHelper(self, self.schema) |
153 # should we reindex in changes? |
147 # should we reindex in changes? |
154 self.do_fti = not config['delay-full-text-indexation'] |
148 self.do_fti = not config['delay-full-text-indexation'] |
155 # sources |
149 # sources |
156 self.sources = [] |
150 self.sources = [] |
157 self.sources_by_uri = {} |
151 self.sources_by_uri = {} |
172 self._type_source_cache = {} |
166 self._type_source_cache = {} |
173 # cache (extid, source uri) -> eid |
167 # cache (extid, source uri) -> eid |
174 self._extid_cache = {} |
168 self._extid_cache = {} |
175 # open some connections pools |
169 # open some connections pools |
176 self._available_pools = Queue.Queue() |
170 self._available_pools = Queue.Queue() |
177 self._available_pools.put_nowait(ConnectionsPool(self.sources)) |
171 self._available_pools.put_nowait(pool.ConnectionsPool(self.sources)) |
178 if config.read_instance_schema: |
172 if config.read_instance_schema: |
179 # normal start: load the instance schema from the database |
173 # normal start: load the instance schema from the database |
180 self.fill_schema() |
174 self.fill_schema() |
181 elif config.bootstrap_schema: |
175 elif config.bootstrap_schema: |
182 # usually during repository creation |
176 # usually during repository creation |
214 # initialization now that we know cubes |
208 # initialization now that we know cubes |
215 self._get_pool().close(True) |
209 self._get_pool().close(True) |
216 # list of available pools (we can't iterated on Queue instance) |
210 # list of available pools (we can't iterated on Queue instance) |
217 self.pools = [] |
211 self.pools = [] |
218 for i in xrange(config['connections-pool-size']): |
212 for i in xrange(config['connections-pool-size']): |
219 self.pools.append(ConnectionsPool(self.sources)) |
213 self.pools.append(pool.ConnectionsPool(self.sources)) |
220 self._available_pools.put_nowait(self.pools[-1]) |
214 self._available_pools.put_nowait(self.pools[-1]) |
221 self._shutting_down = False |
215 self._shutting_down = False |
222 self.hm = vreg['hooks'] |
216 self.hm = vreg['hooks'] |
223 if not (config.creating or config.repairing): |
217 if not (config.creating or config.repairing): |
224 # call instance level initialisation hooks |
218 # call instance level initialisation hooks |
229 |
223 |
230 # internals ############################################################### |
224 # internals ############################################################### |
231 |
225 |
232 def get_source(self, uri, source_config): |
226 def get_source(self, uri, source_config): |
233 source_config['uri'] = uri |
227 source_config['uri'] = uri |
234 return get_source(source_config, self.schema, self) |
228 return sources.get_source(source_config, self.schema, self) |
235 |
229 |
236 def set_schema(self, schema, resetvreg=True): |
230 def set_schema(self, schema, resetvreg=True): |
237 schema.rebuild_infered_relations() |
231 schema.rebuild_infered_relations() |
238 self.info('set schema %s %#x', schema.name, id(schema)) |
232 self.info('set schema %s %#x', schema.name, id(schema)) |
239 self.debug(', '.join(sorted(str(e) for e in schema.entities()))) |
233 self.debug(', '.join(sorted(str(e) for e in schema.entities()))) |
248 |
242 |
249 def fill_schema(self): |
243 def fill_schema(self): |
250 """lod schema from the repository""" |
244 """lod schema from the repository""" |
251 from cubicweb.server.schemaserial import deserialize_schema |
245 from cubicweb.server.schemaserial import deserialize_schema |
252 self.info('loading schema from the repository') |
246 self.info('loading schema from the repository') |
253 appschema = CubicWebSchema(self.config.appid) |
247 appschema = schema.CubicWebSchema(self.config.appid) |
254 self.set_schema(self.config.load_bootstrap_schema(), resetvreg=False) |
248 self.set_schema(self.config.load_bootstrap_schema(), resetvreg=False) |
255 self.debug('deserializing db schema into %s %#x', appschema.name, id(appschema)) |
249 self.debug('deserializing db schema into %s %#x', appschema.name, id(appschema)) |
256 session = self.internal_session() |
250 session = self.internal_session() |
257 try: |
251 try: |
258 try: |
252 try: |
271 self.config.init_cubes(self.get_cubes()) |
265 self.config.init_cubes(self.get_cubes()) |
272 |
266 |
273 def start_looping_tasks(self): |
267 def start_looping_tasks(self): |
274 assert isinstance(self._looping_tasks, list), 'already started' |
268 assert isinstance(self._looping_tasks, list), 'already started' |
275 for i, (interval, func, args) in enumerate(self._looping_tasks): |
269 for i, (interval, func, args) in enumerate(self._looping_tasks): |
276 self._looping_tasks[i] = task = LoopTask(interval, func, args) |
270 self._looping_tasks[i] = task = utils.LoopTask(interval, func, args) |
277 self.info('starting task %s with interval %.2fs', task.name, |
271 self.info('starting task %s with interval %.2fs', task.name, |
278 interval) |
272 interval) |
279 task.start() |
273 task.start() |
280 # ensure no tasks will be further added |
274 # ensure no tasks will be further added |
281 self._looping_tasks = tuple(self._looping_tasks) |
275 self._looping_tasks = tuple(self._looping_tasks) |
291 except AttributeError: |
285 except AttributeError: |
292 raise RuntimeError("can't add looping task once the repository is started") |
286 raise RuntimeError("can't add looping task once the repository is started") |
293 |
287 |
294 def threaded_task(self, func): |
288 def threaded_task(self, func): |
295 """start function in a separated thread""" |
289 """start function in a separated thread""" |
296 t = RepoThread(func, self._running_threads) |
290 t = utils.RepoThread(func, self._running_threads) |
297 t.start() |
291 t.start() |
298 |
292 |
299 #@locked |
293 #@locked |
300 def _get_pool(self): |
294 def _get_pool(self): |
301 try: |
295 try: |
896 rql = [] |
890 rql = [] |
897 eschema = self.schema.eschema(etype) |
891 eschema = self.schema.eschema(etype) |
898 pendingrtypes = session.transaction_data.get('pendingrtypes', ()) |
892 pendingrtypes = session.transaction_data.get('pendingrtypes', ()) |
899 for rschema, targetschemas, x in eschema.relation_definitions(): |
893 for rschema, targetschemas, x in eschema.relation_definitions(): |
900 rtype = rschema.type |
894 rtype = rschema.type |
901 if rtype in VIRTUAL_RTYPES or rtype in pendingrtypes: |
895 if rtype in schema.VIRTUAL_RTYPES or rtype in pendingrtypes: |
902 continue |
896 continue |
903 var = '%s%s' % (rtype.upper(), x.upper()) |
897 var = '%s%s' % (rtype.upper(), x.upper()) |
904 if x == 'subject': |
898 if x == 'subject': |
905 # don't skip inlined relation so they are regularly |
899 # don't skip inlined relation so they are regularly |
906 # deleted and so hooks are correctly called |
900 # deleted and so hooks are correctly called |
976 entity._is_saved = True # entity has an eid and is saved |
970 entity._is_saved = True # entity has an eid and is saved |
977 # prefill entity relation caches |
971 # prefill entity relation caches |
978 session.set_entity_cache(entity) |
972 session.set_entity_cache(entity) |
979 for rschema in eschema.subject_relations(): |
973 for rschema in eschema.subject_relations(): |
980 rtype = str(rschema) |
974 rtype = str(rschema) |
981 if rtype in VIRTUAL_RTYPES: |
975 if rtype in schema.VIRTUAL_RTYPES: |
982 continue |
976 continue |
983 if rschema.is_final(): |
977 if rschema.is_final(): |
984 entity.setdefault(rtype, None) |
978 entity.setdefault(rtype, None) |
985 else: |
979 else: |
986 entity.set_related_cache(rtype, 'subject', session.empty_rset()) |
980 entity.set_related_cache(rtype, 'subject', session.empty_rset()) |
987 for rschema in eschema.object_relations(): |
981 for rschema in eschema.object_relations(): |
988 rtype = str(rschema) |
982 rtype = str(rschema) |
989 if rtype in VIRTUAL_RTYPES: |
983 if rtype in schema.VIRTUAL_RTYPES: |
990 continue |
984 continue |
991 entity.set_related_cache(rtype, 'object', session.empty_rset()) |
985 entity.set_related_cache(rtype, 'object', session.empty_rset()) |
992 # set inline relation cache before call to after_add_entity |
986 # set inline relation cache before call to after_add_entity |
993 for attr, value in relations: |
987 for attr, value in relations: |
994 session.update_rel_cache_add(entity.eid, attr, value) |
988 session.update_rel_cache_add(entity.eid, attr, value) |