1 # coding: utf-8 |
|
2 # copyright 2015 LOGILAB S.A. (Paris, FRANCE), all rights reserved. |
|
3 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr |
|
4 # |
|
5 # This file is part of CubicWeb. |
|
6 # |
|
7 # CubicWeb is free software: you can redistribute it and/or modify it under the |
|
8 # terms of the GNU Lesser General Public License as published by the Free |
|
9 # Software Foundation, either version 2.1 of the License, or (at your option) |
|
10 # any later version. |
|
11 # |
|
12 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT ANY |
|
13 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR |
|
14 # A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more |
|
15 # details. |
|
16 # |
|
17 # You should have received a copy of the GNU Lesser General Public License along |
|
18 # with CubicWeb. If not, see <http://www.gnu.org/licenses/>. |
|
19 |
|
20 import logging |
|
21 from datetime import datetime |
|
22 from collections import defaultdict |
|
23 from io import StringIO |
|
24 |
|
25 from six.moves import range |
|
26 |
|
27 from yams.constraints import SizeConstraint |
|
28 |
|
29 from psycopg2 import ProgrammingError |
|
30 |
|
31 from cubicweb.server.schema2sql import rschema_has_table |
|
32 from cubicweb.schema import PURE_VIRTUAL_RTYPES |
|
33 from cubicweb.dataimport import stores, pgstore |
|
34 from cubicweb.utils import make_uid |
|
35 from cubicweb.server.sqlutils import SQL_PREFIX |
|
36 |
|
37 |
|
class MassiveObjectStore(stores.RQLObjectStore):
    """
    Store for massive import of data, with delayed insertion of meta data.

    WARNINGS:
    - This store may be only used with PostgreSQL for now, as it relies
      on the COPY FROM method, and on specific PostgreSQL tables to get all
      the indexes.
    - This store can only insert relations that are not inlined (i.e.,
      which do *not* have inlined=True in their definition in the schema).

    It should be used as follows:

       store = MassiveObjectStore(cnx)
       store.init_rtype_table('Person', 'lives_in', 'Location')
       ...

       store.prepare_insert_entity('Person', subj_iid_attribute=person_iid, ...)
       store.prepare_insert_entity('Location', obj_iid_attribute=location_iid, ...)
       ...

       # subj_iid_attribute and obj_iid_attribute are argument names
       # chosen by the user (e.g. "cwuri"). These names can be identical.
       # person_iid and location_iid are unique IDs and depend on the data
       # (e.g URI).
       store.flush()
       store.relate_by_iid(person_iid, 'lives_in', location_iid)
       # For example:
       store.prepare_insert_entity('Person',
                                   cwuri='http://dbpedia.org/toto',
                                   name='Toto')
       store.prepare_insert_entity('Location',
                                   uri='http://geonames.org/11111',
                                   name='Somewhere')
       store.flush()
       store.relate_by_iid('http://dbpedia.org/toto',
                           'lives_in',
                           'http://geonames.org/11111')
       # Finally
       store.convert_relations('Person', 'lives_in', 'Location',
                               'subj_iid_attribute', 'obj_iid_attribute')
       # For the previous example:
       store.convert_relations('Person', 'lives_in', 'Location', 'cwuri', 'uri')
       ...
       store.commit()
       store.finish()
    """
    # max size of the iid, used to create the iid_eid conversion table
    # (columns of the uri_eid_<etype> and <rtype>_relation_iid_tmp tables)
    iid_maxsize = 1024
|
87 |
|
    def __init__(self, cnx,
                 on_commit_callback=None, on_rollback_callback=None,
                 slave_mode=False,
                 source=None,
                 eids_seq_range=10000):
        """ Create a MassiveObject store, with the following attributes:

        - cnx: CubicWeb cnx
        - on_commit_callback: callable invoked by on_commit() before commit
        - on_rollback_callback: callable invoked with (exc, etype, data) when
          an entity flush fails; when None the exception is re-raised
        - slave_mode: when True, skip dropping constraints/indexes (another,
          "master" store is expected to manage them)
        - source: CubicWeb source object; defaults to the system source
        - eids_seq_range: size of eid range reserved by the store for each batch
        """
        super(MassiveObjectStore, self).__init__(cnx)
        self.logger = logging.getLogger('dataimport.massive_store')
        self._cnx = cnx
        self.sql = cnx.system_sql
        self._data_uri_relations = defaultdict(list)
        self.eids_seq_range = eids_seq_range

        # etypes for which we have a uri_eid_%(etype)s table
        self._init_uri_eid = set()
        # etypes for which we have a uri_eid_%(e)s_idx index
        self._uri_eid_inserted = set()
        # set of rtypes for which we have a %(rtype)s_relation_iid_tmp table
        self._uri_rtypes = set()
        # set of etypes whose tables are created
        self._entities = set()
        # set of rtypes for which we have a %(rtype)s_relation_tmp table
        self._rtypes = set()

        self.slave_mode = slave_mode
        self.default_values = get_default_values(cnx.vreg.schema)
        # deal with a possibly non-default postgres namespace, see #3216686
        pg_schema = cnx.repo.config.system_source_config.get('db-namespace') or 'public'
        self._dbh = PGHelper(self._cnx, pg_schema)
        self._data_entities = defaultdict(list)
        self._data_relations = defaultdict(list)
        # single timestamp used as default creation/modification date
        self._now = datetime.utcnow()
        self._default_cwuri = make_uid('_auto_generated')
        self._count_cwuri = 0
        self.on_commit_callback = on_commit_callback
        self.on_rollback_callback = on_rollback_callback
        # Do our meta tables already exist?
        self._init_massive_metatables()
        # bind the generator once so each call pulls the next preallocated eid
        self.get_next_eid = lambda g=self._get_eid_gen(): next(g)
        # recreate then when self.finish() is called

        if not self.slave_mode:
            # drop application constraints/indexes now; they are restored by
            # finish()
            self._drop_all_constraints()
            self._drop_metatables_constraints()
        if source is None:
            source = cnx.repo.system_source
        self.source = source
        # mapping of etype name -> eid of the CWEType entity
        self._etype_eid_idx = dict(cnx.execute('Any XN,X WHERE X is CWEType, X name XN'))
        # massive import bypasses security checks
        cnx.read_security = False
        cnx.write_security = False
141 |
|
142 ### INIT FUNCTIONS ######################################################## |
|
143 |
|
144 def _drop_all_constraints(self): |
|
145 schema = self._cnx.vreg.schema |
|
146 tables = ['cw_%s' % etype.type.lower() |
|
147 for etype in schema.entities() if not etype.final] |
|
148 for rschema in schema.relations(): |
|
149 if rschema.inlined: |
|
150 continue |
|
151 elif rschema_has_table(rschema, skip_relations=PURE_VIRTUAL_RTYPES): |
|
152 tables.append('%s_relation' % rschema.type.lower()) |
|
153 tables.append('entities') |
|
154 for tablename in tables: |
|
155 self._store_and_drop_constraints(tablename) |
|
156 |
|
157 def _store_and_drop_constraints(self, tablename): |
|
158 if not self._constraint_table_created: |
|
159 # Create a table to save the constraints |
|
160 # Allow reload even after crash |
|
161 sql = "CREATE TABLE cwmassive_constraints (origtable text, query text, type varchar(256))" |
|
162 self.sql(sql) |
|
163 self._constraint_table_created = True |
|
164 constraints = self._dbh.application_constraints(tablename) |
|
165 for name, query in constraints.items(): |
|
166 sql = 'INSERT INTO cwmassive_constraints VALUES (%(e)s, %(c)s, %(t)s)' |
|
167 self.sql(sql, {'e': tablename, 'c': query, 't': 'constraint'}) |
|
168 sql = 'ALTER TABLE %s DROP CONSTRAINT %s' % (tablename, name) |
|
169 self.sql(sql) |
|
170 |
|
171 def reapply_all_constraints(self): |
|
172 if not self._dbh.table_exists('cwmassive_constraints'): |
|
173 self.logger.info('The table cwmassive_constraints does not exist') |
|
174 return |
|
175 sql = 'SELECT query FROM cwmassive_constraints WHERE type = %(t)s' |
|
176 crs = self.sql(sql, {'t': 'constraint'}) |
|
177 for query, in crs.fetchall(): |
|
178 self.sql(query) |
|
179 self.sql('DELETE FROM cwmassive_constraints WHERE type = %(t)s ' |
|
180 'AND query = %(q)s', {'t': 'constraint', 'q': query}) |
|
181 |
|
    def init_rtype_table(self, etype_from, rtype, etype_to):
        """Build the temporary <rtype>_relation_iid_tmp table for a standard
        (non-inlined) rtype, and the uri_eid conversion tables for both etypes.

        :param etype_from: subject entity type (may be None to skip its table)
        :param rtype: relation type name
        :param etype_to: object entity type (may be None to skip its table)
        """
        # Create an uri_eid table for each etype for a better
        # control of which etype is concerned by a particular
        # possibly multivalued relation.
        for etype in (etype_from, etype_to):
            if etype and etype not in self._init_uri_eid:
                self._init_uri_eid_table(etype)
        if rtype not in self._uri_rtypes:
            # Create the temporary table
            if not self._cnx.repo.schema.rschema(rtype).inlined:
                try:
                    sql = 'CREATE TABLE %(r)s_relation_iid_tmp (uri_from character ' \
                          'varying(%(s)s), uri_to character varying(%(s)s))'
                    self.sql(sql % {'r': rtype, 's': self.iid_maxsize})
                except ProgrammingError:
                    # XXX Already exist (probably due to multiple import)
                    pass
            else:
                self.logger.warning("inlined relation %s: cannot insert it", rtype)
            # Add it to the initialized set
            self._uri_rtypes.add(rtype)
204 |
|
    def _init_uri_eid_table(self, etype):
        """Build the temporary uri_eid_<etype> table used for iid -> eid
        conversion; record the etype in ``self._init_uri_eid``."""
        try:
            sql = "CREATE TABLE uri_eid_%(e)s (uri character varying(%(size)s), eid integer)"
            self.sql(sql % {'e': etype.lower(), 'size': self.iid_maxsize,})
        except ProgrammingError:
            # XXX Already exist (probably due to multiple import)
            pass
        # Add it to the initialized set
        self._init_uri_eid.add(etype)
216 |
|
217 def _init_massive_metatables(self): |
|
218 # Check if our tables are not already created (i.e. a restart) |
|
219 self._initialized_table_created = self._dbh.table_exists('cwmassive_initialized') |
|
220 self._constraint_table_created = self._dbh.table_exists('cwmassive_constraints') |
|
221 self._metadata_table_created = self._dbh.table_exists('cwmassive_metadata') |
|
222 |
|
223 ### RELATE FUNCTION ####################################################### |
|
224 |
|
225 def relate_by_iid(self, iid_from, rtype, iid_to): |
|
226 """Add new relation based on the internal id (iid) |
|
227 of the entities (not the eid)""" |
|
228 # Push data |
|
229 if isinstance(iid_from, unicode): |
|
230 iid_from = iid_from.encode('utf-8') |
|
231 if isinstance(iid_to, unicode): |
|
232 iid_to = iid_to.encode('utf-8') |
|
233 self._data_uri_relations[rtype].append({'uri_from': iid_from, 'uri_to': iid_to}) |
|
234 |
|
235 ### FLUSH FUNCTIONS ####################################################### |
|
236 |
|
237 def flush_relations(self): |
|
238 """ Flush the relations data |
|
239 """ |
|
240 for rtype, data in self._data_uri_relations.items(): |
|
241 if not data: |
|
242 self.logger.info('No data for rtype %s', rtype) |
|
243 buf = StringIO('\n'.join(['%(uri_from)s\t%(uri_to)s' % d for d in data])) |
|
244 if not buf: |
|
245 self.logger.info('Empty Buffer for rtype %s', rtype) |
|
246 continue |
|
247 cursor = self._cnx.cnxset.cu |
|
248 if not self._cnx.repo.schema.rschema(rtype).inlined: |
|
249 cursor.copy_from(buf, '%s_relation_iid_tmp' % rtype.lower(), |
|
250 null='NULL', columns=('uri_from', 'uri_to')) |
|
251 else: |
|
252 self.logger.warning("inlined relation %s: cannot insert it", rtype) |
|
253 buf.close() |
|
254 # Clear data cache |
|
255 self._data_uri_relations[rtype] = [] |
|
256 |
|
257 def fill_uri_eid_table(self, etype, uri_label): |
|
258 """ Fill the uri_eid table |
|
259 """ |
|
260 self.logger.info('Fill uri_eid for etype %s', etype) |
|
261 sql = 'INSERT INTO uri_eid_%(e)s SELECT cw_%(l)s, cw_eid FROM cw_%(e)s' |
|
262 self.sql(sql % {'l': uri_label, 'e': etype.lower()}) |
|
263 # Add indexes |
|
264 self.sql('CREATE INDEX uri_eid_%(e)s_idx ON uri_eid_%(e)s' '(uri)' % {'e': etype.lower()}) |
|
265 # Set the etype as converted |
|
266 self._uri_eid_inserted.add(etype) |
|
267 |
|
268 def convert_relations(self, etype_from, rtype, etype_to, |
|
269 uri_label_from='cwuri', uri_label_to='cwuri'): |
|
270 """ Flush the converted relations |
|
271 """ |
|
272 # Always flush relations to be sure |
|
273 self.logger.info('Convert relations %s %s %s', etype_from, rtype, etype_to) |
|
274 self.flush_relations() |
|
275 if uri_label_from and etype_from not in self._uri_eid_inserted: |
|
276 self.fill_uri_eid_table(etype_from, uri_label_from) |
|
277 if uri_label_to and etype_to not in self._uri_eid_inserted: |
|
278 self.fill_uri_eid_table(etype_to, uri_label_to) |
|
279 if self._cnx.repo.schema.rschema(rtype).inlined: |
|
280 self.logger.warning("Can't insert inlined relation %s", rtype) |
|
281 return |
|
282 if uri_label_from and uri_label_to: |
|
283 sql = '''INSERT INTO %(r)s_relation (eid_from, eid_to) SELECT DISTINCT O1.eid, O2.eid |
|
284 FROM %(r)s_relation_iid_tmp AS T, uri_eid_%(ef)s as O1, uri_eid_%(et)s as O2 |
|
285 WHERE O1.uri=T.uri_from AND O2.uri=T.uri_to AND NOT EXISTS ( |
|
286 SELECT 1 FROM %(r)s_relation AS TT WHERE TT.eid_from=O1.eid AND TT.eid_to=O2.eid); |
|
287 ''' |
|
288 elif uri_label_to: |
|
289 sql = '''INSERT INTO %(r)s_relation (eid_from, eid_to) SELECT DISTINCT |
|
290 CAST(T.uri_from AS INTEGER), O1.eid |
|
291 FROM %(r)s_relation_iid_tmp AS T, uri_eid_%(et)s as O1 |
|
292 WHERE O1.uri=T.uri_to AND NOT EXISTS ( |
|
293 SELECT 1 FROM %(r)s_relation AS TT WHERE |
|
294 TT.eid_from=CAST(T.uri_from AS INTEGER) AND TT.eid_to=O1.eid); |
|
295 ''' |
|
296 elif uri_label_from: |
|
297 sql = '''INSERT INTO %(r)s_relation (eid_from, eid_to) SELECT DISTINCT O1.eid, T.uri_to |
|
298 O1.eid, CAST(T.uri_to AS INTEGER) |
|
299 FROM %(r)s_relation_iid_tmp AS T, uri_eid_%(ef)s as O1 |
|
300 WHERE O1.uri=T.uri_from AND NOT EXISTS ( |
|
301 SELECT 1 FROM %(r)s_relation AS TT WHERE |
|
302 TT.eid_from=O1.eid AND TT.eid_to=CAST(T.uri_to AS INTEGER)); |
|
303 ''' |
|
304 try: |
|
305 self.sql(sql % {'r': rtype.lower(), |
|
306 'et': etype_to.lower() if etype_to else u'', |
|
307 'ef': etype_from.lower() if etype_from else u''}) |
|
308 except Exception as ex: |
|
309 self.logger.error("Can't insert relation %s: %s", rtype, ex) |
|
310 |
|
311 ### SQL UTILITIES ######################################################### |
|
312 |
|
313 def drop_and_store_indexes(self, tablename): |
|
314 # Drop indexes and constraints |
|
315 if not self._constraint_table_created: |
|
316 # Create a table to save the constraints |
|
317 # Allow reload even after crash |
|
318 sql = "CREATE TABLE cwmassive_constraints (origtable text, query text, type varchar(256))" |
|
319 self.sql(sql) |
|
320 self._constraint_table_created = True |
|
321 self._drop_table_indexes(tablename) |
|
322 |
|
323 def _drop_table_indexes(self, tablename): |
|
324 """ Drop and store table constraints and indexes """ |
|
325 indexes = self._dbh.application_indexes(tablename) |
|
326 for name, query in indexes.items(): |
|
327 sql = 'INSERT INTO cwmassive_constraints VALUES (%(e)s, %(c)s, %(t)s)' |
|
328 self.sql(sql, {'e': tablename, 'c': query, 't': 'index'}) |
|
329 sql = 'DROP INDEX %s' % name |
|
330 self.sql(sql) |
|
331 |
|
332 def reapply_constraint_index(self, tablename): |
|
333 if not self._dbh.table_exists('cwmassive_constraints'): |
|
334 self.logger.info('The table cwmassive_constraints does not exist') |
|
335 return |
|
336 sql = 'SELECT query FROM cwmassive_constraints WHERE origtable = %(e)s' |
|
337 crs = self.sql(sql, {'e': tablename}) |
|
338 for query, in crs.fetchall(): |
|
339 self.sql(query) |
|
340 self.sql('DELETE FROM cwmassive_constraints WHERE origtable = %(e)s ' |
|
341 'AND query = %(q)s', {'e': tablename, 'q': query}) |
|
342 |
|
343 def _drop_metatables_constraints(self): |
|
344 """ Drop all the constraints for the meta data""" |
|
345 for tablename in ('created_by_relation', 'owned_by_relation', |
|
346 'is_instance_of_relation', 'is_relation', |
|
347 'entities'): |
|
348 self.drop_and_store_indexes(tablename) |
|
349 |
|
350 def _create_metatables_constraints(self): |
|
351 """ Create all the constraints for the meta data""" |
|
352 for tablename in ('entities', |
|
353 'created_by_relation', 'owned_by_relation', |
|
354 'is_instance_of_relation', 'is_relation'): |
|
355 # Indexes and constraints |
|
356 self.reapply_constraint_index(tablename) |
|
357 |
|
    def init_relation_table(self, rtype):
        """Prepare the <rtype>_relation table for massive insertion: create
        the <rtype>_relation_tmp staging table and remove all indexes for
        performance sake. No-op in slave mode or when already initialized."""
        # Create temporary table
        if not self.slave_mode and rtype not in self._rtypes:
            sql = "CREATE TABLE %s_relation_tmp (eid_from integer, eid_to integer)" % rtype.lower()
            self.sql(sql)
            # Drop indexes and constraints
            tablename = '%s_relation' % rtype.lower()
            self.drop_and_store_indexes(tablename)
            # Push the etype in the initialized table for easier restart
            self.init_create_initialized_table()
            sql = 'INSERT INTO cwmassive_initialized VALUES (%(e)s, %(t)s)'
            self.sql(sql, {'e': rtype, 't': 'rtype'})
            # Mark rtype as "initialized" for faster check
            self._rtypes.add(rtype)
373 |
|
374 def init_create_initialized_table(self): |
|
375 """ Create the cwmassive initialized table |
|
376 """ |
|
377 if not self._initialized_table_created: |
|
378 sql = "CREATE TABLE cwmassive_initialized (retype text, type varchar(128))" |
|
379 self.sql(sql) |
|
380 self._initialized_table_created = True |
|
381 |
|
382 def init_etype_table(self, etype): |
|
383 """ Add eid sequence to a particular etype table and |
|
384 remove all indexes for performance sake """ |
|
385 if etype not in self._entities: |
|
386 # Only for non-initialized etype and not slave mode store |
|
387 if not self.slave_mode: |
|
388 # Drop indexes and constraints |
|
389 tablename = 'cw_%s' % etype.lower() |
|
390 self.drop_and_store_indexes(tablename) |
|
391 # Push the etype in the initialized table for easier restart |
|
392 self.init_create_initialized_table() |
|
393 sql = 'INSERT INTO cwmassive_initialized VALUES (%(e)s, %(t)s)' |
|
394 self.sql(sql, {'e': etype, 't': 'etype'}) |
|
395 # Mark etype as "initialized" for faster check |
|
396 self._entities.add(etype) |
|
397 |
|
398 def restart_eid_sequence(self, start_eid): |
|
399 self._cnx.system_sql(self._cnx.repo.system_source.dbhelper.sql_restart_numrange( |
|
400 'entities_id_seq', initial_value=start_eid)) |
|
401 self._cnx.commit() |
|
402 |
|
403 ### ENTITIES CREATION ##################################################### |
|
404 |
|
405 def _get_eid_gen(self): |
|
406 """ Function getting the next eid. This is done by preselecting |
|
407 a given number of eids from the 'entities_id_seq', and then |
|
408 storing them""" |
|
409 while True: |
|
410 last_eid = self._cnx.repo.system_source.create_eid(self._cnx, self.eids_seq_range) |
|
411 for eid in range(last_eid - self.eids_seq_range + 1, last_eid + 1): |
|
412 yield eid |
|
413 |
|
414 def _apply_default_values(self, etype, kwargs): |
|
415 """Apply the default values for a given etype, attribute and value.""" |
|
416 default_values = self.default_values[etype] |
|
417 missing_keys = set(default_values) - set(kwargs) |
|
418 kwargs.update((key, default_values[key]) for key in missing_keys) |
|
419 |
|
420 # store api ################################################################ |
|
421 |
|
422 def prepare_insert_entity(self, etype, **kwargs): |
|
423 """Given an entity type, attributes and inlined relations, returns the inserted entity's |
|
424 eid. |
|
425 """ |
|
426 # Init the table if necessary |
|
427 self.init_etype_table(etype) |
|
428 # Add meta data if not given |
|
429 if 'modification_date' not in kwargs: |
|
430 kwargs['modification_date'] = self._now |
|
431 if 'creation_date' not in kwargs: |
|
432 kwargs['creation_date'] = self._now |
|
433 if 'cwuri' not in kwargs: |
|
434 kwargs['cwuri'] = self._default_cwuri + str(self._count_cwuri) |
|
435 self._count_cwuri += 1 |
|
436 if 'eid' not in kwargs: |
|
437 # If eid is not given and the eids sequence is set, |
|
438 # use the value from the sequence |
|
439 kwargs['eid'] = self.get_next_eid() |
|
440 self._apply_default_values(etype, kwargs) |
|
441 self._data_entities[etype].append(kwargs) |
|
442 return kwargs.get('eid') |
|
443 |
|
444 def prepare_insert_relation(self, eid_from, rtype, eid_to, **kwargs): |
|
445 """Insert into the database a relation ``rtype`` between entities with eids ``eid_from`` |
|
446 and ``eid_to``. |
|
447 """ |
|
448 # Init the table if necessary |
|
449 self.init_relation_table(rtype) |
|
450 self._data_relations[rtype].append({'eid_from': eid_from, 'eid_to': eid_to}) |
|
451 |
|
452 def flush(self): |
|
453 """Flush the data""" |
|
454 self.flush_entities() |
|
455 self.flush_internal_relations() |
|
456 self.flush_relations() |
|
457 |
|
458 def commit(self): |
|
459 """Commit the database transaction.""" |
|
460 self.on_commit() |
|
461 super(MassiveObjectStore, self).commit() |
|
462 |
|
463 def finish(self): |
|
464 """Remove temporary tables and columns.""" |
|
465 self.logger.info("Start cleaning") |
|
466 if self.slave_mode: |
|
467 raise RuntimeError('Store cleanup is not allowed in slave mode') |
|
468 self.logger.info("Start cleaning") |
|
469 # Cleanup relations tables |
|
470 for etype in self._init_uri_eid: |
|
471 self.sql('DROP TABLE uri_eid_%s' % etype.lower()) |
|
472 # Remove relations tables |
|
473 for rtype in self._uri_rtypes: |
|
474 if not self._cnx.repo.schema.rschema(rtype).inlined: |
|
475 self.sql('DROP TABLE %(r)s_relation_iid_tmp' % {'r': rtype}) |
|
476 else: |
|
477 self.logger.warning("inlined relation %s: no cleanup to be done for it" % rtype) |
|
478 # Create meta constraints (entities, is_instance_of, ...) |
|
479 self._create_metatables_constraints() |
|
480 # Get all the initialized etypes/rtypes |
|
481 if self._dbh.table_exists('cwmassive_initialized'): |
|
482 crs = self.sql('SELECT retype, type FROM cwmassive_initialized') |
|
483 for retype, _type in crs.fetchall(): |
|
484 self.logger.info('Cleanup for %s' % retype) |
|
485 if _type == 'etype': |
|
486 # Cleanup entities tables - Recreate indexes |
|
487 self._cleanup_entities(retype) |
|
488 elif _type == 'rtype': |
|
489 # Cleanup relations tables |
|
490 self._cleanup_relations(retype) |
|
491 self.sql('DELETE FROM cwmassive_initialized WHERE retype = %(e)s', |
|
492 {'e': retype}) |
|
493 self.reapply_all_constraints() |
|
494 # Delete the meta data table |
|
495 for table_name in ('cwmassive_initialized', 'cwmassive_constraints', 'cwmassive_metadata'): |
|
496 if self._dbh.table_exists(table_name): |
|
497 self.sql('DROP TABLE %s' % table_name) |
|
498 self.commit() |
|
499 |
|
500 ### FLUSH ################################################################# |
|
501 |
|
502 def on_commit(self): |
|
503 if self.on_commit_callback: |
|
504 self.on_commit_callback() |
|
505 |
|
506 def on_rollback(self, exc, etype, data): |
|
507 if self.on_rollback_callback: |
|
508 self.on_rollback_callback(exc, etype, data) |
|
509 self._cnx.rollback() |
|
510 else: |
|
511 raise exc |
|
512 |
|
    def flush_internal_relations(self):
        """Flush the pending (eid_from, eid_to) relation rows into the
        <rtype>_relation_tmp staging tables using COPY FROM."""
        for rtype, data in self._data_relations.items():
            if not data:
                # There is no data for this rtype for this flush round.
                continue
            buf = pgstore._create_copyfrom_buffer(data, ('eid_from', 'eid_to'))
            if not buf:
                # The buffer is empty. This is probably due to error in _create_copyfrom_buffer
                raise ValueError
            cursor = self._cnx.cnxset.cu
            # Push into the tmp table
            cursor.copy_from(buf, '%s_relation_tmp' % rtype.lower(),
                             null='NULL', columns=('eid_from', 'eid_to'))
            # Clear data cache
            self._data_relations[rtype] = []
530 |
|
531 def flush_entities(self): |
|
532 """ Flush the entities data |
|
533 """ |
|
534 for etype, data in self._data_entities.items(): |
|
535 if not data: |
|
536 # There is no data for these etype for this flush round. |
|
537 continue |
|
538 # XXX It may be interresting to directly infer the columns' |
|
539 # names from the schema instead of using .keys() |
|
540 columns = data[0].keys() |
|
541 # XXX For now, the _create_copyfrom_buffer does a "row[column]" |
|
542 # which can lead to a key error. |
|
543 # Thus we should create dictionary with all the keys. |
|
544 columns = set() |
|
545 for d in data: |
|
546 columns.update(d.keys()) |
|
547 _data = [] |
|
548 _base_data = dict.fromkeys(columns) |
|
549 for d in data: |
|
550 _d = _base_data.copy() |
|
551 _d.update(d) |
|
552 _data.append(_d) |
|
553 buf = pgstore._create_copyfrom_buffer(_data, columns) |
|
554 if not buf: |
|
555 # The buffer is empty. This is probably due to error in _create_copyfrom_buffer |
|
556 raise ValueError('Error in buffer creation for etype %s' % etype) |
|
557 columns = ['cw_%s' % attr for attr in columns] |
|
558 cursor = self._cnx.cnxset.cu |
|
559 try: |
|
560 cursor.copy_from(buf, 'cw_%s' % etype.lower(), null='NULL', columns=columns) |
|
561 except Exception as exc: |
|
562 self.on_rollback(exc, etype, data) |
|
563 # Clear data cache |
|
564 self._data_entities[etype] = [] |
|
565 if not self.slave_mode: |
|
566 self.flush_meta_data() |
|
567 |
|
    def flush_meta_data(self):
        """Flush the meta data (entities table, is_instance table, ...).

        The cwmassive_metadata table records which etypes have already been
        flushed, making this restart-safe. Not allowed in slave mode.
        """
        if self.slave_mode:
            raise RuntimeError('Flushing meta data is not allow in slave mode')
        if not self._dbh.table_exists('cwmassive_initialized'):
            self.logger.info('No information available for initialized etypes/rtypes')
            return
        if not self._metadata_table_created:
            # Keep the correctly flushed meta data in database
            sql = "CREATE TABLE cwmassive_metadata (etype text)"
            self.sql(sql)
            self._metadata_table_created = True
        crs = self.sql('SELECT etype FROM cwmassive_metadata')
        already_flushed = set(e for e, in crs.fetchall())
        crs = self.sql('SELECT retype FROM cwmassive_initialized WHERE type = %(t)s',
                       {'t': 'etype'})
        all_etypes = set(e for e, in crs.fetchall())
        for etype in all_etypes:
            if etype not in already_flushed:
                # Deals with meta data
                self.logger.info('Flushing meta data for %s' % etype)
                self.insert_massive_meta_data(etype)
                # remember the etype so a restart does not flush it again
                sql = 'INSERT INTO cwmassive_metadata VALUES (%(e)s)'
                self.sql(sql, {'e': etype})
593 |
|
594 def _cleanup_entities(self, etype): |
|
595 """ Cleanup etype table """ |
|
596 # Create indexes and constraints |
|
597 tablename = SQL_PREFIX + etype.lower() |
|
598 self.reapply_constraint_index(tablename) |
|
599 |
|
    def _cleanup_relations(self, rtype):
        """Move rows from <rtype>_relation_tmp into <rtype>_relation (skipping
        duplicates), drop the staging table and restore indexes/constraints."""
        # Push into relation table while removing duplicate
        # NOTE(review): %(r)s is not lowercased here, unlike in the DROP and
        # tablename statements below; harmless for unquoted PostgreSQL
        # identifiers (folded to lowercase), but worth harmonizing — confirm.
        sql = '''INSERT INTO %(r)s_relation (eid_from, eid_to) SELECT DISTINCT
                 T.eid_from, T.eid_to FROM %(r)s_relation_tmp AS T
                 WHERE NOT EXISTS (SELECT 1 FROM %(r)s_relation AS TT WHERE
                 TT.eid_from=T.eid_from AND TT.eid_to=T.eid_to);''' % {'r': rtype}
        self.sql(sql)
        # Drop temporary relation table
        sql = ('DROP TABLE %(r)s_relation_tmp' % {'r': rtype.lower()})
        self.sql(sql)
        # Create indexes and constraints
        tablename = '%s_relation' % rtype.lower()
        self.reapply_constraint_index(tablename)
614 |
|
    def insert_massive_meta_data(self, etype):
        """Massive insertion of meta data for a given etype, based on SQL
        statements: created_by/owned_by/cw_source/is/is_instance_of relations
        and the entities table rows."""
        # Push the meta data relations for every entity of this etype not yet
        # registered in the entities table.
        self.metagen_push_relation(etype, self._cnx.user.eid, 'created_by_relation')
        self.metagen_push_relation(etype, self._cnx.user.eid, 'owned_by_relation')
        self.metagen_push_relation(etype, self.source.eid, 'cw_source_relation')
        self.metagen_push_relation(etype, self._etype_eid_idx[etype], 'is_relation')
        self.metagen_push_relation(etype, self._etype_eid_idx[etype], 'is_instance_of_relation')
        # Finally register the entities themselves; must come last, as the
        # metagen relations above use the entities table to filter out
        # already-registered eids.
        sql = ("INSERT INTO entities (eid, type, asource, extid) "
               "SELECT cw_eid, '%s', 'system', NULL FROM cw_%s "
               "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
               % (etype, etype.lower()))
        self.sql(sql)
631 |
|
632 def metagen_push_relation(self, etype, eid_to, rtype): |
|
633 sql = ("INSERT INTO %s (eid_from, eid_to) SELECT cw_eid, %s FROM cw_%s " |
|
634 "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)" |
|
635 % (rtype, eid_to, etype.lower())) |
|
636 self.sql(sql) |
|
637 |
|
638 |
|
639 ### CONSTRAINTS MANAGEMENT FUNCTIONS ########################################## |
|
640 |
|
def get_size_constraints(schema):
    """analyzes yams ``schema`` and returns the list of size constraints.

    The returned value is a dictionary mapping entity types to a
    sub-dictionnaries mapping attribute names -> max size.
    """
    size_constraints = {}
    # iterates on all entity types
    for eschema in schema.entities():
        attr_to_size = {}
        size_constraints[eschema.type] = attr_to_size
        # for each entity type, iterates on attribute definitions
        for rschema, aschema in eschema.attribute_definitions():
            rdef = rschema.rdef(eschema, aschema)
            # keep the max of the last SizeConstraint found, None when absent
            maxsize = None
            for constraint in rdef.constraints:
                if isinstance(constraint, SizeConstraint):
                    maxsize = constraint.max
            attr_to_size[rschema.type] = maxsize
    return size_constraints
662 |
|
def get_default_values(schema):
    """analyzes yams ``schema`` and returns the list of default values.

    The returned value is a dictionary mapping entity types to
    sub-dictionnaries mapping attribute names -> default values.
    """
    default_values = {}
    # iterates on all entity types
    for eschema in schema.entities():
        # for each entity type, iterates on attribute definitions
        default_values[eschema.type] = eschema_defaults = {}
        for rschema, _ in eschema.attribute_definitions():
            # hoisted: the original called eschema.default() twice per
            # attribute (once to test, once to store)
            default = eschema.default(rschema.type)
            if default is not None:
                eschema_defaults[rschema.type] = default
    return default_values
680 |
|
681 |
|
class PGHelper(object):
    """Helper to introspect and serialize PostgreSQL indexes and constraints
    of application tables, through the pg_catalog system tables."""

    def __init__(self, cnx, pg_schema='public'):
        # cnx: CubicWeb connection providing system_sql()
        self.cnx = cnx
        # Deals with pg schema, see #3216686
        self.pg_schema = pg_schema

    def application_indexes_constraints(self, tablename):
        """ Get all the indexes/constraints for a given tablename """
        indexes = self.application_indexes(tablename)
        constraints = self.application_constraints(tablename)
        _indexes = {}
        for name, query in indexes.items():
            # Remove pkey indexes (automatically created by constraints)
            # Specific cases of primary key, see #3224079
            if name not in constraints:
                _indexes[name] = query
        return _indexes, constraints

    def table_exists(self, table_name):
        """Return True when `table_name` exists in the configured schema."""
        sql = "SELECT * from information_schema.tables WHERE table_name=%(t)s AND table_schema=%(s)s"
        crs = self.cnx.system_sql(sql, {'t': table_name, 's': self.pg_schema})
        res = crs.fetchall()
        if res:
            return True
        return False

    # def check_if_primary_key_exists_for_table(self, table_name):
    #     sql = ("SELECT constraint_name FROM information_schema.table_constraints "
    #            "WHERE constraint_type = 'PRIMARY KEY' AND table_name=%(t)s AND table_schema=%(s)s")
    #     crs = self.cnx.system_sql(sql, {'t': table_name, 's': self.pg_schema})
    #     res = crs.fetchall()
    #     if res:
    #         return True
    #     return False

    def index_query(self, name):
        """Get the request to be used to recreate the index"""
        return self.cnx.system_sql("SELECT pg_get_indexdef(c.oid) "
                                   "from pg_catalog.pg_class c "
                                   "LEFT JOIN pg_catalog.pg_namespace n "
                                   "ON n.oid = c.relnamespace "
                                   "WHERE c.relname = %(r)s AND n.nspname=%(n)s",
                                   {'r': name, 'n': self.pg_schema}).fetchone()[0]

    def constraint_query(self, name):
        """Get the request to be used to recreate the constraint"""
        return self.cnx.system_sql("SELECT pg_get_constraintdef(c.oid) "
                                   "from pg_catalog.pg_constraint c "
                                   "LEFT JOIN pg_catalog.pg_namespace n "
                                   "ON n.oid = c.connamespace "
                                   "WHERE c.conname = %(r)s AND n.nspname=%(n)s",
                                   {'r': name, 'n': self.pg_schema}).fetchone()[0]

    def index_list(self, tablename):
        """Return the names of the non-primary-key indexes of `tablename`."""
        # This SQL query (cf http://www.postgresql.org/message-id/432F450F.4080700@squiz.net)
        # aims at getting all the indexes for each table.
        sql = '''SELECT c.relname as "Name"
        FROM pg_catalog.pg_class c
        JOIN pg_catalog.pg_index i ON i.indexrelid = c.oid
        JOIN pg_catalog.pg_class c2 ON i.indrelid = c2.oid
        LEFT JOIN pg_catalog.pg_user u ON u.usesysid = c.relowner
        LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
        WHERE c.relkind IN ('i','')
        AND c2.relname = '%s'
        AND i.indisprimary = FALSE
        AND n.nspname NOT IN ('pg_catalog', 'pg_toast')
        AND pg_catalog.pg_table_is_visible(c.oid);''' % tablename
        return self.cnx.system_sql(sql).fetchall()

    def application_indexes(self, tablename):
        """ Iterate over all the indexes """
        # maps index name -> SQL statement recreating it
        indexes_list = self.index_list(tablename)
        indexes = {}
        for name, in indexes_list:
            indexes[name] = self.index_query(name)
        return indexes

    def constraint_list(self, tablename):
        """Return the names of the constraints of `tablename`."""
        sql = '''SELECT i.conname as "Name"
        FROM pg_catalog.pg_class c
        JOIN pg_catalog.pg_constraint i ON i.conrelid = c.oid
        JOIN pg_catalog.pg_class c2 ON i.conrelid=c2.oid
        LEFT JOIN pg_catalog.pg_user u ON u.usesysid = c.relowner
        LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
        WHERE
          c2.relname = '%s'
          AND n.nspname NOT IN ('pg_catalog', 'pg_toast')
          AND pg_catalog.pg_table_is_visible(c.oid)
        ''' % tablename
        return self.cnx.system_sql(sql).fetchall()

    def application_constraints(self, tablename):
        """ Iterate over all the constraints """
        # maps constraint name -> full ALTER TABLE statement recreating it
        constraint_list = self.constraint_list(tablename)
        constraints = {}
        for name, in constraint_list:
            query = self.constraint_query(name)
            constraints[name] = 'ALTER TABLE %s ADD CONSTRAINT %s %s' % (tablename, name, query)
        return constraints