cubicweb/server/repository.py
author Laurent Peuch <cortex@worlddomination.be>
Thu, 05 Dec 2019 18:36:55 +0100
changeset 12804 6de3af773073
parent 12803 bf940d5416e5
child 12805 8cf02773a6b3
permissions -rw-r--r--
[refactoring] not needed anymore in python3

# copyright 2003-2016 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
"""Defines the central class for the CubicWeb RQL server: the repository.

The repository is an abstraction allowing execution of rql queries against
data sources. Most of the work is actually done in helper classes. The
repository mainly:

* brings these classes all together to provide a single access
  point to a cubicweb instance.
* handles session management
"""

from itertools import chain
from contextlib import contextmanager
from logging import getLogger
import queue

from logilab.common.decorators import cached, clear_cache

from yams import BadSchemaDefinition
from rql.utils import rqlvar_maker

from cubicweb import (CW_MIGRATION_MAP,
                      UnknownEid, AuthenticationError, ExecutionError,
                      UniqueTogetherError, ViolatedConstraint)
from cubicweb import set_log_methods
from cubicweb import cwvreg, schema, server
from cubicweb.server import utils, hook, querier, sources
from cubicweb.server.session import InternalManager, Connection


# (relation type, role) pairs whose relation cache must not be prefilled
NO_CACHE_RELATIONS = {
    ('owned_by', 'object'),
    ('created_by', 'object'),
    ('cw_source', 'object'),
}


def prefill_entity_caches(entity):
    """Prefill the attribute and relation caches of `entity` with empty values.

    Final subject relations get a ``None`` default in the attribute cache
    (values already there are kept); every other subject or object relation
    gets an empty result set. Virtual relation types and the (rtype, role)
    pairs listed in NO_CACHE_RELATIONS are left untouched.
    """
    cnx = entity._cw

    def skipped(rtype, role):
        # relations for which no cache entry must be created
        return rtype in schema.VIRTUAL_RTYPES or (rtype, role) in NO_CACHE_RELATIONS

    for rschema in entity.e_schema.subject_relations():
        rtype = str(rschema)
        if skipped(rtype, 'subject'):
            continue
        if not rschema.final:
            entity.cw_set_relation_cache(rtype, 'subject', cnx.empty_rset())
        else:
            entity.cw_attr_cache.setdefault(rtype, None)
    for rschema in entity.e_schema.object_relations():
        rtype = str(rschema)
        if not skipped(rtype, 'object'):
            entity.cw_set_relation_cache(rtype, 'object', cnx.empty_rset())


def del_existing_rel_if_needed(cnx, eidfrom, rtype, eidto):
    """Delete the relations made obsolete by a newly added one when the
    cardinality is '1' or '?'.

    This has to be done once the new relation has been inserted, to avoid
    having an entity without a relation for some time, and it is done in the
    repository so that it does not depend on hooks ordering.
    """
    # nothing to do if integrity has been explicitly disabled
    if not cnx.is_hook_category_activated('activeintegrity'):
        return
    cardinality = cnx.rtype_eids_rdef(rtype, eidfrom, eidto).cardinality
    # One may be tempted to check for new eids, but this may lead to more than
    # one relation even with a '1?' cardinality if those relations are added
    # in the same transaction as the one creating the entity. This never
    # occurs from the web interface but may occur during tests or through a
    # dbapi connection (though not expected for this). So: don't do it, we
    # pretend to ensure repository consistency.
    #
    # notes:
    # * inlined relations will be implicitly deleted for the subject entity
    # * we don't want read permissions to be applied but we want delete
    #   permission to be checked
    cleanup_queries = (
        # (index in the cardinality string, RQL template)
        (0, 'DELETE X %s Y WHERE X eid %%(x)s, NOT Y eid %%(y)s'),
        (1, 'DELETE X %s Y WHERE Y eid %%(y)s, NOT X eid %%(x)s'),
    )
    for position, template in cleanup_queries:
        if cardinality[position] not in '1?':
            continue
        with cnx.security_enabled(read=False):
            cnx.execute(template % rtype, {'x': eidfrom, 'y': eidto})


def preprocess_inlined_relations(cnx, entity):
    """Collect the inlined relations edited on an entity being added, so that
    relation hooks can be called for them.

    For each of them the connection's relation cache is updated and, when
    the object cardinality is '1' or '?' and the 'activeintegrity' hook
    category is active, relations of that type already pointing to the target
    are deleted.

    Return a list of (relation type, target eid) tuples.
    """
    integrity_on = cnx.is_hook_category_activated('activeintegrity')
    subject_rels = entity.e_schema.subjrels
    inlined = []
    for rtype in entity.cw_edited:
        if subject_rels[rtype].final:
            # plain attribute, not an inlined relation
            continue
        target = entity.cw_edited[rtype]
        inlined.append((rtype, target))
        cnx.update_rel_cache_add(entity.eid, rtype, target)
        rdef = cnx.rtype_eids_rdef(rtype, entity.eid, target)
        if rdef.cardinality[1] in '1?' and integrity_on:
            with cnx.security_enabled(read=False):
                cnx.execute('DELETE X %s Y WHERE Y eid %%(y)s' % rtype,
                            {'x': entity.eid, 'y': target})
    return inlined


class NullEventBus(object):
    """Event bus stand-in whose operations all do nothing."""

    def publish(self, msg):
        """Discard `msg`."""

    def add_subscription(self, topic, callback):
        """Ignore the subscription request."""

    def start(self):
        """Nothing to start."""

    def stop(self):
        """Nothing to stop."""


class _CnxSetPool:

    def __init__(self, source, size):
        self._cnxsets = []
        if size is not None:
            self._queue = queue.Queue()
            for i in range(size):
                cnxset = source.wrapped_connection()
                self._cnxsets.append(cnxset)
                self._queue.put_nowait(cnxset)
        else:
            self._queue = None
            self._source = source

    def qsize(self):
        if self._queue is None:
            return None
        return self._queue.qsize()

    def get(self):
        if self._queue is None:
            return self._source.wrapped_connection()
        try:
            return self._queue.get(True, timeout=5)
        except queue.Empty:
            raise Exception('no connections set available after 5 secs, probably either a '
                            'bug in code (too many uncommited/rolled back '
                            'connections) or too much load on the server (in '
                            'which case you can try to set a bigger '
                            'connections pool size)')

    def release(self, cnxset):
        if self._queue is None:
            cnxset.close(True)
        else:
            self._queue.put_nowait(cnxset)

    def __iter__(self):
        for cnxset in self._cnxsets:
            yield cnxset

    def close(self):
        if self._queue is not None:
            while not self._queue.empty():
                cnxset = self._queue.get_nowait()
                try:
                    cnxset.close(True)
                except Exception:
                    self.exception('error while closing %s' % cnxset)


class Repository(object):
    """a repository provides access to a set of persistent storages for
    entities and relations
    """

    def __init__(self, config, scheduler=None, vreg=None):
        """Set up the repository's in-memory state.

        :param config: the instance configuration
        :param scheduler: optional scheduler used to register looping tasks;
          None when the process has no scheduler
        :param vreg: optional registry store; a `cwvreg.CWRegistryStore` is
          built from `config` when omitted
        """
        self.config = config
        if vreg is None:
            vreg = cwvreg.CWRegistryStore(config)
        self.vreg = vreg
        self._scheduler = scheduler

        # event bus placeholder doing nothing
        self.app_instances_bus = NullEventBus()

        # list of running threads
        self._running_threads = []
        # initial schema, should be built or replaced later
        self.schema = schema.CubicWebSchema(config.appid)
        self.vreg.schema = self.schema  # until actual schema is loaded...
        # shutdown flag (None until bootstrap() or shutdown() sets it)
        self.shutting_down = None
        # sources (additional sources info in the system database)
        self.system_source = self.get_source('native', 'system',
                                             config.system_source_config.copy())
        # querier helper, needs to be created after sources initialization
        self.querier = querier.QuerierHelper(self, self.schema)
        # cache eid -> type
        self._type_cache = {}
        # the hooks manager
        self.hm = hook.HooksManager(self.vreg)

    def bootstrap(self):
        """Start the repository.

        In order: open a temporary connections pool, set the cubes in use,
        load the schema (from the file system or from the database depending
        on the configuration), initialize the system source, replace the
        temporary pool by the definitive one, then call the 'server_startup'
        hooks.
        """
        self.info('starting repository from %s', self.config.apphome)
        self.shutting_down = False
        config = self.config
        # copy pool size here since config.init_cube() and config.load_schema()
        # reload configuration from file and could reset a manually set pool
        # size.
        if config['connections-pooler-enabled']:
            pool_size, min_pool_size = config['connections-pool-size'], 1
        else:
            # None means "no pooling" for _CnxSetPool
            pool_size = min_pool_size = None
        # 0. init a cnxset that will be used to fetch bootstrap information from
        #    the database
        self.cnxsets = _CnxSetPool(self.system_source, min_pool_size)
        # 1. set used cubes
        if config.creating or not config.read_instance_schema:
            config.bootstrap_cubes()
        else:
            self.set_schema(self.config.load_bootstrap_schema(), resetvreg=False)
            config.init_cubes(self.get_cubes())
        # 2. load schema
        if config.quick_start:
            # quick start: only to get a minimal repository to get cubes
            # information (eg dump/restore/...)
            #
            # restrict appobject_path to only load hooks and entity classes in
            # the registry
            config.cube_appobject_path = set(('hooks', 'entities'))
            config.cubicweb_appobject_path = set(('hooks', 'entities'))
            # limit connections pool size
            pool_size = min_pool_size
        if config.quick_start or config.creating or not config.read_instance_schema:
            # load schema from the file system
            if not config.creating:
                self.info("set fs instance's schema")
            self.set_schema(config.load_schema(expand_cubes=True))
            if not config.creating:
                # set eids on entities schema
                with self.internal_cnx() as cnx:
                    for etype, eid in cnx.execute('Any XN,X WHERE X is CWEType, X name XN'):
                        try:
                            self.schema.eschema(etype).eid = eid
                        except KeyError:
                            # etype in the database doesn't exist in the fs schema, this may occur
                            # during dev and we shouldn't crash
                            self.warning('No %s entity type in the file system schema', etype)
        else:
            # normal start: load the instance schema from the database
            self.info('loading schema from the repository')
            self.set_schema(self.deserialize_schema())
        # 3. initialize data sources
        if config.creating:
            # call init_creating so that for instance native source can
            # configure tsearch according to postgres version
            self.system_source.init_creating()
        else:
            self._init_system_source()
            if 'CWProperty' in self.schema:
                self.vreg.init_properties(self.properties())
        # 4. close initialization connection set and reopen fresh ones for
        #    proper initialization
        self.cnxsets.close()
        self.cnxsets = _CnxSetPool(self.system_source, pool_size)
        # 5. call instance level initialisation hooks
        self.hm.call_hooks('server_startup', repo=self)

    def source_by_uri(self, uri):
        with self.internal_cnx() as cnx:
            rset = cnx.find('CWSource', name=uri)
            if not rset:
                raise ValueError('no source with uri %s found' % uri)
            return self._source_from_cwsource(rset.one())

    def source_by_eid(self, eid):
        with self.internal_cnx() as cnx:
            rset = cnx.find('CWSource', eid=eid)
            if not rset:
                raise ValueError('no source with eid %d found' % eid)
            return self._source_from_cwsource(rset.one())

    @property
    def sources_by_uri(self):
        mapping = {'system': self.system_source}
        mapping.update((sourceent.name, source)
                       for sourceent, source in self._sources())
        return mapping

    def _sources(self):
        if self.config.quick_start:
            return
        with self.internal_cnx() as cnx:
            for sourceent in cnx.execute(
                    'Any S, SN, SA, SC WHERE S is_instance_of CWSource, '
                    'S name SN, S type SA, S config SC, S name != "system"').entities():
                source = self._source_from_cwsource(sourceent)
                yield sourceent, source
        self._clear_source_defs_caches()

    def _source_from_cwsource(self, sourceent):
        source = self.get_source(sourceent.type, sourceent.name,
                                 sourceent.host_config, sourceent.eid)
        if self.config.source_enabled(source):
            # call source's init method to complete their initialisation if
            # needed (for instance looking for persistent configuration using an
            # internal session, which is not possible until connections sets have been
            # initialized)
            source.init(sourceent)
        return source

    # internals ###############################################################

    def _init_system_source(self):
        if self.config.quick_start:
            self.system_source.init_creating()
            return
        with self.internal_cnx() as cnx:
            sourceent = cnx.execute(
                'Any S, SA, SC WHERE S is_instance_of CWSource,'
                ' S name "system", S type SA, S config SC'
            ).one()
            self.system_source.eid = sourceent.eid
            self.system_source.init(sourceent)

    def get_source(self, type, uri, source_config, eid=None):
        """Return the source object built by `sources.get_source` for the
        given type and configuration.

        `source_config` is modified in place: its 'uri' and 'type' keys are
        set so they are available through source_defs().
        """
        for key, value in (('uri', uri), ('type', type)):
            source_config[key] = value
        return sources.get_source(type, source_config, self, eid)

    def set_schema(self, schema, resetvreg=True):
        self.info('set schema %s %#x', schema.name, id(schema))
        if resetvreg:
            # trigger full reload of all appobjects
            self.vreg.set_schema(schema)
        else:
            self.vreg._set_schema(schema)
        self.querier.set_schema(schema)
        self.system_source.set_schema(schema)
        self.schema = schema

    def deserialize_schema(self):
        """Build a new schema, fill it from the database and return it.

        BadSchemaDefinition errors propagate unchanged; for any other error
        the traceback is printed and a plain Exception hinting at a non
        initialised database is raised.
        """
        from cubicweb.server.schemaserial import deserialize_schema as load_db_schema
        dbschema = schema.CubicWebSchema(self.config.appid)
        self.debug('deserializing db schema into %s %#x', dbschema.name, id(dbschema))
        with self.internal_cnx() as cnx:
            try:
                load_db_schema(dbschema, cnx)
            except BadSchemaDefinition:
                raise
            except Exception as err:
                import traceback
                traceback.print_exc()
                raise Exception('Is the database initialised ? (cause: %s)' % err)
        return dbschema

    def has_scheduler(self):
        """Return True if the repository has a scheduler attached and is able
        to register looping tasks.
        """
        return self._scheduler is not None

    def run_scheduler(self):
        """Start repository scheduler after preparing the repository for that.

        * trigger server startup hook,
        * start the scheduler *and block*.

        XXX Other startup related stuffs are done elsewhere. In Repository
        XXX __init__ or in external codes (various server managers).
        """
        assert self.has_scheduler(), \
            "This Repository is not intended to be used as a server"
        self.info(
            'starting repository scheduler with tasks: %s',
            ', '.join(e.action.__name__ for e in self._scheduler.queue))
        self._scheduler.run()

    def looping_task(self, interval, func, *args):
        """register a function to be called every `interval` seconds.

        looping tasks can only be registered during repository initialization,
        once done this method will fail.
        """
        if self.config.repairing:
            return
        if not self.has_scheduler():
            self.warning(
                'looping task %s will not run in this process where repository '
                'has no scheduler; use "cubicweb-ctl scheduler <appid>" to '
                'have it running', func)
            return
        event = utils.schedule_periodic_task(
            self._scheduler, interval, func, *args)
        self.info('scheduled periodic task %s (interval: %.2fs)',
                  event.action.__name__, interval)

    def threaded_task(self, func):
        """Run `func` in a separate `utils.RepoThread`, which is also given
        the repository's list of running threads.
        """
        worker = utils.RepoThread(func, self._running_threads)
        worker.start()

    def shutdown(self):
        """called on server stop event to properly close opened sessions and
        connections
        """
        assert not self.shutting_down, 'already shutting down'
        if not (self.config.creating or self.config.repairing
                or self.config.quick_start):
            # then, the system source is still available
            self.hm.call_hooks('before_server_shutdown', repo=self)
        self.shutting_down = True
        self.info('shutting down repository')
        self.system_source.shutdown()
        if not (self.config.creating or self.config.repairing
                or self.config.quick_start):
            self.hm.call_hooks('server_shutdown', repo=self)
        for thread in self._running_threads:
            self.info('waiting thread %s...', thread.getName())
            thread.join()
            self.info('thread %s finished', thread.getName())
        self.cnxsets.close()
        hits, misses = self.querier.rql_cache.cache_hit, self.querier.rql_cache.cache_miss
        try:
            self.info('rql st cache hit/miss: %s/%s (%s%% hits)', hits, misses,
                      (hits * 100) / (hits + misses))
            hits, misses = self.system_source.cache_hit, self.system_source.cache_miss
            self.info('sql cache hit/miss: %s/%s (%s%% hits)', hits, misses,
                      (hits * 100) / (hits + misses))
            nocache = self.system_source.no_cache
            self.info('sql cache usage: %s/%s (%s%%)', hits + misses, nocache,
                      ((hits + misses) * 100) / (hits + misses + nocache))
        except ZeroDivisionError:
            pass

    def check_auth_info(self, cnx, login, authinfo):
        """validate authentication, raise AuthenticationError on failure, return
        associated CWUser's eid on success.
        """
        # iter on sources_by_uri then check enabled source since sources doesn't
        # contain copy based sources
        for source in self.sources_by_uri.values():
            if self.config.source_enabled(source):
                try:
                    return source.authenticate(cnx, login, **authinfo)
                except (NotImplementedError, AuthenticationError):
                    continue
        else:
            raise AuthenticationError('authentication failed with all sources')

    def authenticate_user(self, cnx, login, **authinfo):
        """validate login / password, raise AuthenticationError on failure
        return associated CWUser instance on success
        """
        eid = self.check_auth_info(cnx, login, authinfo)
        cwuser = self._build_user(cnx, eid)
        if self.config.consider_user_state and \
           not cwuser.cw_adapt_to('IWorkflowable').state in cwuser.AUTHENTICABLE_STATES:
            raise AuthenticationError('user is not in authenticable state')
        return cwuser

    def _build_user(self, cnx, eid):
        """return a CWUser entity for user with the given eid"""
        cls = self.vreg['etypes'].etype_class('CWUser')
        st = cls.fetch_rqlst(cnx.user, ordermethod=None)
        st.add_eid_restriction(st.get_variable('X'), 'x', 'Substitute')
        rset = cnx.execute(st.as_string(), {'x': eid})
        assert len(rset) == 1, rset
        return rset.get_entity(0, 0)

    # public (dbapi) interface ################################################

    def get_schema(self):
        """Return the instance schema.

        This is a public method, not requiring a session id.
        """
        return self.schema

    def get_cubes(self):
        """Return the list of cubes used by this instance.

        This is a public method, not requiring a session id.
        """
        versions = self.get_versions(not (self.config.creating
                                          or self.config.repairing
                                          or self.config.quick_start
                                          or self.config.mode == 'test'))
        cubes = list(versions)
        cubes.remove('cubicweb')
        return cubes

    def get_option_value(self, option):
        """Return the value for `option` in the configuration.

        This is a public method, not requiring a session id.
        """
        # XXX we may want to check we don't give sensible information
        return self.config[option]

    @cached
    def get_versions(self, checkversions=False):
        """Return a dictionary with the cubes recorded in the instance's
        'system.version.*' properties as keys and their version as values
        (this includes cubicweb's own version).

        When `checkversions` is true, raise ExecutionError if a recorded
        version is lower than the one reported by the configuration.

        Public method: no session id is needed to call it.
        """
        from logilab.common.changelog import Version
        rql = ('Any K,V WHERE P is CWProperty, P value V, P pkey K, '
               'P pkey ~="system.version.%"')
        versions = {}
        with self.internal_cnx() as cnx:
            for pkey, raw_version in cnx.execute(rql, build_descr=False):
                cube = pkey.split('.')[-1]
                # XXX cubicweb migration
                cube = CW_MIGRATION_MAP[cube] if cube in CW_MIGRATION_MAP else cube
                dbversion = Version(raw_version)
                versions[cube] = dbversion
                if not checkversions:
                    continue
                if cube == 'cubicweb':
                    fsversion = self.config.cubicweb_version()
                else:
                    fsversion = self.config.cube_version(cube)
                if dbversion < fsversion:
                    msg = ('instance has %s version %s but %s '
                           'is installed. Run "cubicweb-ctl upgrade %s".')
                    raise ExecutionError(
                        msg % (cube, dbversion, fsversion, self.config.appid))
        return versions

    @cached
    def source_defs(self):
        """Return the a dictionary containing source uris as value and a
        dictionary describing each source as value.

        This is a public method, not requiring a session id.
        """
        sources = {}
        # remove sensitive information
        for uri, source in self.sources_by_uri.items():
            sources[uri] = source.public_config
        return sources

    def _clear_source_defs_caches(self):
        """Drop the cached result of `source_defs`."""
        clear_cache(self, 'source_defs')

    def properties(self):
        """Return a result set containing system wide properties.

        This is a public method, not requiring a session id.
        """
        with self.internal_cnx() as cnx:
            # don't use cnx.execute, we don't want rset.req set
            return self.querier.execute(cnx, 'Any K,V WHERE P is CWProperty,'
                                        'P pkey K, P value V, NOT P for_user U',
                                        build_descr=False)

    def find_users(self, fetch_attrs, **query_attrs):
        """yield user attributes for cwusers matching the given query_attrs
        (the result set cannot survive this method call)

        This can be used by low-privileges account (anonymous comes to
        mind).

        `fetch_attrs`: tuple of attributes to be fetched
        `query_attrs`: dict of attr/values to restrict the query
        """
        assert query_attrs
        if not hasattr(self, '_cwuser_attrs'):
            cwuser = self.schema['CWUser']
            self._cwuser_attrs = set(str(rschema)
                                     for rschema, _eschema in cwuser.attribute_definitions()
                                     if not rschema.meta)
        cwuserattrs = self._cwuser_attrs
        for k in chain(fetch_attrs, query_attrs):
            if k not in cwuserattrs:
                raise Exception('bad input for find_user')
        with self.internal_cnx() as cnx:
            varmaker = rqlvar_maker()
            vars = [(attr, next(varmaker)) for attr in fetch_attrs]
            rql = 'Any %s WHERE X is CWUser, ' % ','.join(var[1] for var in vars)
            rql += ','.join('X %s %s' % (var[0], var[1]) for var in vars) + ','
            rset = cnx.execute(rql + ','.join('X %s %%(%s)s' % (attr, attr)
                                              for attr in query_attrs),
                               query_attrs)
            return rset.rows

    # session handling ########################################################

    @contextmanager
    def internal_cnx(self):
        """Context manager yielding a Connection opened with the internal
        manager user, which has every access right on the repository.

        Read and write security are both disabled on the yielded connection
        for the duration of the ``with`` block.

        Internal connections have all hooks beside security enabled.
        """
        with Connection(self, InternalManager()) as cnx:
            cnx.user._cw = cnx  # XXX remove when "vreg = user._cw.vreg" hack in entity.py is gone
            with cnx.security_enabled(read=False, write=False):
                yield cnx

    # data sources handling ###################################################
    # * correspondence between eid and type
    # * correspondence between eid and local id (i.e. specific to a given source)

    def clear_caches(self, eids=None):
        if eids is None:
            self._type_cache = {}
            etypes = None
        else:
            etypes = []
            etcache = self._type_cache
            for eid in eids:
                try:
                    etype = etcache.pop(int(eid))  # may be a string in some cases
                except KeyError:
                    etype = None
                etypes.append(etype)
        self.querier.clear_caches(eids, etypes)
        self.system_source.clear_caches(eids, etypes)

    def type_from_eid(self, eid, cnx):
        """Return the type of the entity with id `eid`"""
        try:
            eid = int(eid)
        except ValueError:
            raise UnknownEid(eid)
        try:
            return self._type_cache[eid]
        except KeyError:
            etype = self.system_source.eid_type(cnx, eid)
            self._type_cache[eid] = etype
            return etype

    def add_info(self, cnx, entity, source):
        """Record the eid of `entity` in the connection's
        CleanupNewEidsCacheOp operation, then let the system source add the
        entity's type and source information.
        """
        cleanup_op = hook.CleanupNewEidsCacheOp.get_instance(cnx)
        cleanup_op.add_data(entity.eid)
        # the system source inserts eid/type/source into its own tables
        self.system_source.add_info(cnx, entity, source)

    def _delete_cascade_multi(self, cnx, entities):
        """same as _delete_cascade but accepts a list of entities with
        the same etype and belonging to the same source.
        """
        pendingrtypes = cnx.transaction_data.get('pendingrtypes', ())
        # delete remaining relations: if user can delete the entity, he can
        # delete all its relations without security checking
        with cnx.security_enabled(read=False, write=False):
            in_eids = ','.join([str(_e.eid) for _e in entities])
            with cnx.running_hooks_ops():
                for rschema, _, role in entities[0].e_schema.relation_definitions():
                    if rschema.rule:
                        continue  # computed relation
                    rtype = rschema.type
                    if rtype in schema.VIRTUAL_RTYPES or rtype in pendingrtypes:
                        continue
                    if role == 'subject':
                        # don't skip inlined relation so they are regularly
                        # deleted and so hooks are correctly called
                        rql = 'DELETE X %s Y WHERE X eid IN (%s)' % (rtype, in_eids)
                    else:
                        rql = 'DELETE Y %s X WHERE X eid IN (%s)' % (rtype, in_eids)
                    cnx.execute(rql, build_descr=False)

    def init_entity_caches(self, cnx, entity, source):
        """Add entity to connection entities cache and repo's cache."""
        cnx.set_entity_cache(entity)
        self._type_cache[entity.eid] = entity.cw_etype

    def glob_add_entity(self, cnx, edited):
        """Add an entity to the repository and return its eid.

        `edited` holds the entity to add (`edited.entity`) and its edited
        attributes. A new eid is allocated by the system source and assigned
        to the entity instance (the entity eid should originally be None).

        Hooks are called in the following order: 'before_add_entity',
        'after_add_entity', then 'before_add_relation' and
        'after_add_relation' for each inlined relation.

        A UniqueTogetherError or ViolatedConstraint raised by the source is
        handed to the 'IUserFriendlyError' adapter.
        """
        entity = edited.entity
        entity._cw_is_saved = False  # entity has an eid but is not yet saved
        # init edited_attributes before calling before_add_entity hooks
        entity.cw_edited = edited
        source = self.system_source
        # allocate an eid to the entity before calling hooks
        entity.eid = self.system_source.create_eid(cnx)
        # set caches asap
        self.init_entity_caches(cnx, entity, source)
        if server.DEBUG & server.DBG_REPO:
            print('ADD entity', self, entity.cw_etype, entity.eid, edited)
        prefill_entity_caches(entity)
        self.hm.call_hooks('before_add_entity', cnx, entity=entity)
        # inlined relations are collected here, their hooks are called once
        # the entity has been added
        relations = preprocess_inlined_relations(cnx, entity)
        edited.set_defaults()
        if cnx.is_hook_category_activated('integrity'):
            edited.check(creation=True)
        self.add_info(cnx, entity, source)
        try:
            source.add_entity(cnx, entity)
        except (UniqueTogetherError, ViolatedConstraint) as exc:
            # let the adapter turn the low-level error into a user exception
            userhdlr = cnx.vreg['adapters'].select(
                'IUserFriendlyError', cnx, entity=entity, exc=exc)
            userhdlr.raise_user_exception()
        edited.saved = entity._cw_is_saved = True
        # NOTE(review): after_add_entity hooks are called here, *before* the
        # inlined relations' hooks below
        self.hm.call_hooks('after_add_entity', cnx, entity=entity)
        # call hooks for inlined relations
        for attr, value in relations:
            self.hm.call_hooks('before_add_relation', cnx,
                               eidfrom=entity.eid, rtype=attr, eidto=value)
            self.hm.call_hooks('after_add_relation', cnx,
                               eidfrom=entity.eid, rtype=attr, eidto=value)
        return entity.eid

    def glob_update_entity(self, cnx, edited):
        """Update an already existing entity in the repository.

        `edited` gives access to the entity through ``edited.entity`` and
        iterates over the names of the attributes and inlined relations being
        modified. The type and the eid of the entity must not be changed.

        Hooks are called in the following order:

        * 'before_delete_relation' for each inlined relation whose stored
          target differs from the value in the entity's attribute cache,
        * 'before_add_relation' for each edited inlined relation,
        * 'before_update_entity' (only if some final attribute is edited),
        * the system source is then updated,
        * 'after_update_entity' (only if some final attribute is edited),
        * for each edited inlined relation, 'after_delete_relation' (when a
          previous target was replaced) then 'after_add_relation'.

        `UniqueTogetherError` and `ViolatedConstraint` raised by the source
        are handed over to the 'IUserFriendlyError' adapter, whose
        `raise_user_exception()` method is called.
        """
        entity = edited.entity
        if server.DEBUG & server.DBG_REPO:
            print('UPDATE entity', entity.cw_etype, entity.eid,
                  entity.cw_attr_cache, edited)
        hooks = self.hm
        eschema = entity.e_schema
        cnx.set_entity_cache(entity)
        # remember what was being edited before us, so it can be put back
        previous_edited = getattr(entity, 'cw_edited', None)
        entity.cw_edited = edited
        source = self.system_source
        try:
            touches_attributes = False
            need_fti_update = False
            # list of (rtype, new target eid, replaced target eid or None)
            inlined = []
            for attr in list(edited):
                if attr == 'eid':
                    continue
                if eschema.subjrels[attr].final:
                    touches_attributes = True
                    if getattr(eschema.rdef(attr), 'fulltextindexed', False):
                        need_fti_update = True
                    continue
                # from here on, `attr` is an inlined relation
                rset = entity.related(attr)
                replaced = None
                if rset:
                    current = rset[0][0]  # got a result set
                    unchanged = current == entity.cw_attr_cache[attr]
                    if not unchanged:
                        replaced = current
                        hooks.call_hooks('before_delete_relation', cnx,
                                         eidfrom=entity.eid, rtype=attr,
                                         eidto=current)
                inlined.append((attr, edited[attr], replaced))
            # call hooks for inlined relations
            for attr, value, _unused in inlined:
                hooks.call_hooks('before_add_relation', cnx,
                                 eidfrom=entity.eid, rtype=attr, eidto=value)
            if touches_attributes:
                hooks.call_hooks('before_update_entity', cnx, entity=entity)
            if cnx.is_hook_category_activated('integrity'):
                edited.check()
            try:
                source.update_entity(cnx, entity)
                edited.saved = True
            except (UniqueTogetherError, ViolatedConstraint) as exc:
                adapter = cnx.vreg['adapters'].select(
                    'IUserFriendlyError', cnx, entity=entity, exc=exc)
                adapter.raise_user_exception()
            self.system_source.update_info(cnx, entity, need_fti_update)
            if touches_attributes:
                hooks.call_hooks('after_update_entity', cnx, entity=entity)
            for attr, value, replaced in inlined:
                # if the relation is already cached, update the existing cache
                relcache = entity.cw_relation_cached(attr, 'subject')
                if replaced is not None:
                    hooks.call_hooks('after_delete_relation', cnx,
                                     eidfrom=entity.eid, rtype=attr,
                                     eidto=replaced)
                    if relcache is not None:
                        cnx.update_rel_cache_del(entity.eid, attr, replaced)
                del_existing_rel_if_needed(cnx, entity.eid, attr, value)
                cnx.update_rel_cache_add(entity.eid, attr, value)
                hooks.call_hooks('after_add_relation', cnx,
                                 eidfrom=entity.eid, rtype=attr, eidto=value)
        finally:
            # only restore when there actually was a previous edition object
            if previous_edited is not None:
                entity.cw_edited = previous_edited

    def glob_delete_entities(self, cnx, eids):
        """Delete the entities with the given eids, and all related entities,
        from the repository.

        `eids` is combined with the pending-deletion container using set
        operators, so it is expected to be a set of eids. Eids already
        registered as being deleted are ignored.

        Entities are processed per entity type: for each type the
        'before_delete_entity' hooks are called, then cascading deletion and
        deletion from the system source happen, and finally the
        'after_delete_entity' hooks are called.
        """
        # mark eids as being deleted in cnx info and setup cache update
        # operation (register pending eids before actual deletion to avoid
        # multiple calls to glob_delete_entities)
        cleanup_op = hook.CleanupDeletedEidsCacheOp.get_instance(cnx)
        eids = eids - cleanup_op._container
        cleanup_op._container |= eids
        entities_by_etype = {}  # etype -> [entities of that type]
        #
        # WARNING: the way this dictionary is populated is heavily optimized
        # and does not use setdefault on purpose. Unless a new release
        # of the Python interpreter advertises large perf improvements
        # in setdefault, this should not be changed without profiling.
        for eid in eids:
            etype = self.type_from_eid(eid, cnx)
            entity = cnx.entity_from_eid(eid, etype)
            try:
                entities_by_etype[etype].append(entity)
            except KeyError:
                entities_by_etype[etype] = [entity]
        system_source = self.system_source
        for etype, same_type_entities in entities_by_etype.items():
            if server.DEBUG & server.DBG_REPO:
                print('DELETE entities', etype,
                      [e.eid for e in same_type_entities])
            self.hm.call_hooks('before_delete_entity', cnx,
                               entities=same_type_entities)
            self._delete_cascade_multi(cnx, same_type_entities)
            system_source.delete_entities(cnx, same_type_entities)
            system_source.delete_info_multi(cnx, same_type_entities)
            self.hm.call_hooks('after_delete_entity', cnx,
                               entities=same_type_entities)
        # don't clear cache here, it is done in a hook on commit

    def glob_add_relation(self, cnx, subject, rtype, object):
        """Add a single relation `subject rtype object` to the repository.

        This simply delegates to `glob_add_relations` with a one-item
        dictionary.
        """
        single_relation = {rtype: [(subject, object)]}
        self.glob_add_relations(cnx, single_relation)

    def glob_add_relations(self, cnx, relations):
        """add several relations to the repository

        relations is a dictionary rtype: [(subj_eid, obj_eid), ...]

        When the 'activeintegrity' hook category is activated, relations
        whose definition has a '?' or '1' cardinality get a special
        treatment:

        * already stored relations conflicting with the new one are deleted
          through an RQL DELETE query (with read security disabled),
        * within the given batch, when the same subject (resp. object) shows
          up several times for the same relation definition, only the last
          pair is kept.

        Then, for every relation type, the 'before_add_relation' hooks are
        called, the relations are added to the system source and to the
        connection's relation cache, and the 'after_add_relation' hooks are
        called. Hooks receive the pairs through `eids_from_to`.
        """
        source = self.system_source
        relations_by_rtype = {}
        subjects_by_types = {}  # rdef -> {subject eid: index of its pair}
        objects_by_types = {}  # rdef -> {object eid: index of its pair}
        activintegrity = cnx.is_hook_category_activated('activeintegrity')
        for rtype, eids_subj_obj in relations.items():
            if server.DEBUG & server.DBG_REPO:
                for subjeid, objeid in eids_subj_obj:
                    print('ADD relation', subjeid, rtype, objeid)
            for subjeid, objeid in eids_subj_obj:
                pairs = relations_by_rtype.setdefault(rtype, [])
                pairs.append((subjeid, objeid))
                if not activintegrity:
                    continue
                # index of the pair we've just appended
                position = len(pairs) - 1
                # take care of relations of cardinality '?1': as all eids will
                # be inserted later, we have to remove duplicated eids since
                # they won't be caught by `del_existing_rel_if_needed`.
                #
                # A superseded pair is replaced by None rather than deleted
                # from the list: deleting would shift every following pair
                # and invalidate the indexes recorded so far, leading to the
                # removal of unrelated pairs later on. Placeholders are
                # filtered out once the whole batch has been processed.
                rdef = cnx.rtype_eids_rdef(rtype, subjeid, objeid)
                card = rdef.cardinality
                if card[0] in '?1':
                    with cnx.security_enabled(read=False):
                        cnx.execute('DELETE X %s Y WHERE X eid %%(x)s, '
                                    'NOT Y eid %%(y)s' % rtype,
                                    {'x': subjeid, 'y': objeid})
                    subjects = subjects_by_types.setdefault(rdef, {})
                    superseded = subjects.get(subjeid)
                    subjects[subjeid] = position
                    if superseded is not None:
                        pairs[superseded] = None
                        continue
                if card[1] in '?1':
                    with cnx.security_enabled(read=False):
                        cnx.execute('DELETE X %s Y WHERE Y eid %%(y)s, '
                                    'NOT X eid %%(x)s' % rtype,
                                    {'x': subjeid, 'y': objeid})
                    objects = objects_by_types.setdefault(rdef, {})
                    superseded = objects.get(objeid)
                    objects[objeid] = position
                    if superseded is not None:
                        pairs[superseded] = None
        if activintegrity:
            # get rid of the placeholders left by superseded pairs; the last
            # pair appended for a relation type is never a placeholder, so no
            # list ends up empty
            for pairs in relations_by_rtype.values():
                pairs[:] = [pair for pair in pairs if pair is not None]
        for rtype, source_relations in relations_by_rtype.items():
            self.hm.call_hooks('before_add_relation', cnx,
                               rtype=rtype, eids_from_to=source_relations)
        for rtype, source_relations in relations_by_rtype.items():
            source.add_relations(cnx, rtype, source_relations)
            rschema = self.schema.rschema(rtype)
            for subjeid, objeid in source_relations:
                cnx.update_rel_cache_add(subjeid, rtype, objeid, rschema.symmetric)
        for rtype, source_relations in relations_by_rtype.items():
            self.hm.call_hooks('after_add_relation', cnx,
                               rtype=rtype, eids_from_to=source_relations)

    def glob_delete_relation(self, cnx, subject, rtype, object):
        """Delete the relation `subject rtype object` from the repository.

        The 'before_delete_relation' hooks are called first, then the
        relation is removed from the system source and from the connection's
        relation cache, and eventually the 'after_delete_relation' hooks are
        called.
        """
        if server.DEBUG & server.DBG_REPO:
            print('DELETE relation', subject, rtype, object)
        system_source = self.system_source
        # both hook events receive the very same description of the relation
        hook_kwargs = {'eidfrom': subject, 'rtype': rtype, 'eidto': object}
        self.hm.call_hooks('before_delete_relation', cnx, **hook_kwargs)
        system_source.delete_relation(cnx, subject, rtype, object)
        symmetric = self.schema.rschema(rtype).symmetric
        cnx.update_rel_cache_del(subject, rtype, object, symmetric)
        self.hm.call_hooks('after_delete_relation', cnx, **hook_kwargs)

    # Placeholder logging methods accepting any arguments and doing nothing.
    # They are overridden by the set_log_methods() call found right after
    # the class body, and are only defined here to prevent pylint from
    # complaining.
    info = warning = error = critical = exception = debug = lambda msg, *a, **kw: None


# set up the logging methods of Repository using the 'cubicweb.repository'
# logger (this overrides the do-nothing placeholders of the class body)
set_log_methods(Repository, getLogger('cubicweb.repository'))