[dataimport] implement new store API on massive store
author Sylvain Thénault <sylvain.thenault@logilab.fr>
Wed, 21 Oct 2015 16:32:11 +0200
changeset 10863 8e1f6de61300
parent 10862 7357b1485795
child 10864 b7f4acf0473b
[dataimport] implement new store API on massive store and deprecate the old one. Related to #5414760
dataimport/massive_store.py
dataimport/test/test_massive_store.py
--- a/dataimport/massive_store.py	Wed Oct 21 16:31:05 2015 +0200
+++ b/dataimport/massive_store.py	Wed Oct 21 16:32:11 2015 +0200
@@ -24,6 +24,7 @@
 
 from six.moves import range
 
+from logilab.common.deprecation import deprecated
 from yams.constraints import SizeConstraint
 
 from psycopg2 import ProgrammingError
@@ -156,7 +157,7 @@
                 'entities_id_seq', initial_value=self._eids_seq_start + 1))
             cnx.commit()
         self.get_next_eid = lambda g=self._get_eid_gen(): next(g)
-        # recreate then when self.cleanup() is called
+        # recreate them when self.finish() is called
         if not self.slave_mode and self.drop_index:
             self._drop_metatables_constraints()
         if source is None:
@@ -437,8 +438,11 @@
         kwargs.update((key, default_values[key]) for key in missing_keys)
         return kwargs
 
-    def create_entity(self, etype, **kwargs):
-        """ Create an entity
+    # store api ################################################################
+
+    def prepare_insert_entity(self, etype, **kwargs):
+        """Given an entity type, attributes and inlined relations, returns the inserted entity's
+        eid.
         """
         # Init the table if necessary
         self.init_etype_table(etype)
@@ -461,22 +465,85 @@
         kwargs = self.apply_size_constraints(etype, kwargs)
         # Apply default values
         kwargs = self.apply_default_values(etype, kwargs)
-        # Push data / Return entity
+        # Push data
         self._data_entities[etype].append(kwargs)
-        entity = self._cnx.vreg['etypes'].etype_class(etype)(self._cnx)
-        entity.cw_attr_cache.update(kwargs)
-        if 'eid' in kwargs:
-            entity.eid = kwargs['eid']
-        return entity
+        # Return eid
+        return kwargs.get('eid')
 
-    ### RELATIONS CREATION ####################################################
-
-    def relate(self, subj_eid, rtype, obj_eid, *args, **kwargs):
-        """ Compatibility with other stores
+    def prepare_insert_relation(self, eid_from, rtype, eid_to, **kwargs):
+        """Insert into the database a  relation ``rtype`` between entities with eids ``eid_from``
+        and ``eid_to``.
         """
         # Init the table if necessary
         self.init_relation_table(rtype)
-        self._data_relations[rtype].append({'eid_from': subj_eid, 'eid_to': obj_eid})
+        self._data_relations[rtype].append({'eid_from': eid_from, 'eid_to': eid_to})
+
+    def flush(self):
+        """Flush the data"""
+        self.flush_entities()
+        self.flush_internal_relations()
+        self.flush_relations()
+
+    def commit(self):
+        """Commit the database transaction."""
+        self.on_commit()
+        super(MassiveObjectStore, self).commit()
+
+    def finish(self):
+        """Remove temporary tables and columns."""
+        self.logger.info("Start cleaning")
+        if self.slave_mode:
+            raise RuntimeError('Store cleanup is not allowed in slave mode')
+        self.logger.info("Start cleaning")
+        # Cleanup relations tables
+        for etype in self._initialized['init_uri_eid']:
+            self.sql('DROP TABLE uri_eid_%s' % etype.lower())
+        # Remove relations tables
+        for rtype in self._initialized['uri_rtypes']:
+            if not self._cnx.repo.schema.rschema(rtype).inlined:
+                self.sql('DROP TABLE %(r)s_relation_iid_tmp' % {'r': rtype})
+            else:
+                self.logger.warning("inlined relation %s: no cleanup to be done for it" % rtype)
+        self.commit()
+        # Get all the initialized etypes/rtypes
+        if self._dbh.table_exists('dataio_initialized'):
+            crs = self.sql('SELECT retype, type FROM dataio_initialized')
+            for retype, _type in crs.fetchall():
+                self.logger.info('Cleanup for %s' % retype)
+                if _type == 'etype':
+                    # Cleanup entities tables - Recreate indexes
+                    self._cleanup_entities(retype)
+                elif _type == 'rtype':
+                    # Cleanup relations tables
+                    self._cleanup_relations(retype)
+                self.sql('DELETE FROM dataio_initialized WHERE retype = %(e)s',
+                         {'e': retype})
+        # Create meta constraints (entities, is_instance_of, ...)
+        self._create_metatables_constraints()
+        self.commit()
+        # Delete the meta data table
+        for table_name in ('dataio_initialized', 'dataio_constraints', 'dataio_metadata'):
+            if self._dbh.table_exists(table_name):
+                self.sql('DROP TABLE %s' % table_name)
+        self.commit()
+
+    @deprecated('[3.22] use prepare_insert_entity instead')
+    def create_entity(self, etype, **kwargs):
+        """ Create an entity
+        """
+        eid = self.prepare_insert_entity(etype, **kwargs)
+        entity = self._cnx.vreg['etypes'].etype_class(etype)(self._cnx)
+        entity.cw_attr_cache.update(kwargs)
+        entity.eid = eid
+        return entity
+
+    @deprecated('[3.22] use prepare_insert_relation instead')
+    def relate(self, subj_eid, rtype, obj_eid, *args, **kwargs):
+        self.prepare_insert_relation(subj_eid, rtype, obj_eid, *args, **kwargs)
+
+    @deprecated('[3.22] use finish instead')
+    def cleanup(self):
+        self.finish()
 
 
     ### FLUSH #################################################################
@@ -492,17 +559,6 @@
         else:
             raise exc
 
-    def commit(self):
-        self.on_commit()
-        super(MassiveObjectStore, self).commit()
-
-    def flush(self):
-        """ Flush the data
-        """
-        self.flush_entities()
-        self.flush_internal_relations()
-        self.flush_relations()
-
     def flush_internal_relations(self):
         """ Flush the relations data
         """
@@ -621,45 +677,6 @@
             tablename = '%s_relation' % rtype.lower()
             self.reapply_constraint_index(tablename)
 
-    def cleanup(self):
-        """ Remove temporary tables and columns
-        """
-        self.logger.info("Start cleaning")
-        if self.slave_mode:
-            raise RuntimeError('Store cleanup is not allowed in slave mode')
-        self.logger.info("Start cleaning")
-        # Cleanup relations tables
-        for etype in self._initialized['init_uri_eid']:
-            self.sql('DROP TABLE uri_eid_%s' % etype.lower())
-        # Remove relations tables
-        for rtype in self._initialized['uri_rtypes']:
-            if not self._cnx.repo.schema.rschema(rtype).inlined:
-                self.sql('DROP TABLE %(r)s_relation_iid_tmp' % {'r': rtype})
-            else:
-                self.logger.warning("inlined relation %s: no cleanup to be done for it" % rtype)
-        self.commit()
-        # Get all the initialized etypes/rtypes
-        if self._dbh.table_exists('dataio_initialized'):
-            crs = self.sql('SELECT retype, type FROM dataio_initialized')
-            for retype, _type in crs.fetchall():
-                self.logger.info('Cleanup for %s' % retype)
-                if _type == 'etype':
-                    # Cleanup entities tables - Recreate indexes
-                    self._cleanup_entities(retype)
-                elif _type == 'rtype':
-                    # Cleanup relations tables
-                    self._cleanup_relations(retype)
-                self.sql('DELETE FROM dataio_initialized WHERE retype = %(e)s',
-                         {'e': retype})
-        # Create meta constraints (entities, is_instance_of, ...)
-        self._create_metatables_constraints()
-        self.commit()
-        # Delete the meta data table
-        for table_name in ('dataio_initialized', 'dataio_constraints', 'dataio_metadata'):
-            if self._dbh.table_exists(table_name):
-                self.sql('DROP TABLE %s' % table_name)
-        self.commit()
-
     def insert_massive_meta_data(self, etype):
         """ Massive insertion of meta data for a given etype, based on SQL statements.
         """
--- a/dataimport/test/test_massive_store.py	Wed Oct 21 16:31:05 2015 +0200
+++ b/dataimport/test/test_massive_store.py	Wed Oct 21 16:32:11 2015 +0200
@@ -79,7 +79,7 @@
                       'cwuri':  u'http://sws.geonames.org/%s/' % int(infos[0]),
                       'geonameid': int(infos[0]),
                       }
-            store.create_entity('Location', **entity)
+            store.prepare_insert_entity('Location', **entity)
 
     def test_autoflush_metadata(self):
         with self.admin_access.repo_cnx() as cnx:
@@ -87,7 +87,7 @@
                                  {'t': 'Location'})
             self.assertEqual(len(crs.fetchall()), 0)
             store = MassiveObjectStore(cnx, autoflush_metadata=True)
-            store.create_entity('Location', name=u'toto')
+            store.prepare_insert_entity('Location', name=u'toto')
             store.flush()
             store.commit()
             store.cleanup()
@@ -104,7 +104,7 @@
 #            self.assertEqual(len(crs.fetchall()), 0)
 #        with self.admin_access.repo_cnx() as cnx:
 #            store = MassiveObjectStore(cnx, autoflush_metadata=False)
-#            store.create_entity('Location', name=u'toto')
+#            store.prepare_insert_entity('Location', name=u'toto')
 #            store.flush()
 #            store.commit()
 #            crs = cnx.system_sql('SELECT * FROM entities WHERE type=%(t)s',
@@ -119,8 +119,8 @@
     def test_massimport_etype_metadata(self):
         with self.admin_access.repo_cnx() as cnx:
             store = MassiveObjectStore(cnx)
-            timezone = store.create_entity('TimeZone')
-            store.create_entity('Location', timezone=timezone.eid)
+            timezone_eid = store.prepare_insert_entity('TimeZone')
+            store.prepare_insert_entity('Location', timezone=timezone_eid)
             store.flush()
             store.commit()
             eid, etname = cnx.execute('Any X, TN WHERE X timezone TZ, X is T, '
@@ -167,7 +167,7 @@
     def test_eids_seq_range(self):
         with self.admin_access.repo_cnx() as cnx:
             store = MassiveObjectStore(cnx, eids_seq_range=1000, eids_seq_start=50000)
-            store.create_entity('Location', name=u'toto')
+            store.prepare_insert_entity('Location', name=u'toto')
             store.flush()
             cnx.commit()
         with self.admin_access.repo_cnx() as cnx:
@@ -177,23 +177,22 @@
     def test_eid_entity(self):
         with self.admin_access.repo_cnx() as cnx:
             store = MassiveObjectStore(cnx, eids_seq_range=1000, eids_seq_start=50000)
-            entity = store.create_entity('Location', name=u'toto')
+            eid = store.prepare_insert_entity('Location', name=u'toto')
             store.flush()
-            self.assertGreater(entity.eid, 50000)
+            self.assertGreater(eid, 50000)
 
     def test_eid_entity_2(self):
         with self.admin_access.repo_cnx() as cnx:
             store = MassiveObjectStore(cnx, eids_seq_range=1000, eids_seq_start=50000)
-            entity = store.create_entity('Location', name=u'toto', eid=10000)
+            eid = store.prepare_insert_entity('Location', name=u'toto', eid=10000)
             store.flush()
-        with self.admin_access.repo_cnx() as cnx:
-            self.assertEqual(entity.eid, 10000)
+        self.assertEqual(eid, 10000)
 
     def test_on_commit_callback(self):
         counter = itertools.count()
         with self.admin_access.repo_cnx() as cnx:
             store = MassiveObjectStore(cnx, on_commit_callback=lambda:next(counter))
-            store.create_entity('Location', name=u'toto')
+            store.prepare_insert_entity('Location', name=u'toto')
             store.flush()
             store.commit()
         self.assertGreaterEqual(next(counter), 1)
@@ -202,7 +201,7 @@
         counter = itertools.count()
         with self.admin_access.repo_cnx() as cnx:
             store = MassiveObjectStore(cnx, on_rollback_callback=lambda *_: next(counter))
-            store.create_entity('Location', nm='toto')
+            store.prepare_insert_entity('Location', nm='toto')
             store.flush()
             store.commit()
         self.assertGreaterEqual(next(counter), 1)