cubicweb/dataimport/massive_store.py
changeset 11320 78da04c853dc
parent 11319 fe90d07f3afa
child 11321 fab543f542ac
equal deleted inserted replaced
11319:fe90d07f3afa 11320:78da04c853dc
   113         self._dbh = PGHelper(cnx)
   113         self._dbh = PGHelper(cnx)
   114 
   114 
   115         cnx.read_security = False
   115         cnx.read_security = False
   116         cnx.write_security = False
   116         cnx.write_security = False
   117 
   117 
   118         self._data_uri_relations = defaultdict(list)
       
   119         self._data_entities = defaultdict(list)
   118         self._data_entities = defaultdict(list)
   120         self._data_relations = defaultdict(list)
   119         self._data_relations = defaultdict(list)
       
   120         self._initialized = set()
       
   121         # uri handling
       
   122         self._data_uri_relations = defaultdict(list)
   121         # etypes for which we have a uri_eid_%(etype)s table
   123         # etypes for which we have a uri_eid_%(etype)s table
   122         self._init_uri_eid = set()
   124         self._init_uri_eid = set()
   123         # etypes for which we have a uri_eid_%(e)s_idx index
   125         # etypes for which we have a uri_eid_%(e)s_idx index
   124         self._uri_eid_inserted = set()
   126         self._uri_eid_inserted = set()
   125         # set of rtypes for which we have a %(rtype)s_relation_iid_tmp table
   127         # set of rtypes for which we have a %(rtype)s_relation_iid_tmp table
   126         self._uri_rtypes = set()
   128         self._uri_rtypes = set()
   127         # set of etypes/rtypes whose tables are created
       
   128         self._initialized = set()
       
   129 
   129 
   130         self._now = datetime.now(pytz.utc)
   130         self._now = datetime.now(pytz.utc)
   131         self._default_cwuri = make_uid('_auto_generated')
   131         self._default_cwuri = make_uid('_auto_generated')
   132 
   132 
   133         if not self.slave_mode:
   133         if not self.slave_mode:
   137             self._drop_metatables_constraints()
   137             self._drop_metatables_constraints()
   138         if source is None:
   138         if source is None:
   139             source = cnx.repo.system_source
   139             source = cnx.repo.system_source
   140         self.source = source
   140         self.source = source
   141 
   141 
   142     # INIT FUNCTIONS ########################################################
   142     # URI related things #######################################################
   143 
       
   144     def _drop_all_constraints(self):
       
   145         etypes_tables = ('cw_%s' % eschema.type.lower() for eschema in self.schema.entities()
       
   146                          if not eschema.final)
       
   147         rtypes_tables = ('%s_relation' % rschema.type.lower() for rschema in self.schema.relations()
       
   148                          if rschema_has_table(rschema, skip_relations=PURE_VIRTUAL_RTYPES))
       
   149         for tablename in chain(etypes_tables, rtypes_tables, ('entities',)):
       
   150             self._store_and_drop_constraints(tablename)
       
   151 
       
   152     def _store_and_drop_constraints(self, tablename):
       
   153         # Create a table to save the constraints, it allows reloading even after crash
       
   154         self.sql('CREATE TABLE IF NOT EXISTS cwmassive_constraints'
       
   155                  '(origtable text, query text, type varchar(256))')
       
   156         constraints = self._dbh.table_constraints(tablename)
       
   157         for name, query in constraints.items():
       
   158             self.sql('INSERT INTO cwmassive_constraints VALUES (%(e)s, %(c)s, %(t)s)',
       
   159                      {'e': tablename, 'c': query, 't': 'constraint'})
       
   160             self.sql('ALTER TABLE %s DROP CONSTRAINT %s' % (tablename, name))
       
   161 
       
   162     def reapply_all_constraints(self):
       
   163         if not self._dbh.table_exists('cwmassive_constraints'):
       
   164             self.logger.info('The table cwmassive_constraints does not exist')
       
   165             return
       
   166         cu = self.sql("SELECT query FROM cwmassive_constraints WHERE type='constraint'")
       
   167         for query, in cu.fetchall():
       
   168             self.sql(query)
       
   169             self.sql("DELETE FROM cwmassive_constraints WHERE type='constraint' AND query=%(q)s",
       
   170                      {'q': query})
       
   171 
   143 
   172     def init_rtype_table(self, etype_from, rtype, etype_to):
   144     def init_rtype_table(self, etype_from, rtype, etype_to):
   173         """ Build temporary table for standard rtype """
   145         """ Build temporary table for standard rtype """
   174         # Create an uri_eid table for each etype for a better
   146         # Create an uri_eid table for each etype for a better control of which etype is concerned by
   175         # control of which etype is concerned by a particular
   147         # a particular possibly multivalued relation.
   176         # possibly multivalued relation.
       
   177         for etype in (etype_from, etype_to):
   148         for etype in (etype_from, etype_to):
   178             if etype and etype not in self._init_uri_eid:
   149             if etype and etype not in self._init_uri_eid:
   179                 self._init_uri_eid.add(etype)
   150                 self._init_uri_eid.add(etype)
   180                 self.sql('CREATE TABLE IF NOT EXISTS uri_eid_%(e)s'
   151                 self.sql('CREATE TABLE IF NOT EXISTS uri_eid_%(e)s'
   181                          '(uri character varying(%(size)s), eid integer)'
   152                          '(uri character varying(%(size)s), eid integer)'
   188                          % {'r': rtype, 's': self.iid_maxsize})
   159                          % {'r': rtype, 's': self.iid_maxsize})
   189                 self._uri_rtypes.add(rtype)
   160                 self._uri_rtypes.add(rtype)
   190             else:
   161             else:
   191                 self.logger.warning("inlined relation %s: cannot insert it", rtype)
   162                 self.logger.warning("inlined relation %s: cannot insert it", rtype)
   192 
   163 
   193     # RELATE FUNCTION #######################################################
       
   194 
       
   195     def relate_by_iid(self, iid_from, rtype, iid_to):
   164     def relate_by_iid(self, iid_from, rtype, iid_to):
   196         """Add new relation based on the internal id (iid)
   165         """Add new relation based on the internal id (iid)
   197         of the entities (not the eid)"""
   166         of the entities (not the eid)"""
   198         # Push data
   167         # Push data
   199         if isinstance(iid_from, unicode):
   168         if isinstance(iid_from, unicode):
   200             iid_from = iid_from.encode('utf-8')
   169             iid_from = iid_from.encode('utf-8')
   201         if isinstance(iid_to, unicode):
   170         if isinstance(iid_to, unicode):
   202             iid_to = iid_to.encode('utf-8')
   171             iid_to = iid_to.encode('utf-8')
   203         self._data_uri_relations[rtype].append({'uri_from': iid_from, 'uri_to': iid_to})
   172         self._data_uri_relations[rtype].append({'uri_from': iid_from, 'uri_to': iid_to})
   204 
       
   205     # FLUSH FUNCTIONS #######################################################
       
   206 
   173 
   207     def flush_relations(self):
   174     def flush_relations(self):
   208         """ Flush the relations data
   175         """ Flush the relations data
   209         """
   176         """
   210         for rtype, data in self._data_uri_relations.items():
   177         for rtype, data in self._data_uri_relations.items():
   278         except Exception as ex:
   245         except Exception as ex:
   279             self.logger.error("Can't insert relation %s: %s", rtype, ex)
   246             self.logger.error("Can't insert relation %s: %s", rtype, ex)
   280 
   247 
   281     # SQL UTILITIES #########################################################
   248     # SQL UTILITIES #########################################################
   282 
   249 
       
   250     def _drop_all_constraints(self):
       
   251         etypes_tables = ('cw_%s' % eschema.type.lower() for eschema in self.schema.entities()
       
   252                          if not eschema.final)
       
   253         rtypes_tables = ('%s_relation' % rschema.type.lower() for rschema in self.schema.relations()
       
   254                          if rschema_has_table(rschema, skip_relations=PURE_VIRTUAL_RTYPES))
       
   255         # Create a table to save the constraints, it allows reloading even after crash
       
   256         self.sql('CREATE TABLE IF NOT EXISTS cwmassive_constraints'
       
   257                  '(origtable text, query text, type varchar(256))')
       
   258         for tablename in chain(etypes_tables, rtypes_tables, ('entities',)):
       
   259             constraints = self._dbh.table_constraints(tablename)
       
   260             for name, query in constraints.items():
       
   261                 self.sql('INSERT INTO cwmassive_constraints VALUES (%(e)s, %(c)s, %(t)s)',
       
   262                          {'e': tablename, 'c': query, 't': 'constraint'})
       
   263                 self.sql('ALTER TABLE %s DROP CONSTRAINT %s' % (tablename, name))
       
   264 
       
   265     def _reapply_all_constraints(self):
       
   266         if not self._dbh.table_exists('cwmassive_constraints'):
       
   267             self.logger.info('The table cwmassive_constraints does not exist')
       
   268             return
       
   269         cu = self.sql("SELECT query FROM cwmassive_constraints WHERE type='constraint'")
       
   270         for query, in cu.fetchall():
       
   271             self.sql(query)
       
   272             self.sql("DELETE FROM cwmassive_constraints WHERE type='constraint' AND query=%(q)s",
       
   273                      {'q': query})
       
   274 
   283     def drop_and_store_indexes(self, tablename):
   275     def drop_and_store_indexes(self, tablename):
   284         """Drop indexes and constraints"""
   276         """Drop indexes and constraints"""
   285         # Create a table to save the constraints, it allows reloading even after crash
   277         # Create a table to save the constraints, it allows reloading even after crash
   286         self.sql('CREATE TABLE IF NOT EXISTS cwmassive_constraints'
   278         self.sql('CREATE TABLE IF NOT EXISTS cwmassive_constraints'
   287                  '(origtable text, query text, type varchar(256))')
   279                  '(origtable text, query text, type varchar(256))')
   436                 elif _type == 'rtype':
   428                 elif _type == 'rtype':
   437                     # Cleanup relations tables
   429                     # Cleanup relations tables
   438                     self._cleanup_relations(retype)
   430                     self._cleanup_relations(retype)
   439                 self.sql('DELETE FROM cwmassive_initialized WHERE retype = %(e)s',
   431                 self.sql('DELETE FROM cwmassive_initialized WHERE retype = %(e)s',
   440                          {'e': retype})
   432                          {'e': retype})
   441         self.reapply_all_constraints()
   433         self._reapply_all_constraints()
   442         # Delete the meta data table
   434         # Delete the meta data table
   443         for table_name in ('cwmassive_initialized', 'cwmassive_constraints', 'cwmassive_metadata'):
   435         for table_name in ('cwmassive_initialized', 'cwmassive_constraints', 'cwmassive_metadata'):
   444             self.sql('DROP TABLE IF EXISTS %s' % table_name)
   436             self.sql('DROP TABLE IF EXISTS %s' % table_name)
   445         self.commit()
   437         self.commit()
   446 
   438