|
1 # coding: utf-8 |
|
2 # copyright 2015 LOGILAB S.A. (Paris, FRANCE), all rights reserved. |
|
3 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr |
|
4 # |
|
5 # This file is part of CubicWeb. |
|
6 # |
|
7 # CubicWeb is free software: you can redistribute it and/or modify it under the |
|
8 # terms of the GNU Lesser General Public License as published by the Free |
|
9 # Software Foundation, either version 2.1 of the License, or (at your option) |
|
10 # any later version. |
|
11 # |
|
12 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT ANY |
|
13 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR |
|
14 # A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more |
|
15 # details. |
|
16 # |
|
17 # You should have received a copy of the GNU Lesser General Public License along |
|
18 # with CubicWeb. If not, see <http://www.gnu.org/licenses/>. |
|
19 |
|
20 import logging |
|
21 from datetime import datetime |
|
22 from collections import defaultdict |
|
23 from StringIO import StringIO |
|
24 |
|
25 from yams.constraints import SizeConstraint |
|
26 |
|
27 from psycopg2 import ProgrammingError |
|
28 |
|
29 from cubicweb.dataimport import stores, pgstore |
|
30 from cubicweb.utils import make_uid |
|
31 from cubicweb.server.sqlutils import SQL_PREFIX |
|
32 |
|
33 |
|
34 class MassiveObjectStore(stores.RQLObjectStore): |
|
35 """ |
|
36 Store for massive import of data, with delayed insertion of meta data. |
|
37 |
|
38 WARNINGS: |
|
39 - This store may only be used with PostgreSQL for now, as it relies |
|
40 on the COPY FROM method, and on specific PostgreSQL tables to get all |
|
41 the indexes. |
|
42 - This store can only insert relations that are not inlined (i.e., |
|
43 which do *not* have inlined=True in their definition in the schema). |
|
44 |
|
45 |
|
46 It should be used as follows: |
|
47 |
|
48 store = MassiveObjectStore(cnx) |
|
49 store.init_rtype_table('Person', 'lives_in', 'Location') |
|
50 ... |
|
51 |
|
52 store.create_entity('Person', subj_iid_attribute=person_iid, ...) |
|
53 store.create_entity('Location', obj_iid_attribute=location_iid, ...) |
|
54 ... |
|
55 |
|
56 # subj_iid_attribute and obj_iid_attribute are argument names |
|
57 # chosen by the user (e.g. "cwuri"). These names can be identical. |
|
58 # person_iid and location_iid are unique IDs and depend on the data |
|
59 # (e.g URI). |
|
60 store.flush() |
|
61 store.relate_by_iid(person_iid, 'lives_in', location_iid) |
|
62 # For example: |
|
63 store.create_entity('Person', |
|
64 cwuri='http://dbpedia.org/toto', |
|
65 name='Toto') |
|
66 store.create_entity('Location', |
|
67 uri='http://geonames.org/11111', |
|
68 name='Somewhere') |
|
69 store.flush() |
|
70 store.relate_by_iid('http://dbpedia.org/toto', |
|
71 'lives_in', |
|
72 'http://geonames.org/11111') |
|
73 # Finally |
|
74 store.flush_meta_data() |
|
75 store.convert_relations('Person', 'lives_in', 'Location', |
|
76 'subj_iid_attribute', 'obj_iid_attribute') |
|
77 # For the previous example: |
|
78 store.convert_relations('Person', 'lives_in', 'Location', 'cwuri', 'uri') |
|
79 ... |
|
80 store.cleanup() |
|
81 """ |
|
82 |
|
def __init__(self, cnx, autoflush_metadata=True,
             replace_sep='', commit_at_flush=True,
             drop_index=True,
             pg_schema='public',
             iid_maxsize=1024, uri_param_name='rdf:about',
             eids_seq_range=10000, eids_seq_start=None,
             on_commit_callback=None, on_rollback_callback=None,
             slave_mode=False, build_entities=False,
             source=None):
    """Create a MassiveObjectStore, with the following attributes:

    - cnx: CubicWeb cnx
    - autoflush_metadata: Boolean.
                          Automatically flush the metadata after
                          each flush()
    - replace_sep: String. Replace separator used for
                   (COPY FROM) buffer creation.
    - commit_at_flush: Boolean. Commit after each flush().
    - drop_index: Boolean. Drop SQL index before COPY FROM
    - pg_schema: String. PostgreSQL schema handed to the PGHelper
                 (a false value falls back to 'public').
    - iid_maxsize: Int. Max size of the iid, used to create the
                   iid_eid conversion table.
    - uri_param_name: String. If given, will use this parameter to get cw_uri
                      for entities.
    - eids_seq_range: Int. Range of the eids_seq_range to be fetched each time
                      by the store (default is 10000).
                      If None, the sequence eids is attached to each entity tables
                      (backward compatibility with the 0.2.0).
    - eids_seq_start: Int. Set the eids sequence value (if None, nothing is done).
    - on_commit_callback: callable or None. Called without argument by
                          on_commit().
    - on_rollback_callback: callable or None. Called by on_rollback() as
                            callback(exc, etype, data).
    - slave_mode: Boolean. When True, the constraints of the meta data
                  tables are not dropped by this constructor.
    - build_entities: Boolean. Stored as ``_build_entities``.
                      NOTE(review): nothing visible in this file reads it;
                      create_entity always returns an entity object.
    - source: data source recorded as ``self.source``; defaults to
              cnx.repo.system_source.

    Side effects visible here: may restart 'entities_id_seq' and commit,
    may drop the meta data tables constraints, and disables read/write
    security on ``cnx``.
    """
    super(MassiveObjectStore, self).__init__(cnx)
    self._cnx = cnx
    self.sql = cnx.system_sql
    # The original assigned the logger twice; only this name survived.
    self.logger = logging.getLogger('dataio.massiveimport')
    # Plain configuration
    self.iid_maxsize = iid_maxsize
    self.replace_sep = replace_sep
    self.commit_at_flush = commit_at_flush
    self.autoflush_metadata = autoflush_metadata
    self.drop_index = drop_index
    self.slave_mode = slave_mode
    self.uri_param_name = uri_param_name
    self.on_commit_callback = on_commit_callback
    self.on_rollback_callback = on_rollback_callback
    self._build_entities = build_entities
    # Pending data, flushed by the flush_* methods
    self._data_uri_relations = defaultdict(list)
    self._data_entities = defaultdict(list)
    self._data_relations = defaultdict(list)
    # Internal markers of initialization
    self._initialized = {'init_uri_eid': set(),
                         'uri_eid_inserted': set(),
                         'uri_rtypes': set(),
                         'entities': set(),
                         'rtypes': set(),
                         }
    self.size_constraints = get_size_constraints(cnx.vreg.schema)
    self.default_values = get_default_values(cnx.vreg.schema)
    self._dbh = PGHelper(self._cnx, pg_schema or 'public')
    self._now = datetime.now()
    self._default_cwuri = make_uid('_auto_generated')
    self._count_cwuri = 0
    # Initialize the meta tables of dataio for warm restart
    self._init_dataio_metatables()
    self._eids_seq_range = eids_seq_range
    self._eids_seq_start = eids_seq_start
    if self._eids_seq_start is not None:
        self._cnx.system_sql(self._cnx.repo.system_source.dbhelper.sql_restart_numrange(
            'entities_id_seq', initial_value=self._eids_seq_start + 1))
        cnx.commit()
    # next() works on Python 2.6+ and 3, unlike the generator's .next method
    eid_gen = self._get_eid_gen()
    self.get_next_eid = lambda: next(eid_gen)
    # Constraints dropped here are recreated when self.cleanup() is called
    if not self.slave_mode and self.drop_index:
        self._drop_metatables_constraints()
    if source is None:
        source = cnx.repo.system_source
    self.source = source
    self._etype_eid_idx = dict(cnx.execute('Any XN,X WHERE X is CWEType, X name XN'))
    cnx.read_security = False
    cnx.write_security = False
|
166 |
|
167 ### INIT FUNCTIONS ######################################################## |
|
168 |
|
def init_rtype_table(self, etype_from, rtype, etype_to):
    """Prepare the temporary tables needed to relate entities by iid.

    One uri/eid table is created per entity type (skipping false values
    and types already handled), then the ``<rtype>_relation_iid_tmp``
    table is created unless the relation is inlined, in which case a
    warning is logged instead. ``rtype`` is then marked as initialized.
    """
    known_etypes = self._initialized['init_uri_eid']
    for etype in (etype_from, etype_to):
        if not etype or etype in known_etypes:
            continue
        self._init_uri_eid_table(etype)
    if rtype in self._initialized['uri_rtypes']:
        return
    if self._cnx.repo.schema.rschema(rtype).inlined:
        self.logger.warning("inlined relation %s: cannot insert it", rtype)
    else:
        try:
            template = ('CREATE TABLE %(r)s_relation_iid_tmp (uri_from character '
                        'varying(%(s)s), uri_to character varying(%(s)s))')
            self.sql(template % {'r': rtype, 's': self.iid_maxsize})
        except ProgrammingError:
            # XXX Already exist (probably due to multiple import)
            pass
    # Remember that this rtype has been handled
    self._initialized['uri_rtypes'].add(rtype)
|
191 |
|
def _init_uri_eid_table(self, etype):
    """Create the ``uri_eid_<etype>`` table used for id/eid conversion.

    A ProgrammingError raised by the creation is ignored (the table is
    presumed to exist already); ``etype`` is marked as initialized either way.
    """
    params = {'e': etype.lower(), 'size': self.iid_maxsize}
    try:
        self.sql("CREATE TABLE uri_eid_%(e)s (uri character varying(%(size)s), eid integer)"
                 % params)
    except ProgrammingError:
        # XXX Already exist (probably due to multiple import)
        pass
    # Add it to the initialized set
    self._initialized['init_uri_eid'].add(etype)
|
203 |
|
def _init_dataio_metatables(self):
    """Record which dataio bookkeeping tables already exist.

    An existing table means a previous import is being restarted; the
    three flags are later used to decide whether the tables must be created.
    """
    exists = self._dbh.table_exists
    self._initialized_table_created = exists('dataio_initialized')
    self._constraint_table_created = exists('dataio_constraints')
    self._metadata_table_created = exists('dataio_metadata')
|
211 |
|
212 ### RELATE FUNCTION ####################################################### |
|
213 |
|
def relate_by_iid(self, iid_from, rtype, iid_to):
    """Queue a relation identified by internal ids (iid), not by eids.

    Unicode iids are encoded to UTF-8 byte strings; the pair is buffered
    under ``rtype`` until flush_relations() is called.
    """
    def as_utf8(iid):
        # Only unicode values are converted, anything else is kept as is
        return iid.encode('utf-8') if isinstance(iid, unicode) else iid

    uri_from, uri_to = as_utf8(iid_from), as_utf8(iid_to)
    self._data_uri_relations[rtype].append({'uri_from': uri_from, 'uri_to': uri_to})
|
223 |
|
224 ### FLUSH FUNCTIONS ####################################################### |
|
225 |
|
def flush_relations(self):
    """Flush the relations queued by relate_by_iid().

    For each relation type with pending data, rows are pushed with
    COPY FROM into ``<rtype>_relation_iid_tmp`` (inlined relations are
    only reported with a warning), then the pending list is emptied.
    Relation types without data are skipped. A commit is issued at the
    end when ``commit_at_flush`` is set.
    """
    for rtype, data in list(self._data_uri_relations.items()):
        if not data:
            # Nothing to push: do not issue an empty COPY
            self.logger.info('No data for rtype %s', rtype)
            continue
        if self._cnx.repo.schema.rschema(rtype).inlined:
            self.logger.warning("inlined relation %s: cannot insert it", rtype)
        else:
            # NOTE(review): values are written verbatim, tabs/newlines/
            # backslashes inside an iid are not escaped for COPY.
            buf = StringIO('\n'.join('%(uri_from)s\t%(uri_to)s' % d for d in data))
            try:
                cursor = self._cnx.cnxset.cu
                cursor.copy_from(buf, '%s_relation_iid_tmp' % rtype.lower(),
                                 null='NULL', columns=('uri_from', 'uri_to'))
            finally:
                buf.close()
        # Clear data cache
        self._data_uri_relations[rtype] = []
    # Commit if asked
    if self.commit_at_flush:
        self.commit()
|
248 |
|
def fill_uri_eid_table(self, etype, uri_label):
    """Populate ``uri_eid_<etype>`` from the entity table and index it.

    ``uri_label`` is the attribute (without the ``cw_`` prefix) holding the
    iid. The etype is then marked as converted and the store commits.
    """
    self.logger.info('Fill uri_eid for etype %s', etype)
    suffix = etype.lower()
    insert = 'INSERT INTO uri_eid_%(e)s SELECT cw_%(l)s, cw_eid FROM cw_%(e)s'
    self.sql(insert % {'l': uri_label, 'e': suffix})
    # Add indexes
    self.sql('CREATE INDEX uri_eid_%(e)s_idx ON uri_eid_%(e)s(uri)' % {'e': suffix})
    # Set the etype as converted
    self._initialized['uri_eid_inserted'].add(etype)
    self.commit()
|
260 |
|
def convert_relations(self, etype_from, rtype, etype_to,
                      uri_label_from='cwuri', uri_label_to='cwuri'):
    """Convert the iid-based relations of ``rtype`` into eid-based ones.

    Pending relations are flushed first. For each side with a uri label,
    the matching uri/eid table is filled if needed and used to resolve
    iids; a side with a false label is expected to already hold eids
    (stored as text) and is cast to INTEGER. Rows already present in
    ``<rtype>_relation`` are not inserted again.

    Inlined relations are skipped with a warning. A failing insertion is
    logged and not re-raised; the store commits afterwards.
    """
    # Always flush relations to be sure
    self.logger.info('Convert relations %s %s %s', etype_from, rtype, etype_to)
    self.flush_relations()
    if uri_label_from and etype_from not in self._initialized['uri_eid_inserted']:
        self.fill_uri_eid_table(etype_from, uri_label_from)
    if uri_label_to and etype_to not in self._initialized['uri_eid_inserted']:
        self.fill_uri_eid_table(etype_to, uri_label_to)
    if self._cnx.repo.schema.rschema(rtype).inlined:
        self.logger.warning("Can't insert inlined relation %s", rtype)
        return
    if uri_label_from and uri_label_to:
        sql = '''INSERT INTO %(r)s_relation (eid_from, eid_to) SELECT DISTINCT O1.eid, O2.eid
        FROM %(r)s_relation_iid_tmp AS T, uri_eid_%(ef)s as O1, uri_eid_%(et)s as O2
        WHERE O1.uri=T.uri_from AND O2.uri=T.uri_to AND NOT EXISTS (
        SELECT 1 FROM %(r)s_relation AS TT WHERE TT.eid_from=O1.eid AND TT.eid_to=O2.eid);
        '''
    elif uri_label_to:
        sql = '''INSERT INTO %(r)s_relation (eid_from, eid_to) SELECT DISTINCT
        CAST(T.uri_from AS INTEGER), O1.eid
        FROM %(r)s_relation_iid_tmp AS T, uri_eid_%(et)s as O1
        WHERE O1.uri=T.uri_to AND NOT EXISTS (
        SELECT 1 FROM %(r)s_relation AS TT WHERE
        TT.eid_from=CAST(T.uri_from AS INTEGER) AND TT.eid_to=O1.eid);
        '''
    elif uri_label_from:
        # The original select list had a stray "O1.eid, T.uri_to" making
        # the statement invalid.
        sql = '''INSERT INTO %(r)s_relation (eid_from, eid_to) SELECT DISTINCT
        O1.eid, CAST(T.uri_to AS INTEGER)
        FROM %(r)s_relation_iid_tmp AS T, uri_eid_%(ef)s as O1
        WHERE O1.uri=T.uri_from AND NOT EXISTS (
        SELECT 1 FROM %(r)s_relation AS TT WHERE
        TT.eid_from=O1.eid AND TT.eid_to=CAST(T.uri_to AS INTEGER));
        '''
    else:
        # Both sides already hold eids: previously ``sql`` was left unbound
        sql = '''INSERT INTO %(r)s_relation (eid_from, eid_to) SELECT DISTINCT
        CAST(T.uri_from AS INTEGER), CAST(T.uri_to AS INTEGER)
        FROM %(r)s_relation_iid_tmp AS T
        WHERE NOT EXISTS (
        SELECT 1 FROM %(r)s_relation AS TT WHERE
        TT.eid_from=CAST(T.uri_from AS INTEGER) AND
        TT.eid_to=CAST(T.uri_to AS INTEGER));
        '''
    try:
        self.sql(sql % {'r': rtype.lower(),
                        'et': etype_to.lower() if etype_to else u'',
                        'ef': etype_from.lower() if etype_from else u''})
    except Exception as ex:
        self.logger.error("Can't insert relation %s: %s", rtype, ex)
    self.commit()
|
304 |
|
305 ### SQL UTILITIES ######################################################### |
|
306 |
|
def drop_and_store_indexes_constraints(self, tablename):
    """Drop the indexes and constraints of ``tablename``, saving them first.

    The ``dataio_constraints`` table is created on first use, so that the
    dropped definitions can be reloaded even after a crash.
    """
    table_missing = not self._constraint_table_created
    if table_missing:
        self.sql("CREATE TABLE dataio_constraints "
                 "(origtable text, query text, type varchar(256))")
        self._constraint_table_created = True
    self._drop_table_constraints_indexes(tablename)
|
316 |
|
def _drop_table_constraints_indexes(self, tablename):
    """Save then drop every constraint and index of ``tablename``.

    Each definition reported by the PG helper is recorded in
    ``dataio_constraints`` before the matching DROP is issued;
    constraints are handled before indexes.
    """
    save = 'INSERT INTO dataio_constraints VALUES (%(e)s, %(c)s, %(t)s)'
    indexes, constraints = self._dbh.application_indexes_constraints(tablename)
    for name, query in constraints.items():
        self.sql(save, {'e': tablename, 'c': query, 't': 'constraint'})
        self.sql('ALTER TABLE %s DROP CONSTRAINT %s CASCADE' % (tablename, name))
    for name, query in indexes.items():
        self.sql(save, {'e': tablename, 'c': query, 't': 'index'})
        self.sql('DROP INDEX %s' % name)
|
330 |
|
def reapply_constraint_index(self, tablename):
    """Replay the saved constraint/index definitions of ``tablename``.

    Each stored query is executed and then removed from
    ``dataio_constraints``. Nothing is done (besides an info log) when
    that table does not exist.
    """
    if not self._dbh.table_exists('dataio_constraints'):
        self.logger.info('The table dataio_constraints does not exist '
                         '(keep_index option should be True)')
        return
    cursor = self.sql('SELECT query FROM dataio_constraints WHERE origtable = %(e)s',
                      {'e': tablename})
    forget = ('DELETE FROM dataio_constraints WHERE origtable = %(e)s '
              'AND query = %(q)s')
    for row in cursor.fetchall():
        (query,) = row
        self.sql(query)
        self.sql(forget, {'e': tablename, 'q': query})
|
342 |
|
def _drop_metatables_constraints(self):
    """Drop (and save) the constraints and indexes of the meta data tables."""
    meta_tables = ('created_by_relation', 'owned_by_relation',
                   'is_instance_of_relation', 'identity_relation',
                   'entities')
    for meta_table in meta_tables:
        self.drop_and_store_indexes_constraints(meta_table)
|
349 |
|
def _create_metatables_constraints(self):
    """Recreate the constraints and indexes of the meta data tables.

    Only does something when ``drop_index`` is set, i.e. when they may
    have been dropped by this store.
    """
    if not self.drop_index:
        return
    meta_tables = ('entities',
                   'created_by_relation', 'owned_by_relation',
                   'is_instance_of_relation', 'identity_relation')
    for meta_table in meta_tables:
        # Indexes and constraints
        self.reapply_constraint_index(meta_table)
|
358 |
|
def init_relation_table(self, rtype):
    """Prepare the tables needed to import relations of type ``rtype``.

    Skipped in slave mode or when the rtype is already initialized.
    Otherwise creates ``<rtype>_relation_tmp``, drops (and saves) the
    indexes and constraints of ``<rtype>_relation`` when ``drop_index``
    is set, and records the rtype in ``dataio_initialized``.
    """
    if self.slave_mode or rtype in self._initialized['rtypes']:
        return
    # Create temporary table
    self.sql("CREATE TABLE %s_relation_tmp (eid_from integer, eid_to integer)"
             % rtype.lower())
    if self.drop_index:
        # Drop indexes and constraints
        self.drop_and_store_indexes_constraints('%s_relation' % rtype.lower())
    # Push the rtype in the initialized table for easier restart
    self.init_create_initialized_table()
    self.sql('INSERT INTO dataio_initialized VALUES (%(e)s, %(t)s)',
             {'e': rtype, 't': 'rtype'})
    # Mark rtype as "initialized" for faster check
    self._initialized['rtypes'].add(rtype)
|
375 |
|
def init_create_initialized_table(self):
    """Create the ``dataio_initialized`` table unless already done."""
    if self._initialized_table_created:
        return
    self.sql("CREATE TABLE dataio_initialized (retype text, type varchar(128))")
    self._initialized_table_created = True
|
383 |
|
def init_etype_table(self, etype):
    """Prepare the ``cw_<etype>`` table for massive insertion.

    Does nothing for an etype already initialized. Outside slave mode:
    attaches the 'entities_id_seq' sequence as default for ``cw_eid``
    when no eid range is used, drops (and saves) indexes and constraints
    when ``drop_index`` is set, and records the etype in
    ``dataio_initialized``. The etype is then marked as initialized.
    """
    if etype in self._initialized['entities']:
        return
    # Database changes are only made by a store that is not in slave mode
    if not self.slave_mode:
        if self._eids_seq_range is None:
            # Eids are directly set by the entities_id_seq.
            # We attach this sequence to all the created etypes.
            self.sql("ALTER TABLE cw_%s ALTER COLUMN cw_eid "
                     "SET DEFAULT nextval('entities_id_seq')" % etype.lower())
        if self.drop_index:
            # Drop indexes and constraints
            self.drop_and_store_indexes_constraints('cw_%s' % etype.lower())
        # Push the etype in the initialized table for easier restart
        self.init_create_initialized_table()
        self.sql('INSERT INTO dataio_initialized VALUES (%(e)s, %(t)s)',
                 {'e': etype, 't': 'etype'})
    # Mark etype as "initialized" for faster check
    self._initialized['entities'].add(etype)
|
406 |
|
407 |
|
408 ### ENTITIES CREATION ##################################################### |
|
409 |
|
def _get_eid_gen(self):
    """Endless generator of eids.

    Eids are reserved by blocks of ``_eids_seq_range`` through the system
    source ``create_eid`` and handed out one by one; a new block is
    reserved once the current one is exhausted.
    """
    while True:
        block_size = self._eids_seq_range
        last_eid = self._cnx.repo.system_source.create_eid(self._cnx, block_size)
        eid = last_eid - self._eids_seq_range + 1
        while eid <= last_eid:
            yield eid
            eid += 1
|
418 |
|
def apply_size_constraints(self, etype, kwargs):
    """Truncate the values of ``kwargs`` that exceed their size constraint.

    ``kwargs`` maps attribute names to values and is modified in place
    (and returned). Only true values whose attribute has a max size in
    ``self.size_constraints[etype]`` are considered. A too long value is
    cut to ``maxsize - 4`` characters followed by '...'; when the
    constraint is too small to hold that marker (``maxsize < 4``) the
    value is simply cut to ``maxsize`` characters.

    Raises KeyError if ``etype`` is unknown.
    """
    size_constraints = self.size_constraints[etype]
    # Snapshot the items: the dict is updated while looping
    for attr, value in list(kwargs.items()):
        if not value:
            continue
        maxsize = size_constraints.get(attr)
        if maxsize is None or len(value) <= maxsize:
            continue
        if maxsize < 4:
            # value[:maxsize - 4] would use a negative (or zero) bound and
            # could produce a result longer than maxsize
            kwargs[attr] = value[:maxsize]
        else:
            kwargs[attr] = value[:maxsize - 4] + '...'
    return kwargs
|
429 |
|
def apply_default_values(self, etype, kwargs):
    """Complete ``kwargs`` with the default values known for ``etype``.

    Attributes already present in ``kwargs`` are left untouched. The
    dictionary is updated in place and returned.
    """
    defaults = self.default_values[etype]
    for key in set(defaults).difference(kwargs):
        kwargs[key] = defaults[key]
    return kwargs
|
437 |
|
def create_entity(self, etype, **kwargs):
    """Queue a new entity of type ``etype`` and return an entity object.

    Missing meta data are filled in: modification/creation dates, a
    ``cwuri`` (taken from the ``uri_param_name`` argument when given,
    else auto-generated) and, when an eid range is used, an ``eid``.
    Size constraints then default values are applied, and the resulting
    attributes are buffered until the next flush.
    """
    # Init the table if necessary
    self.init_etype_table(etype)
    # Add meta data if not given
    kwargs.setdefault('modification_date', self._now)
    kwargs.setdefault('creation_date', self._now)
    if 'cwuri' not in kwargs:
        uri_key = self.uri_param_name
        if uri_key and uri_key in kwargs:
            kwargs['cwuri'] = kwargs[uri_key]
        else:
            kwargs['cwuri'] = self._default_cwuri + str(self._count_cwuri)
            self._count_cwuri += 1
    if self._eids_seq_range is not None and 'eid' not in kwargs:
        # No eid given and the eids sequence is set:
        # use the value from the sequence
        kwargs['eid'] = self.get_next_eid()
    # Size constraints first, default values afterwards
    kwargs = self.apply_size_constraints(etype, kwargs)
    kwargs = self.apply_default_values(etype, kwargs)
    # Push data / Return entity
    self._data_entities[etype].append(kwargs)
    entity_class = self._cnx.vreg['etypes'].etype_class(etype)
    entity = entity_class(self._cnx)
    entity.cw_attr_cache.update(kwargs)
    if 'eid' in kwargs:
        entity.eid = kwargs['eid']
    return entity
|
469 |
|
470 ### RELATIONS CREATION #################################################### |
|
471 |
|
def relate(self, subj_eid, rtype, obj_eid, *args, **kwargs):
    """Queue a relation between two eids (compatibility with other stores).

    Extra positional and keyword arguments are accepted and ignored.
    """
    # Init the table if necessary
    self.init_relation_table(rtype)
    pending = self._data_relations[rtype]
    pending.append({'eid_from': subj_eid, 'eid_to': obj_eid})
|
478 |
|
479 |
|
480 ### FLUSH ################################################################# |
|
481 |
|
def on_commit(self):
    """Call the user supplied commit callback, if any."""
    callback = self.on_commit_callback
    if callback:
        callback()
|
485 |
|
def on_rollback(self, exc, etype, data):
    """Handle a failed insertion of ``data`` for ``etype``.

    With a rollback callback, it is called as ``callback(exc, etype, data)``
    and the connection is rolled back; without one, ``exc`` is raised.
    """
    callback = self.on_rollback_callback
    if not callback:
        raise exc
    callback(exc, etype, data)
    self._cnx.rollback()
|
492 |
|
def commit(self):
    """Run the commit callback hook, then commit through the parent store."""
    self.on_commit()
    parent = super(MassiveObjectStore, self)
    parent.commit()
|
496 |
|
def flush(self):
    """Flush all pending data: entities, eid relations, then iid relations."""
    steps = (self.flush_entities,
             self.flush_internal_relations,
             self.flush_relations)
    for step in steps:
        step()
|
503 |
|
def flush_internal_relations(self):
    """Flush the relations queued by relate() into their temporary tables.

    For each relation type with pending data, a COPY FROM buffer is built
    and pushed into ``<rtype>_relation_tmp``, then the pending list is
    emptied. Raises ValueError when the buffer could not be built.
    A commit is issued at the end when ``commit_at_flush`` is set.
    """
    columns = ('eid_from', 'eid_to')
    for rtype, data in self._data_relations.items():
        if data:
            buf = pgstore._create_copyfrom_buffer(data, columns,
                                                  replace_sep=self.replace_sep)
            if not buf:
                # The buffer is empty. This is probably due to error
                # in _create_copyfrom_buffer
                raise ValueError
            # Push into the tmp table
            self._cnx.cnxset.cu.copy_from(buf, '%s_relation_tmp' % rtype.lower(),
                                          null='NULL', columns=columns)
            # Clear data cache
            self._data_relations[rtype] = []
        # else: no data for this rtype for this flush round
    # Commit if asked
    if self.commit_at_flush:
        self.commit()
|
525 |
|
def flush_entities(self):
    """Flush the entities queued by create_entity() with COPY FROM.

    For each entity type with pending data, rows are normalized so that
    each has every column used by at least one row (missing values are
    None), pushed into ``cw_<etype>``, and the pending list is emptied.
    An error raised by the COPY is handed to on_rollback().
    Meta data are then flushed when ``autoflush_metadata`` is set, and a
    commit is issued when ``commit_at_flush`` is set.

    Raises ValueError when the COPY buffer could not be built.
    """
    for etype, data in list(self._data_entities.items()):
        if not data:
            # There is no data for this etype for this flush round.
            continue
        # XXX It may be interesting to directly infer the columns'
        # names from the schema.
        # _create_copyfrom_buffer does a "row[column]", which can lead to
        # a KeyError: give every row all the keys, in one stable order.
        column_names = set()
        for row in data:
            column_names.update(row)
        columns = sorted(column_names)
        template = dict.fromkeys(columns)
        rows = []
        for row in data:
            full_row = template.copy()
            full_row.update(row)
            rows.append(full_row)
        buf = pgstore._create_copyfrom_buffer(rows, columns,
                                              replace_sep=self.replace_sep)
        if not buf:
            # The buffer is empty. This is probably due to error in _create_copyfrom_buffer
            raise ValueError('Error in buffer creation for etype %s' % etype)
        sql_columns = ['cw_%s' % attr for attr in columns]
        cursor = self._cnx.cnxset.cu
        try:
            cursor.copy_from(buf, 'cw_%s' % etype.lower(), null='NULL',
                             columns=sql_columns)
        except Exception as exc:
            self.on_rollback(exc, etype, data)
        # Clear data cache
        self._data_entities[etype] = []
    if self.autoflush_metadata:
        self.flush_meta_data()
    # Commit if asked
    if self.commit_at_flush:
        self.commit()
|
566 |
|
def flush_meta_data(self):
    """Flush the meta data (entities table, is_instance table, ...).

    Every etype recorded in ``dataio_initialized`` and not yet listed in
    ``dataio_metadata`` gets its meta data inserted and is then listed
    there. The store commits at the end.

    Raises RuntimeError in slave mode. Returns early (info log) when
    ``dataio_initialized`` does not exist.
    """
    if self.slave_mode:
        raise RuntimeError('Flushing meta data is not allow in slave mode')
    if not self._dbh.table_exists('dataio_initialized'):
        self.logger.info('No information available for initialized etypes/rtypes')
        return
    if not self._metadata_table_created:
        # Keep the correctly flush meta data in database
        self.sql("CREATE TABLE dataio_metadata (etype text)")
        self._metadata_table_created = True
    flushed_rows = self.sql('SELECT etype FROM dataio_metadata').fetchall()
    already_flushed = set(name for (name,) in flushed_rows)
    initialized_rows = self.sql('SELECT retype FROM dataio_initialized WHERE type = %(t)s',
                                {'t': 'etype'}).fetchall()
    all_etypes = set(name for (name,) in initialized_rows)
    for etype in all_etypes:
        if etype in already_flushed:
            continue
        # Deals with meta data
        self.logger.info('Flushing meta data for %s', etype)
        self.insert_massive_meta_data(etype)
        self.sql('INSERT INTO dataio_metadata VALUES (%(e)s)', {'e': etype})
    # Final commit
    self.commit()
|
594 |
|
def _cleanup_entities(self, etype):
    """Restore the ``etype`` entity table after the import.

    Removes the eid DEFAULT when it was attached by this store (no eid
    range in use), then recreates indexes and constraints when they were
    dropped.
    """
    uses_sequence_default = self._eids_seq_range is None
    if uses_sequence_default:
        # Remove DEFAULT eids sequence if added
        self.sql('ALTER TABLE cw_%s ALTER COLUMN cw_eid DROP DEFAULT;' % etype.lower())
    # Create indexes and constraints
    if self.drop_index:
        self.reapply_constraint_index(SQL_PREFIX + etype.lower())
|
605 |
|
def _cleanup_relations(self, rtype):
    """Finalize the import of the ``rtype`` relations.

    Rows of ``<rtype>_relation_tmp`` not already in ``<rtype>_relation``
    are copied there (without duplicates), the temporary table is
    dropped, and indexes and constraints are recreated when they were
    dropped.
    """
    # Push into relation table while removing duplicate
    transfer = '''INSERT INTO %(r)s_relation (eid_from, eid_to) SELECT DISTINCT
    T.eid_from, T.eid_to FROM %(r)s_relation_tmp AS T
    WHERE NOT EXISTS (SELECT 1 FROM %(r)s_relation AS TT WHERE
    TT.eid_from=T.eid_from AND TT.eid_to=T.eid_to);'''
    self.sql(transfer % {'r': rtype})
    # Drop temporary relation table
    self.sql('DROP TABLE %(r)s_relation_tmp' % {'r': rtype.lower()})
    # Create indexes and constraints
    if self.drop_index:
        self.reapply_constraint_index('%s_relation' % rtype.lower())
|
621 |
|
def cleanup(self):
    """Remove temporary tables and restore indexes and constraints.

    Steps, in order:

    - drop the uri/eid tables and the ``*_relation_iid_tmp`` tables
      created by this store, then commit;
    - for each etype/rtype recorded in ``dataio_initialized``, run the
      matching cleanup and forget the record;
    - recreate the constraints of the meta data tables, then commit;
    - drop the dataio bookkeeping tables that exist, then commit.

    Raises RuntimeError in slave mode.
    """
    # Logged once (the original logged this message twice)
    self.logger.info("Start cleaning")
    if self.slave_mode:
        raise RuntimeError('Store cleanup is not allowed in slave mode')
    # Cleanup relations tables
    for etype in self._initialized['init_uri_eid']:
        self.sql('DROP TABLE uri_eid_%s' % etype.lower())
    # Remove relations tables
    for rtype in self._initialized['uri_rtypes']:
        if not self._cnx.repo.schema.rschema(rtype).inlined:
            self.sql('DROP TABLE %(r)s_relation_iid_tmp' % {'r': rtype})
        else:
            self.logger.warning("inlined relation %s: no cleanup to be done for it", rtype)
    self.commit()
    # Get all the initialized etypes/rtypes
    if self._dbh.table_exists('dataio_initialized'):
        crs = self.sql('SELECT retype, type FROM dataio_initialized')
        for retype, _type in crs.fetchall():
            self.logger.info('Cleanup for %s', retype)
            if _type == 'etype':
                # Cleanup entities tables - Recreate indexes
                self._cleanup_entities(retype)
            elif _type == 'rtype':
                # Cleanup relations tables
                self._cleanup_relations(retype)
            self.sql('DELETE FROM dataio_initialized WHERE retype = %(e)s',
                     {'e': retype})
    # Create meta constraints (entities, is_instance_of, ...)
    self._create_metatables_constraints()
    self.commit()
    # Delete the meta data table
    for table_name in ('dataio_initialized', 'dataio_constraints', 'dataio_metadata'):
        if self._dbh.table_exists(table_name):
            self.sql('DROP TABLE %s' % table_name)
    self.commit()
|
660 |
|
def insert_massive_meta_data(self, etype):
    """Insert the meta data of all new ``etype`` entities with plain SQL.

    The meta data relations (created_by, owned_by, cw_source, is,
    is_instance_of) are pushed first, then the ``entities`` table is
    filled. The order matters: both steps select the rows of
    ``cw_<etype>`` whose eid is not yet in ``entities``.
    """
    push = self.metagen_push_relation
    # Meta data relations
    push(etype, self._cnx.user.eid, 'created_by_relation')
    push(etype, self._cnx.user.eid, 'owned_by_relation')
    push(etype, self.source.eid, 'cw_source_relation')
    push(etype, self._etype_eid_idx[etype], 'is_relation')
    push(etype, self._etype_eid_idx[etype], 'is_instance_of_relation')
    # Register the entities themselves, last
    template = ("INSERT INTO entities (eid, type, asource, extid) "
                "SELECT cw_eid, '%s', 'system', NULL FROM cw_%s "
                "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)")
    self.sql(template % (etype, etype.lower()))
|
677 |
|
def metagen_push_relation(self, etype, eid_to, rtype):
    """Relate every new ``etype`` entity to ``eid_to`` in table ``rtype``.

    "New" means: rows of ``cw_<etype>`` whose eid is not yet in the
    ``entities`` table. ``rtype`` is the full relation table name.
    """
    template = ("INSERT INTO %s (eid_from, eid_to) SELECT cw_eid, %s FROM cw_%s "
                "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)")
    query = template % (rtype, eid_to, etype.lower())
    self.sql(query)
|
683 |
|
684 |
|
685 ### CONSTRAINTS MANAGEMENT FUNCTIONS ########################################## |
|
686 |
|
def get_size_constraints(schema):
    """Analyze yams ``schema`` and return its size constraints.

    The returned value is a dictionary mapping entity types to
    sub-dictionaries mapping attribute names -> max size.
    """
    by_etype = {}
    for eschema in schema.entities():
        per_attribute = by_etype[eschema.type] = {}
        for rschema, aschema in eschema.attribute_definitions():
            rdef = rschema.rdef(eschema, aschema)
            for constraint in rdef.constraints:
                # NOTE(review): source indentation was lost; this assumes
                # the max size is recorded only when a SizeConstraint is
                # found (no None entry for unconstrained attributes).
                if isinstance(constraint, SizeConstraint):
                    per_attribute[rschema.type] = constraint.max
    return by_etype
|
708 |
|
def get_default_values(schema):
    """Analyze yams ``schema`` and return its attribute default values.

    The returned value is a dictionary mapping entity types to
    sub-dictionaries mapping attribute names -> default value. Attributes
    whose default is None are left out.
    """
    default_values = {}
    # iterates on all entity types
    for eschema in schema.entities():
        # for each entity type, iterates on attribute definitions
        default_values[eschema.type] = etype_defaults = {}
        for rschema, _ in eschema.attribute_definitions():
            # Ask for the default only once, so that the value stored is
            # the very one that was checked against None
            default = eschema.default(rschema.type)
            if default is not None:
                etype_defaults[rschema.type] = default
    return default_values
|
726 |
|
727 |
|
class PGHelper(object):
    """Helper querying the PostgreSQL catalogs about a table.

    It lists the indexes and constraints set on a table, along with the SQL
    needed to recreate them. Every query is run through ``cnx.system_sql``.
    """

    def __init__(self, cnx, pg_schema='public'):
        """Store the connection ``cnx`` and the name of the PostgreSQL
        schema (namespace) ``pg_schema`` to look into.
        """
        self.cnx = cnx
        # Deals with pg schema, see #3216686
        self.pg_schema = pg_schema

    def application_indexes_constraints(self, tablename):
        """Get all the indexes/constraints for a given tablename.

        Return an ``(indexes, constraints)`` couple of dictionaries, each
        of them mapping a name to the SQL recreating the matching object.
        Indexes bearing the name of a constraint are filtered out.
        """
        indexes = self.application_indexes(tablename)
        constraints = self.application_constraints(tablename)
        # Remove pkey indexes (automatically created by constraints)
        # Specific cases of primary key, see #3224079
        own_indexes = {}
        for name, query in indexes.items():
            if name not in constraints:
                own_indexes[name] = query
        return own_indexes, constraints

    def table_exists(self, table_name):
        """Tell whether ``table_name`` is listed by
        ``information_schema.tables`` for the configured schema.
        """
        sql = ("SELECT * from information_schema.tables "
               "WHERE table_name=%(t)s AND table_schema=%(s)s")
        crs = self.cnx.system_sql(sql, {'t': table_name, 's': self.pg_schema})
        return bool(crs.fetchall())

    def index_query(self, name):
        """Get the request to be used to recreate the index.

        This is the output of ``pg_get_indexdef`` for the index called
        ``name`` in the configured schema. The index is expected to exist:
        the first column of the first row is returned without any check.
        """
        return self.cnx.system_sql("SELECT pg_get_indexdef(c.oid) "
                                   "from pg_catalog.pg_class c "
                                   "LEFT JOIN pg_catalog.pg_namespace n "
                                   "ON n.oid = c.relnamespace "
                                   "WHERE c.relname = %(r)s AND n.nspname=%(n)s",
                                   {'r': name, 'n': self.pg_schema}).fetchone()[0]

    def constraint_query(self, name):
        """Get the request to be used to recreate the constraint.

        This is the output of ``pg_get_constraintdef`` for the constraint
        called ``name`` in the configured schema. The constraint is
        expected to exist: the first column of the first row is returned
        without any check.
        """
        return self.cnx.system_sql("SELECT pg_get_constraintdef(c.oid) "
                                   "from pg_catalog.pg_constraint c "
                                   "LEFT JOIN pg_catalog.pg_namespace n "
                                   "ON n.oid = c.connamespace "
                                   "WHERE c.conname = %(r)s AND n.nspname=%(n)s",
                                   {'r': name, 'n': self.pg_schema}).fetchone()[0]

    def application_indexes(self, tablename):
        """Return a dictionary mapping the name of each index found on
        ``tablename`` to the request recreating it (see `index_query`).
        """
        # This SQL query (cf http://www.postgresql.org/message-id/432F450F.4080700@squiz.net)
        # aims at getting all the indexes for each table.
        # The table name is given as a bound parameter rather than spliced
        # into the SQL text, as done by the other queries of this class.
        sql = '''SELECT c.relname as "Name"
        FROM pg_catalog.pg_class c
        JOIN pg_catalog.pg_index i ON i.indexrelid = c.oid
        JOIN pg_catalog.pg_class c2 ON i.indrelid = c2.oid
        LEFT JOIN pg_catalog.pg_user u ON u.usesysid = c.relowner
        LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
        WHERE c.relkind IN ('i','')
        AND c2.relname = %(t)s
        AND n.nspname NOT IN ('pg_catalog', 'pg_toast')
        AND pg_catalog.pg_table_is_visible(c.oid);'''
        indexes_list = self.cnx.system_sql(sql, {'t': tablename}).fetchall()
        indexes = {}
        for (name,) in indexes_list:
            indexes[name] = self.index_query(name)
        return indexes

    def application_constraints(self, tablename):
        """Return a dictionary mapping the name of each constraint found on
        ``tablename`` to the ``ALTER TABLE ... ADD CONSTRAINT`` request
        recreating it (see `constraint_query`).
        """
        # The table name is given as a bound parameter rather than spliced
        # into the SQL text, as done by the other queries of this class.
        sql = '''SELECT i.conname as "Name"
        FROM pg_catalog.pg_class c JOIN pg_catalog.pg_constraint i
        ON i.conrelid = c.oid JOIN pg_catalog.pg_class c2 ON i.conrelid=c2.oid
        LEFT JOIN pg_catalog.pg_user u ON u.usesysid = c.relowner
        LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
        WHERE c2.relname = %(t)s AND n.nspname NOT IN ('pg_catalog', 'pg_toast')
        AND pg_catalog.pg_table_is_visible(c.oid);'''
        names_list = self.cnx.system_sql(sql, {'t': tablename}).fetchall()
        constraints = {}
        for (name,) in names_list:
            query = self.constraint_query(name)
            # identifiers can't be bound as parameters in DDL, hence the
            # statement itself is still built by string formatting
            constraints[name] = 'ALTER TABLE %s ADD CONSTRAINT %s %s' % (tablename, name, query)
        return constraints