|
1 # copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved. |
|
2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr |
|
3 # |
|
4 # This file is part of CubicWeb. |
|
5 # |
|
6 # CubicWeb is free software: you can redistribute it and/or modify it under the |
|
7 # terms of the GNU Lesser General Public License as published by the Free |
|
8 # Software Foundation, either version 2.1 of the License, or (at your option) |
|
9 # any later version. |
|
10 # |
|
11 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT |
|
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
|
13 # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more |
|
14 # details. |
|
15 # |
|
16 # You should have received a copy of the GNU Lesser General Public License along |
|
17 # with CubicWeb. If not, see <http://www.gnu.org/licenses/>. |
|
18 """Defines the central class for the CubicWeb RQL server: the repository. |
|
19 |
|
20 The repository is an abstraction allowing execution of rql queries against |
|
21 data sources. Most of the work is actually done in helper classes. The |
|
22 repository mainly: |
|
23 |
|
24 * brings these classes all together to provide a single access |
|
25 point to a cubicweb instance. |
|
26 * handles session management |
|
27 """ |
|
28 from __future__ import print_function |
|
29 |
|
30 __docformat__ = "restructuredtext en" |
|
31 |
|
32 import threading |
|
33 from warnings import warn |
|
34 from itertools import chain |
|
35 from time import time, localtime, strftime |
|
36 from contextlib import contextmanager |
|
37 |
|
38 from six.moves import range, queue |
|
39 |
|
40 from logilab.common.decorators import cached, clear_cache |
|
41 from logilab.common.deprecation import deprecated |
|
42 |
|
43 from yams import BadSchemaDefinition |
|
44 from rql.utils import rqlvar_maker |
|
45 |
|
46 from cubicweb import (CW_MIGRATION_MAP, QueryError, |
|
47 UnknownEid, AuthenticationError, ExecutionError, |
|
48 BadConnectionId, ValidationError, Unauthorized, |
|
49 UniqueTogetherError, onevent, ViolatedConstraint) |
|
50 from cubicweb import cwvreg, schema, server |
|
51 from cubicweb.server import ShuttingDown, utils, hook, querier, sources |
|
52 from cubicweb.server.session import Session, InternalManager |
|
53 |
|
# (relation type, role) couples skipped by `prefill_entity_caches`: no empty
# relation cache is set for them
NO_CACHE_RELATIONS = {
    ('owned_by', 'object'),
    ('created_by', 'object'),
    ('cw_source', 'object'),
}
|
58 |
|
def prefill_entity_caches(entity):
    """Prefill the attribute and relation caches of `entity`.

    * final subject relations get a `None` default in `entity.cw_attr_cache`
      (values already there are kept),
    * other subject relations and every object relation get an empty result
      set as relation cache.

    Virtual relation types and (rtype, role) couples listed in
    `NO_CACHE_RELATIONS` are skipped.
    """
    cnx = entity._cw

    def skipped(rtype, role):
        return (rtype in schema.VIRTUAL_RTYPES
                or (rtype, role) in NO_CACHE_RELATIONS)

    # relations where the entity is subject
    for rschema in entity.e_schema.subject_relations():
        rtype = str(rschema)
        if skipped(rtype, 'subject'):
            continue
        if rschema.final:
            entity.cw_attr_cache.setdefault(rtype, None)
            continue
        entity.cw_set_relation_cache(rtype, 'subject', cnx.empty_rset())
    # relations where the entity is object
    for rschema in entity.e_schema.object_relations():
        rtype = str(rschema)
        if not skipped(rtype, 'object'):
            entity.cw_set_relation_cache(rtype, 'object', cnx.empty_rset())
|
76 |
|
def del_existing_rel_if_needed(cnx, eidfrom, rtype, eidto):
    """Delete existing relations made obsolete by a newly added one.

    When the `rtype` relation between `eidfrom` and `eidto` has a '1' or '?'
    cardinality on one side, any other `rtype` relation held by that side is
    deleted.

    This has to be done once the new relation has been inserted, to avoid
    having an entity without a relation for some time, and it is done in the
    repository so it doesn't depend on hooks ordering.

    Nothing is done when the 'activeintegrity' hook category is deactivated.
    """
    if not cnx.is_hook_category_activated('activeintegrity'):
        # integrity explicitly disabled
        return
    cardinality = cnx.rtype_eids_rdef(rtype, eidfrom, eidto).cardinality
    # One may be tempted to check for new eids, but this may lead to more than
    # one relation even with a '1?' cardinality if those relations are added
    # in the same transaction as the one creating the entity. This never
    # occurs from the web interface but may occur during tests or through a
    # dbapi connection (though not expected for this). So: don't do it, we
    # pretend to ensure repository consistency.
    #
    # notes:
    # * inlined relations will be implicitly deleted for the subject entity
    # * we don't want read permissions to be applied but we want delete
    #   permission to be checked
    cleanups = (
        # (index in the cardinality string, query removing other relations)
        (0, 'DELETE X %s Y WHERE X eid %%(x)s, NOT Y eid %%(y)s'),
        (1, 'DELETE X %s Y WHERE Y eid %%(y)s, NOT X eid %%(x)s'),
    )
    for index, template in cleanups:
        if cardinality[index] in '1?':
            with cnx.security_enabled(read=False):
                cnx.execute(template % rtype, {'x': eidfrom, 'y': eidto})
|
112 |
|
113 |
|
def preprocess_inlined_relations(cnx, entity):
    """Extract inlined relations edited on an entity being added.

    Each non final relation found in `entity.cw_edited` is recorded in the
    connection's relation cache. When its object cardinality is '1' or '?' and
    the 'activeintegrity' hook category is activated, relations of that type
    already pointing to the target are deleted.

    Return the list of `(rtype, value)` couples found, so proper hooks may be
    called for them.
    """
    inlined = []
    activeintegrity = cnx.is_hook_category_activated('activeintegrity')
    eschema = entity.e_schema
    for rtype in entity.cw_edited:
        if eschema.subjrels[rtype].final:
            # plain attribute, nothing to extract
            continue
        target = entity.cw_edited[rtype]
        inlined.append((rtype, target))
        cnx.update_rel_cache_add(entity.eid, rtype, target)
        rdef = cnx.rtype_eids_rdef(rtype, entity.eid, target)
        if rdef.cardinality[1] in '1?' and activeintegrity:
            # read security is lifted, delete permission is still checked
            with cnx.security_enabled(read=False):
                cnx.execute('DELETE X %s Y WHERE Y eid %%(y)s' % rtype,
                            {'x': entity.eid, 'y': target})
    return inlined
|
133 |
|
134 |
|
class NullEventBus(object):
    """Event bus stand-in: every operation is accepted and does nothing."""

    def publish(self, msg):
        """Discard `msg`."""

    def add_subscription(self, topic, callback):
        """Ignore the subscription of `callback` to `topic`."""

    def start(self):
        """Nothing to start."""

    def stop(self):
        """Nothing to stop."""
|
147 |
|
148 |
|
149 class Repository(object): |
|
150 """a repository provides access to a set of persistent storages for |
|
151 entities and relations |
|
152 """ |
|
153 |
|
def __init__(self, config, tasks_manager=None, vreg=None):
    """Initialize the repository from `config`.

    :param config: the instance configuration
    :param tasks_manager: object used by the server related methods to
      register and run looping tasks; those methods assert it is not None
    :param vreg: registry store to use; a `cwvreg.CWRegistryStore` is built
      from `config` when None
    """
    self.config = config
    self.vreg = cwvreg.CWRegistryStore(config) if vreg is None else vreg
    self._tasks_manager = tasks_manager

    self.app_instances_bus = NullEventBus()
    self.info('starting repository from %s', self.config.apphome)
    # opened sessions, by session id
    self._sessions = {}
    # threads started through `threaded_task`
    self._running_threads = []
    # initial schema, should be built or replaced later
    self.schema = schema.CubicWebSchema(config.appid)
    self.vreg.schema = self.schema  # until actual schema is loaded...
    # set to True once `shutdown` has been called
    self.shutting_down = False
    # sources (additional sources info are in the system database)
    self.system_source = self.get_source(
        'native', 'system', config.system_source_config.copy())
    self.sources_by_uri = {'system': self.system_source}
    # querier helper, needs to be created after sources initialization
    self.querier = querier.QuerierHelper(self, self.schema)
    # cache eid -> (type, extid, actual source)
    self._type_source_cache = {}
    # cache extid -> eid
    self._extid_cache = {}
    # open some connections sets
    if config.init_cnxset_pool:
        self.init_cnxset_pool()
    # the hooks manager
    self.hm = hook.HooksManager(self.vreg)

    # registry hook fixing users' class on registry reload
    @onevent('after-registry-reload', self)
    def fix_user_classes(self):
        # After a registry reload, the class used for the 'CWUser' entity
        # type has changed, so existing user objects have a different class
        # than the newly loaded one. Hot fix them.
        usercls = self.vreg['etypes'].etype_class('CWUser')
        for session in self._sessions.values():
            if isinstance(session.user, InternalManager):
                continue
            session.user.__class__ = usercls
|
199 |
|
def init_cnxset_pool(self):
    """Bootstrap the repository (should be called bootstrap_repository).

    Using a temporary connections set, set used cubes, load the schema (from
    the file system or from the database) and initialize data sources. Then
    close that connections set and fill the pool with
    `config['connections-pool-size']` fresh ones, also listed in
    `self.cnxsets`.
    """
    config = self.config
    self._cnxsets_pool = queue.Queue()
    # 0. init a cnxset that will be used to fetch bootstrap information from
    #    the database
    bootstrap_cnxset = self.system_source.wrapped_connection()
    self._cnxsets_pool.put_nowait(bootstrap_cnxset)
    # 1. set used cubes
    if config.creating or not config.read_instance_schema:
        config.bootstrap_cubes()
    else:
        self.set_schema(self.config.load_bootstrap_schema(), resetvreg=False)
        config.init_cubes(self.get_cubes())
    # 2. load schema
    if config.quick_start:
        # quick start: only to get a minimal repository to get cubes
        # information (eg dump/restore/...)
        #
        # restrict appobject paths to only load hooks and entity classes in
        # the registry
        config.cube_appobject_path = set(('hooks', 'entities'))
        config.cubicweb_appobject_path = set(('hooks', 'entities'))
        # limit connections pool to 1
        config['connections-pool-size'] = 1
    if config.quick_start or config.creating or not config.read_instance_schema:
        # load schema from the file system
        if not config.creating:
            self.info("set fs instance'schema")
        fs_schema = config.load_schema(expand_cubes=True)
        self.set_schema(fs_schema)
    else:
        # normal start: load the instance schema from the database
        self.info('loading schema from the repository')
        self.set_schema(self.deserialize_schema())
    # 3. initialize data sources
    if config.creating:
        # call init_creating so that for instance native source can
        # configure tsearch according to postgres version
        self.system_source.init_creating()
    else:
        self.init_sources_from_database()
        if 'CWProperty' in self.schema:
            self.vreg.init_properties(self.properties())
    # 4. close initialization connections set and reopen fresh ones for
    #    proper initialization
    self._get_cnxset().close(True)
    # list of available cnxsets (can't iterate on a Queue)
    self.cnxsets = []
    for _ in range(config['connections-pool-size']):
        cnxset = self.system_source.wrapped_connection()
        self.cnxsets.append(cnxset)
        self._cnxsets_pool.put_nowait(cnxset)
|
249 |
|
250 # internals ############################################################### |
|
251 |
|
def init_sources_from_database(self):
    """Initialize sources from the CWSource entities found in the database.

    `self.sources_by_eid` is reset. On quick start, or when the schema has no
    'CWSource' entity type, only `system_source.init_creating()` is called.
    Otherwise the 'system' entity is used to initialize the system source and
    every other entity is given to :meth:`add_source`.
    """
    self.sources_by_eid = {}
    if self.config.quick_start or 'CWSource' not in self.schema:  # 3.10 migration
        self.system_source.init_creating()
        return
    with self.internal_cnx() as cnx:
        # FIXME: sources should be ordered (add_entity priority)
        rset = cnx.execute(
            'Any S, SN, SA, SC WHERE S is_instance_of CWSource, '
            'S name SN, S type SA, S config SC')
        for sourceent in rset.entities():
            if sourceent.name != 'system':
                self.add_source(sourceent)
                continue
            self.system_source.eid = sourceent.eid
            self.sources_by_eid[sourceent.eid] = self.system_source
            self.system_source.init(True, sourceent)
|
269 |
|
def _clear_planning_caches(self):
    """Clear the cache of :meth:`source_defs`."""
    clear_cache(self, 'source_defs')
|
272 |
|
def add_source(self, sourceent):
    """Instantiate and register the source described by `sourceent`.

    The source is recorded in `sources_by_eid` and `sources_by_uri`, then its
    `init` method is called with a boolean telling whether the configuration
    enables it.

    If the source can't be built (`RuntimeError`), the error is logged and the
    source is skipped when the configuration is in repairing mode, else the
    error is propagated.
    """
    try:
        source = self.get_source(sourceent.type, sourceent.name,
                                 sourceent.host_config, sourceent.eid)
    except RuntimeError:
        if not self.config.repairing:
            raise
        self.exception('cant setup source %s, skipped', sourceent.name)
        return
    self.sources_by_eid[sourceent.eid] = source
    self.sources_by_uri[sourceent.name] = source
    # call the source's init method to complete its initialisation if needed
    # (for instance looking for persistent configuration using an internal
    # session, which is not possible until connections sets have been
    # initialized)
    enabled = bool(self.config.source_enabled(source))
    source.init(enabled, sourceent)
    self._clear_planning_caches()
|
293 |
|
def remove_source(self, uri):
    """Unregister the source known as `uri`.

    Raise `KeyError` if there is no such source.
    """
    removed = self.sources_by_uri.pop(uri)
    self.sources_by_eid.pop(removed.eid)
    self._clear_planning_caches()
|
298 |
|
def get_source(self, type, uri, source_config, eid=None):
    """Return the source object built by `sources.get_source`.

    `source_config` is modified in place: its 'uri' and 'type' keys are set so
    they're available through :meth:`source_defs`.
    """
    for key, value in (('uri', uri), ('type', type)):
        source_config[key] = value
    return sources.get_source(type, source_config, self, eid)
|
305 |
|
def set_schema(self, schema, resetvreg=True):
    """Make `schema` the schema used by the registry, the querier, every
    source and the repository itself.

    When `resetvreg` is true, the registry's `set_schema` method is used,
    triggering a full reload of all appobjects; else its `_set_schema` method
    is used.
    """
    self.info('set schema %s %#x', schema.name, id(schema))
    update_vreg = self.vreg.set_schema if resetvreg else self.vreg._set_schema
    update_vreg(schema)
    self.querier.set_schema(schema)
    for source in self.sources_by_uri.values():
        source.set_schema(schema)
    self.schema = schema
|
317 |
|
def deserialize_schema(self):
    """Load the schema from the database and return it.

    `BadSchemaDefinition` errors are propagated untouched. For any other
    error, the traceback is printed and an `Exception` hinting at a non
    initialised database is raised.
    """
    import traceback
    from cubicweb.server.schemaserial import deserialize_schema
    appschema = schema.CubicWebSchema(self.config.appid)
    self.debug('deserializing db schema into %s %#x',
               appschema.name, id(appschema))
    with self.internal_cnx() as cnx:
        try:
            deserialize_schema(appschema, cnx)
        except BadSchemaDefinition:
            raise
        except Exception as ex:
            traceback.print_exc()
            raise Exception('Is the database initialised ? (cause: %s)' % ex)
    return appschema
|
333 |
|
def _prepare_startup(self):
    """Prepare "Repository as a server" for startup.

    Unless the configuration is in creating, repairing or quick start mode:

    * trigger the 'server_startup' hooks,
    * register the looping task closing expired sessions.

    The 'cleanup-session-time' option (one day when unset) is stored as
    `self.cleanup_session_time`; the task runs every third of it, at most
    every hour.
    """
    # NOTE(review): nesting reconstructed from a listing without indentation;
    # the session clean up registration is kept under the same guard as the
    # startup hooks -- confirm against the reference source.
    config = self.config
    if config.creating or config.repairing or config.quick_start:
        return
    # call instance level initialisation hooks
    self.hm.call_hooks('server_startup', repo=self)
    # register a task to cleanup expired sessions
    self.cleanup_session_time = config['cleanup-session-time'] or 60 * 60 * 24
    assert self.cleanup_session_time > 0
    cleanup_session_interval = min(60*60, self.cleanup_session_time / 3)
    assert self._tasks_manager is not None, "This Repository is not intended to be used as a server"
    self._tasks_manager.add_looping_task(cleanup_session_interval,
                                         self.clean_sessions)
|
351 |
|
def start_looping_tasks(self):
    """Actual "Repository as a server" startup.

    Run :meth:`_prepare_startup` (startup hooks, session clean up task), then
    start the tasks manager, which must have been given at repository
    creation.

    XXX Other startup related stuffs are done elsewhere: in Repository
    XXX __init__ or in external codes (various server managers).
    """
    self._prepare_startup()
    manager = self._tasks_manager
    assert manager is not None, "This Repository is not intended to be used as a server"
    manager.start()
|
365 |
|
def looping_task(self, interval, func, *args):
    """Register `func` to be called with `args` every `interval` seconds.

    Looping tasks can only be registered during repository initialization,
    once done this method will fail. It requires a tasks manager to have been
    given at repository creation.
    """
    manager = self._tasks_manager
    assert manager is not None, "This Repository is not intended to be used as a server"
    manager.add_looping_task(interval, func, *args)
|
374 |
|
def threaded_task(self, func):
    """Start `func` in a separate `utils.RepoThread`, which is given the
    repository's list of running threads.
    """
    thread = utils.RepoThread(func, self._running_threads)
    thread.start()
|
378 |
|
379 #@locked |
|
def _get_cnxset(self):
    """Take a connections set from the pool, waiting up to 5 seconds for one
    to be available.

    Raise `Exception` if the pool is still empty after that delay.
    """
    try:
        cnxset = self._cnxsets_pool.get(True, timeout=5)
    except queue.Empty:
        raise Exception('no connections set available after 5 secs, probably either a '
                        'bug in code (too many uncommited/rolled back '
                        'connections) or too much load on the server (in '
                        'which case you can try to set a bigger '
                        'connections pool size)')
    return cnxset
|
389 |
|
def _free_cnxset(self, cnxset):
    """Put `cnxset` back into the pool, without blocking."""
    self._cnxsets_pool.put(cnxset, block=False)
|
392 |
|
def shutdown(self):
    """Called on server stop event to properly close opened sessions and
    connections.

    In order: 'before_server_shutdown' hooks, system source shutdown, tasks
    manager stop, 'server_shutdown' hooks, wait for running threads, close
    sessions, close pooled connections sets, then log cache statistics.
    Hooks are not called when the configuration is in creating, repairing or
    quick start mode.
    """
    assert not self.shutting_down, 'already shutting down'

    def hooks_enabled():
        config = self.config
        return not (config.creating or config.repairing or config.quick_start)

    if hooks_enabled():
        # at this point, the system source is still available
        self.hm.call_hooks('before_server_shutdown', repo=self)
    self.shutting_down = True
    self.system_source.shutdown()
    if self._tasks_manager is not None:
        self._tasks_manager.stop()
    if hooks_enabled():
        self.hm.call_hooks('server_shutdown', repo=self)
    for thread in self._running_threads:
        self.info('waiting thread %s...', thread.getName())
        thread.join()
        self.info('thread %s finished', thread.getName())
    self.close_sessions()
    # close every connections set left in the pool, errors are only logged
    pool = self._cnxsets_pool
    while not pool.empty():
        cnxset = pool.get_nowait()
        try:
            cnxset.close(True)
        except Exception:
            self.exception('error while closing %s' % cnxset)
    # log cache statistics; give up silently as soon as a cache was unused
    hits, misses = self.querier.cache_hit, self.querier.cache_miss
    try:
        self.info('rql st cache hit/miss: %s/%s (%s%% hits)', hits, misses,
                  (hits * 100) / (hits + misses))
        hits, misses = self.system_source.cache_hit, self.system_source.cache_miss
        self.info('sql cache hit/miss: %s/%s (%s%% hits)', hits, misses,
                  (hits * 100) / (hits + misses))
        nocache = self.system_source.no_cache
        cached = hits + misses
        self.info('sql cache usage: %s/%s (%s%%)', cached, nocache,
                  (cached * 100) / (cached + nocache))
    except ZeroDivisionError:
        pass
|
433 |
|
def check_auth_info(self, cnx, login, authinfo):
    """Validate authentication: return the eid given by the first enabled
    source supporting 'CWUser' which authenticates `login` with `authinfo`.

    Raise `AuthenticationError` if no source does.
    """
    # iterate on sources_by_uri then check enabled sources, since sources
    # doesn't contain copy based sources
    for source in self.sources_by_uri.values():
        if not (self.config.source_enabled(source)
                and source.support_entity('CWUser')):
            continue
        try:
            return source.authenticate(cnx, login, **authinfo)
        except AuthenticationError:
            # try the next source
            pass
    raise AuthenticationError('authentication failed with all sources')
|
448 |
|
def authenticate_user(self, cnx, login, **authinfo):
    """Validate login / authentication information and return the associated
    CWUser instance.

    Raise `AuthenticationError` if authentication fails or, when the
    configuration asks to consider user state, if the user's workflow state
    is not one of its `AUTHENTICABLE_STATES`.
    """
    eid = self.check_auth_info(cnx, login, authinfo)
    cwuser = self._build_user(cnx, eid)
    if self.config.consider_user_state:
        state = cwuser.cw_adapt_to('IWorkflowable').state
        if state not in cwuser.AUTHENTICABLE_STATES:
            raise AuthenticationError('user is not in authenticable state')
    return cwuser
|
459 |
|
def _build_user(self, cnx, eid):
    """Return a CWUser entity for the user with the given `eid`.

    The entity is fetched using the query built by the CWUser class'
    `fetch_rqlst`, restricted to `eid`; exactly one result is expected.
    """
    usercls = self.vreg['etypes'].etype_class('CWUser')
    select = usercls.fetch_rqlst(cnx.user, ordermethod=None)
    select.add_eid_restriction(select.get_variable('X'), 'x', 'Substitute')
    rset = cnx.execute(select.as_string(), {'x': eid})
    assert len(rset) == 1, rset
    cwuser = rset.get_entity(0, 0)
    # prefetch / cache cwuser's groups and properties. This is especially
    # useful for internal sessions to avoid security insertions
    for prefetched in ('groups', 'properties'):
        getattr(cwuser, prefetched)
    return cwuser
|
474 |
|
475 # public (dbapi) interface ################################################ |
|
476 |
|
@deprecated("[3.19] use _cw.call_service('repo_stats')")
def stats(self):  # XXX restrict to managers session?
    """Return a dictionary containing some statistics about the repository
    resources usage, as given by the 'repo_stats' service called through an
    internal connection.

    This is a public method, not requiring a session id.

    This method is deprecated in favor of using _cw.call_service('repo_stats')
    """
    with self.internal_cnx() as cnx:
        statistics = cnx.call_service('repo_stats')
    return statistics
|
488 |
|
@deprecated("[3.19] use _cw.call_service('repo_gc_stats')")
def gc_stats(self, nmax=20):
    """Return a dictionary containing some statistics about the repository
    memory usage, as given by the 'repo_gc_stats' service called through an
    internal connection.

    This is a public method, not requiring a session id.

    `nmax` is the max number of (most) referenced objects returned as the
    'referenced' result.
    """
    with self.internal_cnx() as cnx:
        statistics = cnx.call_service('repo_gc_stats', nmax=nmax)
    return statistics
|
501 |
|
def get_schema(self):
    """Return the schema currently set on the repository.

    This is a public method, not requiring a session id.
    """
    current = self.schema
    return current
|
508 |
|
def get_cubes(self):
    """Return the list of cubes used by this instance, that is the keys of
    :meth:`get_versions` except 'cubicweb'.

    Versions are checked unless the configuration is in creating, repairing,
    quick start or test mode.

    This is a public method, not requiring a session id.
    """
    config = self.config
    skip_check = (config.creating
                  or config.repairing
                  or config.quick_start
                  or config.mode == 'test')
    cubes = list(self.get_versions(not skip_check))
    cubes.remove('cubicweb')
    return cubes
|
521 |
|
def get_option_value(self, option, foreid=None):
    """Return the value for `option` in the configuration.

    This is a public method, not requiring a session id.

    The `foreid` argument is deprecated and now useless (as of 3.19): giving
    it only triggers a `DeprecationWarning`.
    """
    if foreid is not None:
        warn('[3.19] foreid argument is deprecated', DeprecationWarning,
             stacklevel=2)
    # XXX we may want to check we don't give sensible information
    value = self.config[option]
    return value
|
534 |
|
@cached
def get_versions(self, checkversions=False):
    """Return a dictionary containing cubes used by this instance as key with
    their version as value, including cubicweb version.

    Versions are read from the 'system.version.*' CWProperty entities. When
    `checkversions` is true, `ExecutionError` is raised if a version recorded
    in the database is lower than the one given by the configuration.

    This is a public method, not requiring a session id.
    """
    from logilab.common.changelog import Version
    vcconf = {}
    with self.internal_cnx() as cnx:
        rows = cnx.execute(
            'Any K,V WHERE P is CWProperty, P value V, P pkey K, '
            'P pkey ~="system.version.%"', build_descr=False)
        for pk, version in rows:
            # cube name is the last part of the property key
            cube = pk.split('.')[-1]
            # XXX cubicweb migration
            if cube in CW_MIGRATION_MAP:
                cube = CW_MIGRATION_MAP[cube]
            version = Version(version)
            vcconf[cube] = version
            if not checkversions:
                continue
            if cube == 'cubicweb':
                fsversion = self.config.cubicweb_version()
            else:
                fsversion = self.config.cube_version(cube)
            if version < fsversion:
                msg = ('instance has %s version %s but %s '
                       'is installed. Run "cubicweb-ctl upgrade".')
                raise ExecutionError(msg % (cube, version, fsversion))
    return vcconf
|
564 |
|
@cached
def source_defs(self):
    """Return a dictionary containing source uris as key and the
    `public_config` of each source as value.

    This is a public method, not requiring a session id.
    """
    # only the public configuration is exposed (presumably stripped of
    # sensitive information -- TODO confirm in sources)
    return {uri: source.public_config
            for uri, source in self.sources_by_uri.items()}
|
577 |
|
def properties(self):
    """Return a result set containing system wide properties, that is key and
    value of CWProperty entities which are not bound to a user.

    This is a public method, not requiring a session id.
    """
    rql = ('Any K,V WHERE P is CWProperty,'
           'P pkey K, P value V, NOT P for_user U')
    with self.internal_cnx() as cnx:
        # don't use cnx.execute, we don't want rset.req set
        return self.querier.execute(cnx, rql, build_descr=False)
|
588 |
|
@deprecated("[3.19] Use session.call_service('register_user') instead'")
def register_user(self, login, password, email=None, **kwargs):
    """Call the 'register_user' service through an internal connection, then
    commit.

    The service is meant to check whether a user with the given login exists
    and to create it with the given password if not. This method is designed
    to be used for anonymous registration on public web sites.
    """
    service_args = dict(kwargs, login=login, password=password, email=email)
    with self.internal_cnx() as cnx:
        cnx.call_service('register_user', **service_args)
        cnx.commit()
|
599 |
|
def find_users(self, fetch_attrs, **query_attrs):
    """Return rows of user attributes for CWUser entities matching the given
    query_attrs (the result set cannot survive this method call).

    This can be used by low-privileges accounts (anonymous comes to mind).

    `fetch_attrs`: tuple of attributes to be fetched
    `query_attrs`: dict of attr/values to restrict the query

    Every attribute must be a non meta attribute of CWUser, else `Exception`
    is raised.
    """
    assert query_attrs
    if not hasattr(self, '_cwuser_attrs'):
        # computed once, then kept on the repository
        cwuser = self.schema['CWUser']
        self._cwuser_attrs = set(
            str(rschema)
            for rschema, _eschema in cwuser.attribute_definitions()
            if not rschema.meta)
    allowed = self._cwuser_attrs
    if any(attr not in allowed for attr in chain(fetch_attrs, query_attrs)):
        raise Exception('bad input for find_user')
    with self.internal_cnx() as cnx:
        varmaker = rqlvar_maker()
        # one rql variable per fetched attribute
        fetched = [(attr, next(varmaker)) for attr in fetch_attrs]
        selection = ','.join(var for _attr, var in fetched)
        retrieval = ','.join('X %s %s' % couple for couple in fetched)
        restriction = ','.join('X %s %%(%s)s' % (attr, attr)
                               for attr in query_attrs)
        rql = 'Any %s WHERE X is CWUser, %s,%s' % (
            selection, retrieval, restriction)
        rset = cnx.execute(rql, query_attrs)
        return rset.rows
|
629 |
|
def new_session(self, login, **kwargs):
    """Open a new session for the given user and return it.

    The user is authenticated through an internal connection, using `kwargs`
    as authentication information, except for the optional 'cnxprops' key
    which is given to the session. 'session_open' hooks are then called on a
    connection of the new session, which is committed.

    raise `AuthenticationError` if the authentication failed
    raise `ConnectionError` if we can't open a connection
    """
    cnxprops = kwargs.pop('cnxprops', None)
    # use an internal connection to get a user object
    with self.internal_cnx() as auth_cnx:
        user = self.authenticate_user(auth_cnx, login, **kwargs)
    session = Session(user, self, cnxprops)
    # rebind the user entity to its session
    user._cw = user.cw_rset.req = session
    user.cw_clear_relation_cache()
    self._sessions[session.sessionid] = session
    self.info('opened session %s for user %s', session.sessionid, login)
    with session.new_cnx() as hooks_cnx:
        self.hm.call_hooks('session_open', hooks_cnx)
        # commit connection at this point in case write operation has been
        # done during `session_open` hooks
        hooks_cnx.commit()
    return session
|
652 |
|
def connect(self, login, **kwargs):
    """Open a new session for the given user (see :meth:`new_session`) and
    return its session id.
    """
    session = self.new_session(login, **kwargs)
    return session.sessionid
|
656 |
|
def close(self, sessionid, txid=None, checkshuttingdown=True):
    """Close the session with the given id.

    'session_close' hooks are called on a new connection of the session,
    which is committed, then the session is closed and forgotten.
    Errors raised by :meth:`_get_session` are propagated.
    """
    session = self._get_session(sessionid, txid=txid,
                                checkshuttingdown=checkshuttingdown)
    # operations uncommitted before close are rolled back before the hook is
    # called
    with session.new_cnx() as hooks_cnx:
        self.hm.call_hooks('session_close', hooks_cnx)
        # commit connection at this point in case write operation has been
        # done during `session_close` hooks
        hooks_cnx.commit()
    session.close()
    del self._sessions[sessionid]
    self.info('closed session %s for user %s', sessionid, session.user.login)
|
670 |
|
671 # session handling ######################################################## |
|
672 |
|
def close_sessions(self):
    """Close every opened session.

    An error while closing a session is logged and doesn't prevent the others
    from being closed.
    """
    # work on a copy of the ids since closing removes entries from _sessions
    for sessionid in tuple(self._sessions):
        try:
            self.close(sessionid, checkshuttingdown=False)
        except Exception:  # XXX BaseException?
            self.exception('error while closing session %s' % sessionid)
|
680 |
|
def clean_sessions(self):
    """Close sessions not used since an amount of time specified in the
    configuration (`self.cleanup_session_time` seconds).

    Return the number of sessions closed.
    """
    mintime = time() - self.cleanup_session_time
    self.debug('cleaning session unused since %s',
               strftime('%H:%M:%S', localtime(mintime)))
    nbclosed = 0
    # Iterate on a snapshot: `close` removes the session from `_sessions`,
    # and changing a dictionary's size while iterating on its values view
    # raises RuntimeError on Python 3.
    for session in list(self._sessions.values()):
        if session.timestamp < mintime:
            self.close(session.sessionid)
            nbclosed += 1
    return nbclosed
|
694 |
|
@contextmanager
def internal_cnx(self):
    """Context manager returning a Connection using the internal user, which
    has every access rights on the repository: read and write security are
    disabled while the connection is in use.

    Beware that unlike the older :meth:`internal_session`, internal
    connections have all hooks beside security enabled.
    """
    with Session(InternalManager(), self) as session, \
            session.new_cnx() as cnx:
        # XXX remove when the "vreg = user._cw.vreg" hack in entity.py is gone
        cnx.user._cw = cnx
        with cnx.security_enabled(read=False, write=False):
            yield cnx
|
709 |
|
def _get_session(self, sessionid, txid=None, checkshuttingdown=True):
    """Return the session associated with the given session identifier.

    Raise `ShuttingDown` if the repository is shutting down, unless
    `checkshuttingdown` is false, and `BadConnectionId` if there is no such
    session. `txid` is not used.
    """
    if self.shutting_down and checkshuttingdown:
        raise ShuttingDown('Repository is shutting down')
    try:
        found = self._sessions[sessionid]
    except KeyError:
        raise BadConnectionId('No such session %s' % sessionid)
    return found
|
719 |
|
720 # data sources handling ################################################### |
|
721 # * correspondence between eid and (type, source) |
|
722 # * correspondence between eid and local id (i.e. specific to a given source) |
|
723 |
|
def type_and_source_from_eid(self, eid, cnx):
    """Return a tuple `(type, extid, actual source uri)` for the entity of
    the given `eid`.

    The answer comes from the repository's cache when possible, else it is
    asked to the system source and cached. Raise `UnknownEid` if `eid` is a
    value `int()` rejects with a `ValueError`.
    """
    try:
        eid = int(eid)
    except ValueError:
        raise UnknownEid(eid)
    cache = self._type_source_cache
    try:
        return cache[eid]
    except KeyError:
        etype, extid, auri = self.system_source.eid_type_source(cnx, eid)
        cache[eid] = (etype, extid, auri)
        return etype, extid, auri
|
738 |
|
def clear_caches(self, eids):
    """Remove what is cached about the given `eids`: type / source
    information, external id, related entries of the querier's rql cache, and
    the system source's eid cache.
    """
    type_source_cache = self._type_source_cache
    extid_cache = self._extid_cache
    rql_cache = self.querier._rql_cache
    for eid in eids:
        try:
            # eid may be a string in some cases
            etype, extid, auri = type_source_cache.pop(int(eid))
            typed_key = ('%s X WHERE X eid %s' % (etype, eid),)
            rql_cache.pop(typed_key, None)
            extid_cache.pop(extid, None)
        except KeyError:
            # nothing known about this eid's type
            etype = None
        untyped_key = ('Any X WHERE X eid %s' % eid,)
        rql_cache.pop(untyped_key, None)
        self.system_source.clear_eid_cache(eid, etype)
|
752 |
|
def type_from_eid(self, eid, cnx):
    """Return the type of the entity with id <eid>, that is the first item
    given by :meth:`type_and_source_from_eid`.
    """
    description = self.type_and_source_from_eid(eid, cnx)
    return description[0]
|
756 |
|
def querier_cache_key(self, cnx, rql, args, eidkeys):
    """Return the cache key for `rql`: a tuple made of the query followed by
    the type of each entity whose eid is given in `args` under one of
    `eidkeys`, taken in sorted order.

    `args` is modified in place: values found under `eidkeys` are converted
    to integers. Raise `QueryError` if a key has no value in `args`, or a
    value of an improper type.
    """
    key_parts = [rql]
    for eidkey in sorted(eidkeys):
        try:
            etype = self.type_from_eid(args[eidkey], cnx)
        except KeyError:
            raise QueryError('bad cache key %s (no value)' % eidkey)
        except TypeError:
            raise QueryError('bad cache key %s (value: %r)' % (
                eidkey, args[eidkey]))
        key_parts.append(etype)
        # ensure eid is correctly typed in args
        args[eidkey] = int(args[eidkey])
    return tuple(key_parts)
|
771 |
|
@deprecated('[3.22] use the new store API')
def extid2eid(self, source, extid, etype, cnx, insert=True,
              sourceparams=None):
    """Return eid from a local id. If the eid is a negative integer, that
    means the entity is known but has been copied back to the system source
    hence should be ignored.

    If no record is found, ie the entity is not known yet:

    1. an eid is attributed

    2. the source's :meth:`before_entity_insertion` method is called to
       build the entity instance

    3. unless source's :attr:`should_call_hooks` tell otherwise,
       'before_add_entity' hooks are called

    4. record is added into the system source

    5. the source's :meth:`after_entity_insertion` method is called to
       complete building of the entity instance

    6. unless source's :attr:`should_call_hooks` tell otherwise,
       'after_add_entity' hooks are called

    When `insert` is false and the extid is unknown, None is returned and
    nothing is created.

    If anything fails during the creation steps, cache entries set for the
    new eid are removed, the partially inserted entity (if any) is deleted
    from the system source, pending operations are restored, and the
    original exception is re-raised.
    """
    try:
        return self._extid_cache[extid]
    except KeyError:
        pass
    eid = self.system_source.extid2eid(cnx, extid)
    if eid is not None:
        self._extid_cache[extid] = eid
        self._type_source_cache[eid] = (etype, extid, source.uri)
        return eid
    if not insert:
        return None
    # no link between extid and eid, create one
    # write query, ensure connection's mode is 'write' so connections
    # won't be released until commit/rollback
    #
    # both names are bound before entering the `try` block so that the
    # cleanup code below can tell how far the insertion went, instead of
    # failing with an UnboundLocalError that would hide the actual error
    entity = None
    pending_operations = None
    try:
        eid = self.system_source.create_eid(cnx)
        self._extid_cache[extid] = eid
        self._type_source_cache[eid] = (etype, extid, source.uri)
        entity = source.before_entity_insertion(
            cnx, extid, etype, eid, sourceparams)
        if source.should_call_hooks:
            # get back a copy of operation for later restore if
            # necessary, see below
            pending_operations = cnx.pending_operations[:]
            self.hm.call_hooks('before_add_entity', cnx, entity=entity)
        self.add_info(cnx, entity, source, extid)
        source.after_entity_insertion(cnx, extid, entity, sourceparams)
        if source.should_call_hooks:
            self.hm.call_hooks('after_add_entity', cnx, entity=entity)
        return eid
    except Exception:
        # XXX do some cleanup manually so that the transaction has a
        # chance to be committed, with simply this entity discarded
        self._extid_cache.pop(extid, None)
        self._type_source_cache.pop(eid, None)
        if entity is not None:
            hook.CleanupDeletedEidsCacheOp.get_instance(cnx).add_data(entity.eid)
            self.system_source.delete_info_multi(cnx, [entity])
        # only restore operations if a snapshot was actually taken
        if source.should_call_hooks and pending_operations is not None:
            cnx.pending_operations = pending_operations
        raise
|
838 |
|
def add_info(self, cnx, entity, source, extid=None):
    """Record type and source information for the entity's eid.

    The eid is first registered on the connection's
    `CleanupNewEidsCacheOp` operation, then the system source's `add_info`
    is called to insert eid/type/source/extid (per the original
    documentation this also indexes the entity with the full text index).
    """
    new_eids_op = hook.CleanupNewEidsCacheOp.get_instance(cnx)
    new_eids_op.add_data(entity.eid)
    self.system_source.add_info(cnx, entity, source, extid)
|
846 |
|
def _delete_cascade_multi(self, cnx, entities):
    """Delete the relations still attached to `entities`, a list of
    entities sharing the same etype and belonging to the same source.

    Computed relations, virtual relation types and relation types listed
    in the transaction's 'pendingrtypes' are skipped. `ValidationError`
    and `Unauthorized` are propagated (the latter after being logged);
    any other error is logged and ignored, except in 'test' mode where it
    is propagated too.
    """
    pending_rtypes = cnx.transaction_data.get('pendingrtypes', ())
    # security is switched off: whoever may delete the entities may also
    # delete all of their relations
    with cnx.security_enabled(read=False, write=False):
        eid_list = ','.join(str(entity.eid) for entity in entities)
        with cnx.running_hooks_ops():
            # every entity has the same etype, the first one's schema is enough
            for rschema, _targets, role in entities[0].e_schema.relation_definitions():
                if rschema.rule:
                    # computed relation
                    continue
                rtype = rschema.type
                if rtype in schema.VIRTUAL_RTYPES or rtype in pending_rtypes:
                    continue
                # inlined relations are deliberately not skipped: deleting
                # them the regular way gets hooks correctly called
                if role == 'subject':
                    template = 'DELETE X %s Y WHERE X eid IN (%s)'
                else:
                    template = 'DELETE Y %s X WHERE X eid IN (%s)'
                rql = template % (rtype, eid_list)
                try:
                    cnx.execute(rql, build_descr=False)
                except ValidationError:
                    raise
                except Unauthorized:
                    self.exception('Unauthorized exception while cascading delete for entity %s. '
                                   'RQL: %s.\nThis should not happen since security is disabled here.',
                                   entities, rql)
                    raise
                except Exception:
                    if self.config.mode == 'test':
                        raise
                    self.exception('error while cascading delete for entity %s. RQL: %s',
                                   entities, rql)
|
883 |
|
def init_entity_caches(self, cnx, entity, source):
    """Register `entity` in the connection's entity cache and in the
    repository caches.

    Return the entity's external id, which is None when `source` is the
    system source. For any other source the extid is asked to the source
    and recorded (as a string key) in the extid cache.
    """
    cnx.set_entity_cache(entity)
    extid = None
    if not source.uri == 'system':
        extid = source.get_extid(entity)
        self._extid_cache[str(extid)] = entity.eid
    type_source_info = (entity.cw_etype, extid, source.uri)
    self._type_source_cache[entity.eid] = type_source_info
    return extid
|
896 |
|
def glob_add_entity(self, cnx, edited):
    """add an entity to the repository

    the entity eid should originally be None and a unique eid is assigned to
    the entity instance

    `edited` is the edited-attributes object of the entity to create (the
    entity itself is `edited.entity`). The entity is always added to the
    system source. Return the eid allocated to the new entity.

    Unique-together and constraint violations reported by the source are
    handed over to the 'IUserFriendlyError' adapter.
    """
    entity = edited.entity
    entity._cw_is_saved = False  # entity has an eid but is not yet saved
    # init edited_attributes before calling before_add_entity hooks
    entity.cw_edited = edited
    source = self.system_source
    # allocate an eid to the entity before calling hooks
    entity.eid = self.system_source.create_eid(cnx)
    # set caches asap
    extid = self.init_entity_caches(cnx, entity, source)
    if server.DEBUG & server.DBG_REPO:
        print('ADD entity', self, entity.cw_etype, entity.eid, edited)
    prefill_entity_caches(entity)
    self.hm.call_hooks('before_add_entity', cnx, entity=entity)
    # inlined relations are collected as (rtype, target eid) pairs, their
    # hooks are called once the entity has been stored (see below)
    relations = preprocess_inlined_relations(cnx, entity)
    edited.set_defaults()
    # validation only happens when 'integrity' hooks are activated
    if cnx.is_hook_category_activated('integrity'):
        edited.check(creation=True)
    self.add_info(cnx, entity, source, extid)
    try:
        source.add_entity(cnx, entity)
    except (UniqueTogetherError, ViolatedConstraint) as exc:
        # presumably turns the low-level error into a user-level exception
        # (the adapter is expected to raise) -- TODO confirm
        userhdlr = cnx.vreg['adapters'].select(
            'IUserFriendlyError', cnx, entity=entity, exc=exc)
        userhdlr.raise_user_exception()
    edited.saved = entity._cw_is_saved = True
    # NOTE(review): this comment used to read "trigger after_add_entity
    # after after_add_relation", but as written the 'after_add_entity'
    # hooks run *before* the inlined relation hooks below
    self.hm.call_hooks('after_add_entity', cnx, entity=entity)
    # call hooks for inlined relations
    for attr, value in relations:
        self.hm.call_hooks('before_add_relation', cnx,
                           eidfrom=entity.eid, rtype=attr, eidto=value)
        self.hm.call_hooks('after_add_relation', cnx,
                           eidfrom=entity.eid, rtype=attr, eidto=value)
    return entity.eid
|
937 |
|
def glob_update_entity(self, cnx, edited):
    """replace an entity in the repository
    the type and the eid of an entity must not be changed

    `edited` is the edited-attributes object holding the new values (the
    entity itself is `edited.entity`). Edited final attributes trigger the
    '*_update_entity' hooks, edited inlined relations trigger the
    '*_delete_relation' / '*_add_relation' hooks. Whatever happens, the
    entity's previous `cw_edited` (if it had one) is restored on exit.
    """
    entity = edited.entity
    if server.DEBUG & server.DBG_REPO:
        print('UPDATE entity', entity.cw_etype, entity.eid,
              entity.cw_attr_cache, edited)
    hm = self.hm
    eschema = entity.e_schema
    cnx.set_entity_cache(entity)
    # remember any `cw_edited` already set on the entity so that it can be
    # put back in the `finally` clause
    orig_edited = getattr(entity, 'cw_edited', None)
    entity.cw_edited = edited
    source = self.system_source
    try:
        # only_inline_rels: stays true as long as no final attribute is
        # edited; need_fti_update: set once an edited final attribute is
        # flagged 'fulltextindexed'
        only_inline_rels, need_fti_update = True, False
        # (rtype, new value, previous value or None) for inlined relations
        relations = []
        for attr in list(edited):
            if attr == 'eid':
                continue
            rschema = eschema.subjrels[attr]
            if rschema.final:
                if getattr(eschema.rdef(attr), 'fulltextindexed', False):
                    need_fti_update = True
                only_inline_rels = False
            else:
                # inlined relation
                previous_value = entity.related(attr) or None
                if previous_value is not None:
                    previous_value = previous_value[0][0]  # got a result set
                    if previous_value == entity.cw_attr_cache[attr]:
                        # value is unchanged, there is nothing to delete
                        previous_value = None
                    else:
                        hm.call_hooks('before_delete_relation', cnx,
                                      eidfrom=entity.eid, rtype=attr,
                                      eidto=previous_value)
                relations.append((attr, edited[attr], previous_value))
        # call hooks for inlined relations
        for attr, value, _t in relations:
            hm.call_hooks('before_add_relation', cnx,
                          eidfrom=entity.eid, rtype=attr, eidto=value)
        if not only_inline_rels:
            hm.call_hooks('before_update_entity', cnx, entity=entity)
        # validation only happens when 'integrity' hooks are activated
        if cnx.is_hook_category_activated('integrity'):
            edited.check()
        try:
            source.update_entity(cnx, entity)
            edited.saved = True
        except (UniqueTogetherError, ViolatedConstraint) as exc:
            # presumably turns the low-level error into a user-level
            # exception (the adapter is expected to raise) -- TODO confirm
            userhdlr = cnx.vreg['adapters'].select(
                'IUserFriendlyError', cnx, entity=entity, exc=exc)
            userhdlr.raise_user_exception()
        self.system_source.update_info(cnx, entity, need_fti_update)
        if not only_inline_rels:
            hm.call_hooks('after_update_entity', cnx, entity=entity)
        for attr, value, prevvalue in relations:
            # if the relation is already cached, update existing cache
            relcache = entity.cw_relation_cached(attr, 'subject')
            if prevvalue is not None:
                hm.call_hooks('after_delete_relation', cnx,
                              eidfrom=entity.eid, rtype=attr, eidto=prevvalue)
                if relcache is not None:
                    cnx.update_rel_cache_del(entity.eid, attr, prevvalue)
            del_existing_rel_if_needed(cnx, entity.eid, attr, value)
            cnx.update_rel_cache_add(entity.eid, attr, value)
            hm.call_hooks('after_add_relation', cnx,
                          eidfrom=entity.eid, rtype=attr, eidto=value)
    finally:
        if orig_edited is not None:
            entity.cw_edited = orig_edited
|
1008 |
|
1009 |
|
def glob_delete_entities(self, cnx, eids):
    """delete a list of entities and all related entities from the repository

    `eids` should be a set (or frozenset) of eids; any other iterable is
    still accepted but triggers a DeprecationWarning. Eids already
    registered as being deleted on the connection are ignored. Entities
    are processed grouped by entity type: for each group the
    'before_delete_entity' hooks are called, remaining relations are
    deleted, entities and their system information are removed from the
    system source, then the 'after_delete_entity' hooks are called.
    """
    # mark eids as being deleted in cnx info and setup cache update
    # operation (register pending eids before actual deletion to avoid
    # multiple call to glob_delete_entities)
    op = hook.CleanupDeletedEidsCacheOp.get_instance(cnx)
    if not isinstance(eids, (set, frozenset)):
        warn('[3.13] eids should be given as a set', DeprecationWarning,
             stacklevel=2)
        eids = frozenset(eids)
    # only keep eids which are not already pending deletion, then record
    # them on the operation
    eids = eids - op._container
    op._container |= eids
    data_by_etype = {}  # values are [list of entities]
    #
    # WARNING: the way this dictionary is populated is heavily optimized
    # and does not use setdefault on purpose. Unless a new release
    # of the Python interpreter advertises large perf improvements
    # in setdefault, this should not be changed without profiling.
    for eid in eids:
        etype = self.type_from_eid(eid, cnx)
        # XXX should cache entity's cw_metainformation
        entity = cnx.entity_from_eid(eid, etype)
        try:
            data_by_etype[etype].append(entity)
        except KeyError:
            data_by_etype[etype] = [entity]
    source = self.system_source
    for etype, entities in data_by_etype.items():
        if server.DEBUG & server.DBG_REPO:
            print('DELETE entities', etype, [entity.eid for entity in entities])
        self.hm.call_hooks('before_delete_entity', cnx, entities=entities)
        # relations go first, then the entities, then their system info
        self._delete_cascade_multi(cnx, entities)
        source.delete_entities(cnx, entities)
        source.delete_info_multi(cnx, entities)
        self.hm.call_hooks('after_delete_entity', cnx, entities=entities)
    # don't clear cache here, it is done in a hook on commit
|
1046 |
|
def glob_add_relation(self, cnx, subject, rtype, object):
    """Add a single `subject rtype object` relation to the repository, by
    delegating to `glob_add_relations`.
    """
    single_relation = {rtype: [(subject, object)]}
    self.glob_add_relations(cnx, single_relation)
|
1050 |
|
def glob_add_relations(self, cnx, relations):
    """add several relations to the repository

    relations is a dictionary rtype: [(subj_eid, obj_eid), ...]

    When the 'activeintegrity' hook category is activated, relations whose
    cardinality is '?' or '1' on the subject (resp. object) side are
    de-duplicated: conflicting relations already stored are deleted through
    RQL and, among the relations given here, only the last one is kept for
    a given subject (resp. object) of a relation definition.

    'before_add_relation' hooks are called for every relation type, then
    relations are added to the system source and the connection's relation
    cache is updated, and at last 'after_add_relation' hooks are called.
    """
    source = self.system_source
    relations_by_rtype = {}
    # {rdef: {eid: position of the kept relation in its rtype list}}
    subjects_by_types = {}
    objects_by_types = {}
    activintegrity = cnx.is_hook_category_activated('activeintegrity')
    for rtype, eids_subj_obj in relations.items():
        if server.DEBUG & server.DBG_REPO:
            for subjeid, objeid in eids_subj_obj:
                print('ADD relation', subjeid, rtype, objeid)
        for subjeid, objeid in eids_subj_obj:
            pairs = relations_by_rtype.setdefault(rtype, [])
            pairs.append((subjeid, objeid))
            if not activintegrity:
                continue
            # position of the relation which has just been appended.
            # Superseded relations are blanked with None instead of being
            # removed from the list, so recorded positions stay valid (they
            # used to get shifted by deletions, which made later
            # de-duplication drop unrelated relations)
            position = len(pairs) - 1
            # take care to relation of cardinality '?1', as all eids will
            # be inserted later, we've remove duplicated eids since they
            # won't be caught by `del_existing_rel_if_needed`
            rdef = cnx.rtype_eids_rdef(rtype, subjeid, objeid)
            card = rdef.cardinality
            if card[0] in '?1':
                with cnx.security_enabled(read=False):
                    cnx.execute('DELETE X %s Y WHERE X eid %%(x)s, '
                                'NOT Y eid %%(y)s' % rtype,
                                {'x': subjeid, 'y': objeid})
                subjects = subjects_by_types.setdefault(rdef, {})
                superseded = subjects.get(subjeid)
                subjects[subjeid] = position
                if superseded is not None:
                    pairs[superseded] = None
                    # NOTE(review): as before, the object side is not
                    # checked when a relation replaces an earlier one on
                    # the subject side
                    continue
            if card[1] in '?1':
                with cnx.security_enabled(read=False):
                    cnx.execute('DELETE X %s Y WHERE Y eid %%(y)s, '
                                'NOT X eid %%(x)s' % rtype,
                                {'x': subjeid, 'y': objeid})
                objects = objects_by_types.setdefault(rdef, {})
                superseded = objects.get(objeid)
                objects[objeid] = position
                if superseded is not None:
                    pairs[superseded] = None
    if activintegrity:
        # get rid of the placeholders left by superseded relations
        relations_by_rtype = dict(
            (rtype, [pair for pair in pairs if pair is not None])
            for rtype, pairs in relations_by_rtype.items())
    for rtype, source_relations in relations_by_rtype.items():
        self.hm.call_hooks('before_add_relation', cnx,
                           rtype=rtype, eids_from_to=source_relations)
    for rtype, source_relations in relations_by_rtype.items():
        source.add_relations(cnx, rtype, source_relations)
        rschema = self.schema.rschema(rtype)
        for subjeid, objeid in source_relations:
            cnx.update_rel_cache_add(subjeid, rtype, objeid, rschema.symmetric)
    for rtype, source_relations in relations_by_rtype.items():
        self.hm.call_hooks('after_add_relation', cnx,
                           rtype=rtype, eids_from_to=source_relations)
|
1110 |
|
def glob_delete_relation(self, cnx, subject, rtype, object):
    """Delete the `subject rtype object` relation from the repository.

    'before_delete_relation' hooks are called, the relation is removed
    from the system source, the connection's relation cache is updated
    (taking the relation's symmetry into account), and at last
    'after_delete_relation' hooks are called.
    """
    if server.DEBUG & server.DBG_REPO:
        print('DELETE relation', subject, rtype, object)
    source = self.system_source
    hook_kwargs = {'eidfrom': subject, 'rtype': rtype, 'eidto': object}
    self.hm.call_hooks('before_delete_relation', cnx, **hook_kwargs)
    source.delete_relation(cnx, subject, rtype, object)
    symmetric = self.schema.rschema(rtype).symmetric
    cnx.update_rel_cache_del(subject, rtype, object, symmetric)
    self.hm.call_hooks('after_delete_relation', cnx, **hook_kwargs)
|
1123 |
|
1124 |
|
1125 |
|
1126 |
|
# do-nothing logging methods: they are overridden by set_log_methods below
# and are only defined here to prevent pylint from complaining
debug = lambda msg, *a, **kw: None
info = warning = error = critical = exception = debug
|
1130 |
|
# give the Repository class its logging methods, bound to the
# 'cubicweb.repository' logger; the imports sit next to their single use
from logging import getLogger
from cubicweb import set_log_methods
set_log_methods(Repository, getLogger('cubicweb.repository'))