--- a/cubicweb/dataimport/massive_store.py Tue Oct 04 13:14:35 2016 +0200
+++ b/cubicweb/dataimport/massive_store.py Wed Oct 05 09:38:05 2016 +0200
@@ -17,12 +17,14 @@
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.
-import logging
+from base64 import b64encode
from copy import copy
from collections import defaultdict
from itertools import chain
-from base64 import b64encode
+import logging
+from uuid import uuid4
+from six import text_type
from six.moves import range
from cubicweb.dataimport import stores, pgstore
@@ -72,6 +74,8 @@
- `eids_seq_range`: size of eid range reserved by the store for each batch
"""
super(MassiveObjectStore, self).__init__(cnx)
+
+ self.uuid = text_type(uuid4()).replace('-', '')
self.on_commit_callback = on_commit_callback
self.on_rollback_callback = on_rollback_callback
self.slave_mode = slave_mode
@@ -103,6 +107,17 @@
# master/slaves specific API
+ def master_init(self):
+ """Initialize database for massive insertion.
+
+ This is expected to be called once, by the master store in master/slaves configuration.
+ """
+ assert not self.slave_mode
+ if self not in self._initialized:
+ self.sql('CREATE TABLE cwmassive_initialized'
+ '(retype text, type varchar(128), uuid varchar(32))')
+ self._initialized.append(self)
+
def master_init_etype(self, etype):
"""Initialize database for insertion of entities of the given etype.
@@ -114,25 +129,9 @@
self._dbh.drop_constraints(tablename)
self._dbh.drop_indexes(tablename)
self.sql('CREATE TABLE IF NOT EXISTS cwmassive_initialized'
- '(retype text, type varchar(128))')
- self.sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'etype')", {'e': etype})
- self.sql('ALTER TABLE %s ADD COLUMN extid VARCHAR(256)' % tablename)
-
- def master_init_rtype(self, rtype):
- """Initialize database for insertion of relation of the given rtype.
-
- This is expected to be called once, usually by the master store in master/slaves
- configuration.
- """
- assert not self._cnx.vreg.schema.rschema(rtype).inlined
- self._drop_metadata_constraints_if_necessary()
- tablename = '%s_relation' % rtype.lower()
- self._dbh.drop_constraints(tablename)
- self._dbh.drop_indexes(tablename)
- self.sql('CREATE TABLE IF NOT EXISTS cwmassive_initialized'
- '(retype text, type varchar(128))')
- self.sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'rtype')", {'e': rtype})
- self.sql('CREATE TABLE %s_tmp (eid_from integer, eid_to integer)' % tablename)
+ '(retype text, type varchar(128), uuid varchar(32))')
+ self.sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'etype', %(uuid)s)",
+ {'e': etype, 'uuid': self.uuid})
def master_insert_etype_metadata(self, etype):
"""Massive insertion of meta data for a given etype, based on SQL statements.
@@ -214,9 +213,16 @@
Relation must not be inlined.
"""
- if not self.slave_mode and rtype not in self._initialized:
+ if rtype not in self._initialized:
+ if not self.slave_mode:
+ self.master_init()
+ assert not self._cnx.vreg.schema.rschema(rtype).inlined
self._initialized.add(rtype)
- self.master_init_rtype(rtype)
+ tablename = '%s_relation' % rtype.lower()
+ tmp_tablename = '%s_%s' % (tablename, self.uuid)
+ self.sql("INSERT INTO cwmassive_initialized VALUES (%(r)s, 'rtype', %(uuid)s)",
+ {'r': rtype, 'uuid': self.uuid})
+ self.sql('CREATE TABLE %s(eid_from integer, eid_to integer)' % tmp_tablename)
self._data_relations[rtype].append({'eid_from': eid_from, 'eid_to': eid_to})
def flush(self):
@@ -236,14 +242,29 @@
self.logger.info("Start cleaning")
# Get all the initialized etypes/rtypes
if self._dbh.table_exists('cwmassive_initialized'):
- cu = self.sql('SELECT retype, type FROM cwmassive_initialized')
- for retype, _type in cu.fetchall():
- self.logger.info('Cleanup for %s' % retype)
+ cu = self.sql('SELECT retype, type, uuid FROM cwmassive_initialized')
+ relations = defaultdict(list)
+ for retype, _type, uuid in cu.fetchall():
if _type == 'rtype':
- # Cleanup relations tables
- self._cleanup_relations(retype)
- self.sql('DELETE FROM cwmassive_initialized WHERE retype = %(e)s',
- {'e': retype})
+ relations[retype].append(uuid)
+ # get back relation data from the temporary tables
+ for rtype, uuids in relations.items():
+ tablename = '%s_relation' % rtype.lower()
+ self._dbh.drop_constraints(tablename)
+ self._dbh.drop_indexes(tablename)
+ for uuid in uuids:
+ tmp_tablename = '%s_%s' % (tablename, uuid)
+ # XXX no index on the original relation table, EXISTS subquery may be sloooow
+ self.sql('INSERT INTO %(table)s(eid_from, eid_to) SELECT DISTINCT '
+ 'T.eid_from, T.eid_to FROM %(tmp_table)s AS T '
+ 'WHERE NOT EXISTS (SELECT 1 FROM %(table)s AS TT WHERE '
+ 'TT.eid_from=T.eid_from AND TT.eid_to=T.eid_to);'
+ % {'table': tablename, 'tmp_table': tmp_tablename})
+ # Drop temporary relation table and record from cwmassive_initialized
+ self.sql('DROP TABLE %(tmp_table)s' % {'tmp_table': tmp_tablename})
+ self.sql('DELETE FROM cwmassive_initialized '
+ 'WHERE retype = %(rtype)s AND uuid = %(uuid)s',
+ {'rtype': retype, 'uuid': uuid})
self._dbh.restore_indexes_and_constraints()
# Delete the meta data table
self.sql('DROP TABLE IF EXISTS cwmassive_initialized')
@@ -274,8 +295,9 @@
raise ValueError
cursor = self._cnx.cnxset.cu
# Push into the tmp table
- cursor.copy_from(buf, '%s_relation_tmp' % rtype.lower(),
- null='NULL', columns=('eid_from', 'eid_to'))
+ tablename = '%s_relation' % rtype.lower()
+ tmp_tablename = '%s_%s' % (tablename, self.uuid)
+ cursor.copy_from(buf, tmp_tablename, null='NULL', columns=('eid_from', 'eid_to'))
# Clear data cache
self._data_relations[rtype] = []
@@ -313,16 +335,6 @@
if not self.slave_mode:
self.master_insert_etype_metadata(etype)
- def _cleanup_relations(self, rtype):
- """ Cleanup rtype table """
- # Push into relation table while removing duplicate
- self.sql('INSERT INTO %(r)s_relation (eid_from, eid_to) SELECT DISTINCT '
- 'T.eid_from, T.eid_to FROM %(r)s_relation_tmp AS T '
- 'WHERE NOT EXISTS (SELECT 1 FROM %(r)s_relation AS TT WHERE '
- 'TT.eid_from=T.eid_from AND TT.eid_to=T.eid_to);' % {'r': rtype})
- # Drop temporary relation table
- self.sql('DROP TABLE %(r)s_relation_tmp' % {'r': rtype.lower()})
-
def _insert_meta_relation(self, etype, eid_to, rtype):
self.sql("INSERT INTO %s (eid_from, eid_to) SELECT cw_eid, %s FROM cw_%s "
"WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"