equal
deleted
inserted
replaced
81 ... |
81 ... |
82 store.cleanup() |
82 store.cleanup() |
83 """ |
83 """ |
84 |
84 |
85 def __init__(self, cnx, autoflush_metadata=True, |
85 def __init__(self, cnx, autoflush_metadata=True, |
86 replace_sep='', commit_at_flush=True, |
86 commit_at_flush=True, |
87 iid_maxsize=1024, uri_param_name='rdf:about', |
87 iid_maxsize=1024, uri_param_name='rdf:about', |
88 eids_seq_range=10000, eids_seq_start=None, |
88 eids_seq_range=10000, eids_seq_start=None, |
89 on_commit_callback=None, on_rollback_callback=None, |
89 on_commit_callback=None, on_rollback_callback=None, |
90 slave_mode=False, |
90 slave_mode=False, |
91 source=None): |
91 source=None): |
93 |
93 |
94 - cnx: CubicWeb cnx |
94 - cnx: CubicWeb cnx |
95 - autoflush_metadata: Boolean. |
95 - autoflush_metadata: Boolean. |
96 Automatically flush the metadata after |
96 Automatically flush the metadata after |
97 each flush() |
97 each flush() |
98 - replace_sep: String. Replace separator used for |
|
99 (COPY FROM) buffer creation. |
|
100 - commit_at_flush: Boolean. Commit after each flush(). |
98 - commit_at_flush: Boolean. Commit after each flush(). |
101 - eids_seq_range: Int. Range of the eids_seq_range to be fetched each time |
99 - eids_seq_range: Int. Range of the eids_seq_range to be fetched each time |
102 by the store (default is 10000). |
100 by the store (default is 10000). |
103 If None, the sequence eids is attached to each entity tables |
101 If None, the sequence eids is attached to each entity tables |
104 (backward compatibility with the 0.2.0). |
102 (backward compatibility with the 0.2.0). |
111 super(MassiveObjectStore, self).__init__(cnx) |
109 super(MassiveObjectStore, self).__init__(cnx) |
112 self.logger = logging.getLogger('dataio.relationmixin') |
110 self.logger = logging.getLogger('dataio.relationmixin') |
113 self._cnx = cnx |
111 self._cnx = cnx |
114 self.sql = cnx.system_sql |
112 self.sql = cnx.system_sql |
115 self.iid_maxsize = iid_maxsize |
113 self.iid_maxsize = iid_maxsize |
116 self.replace_sep = replace_sep |
|
117 self.commit_at_flush = commit_at_flush |
114 self.commit_at_flush = commit_at_flush |
118 self._data_uri_relations = defaultdict(list) |
115 self._data_uri_relations = defaultdict(list) |
119 self._initialized = {'init_uri_eid': set(), |
116 self._initialized = {'init_uri_eid': set(), |
120 'uri_eid_inserted': set(), |
117 'uri_eid_inserted': set(), |
121 'uri_rtypes': set(), |
118 'uri_rtypes': set(), |
123 'rtypes': set(), |
120 'rtypes': set(), |
124 } |
121 } |
125 self.sql = self._cnx.system_sql |
122 self.sql = self._cnx.system_sql |
126 self.logger = logging.getLogger('dataio.massiveimport') |
123 self.logger = logging.getLogger('dataio.massiveimport') |
127 self.autoflush_metadata = autoflush_metadata |
124 self.autoflush_metadata = autoflush_metadata |
128 self.replace_sep = replace_sep |
|
129 self.slave_mode = slave_mode |
125 self.slave_mode = slave_mode |
130 self.size_constraints = get_size_constraints(cnx.vreg.schema) |
126 self.size_constraints = get_size_constraints(cnx.vreg.schema) |
131 self.default_values = get_default_values(cnx.vreg.schema) |
127 self.default_values = get_default_values(cnx.vreg.schema) |
132 pg_schema = cnx.repo.config.system_source_config.get('db-namespace', 'public') |
128 pg_schema = cnx.repo.config.system_source_config.get('db-namespace', 'public') |
133 self._dbh = PGHelper(self._cnx, pg_schema) |
129 self._dbh = PGHelper(self._cnx, pg_schema) |
527 """ |
523 """ |
528 for rtype, data in self._data_relations.items(): |
524 for rtype, data in self._data_relations.items(): |
529 if not data: |
525 if not data: |
530 # There is no data for these etype for this flush round. |
526 # There is no data for these etype for this flush round. |
531 continue |
527 continue |
532 buf = pgstore._create_copyfrom_buffer(data, ('eid_from', 'eid_to'), |
528 buf = pgstore._create_copyfrom_buffer(data, ('eid_from', 'eid_to')) |
533 replace_sep=self.replace_sep) |
|
534 if not buf: |
529 if not buf: |
535 # The buffer is empty. This is probably due to error in _create_copyfrom_buffer |
530 # The buffer is empty. This is probably due to error in _create_copyfrom_buffer |
536 raise ValueError |
531 raise ValueError |
537 cursor = self._cnx.cnxset.cu |
532 cursor = self._cnx.cnxset.cu |
538 # Push into the tmp table |
533 # Push into the tmp table |
564 _base_data = dict.fromkeys(columns) |
559 _base_data = dict.fromkeys(columns) |
565 for d in data: |
560 for d in data: |
566 _d = _base_data.copy() |
561 _d = _base_data.copy() |
567 _d.update(d) |
562 _d.update(d) |
568 _data.append(_d) |
563 _data.append(_d) |
569 buf = pgstore._create_copyfrom_buffer(_data, columns, |
564 buf = pgstore._create_copyfrom_buffer(_data, columns) |
570 replace_sep=self.replace_sep) |
|
571 if not buf: |
565 if not buf: |
572 # The buffer is empty. This is probably due to error in _create_copyfrom_buffer |
566 # The buffer is empty. This is probably due to error in _create_copyfrom_buffer |
573 raise ValueError('Error in buffer creation for etype %s' % etype) |
567 raise ValueError('Error in buffer creation for etype %s' % etype) |
574 columns = ['cw_%s' % attr for attr in columns] |
568 columns = ['cw_%s' % attr for attr in columns] |
575 cursor = self._cnx.cnxset.cu |
569 cursor = self._cnx.cnxset.cu |