# HG changeset patch
# User Sylvain Thénault
# Date 1454411442 -3600
# Node ID 9f2d7da47526595d3325403b676a3e900bb3448b
# Parent  901243e411528008f76507f6d01ee3b1c06cdf84
[dataimport] test and fix external source support for the massive store

diff -r 901243e41152 -r 9f2d7da47526 cubicweb/dataimport/massive_store.py
--- a/cubicweb/dataimport/massive_store.py	Tue Jun 21 13:51:19 2016 +0200
+++ b/cubicweb/dataimport/massive_store.py	Tue Feb 02 12:10:42 2016 +0100
@@ -22,6 +22,7 @@
 from collections import defaultdict
 from io import StringIO
 from itertools import chain
+from base64 import b64encode
 
 from six.moves import range
 
@@ -281,6 +282,7 @@
             self.sql('CREATE TABLE IF NOT EXISTS cwmassive_initialized'
                      '(retype text, type varchar(128))')
             self.sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'etype')", {'e': etype})
+            self.sql('ALTER TABLE cw_%s ADD COLUMN extid VARCHAR(256)' % etype.lower())
         attrs = self.metagen.base_etype_attrs(etype)
         data = copy(attrs)  # base_etype_attrs is @cached, a copy is necessary
         data.update(kwargs)
@@ -292,6 +294,10 @@
             default_values = self.default_values[etype]
             missing_keys = set(default_values) - set(data)
             data.update((key, default_values[key]) for key in missing_keys)
+        extid = self.metagen.entity_extid(etype, data['eid'], data)
+        if extid is not None:
+            extid = b64encode(extid).decode('ascii')
+        data['extid'] = extid
         self.metagen.init_entity_attrs(etype, data['eid'], data)
         self._data_entities[etype].append(data)
         return data['eid']
@@ -338,7 +344,9 @@
         cu = self.sql('SELECT retype, type FROM cwmassive_initialized')
         for retype, _type in cu.fetchall():
             self.logger.info('Cleanup for %s' % retype)
-            if _type == 'rtype':
+            if _type == 'etype':
+                self.sql('ALTER TABLE cw_%s DROP COLUMN extid' % retype)
+            elif _type == 'rtype':
                 # Cleanup relations tables
                 self._cleanup_relations(retype)
             self.sql('DELETE FROM cwmassive_initialized WHERE retype = %(e)s',
@@ -404,7 +412,8 @@
         if not buf:
             # The buffer is empty. This is probably due to error in _create_copyfrom_buffer
             raise ValueError('Error in buffer creation for etype %s' % etype)
-        columns = ['cw_%s' % attr for attr in columns]
+        columns = ['cw_%s' % attr if attr != 'extid' else attr
+                   for attr in columns]
         cursor = self._cnx.cnxset.cu
         try:
             cursor.copy_from(buf, 'cw_%s' % etype.lower(), null='NULL', columns=columns)
@@ -461,9 +470,9 @@
             self._insert_meta_relation(etype, parent_eschema.eid, 'is_instance_of_relation')
         # finally insert records into the entities table
         self.sql("INSERT INTO entities (eid, type, asource, extid) "
-                 "SELECT cw_eid, '%s', 'system', NULL FROM cw_%s "
+                 "SELECT cw_eid, '%s', '%s', extid FROM cw_%s "
                  "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
-                 % (etype, etype.lower()))
+                 % (etype, self.metagen.source.uri, etype.lower()))
 
     def _insert_meta_relation(self, etype, eid_to, rtype):
         self.sql("INSERT INTO %s (eid_from, eid_to) SELECT cw_eid, %s FROM cw_%s "
diff -r 901243e41152 -r 9f2d7da47526 cubicweb/dataimport/test/test_massive_store.py
--- a/cubicweb/dataimport/test/test_massive_store.py	Tue Jun 21 13:51:19 2016 +0200
+++ b/cubicweb/dataimport/test/test_massive_store.py	Tue Feb 02 12:10:42 2016 +0100
@@ -18,11 +18,13 @@
 
 import itertools
 
-from cubicweb.dataimport import ucsvreader
 from cubicweb.devtools import testlib, PostgresApptestConfiguration
 from cubicweb.devtools import startpgcluster, stoppgcluster
+from cubicweb.dataimport import ucsvreader, stores
 from cubicweb.dataimport.massive_store import MassiveObjectStore, PGHelper
 
+from test_stores import NoHookRQLObjectStoreWithCustomMDGenStoreTC
+
 
 def setUpModule():
     startpgcluster(__file__)
@@ -32,6 +34,16 @@
     stoppgcluster(__file__)
 
 
+class MassiveObjectStoreWithCustomMDGenStoreTC(NoHookRQLObjectStoreWithCustomMDGenStoreTC):
+    configcls = PostgresApptestConfiguration
+
+    def store_impl(self, cnx):
+        source = cnx.create_entity('CWSource', type=u'datafeed', name=u'test', url=u'test')
+        cnx.commit()
+        metagen = stores.MetadataGenerator(cnx, source=cnx.repo.sources_by_eid[source.eid])
+        return MassiveObjectStore(cnx, metagen=metagen)
+
+
 class MassImportSimpleTC(testlib.CubicWebTC):
     configcls = PostgresApptestConfiguration
     appid = 'data-massimport'
@@ -183,20 +195,20 @@
     def test_on_commit_callback(self):
         counter = itertools.count()
         with self.admin_access.repo_cnx() as cnx:
-            store = MassiveObjectStore(cnx, on_commit_callback=lambda:next(counter))
+            store = MassiveObjectStore(cnx, on_commit_callback=lambda: next(counter))
             store.prepare_insert_entity('Location', name=u'toto')
             store.flush()
             store.commit()
-        self.assertGreaterEqual(next(counter), 1)
+        self.assertEqual(next(counter), 1)
 
     def test_on_rollback_callback(self):
         counter = itertools.count()
         with self.admin_access.repo_cnx() as cnx:
             store = MassiveObjectStore(cnx, on_rollback_callback=lambda *_: next(counter))
             store.prepare_insert_entity('Location', nm='toto')
+            store.commit()  # commit modification to the database before flush
             store.flush()
-            store.commit()
-        self.assertGreaterEqual(next(counter), 1)
+        self.assertEqual(next(counter), 1)
 
     def test_slave_mode_indexes(self):
         with self.admin_access.repo_cnx() as cnx:
diff -r 901243e41152 -r 9f2d7da47526 cubicweb/dataimport/test/test_stores.py
--- a/cubicweb/dataimport/test/test_stores.py	Tue Jun 21 13:51:19 2016 +0200
+++ b/cubicweb/dataimport/test/test_stores.py	Tue Feb 02 12:10:42 2016 +0100
@@ -39,7 +39,9 @@
             group_eid = store.prepare_insert_entity('CWGroup', **self.insert_group_attrs)
             user_eid = store.prepare_insert_entity('CWUser', **self.insert_user_attrs)
             store.prepare_insert_relation(user_eid, 'in_group', group_eid)
+            store.flush()
             store.commit()
+            store.finish()
             user = cnx.execute('CWUser X WHERE X login "lgn"').one()
             self.assertEqual(user_eid, user.eid)
             self.assertTrue(user.creation_date)
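
Usage sketch (not part of the patch): a minimal example of what this changeset enables,
following the new MassiveObjectStoreWithCustomMDGenStoreTC test above. The CWSource
name/url u'test' and the Location entity type are borrowed from the tests; adapt them
to your own schema. Importing entities attributed to an external source amounts to
binding a MetadataGenerator to that source and handing it to the massive store:

    from cubicweb.dataimport import stores
    from cubicweb.dataimport.massive_store import MassiveObjectStore

    # assumes an open repository connection `cnx`
    # register the external source the imported entities will belong to
    source = cnx.create_entity('CWSource', type=u'datafeed', name=u'test', url=u'test')
    cnx.commit()

    # a metadata generator bound to this source makes the store record the source
    # uri as `asource` and the (base64-encoded) extid in the entities table,
    # instead of 'system' / NULL
    metagen = stores.MetadataGenerator(cnx, source=cnx.repo.sources_by_eid[source.eid])
    store = MassiveObjectStore(cnx, metagen=metagen)

    store.prepare_insert_entity('Location', name=u'toto')
    store.flush()
    store.commit()
    store.finish()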