[massive store] Reintroduce methods that are necessary to properly handle master/slave configuration
authorSylvain Thénault <sylvain.thenault@logilab.fr>
Fri, 30 Sep 2016 17:34:59 +0200
changeset 11781 4ebd968f364c
parent 11780 307d96c0ab5a
child 11782 056004c17c71
[massive store] Reintroduce methods that are necessary to properly handle master/slave configuration Related to #15538303
cubicweb/dataimport/massive_store.py
cubicweb/dataimport/test/test_massive_store.py
--- a/cubicweb/dataimport/massive_store.py	Wed Sep 28 09:02:14 2016 +0200
+++ b/cubicweb/dataimport/massive_store.py	Fri Sep 30 17:34:59 2016 +0200
@@ -106,6 +106,61 @@
             for eid in range(last_eid - self.eids_seq_range + 1, last_eid + 1):
                 yield eid
 
+    # master/slaves specific API
+
+    def master_init_etype(self, etype):
+        """Initialize database for insertion of entities of the given etype.
+
+        This is expected to be called once, usually by the master store in master/slaves
+        configuration.
+        """
+        self._drop_metadata_constraints_if_necessary()
+        tablename = 'cw_%s' % etype.lower()
+        self._dbh.drop_constraints(tablename)
+        self._dbh.drop_indexes(tablename)
+        self.sql('CREATE TABLE IF NOT EXISTS cwmassive_initialized'
+                 '(retype text, type varchar(128))')
+        self.sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'etype')", {'e': etype})
+        self.sql('ALTER TABLE %s ADD COLUMN extid VARCHAR(256)' % tablename)
+
+    def master_init_rtype(self, rtype):
+        """Initialize database for insertion of relation of the given rtype.
+
+        This is expected to be called once, usually by the master store in master/slaves
+        configuration.
+        """
+        assert not self._cnx.vreg.schema.rschema(rtype).inlined
+        self._drop_metadata_constraints_if_necessary()
+        tablename = '%s_relation' % rtype.lower()
+        self._dbh.drop_constraints(tablename)
+        self._dbh.drop_indexes(tablename)
+        self.sql('CREATE TABLE IF NOT EXISTS cwmassive_initialized'
+                 '(retype text, type varchar(128))')
+        self.sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'rtype')", {'e': rtype})
+        self.sql('CREATE TABLE %s_tmp (eid_from integer, eid_to integer)' % tablename)
+
+    def master_insert_etype_metadata(self, etype):
+        """Massive insertion of meta data for a given etype, based on SQL statements.
+
+        In master/slabes configuration, you'll usually want to call it from the master once all
+        slaves have finished (at least slaves won't call it automatically, so that's your
+        reponsability).
+        """
+        # insert standard metadata relations
+        for rtype, eid in self.metagen.base_etype_rels(etype).items():
+            self._insert_meta_relation(etype, eid, '%s_relation' % rtype)
+        # insert cw_source, is and is_instance_of relations (normally handled by the system source)
+        self._insert_meta_relation(etype, self.metagen.source.eid, 'cw_source_relation')
+        eschema = self.schema[etype]
+        self._insert_meta_relation(etype, eschema.eid, 'is_relation')
+        for parent_eschema in chain(eschema.ancestors(), [eschema]):
+            self._insert_meta_relation(etype, parent_eschema.eid, 'is_instance_of_relation')
+        # finally insert records into the entities table
+        self.sql("INSERT INTO entities (eid, type, extid) "
+                 "SELECT cw_eid, '%s', extid FROM cw_%s "
+                 "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
+                 % (etype, etype.lower()))
+
     # SQL utilities #########################################################
 
     def _drop_metadata_constraints_if_necessary(self):
@@ -142,13 +197,7 @@
         """
         if not self.slave_mode and etype not in self._initialized:
             self._initialized.add(etype)
-            self._drop_metadata_constraints_if_necessary()
-            tablename = 'cw_%s' % etype.lower()
-            self._dbh.drop_constraints(tablename)
-            self._dbh.drop_indexes(tablename)
-            self.sql('CREATE TABLE IF NOT EXISTS cwmassive_initialized'
-                     '(retype text, type varchar(128))')
-            self.sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'etype')", {'e': etype})
+            self.master_init_etype(etype)
         attrs = self.metagen.base_etype_attrs(etype)
         data = copy(attrs)  # base_etype_attrs is @cached, a copy is necessary
         data.update(kwargs)
@@ -171,17 +220,8 @@
         Relation must not be inlined.
         """
         if not self.slave_mode and rtype not in self._initialized:
-            assert not self._cnx.vreg.schema.rschema(rtype).inlined
             self._initialized.add(rtype)
-            self._drop_metadata_constraints_if_necessary()
-            tablename = '%s_relation' % rtype.lower()
-            self._dbh.drop_constraints(tablename)
-            self._dbh.drop_indexes(tablename)
-            self.sql('CREATE TABLE %s_tmp (eid_from integer, eid_to integer)'
-                     % tablename)
-            self.sql('CREATE TABLE IF NOT EXISTS cwmassive_initialized'
-                     '(retype text, type varchar(128))')
-            self.sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'rtype')", {'e': rtype})
+            self.master_init_rtype(rtype)
         self._data_relations[rtype].append({'eid_from': eid_from, 'eid_to': eid_to})
 
     def flush(self):
@@ -276,7 +316,7 @@
             # Clear data cache
             self._data_entities[etype] = []
             if not self.slave_mode:
-                self._insert_etype_metadata(etype)
+                self.master_insert_etype_metadata(etype)
 
     def _cleanup_relations(self, rtype):
         """ Cleanup rtype table """
@@ -288,24 +328,6 @@
         # Drop temporary relation table
         self.sql('DROP TABLE %(r)s_relation_tmp' % {'r': rtype.lower()})
 
-    def _insert_etype_metadata(self, etype):
-        """Massive insertion of meta data for a given etype, based on SQL statements.
-        """
-        # insert standard metadata relations
-        for rtype, eid in self.metagen.base_etype_rels(etype).items():
-            self._insert_meta_relation(etype, eid, '%s_relation' % rtype)
-        # insert cw_source, is and is_instance_of relations (normally handled by the system source)
-        self._insert_meta_relation(etype, self.metagen.source.eid, 'cw_source_relation')
-        eschema = self.schema[etype]
-        self._insert_meta_relation(etype, eschema.eid, 'is_relation')
-        for parent_eschema in chain(eschema.ancestors(), [eschema]):
-            self._insert_meta_relation(etype, parent_eschema.eid, 'is_instance_of_relation')
-        # finally insert records into the entities table
-        self.sql("INSERT INTO entities (eid, type) "
-                 "SELECT cw_eid, '%s' FROM cw_%s "
-                 "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
-                 % (etype, etype.lower()))
-
     def _insert_meta_relation(self, etype, eid_to, rtype):
         self.sql("INSERT INTO %s (eid_from, eid_to) SELECT cw_eid, %s FROM cw_%s "
                  "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
--- a/cubicweb/dataimport/test/test_massive_store.py	Wed Sep 28 09:02:14 2016 +0200
+++ b/cubicweb/dataimport/test/test_massive_store.py	Fri Sep 30 17:34:59 2016 +0200
@@ -138,7 +138,7 @@
         with self.admin_access.repo_cnx() as cnx:
             store = MassiveObjectStore(cnx)
 
-            store._drop_constraints()
+            store._drop_metadata_constraints()
             indexes = all_indexes(cnx)
             self.assertIn('entities_pkey', indexes)
             self.assertNotIn(build_index_name('owned_by_relation', ['eid_from', 'eid_to'], 'key_'),
@@ -159,8 +159,8 @@
         with self.admin_access.repo_cnx() as cnx:
             metagen = stores.MetadataGenerator(cnx, meta_skipped=('owned_by',))
             store = MassiveObjectStore(cnx, metagen=metagen)
+            store._drop_metadata_constraints()
 
-            store._drop_constraints()
             indexes = all_indexes(cnx)
             self.assertIn(build_index_name('owned_by_relation', ['eid_from', 'eid_to'], 'key_'),
                           indexes)