authorAlexandre Fayolle <>
Tue, 25 Jan 2011 12:09:59 +0100 (2011-01-25)
improve massive deletion performance change hooks.integrity._DelayedDeleteOp implementation to give it a chance of processing the entities by chunks of reasonnable size (500 entities at a time) adapt ssplanner.DeleteEntitiesStep to call a variant of glob_delete_entity with several entities. That variant calls all the before_delete_entities hooks in one go, then performs the deletion, and then calls all the after_delete_entities hooks. The deletion is performed by grouping together entities by etype and by source. adapt the HooksManager to call the hooks on a list of entities instead of on a single entity. adapt the sources to be able to delete several entities of the same etype at once. changed the source fti_(un)index_entity methods to fti_(un)index_entities taking a collection of entities.
--- a/hooks/	Tue Jan 25 10:01:19 2011 +0100
+++ b/hooks/	Tue Jan 25 12:09:59 2011 +0100
@@ -110,15 +110,32 @@
     category = 'integrity'
-class CheckCardinalityHook(IntegrityHook):
+class CheckCardinalityHookBeforeDeleteRelation(IntegrityHook):
     """check cardinalities are satisfied"""
-    __regid__ = 'checkcard'
-    events = ('after_add_entity', 'before_delete_relation')
+    __regid__ = 'checkcard_before_delete_relation'
+    events = ('before_delete_relation',)
     def __call__(self):
-        getattr(self, self.event)()
+        rtype = self.rtype
+        if rtype in DONT_CHECK_RTYPES_ON_DEL:
+            return
+        session = self._cw
+        eidfrom, eidto = self.eidfrom, self.eidto
+        pendingrdefs = session.transaction_data.get('pendingrdefs', ())
+        if (session.describe(eidfrom)[0], rtype, session.describe(eidto)[0]) in pendingrdefs:
+            return
+        card = session.schema_rproperty(rtype, eidfrom, eidto, 'cardinality')
+        if card[0] in '1+' and not session.deleted_in_transaction(eidfrom):
+            _CheckSRelationOp.get_instance(self._cw).add_data((eidfrom, rtype))
+        if card[1] in '1+' and not session.deleted_in_transaction(eidto):
+            _CheckORelationOp.get_instance(self._cw).add_data((eidto, rtype))
-    def after_add_entity(self):
+class CheckCardinalityHookAfterAddEntity(IntegrityHook):
+    """check cardinalities are satisfied"""
+    __regid__ = 'checkcard_after_add_entity'
+    events = ('after_add_entity',)
+    def __call__(self):
         eid = self.entity.eid
         eschema = self.entity.e_schema
         for rschema, targetschemas, role in eschema.relation_definitions():
@@ -299,19 +316,33 @@
         session = self.session
         pendingeids = session.transaction_data.get('pendingeids', ())
         neweids = session.transaction_data.get('neweids', ())
+        eids_by_etype_rtype = {}
         for eid, rtype in self.get_data():
             # don't do anything if the entity is being created or deleted
             if not (eid in pendingeids or eid in neweids):
                 etype = session.describe(eid)[0]
-                session.execute(self.base_rql % (etype, rtype), {'x': eid})
+                key = (etype, rtype)
+                if key not in eids_by_etype_rtype:
+                    eids_by_etype_rtype[key] = [str(eid)]
+                else:
+                    eids_by_etype_rtype[key].append(str(eid))
+        for (etype, rtype), eids in eids_by_etype_rtype.iteritems():
+            # quite unexpectedly, not deleting too many entities at a time in
+            # this operation benefits to the exec speed (possibly on the RQL
+            # parsing side)
+            start = 0
+            incr = 500
+            while start < len(eids):
+                session.execute(self.base_rql % (etype, ','.join(eids[start:start+incr]), rtype))
+                start += incr
 class _DelayedDeleteSEntityOp(_DelayedDeleteOp):
     """delete orphan subject entity of a composite relation"""
-    base_rql = 'DELETE %s X WHERE X eid %%(x)s, NOT X %s Y'
+    base_rql = 'DELETE %s X WHERE X eid IN (%s), NOT X %s Y'
 class _DelayedDeleteOEntityOp(_DelayedDeleteOp):
     """check required object relation"""
-    base_rql = 'DELETE %s X WHERE X eid %%(x)s, NOT Y %s X'
+    base_rql = 'DELETE %s X WHERE X eid IN (%s), NOT Y %s X'
 class DeleteCompositeOrphanHook(hook.Hook):
--- a/hooks/	Tue Jan 25 10:01:19 2011 +0100
+++ b/hooks/	Tue Jan 25 12:09:59 2011 +0100
@@ -1212,11 +1212,11 @@
                       len(rset), etype)
             still_fti = list(schema[etype].indexable_attributes())
             for entity in rset.entities():
-                source.fti_unindex_entity(session, entity.eid)
+                source.fti_unindex_entities(session, [entity])
                 for container in entity.cw_adapt_to('IFTIndexable').fti_containers():
                     if still_fti or container is not entity:
-                        source.fti_unindex_entity(session, container.eid)
-                        source.fti_index_entity(session, container)
+                        source.fti_unindex_entities(session, [container])
+                        source.fti_index_entities(session, [container])
         if to_reindex:
             # Transaction has already been committed
--- a/server/	Tue Jan 25 10:01:19 2011 +0100
+++ b/server/	Tue Jan 25 12:09:59 2011 +0100
@@ -131,8 +131,8 @@
     # attribute to their current value
     source = repo.system_source
     for eschema in etypes:
-        for entity in session.execute('Any X WHERE X is %s' % eschema).entities():
-            source.fti_index_entity(session, entity)
+        rset = session.execute('Any X WHERE X is %s' % eschema)
+        source.fti_index_entities(session, rset.entities())
         if withpb:
--- a/server/	Tue Jan 25 10:01:19 2011 +0100
+++ b/server/	Tue Jan 25 12:09:59 2011 +0100
@@ -274,6 +274,14 @@
                     'session_open', 'session_close'))
+def _iter_kwargs(entities, kwargs):
+    if not entities:
+        yield kwargs
+    else:
+        for entity in entities:
+            kwargs['entity'] = entity
+            yield kwargs
 class HooksRegistry(CWRegistry):
     def initialization_completed(self):
@@ -288,20 +296,30 @@
         super(HooksRegistry, self).register(obj, **kwargs)
     def call_hooks(self, event, session=None, **kwargs):
+        """call `event` hooks for an entity or a list of entities (passed
+        respectively as the `entity` or ``entities`` keyword argument).
+        """
         kwargs['event'] = event
-        if session is None:
+        if session is None: # True for events such as server_start
             for hook in sorted(self.possible_objects(session, **kwargs),
                                key=lambda x: x.order):
+            if 'entities' in kwargs:
+                assert 'entity' not in kwargs, \
+                       'can\'t pass "entities" and "entity" arguments simultaneously'
+                entities = kwargs.pop('entities')
+            else:
+                entities = []
             # by default, hooks are executed with security turned off
             with security_enabled(session, read=False):
-                hooks = sorted(self.possible_objects(session, **kwargs),
-                               key=lambda x: x.order)
-                with security_enabled(session, write=False):
-                    for hook in hooks:
-                        #print hook.category, hook.__regid__
-                        hook()
+                for _kwargs in _iter_kwargs(entities, kwargs):
+                    hooks = sorted(self.possible_objects(session, **_kwargs),
+                                   key=lambda x: x.order)
+                    with security_enabled(session, write=False):
+                        for hook in hooks:
+                            #print hook.category, hook.__regid__
+                            hook()
 class HooksManager(object):
     def __init__(self, vreg):
--- a/server/	Tue Jan 25 10:01:19 2011 +0100
+++ b/server/	Tue Jan 25 12:09:59 2011 +0100
@@ -1097,6 +1097,17 @@
         self._delete_info(session, entity, sourceuri, extid, scleanup)
+    def delete_info_multi(self, session, entities, sourceuri, extids, scleanup=False):
+        """same as delete_info but accepts a list of entities and
+        extids with the same etype and belonging to the same source
+        """
+        # mark eid as being deleted in session info and setup cache update
+        # operation
+        op = hook.CleanupDeletedEidsCacheOp.get_instance(session)
+        for entity in entities:
+            op.add_data(entity.eid)
+        self._delete_info_multi(session, entities, sourceuri, extids, scleanup)
     def _delete_info(self, session, entity, sourceuri, extid, scleanup=False):
         """delete system information on deletion of an entity:
         * delete all remaining relations from/to this entity
@@ -1129,6 +1140,38 @@
                                    'from %s. RQL: %s', entity, sourceuri, rql)
         self.system_source.delete_info(session, entity, sourceuri, extid)
+    def _delete_info_multi(self, session, entities, sourceuri, extids, scleanup=False):
+        """same as _delete_info but accepts a list of entities with
+        the same etype and belinging to the same source.
+        """
+        pendingrtypes = session.transaction_data.get('pendingrtypes', ())
+        # delete remaining relations: if user can delete the entity, he can
+        # delete all its relations without security checking
+        assert entities and len(entities) == len(extids)
+        with security_enabled(session, read=False, write=False):
+            eids = [_e.eid for _e in entities]
+            in_eids = ','.join((str(eid) for eid in eids))
+            for rschema, _, role in entities[0].e_schema.relation_definitions():
+                rtype = rschema.type
+                if rtype in schema.VIRTUAL_RTYPES or rtype in pendingrtypes:
+                    continue
+                if role == 'subject':
+                    # don't skip inlined relation so they are regularly
+                    # deleted and so hooks are correctly called
+                    rql = 'DELETE X %s Y WHERE X eid IN (%s)' % (rtype, in_eids)
+                else:
+                    rql = 'DELETE Y %s X WHERE X eid IN (%s)' % (rtype, in_eids)
+                if scleanup:
+                    # source cleaning: only delete relations stored locally
+                    rql += ', NOT (Y cw_source S, S name %(source)s)'
+                try:
+                    session.execute(rql, {'source': sourceuri},
+                                    build_descr=False)
+                except:
+                    self.exception('error while cascading delete for entity %s '
+                                   'from %s. RQL: %s', entities, sourceuri, rql)
+        self.system_source.delete_info_multi(session, entities, sourceuri, extids)
     def locate_relation_source(self, session, subject, rtype, object):
         subjsource = self.source_from_eid(subject, session)
         objsource = self.source_from_eid(object, session)
@@ -1297,19 +1340,40 @@
             if orig_edited is not None:
                 entity.cw_edited = orig_edited
-    def glob_delete_entity(self, session, eid):
-        """delete an entity and all related entities from the repository"""
-        entity = session.entity_from_eid(eid)
-        etype, sourceuri, extid = self.type_and_source_from_eid(eid, session)
-        if server.DEBUG & server.DBG_REPO:
-            print 'DELETE entity', etype, eid
-        source = self.sources_by_uri[sourceuri]
-        if source.should_call_hooks:
-  'before_delete_entity', session, entity=entity)
-        self._delete_info(session, entity, sourceuri, extid)
-        source.delete_entity(session, entity)
-        if source.should_call_hooks:
-  'after_delete_entity', session, entity=entity)
+    def glob_delete_entities(self, session, eids):
+        """delete a list of  entities and all related entities from the repository"""
+        data_by_etype_source = {} # values are ([list of eids],
+                                  #             [list of extid],
+                                  #             [list of entities])
+        #
+        # WARNING: the way this dictionary is populated is heavily optimized
+        # and does not use setdefault on purpose. Unless a new release
+        # of the Python interpreter advertises large perf improvements
+        # in setdefault, this should not be changed without profiling.
+        for eid in eids:
+            etype, sourceuri, extid = self.type_and_source_from_eid(eid, session)
+            entity = session.entity_from_eid(eid, etype)
+            _key = (etype, sourceuri)
+            if _key not in data_by_etype_source:
+                data_by_etype_source[_key] = ([eid], [extid], [entity])
+            else:
+                _data = data_by_etype_source[_key]
+                _data[0].append(eid)
+                _data[1].append(extid)
+                _data[2].append(entity)
+        for (etype, sourceuri), (eids, extids, entities) in data_by_etype_source.iteritems():
+            if server.DEBUG & server.DBG_REPO:
+                print 'DELETE entities', etype, eids
+            #print 'DELETE entities', etype, len(eids)
+            source = self.sources_by_uri[sourceuri]
+            if source.should_call_hooks:
+      'before_delete_entity', session, entities=entities)
+            self._delete_info_multi(session, entities, sourceuri, extids) # xxx
+            source.delete_entities(session, entities)
+            if source.should_call_hooks:
+      'after_delete_entity', session, entities=entities)
         # don't clear cache here this is done in a hook on commit
     def glob_add_relation(self, session, subject, rtype, object):
--- a/server/sources/	Tue Jan 25 10:01:19 2011 +0100
+++ b/server/sources/	Tue Jan 25 12:09:59 2011 +0100
@@ -22,6 +22,7 @@
 from os.path import join, splitext
 from datetime import datetime, timedelta
 from logging import getLogger
+import itertools
 from cubicweb import set_log_methods, server
 from cubicweb.schema import VIRTUAL_RTYPES
@@ -372,6 +373,11 @@
         """update an entity in the source"""
         raise NotImplementedError()
+    def delete_entities(self, session, entities):
+        """delete several entities from the source"""
+        for entity in entities:
+            self.delete_entity(session, entity)
     def delete_entity(self, session, entity):
         """delete an entity from the source"""
         raise NotImplementedError()
@@ -401,12 +407,19 @@
         """mark entity as being modified, fulltext reindex if needed"""
         raise NotImplementedError()
-    def delete_info(self, session, entity, uri, extid, attributes, relations):
+    def delete_info(self, session, entity, uri, extid):
         """delete system information on deletion of an entity by transfering
         record from the entities table to the deleted_entities table
         raise NotImplementedError()
+    def delete_info_multi(self, session, entities, uri, extids):
+        """ame as delete_info but accepts a list of entities with
+        the same etype and belinging to the same source.
+        """
+        for entity, extid in itertools.izip(entities, extids):
+            self.delete_info(session, entity, uri, extid)
     def modified_entities(self, session, etypes, mtime):
         """return a 2-uple:
         * list of (etype, eid) of entities of the given types which have been
@@ -423,14 +436,13 @@
         raise NotImplementedError()
-    def fti_unindex_entity(self, session, eid):
-        """remove text content for entity with the given eid from the full text
-        index
+    def fti_unindex_entities(self, session, entities):
+        """remove text content for entities from the full text index
         raise NotImplementedError()
-    def fti_index_entity(self, session, entity):
-        """add text content of a created/modified entity to the full text index
+    def fti_index_entities(self, session, entities):
+        """add text content of created/modified entities to the full text index
         raise NotImplementedError()
--- a/server/sources/	Tue Jan 25 10:01:19 2011 +0100
+++ b/server/sources/	Tue Jan 25 12:09:59 2011 +0100
@@ -35,6 +35,7 @@
 from contextlib import contextmanager
 from os.path import abspath
 import re
+import itertools
 from logilab.common.compat import any
 from logilab.common.cache import Cache
@@ -547,23 +548,29 @@
         #    on the filesystem. To make the usage absolutely
         #    transparent, we'll have to reset to its binary
         #    value once the SQL query will be executed
-        restore_values = {}
-        etype = entity.__regid__
+        restore_values = []
+        if isinstance(entity, list):
+            entities = entity
+        else:
+            entities = [entity]
+        etype = entities[0].__regid__
         for attr, storage in self._storages.get(etype, {}).items():
-            try:
-                edited = entity.cw_edited
-            except AttributeError:
-                assert event == 'deleted'
-                getattr(storage, 'entity_deleted')(entity, attr)
-            else:
-                if attr in edited:
-                    handler = getattr(storage, 'entity_%s' % event)
-                    restore_values[attr] = handler(entity, attr)
+            for entity in entities:
+                try:
+                    edited = entity.cw_edited
+                except AttributeError:
+                    assert event == 'deleted'
+                    getattr(storage, 'entity_deleted')(entity, attr)
+                else:
+                    if attr in edited:
+                        handler = getattr(storage, 'entity_%s' % event)
+                        to_restore = handler(entity, attr)
+                        restore_values.append((entity, attr, to_restore))
             yield # 2/ execute the source's instructions
             # 3/ restore original values
-            for attr, value in restore_values.items():
+            for entity, attr, value in restore_values:
                 entity.cw_edited.edited_attribute(attr, value)
     def add_entity(self, session, entity):
@@ -918,7 +925,7 @@
         * transfer it to the deleted_entities table if the entity's type is
-        self.fti_unindex_entity(session, entity.eid)
+        self.fti_unindex_entities(session, [entity])
         attrs = {'eid': entity.eid}
         self.doexec(session, self.sqlgen.delete('entities', attrs), attrs)
         if not entity.__regid__ in self.multisources_etypes:
@@ -930,6 +937,27 @@
                  'source': uri, 'dtime':}
         self.doexec(session, self.sqlgen.insert('deleted_entities', attrs), attrs)
+    def delete_info_multi(self, session, entities, uri, extids):
+        """delete system information on deletion of an entity:
+        * update the fti
+        * remove record from the entities table
+        * transfer it to the deleted_entities table if the entity's type is
+          multi-sources
+        """
+        self.fti_unindex_entities(session, entities)
+        attrs = {'eid': '(%s)' % ','.join([str(_e.eid) for _e in entities])}
+        self.doexec(session, self.sqlgen.delete_many('entities', attrs), attrs)
+        if entities[0].__regid__ not in self.multisources_etypes:
+            return
+        attrs = {'type': entities[0].__regid__,
+                 'source': uri, 'dtime':}
+        for entity, extid in itertools.izip(entities, extids):
+            if extid is not None:
+                assert isinstance(extid, str), type(extid)
+                extid = b64encode(extid)
+            attrs.update({'eid': entity.eid, 'extid': extid})
+            self.doexec(session, self.sqlgen.insert('deleted_entities', attrs), attrs)
     def modified_entities(self, session, etypes, mtime):
         """return a 2-uple:
         * list of (etype, eid) of entities of the given types which have been
@@ -1297,27 +1325,32 @@
-    def fti_unindex_entity(self, session, eid):
-        """remove text content for entity with the given eid from the full text
-        index
+    def fti_unindex_entities(self, session, entities):
+        """remove text content for entities from the full text index
+        cursor = session.pool['system']
+        cursor_unindex_object = self.dbhelper.cursor_unindex_object
-            self.dbhelper.cursor_unindex_object(eid, session.pool['system'])
+            for entity in entities:
+                cursor_unindex_object(entity.eid, cursor)
         except Exception: # let KeyboardInterrupt / SystemExit propagate
-            self.exception('error while unindexing %s', eid)
+            self.exception('error while unindexing %s', entity)
-    def fti_index_entity(self, session, entity):
-        """add text content of a created/modified entity to the full text index
+    def fti_index_entities(self, session, entities):
+        """add text content of created/modified entities to the full text index
-        self.debug('reindexing %r', entity.eid)
+        cursor_index_object = self.dbhelper.cursor_index_object
+        cursor = session.pool['system']
             # use cursor_index_object, not cursor_reindex_object since
             # unindexing done in the FTIndexEntityOp
-            self.dbhelper.cursor_index_object(entity.eid,
-                                              entity.cw_adapt_to('IFTIndexable'),
-                                              session.pool['system'])
+            for entity in entities:
+                cursor_index_object(entity.eid,
+                                    entity.cw_adapt_to('IFTIndexable'),
+                                    cursor)
         except Exception: # let KeyboardInterrupt / SystemExit propagate
-            self.exception('error while reindexing %s', entity)
+            self.exception('error while indexing %s', entity)
 class FTIndexEntityOp(hook.DataOperationMixIn, hook.LateOperation):
@@ -1333,17 +1366,17 @@
         source = session.repo.system_source
         pendingeids = session.transaction_data.get('pendingeids', ())
         done = session.transaction_data.setdefault('indexedeids', set())
+        to_reindex = set()
         for eid in self.get_data():
             if eid in pendingeids or eid in done:
                 # entity added and deleted in the same transaction or already
                 # processed
-                return
+                continue
             iftindexable = session.entity_from_eid(eid).cw_adapt_to('IFTIndexable')
-            for container in iftindexable.fti_containers():
-                source.fti_unindex_entity(session, container.eid)
-                source.fti_index_entity(session, container)
+            to_reindex |= set(iftindexable.fti_containers())
+        source.fti_unindex_entities(session, to_reindex)
+        source.fti_index_entities(session, to_reindex)
 def sql_schema(driver):
     helper = get_db_helper(driver)
--- a/server/	Tue Jan 25 10:01:19 2011 +0100
+++ b/server/	Tue Jan 25 12:09:59 2011 +0100
@@ -647,15 +647,13 @@
         results = self.execute_child()
         todelete = frozenset(typed_eid(eid) for eid, in results)
         session = self.plan.session
-        delete = session.repo.glob_delete_entity
         # mark eids as being deleted in session info and setup cache update
         # operation (register pending eids before actual deletion to avoid
-        # multiple call to glob_delete_entity)
+        # multiple call to glob_delete_entities)
         op = CleanupDeletedEidsCacheOp.get_instance(session)
         actual = todelete - op._container
         op._container |= actual
-        for eid in actual:
-            delete(session, eid)
+        session.repo.glob_delete_entities(session, actual)
         return results
 class DeleteRelationsStep(Step):