cubicweb/dataimport/massive_store.py
changeset 11792 f1911a4638af
parent 11791 20555214576b
child 11802 2f885861cb84
equal deleted inserted replaced
11791:20555214576b 11792:f1911a4638af
    61 
    61 
    62     Full-text indexation is not handled, you'll have to reindex the proper entity types by yourself
    62     Full-text indexation is not handled, you'll have to reindex the proper entity types by yourself
    63     if desired.
    63     if desired.
    64     """
    64     """
    65 
    65 
    66     def __init__(self, cnx,
    66     def __init__(self, cnx, slave_mode=False, eids_seq_range=10000, metagen=None):
    67                  on_commit_callback=None, on_rollback_callback=None,
       
    68                  slave_mode=False,
       
    69                  eids_seq_range=10000,
       
    70                  metagen=None):
       
    71         """Create a MassiveObject store, with the following arguments:
    67         """Create a MassiveObject store, with the following arguments:
    72 
    68 
    73         - `cnx`, a connection to the repository
    69         - `cnx`, a connection to the repository
    74         - `metagen`, optional :class:`MetadataGenerator` instance
    70         - `metagen`, optional :class:`MetadataGenerator` instance
    75         - `eids_seq_range`: size of eid range reserved by the store for each batch
    71         - `eids_seq_range`: size of eid range reserved by the store for each batch
    76         """
    72         """
    77         super(MassiveObjectStore, self).__init__(cnx)
    73         super(MassiveObjectStore, self).__init__(cnx)
    78 
    74 
    79         self.uuid = text_type(uuid4()).replace('-', '')
    75         self.uuid = text_type(uuid4()).replace('-', '')
    80         self.on_commit_callback = on_commit_callback
       
    81         self.on_rollback_callback = on_rollback_callback
       
    82         self.slave_mode = slave_mode
    76         self.slave_mode = slave_mode
    83         self.eids_seq_range = eids_seq_range
    77         self.eids_seq_range = eids_seq_range
    84         if metagen is None:
    78         if metagen is None:
    85             metagen = stores.MetadataGenerator(cnx)
    79             metagen = stores.MetadataGenerator(cnx)
    86         self.metagen = metagen
    80         self.metagen = metagen
   189 
   183 
   190     def flush(self):
   184     def flush(self):
   191         """Flush the data"""
   185         """Flush the data"""
   192         self.flush_entities()
   186         self.flush_entities()
   193         self.flush_relations()
   187         self.flush_relations()
   194 
       
   195     def commit(self):
       
   196         """Commit the database transaction."""
       
   197         self.on_commit()
       
   198         super(MassiveObjectStore, self).commit()
       
   199 
   188 
   200     def finish(self):
   189     def finish(self):
   201         """Remove temporary tables and columns."""
   190         """Remove temporary tables and columns."""
   202         assert not self.slave_mode, 'finish method should only be called by the master store'
   191         assert not self.slave_mode, 'finish method should only be called by the master store'
   203         self.logger.info("Start cleaning")
   192         self.logger.info("Start cleaning")
   285                  'WHERE retype = %(rtype)s AND uuid = %(uuid)s',
   274                  'WHERE retype = %(rtype)s AND uuid = %(uuid)s',
   286                  {'rtype': ertype, 'uuid': uuid})
   275                  {'rtype': ertype, 'uuid': uuid})
   287 
   276 
   288     # FLUSH #################################################################
   277     # FLUSH #################################################################
   289 
   278 
   290     def on_commit(self):
       
   291         if self.on_commit_callback:
       
   292             self.on_commit_callback()
       
   293 
       
   294     def on_rollback(self, exc, etype, data):
       
   295         if self.on_rollback_callback:
       
   296             self.on_rollback_callback(exc, etype, data)
       
   297             self._cnx.rollback()
       
   298         else:
       
   299             raise exc
       
   300 
       
   301     def flush_relations(self):
   279     def flush_relations(self):
   302         """Flush the relations data from in-memory structures to a temporary table."""
   280         """Flush the relations data from in-memory structures to a temporary table."""
   303         for rtype, data in self._data_relations.items():
   281         for rtype, data in self._data_relations.items():
   304             if not data:
   282             if not data:
   305                 # There is no data for these etype for this flush round.
   283                 # There is no data for these etype for this flush round.
   335             buf = pgstore._create_copyfrom_buffer(_data, attrs)
   313             buf = pgstore._create_copyfrom_buffer(_data, attrs)
   336             tablename = 'cw_%s' % etype.lower()
   314             tablename = 'cw_%s' % etype.lower()
   337             tmp_tablename = '%s_%s' % (tablename, self.uuid)
   315             tmp_tablename = '%s_%s' % (tablename, self.uuid)
   338             columns = ['cw_%s' % attr for attr in attrs]
   316             columns = ['cw_%s' % attr for attr in attrs]
   339             cursor = self._cnx.cnxset.cu
   317             cursor = self._cnx.cnxset.cu
   340             try:
   318             cursor.copy_from(buf, tmp_tablename, null='NULL', columns=columns)
   341                 cursor.copy_from(buf, tmp_tablename, null='NULL', columns=columns)
       
   342             except Exception as exc:
       
   343                 self.on_rollback(exc, etype, data)
       
   344             # Clear data cache
   319             # Clear data cache
   345             self._data_entities[etype] = []
   320             self._data_entities[etype] = []
   346 
   321 
   347 
   322 
   348 def get_default_values(schema):
   323 def get_default_values(schema):