--- a/cubicweb/dataimport/massive_store.py Tue Sep 27 12:01:24 2016 +0200
+++ b/cubicweb/dataimport/massive_store.py Tue Sep 27 12:23:19 2016 +0200
@@ -20,7 +20,6 @@
import logging
from copy import copy
from collections import defaultdict
-from io import StringIO
from itertools import chain
from base64 import b64encode
@@ -43,41 +42,20 @@
the indexes.
- This store can only insert relations that are not inlined (i.e.,
- which do *not* have inlined=True in their definition in the schema), unless they are
- specified as entity attributes.
+ which do *not* have inlined=True in their definition in the schema),
+ unless they are specified as entity attributes.
It should be used as follows:
store = MassiveObjectStore(cnx)
- store.init_rtype_table('Person', 'lives_in', 'Location')
- ...
-
- store.prepare_insert_entity('Person', subj_iid_attribute=person_iid, ...)
- store.prepare_insert_entity('Location', obj_iid_attribute=location_iid, ...)
- ...
-
- # subj_iid_attribute and obj_iid_attribute are argument names
- # chosen by the user (e.g. "cwuri"). These names can be identical.
- # person_iid and location_iid are unique IDs and depend on the data
- # (e.g URI).
+ eid_p = store.prepare_insert_entity('Person',
+ cwuri='http://dbpedia.org/toto',
+ name='Toto')
+ eid_loc = store.prepare_insert_entity('Location',
+ cwuri='http://geonames.org/11111',
+ name='Somewhere')
+ store.prepare_insert_relation(eid_p, 'lives_in', eid_loc)
store.flush()
- store.relate_by_iid(person_iid, 'lives_in', location_iid)
- # For example:
- store.prepare_insert_entity('Person',
- cwuri='http://dbpedia.org/toto',
- name='Toto')
- store.prepare_insert_entity('Location',
- uri='http://geonames.org/11111',
- name='Somewhere')
- store.flush()
- store.relate_by_iid('http://dbpedia.org/toto',
- 'lives_in',
- 'http://geonames.org/11111')
- # Finally
- store.convert_relations('Person', 'lives_in', 'Location',
- 'subj_iid_attribute', 'obj_iid_attribute')
- # For the previous example:
- store.convert_relations('Person', 'lives_in', 'Location', 'cwuri', 'uri')
...
store.commit()
store.finish()
@@ -85,8 +63,6 @@
Full-text indexation is not handled, you'll have to reindex the proper entity types by yourself
if desired.
"""
- # max size of the iid, used to create the iid_eid conversion table
- iid_maxsize = 1024
def __init__(self, cnx,
on_commit_callback=None, on_rollback_callback=None,
@@ -120,14 +96,6 @@
self._data_entities = defaultdict(list)
self._data_relations = defaultdict(list)
self._initialized = set()
- # uri handling
- self._data_uri_relations = defaultdict(list)
- # etypes for which we have a uri_eid_%(etype)s table
- self._init_uri_eid = set()
- # etypes for which we have a uri_eid_%(e)s_idx index
- self._uri_eid_inserted = set()
- # set of rtypes for which we have a %(rtype)s_relation_iid_tmp table
- self._uri_rtypes = set()
if not self.slave_mode:
# drop constraint and metadata table, they will be recreated when self.finish() is
@@ -144,112 +112,6 @@
for eid in range(last_eid - self.eids_seq_range + 1, last_eid + 1):
yield eid
- # URI related things #######################################################
-
- def init_rtype_table(self, etype_from, rtype, etype_to):
- """ Build temporary table for standard rtype """
- # Create an uri_eid table for each etype for a better control of which etype is concerned by
- # a particular possibly multivalued relation.
- for etype in (etype_from, etype_to):
- if etype and etype not in self._init_uri_eid:
- self._init_uri_eid.add(etype)
- self.sql('CREATE TABLE IF NOT EXISTS uri_eid_%(e)s'
- '(uri character varying(%(size)s), eid integer)'
- % {'e': etype.lower(), 'size': self.iid_maxsize})
- if rtype not in self._uri_rtypes:
- # Create the temporary table
- if not self.schema.rschema(rtype).inlined:
- self.sql('CREATE TABLE IF NOT EXISTS %(r)s_relation_iid_tmp'
- '(uri_from character varying(%(s)s), uri_to character varying(%(s)s))'
- % {'r': rtype, 's': self.iid_maxsize})
- self._uri_rtypes.add(rtype)
- else:
- self.logger.warning("inlined relation %s: cannot insert it", rtype)
-
- def relate_by_iid(self, iid_from, rtype, iid_to):
- """Add new relation based on the internal id (iid)
- of the entities (not the eid)"""
- # Push data
- if isinstance(iid_from, unicode):
- iid_from = iid_from.encode('utf-8')
- if isinstance(iid_to, unicode):
- iid_to = iid_to.encode('utf-8')
- self._data_uri_relations[rtype].append({'uri_from': iid_from, 'uri_to': iid_to})
-
- def flush_relations(self):
- """ Flush the relations data
- """
- for rtype, data in self._data_uri_relations.items():
- if not data:
- self.logger.info('No data for rtype %s', rtype)
- buf = StringIO('\n'.join(['%(uri_from)s\t%(uri_to)s' % d for d in data]))
- if not buf:
- self.logger.info('Empty Buffer for rtype %s', rtype)
- continue
- cursor = self._cnx.cnxset.cu
- if not self.schema.rschema(rtype).inlined:
- cursor.copy_from(buf, '%s_relation_iid_tmp' % rtype.lower(),
- null='NULL', columns=('uri_from', 'uri_to'))
- else:
- self.logger.warning("inlined relation %s: cannot insert it", rtype)
- buf.close()
- # Clear data cache
- self._data_uri_relations[rtype] = []
-
- def fill_uri_eid_table(self, etype, uri_label):
- """ Fill the uri_eid table
- """
- if etype not in self._uri_eid_inserted:
- self._uri_eid_inserted.add(etype)
- self.logger.info('Fill uri_eid for etype %s', etype)
- self.sql('INSERT INTO uri_eid_%(e)s SELECT cw_%(l)s, cw_eid FROM cw_%(e)s'
- % {'l': uri_label, 'e': etype.lower()})
- self.sql('CREATE INDEX uri_eid_%(e)s_idx ON uri_eid_%(e)s(uri)'
- % {'e': etype.lower()})
-
- def convert_relations(self, etype_from, rtype, etype_to,
- uri_label_from='cwuri', uri_label_to='cwuri'):
- """ Flush the converted relations
- """
- # Always flush relations to be sure
- self.logger.info('Convert relations %s %s %s', etype_from, rtype, etype_to)
- self.flush_relations()
- if uri_label_from:
- self.fill_uri_eid_table(etype_from, uri_label_from)
- if uri_label_to:
- self.fill_uri_eid_table(etype_to, uri_label_to)
- if self.schema.rschema(rtype).inlined:
- self.logger.warning("Can't insert inlined relation %s", rtype)
- return
- if uri_label_from and uri_label_to:
- sql = '''INSERT INTO %(r)s_relation (eid_from, eid_to) SELECT DISTINCT O1.eid, O2.eid
- FROM %(r)s_relation_iid_tmp AS T, uri_eid_%(ef)s as O1, uri_eid_%(et)s as O2
- WHERE O1.uri=T.uri_from AND O2.uri=T.uri_to AND NOT EXISTS (
- SELECT 1 FROM %(r)s_relation AS TT WHERE TT.eid_from=O1.eid AND TT.eid_to=O2.eid);
- '''
- elif uri_label_to:
- sql = '''INSERT INTO %(r)s_relation (eid_from, eid_to) SELECT DISTINCT
- CAST(T.uri_from AS INTEGER), O1.eid
- FROM %(r)s_relation_iid_tmp AS T, uri_eid_%(et)s as O1
- WHERE O1.uri=T.uri_to AND NOT EXISTS (
- SELECT 1 FROM %(r)s_relation AS TT WHERE
- TT.eid_from=CAST(T.uri_from AS INTEGER) AND TT.eid_to=O1.eid);
- '''
- elif uri_label_from:
- sql = '''INSERT INTO %(r)s_relation (eid_from, eid_to) SELECT DISTINCT O1.eid, T.uri_to
- O1.eid, CAST(T.uri_to AS INTEGER)
- FROM %(r)s_relation_iid_tmp AS T, uri_eid_%(ef)s as O1
- WHERE O1.uri=T.uri_from AND NOT EXISTS (
- SELECT 1 FROM %(r)s_relation AS TT WHERE
- TT.eid_from=O1.eid AND TT.eid_to=CAST(T.uri_to AS INTEGER));
- '''
- try:
- self.sql(sql % {'r': rtype.lower(),
- 'et': etype_to.lower() if etype_to else u'',
- 'ef': etype_from.lower() if etype_from else u''})
- except Exception as ex:
- self.logger.error("Can't insert relation %s: %s", rtype, ex)
-
# SQL utilities #########################################################
def _drop_all_constraints(self):
@@ -324,7 +186,6 @@
def flush(self):
"""Flush the data"""
self.flush_entities()
- self.flush_internal_relations()
self.flush_relations()
def commit(self):
@@ -337,12 +198,6 @@
if self.slave_mode:
raise RuntimeError('Store cleanup is not allowed in slave mode')
self.logger.info("Start cleaning")
- # Cleanup relations tables
- for etype in self._init_uri_eid:
- self.sql('DROP TABLE uri_eid_%s' % etype.lower())
- # Remove relations tables
- for rtype in self._uri_rtypes:
- self.sql('DROP TABLE %(r)s_relation_iid_tmp' % {'r': rtype})
# Get all the initialized etypes/rtypes
if self._dbh.table_exists('cwmassive_initialized'):
cu = self.sql('SELECT retype, type FROM cwmassive_initialized')
@@ -373,9 +228,8 @@
else:
raise exc
- def flush_internal_relations(self):
- """ Flush the relations data
- """
+ def flush_relations(self):
+ """Flush the relations data."""
for rtype, data in self._data_relations.items():
if not data:
# There is no data for these etype for this flush round.
@@ -392,8 +246,7 @@
self._data_relations[rtype] = []
def flush_entities(self):
- """ Flush the entities data
- """
+ """Flush the entities data."""
for etype, data in self._data_entities.items():
if not data:
# There is no data for these etype for this flush round.