104 self.slave_mode = slave_mode |
105 self.slave_mode = slave_mode |
105 self.eids_seq_range = eids_seq_range |
106 self.eids_seq_range = eids_seq_range |
106 |
107 |
107 self.logger = logging.getLogger('dataimport.massive_store') |
108 self.logger = logging.getLogger('dataimport.massive_store') |
108 self.sql = cnx.system_sql |
109 self.sql = cnx.system_sql |
109 self.default_values = get_default_values(cnx.vreg.schema) |
110 self.schema = self._cnx.vreg.schema |
|
111 self.default_values = get_default_values(self.schema) |
110 self.get_next_eid = lambda g=self._get_eid_gen(): next(g) |
112 self.get_next_eid = lambda g=self._get_eid_gen(): next(g) |
111 self._dbh = PGHelper(cnx) |
113 self._dbh = PGHelper(cnx) |
112 |
114 |
113 cnx.read_security = False |
115 cnx.read_security = False |
114 cnx.write_security = False |
116 cnx.write_security = False |
141 self._etype_eid_idx = dict(cnx.execute('Any XN,X WHERE X is CWEType, X name XN')) |
143 self._etype_eid_idx = dict(cnx.execute('Any XN,X WHERE X is CWEType, X name XN')) |
142 |
144 |
143 # INIT FUNCTIONS ######################################################## |
145 # INIT FUNCTIONS ######################################################## |
144 |
146 |
def _drop_all_constraints(self):
    """Save then drop the constraints of every table the import writes to.

    Table names are produced in this order and each one is handed to
    ``self._store_and_drop_constraints``:

    1. ``cw_<etype>`` for every non-final entity type of ``self.schema``;
    2. ``<rtype>_relation`` for every relation type for which
       ``rschema_has_table(rschema, skip_relations=PURE_VIRTUAL_RTYPES)``
       is true;
    3. ``entities``.
    """
    def iter_tablenames(schema):
        # Entity tables: one per non-final entity type, lower-cased.
        for eschema in schema.entities():
            if not eschema.final:
                yield 'cw_%s' % eschema.type.lower()
        # Relation tables: only relation types that own a dedicated table.
        for rschema in schema.relations():
            if rschema_has_table(rschema, skip_relations=PURE_VIRTUAL_RTYPES):
                yield '%s_relation' % rschema.type.lower()
        # The shared entities table is always handled last.
        yield 'entities'

    for tablename in iter_tablenames(self.schema):
        self._store_and_drop_constraints(tablename)
157 |
154 |
158 def _store_and_drop_constraints(self, tablename): |
155 def _store_and_drop_constraints(self, tablename): |
159 # Create a table to save the constraints, it allows reloading even after crash |
156 # Create a table to save the constraints, it allows reloading even after crash |
160 self.sql('CREATE TABLE IF NOT EXISTS cwmassive_constraints' |
157 self.sql('CREATE TABLE IF NOT EXISTS cwmassive_constraints' |
183 for etype in (etype_from, etype_to): |
180 for etype in (etype_from, etype_to): |
184 if etype and etype not in self._init_uri_eid: |
181 if etype and etype not in self._init_uri_eid: |
185 self._init_uri_eid_table(etype) |
182 self._init_uri_eid_table(etype) |
186 if rtype not in self._uri_rtypes: |
183 if rtype not in self._uri_rtypes: |
187 # Create the temporary table |
184 # Create the temporary table |
188 if not self._cnx.repo.schema.rschema(rtype).inlined: |
185 if not self.schema.rschema(rtype).inlined: |
189 self.sql('CREATE TABLE IF NOT EXISTS %(r)s_relation_iid_tmp' |
186 self.sql('CREATE TABLE IF NOT EXISTS %(r)s_relation_iid_tmp' |
190 '(uri_from character varying(%(s)s), uri_to character varying(%(s)s))' |
187 '(uri_from character varying(%(s)s), uri_to character varying(%(s)s))' |
191 % {'r': rtype, 's': self.iid_maxsize}) |
188 % {'r': rtype, 's': self.iid_maxsize}) |
192 else: |
189 else: |
193 self.logger.warning("inlined relation %s: cannot insert it", rtype) |
190 self.logger.warning("inlined relation %s: cannot insert it", rtype) |
227 buf = StringIO('\n'.join(['%(uri_from)s\t%(uri_to)s' % d for d in data])) |
224 buf = StringIO('\n'.join(['%(uri_from)s\t%(uri_to)s' % d for d in data])) |
228 if not buf: |
225 if not buf: |
229 self.logger.info('Empty Buffer for rtype %s', rtype) |
226 self.logger.info('Empty Buffer for rtype %s', rtype) |
230 continue |
227 continue |
231 cursor = self._cnx.cnxset.cu |
228 cursor = self._cnx.cnxset.cu |
232 if not self._cnx.repo.schema.rschema(rtype).inlined: |
229 if not self.schema.rschema(rtype).inlined: |
233 cursor.copy_from(buf, '%s_relation_iid_tmp' % rtype.lower(), |
230 cursor.copy_from(buf, '%s_relation_iid_tmp' % rtype.lower(), |
234 null='NULL', columns=('uri_from', 'uri_to')) |
231 null='NULL', columns=('uri_from', 'uri_to')) |
235 else: |
232 else: |
236 self.logger.warning("inlined relation %s: cannot insert it", rtype) |
233 self.logger.warning("inlined relation %s: cannot insert it", rtype) |
237 buf.close() |
234 buf.close() |
258 self.flush_relations() |
255 self.flush_relations() |
259 if uri_label_from and etype_from not in self._uri_eid_inserted: |
256 if uri_label_from and etype_from not in self._uri_eid_inserted: |
260 self.fill_uri_eid_table(etype_from, uri_label_from) |
257 self.fill_uri_eid_table(etype_from, uri_label_from) |
261 if uri_label_to and etype_to not in self._uri_eid_inserted: |
258 if uri_label_to and etype_to not in self._uri_eid_inserted: |
262 self.fill_uri_eid_table(etype_to, uri_label_to) |
259 self.fill_uri_eid_table(etype_to, uri_label_to) |
263 if self._cnx.repo.schema.rschema(rtype).inlined: |
260 if self.schema.rschema(rtype).inlined: |
264 self.logger.warning("Can't insert inlined relation %s", rtype) |
261 self.logger.warning("Can't insert inlined relation %s", rtype) |
265 return |
262 return |
266 if uri_label_from and uri_label_to: |
263 if uri_label_from and uri_label_to: |
267 sql = '''INSERT INTO %(r)s_relation (eid_from, eid_to) SELECT DISTINCT O1.eid, O2.eid |
264 sql = '''INSERT INTO %(r)s_relation (eid_from, eid_to) SELECT DISTINCT O1.eid, O2.eid |
268 FROM %(r)s_relation_iid_tmp AS T, uri_eid_%(ef)s as O1, uri_eid_%(et)s as O2 |
265 FROM %(r)s_relation_iid_tmp AS T, uri_eid_%(ef)s as O1, uri_eid_%(et)s as O2 |
446 # Cleanup relations tables |
443 # Cleanup relations tables |
447 for etype in self._init_uri_eid: |
444 for etype in self._init_uri_eid: |
448 self.sql('DROP TABLE uri_eid_%s' % etype.lower()) |
445 self.sql('DROP TABLE uri_eid_%s' % etype.lower()) |
449 # Remove relations tables |
446 # Remove relations tables |
450 for rtype in self._uri_rtypes: |
447 for rtype in self._uri_rtypes: |
451 if not self._cnx.repo.schema.rschema(rtype).inlined: |
448 if not self.schema.rschema(rtype).inlined: |
452 self.sql('DROP TABLE %(r)s_relation_iid_tmp' % {'r': rtype}) |
449 self.sql('DROP TABLE %(r)s_relation_iid_tmp' % {'r': rtype}) |
453 else: |
450 else: |
454 self.logger.warning("inlined relation %s: no cleanup to be done for it" % rtype) |
451 self.logger.warning("inlined relation %s: no cleanup to be done for it" % rtype) |
455 # Create meta constraints (entities, is_instance_of, ...) |
452 # Create meta constraints (entities, is_instance_of, ...) |
456 self._create_metatables_constraints() |
453 self._create_metatables_constraints() |