104 while True: |
104 while True: |
105 last_eid = self._cnx.repo.system_source.create_eid(self._cnx, self.eids_seq_range) |
105 last_eid = self._cnx.repo.system_source.create_eid(self._cnx, self.eids_seq_range) |
106 for eid in range(last_eid - self.eids_seq_range + 1, last_eid + 1): |
106 for eid in range(last_eid - self.eids_seq_range + 1, last_eid + 1): |
107 yield eid |
107 yield eid |
108 |
108 |
|
109 # master/slaves specific API |
|
110 |
|
111 def master_init_etype(self, etype): |
|
112 """Initialize database for insertion of entities of the given etype. |
|
113 |
|
114 This is expected to be called once, usually by the master store in master/slaves |
|
115 configuration. |
|
116 """ |
|
117 self._drop_metadata_constraints_if_necessary() |
|
118 tablename = 'cw_%s' % etype.lower() |
|
119 self._dbh.drop_constraints(tablename) |
|
120 self._dbh.drop_indexes(tablename) |
|
121 self.sql('CREATE TABLE IF NOT EXISTS cwmassive_initialized' |
|
122 '(retype text, type varchar(128))') |
|
123 self.sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'etype')", {'e': etype}) |
|
124 self.sql('ALTER TABLE %s ADD COLUMN extid VARCHAR(256)' % tablename) |
|
125 |
|
126 def master_init_rtype(self, rtype): |
|
127 """Initialize database for insertion of relation of the given rtype. |
|
128 |
|
129 This is expected to be called once, usually by the master store in master/slaves |
|
130 configuration. |
|
131 """ |
|
132 assert not self._cnx.vreg.schema.rschema(rtype).inlined |
|
133 self._drop_metadata_constraints_if_necessary() |
|
134 tablename = '%s_relation' % rtype.lower() |
|
135 self._dbh.drop_constraints(tablename) |
|
136 self._dbh.drop_indexes(tablename) |
|
137 self.sql('CREATE TABLE IF NOT EXISTS cwmassive_initialized' |
|
138 '(retype text, type varchar(128))') |
|
139 self.sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'rtype')", {'e': rtype}) |
|
140 self.sql('CREATE TABLE %s_tmp (eid_from integer, eid_to integer)' % tablename) |
|
141 |
|
142 def master_insert_etype_metadata(self, etype): |
|
143 """Massive insertion of meta data for a given etype, based on SQL statements. |
|
144 |
|
145 In master/slabes configuration, you'll usually want to call it from the master once all |
|
146 slaves have finished (at least slaves won't call it automatically, so that's your |
|
147 reponsability). |
|
148 """ |
|
149 # insert standard metadata relations |
|
150 for rtype, eid in self.metagen.base_etype_rels(etype).items(): |
|
151 self._insert_meta_relation(etype, eid, '%s_relation' % rtype) |
|
152 # insert cw_source, is and is_instance_of relations (normally handled by the system source) |
|
153 self._insert_meta_relation(etype, self.metagen.source.eid, 'cw_source_relation') |
|
154 eschema = self.schema[etype] |
|
155 self._insert_meta_relation(etype, eschema.eid, 'is_relation') |
|
156 for parent_eschema in chain(eschema.ancestors(), [eschema]): |
|
157 self._insert_meta_relation(etype, parent_eschema.eid, 'is_instance_of_relation') |
|
158 # finally insert records into the entities table |
|
159 self.sql("INSERT INTO entities (eid, type, extid) " |
|
160 "SELECT cw_eid, '%s', extid FROM cw_%s " |
|
161 "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)" |
|
162 % (etype, etype.lower())) |
|
163 |
109 # SQL utilities ######################################################### |
164 # SQL utilities ######################################################### |
110 |
165 |
111 def _drop_metadata_constraints_if_necessary(self): |
166 def _drop_metadata_constraints_if_necessary(self): |
112 """Drop constraints and indexes for the metadata tables if necessary.""" |
167 """Drop constraints and indexes for the metadata tables if necessary.""" |
113 if not self._constraints_dropped: |
168 if not self._constraints_dropped: |
140 """Given an entity type, attributes and inlined relations, returns the inserted entity's |
195 """Given an entity type, attributes and inlined relations, returns the inserted entity's |
141 eid. |
196 eid. |
142 """ |
197 """ |
143 if not self.slave_mode and etype not in self._initialized: |
198 if not self.slave_mode and etype not in self._initialized: |
144 self._initialized.add(etype) |
199 self._initialized.add(etype) |
145 self._drop_metadata_constraints_if_necessary() |
200 self.master_init_etype(etype) |
146 tablename = 'cw_%s' % etype.lower() |
|
147 self._dbh.drop_constraints(tablename) |
|
148 self._dbh.drop_indexes(tablename) |
|
149 self.sql('CREATE TABLE IF NOT EXISTS cwmassive_initialized' |
|
150 '(retype text, type varchar(128))') |
|
151 self.sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'etype')", {'e': etype}) |
|
152 attrs = self.metagen.base_etype_attrs(etype) |
201 attrs = self.metagen.base_etype_attrs(etype) |
153 data = copy(attrs) # base_etype_attrs is @cached, a copy is necessary |
202 data = copy(attrs) # base_etype_attrs is @cached, a copy is necessary |
154 data.update(kwargs) |
203 data.update(kwargs) |
155 if 'eid' not in data: |
204 if 'eid' not in data: |
156 # If eid is not given and the eids sequence is set, use the value from the sequence |
205 # If eid is not given and the eids sequence is set, use the value from the sequence |
169 and ``eid_to``. |
218 and ``eid_to``. |
170 |
219 |
171 Relation must not be inlined. |
220 Relation must not be inlined. |
172 """ |
221 """ |
173 if not self.slave_mode and rtype not in self._initialized: |
222 if not self.slave_mode and rtype not in self._initialized: |
174 assert not self._cnx.vreg.schema.rschema(rtype).inlined |
|
175 self._initialized.add(rtype) |
223 self._initialized.add(rtype) |
176 self._drop_metadata_constraints_if_necessary() |
224 self.master_init_rtype(rtype) |
177 tablename = '%s_relation' % rtype.lower() |
|
178 self._dbh.drop_constraints(tablename) |
|
179 self._dbh.drop_indexes(tablename) |
|
180 self.sql('CREATE TABLE %s_tmp (eid_from integer, eid_to integer)' |
|
181 % tablename) |
|
182 self.sql('CREATE TABLE IF NOT EXISTS cwmassive_initialized' |
|
183 '(retype text, type varchar(128))') |
|
184 self.sql("INSERT INTO cwmassive_initialized VALUES (%(e)s, 'rtype')", {'e': rtype}) |
|
185 self._data_relations[rtype].append({'eid_from': eid_from, 'eid_to': eid_to}) |
225 self._data_relations[rtype].append({'eid_from': eid_from, 'eid_to': eid_to}) |
186 |
226 |
187 def flush(self): |
227 def flush(self): |
188 """Flush the data""" |
228 """Flush the data""" |
189 self.flush_entities() |
229 self.flush_entities() |
274 except Exception as exc: |
314 except Exception as exc: |
275 self.on_rollback(exc, etype, data) |
315 self.on_rollback(exc, etype, data) |
276 # Clear data cache |
316 # Clear data cache |
277 self._data_entities[etype] = [] |
317 self._data_entities[etype] = [] |
278 if not self.slave_mode: |
318 if not self.slave_mode: |
279 self._insert_etype_metadata(etype) |
319 self.master_insert_etype_metadata(etype) |
280 |
320 |
281 def _cleanup_relations(self, rtype): |
321 def _cleanup_relations(self, rtype): |
282 """ Cleanup rtype table """ |
322 """ Cleanup rtype table """ |
283 # Push into relation table while removing duplicate |
323 # Push into relation table while removing duplicate |
284 self.sql('INSERT INTO %(r)s_relation (eid_from, eid_to) SELECT DISTINCT ' |
324 self.sql('INSERT INTO %(r)s_relation (eid_from, eid_to) SELECT DISTINCT ' |
285 'T.eid_from, T.eid_to FROM %(r)s_relation_tmp AS T ' |
325 'T.eid_from, T.eid_to FROM %(r)s_relation_tmp AS T ' |
286 'WHERE NOT EXISTS (SELECT 1 FROM %(r)s_relation AS TT WHERE ' |
326 'WHERE NOT EXISTS (SELECT 1 FROM %(r)s_relation AS TT WHERE ' |
287 'TT.eid_from=T.eid_from AND TT.eid_to=T.eid_to);' % {'r': rtype}) |
327 'TT.eid_from=T.eid_from AND TT.eid_to=T.eid_to);' % {'r': rtype}) |
288 # Drop temporary relation table |
328 # Drop temporary relation table |
289 self.sql('DROP TABLE %(r)s_relation_tmp' % {'r': rtype.lower()}) |
329 self.sql('DROP TABLE %(r)s_relation_tmp' % {'r': rtype.lower()}) |
290 |
|
291 def _insert_etype_metadata(self, etype): |
|
292 """Massive insertion of meta data for a given etype, based on SQL statements. |
|
293 """ |
|
294 # insert standard metadata relations |
|
295 for rtype, eid in self.metagen.base_etype_rels(etype).items(): |
|
296 self._insert_meta_relation(etype, eid, '%s_relation' % rtype) |
|
297 # insert cw_source, is and is_instance_of relations (normally handled by the system source) |
|
298 self._insert_meta_relation(etype, self.metagen.source.eid, 'cw_source_relation') |
|
299 eschema = self.schema[etype] |
|
300 self._insert_meta_relation(etype, eschema.eid, 'is_relation') |
|
301 for parent_eschema in chain(eschema.ancestors(), [eschema]): |
|
302 self._insert_meta_relation(etype, parent_eschema.eid, 'is_instance_of_relation') |
|
303 # finally insert records into the entities table |
|
304 self.sql("INSERT INTO entities (eid, type) " |
|
305 "SELECT cw_eid, '%s' FROM cw_%s " |
|
306 "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)" |
|
307 % (etype, etype.lower())) |
|
308 |
330 |
309 def _insert_meta_relation(self, etype, eid_to, rtype): |
331 def _insert_meta_relation(self, etype, eid_to, rtype): |
310 self.sql("INSERT INTO %s (eid_from, eid_to) SELECT cw_eid, %s FROM cw_%s " |
332 self.sql("INSERT INTO %s (eid_from, eid_to) SELECT cw_eid, %s FROM cw_%s " |
311 "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)" |
333 "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)" |
312 % (rtype, eid_to, etype.lower())) |
334 % (rtype, eid_to, etype.lower())) |