[repo] optimize massive insertion/deletion by using the new set_operation function stable
authorSylvain Thénault <sylvain.thenault@logilab.fr>
Mon, 29 Mar 2010 13:34:06 +0200
branchstable
changeset 5060 ee3b856e1406
parent 5059 1d5c81588144
child 5061 cdab5220eac0
[repo] optimize massive insertion/deletion by using the new set_operation function. The idea is that on massive insertion, the cost of handling the list of operations becomes non-negligible, so we should minimize the number of operations in that list. The set_operation function eases the use of operations associated with data in session.transaction_data: we only add the operation when the data set isn't initialized yet, else we simply add the data to the set. The operation then simply processes the accumulated data.
hooks/integrity.py
server/hook.py
--- a/hooks/integrity.py	Mon Mar 29 13:28:41 2010 +0200
+++ b/hooks/integrity.py	Mon Mar 29 13:34:06 2010 +0200
@@ -17,6 +17,7 @@
 from cubicweb.selectors import implements
 from cubicweb.uilib import soup2xhtml
 from cubicweb.server import hook
+from cubicweb.server.hook import set_operation
 
 # special relations that don't have to be checked for integrity, usually
 # because they are handled internally by hooks (so we trust ourselves)
@@ -62,41 +63,40 @@
     """checking relation cardinality has to be done after commit in
     case the relation is being replaced
     """
-    eid, rtype = None, None
+    role = key = base_rql = None
 
     def precommit_event(self):
-        # recheck pending eids
-        if self.session.deleted_in_transaction(self.eid):
-            return
-        if self.rtype in self.session.transaction_data.get('pendingrtypes', ()):
-            return
-        if self.session.execute(*self._rql()).rowcount < 1:
-            etype = self.session.describe(self.eid)[0]
-            _ = self.session._
-            msg = _('at least one relation %(rtype)s is required on %(etype)s (%(eid)s)')
-            msg %= {'rtype': _(self.rtype), 'etype': _(etype), 'eid': self.eid}
-            qname = role_name(self.rtype, self.role)
-            raise ValidationError(self.eid, {qname: msg})
-
-    def commit_event(self):
-        pass
-
-    def _rql(self):
-        raise NotImplementedError()
+        session =self.session
+        pendingeids = session.transaction_data.get('pendingeids', ())
+        pendingrtypes = session.transaction_data.get('pendingrtypes', ())
+        # popping the key is not optional: if a further operation triggers a new
+        # deletion of a relation, we'll need a new operation
+        for eid, rtype in session.transaction_data.pop(self.key):
+            # recheck pending eids / relation types
+            if eid in pendingeids:
+                continue
+            if rtype in pendingrtypes:
+                continue
+            if not session.execute(self.base_rql % rtype, {'x': eid}, 'x'):
+                etype = session.describe(eid)[0]
+                _ = session._
+                msg = _('at least one relation %(rtype)s is required on '
+                        '%(etype)s (%(eid)s)')
+                msg %= {'rtype': _(rtype), 'etype': _(etype), 'eid': eid}
+                raise ValidationError(eid, {role_name(rtype, self.role): msg})
 
 
 class _CheckSRelationOp(_CheckRequiredRelationOperation):
     """check required subject relation"""
     role = 'subject'
-    def _rql(self):
-        return 'Any O WHERE S eid %%(x)s, S %s O' % self.rtype, {'x': self.eid}, 'x'
-
+    key = '_cwisrel'
+    base_rql = 'Any O WHERE S eid %%(x)s, S %s O'
 
 class _CheckORelationOp(_CheckRequiredRelationOperation):
     """check required object relation"""
     role = 'object'
-    def _rql(self):
-        return 'Any S WHERE O eid %%(x)s, S %s O' % self.rtype, {'x': self.eid}, 'x'
+    key = '_cwiorel'
+    base_rql = 'Any S WHERE O eid %%(x)s, S %s O'
 
 
 class IntegrityHook(hook.Hook):
@@ -112,14 +112,6 @@
     def __call__(self):
         getattr(self, self.event)()
 
-    def checkrel_if_necessary(self, opcls, rtype, eid):
-        """check an equivalent operation has not already been added"""
-        for op in self._cw.pending_operations:
-            if isinstance(op, opcls) and op.rtype == rtype and op.eid == eid:
-                break
-        else:
-            opcls(self._cw, rtype=rtype, eid=eid)
-
     def after_add_entity(self):
         eid = self.entity.eid
         eschema = self.entity.e_schema
@@ -127,10 +119,14 @@
             # skip automatically handled relations
             if rschema.type in DONT_CHECK_RTYPES_ON_ADD:
                 continue
-            opcls = role == 'subject' and _CheckSRelationOp or _CheckORelationOp
             rdef = rschema.role_rdef(eschema, targetschemas[0], role)
             if rdef.role_cardinality(role) in '1+':
-                self.checkrel_if_necessary(opcls, rschema.type, eid)
+                if role == 'subject':
+                    set_operation(self._cw, '_cwisrel', (eid, rschema.type),
+                                  _CheckSRelationOp)
+                else:
+                    set_operation(self._cw, '_cwiorel', (eid, rschema.type),
+                                  _CheckORelationOp)
 
     def before_delete_relation(self):
         rtype = self.rtype
@@ -138,14 +134,16 @@
             return
         session = self._cw
         eidfrom, eidto = self.eidfrom, self.eidto
-        card = session.schema_rproperty(rtype, eidfrom, eidto, 'cardinality')
         pendingrdefs = session.transaction_data.get('pendingrdefs', ())
         if (session.describe(eidfrom)[0], rtype, session.describe(eidto)[0]) in pendingrdefs:
             return
+        card = session.schema_rproperty(rtype, eidfrom, eidto, 'cardinality')
         if card[0] in '1+' and not session.deleted_in_transaction(eidfrom):
-            self.checkrel_if_necessary(_CheckSRelationOp, rtype, eidfrom)
+            set_operation(self._cw, '_cwisrel', (eidfrom, rtype),
+                          _CheckSRelationOp)
         if card[1] in '1+' and not session.deleted_in_transaction(eidto):
-            self.checkrel_if_necessary(_CheckORelationOp, rtype, eidto)
+            set_operation(self._cw, '_cwiorel', (eidto, rtype),
+                          _CheckORelationOp)
 
 
 class _CheckConstraintsOp(hook.LateOperation):
@@ -291,19 +289,32 @@
 # not really integrity check, they maintain consistency on changes
 
 class _DelayedDeleteOp(hook.Operation):
-    """delete the object of composite relation except if the relation
-    has actually been redirected to another composite
+    """delete the object of composite relation except if the relation has
+    actually been redirected to another composite
     """
+    key = base_rql = None
 
     def precommit_event(self):
         session = self.session
-        # don't do anything if the entity is being created or deleted
-        if not (session.deleted_in_transaction(self.eid) or
-                session.added_in_transaction(self.eid)):
-            etype = session.describe(self.eid)[0]
-            session.execute('DELETE %s X WHERE X eid %%(x)s, NOT %s'
-                            % (etype, self.relation),
-                            {'x': self.eid}, 'x')
+        pendingeids = session.transaction_data.get('pendingeids', ())
+        neweids = session.transaction_data.get('neweids', ())
+        # popping the key is not optional: if a further operation triggers a new
+        # deletion of a composite relation, we'll need a new operation
+        for eid, rtype in session.transaction_data.pop(self.key):
+            # don't do anything if the entity is being created or deleted
+            if not (eid in pendingeids or eid in neweids):
+                etype = session.describe(eid)[0]
+                session.execute(self.base_rql % (etype, rtype), {'x': eid}, 'x')
+
+class _DelayedDeleteSEntityOp(_DelayedDeleteOp):
+    """delete orphan subject entity of a composite relation"""
+    key = '_cwiscomp'
+    base_rql = 'DELETE %s X WHERE X eid %%(x)s, NOT X %s Y'
+
+class _DelayedDeleteOEntityOp(_DelayedDeleteOp):
+    """delete orphan object entity of a composite relation"""
+    key = '_cwiocomp'
+    base_rql = 'DELETE %s X WHERE X eid %%(x)s, NOT Y %s X'
 
 
 class DeleteCompositeOrphanHook(hook.Hook):
@@ -323,8 +334,8 @@
         composite = self._cw.schema_rproperty(self.rtype, self.eidfrom, self.eidto,
                                               'composite')
         if composite == 'subject':
-            _DelayedDeleteOp(self._cw, eid=self.eidto,
-                             relation='Y %s X' % self.rtype)
+            set_operation(self._cw, '_cwiocomp', (self.eidto, self.rtype),
+                          _DelayedDeleteOEntityOp)
         elif composite == 'object':
-            _DelayedDeleteOp(self._cw, eid=self.eidfrom,
-                             relation='X %s Y' % self.rtype)
+            set_operation(self._cw, '_cwiscomp', (self.eidfrom, self.rtype),
+                          _DelayedDeleteSEntityOp)
--- a/server/hook.py	Mon Mar 29 13:28:41 2010 +0200
+++ b/server/hook.py	Mon Mar 29 13:34:06 2010 +0200
@@ -450,6 +450,25 @@
 set_log_methods(Operation, getLogger('cubicweb.session'))
 
 
+def set_operation(session, datakey, value, opcls, **opkwargs):
+    """Search for session.transaction_data[`datakey`] (expected to be a set):
+
+    * if found, simply add `value` to it
+
+    * else, initialize it to set([`value`]) and instantiate the given `opcls`
+      operation class with `opkwargs` as additional keyword arguments.
+
+    You should use this instead of creating an operation for each `value`,
+    since handling operations becomes costly on massive data import.
+    """
+    try:
+        session.transaction_data[datakey].add(value)
+    except KeyError:
+        # first value for `datakey`: register the operation that will process the set
+        opcls(session, **opkwargs)
+        session.transaction_data[datakey] = set((value,))
+
+
 class LateOperation(Operation):
     """special operation which should be called after all possible (ie non late)
     operations