82 store.cleanup() |
82 store.cleanup() |
83 """ |
83 """ |
84 |
84 |
85 def __init__(self, cnx, autoflush_metadata=True, |
85 def __init__(self, cnx, autoflush_metadata=True, |
86 replace_sep='', commit_at_flush=True, |
86 replace_sep='', commit_at_flush=True, |
87 drop_index=True, |
|
88 pg_schema='public', |
87 pg_schema='public', |
89 iid_maxsize=1024, uri_param_name='rdf:about', |
88 iid_maxsize=1024, uri_param_name='rdf:about', |
90 eids_seq_range=10000, eids_seq_start=None, |
89 eids_seq_range=10000, eids_seq_start=None, |
91 on_commit_callback=None, on_rollback_callback=None, |
90 on_commit_callback=None, on_rollback_callback=None, |
92 slave_mode=False, |
91 slave_mode=False, |
98 Automatically flush the metadata after |
97 Automatically flush the metadata after |
99 each flush() |
98 each flush() |
100 - replace_sep: String. Replace separator used for |
99 - replace_sep: String. Replace separator used for |
101 (COPY FROM) buffer creation. |
100 (COPY FROM) buffer creation. |
102 - commit_at_flush: Boolean. Commit after each flush(). |
101 - commit_at_flush: Boolean. Commit after each flush(). |
103 - drop_index: Boolean. Drop SQL index before COPY FROM |
|
104 - eids_seq_range: Int. Range of the eids_seq_range to be fetched each time |
102 - eids_seq_range: Int. Range of the eids_seq_range to be fetched each time |
105 by the store (default is 10000). |
103 by the store (default is 10000). |
106 If None, the sequence eids is attached to each entity tables |
104 If None, the sequence eids is attached to each entity tables |
107 (backward compatibility with the 0.2.0). |
105 (backward compatibility with the 0.2.0). |
108 - eids_seq_start: Int. Set the eids sequence value (if None, nothing is done). |
106 - eids_seq_start: Int. Set the eids sequence value (if None, nothing is done). |
127 } |
125 } |
128 self.sql = self._cnx.system_sql |
126 self.sql = self._cnx.system_sql |
129 self.logger = logging.getLogger('dataio.massiveimport') |
127 self.logger = logging.getLogger('dataio.massiveimport') |
130 self.autoflush_metadata = autoflush_metadata |
128 self.autoflush_metadata = autoflush_metadata |
131 self.replace_sep = replace_sep |
129 self.replace_sep = replace_sep |
132 self.drop_index = drop_index |
|
133 self.slave_mode = slave_mode |
130 self.slave_mode = slave_mode |
134 self.size_constraints = get_size_constraints(cnx.vreg.schema) |
131 self.size_constraints = get_size_constraints(cnx.vreg.schema) |
135 self.default_values = get_default_values(cnx.vreg.schema) |
132 self.default_values = get_default_values(cnx.vreg.schema) |
136 self._dbh = PGHelper(self._cnx, pg_schema or 'public') |
133 self._dbh = PGHelper(self._cnx, pg_schema or 'public') |
137 self._data_entities = defaultdict(list) |
134 self._data_entities = defaultdict(list) |
152 self._cnx.system_sql(self._cnx.repo.system_source.dbhelper.sql_restart_numrange( |
149 self._cnx.system_sql(self._cnx.repo.system_source.dbhelper.sql_restart_numrange( |
153 'entities_id_seq', initial_value=self._eids_seq_start + 1)) |
150 'entities_id_seq', initial_value=self._eids_seq_start + 1)) |
154 cnx.commit() |
151 cnx.commit() |
155 self.get_next_eid = lambda g=self._get_eid_gen(): next(g) |
152 self.get_next_eid = lambda g=self._get_eid_gen(): next(g) |
156 # recreate then when self.finish() is called |
153 # recreate then when self.finish() is called |
157 if not self.slave_mode and self.drop_index: |
154 if not self.slave_mode: |
158 self._drop_metatables_constraints() |
155 self._drop_metatables_constraints() |
159 if source is None: |
156 if source is None: |
160 source = cnx.repo.system_source |
157 source = cnx.repo.system_source |
161 self.source = source |
158 self.source = source |
162 self._etype_eid_idx = dict(cnx.execute('Any XN,X WHERE X is CWEType, X name XN')) |
159 self._etype_eid_idx = dict(cnx.execute('Any XN,X WHERE X is CWEType, X name XN')) |
350 """ Create all the constraints for the meta data""" |
347 """ Create all the constraints for the meta data""" |
351 for tablename in ('entities', |
348 for tablename in ('entities', |
352 'created_by_relation', 'owned_by_relation', |
349 'created_by_relation', 'owned_by_relation', |
353 'is_instance_of_relation', 'is_relation'): |
350 'is_instance_of_relation', 'is_relation'): |
354 # Indexes and constraints |
351 # Indexes and constraints |
355 if self.drop_index: |
352 self.reapply_constraint_index(tablename) |
356 self.reapply_constraint_index(tablename) |
|
357 |
353 |
358 def init_relation_table(self, rtype): |
354 def init_relation_table(self, rtype): |
359 """ Get and remove all indexes for performance sake """ |
355 """ Get and remove all indexes for performance sake """ |
360 # Create temporary table |
356 # Create temporary table |
361 if not self.slave_mode and rtype not in self._initialized['rtypes']: |
357 if not self.slave_mode and rtype not in self._initialized['rtypes']: |
362 sql = "CREATE TABLE %s_relation_tmp (eid_from integer, eid_to integer)" % rtype.lower() |
358 sql = "CREATE TABLE %s_relation_tmp (eid_from integer, eid_to integer)" % rtype.lower() |
363 self.sql(sql) |
359 self.sql(sql) |
364 if self.drop_index: |
360 # Drop indexes and constraints |
365 # Drop indexes and constraints |
361 tablename = '%s_relation' % rtype.lower() |
366 tablename = '%s_relation' % rtype.lower() |
362 self.drop_and_store_indexes_constraints(tablename) |
367 self.drop_and_store_indexes_constraints(tablename) |
|
368 # Push the etype in the initialized table for easier restart |
363 # Push the etype in the initialized table for easier restart |
369 self.init_create_initialized_table() |
364 self.init_create_initialized_table() |
370 sql = 'INSERT INTO dataio_initialized VALUES (%(e)s, %(t)s)' |
365 sql = 'INSERT INTO dataio_initialized VALUES (%(e)s, %(t)s)' |
371 self.sql(sql, {'e': rtype, 't': 'rtype'}) |
366 self.sql(sql, {'e': rtype, 't': 'rtype'}) |
372 # Mark rtype as "initialized" for faster check |
367 # Mark rtype as "initialized" for faster check |
390 # Eids are directly set by the entities_id_seq. |
385 # Eids are directly set by the entities_id_seq. |
391 # We attach this sequence to all the created etypes. |
386 # We attach this sequence to all the created etypes. |
392 sql = ("ALTER TABLE cw_%s ALTER COLUMN cw_eid " |
387 sql = ("ALTER TABLE cw_%s ALTER COLUMN cw_eid " |
393 "SET DEFAULT nextval('entities_id_seq')" % etype.lower()) |
388 "SET DEFAULT nextval('entities_id_seq')" % etype.lower()) |
394 self.sql(sql) |
389 self.sql(sql) |
395 if self.drop_index: |
390 # Drop indexes and constraints |
396 # Drop indexes and constraints |
391 tablename = 'cw_%s' % etype.lower() |
397 tablename = 'cw_%s' % etype.lower() |
392 self.drop_and_store_indexes_constraints(tablename) |
398 self.drop_and_store_indexes_constraints(tablename) |
|
399 # Push the etype in the initialized table for easier restart |
393 # Push the etype in the initialized table for easier restart |
400 self.init_create_initialized_table() |
394 self.init_create_initialized_table() |
401 sql = 'INSERT INTO dataio_initialized VALUES (%(e)s, %(t)s)' |
395 sql = 'INSERT INTO dataio_initialized VALUES (%(e)s, %(t)s)' |
402 self.sql(sql, {'e': etype, 't': 'etype'}) |
396 self.sql(sql, {'e': etype, 't': 'etype'}) |
403 # Mark etype as "initialized" for faster check |
397 # Mark etype as "initialized" for faster check |
624 if self._eids_seq_range is None: |
618 if self._eids_seq_range is None: |
625 # Remove DEFAULT eids sequence if added |
619 # Remove DEFAULT eids sequence if added |
626 sql = 'ALTER TABLE cw_%s ALTER COLUMN cw_eid DROP DEFAULT;' % etype.lower() |
620 sql = 'ALTER TABLE cw_%s ALTER COLUMN cw_eid DROP DEFAULT;' % etype.lower() |
627 self.sql(sql) |
621 self.sql(sql) |
628 # Create indexes and constraints |
622 # Create indexes and constraints |
629 if self.drop_index: |
623 tablename = SQL_PREFIX + etype.lower() |
630 tablename = SQL_PREFIX + etype.lower() |
624 self.reapply_constraint_index(tablename) |
631 self.reapply_constraint_index(tablename) |
|
632 |
625 |
633 def _cleanup_relations(self, rtype): |
626 def _cleanup_relations(self, rtype): |
634 """ Cleanup rtype table """ |
627 """ Cleanup rtype table """ |
635 # Push into relation table while removing duplicate |
628 # Push into relation table while removing duplicate |
636 sql = '''INSERT INTO %(r)s_relation (eid_from, eid_to) SELECT DISTINCT |
629 sql = '''INSERT INTO %(r)s_relation (eid_from, eid_to) SELECT DISTINCT |
640 self.sql(sql) |
633 self.sql(sql) |
641 # Drop temporary relation table |
634 # Drop temporary relation table |
642 sql = ('DROP TABLE %(r)s_relation_tmp' % {'r': rtype.lower()}) |
635 sql = ('DROP TABLE %(r)s_relation_tmp' % {'r': rtype.lower()}) |
643 self.sql(sql) |
636 self.sql(sql) |
644 # Create indexes and constraints |
637 # Create indexes and constraints |
645 if self.drop_index: |
638 tablename = '%s_relation' % rtype.lower() |
646 tablename = '%s_relation' % rtype.lower() |
639 self.reapply_constraint_index(tablename) |
647 self.reapply_constraint_index(tablename) |
|
648 |
640 |
649 def insert_massive_meta_data(self, etype): |
641 def insert_massive_meta_data(self, etype): |
650 """ Massive insertion of meta data for a given etype, based on SQL statements. |
642 """ Massive insertion of meta data for a given etype, based on SQL statements. |
651 """ |
643 """ |
652 # Push data - Use coalesce to avoid NULL (and get 0), if there is no |
644 # Push data - Use coalesce to avoid NULL (and get 0), if there is no |