# HG changeset patch # User Alexandre Fayolle # Date 1295953799 -3600 # Node ID 37668bf302f51ad8c6e96b63b42b454f0108cdd8 # Parent c02e5ba433669b409389ce6e29813f12503c6737 improve massive deletion performance change hooks.integrity._DelayedDeleteOp implementation to give it a chance of processing the entities by chunks of reasonable size (500 entities at a time) adapt ssplanner.DeleteEntitiesStep to call a variant of glob_delete_entity with several entities. That variant calls all the before_delete_entities hooks in one go, then performs the deletion, and then calls all the after_delete_entities hooks. The deletion is performed by grouping together entities by etype and by source. adapt the HooksManager to call the hooks on a list of entities instead of on a single entity. adapt the sources to be able to delete several entities of the same etype at once. changed the source fti_(un)index_entity methods to fti_(un)index_entities taking a collection of entities. diff -r c02e5ba43366 -r 37668bf302f5 hooks/integrity.py --- a/hooks/integrity.py Tue Jan 25 10:01:19 2011 +0100 +++ b/hooks/integrity.py Tue Jan 25 12:09:59 2011 +0100 @@ -110,15 +110,32 @@ category = 'integrity' -class CheckCardinalityHook(IntegrityHook): +class CheckCardinalityHookBeforeDeleteRelation(IntegrityHook): """check cardinalities are satisfied""" - __regid__ = 'checkcard' - events = ('after_add_entity', 'before_delete_relation') + __regid__ = 'checkcard_before_delete_relation' + events = ('before_delete_relation',) def __call__(self): - getattr(self, self.event)() + rtype = self.rtype + if rtype in DONT_CHECK_RTYPES_ON_DEL: + return + session = self._cw + eidfrom, eidto = self.eidfrom, self.eidto + pendingrdefs = session.transaction_data.get('pendingrdefs', ()) + if (session.describe(eidfrom)[0], rtype, session.describe(eidto)[0]) in pendingrdefs: + return + card = session.schema_rproperty(rtype, eidfrom, eidto, 'cardinality') + if card[0] in '1+' and not session.deleted_in_transaction(eidfrom): + 
_CheckSRelationOp.get_instance(self._cw).add_data((eidfrom, rtype)) + if card[1] in '1+' and not session.deleted_in_transaction(eidto): + _CheckORelationOp.get_instance(self._cw).add_data((eidto, rtype)) - def after_add_entity(self): +class CheckCardinalityHookAfterAddEntity(IntegrityHook): + """check cardinalities are satisfied""" + __regid__ = 'checkcard_after_add_entity' + events = ('after_add_entity',) + + def __call__(self): eid = self.entity.eid eschema = self.entity.e_schema for rschema, targetschemas, role in eschema.relation_definitions(): @@ -299,19 +316,33 @@ session = self.session pendingeids = session.transaction_data.get('pendingeids', ()) neweids = session.transaction_data.get('neweids', ()) + eids_by_etype_rtype = {} for eid, rtype in self.get_data(): # don't do anything if the entity is being created or deleted if not (eid in pendingeids or eid in neweids): etype = session.describe(eid)[0] - session.execute(self.base_rql % (etype, rtype), {'x': eid}) + key = (etype, rtype) + if key not in eids_by_etype_rtype: + eids_by_etype_rtype[key] = [str(eid)] + else: + eids_by_etype_rtype[key].append(str(eid)) + for (etype, rtype), eids in eids_by_etype_rtype.iteritems(): + # quite unexpectedly, not deleting too many entities at a time in + # this operation benefits to the exec speed (possibly on the RQL + # parsing side) + start = 0 + incr = 500 + while start < len(eids): + session.execute(self.base_rql % (etype, ','.join(eids[start:start+incr]), rtype)) + start += incr class _DelayedDeleteSEntityOp(_DelayedDeleteOp): """delete orphan subject entity of a composite relation""" - base_rql = 'DELETE %s X WHERE X eid %%(x)s, NOT X %s Y' + base_rql = 'DELETE %s X WHERE X eid IN (%s), NOT X %s Y' class _DelayedDeleteOEntityOp(_DelayedDeleteOp): """check required object relation""" - base_rql = 'DELETE %s X WHERE X eid %%(x)s, NOT Y %s X' + base_rql = 'DELETE %s X WHERE X eid IN (%s), NOT Y %s X' class DeleteCompositeOrphanHook(hook.Hook): diff -r c02e5ba43366 -r 
37668bf302f5 hooks/syncschema.py --- a/hooks/syncschema.py Tue Jan 25 10:01:19 2011 +0100 +++ b/hooks/syncschema.py Tue Jan 25 12:09:59 2011 +0100 @@ -1212,11 +1212,11 @@ len(rset), etype) still_fti = list(schema[etype].indexable_attributes()) for entity in rset.entities(): - source.fti_unindex_entity(session, entity.eid) + source.fti_unindex_entities(session, [entity]) for container in entity.cw_adapt_to('IFTIndexable').fti_containers(): if still_fti or container is not entity: - source.fti_unindex_entity(session, container.eid) - source.fti_index_entity(session, container) + source.fti_unindex_entities(session, [container]) + source.fti_index_entities(session, [container]) if to_reindex: # Transaction has already been committed session.pool.commit() diff -r c02e5ba43366 -r 37668bf302f5 server/checkintegrity.py --- a/server/checkintegrity.py Tue Jan 25 10:01:19 2011 +0100 +++ b/server/checkintegrity.py Tue Jan 25 12:09:59 2011 +0100 @@ -131,8 +131,8 @@ # attribute to their current value source = repo.system_source for eschema in etypes: - for entity in session.execute('Any X WHERE X is %s' % eschema).entities(): - source.fti_index_entity(session, entity) + rset = session.execute('Any X WHERE X is %s' % eschema) + source.fti_index_entities(session, rset.entities()) if withpb: pb.update() diff -r c02e5ba43366 -r 37668bf302f5 server/hook.py --- a/server/hook.py Tue Jan 25 10:01:19 2011 +0100 +++ b/server/hook.py Tue Jan 25 12:09:59 2011 +0100 @@ -274,6 +274,14 @@ 'session_open', 'session_close')) ALL_HOOKS = ENTITIES_HOOKS | RELATIONS_HOOKS | SYSTEM_HOOKS +def _iter_kwargs(entities, kwargs): + if not entities: + yield kwargs + else: + for entity in entities: + kwargs['entity'] = entity + yield kwargs + class HooksRegistry(CWRegistry): def initialization_completed(self): @@ -288,20 +296,30 @@ super(HooksRegistry, self).register(obj, **kwargs) def call_hooks(self, event, session=None, **kwargs): + """call `event` hooks for an entity or a list of entities (passed + 
respectively as the `entity` or ``entities`` keyword argument). + """ kwargs['event'] = event - if session is None: + if session is None: # True for events such as server_start for hook in sorted(self.possible_objects(session, **kwargs), key=lambda x: x.order): hook() else: + if 'entities' in kwargs: + assert 'entity' not in kwargs, \ + 'can\'t pass "entities" and "entity" arguments simultaneously' + entities = kwargs.pop('entities') + else: + entities = [] # by default, hooks are executed with security turned off with security_enabled(session, read=False): - hooks = sorted(self.possible_objects(session, **kwargs), - key=lambda x: x.order) - with security_enabled(session, write=False): - for hook in hooks: - #print hook.category, hook.__regid__ - hook() + for _kwargs in _iter_kwargs(entities, kwargs): + hooks = sorted(self.possible_objects(session, **_kwargs), + key=lambda x: x.order) + with security_enabled(session, write=False): + for hook in hooks: + #print hook.category, hook.__regid__ + hook() class HooksManager(object): def __init__(self, vreg): diff -r c02e5ba43366 -r 37668bf302f5 server/repository.py --- a/server/repository.py Tue Jan 25 10:01:19 2011 +0100 +++ b/server/repository.py Tue Jan 25 12:09:59 2011 +0100 @@ -1097,6 +1097,17 @@ hook.CleanupDeletedEidsCacheOp.get_instance(session).add_data(entity.eid) self._delete_info(session, entity, sourceuri, extid, scleanup) + def delete_info_multi(self, session, entities, sourceuri, extids, scleanup=False): + """same as delete_info but accepts a list of entities and + extids with the same etype and belonging to the same source + """ + # mark eid as being deleted in session info and setup cache update + # operation + op = hook.CleanupDeletedEidsCacheOp.get_instance(session) + for entity in entities: + op.add_data(entity.eid) + self._delete_info_multi(session, entities, sourceuri, extids, scleanup) + def _delete_info(self, session, entity, sourceuri, extid, scleanup=False): """delete system information on 
deletion of an entity: * delete all remaining relations from/to this entity @@ -1129,6 +1140,38 @@ 'from %s. RQL: %s', entity, sourceuri, rql) self.system_source.delete_info(session, entity, sourceuri, extid) + def _delete_info_multi(self, session, entities, sourceuri, extids, scleanup=False): + """same as _delete_info but accepts a list of entities with + the same etype and belonging to the same source. + """ + pendingrtypes = session.transaction_data.get('pendingrtypes', ()) + # delete remaining relations: if user can delete the entity, he can + # delete all its relations without security checking + assert entities and len(entities) == len(extids) + with security_enabled(session, read=False, write=False): + eids = [_e.eid for _e in entities] + in_eids = ','.join((str(eid) for eid in eids)) + for rschema, _, role in entities[0].e_schema.relation_definitions(): + rtype = rschema.type + if rtype in schema.VIRTUAL_RTYPES or rtype in pendingrtypes: + continue + if role == 'subject': + # don't skip inlined relation so they are regularly + # deleted and so hooks are correctly called + rql = 'DELETE X %s Y WHERE X eid IN (%s)' % (rtype, in_eids) + else: + rql = 'DELETE Y %s X WHERE X eid IN (%s)' % (rtype, in_eids) + if scleanup: + # source cleaning: only delete relations stored locally + rql += ', NOT (Y cw_source S, S name %(source)s)' + try: + session.execute(rql, {'source': sourceuri}, + build_descr=False) + except: + self.exception('error while cascading delete for entity %s ' + 'from %s. 
RQL: %s', entities, sourceuri, rql) + self.system_source.delete_info_multi(session, entities, sourceuri, extids) + def locate_relation_source(self, session, subject, rtype, object): subjsource = self.source_from_eid(subject, session) objsource = self.source_from_eid(object, session) @@ -1297,19 +1340,40 @@ if orig_edited is not None: entity.cw_edited = orig_edited - def glob_delete_entity(self, session, eid): - """delete an entity and all related entities from the repository""" - entity = session.entity_from_eid(eid) - etype, sourceuri, extid = self.type_and_source_from_eid(eid, session) - if server.DEBUG & server.DBG_REPO: - print 'DELETE entity', etype, eid - source = self.sources_by_uri[sourceuri] - if source.should_call_hooks: - self.hm.call_hooks('before_delete_entity', session, entity=entity) - self._delete_info(session, entity, sourceuri, extid) - source.delete_entity(session, entity) - if source.should_call_hooks: - self.hm.call_hooks('after_delete_entity', session, entity=entity) + + def glob_delete_entities(self, session, eids): + """delete a list of entities and all related entities from the repository""" + data_by_etype_source = {} # values are ([list of eids], + # [list of extid], + # [list of entities]) + # + # WARNING: the way this dictionary is populated is heavily optimized + # and does not use setdefault on purpose. Unless a new release + # of the Python interpreter advertises large perf improvements + # in setdefault, this should not be changed without profiling. 
+ + for eid in eids: + etype, sourceuri, extid = self.type_and_source_from_eid(eid, session) + entity = session.entity_from_eid(eid, etype) + _key = (etype, sourceuri) + if _key not in data_by_etype_source: + data_by_etype_source[_key] = ([eid], [extid], [entity]) + else: + _data = data_by_etype_source[_key] + _data[0].append(eid) + _data[1].append(extid) + _data[2].append(entity) + for (etype, sourceuri), (eids, extids, entities) in data_by_etype_source.iteritems(): + if server.DEBUG & server.DBG_REPO: + print 'DELETE entities', etype, eids + #print 'DELETE entities', etype, len(eids) + source = self.sources_by_uri[sourceuri] + if source.should_call_hooks: + self.hm.call_hooks('before_delete_entity', session, entities=entities) + self._delete_info_multi(session, entities, sourceuri, extids) # xxx + source.delete_entities(session, entities) + if source.should_call_hooks: + self.hm.call_hooks('after_delete_entity', session, entities=entities) # don't clear cache here this is done in a hook on commit def glob_add_relation(self, session, subject, rtype, object): diff -r c02e5ba43366 -r 37668bf302f5 server/sources/__init__.py --- a/server/sources/__init__.py Tue Jan 25 10:01:19 2011 +0100 +++ b/server/sources/__init__.py Tue Jan 25 12:09:59 2011 +0100 @@ -22,6 +22,7 @@ from os.path import join, splitext from datetime import datetime, timedelta from logging import getLogger +import itertools from cubicweb import set_log_methods, server from cubicweb.schema import VIRTUAL_RTYPES @@ -372,6 +373,11 @@ """update an entity in the source""" raise NotImplementedError() + def delete_entities(self, session, entities): + """delete several entities from the source""" + for entity in entities: + self.delete_entity(session, entity) + def delete_entity(self, session, entity): """delete an entity from the source""" raise NotImplementedError() @@ -401,12 +407,19 @@ """mark entity as being modified, fulltext reindex if needed""" raise NotImplementedError() - def delete_info(self, 
session, entity, uri, extid, attributes, relations): + def delete_info(self, session, entity, uri, extid): """delete system information on deletion of an entity by transfering record from the entities table to the deleted_entities table """ raise NotImplementedError() + def delete_info_multi(self, session, entities, uri, extids): + """same as delete_info but accepts a list of entities with + the same etype and belonging to the same source. + """ + for entity, extid in itertools.izip(entities, extids): + self.delete_info(session, entity, uri, extid) + def modified_entities(self, session, etypes, mtime): """return a 2-uple: * list of (etype, eid) of entities of the given types which have been @@ -423,14 +436,13 @@ """ raise NotImplementedError() - def fti_unindex_entity(self, session, eid): - """remove text content for entity with the given eid from the full text - index + def fti_unindex_entities(self, session, entities): + """remove text content for entities from the full text index """ raise NotImplementedError() - def fti_index_entity(self, session, entity): - """add text content of a created/modified entity to the full text index + def fti_index_entities(self, session, entities): + """add text content of created/modified entities to the full text index """ raise NotImplementedError() diff -r c02e5ba43366 -r 37668bf302f5 server/sources/native.py --- a/server/sources/native.py Tue Jan 25 10:01:19 2011 +0100 +++ b/server/sources/native.py Tue Jan 25 12:09:59 2011 +0100 @@ -35,6 +35,7 @@ from contextlib import contextmanager from os.path import abspath import re +import itertools from logilab.common.compat import any from logilab.common.cache import Cache @@ -547,23 +548,29 @@ # on the filesystem. 
To make the entity.data usage absolutely # transparent, we'll have to reset entity.data to its binary # value once the SQL query will be executed - restore_values = {} - etype = entity.__regid__ + restore_values = [] + if isinstance(entity, list): + entities = entity + else: + entities = [entity] + etype = entities[0].__regid__ for attr, storage in self._storages.get(etype, {}).items(): - try: - edited = entity.cw_edited - except AttributeError: - assert event == 'deleted' - getattr(storage, 'entity_deleted')(entity, attr) - else: - if attr in edited: - handler = getattr(storage, 'entity_%s' % event) - restore_values[attr] = handler(entity, attr) + for entity in entities: + try: + edited = entity.cw_edited + except AttributeError: + assert event == 'deleted' + getattr(storage, 'entity_deleted')(entity, attr) + else: + if attr in edited: + handler = getattr(storage, 'entity_%s' % event) + to_restore = handler(entity, attr) + restore_values.append((entity, attr, to_restore)) try: yield # 2/ execute the source's instructions finally: # 3/ restore original values - for attr, value in restore_values.items(): + for entity, attr, value in restore_values: entity.cw_edited.edited_attribute(attr, value) def add_entity(self, session, entity): @@ -918,7 +925,7 @@ * transfer it to the deleted_entities table if the entity's type is multi-sources """ - self.fti_unindex_entity(session, entity.eid) + self.fti_unindex_entities(session, [entity]) attrs = {'eid': entity.eid} self.doexec(session, self.sqlgen.delete('entities', attrs), attrs) if not entity.__regid__ in self.multisources_etypes: @@ -930,6 +937,27 @@ 'source': uri, 'dtime': datetime.now()} self.doexec(session, self.sqlgen.insert('deleted_entities', attrs), attrs) + def delete_info_multi(self, session, entities, uri, extids): + """delete system information on deletion of an entity: + * update the fti + * remove record from the entities table + * transfer it to the deleted_entities table if the entity's type is + 
multi-sources + """ + self.fti_unindex_entities(session, entities) + attrs = {'eid': '(%s)' % ','.join([str(_e.eid) for _e in entities])} + self.doexec(session, self.sqlgen.delete_many('entities', attrs), attrs) + if entities[0].__regid__ not in self.multisources_etypes: + return + attrs = {'type': entities[0].__regid__, + 'source': uri, 'dtime': datetime.now()} + for entity, extid in itertools.izip(entities, extids): + if extid is not None: + assert isinstance(extid, str), type(extid) + extid = b64encode(extid) + attrs.update({'eid': entity.eid, 'extid': extid}) + self.doexec(session, self.sqlgen.insert('deleted_entities', attrs), attrs) + def modified_entities(self, session, etypes, mtime): """return a 2-uple: * list of (etype, eid) of entities of the given types which have been @@ -1297,27 +1325,32 @@ """ FTIndexEntityOp.get_instance(session).add_data(entity.eid) - def fti_unindex_entity(self, session, eid): - """remove text content for entity with the given eid from the full text - index + def fti_unindex_entities(self, session, entities): + """remove text content for entities from the full text index """ + cursor = session.pool['system'] + cursor_unindex_object = self.dbhelper.cursor_unindex_object try: - self.dbhelper.cursor_unindex_object(eid, session.pool['system']) + for entity in entities: + cursor_unindex_object(entity.eid, cursor) except Exception: # let KeyboardInterrupt / SystemExit propagate - self.exception('error while unindexing %s', eid) + self.exception('error while unindexing %s', entity) + - def fti_index_entity(self, session, entity): - """add text content of a created/modified entity to the full text index + def fti_index_entities(self, session, entities): + """add text content of created/modified entities to the full text index """ - self.debug('reindexing %r', entity.eid) + cursor_index_object = self.dbhelper.cursor_index_object + cursor = session.pool['system'] try: # use cursor_index_object, not cursor_reindex_object since # unindexing 
done in the FTIndexEntityOp - self.dbhelper.cursor_index_object(entity.eid, - entity.cw_adapt_to('IFTIndexable'), - session.pool['system']) + for entity in entities: + cursor_index_object(entity.eid, + entity.cw_adapt_to('IFTIndexable'), + cursor) except Exception: # let KeyboardInterrupt / SystemExit propagate - self.exception('error while reindexing %s', entity) + self.exception('error while indexing %s', entity) class FTIndexEntityOp(hook.DataOperationMixIn, hook.LateOperation): @@ -1333,17 +1366,17 @@ source = session.repo.system_source pendingeids = session.transaction_data.get('pendingeids', ()) done = session.transaction_data.setdefault('indexedeids', set()) + to_reindex = set() for eid in self.get_data(): if eid in pendingeids or eid in done: # entity added and deleted in the same transaction or already # processed - return + continue done.add(eid) iftindexable = session.entity_from_eid(eid).cw_adapt_to('IFTIndexable') - for container in iftindexable.fti_containers(): - source.fti_unindex_entity(session, container.eid) - source.fti_index_entity(session, container) - + to_reindex |= set(iftindexable.fti_containers()) + source.fti_unindex_entities(session, to_reindex) + source.fti_index_entities(session, to_reindex) def sql_schema(driver): helper = get_db_helper(driver) diff -r c02e5ba43366 -r 37668bf302f5 server/ssplanner.py --- a/server/ssplanner.py Tue Jan 25 10:01:19 2011 +0100 +++ b/server/ssplanner.py Tue Jan 25 12:09:59 2011 +0100 @@ -647,15 +647,13 @@ results = self.execute_child() todelete = frozenset(typed_eid(eid) for eid, in results) session = self.plan.session - delete = session.repo.glob_delete_entity # mark eids as being deleted in session info and setup cache update # operation (register pending eids before actual deletion to avoid - # multiple call to glob_delete_entity) + # multiple call to glob_delete_entities) op = CleanupDeletedEidsCacheOp.get_instance(session) actual = todelete - op._container op._container |= actual - for eid in actual: 
- delete(session, eid) + session.repo.glob_delete_entities(session, actual) return results class DeleteRelationsStep(Step):