61 |
61 |
62 Full-text indexation is not handled, you'll have to reindex the proper entity types by yourself |
62 Full-text indexation is not handled, you'll have to reindex the proper entity types by yourself |
63 if desired. |
63 if desired. |
64 """ |
64 """ |
65 |
65 |
66 def __init__(self, cnx, |
66 def __init__(self, cnx, slave_mode=False, eids_seq_range=10000, metagen=None): |
67 on_commit_callback=None, on_rollback_callback=None, |
|
68 slave_mode=False, |
|
69 eids_seq_range=10000, |
|
70 metagen=None): |
|
71 """Create a MassiveObject store, with the following arguments: |
67 """Create a MassiveObject store, with the following arguments: |
72 |
68 |
73 - `cnx`, a connection to the repository |
69 - `cnx`, a connection to the repository |
74 - `metagen`, optional :class:`MetadataGenerator` instance |
70 - `metagen`, optional :class:`MetadataGenerator` instance |
75 - `eids_seq_range`: size of eid range reserved by the store for each batch |
71 - `eids_seq_range`: size of eid range reserved by the store for each batch |
76 """ |
72 """ |
77 super(MassiveObjectStore, self).__init__(cnx) |
73 super(MassiveObjectStore, self).__init__(cnx) |
78 |
74 |
79 self.uuid = text_type(uuid4()).replace('-', '') |
75 self.uuid = text_type(uuid4()).replace('-', '') |
80 self.on_commit_callback = on_commit_callback |
|
81 self.on_rollback_callback = on_rollback_callback |
|
82 self.slave_mode = slave_mode |
76 self.slave_mode = slave_mode |
83 self.eids_seq_range = eids_seq_range |
77 self.eids_seq_range = eids_seq_range |
84 if metagen is None: |
78 if metagen is None: |
85 metagen = stores.MetadataGenerator(cnx) |
79 metagen = stores.MetadataGenerator(cnx) |
86 self.metagen = metagen |
80 self.metagen = metagen |
189 |
183 |
190 def flush(self): |
184 def flush(self): |
191 """Flush the data""" |
185 """Flush the data""" |
192 self.flush_entities() |
186 self.flush_entities() |
193 self.flush_relations() |
187 self.flush_relations() |
194 |
|
195 def commit(self): |
|
196 """Commit the database transaction.""" |
|
197 self.on_commit() |
|
198 super(MassiveObjectStore, self).commit() |
|
199 |
188 |
200 def finish(self): |
189 def finish(self): |
201 """Remove temporary tables and columns.""" |
190 """Remove temporary tables and columns.""" |
202 assert not self.slave_mode, 'finish method should only be called by the master store' |
191 assert not self.slave_mode, 'finish method should only be called by the master store' |
203 self.logger.info("Start cleaning") |
192 self.logger.info("Start cleaning") |
285 'WHERE retype = %(rtype)s AND uuid = %(uuid)s', |
274 'WHERE retype = %(rtype)s AND uuid = %(uuid)s', |
286 {'rtype': ertype, 'uuid': uuid}) |
275 {'rtype': ertype, 'uuid': uuid}) |
287 |
276 |
288 # FLUSH ################################################################# |
277 # FLUSH ################################################################# |
289 |
278 |
290 def on_commit(self): |
|
291 if self.on_commit_callback: |
|
292 self.on_commit_callback() |
|
293 |
|
294 def on_rollback(self, exc, etype, data): |
|
295 if self.on_rollback_callback: |
|
296 self.on_rollback_callback(exc, etype, data) |
|
297 self._cnx.rollback() |
|
298 else: |
|
299 raise exc |
|
300 |
|
301 def flush_relations(self): |
279 def flush_relations(self): |
302 """Flush the relations data from in-memory structures to a temporary table.""" |
280 """Flush the relations data from in-memory structures to a temporary table.""" |
303 for rtype, data in self._data_relations.items(): |
281 for rtype, data in self._data_relations.items(): |
304 if not data: |
282 if not data: |
305 # There is no data for these etype for this flush round. |
283 # There is no data for these etype for this flush round. |
335 buf = pgstore._create_copyfrom_buffer(_data, attrs) |
313 buf = pgstore._create_copyfrom_buffer(_data, attrs) |
336 tablename = 'cw_%s' % etype.lower() |
314 tablename = 'cw_%s' % etype.lower() |
337 tmp_tablename = '%s_%s' % (tablename, self.uuid) |
315 tmp_tablename = '%s_%s' % (tablename, self.uuid) |
338 columns = ['cw_%s' % attr for attr in attrs] |
316 columns = ['cw_%s' % attr for attr in attrs] |
339 cursor = self._cnx.cnxset.cu |
317 cursor = self._cnx.cnxset.cu |
340 try: |
318 cursor.copy_from(buf, tmp_tablename, null='NULL', columns=columns) |
341 cursor.copy_from(buf, tmp_tablename, null='NULL', columns=columns) |
|
342 except Exception as exc: |
|
343 self.on_rollback(exc, etype, data) |
|
344 # Clear data cache |
319 # Clear data cache |
345 self._data_entities[etype] = [] |
320 self._data_entities[etype] = [] |
346 |
321 |
347 |
322 |
348 def get_default_values(schema): |
323 def get_default_values(schema): |