server/repository.py
changeset 11057 0b59724cb3f2
parent 11052 058bb3dc685f
child 11058 23eb30449fe5
equal deleted inserted replaced
11052:058bb3dc685f 11057:0b59724cb3f2
     1 # copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
       
     2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
       
     3 #
       
     4 # This file is part of CubicWeb.
       
     5 #
       
     6 # CubicWeb is free software: you can redistribute it and/or modify it under the
       
     7 # terms of the GNU Lesser General Public License as published by the Free
       
     8 # Software Foundation, either version 2.1 of the License, or (at your option)
       
     9 # any later version.
       
    10 #
       
    11 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT
       
    12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
       
    13 # FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
       
    14 # details.
       
    15 #
       
    16 # You should have received a copy of the GNU Lesser General Public License along
       
    17 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
       
    18 """Defines the central class for the CubicWeb RQL server: the repository.
       
    19 
       
    20 The repository is an abstraction allowing execution of rql queries against
       
    21 data sources. Most of the work is actually done in helper classes. The
       
    22 repository mainly:
       
    23 
       
    24 * brings these classes all together to provide a single access
       
    25   point to a cubicweb instance.
       
    26 * handles session management
       
    27 """
       
    28 from __future__ import print_function
       
    29 
       
    30 __docformat__ = "restructuredtext en"
       
    31 
       
    32 import threading
       
    33 from warnings import warn
       
    34 from itertools import chain
       
    35 from time import time, localtime, strftime
       
    36 from contextlib import contextmanager
       
    37 
       
    38 from six.moves import range, queue
       
    39 
       
    40 from logilab.common.decorators import cached, clear_cache
       
    41 from logilab.common.deprecation import deprecated
       
    42 
       
    43 from yams import BadSchemaDefinition
       
    44 from rql.utils import rqlvar_maker
       
    45 
       
    46 from cubicweb import (CW_MIGRATION_MAP, QueryError,
       
    47                       UnknownEid, AuthenticationError, ExecutionError,
       
    48                       BadConnectionId, ValidationError, Unauthorized,
       
    49                       UniqueTogetherError, onevent, ViolatedConstraint)
       
    50 from cubicweb import cwvreg, schema, server
       
    51 from cubicweb.server import ShuttingDown, utils, hook, querier, sources
       
    52 from cubicweb.server.session import Session, InternalManager
       
    53 
       
    54 NO_CACHE_RELATIONS = set( [('owned_by', 'object'),
       
    55                            ('created_by', 'object'),
       
    56                            ('cw_source', 'object'),
       
    57                            ])
       
    58 
       
    59 def prefill_entity_caches(entity):
       
    60     cnx = entity._cw
       
    61     # prefill entity relation caches
       
    62     for rschema in entity.e_schema.subject_relations():
       
    63         rtype = str(rschema)
       
    64         if rtype in schema.VIRTUAL_RTYPES or (rtype, 'subject') in NO_CACHE_RELATIONS:
       
    65             continue
       
    66         if rschema.final:
       
    67             entity.cw_attr_cache.setdefault(rtype, None)
       
    68         else:
       
    69             entity.cw_set_relation_cache(rtype, 'subject',
       
    70                                          cnx.empty_rset())
       
    71     for rschema in entity.e_schema.object_relations():
       
    72         rtype = str(rschema)
       
    73         if rtype in schema.VIRTUAL_RTYPES or (rtype, 'object') in NO_CACHE_RELATIONS:
       
    74             continue
       
    75         entity.cw_set_relation_cache(rtype, 'object', cnx.empty_rset())
       
    76 
       
    77 def del_existing_rel_if_needed(cnx, eidfrom, rtype, eidto):
       
    78     """delete existing relation when adding a new one if card is 1 or ?
       
    79 
       
    80     have to be done once the new relation has been inserted to avoid having
       
    81     an entity without a relation for some time
       
    82 
       
    83     this kind of behaviour has to be done in the repository so we don't have
       
    84     hooks order hazardness
       
    85     """
       
    86     # skip that if integrity explicitly disabled
       
    87     if not cnx.is_hook_category_activated('activeintegrity'):
       
    88         return
       
    89     rdef = cnx.rtype_eids_rdef(rtype, eidfrom, eidto)
       
    90     card = rdef.cardinality
       
    91     # one may be tented to check for neweids but this may cause more than one
       
    92     # relation even with '1?'  cardinality if thoses relations are added in the
       
    93     # same transaction where the entity is being created. This never occurs from
       
    94     # the web interface but may occurs during test or dbapi connection (though
       
    95     # not expected for this).  So: don't do it, we pretend to ensure repository
       
    96     # consistency.
       
    97     #
       
    98     # notes:
       
    99     # * inlined relations will be implicitly deleted for the subject entity
       
   100     # * we don't want read permissions to be applied but we want delete
       
   101     #   permission to be checked
       
   102     if card[0] in '1?':
       
   103         with cnx.security_enabled(read=False):
       
   104             cnx.execute('DELETE X %s Y WHERE X eid %%(x)s, '
       
   105                         'NOT Y eid %%(y)s' % rtype,
       
   106                         {'x': eidfrom, 'y': eidto})
       
   107     if card[1] in '1?':
       
   108         with cnx.security_enabled(read=False):
       
   109             cnx.execute('DELETE X %s Y WHERE Y eid %%(y)s, '
       
   110                         'NOT X eid %%(x)s' % rtype,
       
   111                         {'x': eidfrom, 'y': eidto})
       
   112 
       
   113 
       
   114 def preprocess_inlined_relations(cnx, entity):
       
   115     """when an entity is added, check if it has some inlined relation which
       
   116     requires to be extrated for proper call hooks
       
   117     """
       
   118     relations = []
       
   119     activeintegrity = cnx.is_hook_category_activated('activeintegrity')
       
   120     eschema = entity.e_schema
       
   121     for attr in entity.cw_edited:
       
   122         rschema = eschema.subjrels[attr]
       
   123         if not rschema.final: # inlined relation
       
   124             value = entity.cw_edited[attr]
       
   125             relations.append((attr, value))
       
   126             cnx.update_rel_cache_add(entity.eid, attr, value)
       
   127             rdef = cnx.rtype_eids_rdef(attr, entity.eid, value)
       
   128             if rdef.cardinality[1] in '1?' and activeintegrity:
       
   129                 with cnx.security_enabled(read=False):
       
   130                     cnx.execute('DELETE X %s Y WHERE Y eid %%(y)s' % attr,
       
   131                                     {'x': entity.eid, 'y': value})
       
   132     return relations
       
   133 
       
   134 
       
   135 class NullEventBus(object):
       
   136     def publish(self, msg):
       
   137         pass
       
   138 
       
   139     def add_subscription(self, topic, callback):
       
   140         pass
       
   141 
       
   142     def start(self):
       
   143         pass
       
   144 
       
   145     def stop(self):
       
   146         pass
       
   147 
       
   148 
       
   149 class Repository(object):
       
   150     """a repository provides access to a set of persistent storages for
       
   151     entities and relations
       
   152     """
       
   153 
       
    def __init__(self, config, tasks_manager=None, vreg=None):
        """Set up the repository for the instance described by `config`.

        :param config: the instance configuration
        :param tasks_manager: optional object used to register and run looping
          tasks; it is stored as is and may be None
        :param vreg: optional registry store; when None, a new
          `cwvreg.CWRegistryStore` is built from `config`

        The connection sets pool is only initialized here when
        `config.init_cnxset_pool` is true.
        """
        self.config = config
        if vreg is None:
            vreg = cwvreg.CWRegistryStore(config)
        self.vreg = vreg
        self._tasks_manager = tasks_manager

        # no-op event bus by default
        self.app_instances_bus = NullEventBus()
        self.info('starting repository from %s', self.config.apphome)
        # dictionary of opened sessions
        self._sessions = {}

        # list of running threads
        self._running_threads = []
        # initial schema, should be built or replaced later
        self.schema = schema.CubicWebSchema(config.appid)
        self.vreg.schema = self.schema # until actual schema is loaded...
        # shutdown flag
        self.shutting_down = False
        # sources (additional sources info in the system database)
        self.system_source = self.get_source('native', 'system',
                                             config.system_source_config.copy())
        self.sources_by_uri = {'system': self.system_source}
        # querier helper, need to be created after sources initialization
        self.querier = querier.QuerierHelper(self, self.schema)
        # cache eid -> (type, extid, actual source)
        self._type_source_cache = {}
        # cache extid -> eid
        self._extid_cache = {}
        # open some connection sets
        if config.init_cnxset_pool:
            self.init_cnxset_pool()
        # the hooks manager
        self.hm = hook.HooksManager(self.vreg)
        # registry hook to fix user class on registry reload
        @onevent('after-registry-reload', self)
        def fix_user_classes(self):
            # After registry reload the 'CWUser' class used for CWEtype
            # changed.  So any existing user object have a different class than
            # the new loaded one. We are hot fixing this.
            usercls = self.vreg['etypes'].etype_class('CWUser')
            for session in self._sessions.values():
                if not isinstance(session.user, InternalManager):
                    session.user.__class__ = usercls
       
   199 
       
    def init_cnxset_pool(self):
        """Bootstrap the repository (should be called bootstrap_repository, as
        this is what it does).

        Using a temporary connection set, this sets the used cubes, loads the
        schema (from the file system or from the database depending on the
        configuration) and initializes the data sources.  The temporary
        connection set is then closed and the pool is filled with
        ``config['connections-pool-size']`` fresh connection sets, which are
        also listed in `self.cnxsets`.
        """
        config = self.config
        self._cnxsets_pool = queue.Queue()
        # 0. init a cnxset that will be used to fetch bootstrap information from
        #    the database
        self._cnxsets_pool.put_nowait(self.system_source.wrapped_connection())
        # 1. set used cubes
        if config.creating or not config.read_instance_schema:
            config.bootstrap_cubes()
        else:
            self.set_schema(self.config.load_bootstrap_schema(), resetvreg=False)
            config.init_cubes(self.get_cubes())
        # 2. load schema
        if config.quick_start:
            # quick start: only to get a minimal repository to get cubes
            # information (eg dump/restore/...)
            #
            # restrict appobject_path to only load hooks and entity classes in
            # the registry
            config.cube_appobject_path = set(('hooks', 'entities'))
            config.cubicweb_appobject_path = set(('hooks', 'entities'))
            # limit connections pool to 1
            config['connections-pool-size'] = 1
        if config.quick_start or config.creating or not config.read_instance_schema:
            # load schema from the file system
            if not config.creating:
                self.info("set fs instance'schema")
            self.set_schema(config.load_schema(expand_cubes=True))
        else:
            # normal start: load the instance schema from the database
            self.info('loading schema from the repository')
            self.set_schema(self.deserialize_schema())
        # 3. initialize data sources
        if config.creating:
            # call init_creating so that for instance native source can
            # configure tsearch according to postgres version
            self.system_source.init_creating()
        else:
            self.init_sources_from_database()
            if 'CWProperty' in self.schema:
                self.vreg.init_properties(self.properties())
        # 4. close initialization connection set and reopen fresh ones for
        #    proper initialization
        self._get_cnxset().close(True)
        self.cnxsets = [] # list of available cnxsets (can't iterate on a Queue)
        for i in range(config['connections-pool-size']):
            self.cnxsets.append(self.system_source.wrapped_connection())
            self._cnxsets_pool.put_nowait(self.cnxsets[-1])
       
   249 
       
   250     # internals ###############################################################
       
   251 
       
   252     def init_sources_from_database(self):
       
   253         self.sources_by_eid = {}
       
   254         if self.config.quick_start \
       
   255                or not 'CWSource' in self.schema: # # 3.10 migration
       
   256             self.system_source.init_creating()
       
   257             return
       
   258         with self.internal_cnx() as cnx:
       
   259             # FIXME: sources should be ordered (add_entity priority)
       
   260             for sourceent in cnx.execute(
       
   261                 'Any S, SN, SA, SC WHERE S is_instance_of CWSource, '
       
   262                 'S name SN, S type SA, S config SC').entities():
       
   263                 if sourceent.name == 'system':
       
   264                     self.system_source.eid = sourceent.eid
       
   265                     self.sources_by_eid[sourceent.eid] = self.system_source
       
   266                     self.system_source.init(True, sourceent)
       
   267                     continue
       
   268                 self.add_source(sourceent)
       
   269 
       
    def _clear_planning_caches(self):
        """Clear the cache attached to `source_defs` (a @cached method), to
        be called whenever the set of sources changes.
        """
        clear_cache(self, 'source_defs')
       
   272 
       
   273     def add_source(self, sourceent):
       
   274         try:
       
   275             source = self.get_source(sourceent.type, sourceent.name,
       
   276                                      sourceent.host_config, sourceent.eid)
       
   277         except RuntimeError:
       
   278             if self.config.repairing:
       
   279                 self.exception('cant setup source %s, skipped', sourceent.name)
       
   280                 return
       
   281             raise
       
   282         self.sources_by_eid[sourceent.eid] = source
       
   283         self.sources_by_uri[sourceent.name] = source
       
   284         if self.config.source_enabled(source):
       
   285             # call source's init method to complete their initialisation if
       
   286             # needed (for instance looking for persistent configuration using an
       
   287             # internal session, which is not possible until connections sets have been
       
   288             # initialized)
       
   289             source.init(True, sourceent)
       
   290         else:
       
   291             source.init(False, sourceent)
       
   292         self._clear_planning_caches()
       
   293 
       
   294     def remove_source(self, uri):
       
   295         source = self.sources_by_uri.pop(uri)
       
   296         del self.sources_by_eid[source.eid]
       
   297         self._clear_planning_caches()
       
   298 
       
    def get_source(self, type, uri, source_config, eid=None):
        """Return the source object built by `sources.get_source` for the
        given type, configuration and eid.

        Note `source_config` is modified in place: its 'uri' and 'type' keys
        are set from the arguments.
        """
        # set uri and type in source config so it's available through
        # source_defs()
        source_config['uri'] = uri
        source_config['type'] = type
        return sources.get_source(type, source_config, self, eid)
       
   305 
       
   306     def set_schema(self, schema, resetvreg=True):
       
   307         self.info('set schema %s %#x', schema.name, id(schema))
       
   308         if resetvreg:
       
   309             # trigger full reload of all appobjects
       
   310             self.vreg.set_schema(schema)
       
   311         else:
       
   312             self.vreg._set_schema(schema)
       
   313         self.querier.set_schema(schema)
       
   314         for source in self.sources_by_uri.values():
       
   315             source.set_schema(schema)
       
   316         self.schema = schema
       
   317 
       
   318     def deserialize_schema(self):
       
   319         """load schema from the database"""
       
   320         from cubicweb.server.schemaserial import deserialize_schema
       
   321         appschema = schema.CubicWebSchema(self.config.appid)
       
   322         self.debug('deserializing db schema into %s %#x', appschema.name, id(appschema))
       
   323         with self.internal_cnx() as cnx:
       
   324             try:
       
   325                 deserialize_schema(appschema, cnx)
       
   326             except BadSchemaDefinition:
       
   327                 raise
       
   328             except Exception as ex:
       
   329                 import traceback
       
   330                 traceback.print_exc()
       
   331                 raise Exception('Is the database initialised ? (cause: %s)' % ex)
       
   332         return appschema
       
   333 
       
   334     def _prepare_startup(self):
       
   335         """Prepare "Repository as a server" for startup.
       
   336 
       
   337         * trigger server startup hook,
       
   338         * register session clean up task.
       
   339         """
       
   340         if not (self.config.creating or self.config.repairing
       
   341                 or self.config.quick_start):
       
   342             # call instance level initialisation hooks
       
   343             self.hm.call_hooks('server_startup', repo=self)
       
   344             # register a task to cleanup expired session
       
   345             self.cleanup_session_time = self.config['cleanup-session-time'] or 60 * 60 * 24
       
   346             assert self.cleanup_session_time > 0
       
   347             cleanup_session_interval = min(60*60, self.cleanup_session_time / 3)
       
   348             assert self._tasks_manager is not None, "This Repository is not intended to be used as a server"
       
   349             self._tasks_manager.add_looping_task(cleanup_session_interval,
       
   350                                                  self.clean_sessions)
       
   351 
       
   352     def start_looping_tasks(self):
       
   353         """Actual "Repository as a server" startup.
       
   354 
       
   355         * trigger server startup hook,
       
   356         * register session clean up task,
       
   357         * start all tasks.
       
   358 
       
   359         XXX Other startup related stuffs are done elsewhere. In Repository
       
   360         XXX __init__ or in external codes (various server managers).
       
   361         """
       
   362         self._prepare_startup()
       
   363         assert self._tasks_manager is not None, "This Repository is not intended to be used as a server"
       
   364         self._tasks_manager.start()
       
   365 
       
   366     def looping_task(self, interval, func, *args):
       
   367         """register a function to be called every `interval` seconds.
       
   368 
       
   369         looping tasks can only be registered during repository initialization,
       
   370         once done this method will fail.
       
   371         """
       
   372         assert self._tasks_manager is not None, "This Repository is not intended to be used as a server"
       
   373         self._tasks_manager.add_looping_task(interval, func, *args)
       
   374 
       
    def threaded_task(self, func):
        """Start `func` in a separate thread.

        The thread is a `utils.RepoThread` which is given the repository's
        list of running threads.
        """
        utils.RepoThread(func, self._running_threads).start()
       
   378 
       
   379     #@locked
       
   380     def _get_cnxset(self):
       
   381         try:
       
   382             return self._cnxsets_pool.get(True, timeout=5)
       
   383         except queue.Empty:
       
   384             raise Exception('no connections set available after 5 secs, probably either a '
       
   385                             'bug in code (too many uncommited/rolled back '
       
   386                             'connections) or too much load on the server (in '
       
   387                             'which case you can try to set a bigger '
       
   388                             'connections pool size)')
       
   389 
       
    def _free_cnxset(self, cnxset):
        """Put `cnxset` back into the connection sets pool, without blocking."""
        self._cnxsets_pool.put_nowait(cnxset)
       
   392 
       
   393     def shutdown(self):
       
   394         """called on server stop event to properly close opened sessions and
       
   395         connections
       
   396         """
       
   397         assert not self.shutting_down, 'already shutting down'
       
   398         if not (self.config.creating or self.config.repairing
       
   399                 or self.config.quick_start):
       
   400             # then, the system source is still available
       
   401             self.hm.call_hooks('before_server_shutdown', repo=self)
       
   402         self.shutting_down = True
       
   403         self.system_source.shutdown()
       
   404         if self._tasks_manager is not None:
       
   405             self._tasks_manager.stop()
       
   406         if not (self.config.creating or self.config.repairing
       
   407                 or self.config.quick_start):
       
   408             self.hm.call_hooks('server_shutdown', repo=self)
       
   409         for thread in self._running_threads:
       
   410             self.info('waiting thread %s...', thread.getName())
       
   411             thread.join()
       
   412             self.info('thread %s finished', thread.getName())
       
   413         self.close_sessions()
       
   414         while not self._cnxsets_pool.empty():
       
   415             cnxset = self._cnxsets_pool.get_nowait()
       
   416             try:
       
   417                 cnxset.close(True)
       
   418             except Exception:
       
   419                 self.exception('error while closing %s' % cnxset)
       
   420                 continue
       
   421         hits, misses = self.querier.cache_hit, self.querier.cache_miss
       
   422         try:
       
   423             self.info('rql st cache hit/miss: %s/%s (%s%% hits)', hits, misses,
       
   424                       (hits * 100) / (hits + misses))
       
   425             hits, misses = self.system_source.cache_hit, self.system_source.cache_miss
       
   426             self.info('sql cache hit/miss: %s/%s (%s%% hits)', hits, misses,
       
   427                       (hits * 100) / (hits + misses))
       
   428             nocache  = self.system_source.no_cache
       
   429             self.info('sql cache usage: %s/%s (%s%%)', hits+ misses, nocache,
       
   430                       ((hits + misses) * 100) / (hits + misses + nocache))
       
   431         except ZeroDivisionError:
       
   432             pass
       
   433 
       
   434     def check_auth_info(self, cnx, login, authinfo):
       
   435         """validate authentication, raise AuthenticationError on failure, return
       
   436         associated CWUser's eid on success.
       
   437         """
       
   438         # iter on sources_by_uri then check enabled source since sources doesn't
       
   439         # contain copy based sources
       
   440         for source in self.sources_by_uri.values():
       
   441             if self.config.source_enabled(source) and source.support_entity('CWUser'):
       
   442                 try:
       
   443                     return source.authenticate(cnx, login, **authinfo)
       
   444                 except AuthenticationError:
       
   445                     continue
       
   446         else:
       
   447             raise AuthenticationError('authentication failed with all sources')
       
   448 
       
   449     def authenticate_user(self, cnx, login, **authinfo):
       
   450         """validate login / password, raise AuthenticationError on failure
       
   451         return associated CWUser instance on success
       
   452         """
       
   453         eid = self.check_auth_info(cnx, login, authinfo)
       
   454         cwuser = self._build_user(cnx, eid)
       
   455         if self.config.consider_user_state and \
       
   456                not cwuser.cw_adapt_to('IWorkflowable').state in cwuser.AUTHENTICABLE_STATES:
       
   457             raise AuthenticationError('user is not in authenticable state')
       
   458         return cwuser
       
   459 
       
   460     def _build_user(self, cnx, eid):
       
   461         """return a CWUser entity for user with the given eid"""
       
   462         cls = self.vreg['etypes'].etype_class('CWUser')
       
   463         st = cls.fetch_rqlst(cnx.user, ordermethod=None)
       
   464         st.add_eid_restriction(st.get_variable('X'), 'x', 'Substitute')
       
   465         rset = cnx.execute(st.as_string(), {'x': eid})
       
   466         assert len(rset) == 1, rset
       
   467         cwuser = rset.get_entity(0, 0)
       
   468         # pylint: disable=W0104
       
   469         # prefetch / cache cwuser's groups and properties. This is especially
       
   470         # useful for internal sessions to avoid security insertions
       
   471         cwuser.groups
       
   472         cwuser.properties
       
   473         return cwuser
       
   474 
       
   475     # public (dbapi) interface ################################################
       
   476 
       
    @deprecated("[3.19] use _cw.call_service('repo_stats')")
    def stats(self): # XXX restrict to managers session?
        """Return the result of the 'repo_stats' service, i.e. a dictionary
        containing some statistics about the repository resources usage.

        The service is called through an internal connection.

        This is a public method, not requiring a session id.

        This method is deprecated in favor of using _cw.call_service('repo_stats')
        """
        with self.internal_cnx() as cnx:
            return cnx.call_service('repo_stats')
       
   488 
       
    @deprecated("[3.19] use _cw.call_service('repo_gc_stats')")
    def gc_stats(self, nmax=20):
        """Return the result of the 'repo_gc_stats' service, i.e. a dictionary
        containing some statistics about the repository memory usage.

        The service is called through an internal connection.

        This is a public method, not requiring a session id.

        nmax is the max number of (most) referenced object returned as
        the 'referenced' result

        This method is deprecated in favor of using
        _cw.call_service('repo_gc_stats')
        """
        with self.internal_cnx() as cnx:
            return cnx.call_service('repo_gc_stats', nmax=nmax)
       
   501 
       
    def get_schema(self):
        """Return the instance schema, i.e. the current value of the
        `schema` attribute.

        This is a public method, not requiring a session id.
        """
        return self.schema
       
   508 
       
   509     def get_cubes(self):
       
   510         """Return the list of cubes used by this instance.
       
   511 
       
   512         This is a public method, not requiring a session id.
       
   513         """
       
   514         versions = self.get_versions(not (self.config.creating
       
   515                                           or self.config.repairing
       
   516                                           or self.config.quick_start
       
   517                                           or self.config.mode == 'test'))
       
   518         cubes = list(versions)
       
   519         cubes.remove('cubicweb')
       
   520         return cubes
       
   521 
       
   522     def get_option_value(self, option, foreid=None):
       
   523         """Return the value for `option` in the configuration.
       
   524 
       
   525         This is a public method, not requiring a session id.
       
   526 
       
   527         `foreid` argument is deprecated and now useless (as of 3.19).
       
   528         """
       
   529         if foreid is not None:
       
   530             warn('[3.19] foreid argument is deprecated', DeprecationWarning,
       
   531                  stacklevel=2)
       
   532         # XXX we may want to check we don't give sensible information
       
   533         return self.config[option]
       
   534 
       
   535     @cached
       
   536     def get_versions(self, checkversions=False):
       
   537         """Return the a dictionary containing cubes used by this instance
       
   538         as key with their version as value, including cubicweb version.
       
   539 
       
   540         This is a public method, not requiring a session id.
       
   541         """
       
   542         from logilab.common.changelog import Version
       
   543         vcconf = {}
       
   544         with self.internal_cnx() as cnx:
       
   545             for pk, version in cnx.execute(
       
   546                 'Any K,V WHERE P is CWProperty, P value V, P pkey K, '
       
   547                 'P pkey ~="system.version.%"', build_descr=False):
       
   548                 cube = pk.split('.')[-1]
       
   549                 # XXX cubicweb migration
       
   550                 if cube in CW_MIGRATION_MAP:
       
   551                     cube = CW_MIGRATION_MAP[cube]
       
   552                 version = Version(version)
       
   553                 vcconf[cube] = version
       
   554                 if checkversions:
       
   555                     if cube != 'cubicweb':
       
   556                         fsversion = self.config.cube_version(cube)
       
   557                     else:
       
   558                         fsversion = self.config.cubicweb_version()
       
   559                     if version < fsversion:
       
   560                         msg = ('instance has %s version %s but %s '
       
   561                                'is installed. Run "cubicweb-ctl upgrade".')
       
   562                         raise ExecutionError(msg % (cube, version, fsversion))
       
   563         return vcconf
       
   564 
       
   565     @cached
       
   566     def source_defs(self):
       
   567         """Return the a dictionary containing source uris as value and a
       
   568         dictionary describing each source as value.
       
   569 
       
   570         This is a public method, not requiring a session id.
       
   571         """
       
   572         sources = {}
       
   573         # remove sensitive information
       
   574         for uri, source in self.sources_by_uri.items():
       
   575             sources[uri] = source.public_config
       
   576         return sources
       
   577 
       
   578     def properties(self):
       
   579         """Return a result set containing system wide properties.
       
   580 
       
   581         This is a public method, not requiring a session id.
       
   582         """
       
   583         with self.internal_cnx() as cnx:
       
   584             # don't use cnx.execute, we don't want rset.req set
       
   585             return self.querier.execute(cnx, 'Any K,V WHERE P is CWProperty,'
       
   586                                         'P pkey K, P value V, NOT P for_user U',
       
   587                                         build_descr=False)
       
   588 
       
   589     @deprecated("[3.19] Use session.call_service('register_user') instead'")
       
   590     def register_user(self, login, password, email=None, **kwargs):
       
   591         """check a user with the given login exists, if not create it with the
       
   592         given password. This method is designed to be used for anonymous
       
   593         registration on public web site.
       
   594         """
       
   595         with self.internal_cnx() as cnx:
       
   596             cnx.call_service('register_user', login=login, password=password,
       
   597                              email=email, **kwargs)
       
   598             cnx.commit()
       
   599 
       
   600     def find_users(self, fetch_attrs, **query_attrs):
       
   601         """yield user attributes for cwusers matching the given query_attrs
       
   602         (the result set cannot survive this method call)
       
   603 
       
   604         This can be used by low-privileges account (anonymous comes to
       
   605         mind).
       
   606 
       
   607         `fetch_attrs`: tuple of attributes to be fetched
       
   608         `query_attrs`: dict of attr/values to restrict the query
       
   609         """
       
   610         assert query_attrs
       
   611         if not hasattr(self, '_cwuser_attrs'):
       
   612             cwuser = self.schema['CWUser']
       
   613             self._cwuser_attrs = set(str(rschema)
       
   614                                      for rschema, _eschema in cwuser.attribute_definitions()
       
   615                                      if not rschema.meta)
       
   616         cwuserattrs = self._cwuser_attrs
       
   617         for k in chain(fetch_attrs, query_attrs):
       
   618             if k not in cwuserattrs:
       
   619                 raise Exception('bad input for find_user')
       
   620         with self.internal_cnx() as cnx:
       
   621             varmaker = rqlvar_maker()
       
   622             vars = [(attr, next(varmaker)) for attr in fetch_attrs]
       
   623             rql = 'Any %s WHERE X is CWUser, ' % ','.join(var[1] for var in vars)
       
   624             rql += ','.join('X %s %s' % (var[0], var[1]) for var in vars) + ','
       
   625             rset = cnx.execute(rql + ','.join('X %s %%(%s)s' % (attr, attr)
       
   626                                               for attr in query_attrs),
       
   627                                query_attrs)
       
   628             return rset.rows
       
   629 
       
    def new_session(self, login, **kwargs):
        """open a new session for a given user

        `cnxprops` is popped from `kwargs` and handed to the `Session`
        constructor; the remaining keyword arguments are forwarded to
        `authenticate_user`. The new session is registered in
        `self._sessions` and the 'session_open' hooks are called before it is
        returned.

        raise `AuthenticationError` if the authentication failed
        raise `ConnectionError` if we can't open a connection
        """
        cnxprops = kwargs.pop('cnxprops', None)
        # use an internal connection
        with self.internal_cnx() as cnx:
            # try to get a user object
            user = self.authenticate_user(cnx, login, **kwargs)
        session = Session(user, self, cnxprops)
        # the user entity was fetched through the internal connection above:
        # rebind it (and its result set) to the new session
        user._cw = user.cw_rset.req = session
        user.cw_clear_relation_cache()
        # register the session under its id
        self._sessions[session.sessionid] = session
        self.info('opened session %s for user %s', session.sessionid, login)
        with session.new_cnx() as cnx:
            self.hm.call_hooks('session_open', cnx)
            # commit connection at this point in case write operation has been
            # done during `session_open` hooks
            cnx.commit()
        return session
       
   652 
       
   653     def connect(self, login, **kwargs):
       
   654         """open a new session for a given user and return its sessionid """
       
   655         return self.new_session(login, **kwargs).sessionid
       
   656 
       
    def close(self, sessionid, txid=None, checkshuttingdown=True):
        """close the session with the given id

        The 'session_close' hooks are called on a new connection of the
        session, which is then committed; the session is closed and removed
        from `self._sessions` afterwards.

        `txid` and `checkshuttingdown` are passed as is to `_get_session`,
        whose exceptions propagate to the caller.
        """
        session = self._get_session(sessionid, txid=txid,
                                    checkshuttingdown=checkshuttingdown)
        # operation uncommited before close are rolled back before hook is called
        with session.new_cnx() as cnx:
            self.hm.call_hooks('session_close', cnx)
            # commit connection at this point in case write operation has been
            # done during `session_close` hooks
            cnx.commit()
        session.close()
        # forget the session only once it has been closed
        del self._sessions[sessionid]
        self.info('closed session %s for user %s', sessionid, session.user.login)
       
   670 
       
   671     # session handling ########################################################
       
   672 
       
   673     def close_sessions(self):
       
   674         """close every opened sessions"""
       
   675         for sessionid in list(self._sessions):
       
   676             try:
       
   677                 self.close(sessionid, checkshuttingdown=False)
       
   678             except Exception: # XXX BaseException?
       
   679                 self.exception('error while closing session %s' % sessionid)
       
   680 
       
   681     def clean_sessions(self):
       
   682         """close sessions not used since an amount of time specified in the
       
   683         configuration
       
   684         """
       
   685         mintime = time() - self.cleanup_session_time
       
   686         self.debug('cleaning session unused since %s',
       
   687                    strftime('%H:%M:%S', localtime(mintime)))
       
   688         nbclosed = 0
       
   689         for session in self._sessions.values():
       
   690             if session.timestamp < mintime:
       
   691                 self.close(session.sessionid)
       
   692                 nbclosed += 1
       
   693         return nbclosed
       
   694 
       
    @contextmanager
    def internal_cnx(self):
        """Context manager returning a Connection using internal user which have
        every access rights on the repository.

        A fresh `Session` for an `InternalManager` user is created on each
        call; the connection is yielded with both read and write security
        disabled.

        Beware that unlike the older :meth:`internal_session`, internal
        connections have all hooks beside security enabled.
        """
        with Session(InternalManager(), self) as session:
            with session.new_cnx() as cnx:
                cnx.user._cw = cnx  # XXX remove when "vreg = user._cw.vreg"
                                    # hack in entity.py is gone
                # security is off for the whole time the caller holds the
                # connection
                with cnx.security_enabled(read=False, write=False):
                    yield cnx
       
   709 
       
   710     def _get_session(self, sessionid, txid=None, checkshuttingdown=True):
       
   711         """return the session associated with the given session identifier"""
       
   712         if checkshuttingdown and self.shutting_down:
       
   713             raise ShuttingDown('Repository is shutting down')
       
   714         try:
       
   715             session = self._sessions[sessionid]
       
   716         except KeyError:
       
   717             raise BadConnectionId('No such session %s' % sessionid)
       
   718         return session
       
   719 
       
   720     # data sources handling ###################################################
       
   721     # * correspondance between eid and (type, source)
       
   722     # * correspondance between eid and local id (i.e. specific to a given source)
       
   723 
       
   724     def type_and_source_from_eid(self, eid, cnx):
       
   725         """return a tuple `(type, extid, actual source uri)` for the entity of
       
   726         the given `eid`
       
   727         """
       
   728         try:
       
   729             eid = int(eid)
       
   730         except ValueError:
       
   731             raise UnknownEid(eid)
       
   732         try:
       
   733             return self._type_source_cache[eid]
       
   734         except KeyError:
       
   735             etype, extid, auri = self.system_source.eid_type_source(cnx, eid)
       
   736             self._type_source_cache[eid] = (etype, extid, auri)
       
   737             return etype, extid, auri
       
   738 
       
   739     def clear_caches(self, eids):
       
   740         etcache = self._type_source_cache
       
   741         extidcache = self._extid_cache
       
   742         rqlcache = self.querier._rql_cache
       
   743         for eid in eids:
       
   744             try:
       
   745                 etype, extid, auri = etcache.pop(int(eid)) # may be a string in some cases
       
   746                 rqlcache.pop( ('%s X WHERE X eid %s' % (etype, eid),), None)
       
   747                 extidcache.pop(extid, None)
       
   748             except KeyError:
       
   749                 etype = None
       
   750             rqlcache.pop( ('Any X WHERE X eid %s' % eid,), None)
       
   751             self.system_source.clear_eid_cache(eid, etype)
       
   752 
       
   753     def type_from_eid(self, eid, cnx):
       
   754         """return the type of the entity with id <eid>"""
       
   755         return self.type_and_source_from_eid(eid, cnx)[0]
       
   756 
       
   757     def querier_cache_key(self, cnx, rql, args, eidkeys):
       
   758         cachekey = [rql]
       
   759         for key in sorted(eidkeys):
       
   760             try:
       
   761                 etype = self.type_from_eid(args[key], cnx)
       
   762             except KeyError:
       
   763                 raise QueryError('bad cache key %s (no value)' % key)
       
   764             except TypeError:
       
   765                 raise QueryError('bad cache key %s (value: %r)' % (
       
   766                     key, args[key]))
       
   767             cachekey.append(etype)
       
   768             # ensure eid is correctly typed in args
       
   769             args[key] = int(args[key])
       
   770         return tuple(cachekey)
       
   771 
       
    @deprecated('[3.22] use the new store API')
    def extid2eid(self, source, extid, etype, cnx, insert=True,
                  sourceparams=None):
        """Return eid from a local id. If the eid is a negative integer, that
        means the entity is known but has been copied back to the system source
        hence should be ignored.

        The repository extid cache is looked up first, then the system
        source. When `insert` is false and no record is found, None is
        returned.

        If no record is found, ie the entity is not known yet:

        1. an eid is attributed

        2. the source's :meth:`before_entity_insertion` method is called to
           build the entity instance

        3. unless source's :attr:`should_call_hooks` tell otherwise,
          'before_add_entity' hooks are called

        4. record is added into the system source

        5. the source's :meth:`after_entity_insertion` method is called to
           complete building of the entity instance

        6. unless source's :attr:`should_call_hooks` tell otherwise,
          'after_add_entity' hooks are called

        If any of these steps fails, the cache entries added for the entity
        are removed and the exception is re-raised.
        """
        # fast path: extid already known by the repository cache
        try:
            return self._extid_cache[extid]
        except KeyError:
            pass
        eid = self.system_source.extid2eid(cnx, extid)
        if eid is not None:
            # known by the system source: populate the caches
            self._extid_cache[extid] = eid
            self._type_source_cache[eid] = (etype, extid, source.uri)
            return eid
        if not insert:
            return
        # no link between extid and eid, create one
        # write query, ensure connection's mode is 'write' so connections
        # won't be released until commit/rollback
        try:
            eid = self.system_source.create_eid(cnx)
            self._extid_cache[extid] = eid
            self._type_source_cache[eid] = (etype, extid, source.uri)
            entity = source.before_entity_insertion(
                cnx, extid, etype, eid, sourceparams)
            if source.should_call_hooks:
                # get back a copy of operation for later restore if
                # necessary, see below
                pending_operations = cnx.pending_operations[:]
                self.hm.call_hooks('before_add_entity', cnx, entity=entity)
            self.add_info(cnx, entity, source, extid)
            source.after_entity_insertion(cnx, extid, entity, sourceparams)
            if source.should_call_hooks:
                self.hm.call_hooks('after_add_entity', cnx, entity=entity)
            return eid
        except Exception:
            # XXX do some cleanup manually so that the transaction has a
            # chance to be commited, with simply this entity discarded
            self._extid_cache.pop(extid, None)
            # eid is still None here if create_eid itself failed
            self._type_source_cache.pop(eid, None)
            # `entity` only exists if before_entity_insertion returned
            if 'entity' in locals():
                hook.CleanupDeletedEidsCacheOp.get_instance(cnx).add_data(entity.eid)
                self.system_source.delete_info_multi(cnx, [entity])
                if source.should_call_hooks:
                    # drop operations registered by the hooks called above
                    cnx.pending_operations = pending_operations
            raise
       
   838 
       
   839     def add_info(self, cnx, entity, source, extid=None):
       
   840         """add type and source info for an eid into the system table,
       
   841         and index the entity with the full text index
       
   842         """
       
   843         # begin by inserting eid/type/source/extid into the entities table
       
   844         hook.CleanupNewEidsCacheOp.get_instance(cnx).add_data(entity.eid)
       
   845         self.system_source.add_info(cnx, entity, source, extid)
       
   846 
       
    def _delete_cascade_multi(self, cnx, entities):
        """same as _delete_cascade but accepts a list of entities with
        the same etype and belonging to the same source.

        One 'DELETE' RQL query is executed per relation definition of the
        entities' schema (taken from the first entity), with security
        disabled. Computed relations, virtual relation types and relation
        types listed in the transaction's 'pendingrtypes' are skipped.

        `ValidationError` and `Unauthorized` are propagated; any other
        exception is logged and swallowed, except in 'test' mode where it is
        re-raised.
        """
        pendingrtypes = cnx.transaction_data.get('pendingrtypes', ())
        # delete remaining relations: if user can delete the entity, he can
        # delete all its relations without security checking
        with cnx.security_enabled(read=False, write=False):
            # comma separated eids, interpolated in the IN() clause below
            in_eids = ','.join([str(_e.eid) for _e in entities])
            with cnx.running_hooks_ops():
                for rschema, _, role in entities[0].e_schema.relation_definitions():
                    if rschema.rule:
                        continue # computed relation
                    rtype = rschema.type
                    if rtype in schema.VIRTUAL_RTYPES or rtype in pendingrtypes:
                        continue
                    if role == 'subject':
                        # don't skip inlined relation so they are regularly
                        # deleted and so hooks are correctly called
                        rql = 'DELETE X %s Y WHERE X eid IN (%s)' % (rtype, in_eids)
                    else:
                        rql = 'DELETE Y %s X WHERE X eid IN (%s)' % (rtype, in_eids)
                    try:
                        cnx.execute(rql, build_descr=False)
                    except ValidationError:
                        raise
                    except Unauthorized:
                        self.exception('Unauthorized exception while cascading delete for entity %s. '
                                       'RQL: %s.\nThis should not happen since security is disabled here.',
                                       entities, rql)
                        raise
                    except Exception:
                        # best effort outside of test mode: log and go on
                        # with the next relation
                        if self.config.mode == 'test':
                            raise
                        self.exception('error while cascading delete for entity %s. RQL: %s',
                                       entities, rql)
       
   883 
       
   884     def init_entity_caches(self, cnx, entity, source):
       
   885         """add entity to connection entities cache and repo's extid cache.
       
   886         Return entity's ext id if the source isn't the system source.
       
   887         """
       
   888         cnx.set_entity_cache(entity)
       
   889         if source.uri == 'system':
       
   890             extid = None
       
   891         else:
       
   892             extid = source.get_extid(entity)
       
   893             self._extid_cache[str(extid)] = entity.eid
       
   894         self._type_source_cache[entity.eid] = (entity.cw_etype, extid, source.uri)
       
   895         return extid
       
   896 
       
    def glob_add_entity(self, cnx, edited):
        """add an entity to the repository

        the entity eid should originally be None and a unique eid is assigned to
        the entity instance

        `edited` holds the attributes being set and gives access to the
        entity through `edited.entity`. The entity is always added to the
        system source. Return the eid attributed to the entity.
        """
        entity = edited.entity
        entity._cw_is_saved = False # entity has an eid but is not yet saved
        # init edited_attributes before calling before_add_entity hooks
        entity.cw_edited = edited
        source = self.system_source
        # allocate an eid to the entity before calling hooks
        entity.eid = self.system_source.create_eid(cnx)
        # set caches asap
        extid = self.init_entity_caches(cnx, entity, source)
        if server.DEBUG & server.DBG_REPO:
            print('ADD entity', self, entity.cw_etype, entity.eid, edited)
        prefill_entity_caches(entity)
        self.hm.call_hooks('before_add_entity', cnx, entity=entity)
        relations = preprocess_inlined_relations(cnx, entity)
        edited.set_defaults()
        # edited values are only checked when 'integrity' hooks are enabled
        if cnx.is_hook_category_activated('integrity'):
            edited.check(creation=True)
        self.add_info(cnx, entity, source, extid)
        try:
            source.add_entity(cnx, entity)
        except (UniqueTogetherError, ViolatedConstraint) as exc:
            # let the 'IUserFriendlyError' adapter turn the low-level error
            # into a user level exception
            userhdlr = cnx.vreg['adapters'].select(
                'IUserFriendlyError', cnx, entity=entity, exc=exc)
            userhdlr.raise_user_exception()
        edited.saved = entity._cw_is_saved = True
        # NOTE(review): the original comment here read "trigger
        # after_add_entity after after_add_relation", but the
        # after_add_entity hooks are called *before* the inlined relation
        # hooks below -- confirm which order is intended
        self.hm.call_hooks('after_add_entity', cnx, entity=entity)
        # call hooks for inlined relations
        for attr, value in relations:
            self.hm.call_hooks('before_add_relation', cnx,
                                eidfrom=entity.eid, rtype=attr, eidto=value)
            self.hm.call_hooks('after_add_relation', cnx,
                                eidfrom=entity.eid, rtype=attr, eidto=value)
        return entity.eid
       
   937 
       
    def glob_update_entity(self, cnx, edited):
        """replace an entity in the repository
        the type and the eid of an entity must not be changed

        `edited` holds the attributes and inlined relations being modified
        and gives access to the entity through `edited.entity`. The update
        is done in the system source, with entity and relation hooks called
        around it. The entity's previous `cw_edited`, if any, is restored on
        exit.
        """
        entity = edited.entity
        if server.DEBUG & server.DBG_REPO:
            print('UPDATE entity', entity.cw_etype, entity.eid,
                  entity.cw_attr_cache, edited)
        hm = self.hm
        eschema = entity.e_schema
        cnx.set_entity_cache(entity)
        # remember the previous edited attributes, restored in `finally`
        orig_edited = getattr(entity, 'cw_edited', None)
        entity.cw_edited = edited
        source = self.system_source
        try:
            # only_inline_rels: no final attribute is edited, hence the
            # *_update_entity hooks are not called
            # need_fti_update: a full text indexed attribute is edited
            only_inline_rels, need_fti_update = True, False
            # list of (rtype, new value, previous value or None)
            relations = []
            for attr in list(edited):
                if attr == 'eid':
                    continue
                rschema = eschema.subjrels[attr]
                if rschema.final:
                    if getattr(eschema.rdef(attr), 'fulltextindexed', False):
                        need_fti_update = True
                    only_inline_rels = False
                else:
                    # inlined relation
                    previous_value = entity.related(attr) or None
                    if previous_value is not None:
                        previous_value = previous_value[0][0] # got a result set
                        if previous_value == entity.cw_attr_cache[attr]:
                            # unchanged value: no relation to delete
                            previous_value = None
                        else:
                            hm.call_hooks('before_delete_relation', cnx,
                                          eidfrom=entity.eid, rtype=attr,
                                          eidto=previous_value)
                    relations.append((attr, edited[attr], previous_value))
            # call hooks for inlined relations
            for attr, value, _t in relations:
                hm.call_hooks('before_add_relation', cnx,
                              eidfrom=entity.eid, rtype=attr, eidto=value)
            if not only_inline_rels:
                hm.call_hooks('before_update_entity', cnx, entity=entity)
            # edited values are only checked when 'integrity' hooks are
            # enabled
            if cnx.is_hook_category_activated('integrity'):
                edited.check()
            try:
                source.update_entity(cnx, entity)
                edited.saved = True
            except (UniqueTogetherError, ViolatedConstraint) as exc:
                # let the 'IUserFriendlyError' adapter turn the low-level
                # error into a user level exception
                userhdlr = cnx.vreg['adapters'].select(
                    'IUserFriendlyError', cnx, entity=entity, exc=exc)
                userhdlr.raise_user_exception()
            self.system_source.update_info(cnx, entity, need_fti_update)
            if not only_inline_rels:
                hm.call_hooks('after_update_entity', cnx, entity=entity)
            for attr, value, prevvalue in relations:
                # if the relation is already cached, update existant cache
                relcache = entity.cw_relation_cached(attr, 'subject')
                if prevvalue is not None:
                    hm.call_hooks('after_delete_relation', cnx,
                                  eidfrom=entity.eid, rtype=attr, eidto=prevvalue)
                    if relcache is not None:
                        cnx.update_rel_cache_del(entity.eid, attr, prevvalue)
                del_existing_rel_if_needed(cnx, entity.eid, attr, value)
                cnx.update_rel_cache_add(entity.eid, attr, value)
                hm.call_hooks('after_add_relation', cnx,
                              eidfrom=entity.eid, rtype=attr, eidto=value)
        finally:
            if orig_edited is not None:
                entity.cw_edited = orig_edited
       
  1008 
       
  1009 
       
    def glob_delete_entities(self, cnx, eids):
        """delete a list of  entities and all related entities from the repository

        `eids` should be a set (or frozenset) of eids; any other iterable is
        still accepted but triggers a `DeprecationWarning`. Eids already
        registered in the connection's `CleanupDeletedEidsCacheOp` operation
        are ignored. Entities are deleted from the system source, grouped by
        entity type, with the *_delete_entity hooks called once per group.
        """
        # mark eids as being deleted in cnx info and setup cache update
        # operation (register pending eids before actual deletion to avoid
        # multiple call to glob_delete_entities)
        op = hook.CleanupDeletedEidsCacheOp.get_instance(cnx)
        if not isinstance(eids, (set, frozenset)):
            warn('[3.13] eids should be given as a set', DeprecationWarning,
                 stacklevel=2)
            eids = frozenset(eids)
        # only keep eids which are not already pending deletion
        eids = eids - op._container
        op._container |= eids
        data_by_etype = {} # values are [list of entities]
        #
        # WARNING: the way this dictionary is populated is heavily optimized
        # and does not use setdefault on purpose. Unless a new release
        # of the Python interpreter advertises large perf improvements
        # in setdefault, this should not be changed without profiling.
        for eid in eids:
            etype = self.type_from_eid(eid, cnx)
            # XXX should cache entity's cw_metainformation
            entity = cnx.entity_from_eid(eid, etype)
            try:
                data_by_etype[etype].append(entity)
            except KeyError:
                data_by_etype[etype] = [entity]
        source = self.system_source
        for etype, entities in data_by_etype.items():
            if server.DEBUG & server.DBG_REPO:
                print('DELETE entities', etype, [entity.eid for entity in entities])
            self.hm.call_hooks('before_delete_entity', cnx, entities=entities)
            # relations are removed first, then the entities themselves and
            # at last their record in the system table
            self._delete_cascade_multi(cnx, entities)
            source.delete_entities(cnx, entities)
            source.delete_info_multi(cnx, entities)
            self.hm.call_hooks('after_delete_entity', cnx, entities=entities)
        # don't clear cache here, it is done in a hook on commit
       
  1046 
       
  1047     def glob_add_relation(self, cnx, subject, rtype, object):
       
  1048         """add a relation to the repository"""
       
  1049         self.glob_add_relations(cnx, {rtype: [(subject, object)]})
       
  1050 
       
  1051     def glob_add_relations(self, cnx, relations):
       
  1052         """add several relations to the repository
       
  1053 
       
  1054         relations is a dictionary rtype: [(subj_eid, obj_eid), ...]
       
  1055         """
       
  1056         source = self.system_source
       
  1057         relations_by_rtype = {}
       
  1058         subjects_by_types = {}
       
  1059         objects_by_types = {}
       
  1060         activintegrity = cnx.is_hook_category_activated('activeintegrity')
       
  1061         for rtype, eids_subj_obj in relations.items():
       
  1062             if server.DEBUG & server.DBG_REPO:
       
  1063                 for subjeid, objeid in eids_subj_obj:
       
  1064                     print('ADD relation', subjeid, rtype, objeid)
       
  1065             for subjeid, objeid in eids_subj_obj:
       
  1066                 if rtype in relations_by_rtype:
       
  1067                     relations_by_rtype[rtype].append((subjeid, objeid))
       
  1068                 else:
       
  1069                     relations_by_rtype[rtype] = [(subjeid, objeid)]
       
  1070                 if not activintegrity:
       
  1071                     continue
       
  1072                 # take care to relation of cardinality '?1', as all eids will
       
  1073                 # be inserted later, we've remove duplicated eids since they
       
  1074                 # won't be caught by `del_existing_rel_if_needed`
       
  1075                 rdef = cnx.rtype_eids_rdef(rtype, subjeid, objeid)
       
  1076                 card = rdef.cardinality
       
  1077                 if card[0] in '?1':
       
  1078                     with cnx.security_enabled(read=False):
       
  1079                         cnx.execute('DELETE X %s Y WHERE X eid %%(x)s, '
       
  1080                                     'NOT Y eid %%(y)s' % rtype,
       
  1081                                     {'x': subjeid, 'y': objeid})
       
  1082                     subjects = subjects_by_types.setdefault(rdef, {})
       
  1083                     if subjeid in subjects:
       
  1084                         del relations_by_rtype[rtype][subjects[subjeid]]
       
  1085                         subjects[subjeid] = len(relations_by_rtype[rtype]) - 1
       
  1086                         continue
       
  1087                     subjects[subjeid] = len(relations_by_rtype[rtype]) - 1
       
  1088                 if card[1] in '?1':
       
  1089                     with cnx.security_enabled(read=False):
       
  1090                         cnx.execute('DELETE X %s Y WHERE Y eid %%(y)s, '
       
  1091                                     'NOT X eid %%(x)s' % rtype,
       
  1092                                     {'x': subjeid, 'y': objeid})
       
  1093                     objects = objects_by_types.setdefault(rdef, {})
       
  1094                     if objeid in objects:
       
  1095                         del relations_by_rtype[rtype][objects[objeid]]
       
  1096                         objects[objeid] = len(relations_by_rtype[rtype])
       
  1097                         continue
       
  1098                     objects[objeid] = len(relations_by_rtype[rtype])
       
  1099         for rtype, source_relations in relations_by_rtype.items():
       
  1100             self.hm.call_hooks('before_add_relation', cnx,
       
  1101                                rtype=rtype, eids_from_to=source_relations)
       
  1102         for rtype, source_relations in relations_by_rtype.items():
       
  1103             source.add_relations(cnx, rtype, source_relations)
       
  1104             rschema = self.schema.rschema(rtype)
       
  1105             for subjeid, objeid in source_relations:
       
  1106                 cnx.update_rel_cache_add(subjeid, rtype, objeid, rschema.symmetric)
       
  1107         for rtype, source_relations in relations_by_rtype.items():
       
  1108             self.hm.call_hooks('after_add_relation', cnx,
       
  1109                                rtype=rtype, eids_from_to=source_relations)
       
  1110 
       
  1111     def glob_delete_relation(self, cnx, subject, rtype, object):
       
  1112         """delete a relation from the repository"""
       
  1113         if server.DEBUG & server.DBG_REPO:
       
  1114             print('DELETE relation', subject, rtype, object)
       
  1115         source = self.system_source
       
  1116         self.hm.call_hooks('before_delete_relation', cnx,
       
  1117                            eidfrom=subject, rtype=rtype, eidto=object)
       
  1118         source.delete_relation(cnx, subject, rtype, object)
       
  1119         rschema = self.schema.rschema(rtype)
       
  1120         cnx.update_rel_cache_del(subject, rtype, object, rschema.symmetric)
       
  1121         self.hm.call_hooks('after_delete_relation', cnx,
       
  1122                            eidfrom=subject, rtype=rtype, eidto=object)
       
  1123 
       
  1124 
       
  1125 
       
  1126 
       
    # Placeholder logging methods: they are overridden by the
    # set_log_methods() call below and are only defined here to prevent
    # pylint from complaining. Until then each of them is a no-op that
    # accepts any arguments and returns None.
    info = warning = error = critical = exception = debug = lambda msg, *a, **kw: None
       
  1130 
       
# Imported here, at the end of the module, right before their only use below.
from logging import getLogger
from cubicweb import set_log_methods
# Hand the 'cubicweb.repository' logger to set_log_methods() for the
# Repository class; this is the call that overrides the no-op
# info/warning/error/critical/exception/debug attributes defined on the class.
set_log_methods(Repository, getLogger('cubicweb.repository'))