author Philippe Pepiot <>
Mon, 30 Mar 2020 15:46:12 +0200
changeset 12963 dd9e98b25213
parent 12962 fa0cd558d829
child 12947 27323f7200fc
permissions -rw-r--r--
[server] dynamically close idle database connections When pool hasn't been empty for `idle_timeout` time, start closing connections.

# copyright 2003-2016 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact --
# This file is part of CubicWeb.
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb.  If not, see <>.
"""Defines the central class for the CubicWeb RQL server: the repository.

The repository is an abstraction allowing execution of rql queries against
data sources. Most of the work is actually done in helper classes. The
repository mainly:

* brings these classes all together to provide a single access
  point to a cubicweb instance.
* handles session management
"""
from itertools import chain
from contextlib import contextmanager
from logging import getLogger
import queue
import threading
import time

from logilab.common.decorators import cached, clear_cache

from yams import BadSchemaDefinition
from rql.utils import rqlvar_maker

from cubicweb import (CW_MIGRATION_MAP,
                      UnknownEid, AuthenticationError, ExecutionError,
                      UniqueTogetherError, ViolatedConstraint)
from cubicweb import set_log_methods
from cubicweb import cwvreg, schema, server
from cubicweb.server import utils, hook, querier, sources
from cubicweb.server.session import InternalManager, Connection

    ('owned_by', 'object'),
    ('created_by', 'object'),
    ('cw_source', 'object'),

def prefill_entity_caches(entity):
    cnx = entity._cw
    # prefill entity relation caches
    for rschema in entity.e_schema.subject_relations():
        rtype = str(rschema)
        if rtype in schema.VIRTUAL_RTYPES or (rtype, 'subject') in NO_CACHE_RELATIONS:
            entity.cw_attr_cache.setdefault(rtype, None)
            entity.cw_set_relation_cache(rtype, 'subject',
    for rschema in entity.e_schema.object_relations():
        rtype = str(rschema)
        if rtype in schema.VIRTUAL_RTYPES or (rtype, 'object') in NO_CACHE_RELATIONS:
        entity.cw_set_relation_cache(rtype, 'object', cnx.empty_rset())

def del_existing_rel_if_needed(cnx, eidfrom, rtype, eidto):
    """delete existing relation when adding a new one if card is 1 or ?

    have to be done once the new relation has been inserted to avoid having
    an entity without a relation for some time

    this kind of behaviour has to be done in the repository so we don't have
    hooks order hazardness
    # skip that if integrity explicitly disabled
    if not cnx.is_hook_category_activated('activeintegrity'):
    rdef = cnx.rtype_eids_rdef(rtype, eidfrom, eidto)
    card = rdef.cardinality
    # one may be tented to check for neweids but this may cause more than one
    # relation even with '1?'  cardinality if thoses relations are added in the
    # same transaction where the entity is being created. This never occurs from
    # the web interface but may occurs during test or dbapi connection (though
    # not expected for this).  So: don't do it, we pretend to ensure repository
    # consistency.
    # notes:
    # * inlined relations will be implicitly deleted for the subject entity
    # * we don't want read permissions to be applied but we want delete
    #   permission to be checked
    if card[0] in '1?':
        with cnx.security_enabled(read=False):
            cnx.execute('DELETE X %s Y WHERE X eid %%(x)s, '
                        'NOT Y eid %%(y)s' % rtype,
                        {'x': eidfrom, 'y': eidto})
    if card[1] in '1?':
        with cnx.security_enabled(read=False):
            cnx.execute('DELETE X %s Y WHERE Y eid %%(y)s, '
                        'NOT X eid %%(x)s' % rtype,
                        {'x': eidfrom, 'y': eidto})

def preprocess_inlined_relations(cnx, entity):
    """when an entity is added, check if it has some inlined relation which
    requires to be extrated for proper call hooks
    relations = []
    activeintegrity = cnx.is_hook_category_activated('activeintegrity')
    eschema = entity.e_schema
    for attr in entity.cw_edited:
        rschema = eschema.subjrels[attr]
        if not  # inlined relation
            value = entity.cw_edited[attr]
            relations.append((attr, value))
            cnx.update_rel_cache_add(entity.eid, attr, value)
            rdef = cnx.rtype_eids_rdef(attr, entity.eid, value)
            if rdef.cardinality[1] in '1?' and activeintegrity:
                with cnx.security_enabled(read=False):
                    cnx.execute('DELETE X %s Y WHERE Y eid %%(y)s' % attr,
                                {'x': entity.eid, 'y': value})
    return relations

class NullEventBus(object):
    def publish(self, msg):

    def add_subscription(self, topic, callback):

    def start(self):

    def stop(self):

class _BaseCnxSet:

    def __init__(self, source):
        self._source = source

    def qsize(self):
        return None

    def get(self):
        return self._new_cnxset()

    def _new_cnxset(self):
        return self._source.wrapped_connection()

    def release(self, cnxset):

    def __iter__(self):

    def close(self):

class _CnxSetPool(_BaseCnxSet):

    def __init__(self, source, min_size=1, max_size=4, idle_timeout=300):
        self._cnxsets = []
        self._queue = queue.LifoQueue()
        self.lock = threading.Lock()
        self.min_size = min_size
        self.max_size = max_size
        self.idle = time.time()
        self.idle_timeout = idle_timeout

        for i in range(min_size):

    def _new_cnxset(self):
        cnxset = super()._new_cnxset()
        with self.lock:
        return cnxset

    def _close_idle_cnxset(self):
        # close connections not being used since idle_timeout
        if abs(time.time() - self.idle) > self.idle_timeout and self.size() > self.min_size:
                cnxset = self._queue.get_nowait()
            except queue.Empty:
                # the queue has been used since we checked it size
                with self.lock:

    def size(self):
        with self.lock:
            return len(self._cnxsets)

    def qsize(self):
        return self._queue.qsize()

    def get(self):
            cnxset = self._queue.get_nowait()
            return cnxset
        except queue.Empty:
            # reset idle time
            self.idle = time.time()
            if self.max_size and self.size() >= self.max_size:
                    return self._queue.get(True, timeout=5)
                except queue.Empty:
                    raise Exception('no connections set available after 5 secs, probably either a '
                                    'bug in code (too many uncommited/rolled back '
                                    'connections) or too much load on the server (in '
                                    'which case you can try to set a bigger '
                                    'connections pool size)')
            return self._new_cnxset()

    def release(self, cnxset):

    def __iter__(self):
        with self.lock:
            for cnxset in self._cnxsets:
                yield cnxset

    def close(self):
        while True:
                cnxset = self._queue.get_nowait()
            except queue.Empty:
            except Exception as e:
                self.exception('error while closing %s, error: %s' % (cnxset, e))

def get_cnxset(source, size):
    if not size:
        return _BaseCnxSet(source)
    return _CnxSetPool(source, min_size=1, max_size=size)

class Repository(object):
    """a repository provides access to a set of persistent storages for
    entities and relations

    def __init__(self, config, scheduler=None, vreg=None):
        self.config = config
        if vreg is None:
            vreg = cwvreg.CWRegistryStore(config)
        self.vreg = vreg
        self._scheduler = scheduler

        self.app_instances_bus = NullEventBus()

        # list of functions to be called at regular interval
        # list of running threads
        self._running_threads = []
        # initial schema, should be build or replaced latter
        self.schema = schema.CubicWebSchema(config.appid)
        self.vreg.schema = self.schema  # until actual schema is loaded...
        # shutdown flag
        self.shutting_down = None
        # sources (additional sources info in the system database)
        self.system_source = self.get_source('native', 'system',
        # querier helper, need to be created after sources initialization
        self.querier = querier.QuerierHelper(self, self.schema)
        # cache eid -> type
        self._type_cache = {}
        # the hooks manager = hook.HooksManager(self.vreg)

    def bootstrap(self):'starting repository from %s', self.config.apphome)
        self.shutting_down = False
        config = self.config
        # copy pool size here since config.init_cube() and config.load_schema()
        # reload configuration from file and could reset a manually set pool
        # size.
        if config['connections-pooler-enabled']:
            pool_size, min_pool_size = config['connections-pool-size'], 1
            pool_size = min_pool_size = None
        # 0. init a cnxset that will be used to fetch bootstrap information from
        #    the database
        self.cnxsets = get_cnxset(self.system_source, min_pool_size)
        # 1. set used cubes
        if config.creating or not config.read_instance_schema:
            self.set_schema(self.config.load_bootstrap_schema(), resetvreg=False)
        # 2. load schema
        if config.quick_start:
            # quick start: only to get a minimal repository to get cubes
            # information (eg dump/restore/...)
            # restrict appobject_path to only load hooks and entity classes in
            # the registry
            config.cube_appobject_path = set(('hooks', 'entities'))
            config.cubicweb_appobject_path = set(('hooks', 'entities'))
            # limit connections pool size
            pool_size = min_pool_size
        if config.quick_start or config.creating or not config.read_instance_schema:
            # load schema from the file system
            if not config.creating:
      "set fs instance's schema")
            if not config.creating:
                # set eids on entities schema
                with self.internal_cnx() as cnx:
                    for etype, eid in cnx.execute('Any XN,X WHERE X is CWEType, X name XN'):
                            self.schema.eschema(etype).eid = eid
                        except KeyError:
                            # etype in the database doesn't exist in the fs schema, this may occur
                            # during dev and we shouldn't crash
                            self.warning('No %s entity type in the file system schema', etype)
            # normal start: load the instance schema from the database
  'loading schema from the repository')
        # 3. initialize data sources
        if config.creating:
            # call init_creating so that for instance native source can
            # configurate tsearch according to postgres version
            if 'CWProperty' in self.schema:
        # 4. close initialization connection set and reopen fresh ones for
        #    proper initialization
        self.cnxsets = get_cnxset(self.system_source, pool_size)
        # 5. call instance level initialisation hooks'server_startup', repo=self)

    def source_by_uri(self, uri):
        with self.internal_cnx() as cnx:
            rset = cnx.find('CWSource', name=uri)
            if not rset:
                raise ValueError('no source with uri %s found' % uri)
            return self._source_from_cwsource(

    def source_by_eid(self, eid):
        with self.internal_cnx() as cnx:
            rset = cnx.find('CWSource', eid=eid)
            if not rset:
                raise ValueError('no source with eid %d found' % eid)
            return self._source_from_cwsource(

    def sources_by_uri(self):
        mapping = {'system': self.system_source}
        mapping.update((, source)
                       for sourceent, source in self._sources())
        return mapping

    def _sources(self):
        if self.config.quick_start:
        with self.internal_cnx() as cnx:
            for sourceent in cnx.execute(
                    'Any S, SN, SA, SC WHERE S is_instance_of CWSource, '
                    'S name SN, S type SA, S config SC, S name != "system"').entities():
                source = self._source_from_cwsource(sourceent)
                yield sourceent, source

    def _source_from_cwsource(self, sourceent):
        source = self.get_source(sourceent.type,,
                                 sourceent.host_config, sourceent.eid)
        if self.config.source_enabled(source):
            # call source's init method to complete their initialisation if
            # needed (for instance looking for persistent configuration using an
            # internal session, which is not possible until connections sets have been
            # initialized)
        return source

    # internals ###############################################################

    def _init_system_source(self):
        if self.config.quick_start:
        with self.internal_cnx() as cnx:
            sourceent = cnx.execute(
                'Any S, SA, SC WHERE S is_instance_of CWSource,'
                ' S name "system", S type SA, S config SC'
            self.system_source.eid = sourceent.eid

    def get_source(self, type, uri, source_config, eid=None):
        # set uri and type in source config so it's available through
        # source_defs()
        source_config['uri'] = uri
        source_config['type'] = type
        return sources.get_source(type, source_config, self, eid)

    def set_schema(self, schema, resetvreg=True):'set schema %s %#x',, id(schema))
        if resetvreg:
            # trigger full reload of all appobjects
        self.schema = schema

    def deserialize_schema(self):
        """load schema from the database"""
        from cubicweb.server.schemaserial import deserialize_schema
        appschema = schema.CubicWebSchema(self.config.appid)
        self.debug('deserializing db schema into %s %#x',, id(appschema))
        with self.internal_cnx() as cnx:
                deserialize_schema(appschema, cnx)
            except BadSchemaDefinition:
            except Exception as ex:
                import traceback
                raise Exception('Is the database initialised ? (cause: %s)' % ex)
        return appschema

    def has_scheduler(self):
        """Return True if the repository has a scheduler attached and is able
        to register looping tasks.
        return self._scheduler is not None

    def run_scheduler(self):
        """Start repository scheduler after preparing the repository for that.

        * trigger server startup hook,
        * start the scheduler *and block*.

        XXX Other startup related stuffs are done elsewhere. In Repository
        XXX __init__ or in external codes (various server managers).
        assert self.has_scheduler(), \
            "This Repository is not intended to be used as a server"
            'starting repository scheduler with tasks: %s',
            ', '.join(e.action.__name__ for e in self._scheduler.queue))

    def looping_task(self, interval, func, *args):
        """register a function to be called every `interval` seconds.

        looping tasks can only be registered during repository initialization,
        once done this method will fail.
        if self.config.repairing:
        if not self.has_scheduler():
                'looping task %s will not run in this process where repository '
                'has no scheduler; use "cubicweb-ctl scheduler <appid>" to '
                'have it running', func)
        event = utils.schedule_periodic_task(
            self._scheduler, interval, func, *args)'scheduled periodic task %s (interval: %.2fs)',
                  event.action.__name__, interval)

    def threaded_task(self, func):
        """start function in a separated thread"""
        utils.RepoThread(func, self._running_threads).start()

    def shutdown(self):
        """called on server stop event to properly close opened sessions and
        assert not self.shutting_down, 'already shutting down'
        if not (self.config.creating or self.config.repairing
                or self.config.quick_start):
            # then, the system source is still available
  'before_server_shutdown', repo=self)
        self.shutting_down = True'shutting down repository')
        if not (self.config.creating or self.config.repairing
                or self.config.quick_start):
  'server_shutdown', repo=self)
        for thread in self._running_threads:
  'waiting thread %s...', thread.getName())
  'thread %s finished', thread.getName())
        hits, misses = self.querier.rql_cache.cache_hit, self.querier.rql_cache.cache_miss
  'rql st cache hit/miss: %s/%s (%s%% hits)', hits, misses,
                      (hits * 100) / (hits + misses))
            hits, misses = self.system_source.cache_hit, self.system_source.cache_miss
  'sql cache hit/miss: %s/%s (%s%% hits)', hits, misses,
                      (hits * 100) / (hits + misses))
            nocache = self.system_source.no_cache
  'sql cache usage: %s/%s (%s%%)', hits + misses, nocache,
                      ((hits + misses) * 100) / (hits + misses + nocache))
        except ZeroDivisionError:

    def check_auth_info(self, cnx, login, authinfo):
        """validate authentication, raise AuthenticationError on failure, return
        associated CWUser's eid on success.
        # iter on sources_by_uri then check enabled source since sources doesn't
        # contain copy based sources
        for source in self.sources_by_uri.values():
            if self.config.source_enabled(source):
                    return source.authenticate(cnx, login, **authinfo)
                except (NotImplementedError, AuthenticationError):
            raise AuthenticationError('authentication failed with all sources')

    def authenticate_user(self, cnx, login, **authinfo):
        """validate login / password, raise AuthenticationError on failure
        return associated CWUser instance on success
        eid = self.check_auth_info(cnx, login, authinfo)
        cwuser = self._build_user(cnx, eid)
        if self.config.consider_user_state and \
           not cwuser.cw_adapt_to('IWorkflowable').state in cwuser.AUTHENTICABLE_STATES:
            raise AuthenticationError('user is not in authenticable state')
        return cwuser

    def _build_user(self, cnx, eid):
        """return a CWUser entity for user with the given eid"""
        cls = self.vreg['etypes'].etype_class('CWUser')
        st = cls.fetch_rqlst(cnx.user, ordermethod=None)
        st.add_eid_restriction(st.get_variable('X'), 'x', 'Substitute')
        rset = cnx.execute(st.as_string(), {'x': eid})
        assert len(rset) == 1, rset
        return rset.get_entity(0, 0)

    # public (dbapi) interface ################################################

    def get_schema(self):
        """Return the instance schema.

        This is a public method, not requiring a session id.
        return self.schema

    def get_cubes(self):
        """Return the list of cubes used by this instance.

        This is a public method, not requiring a session id.
        versions = self.get_versions(not (self.config.creating
                                          or self.config.repairing
                                          or self.config.quick_start
                                          or self.config.mode == 'test'))
        cubes = list(versions)
        return cubes

    def get_option_value(self, option):
        """Return the value for `option` in the configuration.

        This is a public method, not requiring a session id.
        # XXX we may want to check we don't give sensible information
        return self.config[option]

    def get_versions(self, checkversions=False):
        """Return the a dictionary containing cubes used by this instance
        as key with their version as value, including cubicweb version.

        This is a public method, not requiring a session id.
        from logilab.common.changelog import Version
        vcconf = {}
        with self.internal_cnx() as cnx:
            for pk, version in cnx.execute(
                    'Any K,V WHERE P is CWProperty, P value V, P pkey K, '
                    'P pkey ~="system.version.%"', build_descr=False):
                cube = pk.split('.')[-1]
                # XXX cubicweb migration
                if cube in CW_MIGRATION_MAP:
                    cube = CW_MIGRATION_MAP[cube]
                version = Version(version)
                vcconf[cube] = version
                if checkversions:
                    if cube != 'cubicweb':
                        fsversion = self.config.cube_version(cube)
                        fsversion = self.config.cubicweb_version()
                    if version < fsversion:
                        msg = ('instance has %s version %s but %s '
                               'is installed. Run "cubicweb-ctl upgrade %s".')
                        raise ExecutionError(msg % (cube, version, fsversion, self.config.appid))
        return vcconf

    def source_defs(self):
        """Return the a dictionary containing source uris as value and a
        dictionary describing each source as value.

        This is a public method, not requiring a session id.
        sources = {}
        # remove sensitive information
        for uri, source in self.sources_by_uri.items():
            sources[uri] = source.public_config
        return sources

    def _clear_source_defs_caches(self):
        clear_cache(self, 'source_defs')

    def properties(self):
        """Return a result set containing system wide properties.

        This is a public method, not requiring a session id.
        with self.internal_cnx() as cnx:
            # don't use cnx.execute, we don't want rset.req set
            return self.querier.execute(cnx, 'Any K,V WHERE P is CWProperty,'
                                        'P pkey K, P value V, NOT P for_user U',

    def find_users(self, fetch_attrs, **query_attrs):
        """yield user attributes for cwusers matching the given query_attrs
        (the result set cannot survive this method call)

        This can be used by low-privileges account (anonymous comes to

        `fetch_attrs`: tuple of attributes to be fetched
        `query_attrs`: dict of attr/values to restrict the query
        assert query_attrs
        if not hasattr(self, '_cwuser_attrs'):
            cwuser = self.schema['CWUser']
            self._cwuser_attrs = set(str(rschema)
                                     for rschema, _eschema in cwuser.attribute_definitions()
                                     if not rschema.meta)
        cwuserattrs = self._cwuser_attrs
        for k in chain(fetch_attrs, query_attrs):
            if k not in cwuserattrs:
                raise Exception('bad input for find_user')
        with self.internal_cnx() as cnx:
            varmaker = rqlvar_maker()
            vars = [(attr, next(varmaker)) for attr in fetch_attrs]
            rql = 'Any %s WHERE X is CWUser, ' % ','.join(var[1] for var in vars)
            rql += ','.join('X %s %s' % (var[0], var[1]) for var in vars) + ','
            rset = cnx.execute(rql + ','.join('X %s %%(%s)s' % (attr, attr)
                                              for attr in query_attrs),
            return rset.rows

    # session handling ########################################################

    def internal_cnx(self):
        """Context manager returning a Connection using internal user which have
        every access rights on the repository.

        Internal connections have all hooks beside security enabled.
        with Connection(self, InternalManager()) as cnx:
            cnx.user._cw = cnx  # XXX remove when "vreg = user._cw.vreg" hack in is gone
            with cnx.security_enabled(read=False, write=False):
                yield cnx

    # data sources handling ###################################################
    # * correspondance between eid and type
    # * correspondance between eid and local id (i.e. specific to a given source)

    def clear_caches(self, eids=None):
        if eids is None:
            self._type_cache = {}
            etypes = None
            etypes = []
            etcache = self._type_cache
            for eid in eids:
                    etype = etcache.pop(int(eid))  # may be a string in some cases
                except KeyError:
                    etype = None
        self.querier.clear_caches(eids, etypes)
        self.system_source.clear_caches(eids, etypes)

    def type_from_eid(self, eid, cnx):
        """Return the type of the entity with id `eid`"""
            eid = int(eid)
        except ValueError:
            raise UnknownEid(eid)
            return self._type_cache[eid]
        except KeyError:
            etype = self.system_source.eid_type(cnx, eid)
            self._type_cache[eid] = etype
            return etype

    def add_info(self, cnx, entity, source):
        """add type and source info for an eid into the system table,
        and index the entity with the full text index
        # begin by inserting eid/type/source into the entities table
        self.system_source.add_info(cnx, entity, source)

    def _delete_cascade_multi(self, cnx, entities):
        """same as _delete_cascade but accepts a list of entities with
        the same etype and belonging to the same source.
        pendingrtypes = cnx.transaction_data.get('pendingrtypes', ())
        # delete remaining relations: if user can delete the entity, he can
        # delete all its relations without security checking
        with cnx.security_enabled(read=False, write=False):
            in_eids = ','.join([str(_e.eid) for _e in entities])
            with cnx.running_hooks_ops():
                for rschema, _, role in entities[0].e_schema.relation_definitions():
                    if rschema.rule:
                        continue  # computed relation
                    rtype = rschema.type
                    if rtype in schema.VIRTUAL_RTYPES or rtype in pendingrtypes:
                    if role == 'subject':
                        # don't skip inlined relation so they are regularly
                        # deleted and so hooks are correctly called
                        rql = 'DELETE X %s Y WHERE X eid IN (%s)' % (rtype, in_eids)
                        rql = 'DELETE Y %s X WHERE X eid IN (%s)' % (rtype, in_eids)
                    cnx.execute(rql, build_descr=False)

    def init_entity_caches(self, cnx, entity, source):
        """Add entity to connection entities cache and repo's cache."""
        self._type_cache[entity.eid] = entity.cw_etype

    def glob_add_entity(self, cnx, edited):
        """add an entity to the repository

        the entity eid should originally be None and a unique eid is assigned to
        the entity instance
        entity = edited.entity
        entity._cw_is_saved = False  # entity has an eid but is not yet saved
        # init edited_attributes before calling before_add_entity hooks
        entity.cw_edited = edited
        source = self.system_source
        # allocate an eid to the entity before calling hooks
        entity.eid = self.system_source.create_eid(cnx)
        # set caches asap
        self.init_entity_caches(cnx, entity, source)
        if server.DEBUG & server.DBG_REPO:
            print('ADD entity', self, entity.cw_etype, entity.eid, edited)
        prefill_entity_caches(entity)'before_add_entity', cnx, entity=entity)
        relations = preprocess_inlined_relations(cnx, entity)
        if cnx.is_hook_category_activated('integrity'):
        self.add_info(cnx, entity, source)
            source.add_entity(cnx, entity)
        except (UniqueTogetherError, ViolatedConstraint) as exc:
            userhdlr = cnx.vreg['adapters'].select(
                'IUserFriendlyError', cnx, entity=entity, exc=exc)
        edited.saved = entity._cw_is_saved = True
        # trigger after_add_entity after after_add_relation'after_add_entity', cnx, entity=entity)
        # call hooks for inlined relations
        for attr, value in relations:
  'before_add_relation', cnx,
                               eidfrom=entity.eid, rtype=attr, eidto=value)
  'after_add_relation', cnx,
                               eidfrom=entity.eid, rtype=attr, eidto=value)
        return entity.eid

    def glob_update_entity(self, cnx, edited):
        """replace an entity in the repository
        the type and the eid of an entity must not be changed
        entity = edited.entity
        if server.DEBUG & server.DBG_REPO:
            print('UPDATE entity', entity.cw_etype, entity.eid,
                  entity.cw_attr_cache, edited)
        hm =
        eschema = entity.e_schema
        orig_edited = getattr(entity, 'cw_edited', None)
        entity.cw_edited = edited
        source = self.system_source
            only_inline_rels, need_fti_update = True, False
            relations = []
            for attr in list(edited):
                if attr == 'eid':
                rschema = eschema.subjrels[attr]
                    if getattr(eschema.rdef(attr), 'fulltextindexed', False):
                        need_fti_update = True
                    only_inline_rels = False
                    # inlined relation
                    previous_value = entity.related(attr) or None
                    if previous_value is not None:
                        previous_value = previous_value[0][0]  # got a result set
                        if previous_value == entity.cw_attr_cache[attr]:
                            previous_value = None
                            hm.call_hooks('before_delete_relation', cnx,
                                          eidfrom=entity.eid, rtype=attr,
                    relations.append((attr, edited[attr], previous_value))
            # call hooks for inlined relations
            for attr, value, _t in relations:
                hm.call_hooks('before_add_relation', cnx,
                              eidfrom=entity.eid, rtype=attr, eidto=value)
            if not only_inline_rels:
                hm.call_hooks('before_update_entity', cnx, entity=entity)
            if cnx.is_hook_category_activated('integrity'):
                source.update_entity(cnx, entity)
                edited.saved = True
            except (UniqueTogetherError, ViolatedConstraint) as exc:
                userhdlr = cnx.vreg['adapters'].select(
                    'IUserFriendlyError', cnx, entity=entity, exc=exc)
            self.system_source.update_info(cnx, entity, need_fti_update)
            if not only_inline_rels:
                hm.call_hooks('after_update_entity', cnx, entity=entity)
            for attr, value, prevvalue in relations:
                # if the relation is already cached, update existant cache
                relcache = entity.cw_relation_cached(attr, 'subject')
                if prevvalue is not None:
                    hm.call_hooks('after_delete_relation', cnx,
                                  eidfrom=entity.eid, rtype=attr, eidto=prevvalue)
                    if relcache is not None:
                        cnx.update_rel_cache_del(entity.eid, attr, prevvalue)
                del_existing_rel_if_needed(cnx, entity.eid, attr, value)
                cnx.update_rel_cache_add(entity.eid, attr, value)
                hm.call_hooks('after_add_relation', cnx,
                              eidfrom=entity.eid, rtype=attr, eidto=value)
            if orig_edited is not None:
                entity.cw_edited = orig_edited

    def glob_delete_entities(self, cnx, eids):
        """Delete a set of entities and all related entities from the repository.

        :param cnx: the connection on whose behalf deletion is performed
        :param eids: a set of entity eids to delete
        """
        # mark eids as being deleted in cnx info and setup cache update
        # operation (register pending eids before actual deletion to avoid
        # multiple call to glob_delete_entities)
        op = hook.CleanupDeletedEidsCacheOp.get_instance(cnx)
        # only handle eids not already pending deletion in this transaction
        eids = eids - op._container
        op._container |= eids
        data_by_etype = {}  # values are [list of entities]
        # WARNING: the way this dictionary is populated is heavily optimized
        # and does not use setdefault on purpose. Unless a new release
        # of the Python interpreter advertises large perf improvements
        # in setdefault, this should not be changed without profiling.
        for eid in eids:
            etype = self.type_from_eid(eid, cnx)
            entity = cnx.entity_from_eid(eid, etype)
            try:
                data_by_etype[etype].append(entity)
            except KeyError:
                data_by_etype[etype] = [entity]
        source = self.system_source
        for etype, entities in data_by_etype.items():
            if server.DEBUG & server.DBG_REPO:
                print('DELETE entities', etype, [e.eid for e in entities])
            self.hm.call_hooks('before_delete_entity', cnx, entities=entities)
            # delete entities depending on the deleted ones (composite rels)
            self._delete_cascade_multi(cnx, entities)
            source.delete_entities(cnx, entities)
            source.delete_info_multi(cnx, entities)
            self.hm.call_hooks('after_delete_entity', cnx, entities=entities)
        # don't clear cache here, it is done in a hook on commit

    def glob_add_relation(self, cnx, subject, rtype, object):
        """Add a single relation to the repository.

        Convenience wrapper delegating to :meth:`glob_add_relations`
        with a one-entry mapping.
        """
        single = {rtype: [(subject, object)]}
        self.glob_add_relations(cnx, single)

    def glob_add_relations(self, cnx, relations):
        """add several relations to the repository

        relations is a dictionary rtype: [(subj_eid, obj_eid), ...]
        """
        source = self.system_source
        relations_by_rtype = {}
        subjects_by_types = {}
        objects_by_types = {}
        activintegrity = cnx.is_hook_category_activated('activeintegrity')
        for rtype, eids_subj_obj in relations.items():
            if server.DEBUG & server.DBG_REPO:
                for subjeid, objeid in eids_subj_obj:
                    print('ADD relation', subjeid, rtype, objeid)
            for subjeid, objeid in eids_subj_obj:
                relations_by_rtype.setdefault(rtype, []).append((subjeid, objeid))
                if not activintegrity:
                    # active-integrity hooks disabled: skip cardinality checks
                    continue
                # take care to relation of cardinality '?1', as all eids will
                # be inserted later, we remove duplicated eids since they
                # won't be caught by `del_existing_rel_if_needed`
                rdef = cnx.rtype_eids_rdef(rtype, subjeid, objeid)
                card = rdef.cardinality
                if card[0] in '?1':
                    # subject may have at most one object: drop any existing
                    # relation in the database pointing to another object
                    with cnx.security_enabled(read=False):
                        cnx.execute('DELETE X %s Y WHERE X eid %%(x)s, '
                                    'NOT Y eid %%(y)s' % rtype,
                                    {'x': subjeid, 'y': objeid})
                    subjects = subjects_by_types.setdefault(rdef, {})
                    if subjeid in subjects:
                        # a relation for this subject is already queued in this
                        # call: replace it by the one just appended
                        del relations_by_rtype[rtype][subjects[subjeid]]
                        subjects[subjeid] = len(relations_by_rtype[rtype]) - 1
                        continue
                    # remember index of the pair just appended
                    subjects[subjeid] = len(relations_by_rtype[rtype]) - 1
                if card[1] in '?1':
                    # object may have at most one subject: drop any existing
                    # relation in the database coming from another subject
                    with cnx.security_enabled(read=False):
                        cnx.execute('DELETE X %s Y WHERE Y eid %%(y)s, '
                                    'NOT X eid %%(x)s' % rtype,
                                    {'x': subjeid, 'y': objeid})
                    objects = objects_by_types.setdefault(rdef, {})
                    if objeid in objects:
                        del relations_by_rtype[rtype][objects[objeid]]
                        # after the deletion the appended pair sits at the last
                        # index, i.e. len - 1 (same as the subject branch)
                        objects[objeid] = len(relations_by_rtype[rtype]) - 1
                        continue
                    objects[objeid] = len(relations_by_rtype[rtype]) - 1
        for rtype, source_relations in relations_by_rtype.items():
            self.hm.call_hooks('before_add_relation', cnx,
                               rtype=rtype, eids_from_to=source_relations)
        for rtype, source_relations in relations_by_rtype.items():
            source.add_relations(cnx, rtype, source_relations)
            rschema = self.schema.rschema(rtype)
            for subjeid, objeid in source_relations:
                cnx.update_rel_cache_add(subjeid, rtype, objeid, rschema.symmetric)
        for rtype, source_relations in relations_by_rtype.items():
            self.hm.call_hooks('after_add_relation', cnx,
                               rtype=rtype, eids_from_to=source_relations)

    def glob_delete_relation(self, cnx, subject, rtype, object):
        """delete a relation from the repository"""
        if server.DEBUG & server.DBG_REPO:
            print('DELETE relation', subject, rtype, object)
        source = self.system_source
        self.hm.call_hooks('before_delete_relation', cnx,
                           eidfrom=subject, rtype=rtype, eidto=object)
        source.delete_relation(cnx, subject, rtype, object)
        rschema = self.schema.rschema(rtype)
        # keep the connection's relation cache in sync with the deletion
        cnx.update_rel_cache_del(subject, rtype, object, rschema.symmetric)
        self.hm.call_hooks('after_delete_relation', cnx,
                           eidfrom=subject, rtype=rtype, eidto=object)

    # no-op logging placeholders; these are overridden by set_log_methods below
    # only defining here to prevent pylint from complaining
    info = warning = error = critical = exception = debug = lambda msg, *a, **kw: None

# attach real logging methods (info/warning/error/...) to Repository,
# replacing the no-op placeholders defined in the class body
set_log_methods(Repository, getLogger('cubicweb.repository'))