87 self.logger = logging.getLogger('dataimport.massive_store') |
88 self.logger = logging.getLogger('dataimport.massive_store') |
88 self.sql = cnx.system_sql |
89 self.sql = cnx.system_sql |
89 self.schema = cnx.vreg.schema |
90 self.schema = cnx.vreg.schema |
90 self.default_values = get_default_values(self.schema) |
91 self.default_values = get_default_values(self.schema) |
91 self.get_next_eid = lambda g=self._get_eid_gen(): next(g) |
92 self.get_next_eid = lambda g=self._get_eid_gen(): next(g) |
|
93 self._source_dbhelper = cnx.repo.system_source.dbhelper |
92 self._dbh = PGHelper(cnx) |
94 self._dbh = PGHelper(cnx) |
93 |
95 |
94 self._data_entities = defaultdict(list) |
96 self._data_entities = defaultdict(list) |
95 self._data_relations = defaultdict(list) |
97 self._data_relations = defaultdict(list) |
96 self._initialized = set() |
98 self._initialized = {} |
97 self._constraints_dropped = self.slave_mode |
|
98 |
99 |
99 def _get_eid_gen(self): |
100 def _get_eid_gen(self): |
100 """ Function getting the next eid. This is done by preselecting |
101 """ Function getting the next eid. This is done by preselecting |
101 a given number of eids from the 'entities_id_seq', and then |
102 a given number of eids from the 'entities_id_seq', and then |
102 storing them""" |
103 storing them""" |
114 """ |
115 """ |
115 assert not self.slave_mode |
116 assert not self.slave_mode |
116 if self not in self._initialized: |
117 if self not in self._initialized: |
117 self.sql('CREATE TABLE cwmassive_initialized' |
118 self.sql('CREATE TABLE cwmassive_initialized' |
118 '(retype text, type varchar(128), uuid varchar(32))') |
119 '(retype text, type varchar(128), uuid varchar(32))') |
119 self._initialized.append(self) |
120 self._initialized[self] = None |
120 |
|
121 def master_init_etype(self, etype): |
|
122 """Initialize database for insertion of entities of the given etype. |
|
123 |
|
124 This is expected to be called once, usually by the master store in master/slaves |
|
125 configuration. |
|
126 """ |
|
127 self._drop_metadata_constraints_if_necessary() |
|
128 tablename = 'cw_%s' % etype.lower() |
|
129 self._dbh.drop_constraints(tablename) |
|
130 self._dbh.drop_indexes(tablename) |
|
131 self.sql('CREATE TABLE IF NOT EXISTS cwmassive_initialized' |
|
132 '(retype text, type varchar(128), uuid varchar(32))') |
|
133 self.sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'etype', %(uuid)s)", |
|
134 {'e': etype, 'uuid': self.uuid}) |
|
135 |
|
136 def master_insert_etype_metadata(self, etype): |
|
137 """Massive insertion of meta data for a given etype, based on SQL statements. |
|
138 |
|
139 In master/slaves configuration, you'll usually want to call it from the master once all |
|
140 slaves have finished (at least slaves won't call it automatically, so that's your |
|
141 responsibility). |
|
142 """ |
|
143 # insert standard metadata relations |
|
144 for rtype, eid in self.metagen.base_etype_rels(etype).items(): |
|
145 self._insert_meta_relation(etype, eid, '%s_relation' % rtype) |
|
146 # insert cw_source, is and is_instance_of relations (normally handled by the system source) |
|
147 self._insert_meta_relation(etype, self.metagen.source.eid, 'cw_source_relation') |
|
148 eschema = self.schema[etype] |
|
149 self._insert_meta_relation(etype, eschema.eid, 'is_relation') |
|
150 for parent_eschema in chain(eschema.ancestors(), [eschema]): |
|
151 self._insert_meta_relation(etype, parent_eschema.eid, 'is_instance_of_relation') |
|
152 # finally insert records into the entities table |
|
153 self.sql("INSERT INTO entities (eid, type, extid) " |
|
154 "SELECT cw_eid, '%s', extid FROM cw_%s " |
|
155 "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)" |
|
156 % (etype, etype.lower())) |
|
157 |
121 |
158 # SQL utilities ######################################################### |
122 # SQL utilities ######################################################### |
159 |
|
160 def _drop_metadata_constraints_if_necessary(self): |
|
161 """Drop constraints and indexes for the metadata tables if necessary.""" |
|
162 if not self._constraints_dropped: |
|
163 self._drop_metadata_constraints() |
|
164 self._constraints_dropped = True |
|
165 |
123 |
166 def _drop_metadata_constraints(self): |
124 def _drop_metadata_constraints(self): |
167 """Drop constraints and indexes for the metadata tables. |
125 """Drop constraints and indexes for the metadata tables. |
168 |
126 |
169 They will be recreated by the `finish` method. |
127 They will be recreated by the `finish` method. |
187 |
145 |
188 def prepare_insert_entity(self, etype, **data): |
146 def prepare_insert_entity(self, etype, **data): |
189 """Given an entity type, attributes and inlined relations, returns the inserted entity's |
147 """Given an entity type, attributes and inlined relations, returns the inserted entity's |
190 eid. |
148 eid. |
191 """ |
149 """ |
192 if not self.slave_mode and etype not in self._initialized: |
150 if etype not in self._initialized: |
193 self._initialized.add(etype) |
151 if not self.slave_mode: |
194 self.master_init_etype(etype) |
152 self.master_init() |
|
153 tablename = 'cw_%s' % etype.lower() |
|
154 tmp_tablename = '%s_%s' % (tablename, self.uuid) |
|
155 self.sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'etype', %(uuid)s)", |
|
156 {'e': etype, 'uuid': self.uuid}) |
|
157 attr_defs = eschema_sql_def(self._source_dbhelper, self.schema[etype]) |
|
158 self.sql('CREATE TABLE %s(%s);' % (tmp_tablename, |
|
159 ', '.join('cw_%s %s' % (column, sqltype) |
|
160 for column, sqltype in attr_defs))) |
|
161 self._initialized[etype] = [attr for attr, _ in attr_defs] |
|
162 |
195 if 'eid' not in data: |
163 if 'eid' not in data: |
196 # If eid is not given and the eids sequence is set, use the value from the sequence |
164 # If eid is not given and the eids sequence is set, use the value from the sequence |
197 eid = self.get_next_eid() |
165 eid = self.get_next_eid() |
198 data['eid'] = eid |
166 data['eid'] = eid |
199 self._data_entities[etype].append(data) |
167 self._data_entities[etype].append(data) |
207 """ |
175 """ |
208 if rtype not in self._initialized: |
176 if rtype not in self._initialized: |
209 if not self.slave_mode: |
177 if not self.slave_mode: |
210 self.master_init() |
178 self.master_init() |
211 assert not self._cnx.vreg.schema.rschema(rtype).inlined |
179 assert not self._cnx.vreg.schema.rschema(rtype).inlined |
212 self._initialized.add(rtype) |
180 self._initialized[rtype] = None |
213 tablename = '%s_relation' % rtype.lower() |
181 tablename = '%s_relation' % rtype.lower() |
214 tmp_tablename = '%s_%s' % (tablename, self.uuid) |
182 tmp_tablename = '%s_%s' % (tablename, self.uuid) |
215 self.sql("INSERT INTO cwmassive_initialized VALUES (%(r)s, 'rtype', %(uuid)s)", |
183 self.sql("INSERT INTO cwmassive_initialized VALUES (%(r)s, 'rtype', %(uuid)s)", |
216 {'r': rtype, 'uuid': self.uuid}) |
184 {'r': rtype, 'uuid': self.uuid}) |
217 self.sql('CREATE TABLE %s(eid_from integer, eid_to integer)' % tmp_tablename) |
185 self.sql('CREATE TABLE %s(eid_from integer, eid_to integer)' % tmp_tablename) |
232 assert not self.slave_mode, 'finish method should only be called by the master store' |
200 assert not self.slave_mode, 'finish method should only be called by the master store' |
233 self.logger.info("Start cleaning") |
201 self.logger.info("Start cleaning") |
234 # Get all the initialized etypes/rtypes |
202 # Get all the initialized etypes/rtypes |
235 if self._dbh.table_exists('cwmassive_initialized'): |
203 if self._dbh.table_exists('cwmassive_initialized'): |
236 cu = self.sql('SELECT retype, type, uuid FROM cwmassive_initialized') |
204 cu = self.sql('SELECT retype, type, uuid FROM cwmassive_initialized') |
|
205 entities = defaultdict(list) |
237 relations = defaultdict(list) |
206 relations = defaultdict(list) |
238 for retype, _type, uuid in cu.fetchall(): |
207 for retype, _type, uuid in cu.fetchall(): |
239 if _type == 'rtype': |
208 if _type == 'rtype': |
240 relations[retype].append(uuid) |
209 relations[retype].append(uuid) |
|
210 else: # _type = 'etype' |
|
211 entities[retype].append(uuid) |
|
212 # if there are entities to insert, drop constraints on metadata tables once and for all |
|
213 if entities: |
|
214 self._drop_metadata_constraints() |
|
215 # get back entity data from the temporary tables |
|
216 for etype, uuids in entities.items(): |
|
217 tablename = 'cw_%s' % etype.lower() |
|
218 attr_defs = eschema_sql_def(self._source_dbhelper, self.schema[etype]) |
|
219 columns = ','.join('cw_%s' % attr for attr, _ in attr_defs) |
|
220 self._dbh.drop_constraints(tablename) |
|
221 self._dbh.drop_indexes(tablename) |
|
222 for uuid in uuids: |
|
223 tmp_tablename = '%s_%s' % (tablename, uuid) |
|
224 self.sql('INSERT INTO %(table)s(%(columns)s) ' |
|
225 'SELECT %(columns)s FROM %(tmp_table)s' |
|
226 % {'table': tablename, 'tmp_table': tmp_tablename, |
|
227 'columns': columns}) |
|
228 self._insert_etype_metadata(etype, tmp_tablename) |
|
229 self._tmp_data_cleanup(tmp_tablename, etype, uuid) |
241 # get back relation data from the temporary tables |
230 # get back relation data from the temporary tables |
242 for rtype, uuids in relations.items(): |
231 for rtype, uuids in relations.items(): |
243 tablename = '%s_relation' % rtype.lower() |
232 tablename = '%s_relation' % rtype.lower() |
244 self._dbh.drop_constraints(tablename) |
233 self._dbh.drop_constraints(tablename) |
245 self._dbh.drop_indexes(tablename) |
234 self._dbh.drop_indexes(tablename) |
249 self.sql('INSERT INTO %(table)s(eid_from, eid_to) SELECT DISTINCT ' |
238 self.sql('INSERT INTO %(table)s(eid_from, eid_to) SELECT DISTINCT ' |
250 'T.eid_from, T.eid_to FROM %(tmp_table)s AS T ' |
239 'T.eid_from, T.eid_to FROM %(tmp_table)s AS T ' |
251 'WHERE NOT EXISTS (SELECT 1 FROM %(table)s AS TT WHERE ' |
240 'WHERE NOT EXISTS (SELECT 1 FROM %(table)s AS TT WHERE ' |
252 'TT.eid_from=T.eid_from AND TT.eid_to=T.eid_to);' |
241 'TT.eid_from=T.eid_from AND TT.eid_to=T.eid_to);' |
253 % {'table': tablename, 'tmp_table': tmp_tablename}) |
242 % {'table': tablename, 'tmp_table': tmp_tablename}) |
254 # Drop temporary relation table and record from cwmassive_initialized |
243 self._tmp_data_cleanup(tmp_tablename, rtype, uuid) |
255 self.sql('DROP TABLE %(tmp_table)s' % {'tmp_table': tmp_tablename}) |
|
256 self.sql('DELETE FROM cwmassive_initialized ' |
|
257 'WHERE retype = %(rtype)s AND uuid = %(uuid)s', |
|
258 {'rtype': retype, 'uuid': uuid}) |
|
259 # restore all deleted indexes and constraints |
244 # restore all deleted indexes and constraints |
260 self._dbh.restore_indexes_and_constraints() |
245 self._dbh.restore_indexes_and_constraints() |
261 # delete the meta data table |
246 # delete the meta data table |
262 self.sql('DROP TABLE IF EXISTS cwmassive_initialized') |
247 self.sql('DROP TABLE IF EXISTS cwmassive_initialized') |
263 self.commit() |
248 self.commit() |
|
249 |
|
250 def _insert_etype_metadata(self, etype, tmp_tablename): |
|
251 """Massive insertion of meta data for `etype`, with new entities in `tmp_tablename`. |
|
252 """ |
|
253 # insert standard metadata relations |
|
254 for rtype, eid in self.metagen.base_etype_rels(etype).items(): |
|
255 self._insert_meta_relation(tmp_tablename, rtype, eid) |
|
256 # insert cw_source, is and is_instance_of relations (normally handled by the system source) |
|
257 self._insert_meta_relation(tmp_tablename, 'cw_source', self.metagen.source.eid) |
|
258 eschema = self.schema[etype] |
|
259 self._insert_meta_relation(tmp_tablename, 'is', eschema.eid) |
|
260 for parent_eschema in chain(eschema.ancestors(), [eschema]): |
|
261 self._insert_meta_relation(tmp_tablename, 'is_instance_of', parent_eschema.eid) |
|
262 # finally insert records into the entities table |
|
263 self.sql("INSERT INTO entities(eid, type) " |
|
264 "SELECT cw_eid, '%s' FROM %s " |
|
265 "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)" |
|
266 % (etype, tmp_tablename)) |
|
267 |
|
268 def _insert_meta_relation(self, tmp_tablename, rtype, eid_to): |
|
269 self.sql("INSERT INTO %s_relation(eid_from, eid_to) SELECT cw_eid, %s FROM %s " |
|
270 "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)" |
|
271 % (rtype, eid_to, tmp_tablename)) |
|
272 |
|
273 def _tmp_data_cleanup(self, tmp_tablename, ertype, uuid): |
|
274 """Drop temporary entity or relation table and its record from cwmassive_initialized.""" |
|
275 self.sql('DROP TABLE %(tmp_table)s' % {'tmp_table': tmp_tablename}) |
|
276 self.sql('DELETE FROM cwmassive_initialized ' |
|
277 'WHERE retype = %(rtype)s AND uuid = %(uuid)s', |
|
278 {'rtype': ertype, 'uuid': uuid}) |
264 |
279 |
265 # FLUSH ################################################################# |
280 # FLUSH ################################################################# |
266 |
281 |
267 def on_commit(self): |
282 def on_commit(self): |
268 if self.on_commit_callback: |
283 if self.on_commit_callback: |
294 metagen = self.metagen |
309 metagen = self.metagen |
295 for etype, data in self._data_entities.items(): |
310 for etype, data in self._data_entities.items(): |
296 if not data: |
311 if not data: |
297 # There is no data for this etype in this flush round. |
312 # There is no data for this etype in this flush round. |
298 continue |
313 continue |
299 # XXX It may be interesting to directly infer the columns' names from the schema |
314 attrs = self._initialized[etype] |
300 # XXX For now, the _create_copyfrom_buffer does a "row[column]" |
315 _base_data = dict.fromkeys(attrs) |
301 # which can lead to a key error. |
|
302 # Thus we should create dictionary with all the keys. |
|
303 columns = set() |
|
304 for d in data: |
|
305 columns.update(d) |
|
306 _base_data = dict.fromkeys(columns) |
|
307 _base_data.update(self.default_values[etype]) |
316 _base_data.update(self.default_values[etype]) |
308 _base_data.update(metagen.base_etype_attrs(etype)) |
317 _base_data.update(metagen.base_etype_attrs(etype)) |
309 _data = [] |
318 _data = [] |
310 for d in data: |
319 for d in data: |
|
320 # do this first on `d`, because it won't fill keys associated to None as provided by |
|
321 # `_base_data` |
|
322 metagen.init_entity_attrs(etype, d['eid'], d) |
|
323 # XXX warn/raise if there is some key not in attrs? |
311 _d = _base_data.copy() |
324 _d = _base_data.copy() |
312 _d.update(d) |
325 _d.update(d) |
313 metagen.init_entity_attrs(etype, _d['eid'], _d) |
|
314 _data.append(_d) |
326 _data.append(_d) |
315 buf = pgstore._create_copyfrom_buffer(_data, columns) |
327 buf = pgstore._create_copyfrom_buffer(_data, attrs) |
316 columns = ['cw_%s' % attr for attr in columns] |
328 tablename = 'cw_%s' % etype.lower() |
|
329 tmp_tablename = '%s_%s' % (tablename, self.uuid) |
|
330 columns = ['cw_%s' % attr for attr in attrs] |
317 cursor = self._cnx.cnxset.cu |
331 cursor = self._cnx.cnxset.cu |
318 try: |
332 try: |
319 cursor.copy_from(buf, 'cw_%s' % etype.lower(), null='NULL', columns=columns) |
333 cursor.copy_from(buf, tmp_tablename, null='NULL', columns=columns) |
320 except Exception as exc: |
334 except Exception as exc: |
321 self.on_rollback(exc, etype, data) |
335 self.on_rollback(exc, etype, data) |
322 # Clear data cache |
336 # Clear data cache |
323 self._data_entities[etype] = [] |
337 self._data_entities[etype] = [] |
324 if not self.slave_mode: |
|
325 self.master_insert_etype_metadata(etype) |
|
326 |
|
327 def _insert_meta_relation(self, etype, eid_to, rtype): |
|
328 self.sql("INSERT INTO %s (eid_from, eid_to) SELECT cw_eid, %s FROM cw_%s " |
|
329 "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)" |
|
330 % (rtype, eid_to, etype.lower())) |
|
331 |
338 |
332 |
339 |
333 def get_default_values(schema): |
340 def get_default_values(schema): |
334 """analyzes yams ``schema`` and returns the list of default values. |
341 """analyzes yams ``schema`` and returns the list of default values. |
335 |
342 |