# HG changeset patch
# User Sylvain Thénault
# Date 1445437931 -7200
# Node ID 8e1f6de61300b5032a7b615cd8274bb96d2ca173
# Parent  7357b14857951ea0e83ee32a983abd3c7f98129b
[dataimport] implement new store API on massive store and deprecate the old one

Related to #5414760
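A rough usage sketch of the new calling convention (illustrative only, not
part of the changeset; names and values follow the updated tests below):

    store = MassiveObjectStore(cnx)
    tz_eid = store.prepare_insert_entity('TimeZone')
    # attributes and inlined relations are passed as keyword arguments
    loc_eid = store.prepare_insert_entity('Location', name=u'toto',
                                          timezone=tz_eid)
    # non-inlined relations would go through:
    #     store.prepare_insert_relation(eid_from, rtype, eid_to)
    store.flush()
    store.commit()
    store.finish()  # formerly store.cleanup()

Note that prepare_insert_entity returns a bare eid rather than an entity
object, which is what lets the store skip entity instantiation during
massive imports.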
""" # Init the table if necessary self.init_relation_table(rtype) - self._data_relations[rtype].append({'eid_from': subj_eid, 'eid_to': obj_eid}) + self._data_relations[rtype].append({'eid_from': eid_from, 'eid_to': eid_to}) + + def flush(self): + """Flush the data""" + self.flush_entities() + self.flush_internal_relations() + self.flush_relations() + + def commit(self): + """Commit the database transaction.""" + self.on_commit() + super(MassiveObjectStore, self).commit() + + def finish(self): + """Remove temporary tables and columns.""" + self.logger.info("Start cleaning") + if self.slave_mode: + raise RuntimeError('Store cleanup is not allowed in slave mode') + self.logger.info("Start cleaning") + # Cleanup relations tables + for etype in self._initialized['init_uri_eid']: + self.sql('DROP TABLE uri_eid_%s' % etype.lower()) + # Remove relations tables + for rtype in self._initialized['uri_rtypes']: + if not self._cnx.repo.schema.rschema(rtype).inlined: + self.sql('DROP TABLE %(r)s_relation_iid_tmp' % {'r': rtype}) + else: + self.logger.warning("inlined relation %s: no cleanup to be done for it" % rtype) + self.commit() + # Get all the initialized etypes/rtypes + if self._dbh.table_exists('dataio_initialized'): + crs = self.sql('SELECT retype, type FROM dataio_initialized') + for retype, _type in crs.fetchall(): + self.logger.info('Cleanup for %s' % retype) + if _type == 'etype': + # Cleanup entities tables - Recreate indexes + self._cleanup_entities(retype) + elif _type == 'rtype': + # Cleanup relations tables + self._cleanup_relations(retype) + self.sql('DELETE FROM dataio_initialized WHERE retype = %(e)s', + {'e': retype}) + # Create meta constraints (entities, is_instance_of, ...) + self._create_metatables_constraints() + self.commit() + # Delete the meta data table + for table_name in ('dataio_initialized', 'dataio_constraints', 'dataio_metadata'): + if self._dbh.table_exists(table_name): + self.sql('DROP TABLE %s' % table_name) + self.commit() + + @deprecated('[3.22] use prepare_insert_entity instead') + def create_entity(self, etype, **kwargs): + """ Create an entity + """ + eid = self.prepare_insert_entity(etype, **kwargs) + entity = self._cnx.vreg['etypes'].etype_class(etype)(self._cnx) + entity.cw_attr_cache.update(kwargs) + entity.eid = eid + return entity + + @deprecated('[3.22] use prepare_insert_relation instead') + def relate(self, subj_eid, rtype, obj_eid, *args, **kwargs): + self.prepare_insert_relation(subj_eid, rtype, obj_eid, *args, **kwargs) + + @deprecated('[3.22] use finish instead') + def cleanup(self): + self.finish() ### FLUSH ################################################################# @@ -492,17 +559,6 @@ else: raise exc - def commit(self): - self.on_commit() - super(MassiveObjectStore, self).commit() - - def flush(self): - """ Flush the data - """ - self.flush_entities() - self.flush_internal_relations() - self.flush_relations() - def flush_internal_relations(self): """ Flush the relations data """ @@ -621,45 +677,6 @@ tablename = '%s_relation' % rtype.lower() self.reapply_constraint_index(tablename) - def cleanup(self): - """ Remove temporary tables and columns - """ - self.logger.info("Start cleaning") - if self.slave_mode: - raise RuntimeError('Store cleanup is not allowed in slave mode') - self.logger.info("Start cleaning") - # Cleanup relations tables - for etype in self._initialized['init_uri_eid']: - self.sql('DROP TABLE uri_eid_%s' % etype.lower()) - # Remove relations tables - for rtype in self._initialized['uri_rtypes']: - if not 
diff -r 7357b1485795 -r 8e1f6de61300 dataimport/test/test_massive_store.py
--- a/dataimport/test/test_massive_store.py	Wed Oct 21 16:31:05 2015 +0200
+++ b/dataimport/test/test_massive_store.py	Wed Oct 21 16:32:11 2015 +0200
@@ -79,7 +79,7 @@
                       'cwuri': u'http://sws.geonames.org/%s/' % int(infos[0]),
                       'geonameid': int(infos[0]),
                       }
-            store.create_entity('Location', **entity)
+            store.prepare_insert_entity('Location', **entity)
 
     def test_autoflush_metadata(self):
         with self.admin_access.repo_cnx() as cnx:
@@ -87,7 +87,7 @@
                                  {'t': 'Location'})
             self.assertEqual(len(crs.fetchall()), 0)
             store = MassiveObjectStore(cnx, autoflush_metadata=True)
-            store.create_entity('Location', name=u'toto')
+            store.prepare_insert_entity('Location', name=u'toto')
             store.flush()
             store.commit()
             store.cleanup()
@@ -104,7 +104,7 @@
 #            self.assertEqual(len(crs.fetchall()), 0)
 #        with self.admin_access.repo_cnx() as cnx:
 #            store = MassiveObjectStore(cnx, autoflush_metadata=False)
-#            store.create_entity('Location', name=u'toto')
+#            store.prepare_insert_entity('Location', name=u'toto')
 #            store.flush()
 #            store.commit()
 #            crs = cnx.system_sql('SELECT * FROM entities WHERE type=%(t)s',
@@ -119,8 +119,8 @@
     def test_massimport_etype_metadata(self):
         with self.admin_access.repo_cnx() as cnx:
             store = MassiveObjectStore(cnx)
-            timezone = store.create_entity('TimeZone')
-            store.create_entity('Location', timezone=timezone.eid)
+            timezone_eid = store.prepare_insert_entity('TimeZone')
+            store.prepare_insert_entity('Location', timezone=timezone_eid)
             store.flush()
             store.commit()
             eid, etname = cnx.execute('Any X, TN WHERE X timezone TZ, X is T, '
@@ -167,7 +167,7 @@
     def test_eids_seq_range(self):
         with self.admin_access.repo_cnx() as cnx:
             store = MassiveObjectStore(cnx, eids_seq_range=1000, eids_seq_start=50000)
-            store.create_entity('Location', name=u'toto')
+            store.prepare_insert_entity('Location', name=u'toto')
             store.flush()
             cnx.commit()
         with self.admin_access.repo_cnx() as cnx:
@@ -177,23 +177,22 @@
     def test_eid_entity(self):
         with self.admin_access.repo_cnx() as cnx:
             store = MassiveObjectStore(cnx, eids_seq_range=1000, eids_seq_start=50000)
-            entity = store.create_entity('Location', name=u'toto')
+            eid = store.prepare_insert_entity('Location', name=u'toto')
             store.flush()
-            self.assertGreater(entity.eid, 50000)
+            self.assertGreater(eid, 50000)
 
     def test_eid_entity_2(self):
         with self.admin_access.repo_cnx() as cnx:
             store = MassiveObjectStore(cnx, eids_seq_range=1000, eids_seq_start=50000)
-            entity = store.create_entity('Location', name=u'toto', eid=10000)
+            eid = store.prepare_insert_entity('Location', name=u'toto', eid=10000)
             store.flush()
-        with self.admin_access.repo_cnx() as cnx:
-            self.assertEqual(entity.eid, 10000)
+            self.assertEqual(eid, 10000)
 
     def test_on_commit_callback(self):
         counter = itertools.count()
         with self.admin_access.repo_cnx() as cnx:
             store = MassiveObjectStore(cnx, on_commit_callback=lambda:next(counter))
-            store.create_entity('Location', name=u'toto')
+            store.prepare_insert_entity('Location', name=u'toto')
             store.flush()
             store.commit()
         self.assertGreaterEqual(next(counter), 1)
@@ -202,7 +201,7 @@
         counter = itertools.count()
         with self.admin_access.repo_cnx() as cnx:
             store = MassiveObjectStore(cnx, on_rollback_callback=lambda *_: next(counter))
-            store.create_entity('Location', nm='toto')
+            store.prepare_insert_entity('Location', nm='toto')
             store.flush()
             store.commit()
         self.assertGreaterEqual(next(counter), 1)
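Migration note (editorial sketch, not part of the changeset): the deprecated
methods remain as thin wrappers around the new API, so existing importers
keep working while logilab's @deprecated decorator emits warnings. The
mapping is:

    entity = store.create_entity(etype, **kwargs)  ->  eid = store.prepare_insert_entity(etype, **kwargs)
    store.relate(subj_eid, rtype, obj_eid)         ->  store.prepare_insert_relation(subj_eid, rtype, obj_eid)
    store.cleanup()                                ->  store.finish()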