[dataimport] implement new store API on massive store
and deprecate the old one.
Related to #5414760
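
Typical migration from the old calls (create_entity / relate / cleanup) to
the new store API; a minimal sketch only: `cnx` is a repository connection,
the Location/TimeZone entity types come from the tests below, and
'my_rtype' stands in for any non-inlined relation of the schema:

    store = MassiveObjectStore(cnx)
    # buffer an entity insertion; the entity's eid is returned
    tz_eid = store.prepare_insert_entity('TimeZone')
    # inlined relations are still passed as attributes
    loc_eid = store.prepare_insert_entity('Location', name=u'toto',
                                          timezone=tz_eid)
    # non-inlined relations go through prepare_insert_relation
    store.prepare_insert_relation(loc_eid, 'my_rtype', tz_eid)
    store.flush()    # push buffered entities/relations to the database
    store.commit()   # commit the transaction
    store.finish()   # drop temporary tables (formerly cleanup())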
--- a/dataimport/massive_store.py Wed Oct 21 16:31:05 2015 +0200
+++ b/dataimport/massive_store.py Wed Oct 21 16:32:11 2015 +0200
@@ -24,6 +24,7 @@
from six.moves import range
+from logilab.common.deprecation import deprecated
from yams.constraints import SizeConstraint
from psycopg2 import ProgrammingError
@@ -156,7 +157,7 @@
'entities_id_seq', initial_value=self._eids_seq_start + 1))
cnx.commit()
self.get_next_eid = lambda g=self._get_eid_gen(): next(g)
- # recreate then when self.cleanup() is called
+ # recreate them when self.finish() is called
if not self.slave_mode and self.drop_index:
self._drop_metatables_constraints()
if source is None:
@@ -437,8 +438,11 @@
kwargs.update((key, default_values[key]) for key in missing_keys)
return kwargs
- def create_entity(self, etype, **kwargs):
- """ Create an entity
+ # store api ################################################################
+
+ def prepare_insert_entity(self, etype, **kwargs):
+ """Given an entity type, attributes and inlined relations, returns the inserted entity's
+ eid.
"""
# Init the table if necessary
self.init_etype_table(etype)
@@ -461,22 +465,85 @@
kwargs = self.apply_size_constraints(etype, kwargs)
# Apply default values
kwargs = self.apply_default_values(etype, kwargs)
- # Push data / Return entity
+ # Push data
self._data_entities[etype].append(kwargs)
- entity = self._cnx.vreg['etypes'].etype_class(etype)(self._cnx)
- entity.cw_attr_cache.update(kwargs)
- if 'eid' in kwargs:
- entity.eid = kwargs['eid']
- return entity
+ # Return eid
+ return kwargs.get('eid')
- ### RELATIONS CREATION ####################################################
-
- def relate(self, subj_eid, rtype, obj_eid, *args, **kwargs):
- """ Compatibility with other stores
+ def prepare_insert_relation(self, eid_from, rtype, eid_to, **kwargs):
+ """Insert into the database a relation ``rtype`` between entities with eids ``eid_from``
+ and ``eid_to``.
"""
# Init the table if necessary
self.init_relation_table(rtype)
- self._data_relations[rtype].append({'eid_from': subj_eid, 'eid_to': obj_eid})
+ self._data_relations[rtype].append({'eid_from': eid_from, 'eid_to': eid_to})
+
+ def flush(self):
+ """Flush the data"""
+ self.flush_entities()
+ self.flush_internal_relations()
+ self.flush_relations()
+
+ def commit(self):
+ """Commit the database transaction."""
+ self.on_commit()
+ super(MassiveObjectStore, self).commit()
+
+ def finish(self):
+ """Remove temporary tables and columns."""
+ self.logger.info("Start cleaning")
+ if self.slave_mode:
+ raise RuntimeError('Store cleanup is not allowed in slave mode')
+ self.logger.info("Start cleaning")
+ # Drop the temporary uri/eid mapping tables
+ for etype in self._initialized['init_uri_eid']:
+ self.sql('DROP TABLE uri_eid_%s' % etype.lower())
+ # Drop the temporary relation tables
+ for rtype in self._initialized['uri_rtypes']:
+ if not self._cnx.repo.schema.rschema(rtype).inlined:
+ self.sql('DROP TABLE %(r)s_relation_iid_tmp' % {'r': rtype})
+ else:
+ self.logger.warning('inlined relation %s: no cleanup to be done for it', rtype)
+ self.commit()
+ # Get all the initialized etypes/rtypes
+ if self._dbh.table_exists('dataio_initialized'):
+ crs = self.sql('SELECT retype, type FROM dataio_initialized')
+ for retype, _type in crs.fetchall():
+ self.logger.info('Cleanup for %s', retype)
+ if _type == 'etype':
+ # Cleanup entities tables - Recreate indexes
+ self._cleanup_entities(retype)
+ elif _type == 'rtype':
+ # Cleanup relations tables
+ self._cleanup_relations(retype)
+ self.sql('DELETE FROM dataio_initialized WHERE retype = %(e)s',
+ {'e': retype})
+ # Create meta constraints (entities, is_instance_of, ...)
+ self._create_metatables_constraints()
+ self.commit()
+ # Drop the metadata tables
+ for table_name in ('dataio_initialized', 'dataio_constraints', 'dataio_metadata'):
+ if self._dbh.table_exists(table_name):
+ self.sql('DROP TABLE %s' % table_name)
+ self.commit()
+
+ @deprecated('[3.22] use prepare_insert_entity instead')
+ def create_entity(self, etype, **kwargs):
+ """ Create an entity
+ """
+ eid = self.prepare_insert_entity(etype, **kwargs)
+ entity = self._cnx.vreg['etypes'].etype_class(etype)(self._cnx)
+ entity.cw_attr_cache.update(kwargs)
+ if eid is not None:
+ entity.eid = eid
+ return entity
+
+ @deprecated('[3.22] use prepare_insert_relation instead')
+ def relate(self, subj_eid, rtype, obj_eid, *args, **kwargs):
+ self.prepare_insert_relation(subj_eid, rtype, obj_eid, **kwargs)
+
+ @deprecated('[3.22] use finish instead')
+ def cleanup(self):
+ self.finish()
### FLUSH #################################################################
@@ -492,17 +559,6 @@
else:
raise exc
- def commit(self):
- self.on_commit()
- super(MassiveObjectStore, self).commit()
-
- def flush(self):
- """ Flush the data
- """
- self.flush_entities()
- self.flush_internal_relations()
- self.flush_relations()
-
def flush_internal_relations(self):
""" Flush the relations data
"""
@@ -621,45 +677,6 @@
tablename = '%s_relation' % rtype.lower()
self.reapply_constraint_index(tablename)
- def cleanup(self):
- """ Remove temporary tables and columns
- """
- self.logger.info("Start cleaning")
- if self.slave_mode:
- raise RuntimeError('Store cleanup is not allowed in slave mode')
- self.logger.info("Start cleaning")
- # Cleanup relations tables
- for etype in self._initialized['init_uri_eid']:
- self.sql('DROP TABLE uri_eid_%s' % etype.lower())
- # Remove relations tables
- for rtype in self._initialized['uri_rtypes']:
- if not self._cnx.repo.schema.rschema(rtype).inlined:
- self.sql('DROP TABLE %(r)s_relation_iid_tmp' % {'r': rtype})
- else:
- self.logger.warning("inlined relation %s: no cleanup to be done for it" % rtype)
- self.commit()
- # Get all the initialized etypes/rtypes
- if self._dbh.table_exists('dataio_initialized'):
- crs = self.sql('SELECT retype, type FROM dataio_initialized')
- for retype, _type in crs.fetchall():
- self.logger.info('Cleanup for %s' % retype)
- if _type == 'etype':
- # Cleanup entities tables - Recreate indexes
- self._cleanup_entities(retype)
- elif _type == 'rtype':
- # Cleanup relations tables
- self._cleanup_relations(retype)
- self.sql('DELETE FROM dataio_initialized WHERE retype = %(e)s',
- {'e': retype})
- # Create meta constraints (entities, is_instance_of, ...)
- self._create_metatables_constraints()
- self.commit()
- # Delete the meta data table
- for table_name in ('dataio_initialized', 'dataio_constraints', 'dataio_metadata'):
- if self._dbh.table_exists(table_name):
- self.sql('DROP TABLE %s' % table_name)
- self.commit()
-
def insert_massive_meta_data(self, etype):
""" Massive insertion of meta data for a given etype, based on SQL statements.
"""
--- a/dataimport/test/test_massive_store.py Wed Oct 21 16:31:05 2015 +0200
+++ b/dataimport/test/test_massive_store.py Wed Oct 21 16:32:11 2015 +0200
@@ -79,7 +79,7 @@
'cwuri': u'http://sws.geonames.org/%s/' % int(infos[0]),
'geonameid': int(infos[0]),
}
- store.create_entity('Location', **entity)
+ store.prepare_insert_entity('Location', **entity)
def test_autoflush_metadata(self):
with self.admin_access.repo_cnx() as cnx:
@@ -87,7 +87,7 @@
{'t': 'Location'})
self.assertEqual(len(crs.fetchall()), 0)
store = MassiveObjectStore(cnx, autoflush_metadata=True)
- store.create_entity('Location', name=u'toto')
+ store.prepare_insert_entity('Location', name=u'toto')
store.flush()
store.commit()
store.cleanup()
@@ -104,7 +104,7 @@
# self.assertEqual(len(crs.fetchall()), 0)
# with self.admin_access.repo_cnx() as cnx:
# store = MassiveObjectStore(cnx, autoflush_metadata=False)
-# store.create_entity('Location', name=u'toto')
+# store.prepare_insert_entity('Location', name=u'toto')
# store.flush()
# store.commit()
# crs = cnx.system_sql('SELECT * FROM entities WHERE type=%(t)s',
@@ -119,8 +119,8 @@
def test_massimport_etype_metadata(self):
with self.admin_access.repo_cnx() as cnx:
store = MassiveObjectStore(cnx)
- timezone = store.create_entity('TimeZone')
- store.create_entity('Location', timezone=timezone.eid)
+ timezone_eid = store.prepare_insert_entity('TimeZone')
+ store.prepare_insert_entity('Location', timezone=timezone_eid)
store.flush()
store.commit()
eid, etname = cnx.execute('Any X, TN WHERE X timezone TZ, X is T, '
@@ -167,7 +167,7 @@
def test_eids_seq_range(self):
with self.admin_access.repo_cnx() as cnx:
store = MassiveObjectStore(cnx, eids_seq_range=1000, eids_seq_start=50000)
- store.create_entity('Location', name=u'toto')
+ store.prepare_insert_entity('Location', name=u'toto')
store.flush()
cnx.commit()
with self.admin_access.repo_cnx() as cnx:
@@ -177,23 +177,22 @@
def test_eid_entity(self):
with self.admin_access.repo_cnx() as cnx:
store = MassiveObjectStore(cnx, eids_seq_range=1000, eids_seq_start=50000)
- entity = store.create_entity('Location', name=u'toto')
+ eid = store.prepare_insert_entity('Location', name=u'toto')
store.flush()
- self.assertGreater(entity.eid, 50000)
+ self.assertGreater(eid, 50000)
def test_eid_entity_2(self):
with self.admin_access.repo_cnx() as cnx:
store = MassiveObjectStore(cnx, eids_seq_range=1000, eids_seq_start=50000)
- entity = store.create_entity('Location', name=u'toto', eid=10000)
+ eid = store.prepare_insert_entity('Location', name=u'toto', eid=10000)
store.flush()
- with self.admin_access.repo_cnx() as cnx:
- self.assertEqual(entity.eid, 10000)
+ self.assertEqual(eid, 10000)
def test_on_commit_callback(self):
counter = itertools.count()
with self.admin_access.repo_cnx() as cnx:
store = MassiveObjectStore(cnx, on_commit_callback=lambda:next(counter))
- store.create_entity('Location', name=u'toto')
+ store.prepare_insert_entity('Location', name=u'toto')
store.flush()
store.commit()
self.assertGreaterEqual(next(counter), 1)
@@ -202,7 +201,7 @@
counter = itertools.count()
with self.admin_access.repo_cnx() as cnx:
store = MassiveObjectStore(cnx, on_rollback_callback=lambda *_: next(counter))
- store.create_entity('Location', nm='toto')
+ store.prepare_insert_entity('Location', nm='toto')
store.flush()
store.commit()
self.assertGreaterEqual(next(counter), 1)