cubicweb/dataimport/massive_store.py
changeset 11326 06eeac9389a3
parent 11325 a29443fbd1f2
child 11328 9f2d7da47526
equal deleted inserted replaced
11325:a29443fbd1f2 11326:06eeac9389a3
    16 #
    16 #
    17 # You should have received a copy of the GNU Lesser General Public License along
    17 # You should have received a copy of the GNU Lesser General Public License along
    18 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
    18 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
    19 
    19 
    20 import logging
    20 import logging
    21 from datetime import datetime
    21 from copy import copy
    22 from collections import defaultdict
    22 from collections import defaultdict
    23 from io import StringIO
    23 from io import StringIO
    24 from itertools import chain
    24 from itertools import chain
    25 
    25 
    26 from six.moves import range
    26 from six.moves import range
    27 
    27 
    28 import pytz
       
    29 
       
    30 from yams.constraints import SizeConstraint
    28 from yams.constraints import SizeConstraint
    31 
    29 
    32 from cubicweb.schema import PURE_VIRTUAL_RTYPES
    30 from cubicweb.schema import PURE_VIRTUAL_RTYPES
    33 from cubicweb.server.schema2sql import rschema_has_table
    31 from cubicweb.server.schema2sql import rschema_has_table
    34 from cubicweb.server.sqlutils import SQL_PREFIX
       
    35 from cubicweb.dataimport import stores, pgstore
    32 from cubicweb.dataimport import stores, pgstore
    36 from cubicweb.utils import make_uid
       
    37 
    33 
    38 
    34 
    39 class MassiveObjectStore(stores.RQLObjectStore):
    35 class MassiveObjectStore(stores.RQLObjectStore):
    40     """
    36     """
    41     Store for massive import of data, with delayed insertion of meta data.
    37     Store for massive import of data, with delayed insertion of meta data.
    90     iid_maxsize = 1024
    86     iid_maxsize = 1024
    91 
    87 
    92     def __init__(self, cnx,
    88     def __init__(self, cnx,
    93                  on_commit_callback=None, on_rollback_callback=None,
    89                  on_commit_callback=None, on_rollback_callback=None,
    94                  slave_mode=False,
    90                  slave_mode=False,
    95                  source=None,
    91                  eids_seq_range=10000,
    96                  eids_seq_range=10000):
    92                  metagen=None):
    97         """ Create a MassiveObject store, with the following attributes:
    93         """ Create a MassiveObject store, with the following attributes:
    98 
    94 
    99         - cnx: CubicWeb cnx
    95         - cnx: CubicWeb cnx
   100         - eids_seq_range: size of eid range reserved by the store for each batch
    96         - eids_seq_range: size of eid range reserved by the store for each batch
   101         """
    97         """
   102         super(MassiveObjectStore, self).__init__(cnx)
    98         super(MassiveObjectStore, self).__init__(cnx)
   103         self.on_commit_callback = on_commit_callback
    99         self.on_commit_callback = on_commit_callback
   104         self.on_rollback_callback = on_rollback_callback
   100         self.on_rollback_callback = on_rollback_callback
   105         self.slave_mode = slave_mode
   101         self.slave_mode = slave_mode
   106         self.eids_seq_range = eids_seq_range
   102         self.eids_seq_range = eids_seq_range
       
   103         if metagen is None:
       
   104             metagen = stores.MetadataGenerator(cnx)
       
   105         self.metagen = metagen
   107 
   106 
   108         self.logger = logging.getLogger('dataimport.massive_store')
   107         self.logger = logging.getLogger('dataimport.massive_store')
   109         self.sql = cnx.system_sql
   108         self.sql = cnx.system_sql
   110         self.schema = self._cnx.vreg.schema
   109         self.schema = self._cnx.vreg.schema
   111         self.default_values = get_default_values(self.schema)
   110         self.default_values = get_default_values(self.schema)
   125         # etypes for which we have a uri_eid_%(e)s_idx index
   124         # etypes for which we have a uri_eid_%(e)s_idx index
   126         self._uri_eid_inserted = set()
   125         self._uri_eid_inserted = set()
   127         # set of rtypes for which we have a %(rtype)s_relation_iid_tmp table
   126         # set of rtypes for which we have a %(rtype)s_relation_iid_tmp table
   128         self._uri_rtypes = set()
   127         self._uri_rtypes = set()
   129 
   128 
   130         self._now = datetime.now(pytz.utc)
       
   131         self._default_cwuri = make_uid('_auto_generated')
       
   132 
       
   133         if not self.slave_mode:
   129         if not self.slave_mode:
   134             # drop constraint and metadata table, they will be recreated when self.finish() is
   130             # drop constraint and metadata table, they will be recreated when self.finish() is
   135             # called
   131             # called
   136             self._drop_all_constraints()
   132             self._drop_all_constraints()
   137             self._drop_metatables_constraints()
   133             self._drop_metatables_constraints()
   138         if source is None:
   134 
   139             source = cnx.repo.system_source
   135     def _get_eid_gen(self):
   140         self.source = source
   136         """ Function getting the next eid. This is done by preselecting
       
   137         a given number of eids from the 'entities_id_seq', and then
       
   138         storing them"""
       
   139         while True:
       
   140             last_eid = self._cnx.repo.system_source.create_eid(self._cnx, self.eids_seq_range)
       
   141             for eid in range(last_eid - self.eids_seq_range + 1, last_eid + 1):
       
   142                 yield eid
   141 
   143 
   142     # URI related things #######################################################
   144     # URI related things #######################################################
   143 
   145 
   144     def init_rtype_table(self, etype_from, rtype, etype_to):
   146     def init_rtype_table(self, etype_from, rtype, etype_to):
   145         """ Build temporary table for standard rtype """
   147         """ Build temporary table for standard rtype """
   265     def restart_eid_sequence(self, start_eid):
   267     def restart_eid_sequence(self, start_eid):
   266         self.sql(self._cnx.repo.system_source.dbhelper.sql_restart_numrange(
   268         self.sql(self._cnx.repo.system_source.dbhelper.sql_restart_numrange(
   267             'entities_id_seq', initial_value=start_eid))
   269             'entities_id_seq', initial_value=start_eid))
   268         self._cnx.commit()
   270         self._cnx.commit()
   269 
   271 
   270     # ENTITIES CREATION #####################################################
       
   271 
       
   272     def _get_eid_gen(self):
       
   273         """ Function getting the next eid. This is done by preselecting
       
   274         a given number of eids from the 'entities_id_seq', and then
       
   275         storing them"""
       
   276         while True:
       
   277             last_eid = self._cnx.repo.system_source.create_eid(self._cnx, self.eids_seq_range)
       
   278             for eid in range(last_eid - self.eids_seq_range + 1, last_eid + 1):
       
   279                 yield eid
       
   280 
       
   281     def _apply_default_values(self, etype, kwargs):
       
   282         """Apply the default values for a given etype, attribute and value."""
       
   283         default_values = self.default_values[etype]
       
   284         missing_keys = set(default_values) - set(kwargs)
       
   285         kwargs.update((key, default_values[key]) for key in missing_keys)
       
   286 
       
   287     # store api ################################################################
   272     # store api ################################################################
   288 
   273 
   289     def prepare_insert_entity(self, etype, **kwargs):
   274     def prepare_insert_entity(self, etype, **kwargs):
   290         """Given an entity type, attributes and inlined relations, returns the inserted entity's
   275         """Given an entity type, attributes and inlined relations, returns the inserted entity's
   291         eid.
   276         eid.
   294             self._initialized.add(etype)
   279             self._initialized.add(etype)
   295             self._dbh.drop_indexes('cw_%s' % etype.lower())
   280             self._dbh.drop_indexes('cw_%s' % etype.lower())
   296             self.sql('CREATE TABLE IF NOT EXISTS cwmassive_initialized'
   281             self.sql('CREATE TABLE IF NOT EXISTS cwmassive_initialized'
   297                      '(retype text, type varchar(128))')
   282                      '(retype text, type varchar(128))')
   298             self.sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'etype')", {'e': etype})
   283             self.sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'etype')", {'e': etype})
   299         # Add meta data if not given
   284         attrs = self.metagen.base_etype_attrs(etype)
   300         if 'modification_date' not in kwargs:
   285         data = copy(attrs)  # base_etype_attrs is @cached, a copy is necessary
   301             kwargs['modification_date'] = self._now
   286         data.update(kwargs)
   302         if 'creation_date' not in kwargs:
   287         if 'eid' not in data:
   303             kwargs['creation_date'] = self._now
   288             # If eid is not given and the eids sequence is set, use the value from the sequence
   304         if 'cwuri' not in kwargs:
   289             eid = self.get_next_eid()
   305             kwargs['cwuri'] = self._default_cwuri + str(self._count_cwuri)
   290             data['eid'] = eid
   306             self._count_cwuri += 1
   291         # XXX default values could be set once for all in base entity
   307         if 'eid' not in kwargs:
   292         default_values = self.default_values[etype]
   308             # If eid is not given and the eids sequence is set,
   293         missing_keys = set(default_values) - set(data)
   309             # use the value from the sequence
   294         data.update((key, default_values[key]) for key in missing_keys)
   310             kwargs['eid'] = self.get_next_eid()
   295         self.metagen.init_entity_attrs(etype, data['eid'], data)
   311         self._apply_default_values(etype, kwargs)
   296         self._data_entities[etype].append(data)
   312         self._data_entities[etype].append(kwargs)
   297         return data['eid']
   313         return kwargs.get('eid')
       
   314 
   298 
   315     def prepare_insert_relation(self, eid_from, rtype, eid_to, **kwargs):
   299     def prepare_insert_relation(self, eid_from, rtype, eid_to, **kwargs):
   316         """Insert into the database a  relation ``rtype`` between entities with eids ``eid_from``
   300         """Insert into the database a  relation ``rtype`` between entities with eids ``eid_from``
   317         and ``eid_to``.
   301         and ``eid_to``.
   318         """
   302         """
   464         self.sql('DROP TABLE %(r)s_relation_tmp' % {'r': rtype.lower()})
   448         self.sql('DROP TABLE %(r)s_relation_tmp' % {'r': rtype.lower()})
   465 
   449 
   466     def insert_massive_metadata(self, etype):
   450     def insert_massive_metadata(self, etype):
   467         """ Massive insertion of meta data for a given etype, based on SQL statements.
   451         """ Massive insertion of meta data for a given etype, based on SQL statements.
   468         """
   452         """
   469         self._insert_meta_relation(etype, self._cnx.user.eid, 'created_by_relation')
   453         # insert standard metadata relations
   470         self._insert_meta_relation(etype, self._cnx.user.eid, 'owned_by_relation')
   454         for rtype, eid in self.metagen.base_etype_rels(etype).items():
   471         self._insert_meta_relation(etype, self.source.eid, 'cw_source_relation')
   455             self._insert_meta_relation(etype, eid, '%s_relation' % rtype)
   472         eschema = self.schema[etype].eid
   456         # insert cw_source, is and is_instance_of relations (normally handled by the system source)
       
   457         self._insert_meta_relation(etype, self.metagen.source.eid, 'cw_source_relation')
       
   458         eschema = self.schema[etype]
   473         self._insert_meta_relation(etype, eschema.eid, 'is_relation')
   459         self._insert_meta_relation(etype, eschema.eid, 'is_relation')
   474         for parent_eschema in eschema.ancestors() + [eschema]:
   460         for parent_eschema in chain(eschema.ancestors(), [eschema]):
   475             self._insert_meta_relation(etype, parent_eschema.eid, 'is_instance_of_relation')
   461             self._insert_meta_relation(etype, parent_eschema.eid, 'is_instance_of_relation')
       
   462         # finally insert records into the entities table
   476         self.sql("INSERT INTO entities (eid, type, asource, extid) "
   463         self.sql("INSERT INTO entities (eid, type, asource, extid) "
   477                  "SELECT cw_eid, '%s', 'system', NULL FROM cw_%s "
   464                  "SELECT cw_eid, '%s', 'system', NULL FROM cw_%s "
   478                  "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
   465                  "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
   479                  % (etype, etype.lower()))
   466                  % (etype, etype.lower()))
   480 
   467