[dataimport] test and fix external source support for the massive store
author: Sylvain Thénault <sylvain.thenault@logilab.fr>
Tue, 02 Feb 2016 12:10:42 +0100
changeset 11328 9f2d7da47526
parent 11327 901243e41152
child 11329 a8cab8fb54ba
[dataimport] test and fix external source support for the massive store
cubicweb/dataimport/massive_store.py
cubicweb/dataimport/test/test_massive_store.py
cubicweb/dataimport/test/test_stores.py
--- a/cubicweb/dataimport/massive_store.py	Tue Jun 21 13:51:19 2016 +0200
+++ b/cubicweb/dataimport/massive_store.py	Tue Feb 02 12:10:42 2016 +0100
@@ -22,6 +22,7 @@
 from collections import defaultdict
 from io import StringIO
 from itertools import chain
+from base64 import b64encode
 
 from six.moves import range
 
@@ -281,6 +282,7 @@
             self.sql('CREATE TABLE IF NOT EXISTS cwmassive_initialized'
                      '(retype text, type varchar(128))')
             self.sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'etype')", {'e': etype})
+            self.sql('ALTER TABLE cw_%s ADD COLUMN extid VARCHAR(256)' % etype.lower())
         attrs = self.metagen.base_etype_attrs(etype)
         data = copy(attrs)  # base_etype_attrs is @cached, a copy is necessary
         data.update(kwargs)
@@ -292,6 +294,10 @@
         default_values = self.default_values[etype]
         missing_keys = set(default_values) - set(data)
         data.update((key, default_values[key]) for key in missing_keys)
+        extid = self.metagen.entity_extid(etype, data['eid'], data)
+        if extid is not None:
+            extid = b64encode(extid).decode('ascii')
+        data['extid'] = extid
         self.metagen.init_entity_attrs(etype, data['eid'], data)
         self._data_entities[etype].append(data)
         return data['eid']
@@ -338,7 +344,9 @@
             cu = self.sql('SELECT retype, type FROM cwmassive_initialized')
             for retype, _type in cu.fetchall():
                 self.logger.info('Cleanup for %s' % retype)
-                if _type == 'rtype':
+                if _type == 'etype':
+                    self.sql('ALTER TABLE cw_%s DROP COLUMN extid' % retype)
+                elif _type == 'rtype':
                     # Cleanup relations tables
                     self._cleanup_relations(retype)
                 self.sql('DELETE FROM cwmassive_initialized WHERE retype = %(e)s',
@@ -404,7 +412,8 @@
             if not buf:
                 # The buffer is empty. This is probably due to error in _create_copyfrom_buffer
                 raise ValueError('Error in buffer creation for etype %s' % etype)
-            columns = ['cw_%s' % attr for attr in columns]
+            columns = ['cw_%s' % attr if attr != 'extid' else attr
+                       for attr in columns]
             cursor = self._cnx.cnxset.cu
             try:
                 cursor.copy_from(buf, 'cw_%s' % etype.lower(), null='NULL', columns=columns)
@@ -461,9 +470,9 @@
             self._insert_meta_relation(etype, parent_eschema.eid, 'is_instance_of_relation')
         # finally insert records into the entities table
         self.sql("INSERT INTO entities (eid, type, asource, extid) "
-                 "SELECT cw_eid, '%s', 'system', NULL FROM cw_%s "
+                 "SELECT cw_eid, '%s', '%s', extid FROM cw_%s "
                  "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
-                 % (etype, etype.lower()))
+                 % (etype, self.metagen.source.uri, etype.lower()))
 
     def _insert_meta_relation(self, etype, eid_to, rtype):
         self.sql("INSERT INTO %s (eid_from, eid_to) SELECT cw_eid, %s FROM cw_%s "
--- a/cubicweb/dataimport/test/test_massive_store.py	Tue Jun 21 13:51:19 2016 +0200
+++ b/cubicweb/dataimport/test/test_massive_store.py	Tue Feb 02 12:10:42 2016 +0100
@@ -18,11 +18,13 @@
 
 import itertools
 
-from cubicweb.dataimport import ucsvreader
 from cubicweb.devtools import testlib, PostgresApptestConfiguration
 from cubicweb.devtools import startpgcluster, stoppgcluster
+from cubicweb.dataimport import ucsvreader, stores
 from cubicweb.dataimport.massive_store import MassiveObjectStore, PGHelper
 
+from test_stores import NoHookRQLObjectStoreWithCustomMDGenStoreTC
+
 
 def setUpModule():
     startpgcluster(__file__)
@@ -32,6 +34,16 @@
     stoppgcluster(__file__)
 
 
+class MassiveObjectStoreWithCustomMDGenStoreTC(NoHookRQLObjectStoreWithCustomMDGenStoreTC):
+    configcls = PostgresApptestConfiguration
+
+    def store_impl(self, cnx):
+        source = cnx.create_entity('CWSource', type=u'datafeed', name=u'test', url=u'test')
+        cnx.commit()
+        metagen = stores.MetadataGenerator(cnx, source=cnx.repo.sources_by_eid[source.eid])
+        return MassiveObjectStore(cnx, metagen=metagen)
+
+
 class MassImportSimpleTC(testlib.CubicWebTC):
     configcls = PostgresApptestConfiguration
     appid = 'data-massimport'
@@ -183,20 +195,20 @@
     def test_on_commit_callback(self):
         counter = itertools.count()
         with self.admin_access.repo_cnx() as cnx:
-            store = MassiveObjectStore(cnx, on_commit_callback=lambda:next(counter))
+            store = MassiveObjectStore(cnx, on_commit_callback=lambda: next(counter))
             store.prepare_insert_entity('Location', name=u'toto')
             store.flush()
             store.commit()
-        self.assertGreaterEqual(next(counter), 1)
+        self.assertEqual(next(counter), 1)
 
     def test_on_rollback_callback(self):
         counter = itertools.count()
         with self.admin_access.repo_cnx() as cnx:
             store = MassiveObjectStore(cnx, on_rollback_callback=lambda *_: next(counter))
             store.prepare_insert_entity('Location', nm='toto')
+            store.commit()  # commit modification to the database before flush
             store.flush()
-            store.commit()
-        self.assertGreaterEqual(next(counter), 1)
+        self.assertEqual(next(counter), 1)
 
     def test_slave_mode_indexes(self):
         with self.admin_access.repo_cnx() as cnx:
--- a/cubicweb/dataimport/test/test_stores.py	Tue Jun 21 13:51:19 2016 +0200
+++ b/cubicweb/dataimport/test/test_stores.py	Tue Feb 02 12:10:42 2016 +0100
@@ -39,7 +39,9 @@
             group_eid = store.prepare_insert_entity('CWGroup', **self.insert_group_attrs)
             user_eid = store.prepare_insert_entity('CWUser', **self.insert_user_attrs)
             store.prepare_insert_relation(user_eid, 'in_group', group_eid)
+            store.flush()
             store.commit()
+            store.finish()
             user = cnx.execute('CWUser X WHERE X login "lgn"').one()
             self.assertEqual(user_eid, user.eid)
             self.assertTrue(user.creation_date)