cubicweb/dataimport/massive_store.py
changeset 11789 71df2811b422
parent 11788 8e1fb9445d75
child 11790 04607da552ac
equal deleted inserted replaced
11788:8e1fb9445d75 11789:71df2811b422
    26 
    26 
    27 from six import text_type
    27 from six import text_type
    28 from six.moves import range
    28 from six.moves import range
    29 
    29 
    30 from cubicweb.dataimport import stores, pgstore
    30 from cubicweb.dataimport import stores, pgstore
       
    31 from cubicweb.server.schema2sql import eschema_sql_def
    31 
    32 
    32 
    33 
    33 class MassiveObjectStore(stores.RQLObjectStore):
    34 class MassiveObjectStore(stores.RQLObjectStore):
    34     """Store for massive import of data, with delayed insertion of meta data.
    35     """Store for massive import of data, with delayed insertion of meta data.
    35 
    36 
    87         self.logger = logging.getLogger('dataimport.massive_store')
    88         self.logger = logging.getLogger('dataimport.massive_store')
    88         self.sql = cnx.system_sql
    89         self.sql = cnx.system_sql
    89         self.schema = cnx.vreg.schema
    90         self.schema = cnx.vreg.schema
    90         self.default_values = get_default_values(self.schema)
    91         self.default_values = get_default_values(self.schema)
    91         self.get_next_eid = lambda g=self._get_eid_gen(): next(g)
    92         self.get_next_eid = lambda g=self._get_eid_gen(): next(g)
       
    93         self._source_dbhelper = cnx.repo.system_source.dbhelper
    92         self._dbh = PGHelper(cnx)
    94         self._dbh = PGHelper(cnx)
    93 
    95 
    94         self._data_entities = defaultdict(list)
    96         self._data_entities = defaultdict(list)
    95         self._data_relations = defaultdict(list)
    97         self._data_relations = defaultdict(list)
    96         self._initialized = set()
    98         self._initialized = {}
    97         self._constraints_dropped = self.slave_mode
       
    98 
    99 
    99     def _get_eid_gen(self):
   100     def _get_eid_gen(self):
   100         """ Function getting the next eid. This is done by preselecting
   101         """ Function getting the next eid. This is done by preselecting
   101         a given number of eids from the 'entities_id_seq', and then
   102         a given number of eids from the 'entities_id_seq', and then
   102         storing them"""
   103         storing them"""
   114         """
   115         """
   115         assert not self.slave_mode
   116         assert not self.slave_mode
   116         if self not in self._initialized:
   117         if self not in self._initialized:
   117             self.sql('CREATE TABLE cwmassive_initialized'
   118             self.sql('CREATE TABLE cwmassive_initialized'
   118                      '(retype text, type varchar(128), uuid varchar(32))')
   119                      '(retype text, type varchar(128), uuid varchar(32))')
   119             self._initialized.append(self)
   120             self._initialized[self] = None
   120 
       
   121     def master_init_etype(self, etype):
       
   122         """Initialize database for insertion of entities of the given etype.
       
   123 
       
   124         This is expected to be called once, usually by the master store in master/slaves
       
   125         configuration.
       
   126         """
       
   127         self._drop_metadata_constraints_if_necessary()
       
   128         tablename = 'cw_%s' % etype.lower()
       
   129         self._dbh.drop_constraints(tablename)
       
   130         self._dbh.drop_indexes(tablename)
       
   131         self.sql('CREATE TABLE IF NOT EXISTS cwmassive_initialized'
       
   132                  '(retype text, type varchar(128), uuid varchar(32))')
       
   133         self.sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'etype', %(uuid)s)",
       
   134                  {'e': etype, 'uuid': self.uuid})
       
   135 
       
   136     def master_insert_etype_metadata(self, etype):
       
   137         """Massive insertion of meta data for a given etype, based on SQL statements.
       
   138 
       
   139         In master/slaves configuration, you'll usually want to call it from the master once all
       
   140         slaves have finished (at least slaves won't call it automatically, so that's your
       
   141         responsibility).
       
   142         """
       
   143         # insert standard metadata relations
       
   144         for rtype, eid in self.metagen.base_etype_rels(etype).items():
       
   145             self._insert_meta_relation(etype, eid, '%s_relation' % rtype)
       
   146         # insert cw_source, is and is_instance_of relations (normally handled by the system source)
       
   147         self._insert_meta_relation(etype, self.metagen.source.eid, 'cw_source_relation')
       
   148         eschema = self.schema[etype]
       
   149         self._insert_meta_relation(etype, eschema.eid, 'is_relation')
       
   150         for parent_eschema in chain(eschema.ancestors(), [eschema]):
       
   151             self._insert_meta_relation(etype, parent_eschema.eid, 'is_instance_of_relation')
       
   152         # finally insert records into the entities table
       
   153         self.sql("INSERT INTO entities (eid, type, extid) "
       
   154                  "SELECT cw_eid, '%s', extid FROM cw_%s "
       
   155                  "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
       
   156                  % (etype, etype.lower()))
       
   157 
   121 
   158     # SQL utilities #########################################################
   122     # SQL utilities #########################################################
   159 
       
   160     def _drop_metadata_constraints_if_necessary(self):
       
   161         """Drop constraints and indexes for the metadata tables if necessary."""
       
   162         if not self._constraints_dropped:
       
   163             self._drop_metadata_constraints()
       
   164             self._constraints_dropped = True
       
   165 
   123 
   166     def _drop_metadata_constraints(self):
   124     def _drop_metadata_constraints(self):
   167         """Drop constraints and indexes for the metadata tables.
   125         """Drop constraints and indexes for the metadata tables.
   168 
   126 
   169         They will be recreated by the `finish` method.
   127         They will be recreated by the `finish` method.
   187 
   145 
   188     def prepare_insert_entity(self, etype, **data):
   146     def prepare_insert_entity(self, etype, **data):
   189         """Given an entity type, attributes and inlined relations, returns the inserted entity's
   147         """Given an entity type, attributes and inlined relations, returns the inserted entity's
   190         eid.
   148         eid.
   191         """
   149         """
   192         if not self.slave_mode and etype not in self._initialized:
   150         if etype not in self._initialized:
   193             self._initialized.add(etype)
   151             if not self.slave_mode:
   194             self.master_init_etype(etype)
   152                 self.master_init()
       
   153             tablename = 'cw_%s' % etype.lower()
       
   154             tmp_tablename = '%s_%s' % (tablename, self.uuid)
       
   155             self.sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'etype', %(uuid)s)",
       
   156                      {'e': etype, 'uuid': self.uuid})
       
   157             attr_defs = eschema_sql_def(self._source_dbhelper, self.schema[etype])
       
   158             self.sql('CREATE TABLE %s(%s);' % (tmp_tablename,
       
   159                                                ', '.join('cw_%s %s' % (column, sqltype)
       
   160                                                          for column, sqltype in attr_defs)))
       
   161             self._initialized[etype] = [attr for attr, _ in attr_defs]
       
   162 
   195         if 'eid' not in data:
   163         if 'eid' not in data:
   196             # If eid is not given and the eids sequence is set, use the value from the sequence
   164             # If eid is not given and the eids sequence is set, use the value from the sequence
   197             eid = self.get_next_eid()
   165             eid = self.get_next_eid()
   198             data['eid'] = eid
   166             data['eid'] = eid
   199         self._data_entities[etype].append(data)
   167         self._data_entities[etype].append(data)
   207         """
   175         """
   208         if rtype not in self._initialized:
   176         if rtype not in self._initialized:
   209             if not self.slave_mode:
   177             if not self.slave_mode:
   210                 self.master_init()
   178                 self.master_init()
   211             assert not self._cnx.vreg.schema.rschema(rtype).inlined
   179             assert not self._cnx.vreg.schema.rschema(rtype).inlined
   212             self._initialized.add(rtype)
   180             self._initialized[rtype] = None
   213             tablename = '%s_relation' % rtype.lower()
   181             tablename = '%s_relation' % rtype.lower()
   214             tmp_tablename = '%s_%s' % (tablename, self.uuid)
   182             tmp_tablename = '%s_%s' % (tablename, self.uuid)
   215             self.sql("INSERT INTO cwmassive_initialized VALUES (%(r)s, 'rtype', %(uuid)s)",
   183             self.sql("INSERT INTO cwmassive_initialized VALUES (%(r)s, 'rtype', %(uuid)s)",
   216                      {'r': rtype, 'uuid': self.uuid})
   184                      {'r': rtype, 'uuid': self.uuid})
   217             self.sql('CREATE TABLE %s(eid_from integer, eid_to integer)' % tmp_tablename)
   185             self.sql('CREATE TABLE %s(eid_from integer, eid_to integer)' % tmp_tablename)
   232         assert not self.slave_mode, 'finish method should only be called by the master store'
   200         assert not self.slave_mode, 'finish method should only be called by the master store'
   233         self.logger.info("Start cleaning")
   201         self.logger.info("Start cleaning")
   234         # Get all the initialized etypes/rtypes
   202         # Get all the initialized etypes/rtypes
   235         if self._dbh.table_exists('cwmassive_initialized'):
   203         if self._dbh.table_exists('cwmassive_initialized'):
   236             cu = self.sql('SELECT retype, type, uuid FROM cwmassive_initialized')
   204             cu = self.sql('SELECT retype, type, uuid FROM cwmassive_initialized')
       
   205             entities = defaultdict(list)
   237             relations = defaultdict(list)
   206             relations = defaultdict(list)
   238             for retype, _type, uuid in cu.fetchall():
   207             for retype, _type, uuid in cu.fetchall():
   239                 if _type == 'rtype':
   208                 if _type == 'rtype':
   240                     relations[retype].append(uuid)
   209                     relations[retype].append(uuid)
       
   210                 else:  # _type = 'etype'
       
   211                     entities[retype].append(uuid)
       
   212             # if there are entities to insert, drop constraints on metadata tables once and for all
       
   213             if entities:
       
   214                 self._drop_metadata_constraints()
       
   215             # get back entity data from the temporary tables
       
   216             for etype, uuids in entities.items():
       
   217                 tablename = 'cw_%s' % etype.lower()
       
   218                 attr_defs = eschema_sql_def(self._source_dbhelper, self.schema[etype])
       
   219                 columns = ','.join('cw_%s' % attr for attr, _ in attr_defs)
       
   220                 self._dbh.drop_constraints(tablename)
       
   221                 self._dbh.drop_indexes(tablename)
       
   222                 for uuid in uuids:
       
   223                     tmp_tablename = '%s_%s' % (tablename, uuid)
       
   224                     self.sql('INSERT INTO %(table)s(%(columns)s) '
       
   225                              'SELECT %(columns)s FROM %(tmp_table)s'
       
   226                              % {'table': tablename, 'tmp_table': tmp_tablename,
       
   227                                 'columns': columns})
       
   228                     self._insert_etype_metadata(etype, tmp_tablename)
       
   229                     self._tmp_data_cleanup(tmp_tablename, etype, uuid)
   241             # get back relation data from the temporary tables
   230             # get back relation data from the temporary tables
   242             for rtype, uuids in relations.items():
   231             for rtype, uuids in relations.items():
   243                 tablename = '%s_relation' % rtype.lower()
   232                 tablename = '%s_relation' % rtype.lower()
   244                 self._dbh.drop_constraints(tablename)
   233                 self._dbh.drop_constraints(tablename)
   245                 self._dbh.drop_indexes(tablename)
   234                 self._dbh.drop_indexes(tablename)
   249                     self.sql('INSERT INTO %(table)s(eid_from, eid_to) SELECT DISTINCT '
   238                     self.sql('INSERT INTO %(table)s(eid_from, eid_to) SELECT DISTINCT '
   250                              'T.eid_from, T.eid_to FROM %(tmp_table)s AS T '
   239                              'T.eid_from, T.eid_to FROM %(tmp_table)s AS T '
   251                              'WHERE NOT EXISTS (SELECT 1 FROM %(table)s AS TT WHERE '
   240                              'WHERE NOT EXISTS (SELECT 1 FROM %(table)s AS TT WHERE '
   252                              'TT.eid_from=T.eid_from AND TT.eid_to=T.eid_to);'
   241                              'TT.eid_from=T.eid_from AND TT.eid_to=T.eid_to);'
   253                              % {'table': tablename, 'tmp_table': tmp_tablename})
   242                              % {'table': tablename, 'tmp_table': tmp_tablename})
   254                     # Drop temporary relation table and record from cwmassive_initialized
   243                     self._tmp_data_cleanup(tmp_tablename, rtype, uuid)
   255                     self.sql('DROP TABLE %(tmp_table)s' % {'tmp_table': tmp_tablename})
       
   256                     self.sql('DELETE FROM cwmassive_initialized '
       
   257                              'WHERE retype = %(rtype)s AND uuid = %(uuid)s',
       
   258                              {'rtype': retype, 'uuid': uuid})
       
   259         # restore all deleted indexes and constraints
   244         # restore all deleted indexes and constraints
   260         self._dbh.restore_indexes_and_constraints()
   245         self._dbh.restore_indexes_and_constraints()
   261         # delete the meta data table
   246         # delete the meta data table
   262         self.sql('DROP TABLE IF EXISTS cwmassive_initialized')
   247         self.sql('DROP TABLE IF EXISTS cwmassive_initialized')
   263         self.commit()
   248         self.commit()
       
   249 
       
   250     def _insert_etype_metadata(self, etype, tmp_tablename):
       
   251         """Massive insertion of meta data for `etype`, with new entities in `tmp_tablename`.
       
   252         """
       
   253         # insert standard metadata relations
       
   254         for rtype, eid in self.metagen.base_etype_rels(etype).items():
       
   255             self._insert_meta_relation(tmp_tablename, rtype, eid)
       
   256         # insert cw_source, is and is_instance_of relations (normally handled by the system source)
       
   257         self._insert_meta_relation(tmp_tablename, 'cw_source', self.metagen.source.eid)
       
   258         eschema = self.schema[etype]
       
   259         self._insert_meta_relation(tmp_tablename, 'is', eschema.eid)
       
   260         for parent_eschema in chain(eschema.ancestors(), [eschema]):
       
   261             self._insert_meta_relation(tmp_tablename, 'is_instance_of', parent_eschema.eid)
       
   262         # finally insert records into the entities table
       
   263         self.sql("INSERT INTO entities(eid, type) "
       
   264                  "SELECT cw_eid, '%s' FROM %s "
       
   265                  "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
       
   266                  % (etype, tmp_tablename))
       
   267 
       
   268     def _insert_meta_relation(self, tmp_tablename, rtype, eid_to):
       
   269         self.sql("INSERT INTO %s_relation(eid_from, eid_to) SELECT cw_eid, %s FROM %s "
       
   270                  "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
       
   271                  % (rtype, eid_to, tmp_tablename))
       
   272 
       
   273     def _tmp_data_cleanup(self, tmp_tablename, ertype, uuid):
       
   274         """Drop temporary relation table and record from cwmassive_initialized."""
       
   275         self.sql('DROP TABLE %(tmp_table)s' % {'tmp_table': tmp_tablename})
       
   276         self.sql('DELETE FROM cwmassive_initialized '
       
   277                  'WHERE retype = %(rtype)s AND uuid = %(uuid)s',
       
   278                  {'rtype': ertype, 'uuid': uuid})
   264 
   279 
   265     # FLUSH #################################################################
   280     # FLUSH #################################################################
   266 
   281 
   267     def on_commit(self):
   282     def on_commit(self):
   268         if self.on_commit_callback:
   283         if self.on_commit_callback:
   294         metagen = self.metagen
   309         metagen = self.metagen
   295         for etype, data in self._data_entities.items():
   310         for etype, data in self._data_entities.items():
   296             if not data:
   311             if not data:
   297                 # There is no data for this etype for this flush round.
   312                 # There is no data for this etype for this flush round.
   298                 continue
   313                 continue
   299             # XXX It may be interesting to directly infer the columns' names from the schema
   314             attrs = self._initialized[etype]
   300             # XXX For now, the _create_copyfrom_buffer does a "row[column]"
   315             _base_data = dict.fromkeys(attrs)
   301             # which can lead to a key error.
       
   302             # Thus we should create dictionary with all the keys.
       
   303             columns = set()
       
   304             for d in data:
       
   305                 columns.update(d)
       
   306             _base_data = dict.fromkeys(columns)
       
   307             _base_data.update(self.default_values[etype])
   316             _base_data.update(self.default_values[etype])
   308             _base_data.update(metagen.base_etype_attrs(etype))
   317             _base_data.update(metagen.base_etype_attrs(etype))
   309             _data = []
   318             _data = []
   310             for d in data:
   319             for d in data:
       
   320                 # do this first on `d`, because it won't fill keys associated to None as provided by
       
   321                 # `_base_data`
       
   322                 metagen.init_entity_attrs(etype, d['eid'], d)
       
   323                 # XXX warn/raise if there is some key not in attrs?
   311                 _d = _base_data.copy()
   324                 _d = _base_data.copy()
   312                 _d.update(d)
   325                 _d.update(d)
   313                 metagen.init_entity_attrs(etype, _d['eid'], _d)
       
   314                 _data.append(_d)
   326                 _data.append(_d)
   315             buf = pgstore._create_copyfrom_buffer(_data, columns)
   327             buf = pgstore._create_copyfrom_buffer(_data, attrs)
   316             columns = ['cw_%s' % attr for attr in columns]
   328             tablename = 'cw_%s' % etype.lower()
       
   329             tmp_tablename = '%s_%s' % (tablename, self.uuid)
       
   330             columns = ['cw_%s' % attr for attr in attrs]
   317             cursor = self._cnx.cnxset.cu
   331             cursor = self._cnx.cnxset.cu
   318             try:
   332             try:
   319                 cursor.copy_from(buf, 'cw_%s' % etype.lower(), null='NULL', columns=columns)
   333                 cursor.copy_from(buf, tmp_tablename, null='NULL', columns=columns)
   320             except Exception as exc:
   334             except Exception as exc:
   321                 self.on_rollback(exc, etype, data)
   335                 self.on_rollback(exc, etype, data)
   322             # Clear data cache
   336             # Clear data cache
   323             self._data_entities[etype] = []
   337             self._data_entities[etype] = []
   324             if not self.slave_mode:
       
   325                 self.master_insert_etype_metadata(etype)
       
   326 
       
   327     def _insert_meta_relation(self, etype, eid_to, rtype):
       
   328         self.sql("INSERT INTO %s (eid_from, eid_to) SELECT cw_eid, %s FROM cw_%s "
       
   329                  "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
       
   330                  % (rtype, eid_to, etype.lower()))
       
   331 
   338 
   332 
   339 
   333 def get_default_values(schema):
   340 def get_default_values(schema):
   334     """analyzes yams ``schema`` and returns the list of default values.
   341     """analyzes yams ``schema`` and returns the list of default values.
   335 
   342