cubicweb/dataimport/massive_store.py
changeset 11784 c1aa50a88de3
parent 11783 8865c9e55575
child 11785 0cea67f41d0c
--- a/cubicweb/dataimport/massive_store.py	Tue Oct 04 13:14:35 2016 +0200
+++ b/cubicweb/dataimport/massive_store.py	Wed Oct 05 09:38:05 2016 +0200
@@ -17,12 +17,14 @@
 # You should have received a copy of the GNU Lesser General Public License along
 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
 
-import logging
+from base64 import b64encode
 from copy import copy
 from collections import defaultdict
 from itertools import chain
-from base64 import b64encode
+import logging
+from uuid import uuid4
 
+from six import text_type
 from six.moves import range
 
 from cubicweb.dataimport import stores, pgstore
@@ -72,6 +74,8 @@
         - `eids_seq_range`: size of eid range reserved by the store for each batch
         """
         super(MassiveObjectStore, self).__init__(cnx)
+
+        self.uuid = text_type(uuid4()).replace('-', '')
         self.on_commit_callback = on_commit_callback
         self.on_rollback_callback = on_rollback_callback
         self.slave_mode = slave_mode
@@ -103,6 +107,17 @@
 
     # master/slaves specific API
 
+    def master_init(self):
+        """Initialize database for massive insertion.
+
+        This is expected to be called once, by the master store in master/slaves configuration.
+        """
+        assert not self.slave_mode
+        if self not in self._initialized:
+            self.sql('CREATE TABLE cwmassive_initialized'
+                     '(retype text, type varchar(128), uuid varchar(32))')
+            self._initialized.append(self)
+
     def master_init_etype(self, etype):
         """Initialize database for insertion of entities of the given etype.
 
@@ -114,25 +129,9 @@
         self._dbh.drop_constraints(tablename)
         self._dbh.drop_indexes(tablename)
         self.sql('CREATE TABLE IF NOT EXISTS cwmassive_initialized'
-                 '(retype text, type varchar(128))')
-        self.sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'etype')", {'e': etype})
-        self.sql('ALTER TABLE %s ADD COLUMN extid VARCHAR(256)' % tablename)
-
-    def master_init_rtype(self, rtype):
-        """Initialize database for insertion of relation of the given rtype.
-
-        This is expected to be called once, usually by the master store in master/slaves
-        configuration.
-        """
-        assert not self._cnx.vreg.schema.rschema(rtype).inlined
-        self._drop_metadata_constraints_if_necessary()
-        tablename = '%s_relation' % rtype.lower()
-        self._dbh.drop_constraints(tablename)
-        self._dbh.drop_indexes(tablename)
-        self.sql('CREATE TABLE IF NOT EXISTS cwmassive_initialized'
-                 '(retype text, type varchar(128))')
-        self.sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'rtype')", {'e': rtype})
-        self.sql('CREATE TABLE %s_tmp (eid_from integer, eid_to integer)' % tablename)
+                 '(retype text, type varchar(128), uuid varchar(32))')
+        self.sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'etype', %(uuid)s)",
+                 {'e': etype, 'uuid': self.uuid})
 
     def master_insert_etype_metadata(self, etype):
         """Massive insertion of meta data for a given etype, based on SQL statements.
@@ -214,9 +213,16 @@
 
         Relation must not be inlined.
         """
-        if not self.slave_mode and rtype not in self._initialized:
+        if rtype not in self._initialized:
+            if not self.slave_mode:
+                self.master_init()
+            assert not self._cnx.vreg.schema.rschema(rtype).inlined
             self._initialized.add(rtype)
-            self.master_init_rtype(rtype)
+            tablename = '%s_relation' % rtype.lower()
+            tmp_tablename = '%s_%s' % (tablename, self.uuid)
+            self.sql("INSERT INTO cwmassive_initialized VALUES (%(r)s, 'rtype', %(uuid)s)",
+                     {'r': rtype, 'uuid': self.uuid})
+            self.sql('CREATE TABLE %s(eid_from integer, eid_to integer)' % tmp_tablename)
         self._data_relations[rtype].append({'eid_from': eid_from, 'eid_to': eid_to})
 
     def flush(self):
@@ -236,14 +242,29 @@
         self.logger.info("Start cleaning")
         # Get all the initialized etypes/rtypes
         if self._dbh.table_exists('cwmassive_initialized'):
-            cu = self.sql('SELECT retype, type FROM cwmassive_initialized')
-            for retype, _type in cu.fetchall():
-                self.logger.info('Cleanup for %s' % retype)
+            cu = self.sql('SELECT retype, type, uuid FROM cwmassive_initialized')
+            relations = defaultdict(list)
+            for retype, _type, uuid in cu.fetchall():
                 if _type == 'rtype':
-                    # Cleanup relations tables
-                    self._cleanup_relations(retype)
-                self.sql('DELETE FROM cwmassive_initialized WHERE retype = %(e)s',
-                         {'e': retype})
+                    relations[retype].append(uuid)
+            # get back relation data from the temporary tables
+            for rtype, uuids in relations.items():
+                tablename = '%s_relation' % rtype.lower()
+                self._dbh.drop_constraints(tablename)
+                self._dbh.drop_indexes(tablename)
+                for uuid in uuids:
+                    tmp_tablename = '%s_%s' % (tablename, uuid)
+                    # XXX no index on the original relation table, EXISTS subquery may be sloooow
+                    self.sql('INSERT INTO %(table)s(eid_from, eid_to) SELECT DISTINCT '
+                             'T.eid_from, T.eid_to FROM %(tmp_table)s AS T '
+                             'WHERE NOT EXISTS (SELECT 1 FROM %(table)s AS TT WHERE '
+                             'TT.eid_from=T.eid_from AND TT.eid_to=T.eid_to);'
+                             % {'table': tablename, 'tmp_table': tmp_tablename})
+                    # Drop temporary relation table and record from cwmassive_initialized
+                    self.sql('DROP TABLE %(tmp_table)s' % {'tmp_table': tmp_tablename})
+                    self.sql('DELETE FROM cwmassive_initialized '
+                             'WHERE retype = %(rtype)s AND uuid = %(uuid)s',
+                             {'rtype': retype, 'uuid': uuid})
         self._dbh.restore_indexes_and_constraints()
         # Delete the meta data table
         self.sql('DROP TABLE IF EXISTS cwmassive_initialized')
@@ -274,8 +295,9 @@
                 raise ValueError
             cursor = self._cnx.cnxset.cu
             # Push into the tmp table
-            cursor.copy_from(buf, '%s_relation_tmp' % rtype.lower(),
-                             null='NULL', columns=('eid_from', 'eid_to'))
+            tablename = '%s_relation' % rtype.lower()
+            tmp_tablename = '%s_%s' % (tablename, self.uuid)
+            cursor.copy_from(buf, tmp_tablename, null='NULL', columns=('eid_from', 'eid_to'))
             # Clear data cache
             self._data_relations[rtype] = []
 
@@ -313,16 +335,6 @@
             if not self.slave_mode:
                 self.master_insert_etype_metadata(etype)
 
-    def _cleanup_relations(self, rtype):
-        """ Cleanup rtype table """
-        # Push into relation table while removing duplicate
-        self.sql('INSERT INTO %(r)s_relation (eid_from, eid_to) SELECT DISTINCT '
-                 'T.eid_from, T.eid_to FROM %(r)s_relation_tmp AS T '
-                 'WHERE NOT EXISTS (SELECT 1 FROM %(r)s_relation AS TT WHERE '
-                 'TT.eid_from=T.eid_from AND TT.eid_to=T.eid_to);' % {'r': rtype})
-        # Drop temporary relation table
-        self.sql('DROP TABLE %(r)s_relation_tmp' % {'r': rtype.lower()})
-
     def _insert_meta_relation(self, etype, eid_to, rtype):
         self.sql("INSERT INTO %s (eid_from, eid_to) SELECT cw_eid, %s FROM cw_%s "
                  "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"