16 # |
16 # |
17 # You should have received a copy of the GNU Lesser General Public License along |
17 # You should have received a copy of the GNU Lesser General Public License along |
18 # with CubicWeb. If not, see <http://www.gnu.org/licenses/>. |
18 # with CubicWeb. If not, see <http://www.gnu.org/licenses/>. |
19 |
19 |
20 import logging |
20 import logging |
21 from datetime import datetime |
21 from copy import copy |
22 from collections import defaultdict |
22 from collections import defaultdict |
23 from io import StringIO |
23 from io import StringIO |
24 from itertools import chain |
24 from itertools import chain |
25 |
25 |
26 from six.moves import range |
26 from six.moves import range |
27 |
27 |
28 import pytz |
|
29 |
|
30 from yams.constraints import SizeConstraint |
28 from yams.constraints import SizeConstraint |
31 |
29 |
32 from cubicweb.schema import PURE_VIRTUAL_RTYPES |
30 from cubicweb.schema import PURE_VIRTUAL_RTYPES |
33 from cubicweb.server.schema2sql import rschema_has_table |
31 from cubicweb.server.schema2sql import rschema_has_table |
34 from cubicweb.server.sqlutils import SQL_PREFIX |
|
35 from cubicweb.dataimport import stores, pgstore |
32 from cubicweb.dataimport import stores, pgstore |
36 from cubicweb.utils import make_uid |
|
37 |
33 |
38 |
34 |
39 class MassiveObjectStore(stores.RQLObjectStore): |
35 class MassiveObjectStore(stores.RQLObjectStore): |
40 """ |
36 """ |
41 Store for massive import of data, with delayed insertion of meta data. |
37 Store for massive import of data, with delayed insertion of meta data. |
90 iid_maxsize = 1024 |
86 iid_maxsize = 1024 |
91 |
87 |
92 def __init__(self, cnx, |
88 def __init__(self, cnx, |
93 on_commit_callback=None, on_rollback_callback=None, |
89 on_commit_callback=None, on_rollback_callback=None, |
94 slave_mode=False, |
90 slave_mode=False, |
95 source=None, |
91 eids_seq_range=10000, |
96 eids_seq_range=10000): |
92 metagen=None): |
97 """ Create a MassiveObject store, with the following attributes: |
93 """ Create a MassiveObject store, with the following attributes: |
98 |
94 |
99 - cnx: CubicWeb cnx |
95 - cnx: CubicWeb cnx |
100 - eids_seq_range: size of eid range reserved by the store for each batch |
96 - eids_seq_range: size of eid range reserved by the store for each batch |
101 """ |
97 """ |
102 super(MassiveObjectStore, self).__init__(cnx) |
98 super(MassiveObjectStore, self).__init__(cnx) |
103 self.on_commit_callback = on_commit_callback |
99 self.on_commit_callback = on_commit_callback |
104 self.on_rollback_callback = on_rollback_callback |
100 self.on_rollback_callback = on_rollback_callback |
105 self.slave_mode = slave_mode |
101 self.slave_mode = slave_mode |
106 self.eids_seq_range = eids_seq_range |
102 self.eids_seq_range = eids_seq_range |
|
103 if metagen is None: |
|
104 metagen = stores.MetadataGenerator(cnx) |
|
105 self.metagen = metagen |
107 |
106 |
108 self.logger = logging.getLogger('dataimport.massive_store') |
107 self.logger = logging.getLogger('dataimport.massive_store') |
109 self.sql = cnx.system_sql |
108 self.sql = cnx.system_sql |
110 self.schema = self._cnx.vreg.schema |
109 self.schema = self._cnx.vreg.schema |
111 self.default_values = get_default_values(self.schema) |
110 self.default_values = get_default_values(self.schema) |
125 # etypes for which we have a uri_eid_%(e)s_idx index |
124 # etypes for which we have a uri_eid_%(e)s_idx index |
126 self._uri_eid_inserted = set() |
125 self._uri_eid_inserted = set() |
127 # set of rtypes for which we have a %(rtype)s_relation_iid_tmp table |
126 # set of rtypes for which we have a %(rtype)s_relation_iid_tmp table |
128 self._uri_rtypes = set() |
127 self._uri_rtypes = set() |
129 |
128 |
130 self._now = datetime.now(pytz.utc) |
|
131 self._default_cwuri = make_uid('_auto_generated') |
|
132 |
|
133 if not self.slave_mode: |
129 if not self.slave_mode: |
134 # drop constraint and metadata table, they will be recreated when self.finish() is |
130 # drop constraint and metadata table, they will be recreated when self.finish() is |
135 # called |
131 # called |
136 self._drop_all_constraints() |
132 self._drop_all_constraints() |
137 self._drop_metatables_constraints() |
133 self._drop_metatables_constraints() |
138 if source is None: |
134 |
139 source = cnx.repo.system_source |
135 def _get_eid_gen(self): |
140 self.source = source |
136 """ Function getting the next eid. This is done by preselecting |
|
137 a given number of eids from the 'entities_id_seq', and then |
|
138 storing them""" |
|
139 while True: |
|
140 last_eid = self._cnx.repo.system_source.create_eid(self._cnx, self.eids_seq_range) |
|
141 for eid in range(last_eid - self.eids_seq_range + 1, last_eid + 1): |
|
142 yield eid |
141 |
143 |
142 # URI related things ####################################################### |
144 # URI related things ####################################################### |
143 |
145 |
144 def init_rtype_table(self, etype_from, rtype, etype_to): |
146 def init_rtype_table(self, etype_from, rtype, etype_to): |
145 """ Build temporary table for standard rtype """ |
147 """ Build temporary table for standard rtype """ |
def restart_eid_sequence(self, start_eid):
    """Restart the ``entities_id_seq`` sequence at ``start_eid`` and commit.

    The restart statement is built by the system source's database helper
    (``sql_restart_numrange``), executed through ``self.sql``, and the
    connection is committed immediately afterwards.

    :param start_eid: value handed to the helper as ``initial_value`` of the
        restarted sequence.
    """
    dbhelper = self._cnx.repo.system_source.dbhelper
    restart_query = dbhelper.sql_restart_numrange(
        'entities_id_seq', initial_value=start_eid)
    self.sql(restart_query)
    self._cnx.commit()
269 |
271 |
270 # ENTITIES CREATION ##################################################### |
|
271 |
|
def _get_eid_gen(self):
    """Yield eids endlessly, reserving them from the system source in batches.

    Each time the current batch is exhausted, ``self.eids_seq_range`` eids are
    requested through ``system_source.create_eid``. The value it returns is
    treated as the last eid of the reserved range, so the batch yielded is
    ``last_eid - eids_seq_range + 1`` up to ``last_eid`` inclusive.
    """
    while True:
        # Read the batch size once so the reservation and the yielded range
        # are guaranteed to agree.
        batch_size = self.eids_seq_range
        last_eid = self._cnx.repo.system_source.create_eid(self._cnx, batch_size)
        for eid in range(last_eid - batch_size + 1, last_eid + 1):
            yield eid
|
280 |
|
def _apply_default_values(self, etype, kwargs):
    """Fill ``kwargs`` in place with the defaults registered for ``etype``.

    Only attributes absent from ``kwargs`` are added; a key that is already
    present keeps its value, even when that value is ``None``.

    :param etype: entity type name used to look up ``self.default_values``.
    :param kwargs: attribute dictionary of the entity being prepared; it is
        mutated and nothing is returned.
    """
    for attr, default in self.default_values[etype].items():
        # setdefault never overrides a caller-supplied value.
        kwargs.setdefault(attr, default)
|
286 |
|
287 # store api ################################################################ |
272 # store api ################################################################ |
288 |
273 |
289 def prepare_insert_entity(self, etype, **kwargs): |
274 def prepare_insert_entity(self, etype, **kwargs): |
290 """Given an entity type, attributes and inlined relations, returns the inserted entity's |
275 """Given an entity type, attributes and inlined relations, returns the inserted entity's |
291 eid. |
276 eid. |
294 self._initialized.add(etype) |
279 self._initialized.add(etype) |
295 self._dbh.drop_indexes('cw_%s' % etype.lower()) |
280 self._dbh.drop_indexes('cw_%s' % etype.lower()) |
296 self.sql('CREATE TABLE IF NOT EXISTS cwmassive_initialized' |
281 self.sql('CREATE TABLE IF NOT EXISTS cwmassive_initialized' |
297 '(retype text, type varchar(128))') |
282 '(retype text, type varchar(128))') |
298 self.sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'etype')", {'e': etype}) |
283 self.sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'etype')", {'e': etype}) |
299 # Add meta data if not given |
284 attrs = self.metagen.base_etype_attrs(etype) |
300 if 'modification_date' not in kwargs: |
285 data = copy(attrs) # base_etype_attrs is @cached, a copy is necessary |
301 kwargs['modification_date'] = self._now |
286 data.update(kwargs) |
302 if 'creation_date' not in kwargs: |
287 if 'eid' not in data: |
303 kwargs['creation_date'] = self._now |
288 # If eid is not given and the eids sequence is set, use the value from the sequence |
304 if 'cwuri' not in kwargs: |
289 eid = self.get_next_eid() |
305 kwargs['cwuri'] = self._default_cwuri + str(self._count_cwuri) |
290 data['eid'] = eid |
306 self._count_cwuri += 1 |
291 # XXX default values could be set once for all in base entity |
307 if 'eid' not in kwargs: |
292 default_values = self.default_values[etype] |
308 # If eid is not given and the eids sequence is set, |
293 missing_keys = set(default_values) - set(data) |
309 # use the value from the sequence |
294 data.update((key, default_values[key]) for key in missing_keys) |
310 kwargs['eid'] = self.get_next_eid() |
295 self.metagen.init_entity_attrs(etype, data['eid'], data) |
311 self._apply_default_values(etype, kwargs) |
296 self._data_entities[etype].append(data) |
312 self._data_entities[etype].append(kwargs) |
297 return data['eid'] |
313 return kwargs.get('eid') |
|
314 |
298 |
315 def prepare_insert_relation(self, eid_from, rtype, eid_to, **kwargs): |
299 def prepare_insert_relation(self, eid_from, rtype, eid_to, **kwargs): |
316 """Insert into the database a relation ``rtype`` between entities with eids ``eid_from`` |
300 """Insert into the database a relation ``rtype`` between entities with eids ``eid_from`` |
317 and ``eid_to``. |
301 and ``eid_to``. |
318 """ |
302 """ |
464 self.sql('DROP TABLE %(r)s_relation_tmp' % {'r': rtype.lower()}) |
448 self.sql('DROP TABLE %(r)s_relation_tmp' % {'r': rtype.lower()}) |
465 |
449 |
def insert_massive_metadata(self, etype):
    """Massive insertion of meta data for a given etype, based on SQL statements.

    Every metadata relation is delegated to ``self._insert_meta_relation``;
    the ``entities`` table is then completed with the rows of ``cw_<etype>``
    whose eid is not recorded there yet.

    :param etype: name of the entity type whose metadata must be inserted.
    """
    # insert standard metadata relations
    for rtype, eid in self.metagen.base_etype_rels(etype).items():
        self._insert_meta_relation(etype, eid, '%s_relation' % rtype)
    # insert cw_source, is and is_instance_of relations (normally handled by the system source)
    self._insert_meta_relation(etype, self.metagen.source.eid, 'cw_source_relation')
    # Keep the schema object itself (not its eid): both its ``eid`` and its
    # ``ancestors()`` are needed below.
    eschema = self.schema[etype]
    self._insert_meta_relation(etype, eschema.eid, 'is_relation')
    # chain() works whatever iterable type ancestors() returns.
    for parent_eschema in chain(eschema.ancestors(), [eschema]):
        self._insert_meta_relation(etype, parent_eschema.eid, 'is_instance_of_relation')
    # finally insert records into the entities table
    self.sql("INSERT INTO entities (eid, type, asource, extid) "
             "SELECT cw_eid, '%s', 'system', NULL FROM cw_%s "
             "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
             % (etype, etype.lower()))
480 |
467 |