dataimport/massive_store.py
changeset 10876 11a9b2fb83d0
parent 10875 75d1b2d66f18
child 10877 a6a9a8fc94c3
equal deleted inserted replaced
10875:75d1b2d66f18 10876:11a9b2fb83d0
    87     eids_seq_start = None
    87     eids_seq_start = None
    88     # max size of the iid, used to create the iid_eid conversion table
    88     # max size of the iid, used to create the iid_eid conversion table
    89     iid_maxsize = 1024
    89     iid_maxsize = 1024
    90 
    90 
    91     def __init__(self, cnx,
    91     def __init__(self, cnx,
    92                  commit_at_flush=True,
       
    93                  on_commit_callback=None, on_rollback_callback=None,
    92                  on_commit_callback=None, on_rollback_callback=None,
    94                  slave_mode=False,
    93                  slave_mode=False,
    95                  source=None):
    94                  source=None):
    96         """ Create a MassiveObject store, with the following attributes:
    95         """ Create a MassiveObject store, with the following attributes:
    97 
    96 
    98         - cnx: CubicWeb cnx
    97         - cnx: CubicWeb cnx
    99         - commit_at_flush: Boolean. Commit after each flush().
       
   100         """
    98         """
   101         super(MassiveObjectStore, self).__init__(cnx)
    99         super(MassiveObjectStore, self).__init__(cnx)
   102         self.logger = logging.getLogger('dataio.relationmixin')
   100         self.logger = logging.getLogger('dataio.relationmixin')
   103         self._cnx = cnx
   101         self._cnx = cnx
   104         self.sql = cnx.system_sql
   102         self.sql = cnx.system_sql
   105         self.commit_at_flush = commit_at_flush
       
   106         self._data_uri_relations = defaultdict(list)
   103         self._data_uri_relations = defaultdict(list)
   107         self._initialized = {'init_uri_eid': set(),
   104         self._initialized = {'init_uri_eid': set(),
   108                              'uri_eid_inserted': set(),
   105                              'uri_eid_inserted': set(),
   109                              'uri_rtypes': set(),
   106                              'uri_rtypes': set(),
   110                              'entities': set(),
   107                              'entities': set(),
   120         self._data_entities = defaultdict(list)
   117         self._data_entities = defaultdict(list)
   121         self._data_relations = defaultdict(list)
   118         self._data_relations = defaultdict(list)
   122         self._now = datetime.now()
   119         self._now = datetime.now()
   123         self._default_cwuri = make_uid('_auto_generated')
   120         self._default_cwuri = make_uid('_auto_generated')
   124         self._count_cwuri = 0
   121         self._count_cwuri = 0
   125         self.commit_at_flush = commit_at_flush
       
   126         self.on_commit_callback = on_commit_callback
   122         self.on_commit_callback = on_commit_callback
   127         self.on_rollback_callback = on_rollback_callback
   123         self.on_rollback_callback = on_rollback_callback
   128         # Initialize the meta tables of dataio for warm restart
   124         # Initialize the meta tables of dataio for warm restart
   129         self._init_dataio_metatables()
   125         self._init_dataio_metatables()
   130         # Internal markers of initialization
   126         # Internal markers of initialization
   219             else:
   215             else:
   220                 self.logger.warning("inlined relation %s: cannot insert it", rtype)
   216                 self.logger.warning("inlined relation %s: cannot insert it", rtype)
   221             buf.close()
   217             buf.close()
   222             # Clear data cache
   218             # Clear data cache
   223             self._data_uri_relations[rtype] = []
   219             self._data_uri_relations[rtype] = []
   224             # Commit if asked
       
   225             if self.commit_at_flush:
       
   226                 self.commit()
       
   227 
   220 
   228     def fill_uri_eid_table(self, etype, uri_label):
   221     def fill_uri_eid_table(self, etype, uri_label):
   229         """ Fill the uri_eid table
   222         """ Fill the uri_eid table
   230         """
   223         """
   231         self.logger.info('Fill uri_eid for etype %s', etype)
   224         self.logger.info('Fill uri_eid for etype %s', etype)
   517             # Push into the tmp table
   510             # Push into the tmp table
   518             cursor.copy_from(buf, '%s_relation_tmp' % rtype.lower(),
   511             cursor.copy_from(buf, '%s_relation_tmp' % rtype.lower(),
   519                              null='NULL', columns=('eid_from', 'eid_to'))
   512                              null='NULL', columns=('eid_from', 'eid_to'))
   520             # Clear data cache
   513             # Clear data cache
   521             self._data_relations[rtype] = []
   514             self._data_relations[rtype] = []
   522             # Commit if asked
       
   523             if self.commit_at_flush:
       
   524                 self.commit()
       
   525 
   515 
   526     def flush_entities(self):
   516     def flush_entities(self):
   527         """ Flush the entities data
   517         """ Flush the entities data
   528         """
   518         """
   529         for etype, data in self._data_entities.items():
   519         for etype, data in self._data_entities.items():
   556             except Exception as exc:
   546             except Exception as exc:
   557                 self.on_rollback(exc, etype, data)
   547                 self.on_rollback(exc, etype, data)
   558             # Clear data cache
   548             # Clear data cache
   559             self._data_entities[etype] = []
   549             self._data_entities[etype] = []
   560         self.flush_meta_data()
   550         self.flush_meta_data()
   561         # Commit if asked
       
   562         if self.commit_at_flush:
       
   563             self.commit()
       
   564 
   551 
   565     def flush_meta_data(self):
   552     def flush_meta_data(self):
   566         """ Flush the meta data (entities table, is_instance table, ...)
   553         """ Flush the meta data (entities table, is_instance table, ...)
   567         """
   554         """
   568         if self.slave_mode:
   555         if self.slave_mode: