dataimport/massive_store.py
changeset 10871 1d4a94d04ec6
parent 10870 9dedf464596b
child 10872 ff4f94cfa2fb
equal deleted inserted replaced
10870:9dedf464596b 10871:1d4a94d04ec6
    81        ...
    81        ...
    82        store.cleanup()
    82        store.cleanup()
    83     """
    83     """
    84 
    84 
    85     def __init__(self, cnx, autoflush_metadata=True,
    85     def __init__(self, cnx, autoflush_metadata=True,
    86                  replace_sep='', commit_at_flush=True,
    86                  commit_at_flush=True,
    87                  iid_maxsize=1024, uri_param_name='rdf:about',
    87                  iid_maxsize=1024, uri_param_name='rdf:about',
    88                  eids_seq_range=10000, eids_seq_start=None,
    88                  eids_seq_range=10000, eids_seq_start=None,
    89                  on_commit_callback=None, on_rollback_callback=None,
    89                  on_commit_callback=None, on_rollback_callback=None,
    90                  slave_mode=False,
    90                  slave_mode=False,
    91                  source=None):
    91                  source=None):
    93 
    93 
    94         - cnx: CubicWeb cnx
    94         - cnx: CubicWeb cnx
    95         - autoflush_metadata: Boolean.
    95         - autoflush_metadata: Boolean.
    96                               Automatically flush the metadata after
    96                               Automatically flush the metadata after
    97                               each flush()
    97                               each flush()
    98         - replace_sep: String. Replace separator used for
       
    99                        (COPY FROM) buffer creation.
       
   100         - commit_at_flush: Boolean. Commit after each flush().
    98         - commit_at_flush: Boolean. Commit after each flush().
   101         - eids_seq_range: Int. Range of the eids_seq_range to be fetched each time
    99         - eids_seq_range: Int. Range of the eids_seq_range to be fetched each time
   102                                by the store (default is 10000).
   100                                by the store (default is 10000).
   103                                If None, the eids sequence is attached to each entity table
   101                                If None, the eids sequence is attached to each entity table
   104                                (backward compatibility with the 0.2.0).
   102                                (backward compatibility with the 0.2.0).
   111         super(MassiveObjectStore, self).__init__(cnx)
   109         super(MassiveObjectStore, self).__init__(cnx)
   112         self.logger = logging.getLogger('dataio.relationmixin')
   110         self.logger = logging.getLogger('dataio.relationmixin')
   113         self._cnx = cnx
   111         self._cnx = cnx
   114         self.sql = cnx.system_sql
   112         self.sql = cnx.system_sql
   115         self.iid_maxsize = iid_maxsize
   113         self.iid_maxsize = iid_maxsize
   116         self.replace_sep = replace_sep
       
   117         self.commit_at_flush = commit_at_flush
   114         self.commit_at_flush = commit_at_flush
   118         self._data_uri_relations = defaultdict(list)
   115         self._data_uri_relations = defaultdict(list)
   119         self._initialized = {'init_uri_eid': set(),
   116         self._initialized = {'init_uri_eid': set(),
   120                              'uri_eid_inserted': set(),
   117                              'uri_eid_inserted': set(),
   121                              'uri_rtypes': set(),
   118                              'uri_rtypes': set(),
   123                              'rtypes': set(),
   120                              'rtypes': set(),
   124                             }
   121                             }
   125         self.sql = self._cnx.system_sql
   122         self.sql = self._cnx.system_sql
   126         self.logger = logging.getLogger('dataio.massiveimport')
   123         self.logger = logging.getLogger('dataio.massiveimport')
   127         self.autoflush_metadata = autoflush_metadata
   124         self.autoflush_metadata = autoflush_metadata
   128         self.replace_sep = replace_sep
       
   129         self.slave_mode = slave_mode
   125         self.slave_mode = slave_mode
   130         self.size_constraints = get_size_constraints(cnx.vreg.schema)
   126         self.size_constraints = get_size_constraints(cnx.vreg.schema)
   131         self.default_values = get_default_values(cnx.vreg.schema)
   127         self.default_values = get_default_values(cnx.vreg.schema)
   132         pg_schema = cnx.repo.config.system_source_config.get('db-namespace', 'public')
   128         pg_schema = cnx.repo.config.system_source_config.get('db-namespace', 'public')
   133         self._dbh = PGHelper(self._cnx, pg_schema)
   129         self._dbh = PGHelper(self._cnx, pg_schema)
   527         """
   523         """
   528         for rtype, data in self._data_relations.items():
   524         for rtype, data in self._data_relations.items():
   529             if not data:
   525             if not data:
   530                 # There is no data for this rtype in this flush round.
   526                 # There is no data for this rtype in this flush round.
   531                 continue
   527                 continue
   532             buf = pgstore._create_copyfrom_buffer(data, ('eid_from', 'eid_to'),
   528             buf = pgstore._create_copyfrom_buffer(data, ('eid_from', 'eid_to'))
   533                                                   replace_sep=self.replace_sep)
       
   534             if not buf:
   529             if not buf:
   535                 # The buffer is empty. This is probably due to an error in _create_copyfrom_buffer
   530                 # The buffer is empty. This is probably due to an error in _create_copyfrom_buffer
   536                 raise ValueError
   531                 raise ValueError
   537             cursor = self._cnx.cnxset.cu
   532             cursor = self._cnx.cnxset.cu
   538             # Push into the tmp table
   533             # Push into the tmp table
   564             _base_data = dict.fromkeys(columns)
   559             _base_data = dict.fromkeys(columns)
   565             for d in data:
   560             for d in data:
   566                 _d = _base_data.copy()
   561                 _d = _base_data.copy()
   567                 _d.update(d)
   562                 _d.update(d)
   568                 _data.append(_d)
   563                 _data.append(_d)
   569             buf = pgstore._create_copyfrom_buffer(_data, columns,
   564             buf = pgstore._create_copyfrom_buffer(_data, columns)
   570                                                   replace_sep=self.replace_sep)
       
   571             if not buf:
   565             if not buf:
   572                 # The buffer is empty. This is probably due to an error in _create_copyfrom_buffer
   566                 # The buffer is empty. This is probably due to an error in _create_copyfrom_buffer
   573                 raise ValueError('Error in buffer creation for etype %s' % etype)
   567                 raise ValueError('Error in buffer creation for etype %s' % etype)
   574             columns = ['cw_%s' % attr for attr in columns]
   568             columns = ['cw_%s' % attr for attr in columns]
   575             cursor = self._cnx.cnxset.cu
   569             cursor = self._cnx.cnxset.cu