dataimport/massive_store.py
changeset 10869 575982c948a9
parent 10867 ca73ee6d24ad
child 10870 9dedf464596b
equal deleted inserted replaced
10868:ffb5b6c25cec 10869:575982c948a9
    82        store.cleanup()
    82        store.cleanup()
    83     """
    83     """
    84 
    84 
    85     def __init__(self, cnx, autoflush_metadata=True,
    85     def __init__(self, cnx, autoflush_metadata=True,
    86                  replace_sep='', commit_at_flush=True,
    86                  replace_sep='', commit_at_flush=True,
    87                  drop_index=True,
       
    88                  pg_schema='public',
    87                  pg_schema='public',
    89                  iid_maxsize=1024, uri_param_name='rdf:about',
    88                  iid_maxsize=1024, uri_param_name='rdf:about',
    90                  eids_seq_range=10000, eids_seq_start=None,
    89                  eids_seq_range=10000, eids_seq_start=None,
    91                  on_commit_callback=None, on_rollback_callback=None,
    90                  on_commit_callback=None, on_rollback_callback=None,
    92                  slave_mode=False,
    91                  slave_mode=False,
    98                               Automatically flush the metadata after
    97                               Automatically flush the metadata after
    99                               each flush()
    98                               each flush()
   100         - replace_sep: String. Replace separator used for
    99         - replace_sep: String. Replace separator used for
   101                        (COPY FROM) buffer creation.
   100                        (COPY FROM) buffer creation.
   102         - commit_at_flush: Boolean. Commit after each flush().
   101         - commit_at_flush: Boolean. Commit after each flush().
   103         - drop_index: Boolean. Drop SQL index before COPY FROM
       
   104         - eids_seq_range: Int. Range of the eids_seq_range to be fetched each time
   102         - eids_seq_range: Int. Range of the eids_seq_range to be fetched each time
   105                                by the store (default is 10000).
   103                                by the store (default is 10000).
   106                                If None, the sequence eids is attached to each entity tables
   104                                If None, the sequence eids is attached to each entity tables
   107                                (backward compatibility with the 0.2.0).
   105                                (backward compatibility with the 0.2.0).
   108         - eids_seq_start: Int. Set the eids sequence value (if None, nothing is done).
   106         - eids_seq_start: Int. Set the eids sequence value (if None, nothing is done).
   127                             }
   125                             }
   128         self.sql = self._cnx.system_sql
   126         self.sql = self._cnx.system_sql
   129         self.logger = logging.getLogger('dataio.massiveimport')
   127         self.logger = logging.getLogger('dataio.massiveimport')
   130         self.autoflush_metadata = autoflush_metadata
   128         self.autoflush_metadata = autoflush_metadata
   131         self.replace_sep = replace_sep
   129         self.replace_sep = replace_sep
   132         self.drop_index = drop_index
       
   133         self.slave_mode = slave_mode
   130         self.slave_mode = slave_mode
   134         self.size_constraints = get_size_constraints(cnx.vreg.schema)
   131         self.size_constraints = get_size_constraints(cnx.vreg.schema)
   135         self.default_values = get_default_values(cnx.vreg.schema)
   132         self.default_values = get_default_values(cnx.vreg.schema)
   136         self._dbh = PGHelper(self._cnx, pg_schema or 'public')
   133         self._dbh = PGHelper(self._cnx, pg_schema or 'public')
   137         self._data_entities = defaultdict(list)
   134         self._data_entities = defaultdict(list)
   152             self._cnx.system_sql(self._cnx.repo.system_source.dbhelper.sql_restart_numrange(
   149             self._cnx.system_sql(self._cnx.repo.system_source.dbhelper.sql_restart_numrange(
   153                 'entities_id_seq', initial_value=self._eids_seq_start + 1))
   150                 'entities_id_seq', initial_value=self._eids_seq_start + 1))
   154             cnx.commit()
   151             cnx.commit()
   155         self.get_next_eid = lambda g=self._get_eid_gen(): next(g)
   152         self.get_next_eid = lambda g=self._get_eid_gen(): next(g)
   156         # recreate then when self.finish() is called
   153         # recreate then when self.finish() is called
   157         if not self.slave_mode and self.drop_index:
   154         if not self.slave_mode:
   158             self._drop_metatables_constraints()
   155             self._drop_metatables_constraints()
   159         if source is None:
   156         if source is None:
   160             source = cnx.repo.system_source
   157             source = cnx.repo.system_source
   161         self.source = source
   158         self.source = source
   162         self._etype_eid_idx = dict(cnx.execute('Any XN,X WHERE X is CWEType, X name XN'))
   159         self._etype_eid_idx = dict(cnx.execute('Any XN,X WHERE X is CWEType, X name XN'))
   350         """ Create all the constraints for the meta data"""
   347         """ Create all the constraints for the meta data"""
   351         for tablename in ('entities',
   348         for tablename in ('entities',
   352                           'created_by_relation', 'owned_by_relation',
   349                           'created_by_relation', 'owned_by_relation',
   353                           'is_instance_of_relation', 'is_relation'):
   350                           'is_instance_of_relation', 'is_relation'):
   354             # Indexes and constraints
   351             # Indexes and constraints
   355             if self.drop_index:
   352             self.reapply_constraint_index(tablename)
   356                 self.reapply_constraint_index(tablename)
       
   357 
   353 
   358     def init_relation_table(self, rtype):
   354     def init_relation_table(self, rtype):
   359         """ Get and remove all indexes for performance sake """
   355         """ Get and remove all indexes for performance sake """
   360         # Create temporary table
   356         # Create temporary table
   361         if not self.slave_mode and rtype not in self._initialized['rtypes']:
   357         if not self.slave_mode and rtype not in self._initialized['rtypes']:
   362             sql = "CREATE TABLE %s_relation_tmp (eid_from integer, eid_to integer)" % rtype.lower()
   358             sql = "CREATE TABLE %s_relation_tmp (eid_from integer, eid_to integer)" % rtype.lower()
   363             self.sql(sql)
   359             self.sql(sql)
   364             if self.drop_index:
   360             # Drop indexes and constraints
   365                 # Drop indexes and constraints
   361             tablename = '%s_relation' % rtype.lower()
   366                 tablename = '%s_relation' % rtype.lower()
   362             self.drop_and_store_indexes_constraints(tablename)
   367                 self.drop_and_store_indexes_constraints(tablename)
       
   368             # Push the etype in the initialized table for easier restart
   363             # Push the etype in the initialized table for easier restart
   369             self.init_create_initialized_table()
   364             self.init_create_initialized_table()
   370             sql = 'INSERT INTO dataio_initialized VALUES (%(e)s, %(t)s)'
   365             sql = 'INSERT INTO dataio_initialized VALUES (%(e)s, %(t)s)'
   371             self.sql(sql, {'e': rtype, 't': 'rtype'})
   366             self.sql(sql, {'e': rtype, 't': 'rtype'})
   372             # Mark rtype as "initialized" for faster check
   367             # Mark rtype as "initialized" for faster check
   390                     # Eids are directly set by the entities_id_seq.
   385                     # Eids are directly set by the entities_id_seq.
   391                     # We attach this sequence to all the created etypes.
   386                     # We attach this sequence to all the created etypes.
   392                     sql = ("ALTER TABLE cw_%s ALTER COLUMN cw_eid "
   387                     sql = ("ALTER TABLE cw_%s ALTER COLUMN cw_eid "
   393                            "SET DEFAULT nextval('entities_id_seq')" % etype.lower())
   388                            "SET DEFAULT nextval('entities_id_seq')" % etype.lower())
   394                     self.sql(sql)
   389                     self.sql(sql)
   395                 if self.drop_index:
   390                 # Drop indexes and constraints
   396                     # Drop indexes and constraints
   391                 tablename = 'cw_%s' % etype.lower()
   397                     tablename = 'cw_%s' % etype.lower()
   392                 self.drop_and_store_indexes_constraints(tablename)
   398                     self.drop_and_store_indexes_constraints(tablename)
       
   399                 # Push the etype in the initialized table for easier restart
   393                 # Push the etype in the initialized table for easier restart
   400                 self.init_create_initialized_table()
   394                 self.init_create_initialized_table()
   401                 sql = 'INSERT INTO dataio_initialized VALUES (%(e)s, %(t)s)'
   395                 sql = 'INSERT INTO dataio_initialized VALUES (%(e)s, %(t)s)'
   402                 self.sql(sql, {'e': etype, 't': 'etype'})
   396                 self.sql(sql, {'e': etype, 't': 'etype'})
   403             # Mark etype as "initialized" for faster check
   397             # Mark etype as "initialized" for faster check
   624         if self._eids_seq_range is None:
   618         if self._eids_seq_range is None:
   625             # Remove DEFAULT eids sequence if added
   619             # Remove DEFAULT eids sequence if added
   626             sql = 'ALTER TABLE cw_%s ALTER COLUMN cw_eid DROP DEFAULT;' % etype.lower()
   620             sql = 'ALTER TABLE cw_%s ALTER COLUMN cw_eid DROP DEFAULT;' % etype.lower()
   627             self.sql(sql)
   621             self.sql(sql)
   628         # Create indexes and constraints
   622         # Create indexes and constraints
   629         if self.drop_index:
   623         tablename = SQL_PREFIX + etype.lower()
   630             tablename = SQL_PREFIX + etype.lower()
   624         self.reapply_constraint_index(tablename)
   631             self.reapply_constraint_index(tablename)
       
   632 
   625 
   633     def _cleanup_relations(self, rtype):
   626     def _cleanup_relations(self, rtype):
   634         """ Cleanup rtype table """
   627         """ Cleanup rtype table """
   635         # Push into relation table while removing duplicate
   628         # Push into relation table while removing duplicate
   636         sql = '''INSERT INTO %(r)s_relation (eid_from, eid_to) SELECT DISTINCT
   629         sql = '''INSERT INTO %(r)s_relation (eid_from, eid_to) SELECT DISTINCT
   640         self.sql(sql)
   633         self.sql(sql)
   641         # Drop temporary relation table
   634         # Drop temporary relation table
   642         sql = ('DROP TABLE %(r)s_relation_tmp' % {'r': rtype.lower()})
   635         sql = ('DROP TABLE %(r)s_relation_tmp' % {'r': rtype.lower()})
   643         self.sql(sql)
   636         self.sql(sql)
   644         # Create indexes and constraints
   637         # Create indexes and constraints
   645         if self.drop_index:
   638         tablename = '%s_relation' % rtype.lower()
   646             tablename = '%s_relation' % rtype.lower()
   639         self.reapply_constraint_index(tablename)
   647             self.reapply_constraint_index(tablename)
       
   648 
   640 
   649     def insert_massive_meta_data(self, etype):
   641     def insert_massive_meta_data(self, etype):
   650         """ Massive insertion of meta data for a given etype, based on SQL statements.
   642         """ Massive insertion of meta data for a given etype, based on SQL statements.
   651         """
   643         """
   652         # Push data - Use coalesce to avoid NULL (and get 0), if there is no
   644         # Push data - Use coalesce to avoid NULL (and get 0), if there is no