improve massive deletion performance
change hooks.integrity._DelayedDeleteOp implementation to give it a chance of
processing the entities in chunks of reasonable size (500 entities at a time)
adapt ssplanner.DeleteEntitiesStep to call a variant of glob_delete_entity with several entities.
That variant calls all the before_delete_entities hooks in one go, then
performs the deletion, and then calls all the after_delete_entities hooks. The
deletion is performed by grouping together entities by etype and by source.
adapt the HooksManager to call the hooks on a list of entities instead of on a single entity.
adapt the sources to be able to delete several entities of the same etype at once.
change the source fti_(un)index_entity methods into fti_(un)index_entities, which take a collection of entities.
--- a/hooks/integrity.py Tue Jan 25 10:01:19 2011 +0100
+++ b/hooks/integrity.py Tue Jan 25 12:09:59 2011 +0100
@@ -110,15 +110,32 @@
category = 'integrity'
-class CheckCardinalityHook(IntegrityHook):
+class CheckCardinalityHookBeforeDeleteRelation(IntegrityHook):
"""check cardinalities are satisfied"""
- __regid__ = 'checkcard'
- events = ('after_add_entity', 'before_delete_relation')
+ __regid__ = 'checkcard_before_delete_relation'
+ events = ('before_delete_relation',)
def __call__(self):
- getattr(self, self.event)()
+ rtype = self.rtype
+ if rtype in DONT_CHECK_RTYPES_ON_DEL:
+ return
+ session = self._cw
+ eidfrom, eidto = self.eidfrom, self.eidto
+ pendingrdefs = session.transaction_data.get('pendingrdefs', ())
+ if (session.describe(eidfrom)[0], rtype, session.describe(eidto)[0]) in pendingrdefs:
+ return
+ card = session.schema_rproperty(rtype, eidfrom, eidto, 'cardinality')
+ if card[0] in '1+' and not session.deleted_in_transaction(eidfrom):
+ _CheckSRelationOp.get_instance(self._cw).add_data((eidfrom, rtype))
+ if card[1] in '1+' and not session.deleted_in_transaction(eidto):
+ _CheckORelationOp.get_instance(self._cw).add_data((eidto, rtype))
- def after_add_entity(self):
+class CheckCardinalityHookAfterAddEntity(IntegrityHook):
+ """check cardinalities are satisfied"""
+ __regid__ = 'checkcard_after_add_entity'
+ events = ('after_add_entity',)
+
+ def __call__(self):
eid = self.entity.eid
eschema = self.entity.e_schema
for rschema, targetschemas, role in eschema.relation_definitions():
@@ -299,19 +316,33 @@
session = self.session
pendingeids = session.transaction_data.get('pendingeids', ())
neweids = session.transaction_data.get('neweids', ())
+ eids_by_etype_rtype = {}
for eid, rtype in self.get_data():
# don't do anything if the entity is being created or deleted
if not (eid in pendingeids or eid in neweids):
etype = session.describe(eid)[0]
- session.execute(self.base_rql % (etype, rtype), {'x': eid})
+ key = (etype, rtype)
+ if key not in eids_by_etype_rtype:
+ eids_by_etype_rtype[key] = [str(eid)]
+ else:
+ eids_by_etype_rtype[key].append(str(eid))
+ for (etype, rtype), eids in eids_by_etype_rtype.iteritems():
+ # quite unexpectedly, not deleting too many entities at a time in
+ # this operation improves the execution speed (possibly on the RQL
+ # parsing side)
+ start = 0
+ incr = 500
+ while start < len(eids):
+ session.execute(self.base_rql % (etype, ','.join(eids[start:start+incr]), rtype))
+ start += incr
class _DelayedDeleteSEntityOp(_DelayedDeleteOp):
"""delete orphan subject entity of a composite relation"""
- base_rql = 'DELETE %s X WHERE X eid %%(x)s, NOT X %s Y'
+ base_rql = 'DELETE %s X WHERE X eid IN (%s), NOT X %s Y'
class _DelayedDeleteOEntityOp(_DelayedDeleteOp):
"""check required object relation"""
- base_rql = 'DELETE %s X WHERE X eid %%(x)s, NOT Y %s X'
+ base_rql = 'DELETE %s X WHERE X eid IN (%s), NOT Y %s X'
class DeleteCompositeOrphanHook(hook.Hook):
--- a/hooks/syncschema.py Tue Jan 25 10:01:19 2011 +0100
+++ b/hooks/syncschema.py Tue Jan 25 12:09:59 2011 +0100
@@ -1212,11 +1212,11 @@
len(rset), etype)
still_fti = list(schema[etype].indexable_attributes())
for entity in rset.entities():
- source.fti_unindex_entity(session, entity.eid)
+ source.fti_unindex_entities(session, [entity])
for container in entity.cw_adapt_to('IFTIndexable').fti_containers():
if still_fti or container is not entity:
- source.fti_unindex_entity(session, container.eid)
- source.fti_index_entity(session, container)
+ source.fti_unindex_entities(session, [container])
+ source.fti_index_entities(session, [container])
if to_reindex:
# Transaction has already been committed
session.pool.commit()
--- a/server/checkintegrity.py Tue Jan 25 10:01:19 2011 +0100
+++ b/server/checkintegrity.py Tue Jan 25 12:09:59 2011 +0100
@@ -131,8 +131,8 @@
# attribute to their current value
source = repo.system_source
for eschema in etypes:
- for entity in session.execute('Any X WHERE X is %s' % eschema).entities():
- source.fti_index_entity(session, entity)
+ rset = session.execute('Any X WHERE X is %s' % eschema)
+ source.fti_index_entities(session, rset.entities())
if withpb:
pb.update()
--- a/server/hook.py Tue Jan 25 10:01:19 2011 +0100
+++ b/server/hook.py Tue Jan 25 12:09:59 2011 +0100
@@ -274,6 +274,14 @@
'session_open', 'session_close'))
ALL_HOOKS = ENTITIES_HOOKS | RELATIONS_HOOKS | SYSTEM_HOOKS
+def _iter_kwargs(entities, kwargs):
+ if not entities:
+ yield kwargs
+ else:
+ for entity in entities:
+ kwargs['entity'] = entity
+ yield kwargs
+
class HooksRegistry(CWRegistry):
def initialization_completed(self):
@@ -288,20 +296,30 @@
super(HooksRegistry, self).register(obj, **kwargs)
def call_hooks(self, event, session=None, **kwargs):
+ """call `event` hooks for an entity or a list of entities (passed
+ respectively as the `entity` or ``entities`` keyword argument).
+ """
kwargs['event'] = event
- if session is None:
+ if session is None: # True for events such as server_start
for hook in sorted(self.possible_objects(session, **kwargs),
key=lambda x: x.order):
hook()
else:
+ if 'entities' in kwargs:
+ assert 'entity' not in kwargs, \
+ 'can\'t pass "entities" and "entity" arguments simultaneously'
+ entities = kwargs.pop('entities')
+ else:
+ entities = []
# by default, hooks are executed with security turned off
with security_enabled(session, read=False):
- hooks = sorted(self.possible_objects(session, **kwargs),
- key=lambda x: x.order)
- with security_enabled(session, write=False):
- for hook in hooks:
- #print hook.category, hook.__regid__
- hook()
+ for _kwargs in _iter_kwargs(entities, kwargs):
+ hooks = sorted(self.possible_objects(session, **_kwargs),
+ key=lambda x: x.order)
+ with security_enabled(session, write=False):
+ for hook in hooks:
+ #print hook.category, hook.__regid__
+ hook()
class HooksManager(object):
def __init__(self, vreg):
--- a/server/repository.py Tue Jan 25 10:01:19 2011 +0100
+++ b/server/repository.py Tue Jan 25 12:09:59 2011 +0100
@@ -1097,6 +1097,17 @@
hook.CleanupDeletedEidsCacheOp.get_instance(session).add_data(entity.eid)
self._delete_info(session, entity, sourceuri, extid, scleanup)
+ def delete_info_multi(self, session, entities, sourceuri, extids, scleanup=False):
+ """same as delete_info but accepts a list of entities and
+ extids with the same etype and belonging to the same source
+ """
+ # mark eid as being deleted in session info and setup cache update
+ # operation
+ op = hook.CleanupDeletedEidsCacheOp.get_instance(session)
+ for entity in entities:
+ op.add_data(entity.eid)
+ self._delete_info_multi(session, entities, sourceuri, extids, scleanup)
+
def _delete_info(self, session, entity, sourceuri, extid, scleanup=False):
"""delete system information on deletion of an entity:
* delete all remaining relations from/to this entity
@@ -1129,6 +1140,38 @@
'from %s. RQL: %s', entity, sourceuri, rql)
self.system_source.delete_info(session, entity, sourceuri, extid)
+ def _delete_info_multi(self, session, entities, sourceuri, extids, scleanup=False):
+ """same as _delete_info but accepts a list of entities with
+ the same etype and belonging to the same source.
+ """
+ pendingrtypes = session.transaction_data.get('pendingrtypes', ())
+ # delete remaining relations: if user can delete the entity, he can
+ # delete all its relations without security checking
+ assert entities and len(entities) == len(extids)
+ with security_enabled(session, read=False, write=False):
+ eids = [_e.eid for _e in entities]
+ in_eids = ','.join((str(eid) for eid in eids))
+ for rschema, _, role in entities[0].e_schema.relation_definitions():
+ rtype = rschema.type
+ if rtype in schema.VIRTUAL_RTYPES or rtype in pendingrtypes:
+ continue
+ if role == 'subject':
+ # don't skip inlined relation so they are regularly
+ # deleted and so hooks are correctly called
+ rql = 'DELETE X %s Y WHERE X eid IN (%s)' % (rtype, in_eids)
+ else:
+ rql = 'DELETE Y %s X WHERE X eid IN (%s)' % (rtype, in_eids)
+ if scleanup:
+ # source cleaning: only delete relations stored locally
+ rql += ', NOT (Y cw_source S, S name %(source)s)'
+ try:
+ session.execute(rql, {'source': sourceuri},
+ build_descr=False)
+ except:
+ self.exception('error while cascading delete for entity %s '
+ 'from %s. RQL: %s', entities, sourceuri, rql)
+ self.system_source.delete_info_multi(session, entities, sourceuri, extids)
+
def locate_relation_source(self, session, subject, rtype, object):
subjsource = self.source_from_eid(subject, session)
objsource = self.source_from_eid(object, session)
@@ -1297,19 +1340,40 @@
if orig_edited is not None:
entity.cw_edited = orig_edited
- def glob_delete_entity(self, session, eid):
- """delete an entity and all related entities from the repository"""
- entity = session.entity_from_eid(eid)
- etype, sourceuri, extid = self.type_and_source_from_eid(eid, session)
- if server.DEBUG & server.DBG_REPO:
- print 'DELETE entity', etype, eid
- source = self.sources_by_uri[sourceuri]
- if source.should_call_hooks:
- self.hm.call_hooks('before_delete_entity', session, entity=entity)
- self._delete_info(session, entity, sourceuri, extid)
- source.delete_entity(session, entity)
- if source.should_call_hooks:
- self.hm.call_hooks('after_delete_entity', session, entity=entity)
+
+ def glob_delete_entities(self, session, eids):
+ """delete a list of entities and all related entities from the repository"""
+ data_by_etype_source = {} # values are ([list of eids],
+ # [list of extid],
+ # [list of entities])
+ #
+ # WARNING: the way this dictionary is populated is heavily optimized
+ # and does not use setdefault on purpose. Unless a new release
+ # of the Python interpreter advertises large perf improvements
+ # in setdefault, this should not be changed without profiling.
+
+ for eid in eids:
+ etype, sourceuri, extid = self.type_and_source_from_eid(eid, session)
+ entity = session.entity_from_eid(eid, etype)
+ _key = (etype, sourceuri)
+ if _key not in data_by_etype_source:
+ data_by_etype_source[_key] = ([eid], [extid], [entity])
+ else:
+ _data = data_by_etype_source[_key]
+ _data[0].append(eid)
+ _data[1].append(extid)
+ _data[2].append(entity)
+ for (etype, sourceuri), (eids, extids, entities) in data_by_etype_source.iteritems():
+ if server.DEBUG & server.DBG_REPO:
+ print 'DELETE entities', etype, eids
+ #print 'DELETE entities', etype, len(eids)
+ source = self.sources_by_uri[sourceuri]
+ if source.should_call_hooks:
+ self.hm.call_hooks('before_delete_entity', session, entities=entities)
+ self._delete_info_multi(session, entities, sourceuri, extids) # xxx
+ source.delete_entities(session, entities)
+ if source.should_call_hooks:
+ self.hm.call_hooks('after_delete_entity', session, entities=entities)
# don't clear cache here this is done in a hook on commit
def glob_add_relation(self, session, subject, rtype, object):
--- a/server/sources/__init__.py Tue Jan 25 10:01:19 2011 +0100
+++ b/server/sources/__init__.py Tue Jan 25 12:09:59 2011 +0100
@@ -22,6 +22,7 @@
from os.path import join, splitext
from datetime import datetime, timedelta
from logging import getLogger
+import itertools
from cubicweb import set_log_methods, server
from cubicweb.schema import VIRTUAL_RTYPES
@@ -372,6 +373,11 @@
"""update an entity in the source"""
raise NotImplementedError()
+ def delete_entities(self, session, entities):
+ """delete several entities from the source"""
+ for entity in entities:
+ self.delete_entity(session, entity)
+
def delete_entity(self, session, entity):
"""delete an entity from the source"""
raise NotImplementedError()
@@ -401,12 +407,19 @@
"""mark entity as being modified, fulltext reindex if needed"""
raise NotImplementedError()
- def delete_info(self, session, entity, uri, extid, attributes, relations):
+ def delete_info(self, session, entity, uri, extid):
"""delete system information on deletion of an entity by transfering
record from the entities table to the deleted_entities table
"""
raise NotImplementedError()
+ def delete_info_multi(self, session, entities, uri, extids):
+ """same as delete_info but accepts a list of entities with
+ the same etype and belonging to the same source.
+ """
+ for entity, extid in itertools.izip(entities, extids):
+ self.delete_info(session, entity, uri, extid)
+
def modified_entities(self, session, etypes, mtime):
"""return a 2-uple:
* list of (etype, eid) of entities of the given types which have been
@@ -423,14 +436,13 @@
"""
raise NotImplementedError()
- def fti_unindex_entity(self, session, eid):
- """remove text content for entity with the given eid from the full text
- index
+ def fti_unindex_entities(self, session, entities):
+ """remove text content for entities from the full text index
"""
raise NotImplementedError()
- def fti_index_entity(self, session, entity):
- """add text content of a created/modified entity to the full text index
+ def fti_index_entities(self, session, entities):
+ """add text content of created/modified entities to the full text index
"""
raise NotImplementedError()
--- a/server/sources/native.py Tue Jan 25 10:01:19 2011 +0100
+++ b/server/sources/native.py Tue Jan 25 12:09:59 2011 +0100
@@ -35,6 +35,7 @@
from contextlib import contextmanager
from os.path import abspath
import re
+import itertools
from logilab.common.compat import any
from logilab.common.cache import Cache
@@ -547,23 +548,29 @@
# on the filesystem. To make the entity.data usage absolutely
# transparent, we'll have to reset entity.data to its binary
# value once the SQL query will be executed
- restore_values = {}
- etype = entity.__regid__
+ restore_values = []
+ if isinstance(entity, list):
+ entities = entity
+ else:
+ entities = [entity]
+ etype = entities[0].__regid__
for attr, storage in self._storages.get(etype, {}).items():
- try:
- edited = entity.cw_edited
- except AttributeError:
- assert event == 'deleted'
- getattr(storage, 'entity_deleted')(entity, attr)
- else:
- if attr in edited:
- handler = getattr(storage, 'entity_%s' % event)
- restore_values[attr] = handler(entity, attr)
+ for entity in entities:
+ try:
+ edited = entity.cw_edited
+ except AttributeError:
+ assert event == 'deleted'
+ getattr(storage, 'entity_deleted')(entity, attr)
+ else:
+ if attr in edited:
+ handler = getattr(storage, 'entity_%s' % event)
+ to_restore = handler(entity, attr)
+ restore_values.append((entity, attr, to_restore))
try:
yield # 2/ execute the source's instructions
finally:
# 3/ restore original values
- for attr, value in restore_values.items():
+ for entity, attr, value in restore_values:
entity.cw_edited.edited_attribute(attr, value)
def add_entity(self, session, entity):
@@ -918,7 +925,7 @@
* transfer it to the deleted_entities table if the entity's type is
multi-sources
"""
- self.fti_unindex_entity(session, entity.eid)
+ self.fti_unindex_entities(session, [entity])
attrs = {'eid': entity.eid}
self.doexec(session, self.sqlgen.delete('entities', attrs), attrs)
if not entity.__regid__ in self.multisources_etypes:
@@ -930,6 +937,27 @@
'source': uri, 'dtime': datetime.now()}
self.doexec(session, self.sqlgen.insert('deleted_entities', attrs), attrs)
+ def delete_info_multi(self, session, entities, uri, extids):
+ """delete system information on deletion of several same-etype entities:
+ * update the fti
+ * remove records from the entities table
+ * transfer them to the deleted_entities table if the entities' type is
+ multi-sources
+ """
+ self.fti_unindex_entities(session, entities)
+ attrs = {'eid': '(%s)' % ','.join([str(_e.eid) for _e in entities])}
+ self.doexec(session, self.sqlgen.delete_many('entities', attrs), attrs)
+ if entities[0].__regid__ not in self.multisources_etypes:
+ return
+ attrs = {'type': entities[0].__regid__,
+ 'source': uri, 'dtime': datetime.now()}
+ for entity, extid in itertools.izip(entities, extids):
+ if extid is not None:
+ assert isinstance(extid, str), type(extid)
+ extid = b64encode(extid)
+ attrs.update({'eid': entity.eid, 'extid': extid})
+ self.doexec(session, self.sqlgen.insert('deleted_entities', attrs), attrs)
+
def modified_entities(self, session, etypes, mtime):
"""return a 2-uple:
* list of (etype, eid) of entities of the given types which have been
@@ -1297,27 +1325,32 @@
"""
FTIndexEntityOp.get_instance(session).add_data(entity.eid)
- def fti_unindex_entity(self, session, eid):
- """remove text content for entity with the given eid from the full text
- index
+ def fti_unindex_entities(self, session, entities):
+ """remove text content for entities from the full text index
"""
+ cursor = session.pool['system']
+ cursor_unindex_object = self.dbhelper.cursor_unindex_object
try:
- self.dbhelper.cursor_unindex_object(eid, session.pool['system'])
+ for entity in entities:
+ cursor_unindex_object(entity.eid, cursor)
except Exception: # let KeyboardInterrupt / SystemExit propagate
- self.exception('error while unindexing %s', eid)
+ self.exception('error while unindexing %s', entity)
+
- def fti_index_entity(self, session, entity):
- """add text content of a created/modified entity to the full text index
+ def fti_index_entities(self, session, entities):
+ """add text content of created/modified entities to the full text index
"""
- self.debug('reindexing %r', entity.eid)
+ cursor_index_object = self.dbhelper.cursor_index_object
+ cursor = session.pool['system']
try:
# use cursor_index_object, not cursor_reindex_object since
# unindexing done in the FTIndexEntityOp
- self.dbhelper.cursor_index_object(entity.eid,
- entity.cw_adapt_to('IFTIndexable'),
- session.pool['system'])
+ for entity in entities:
+ cursor_index_object(entity.eid,
+ entity.cw_adapt_to('IFTIndexable'),
+ cursor)
except Exception: # let KeyboardInterrupt / SystemExit propagate
- self.exception('error while reindexing %s', entity)
+ self.exception('error while indexing %s', entity)
class FTIndexEntityOp(hook.DataOperationMixIn, hook.LateOperation):
@@ -1333,17 +1366,17 @@
source = session.repo.system_source
pendingeids = session.transaction_data.get('pendingeids', ())
done = session.transaction_data.setdefault('indexedeids', set())
+ to_reindex = set()
for eid in self.get_data():
if eid in pendingeids or eid in done:
# entity added and deleted in the same transaction or already
# processed
- return
+ continue
done.add(eid)
iftindexable = session.entity_from_eid(eid).cw_adapt_to('IFTIndexable')
- for container in iftindexable.fti_containers():
- source.fti_unindex_entity(session, container.eid)
- source.fti_index_entity(session, container)
-
+ to_reindex |= set(iftindexable.fti_containers())
+ source.fti_unindex_entities(session, to_reindex)
+ source.fti_index_entities(session, to_reindex)
def sql_schema(driver):
helper = get_db_helper(driver)
--- a/server/ssplanner.py Tue Jan 25 10:01:19 2011 +0100
+++ b/server/ssplanner.py Tue Jan 25 12:09:59 2011 +0100
@@ -647,15 +647,13 @@
results = self.execute_child()
todelete = frozenset(typed_eid(eid) for eid, in results)
session = self.plan.session
- delete = session.repo.glob_delete_entity
# mark eids as being deleted in session info and setup cache update
# operation (register pending eids before actual deletion to avoid
- # multiple call to glob_delete_entity)
+ # multiple call to glob_delete_entities)
op = CleanupDeletedEidsCacheOp.get_instance(session)
actual = todelete - op._container
op._container |= actual
- for eid in actual:
- delete(session, eid)
+ session.repo.glob_delete_entities(session, actual)
return results
class DeleteRelationsStep(Step):