cubicweb/dataimport/massive_store.py
changeset 11781 4ebd968f364c
parent 11780 307d96c0ab5a
child 11782 056004c17c71
equal deleted inserted replaced
11780:307d96c0ab5a 11781:4ebd968f364c
   104         while True:
   104         while True:
   105             last_eid = self._cnx.repo.system_source.create_eid(self._cnx, self.eids_seq_range)
   105             last_eid = self._cnx.repo.system_source.create_eid(self._cnx, self.eids_seq_range)
   106             for eid in range(last_eid - self.eids_seq_range + 1, last_eid + 1):
   106             for eid in range(last_eid - self.eids_seq_range + 1, last_eid + 1):
   107                 yield eid
   107                 yield eid
   108 
   108 
       
   109     # master/slaves specific API
       
   110 
       
   111     def master_init_etype(self, etype):
       
   112         """Initialize database for insertion of entities of the given etype.
       
   113 
       
   114         This is expected to be called once, usually by the master store in master/slaves
       
   115         configuration.
       
   116         """
       
   117         self._drop_metadata_constraints_if_necessary()
       
   118         tablename = 'cw_%s' % etype.lower()
       
   119         self._dbh.drop_constraints(tablename)
       
   120         self._dbh.drop_indexes(tablename)
       
   121         self.sql('CREATE TABLE IF NOT EXISTS cwmassive_initialized'
       
   122                  '(retype text, type varchar(128))')
       
   123         self.sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'etype')", {'e': etype})
       
   124         self.sql('ALTER TABLE %s ADD COLUMN extid VARCHAR(256)' % tablename)
       
   125 
       
   126     def master_init_rtype(self, rtype):
       
   127         """Initialize database for insertion of relation of the given rtype.
       
   128 
       
   129         This is expected to be called once, usually by the master store in master/slaves
       
   130         configuration.
       
   131         """
       
   132         assert not self._cnx.vreg.schema.rschema(rtype).inlined
       
   133         self._drop_metadata_constraints_if_necessary()
       
   134         tablename = '%s_relation' % rtype.lower()
       
   135         self._dbh.drop_constraints(tablename)
       
   136         self._dbh.drop_indexes(tablename)
       
   137         self.sql('CREATE TABLE IF NOT EXISTS cwmassive_initialized'
       
   138                  '(retype text, type varchar(128))')
       
   139         self.sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'rtype')", {'e': rtype})
       
   140         self.sql('CREATE TABLE %s_tmp (eid_from integer, eid_to integer)' % tablename)
       
   141 
       
   142     def master_insert_etype_metadata(self, etype):
       
   143         """Massive insertion of meta data for a given etype, based on SQL statements.
       
   144 
       
   145         In master/slaves configuration, you'll usually want to call it from the master once all
       
   146         slaves have finished (at least slaves won't call it automatically, so that's your
       
   147         responsibility).
       
   148         """
       
   149         # insert standard metadata relations
       
   150         for rtype, eid in self.metagen.base_etype_rels(etype).items():
       
   151             self._insert_meta_relation(etype, eid, '%s_relation' % rtype)
       
   152         # insert cw_source, is and is_instance_of relations (normally handled by the system source)
       
   153         self._insert_meta_relation(etype, self.metagen.source.eid, 'cw_source_relation')
       
   154         eschema = self.schema[etype]
       
   155         self._insert_meta_relation(etype, eschema.eid, 'is_relation')
       
   156         for parent_eschema in chain(eschema.ancestors(), [eschema]):
       
   157             self._insert_meta_relation(etype, parent_eschema.eid, 'is_instance_of_relation')
       
   158         # finally insert records into the entities table
       
   159         self.sql("INSERT INTO entities (eid, type, extid) "
       
   160                  "SELECT cw_eid, '%s', extid FROM cw_%s "
       
   161                  "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
       
   162                  % (etype, etype.lower()))
       
   163 
   109     # SQL utilities #########################################################
   164     # SQL utilities #########################################################
   110 
   165 
   111     def _drop_metadata_constraints_if_necessary(self):
   166     def _drop_metadata_constraints_if_necessary(self):
   112         """Drop constraints and indexes for the metadata tables if necessary."""
   167         """Drop constraints and indexes for the metadata tables if necessary."""
   113         if not self._constraints_dropped:
   168         if not self._constraints_dropped:
   140         """Given an entity type, attributes and inlined relations, returns the inserted entity's
   195         """Given an entity type, attributes and inlined relations, returns the inserted entity's
   141         eid.
   196         eid.
   142         """
   197         """
   143         if not self.slave_mode and etype not in self._initialized:
   198         if not self.slave_mode and etype not in self._initialized:
   144             self._initialized.add(etype)
   199             self._initialized.add(etype)
   145             self._drop_metadata_constraints_if_necessary()
   200             self.master_init_etype(etype)
   146             tablename = 'cw_%s' % etype.lower()
       
   147             self._dbh.drop_constraints(tablename)
       
   148             self._dbh.drop_indexes(tablename)
       
   149             self.sql('CREATE TABLE IF NOT EXISTS cwmassive_initialized'
       
   150                      '(retype text, type varchar(128))')
       
   151             self.sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'etype')", {'e': etype})
       
   152         attrs = self.metagen.base_etype_attrs(etype)
   201         attrs = self.metagen.base_etype_attrs(etype)
   153         data = copy(attrs)  # base_etype_attrs is @cached, a copy is necessary
   202         data = copy(attrs)  # base_etype_attrs is @cached, a copy is necessary
   154         data.update(kwargs)
   203         data.update(kwargs)
   155         if 'eid' not in data:
   204         if 'eid' not in data:
   156             # If eid is not given and the eids sequence is set, use the value from the sequence
   205             # If eid is not given and the eids sequence is set, use the value from the sequence
   169         and ``eid_to``.
   218         and ``eid_to``.
   170 
   219 
   171         Relation must not be inlined.
   220         Relation must not be inlined.
   172         """
   221         """
   173         if not self.slave_mode and rtype not in self._initialized:
   222         if not self.slave_mode and rtype not in self._initialized:
   174             assert not self._cnx.vreg.schema.rschema(rtype).inlined
       
   175             self._initialized.add(rtype)
   223             self._initialized.add(rtype)
   176             self._drop_metadata_constraints_if_necessary()
   224             self.master_init_rtype(rtype)
   177             tablename = '%s_relation' % rtype.lower()
       
   178             self._dbh.drop_constraints(tablename)
       
   179             self._dbh.drop_indexes(tablename)
       
   180             self.sql('CREATE TABLE %s_tmp (eid_from integer, eid_to integer)'
       
   181                      % tablename)
       
   182             self.sql('CREATE TABLE IF NOT EXISTS cwmassive_initialized'
       
   183                      '(retype text, type varchar(128))')
       
   184             self.sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'rtype')", {'e': rtype})
       
   185         self._data_relations[rtype].append({'eid_from': eid_from, 'eid_to': eid_to})
   225         self._data_relations[rtype].append({'eid_from': eid_from, 'eid_to': eid_to})
   186 
   226 
   187     def flush(self):
   227     def flush(self):
   188         """Flush the data"""
   228         """Flush the data"""
   189         self.flush_entities()
   229         self.flush_entities()
   274             except Exception as exc:
   314             except Exception as exc:
   275                 self.on_rollback(exc, etype, data)
   315                 self.on_rollback(exc, etype, data)
   276             # Clear data cache
   316             # Clear data cache
   277             self._data_entities[etype] = []
   317             self._data_entities[etype] = []
   278             if not self.slave_mode:
   318             if not self.slave_mode:
   279                 self._insert_etype_metadata(etype)
   319                 self.master_insert_etype_metadata(etype)
   280 
   320 
   281     def _cleanup_relations(self, rtype):
   321     def _cleanup_relations(self, rtype):
   282         """ Cleanup rtype table """
   322         """ Cleanup rtype table """
   283         # Push into relation table while removing duplicate
   323         # Push into relation table while removing duplicate
   284         self.sql('INSERT INTO %(r)s_relation (eid_from, eid_to) SELECT DISTINCT '
   324         self.sql('INSERT INTO %(r)s_relation (eid_from, eid_to) SELECT DISTINCT '
   285                  'T.eid_from, T.eid_to FROM %(r)s_relation_tmp AS T '
   325                  'T.eid_from, T.eid_to FROM %(r)s_relation_tmp AS T '
   286                  'WHERE NOT EXISTS (SELECT 1 FROM %(r)s_relation AS TT WHERE '
   326                  'WHERE NOT EXISTS (SELECT 1 FROM %(r)s_relation AS TT WHERE '
   287                  'TT.eid_from=T.eid_from AND TT.eid_to=T.eid_to);' % {'r': rtype})
   327                  'TT.eid_from=T.eid_from AND TT.eid_to=T.eid_to);' % {'r': rtype})
   288         # Drop temporary relation table
   328         # Drop temporary relation table
   289         self.sql('DROP TABLE %(r)s_relation_tmp' % {'r': rtype.lower()})
   329         self.sql('DROP TABLE %(r)s_relation_tmp' % {'r': rtype.lower()})
   290 
       
   291     def _insert_etype_metadata(self, etype):
       
   292         """Massive insertion of meta data for a given etype, based on SQL statements.
       
   293         """
       
   294         # insert standard metadata relations
       
   295         for rtype, eid in self.metagen.base_etype_rels(etype).items():
       
   296             self._insert_meta_relation(etype, eid, '%s_relation' % rtype)
       
   297         # insert cw_source, is and is_instance_of relations (normally handled by the system source)
       
   298         self._insert_meta_relation(etype, self.metagen.source.eid, 'cw_source_relation')
       
   299         eschema = self.schema[etype]
       
   300         self._insert_meta_relation(etype, eschema.eid, 'is_relation')
       
   301         for parent_eschema in chain(eschema.ancestors(), [eschema]):
       
   302             self._insert_meta_relation(etype, parent_eschema.eid, 'is_instance_of_relation')
       
   303         # finally insert records into the entities table
       
   304         self.sql("INSERT INTO entities (eid, type) "
       
   305                  "SELECT cw_eid, '%s' FROM cw_%s "
       
   306                  "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
       
   307                  % (etype, etype.lower()))
       
   308 
   330 
   309     def _insert_meta_relation(self, etype, eid_to, rtype):
   331     def _insert_meta_relation(self, etype, eid_to, rtype):
   310         self.sql("INSERT INTO %s (eid_from, eid_to) SELECT cw_eid, %s FROM cw_%s "
   332         self.sql("INSERT INTO %s (eid_from, eid_to) SELECT cw_eid, %s FROM cw_%s "
   311                  "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
   333                  "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
   312                  % (rtype, eid_to, etype.lower()))
   334                  % (rtype, eid_to, etype.lower()))