16 # You should have received a copy of the GNU Lesser General Public License along |
16 # You should have received a copy of the GNU Lesser General Public License along |
17 # with CubicWeb. If not, see <http://www.gnu.org/licenses/>. |
17 # with CubicWeb. If not, see <http://www.gnu.org/licenses/>. |
18 """Postgres specific store""" |
18 """Postgres specific store""" |
19 from __future__ import print_function |
19 from __future__ import print_function |
20 |
20 |
21 import threading |
|
22 import warnings |
21 import warnings |
23 import os.path as osp |
22 import os.path as osp |
24 from io import StringIO |
23 from io import StringIO |
25 from time import asctime |
24 from time import asctime |
26 from datetime import date, datetime, time |
25 from datetime import date, datetime, time |
33 from cubicweb.utils import make_uid |
32 from cubicweb.utils import make_uid |
34 from cubicweb.server.utils import eschema_eid |
33 from cubicweb.server.utils import eschema_eid |
35 from cubicweb.server.sqlutils import SQL_PREFIX |
34 from cubicweb.server.sqlutils import SQL_PREFIX |
36 from cubicweb.dataimport.stores import NoHookRQLObjectStore |
35 from cubicweb.dataimport.stores import NoHookRQLObjectStore |
37 |
36 |
def _import_statements(sql_connect, statements, nb_threads=3,
                       dump_output_dir=None,
                       support_copy_from=True, encoding='utf-8'):
    """Import a bunch of sql statements, using different threads.

    Parameters:

    - sql_connect: callable returning a fresh DB-API connection
      (one per worker thread)
    - statements: sequence of (statement, data) items to execute
    - nb_threads: number of worker threads to spread the statements over
    - dump_output_dir: directory where failed statements are dumped
      for easier recovery (None disables dumping)
    - support_copy_from: whether the backend supports COPY FROM
    - encoding: database encoding used when serializing data
    """
    try:
        # // : integer division, required for slice bounds (plain `/`
        # yields a float under Python 3 and breaks the slicing below)
        chunksize = (len(statements) // nb_threads) + 1
        threads = []
        for i in range(nb_threads):
            chunks = statements[i * chunksize:(i + 1) * chunksize]
            thread = threading.Thread(target=_execmany_thread,
                                      args=(sql_connect, chunks,
                                            dump_output_dir,
                                            support_copy_from,
                                            encoding))
            thread.start()
            threads.append(thread)
        for t in threads:
            t.join()
    except Exception as exc:
        # best-effort: report instead of aborting the whole import,
        # but at least say what went wrong
        print('Error in import statements:', exc)
37 |
61 def _execmany_thread_not_copy_from(cu, statement, data, table=None, |
38 def _execmany_thread_not_copy_from(cu, statement, data, table=None, |
62 columns=None, encoding='utf-8'): |
39 columns=None, encoding='utf-8'): |
63 """ Execute thread without copy from |
40 """ Execute thread without copy from |
64 """ |
41 """ |
225 >>> store = SQLGenObjectStore(cnx) |
202 >>> store = SQLGenObjectStore(cnx) |
226 >>> store.create_entity('Person', ...) |
203 >>> store.create_entity('Person', ...) |
227 >>> store.flush() |
204 >>> store.flush() |
228 """ |
205 """ |
229 |
206 |
def __init__(self, cnx, dump_output_dir=None, nb_threads_statement=1):
    """Initialize a SQLGenObjectStore.

    Parameters:

    - cnx: connection on the cubicweb instance
    - dump_output_dir: a directory to dump failed statements
      for easier recovery. Default is None (no dump).
    - nb_threads_statement: deprecated and ignored; kept only for
      backward compatibility (the store is no longer threaded).
    """
    super(SQLGenObjectStore, self).__init__(cnx)
    ### hijack default source
    self.source = SQLGenSourceWrapper(
        self.source, cnx.vreg.schema,
        dump_output_dir=dump_output_dir)
    ### XXX This is done in super().__init__(), but should be
    ### redone here to link to the correct source
    self.add_relation = self.source.add_relation
    self.indexes_etypes = {}
    if nb_threads_statement != 1:
        # module imports `warnings`, not `from warnings import warn`,
        # so the qualified call is required
        warnings.warn('[3.21] SQLGenObjectStore is no longer threaded',
                      DeprecationWarning)
252 |
228 |
def flush(self):
    """Flush buffered data to the database by delegating to the
    wrapped source."""
    self.source.flush()
256 |
232 |
291 ########################################################################### |
267 ########################################################################### |
292 |
268 |
293 class SQLGenSourceWrapper(object): |
269 class SQLGenSourceWrapper(object): |
294 |
270 |
295 def __init__(self, system_source, schema, |
271 def __init__(self, system_source, schema, |
296 dump_output_dir=None, nb_threads_statement=3): |
272 dump_output_dir=None): |
297 self.system_source = system_source |
273 self.system_source = system_source |
298 self._sql = threading.local() |
|
299 # Explicitely backport attributes from system source |
274 # Explicitely backport attributes from system source |
300 self._storage_handler = self.system_source._storage_handler |
275 self._storage_handler = self.system_source._storage_handler |
301 self.preprocess_entity = self.system_source.preprocess_entity |
276 self.preprocess_entity = self.system_source.preprocess_entity |
302 self.sqlgen = self.system_source.sqlgen |
277 self.sqlgen = self.system_source.sqlgen |
303 self.uri = self.system_source.uri |
278 self.uri = self.system_source.uri |
308 # not support (yet...) copy_from |
283 # not support (yet...) copy_from |
309 # XXX Should be dealt with in logilab.database |
284 # XXX Should be dealt with in logilab.database |
310 spcfrom = system_source.dbhelper.dbapi_module.support_copy_from |
285 spcfrom = system_source.dbhelper.dbapi_module.support_copy_from |
311 self.support_copy_from = spcfrom |
286 self.support_copy_from = spcfrom |
312 self.dbencoding = system_source.dbhelper.dbencoding |
287 self.dbencoding = system_source.dbhelper.dbencoding |
313 self.nb_threads_statement = nb_threads_statement |
288 self.init_statement_lists() |
314 # initialize thread-local data for main thread |
|
315 self.init_thread_locals() |
|
316 self._inlined_rtypes_cache = {} |
289 self._inlined_rtypes_cache = {} |
317 self._fill_inlined_rtypes_cache(schema) |
290 self._fill_inlined_rtypes_cache(schema) |
318 self.schema = schema |
291 self.schema = schema |
319 self.do_fti = False |
292 self.do_fti = False |
320 |
293 |
323 for eschema in schema.entities(): |
296 for eschema in schema.entities(): |
324 for rschema in eschema.ordered_relations(): |
297 for rschema in eschema.ordered_relations(): |
325 if rschema.inlined: |
298 if rschema.inlined: |
326 cache[eschema.type] = SQL_PREFIX + rschema.type |
299 cache[eschema.type] = SQL_PREFIX + rschema.type |
327 |
300 |
def init_statement_lists(self):
    """(Re)initialize the per-flush statement buffers.

    Each buffer maps an SQL statement string to the list of parameter
    dicts to execute it with.
    """
    self._sql_entities = defaultdict(list)
    self._sql_relations = {}
    self._sql_inlined_relations = {}
    self._sql_eids = defaultdict(list)
    # keep track, for each eid of the corresponding data dict
    self._sql_eid_insertdicts = {}
335 |
308 |
336 def flush(self): |
309 def flush(self): |
337 print('starting flush') |
310 print('starting flush') |
338 _entities_sql = self._sql.entities |
311 _entities_sql = self._sql_entities |
339 _relations_sql = self._sql.relations |
312 _relations_sql = self._sql_relations |
340 _inlined_relations_sql = self._sql.inlined_relations |
313 _inlined_relations_sql = self._sql_inlined_relations |
341 _insertdicts = self._sql.eid_insertdicts |
314 _insertdicts = self._sql_eid_insertdicts |
342 try: |
315 try: |
343 # try, for each inlined_relation, to find if we're also creating |
316 # try, for each inlined_relation, to find if we're also creating |
344 # the host entity (i.e. the subject of the relation). |
317 # the host entity (i.e. the subject of the relation). |
345 # In that case, simply update the insert dict and remove |
318 # In that case, simply update the insert dict and remove |
346 # the need to make the |
319 # the need to make the |
363 else: |
336 else: |
364 # could not find corresponding insert dict, keep the |
337 # could not find corresponding insert dict, keep the |
365 # UPDATE query |
338 # UPDATE query |
366 new_datalist.append(data) |
339 new_datalist.append(data) |
367 _inlined_relations_sql[statement] = new_datalist |
340 _inlined_relations_sql[statement] = new_datalist |
368 _import_statements(self.system_source.get_connection, |
341 _execmany_thread(self.system_source.get_connection, |
369 _entities_sql.items() |
342 self._sql_eids.items() |
370 + _relations_sql.items() |
343 + _entities_sql.items() |
371 + _inlined_relations_sql.items(), |
344 + _relations_sql.items() |
372 dump_output_dir=self.dump_output_dir, |
345 + _inlined_relations_sql.items(), |
373 nb_threads=self.nb_threads_statement, |
346 dump_output_dir=self.dump_output_dir, |
374 support_copy_from=self.support_copy_from, |
347 support_copy_from=self.support_copy_from, |
375 encoding=self.dbencoding) |
348 encoding=self.dbencoding) |
376 finally: |
349 finally: |
377 _entities_sql.clear() |
350 _entities_sql.clear() |
378 _relations_sql.clear() |
351 _relations_sql.clear() |
379 _insertdicts.clear() |
352 _insertdicts.clear() |
380 _inlined_relations_sql.clear() |
353 _inlined_relations_sql.clear() |
381 |
354 |
382 def add_relation(self, cnx, subject, rtype, object, |
355 def add_relation(self, cnx, subject, rtype, object, |
383 inlined=False, **kwargs): |
356 inlined=False, **kwargs): |
384 if inlined: |
357 if inlined: |
385 _sql = self._sql.inlined_relations |
358 _sql = self._sql_inlined_relations |
386 data = {'cw_eid': subject, SQL_PREFIX + rtype: object} |
359 data = {'cw_eid': subject, SQL_PREFIX + rtype: object} |
387 subjtype = kwargs.get('subjtype') |
360 subjtype = kwargs.get('subjtype') |
388 if subjtype is None: |
361 if subjtype is None: |
389 # Try to infer it |
362 # Try to infer it |
390 targets = [t.type for t in |
363 targets = [t.type for t in |
398 'this type is given as keyword argument ' |
371 'this type is given as keyword argument ' |
399 '``subjtype``'% rtype) |
372 '``subjtype``'% rtype) |
400 statement = self.sqlgen.update(SQL_PREFIX + subjtype, |
373 statement = self.sqlgen.update(SQL_PREFIX + subjtype, |
401 data, ['cw_eid']) |
374 data, ['cw_eid']) |
402 else: |
375 else: |
403 _sql = self._sql.relations |
376 _sql = self._sql_relations |
404 data = {'eid_from': subject, 'eid_to': object} |
377 data = {'eid_from': subject, 'eid_to': object} |
405 statement = self.sqlgen.insert('%s_relation' % rtype, data) |
378 statement = self.sqlgen.insert('%s_relation' % rtype, data) |
406 if statement in _sql: |
379 if statement in _sql: |
407 _sql[statement].append(data) |
380 _sql[statement].append(data) |
408 else: |
381 else: |
416 rtypes = (rtypes,) |
389 rtypes = (rtypes,) |
417 for rtype in rtypes: |
390 for rtype in rtypes: |
418 if rtype not in attrs: |
391 if rtype not in attrs: |
419 attrs[rtype] = None |
392 attrs[rtype] = None |
420 sql = self.sqlgen.insert(SQL_PREFIX + entity.cw_etype, attrs) |
393 sql = self.sqlgen.insert(SQL_PREFIX + entity.cw_etype, attrs) |
421 self._sql.eid_insertdicts[entity.eid] = attrs |
394 self._sql_eid_insertdicts[entity.eid] = attrs |
422 self._append_to_entities(sql, attrs) |
395 self._append_to_entities(sql, attrs) |
423 |
396 |
424 def _append_to_entities(self, sql, attrs): |
397 def _append_to_entities(self, sql, attrs): |
425 self._sql.entities[sql].append(attrs) |
398 self._sql_entities[sql].append(attrs) |
426 |
399 |
427 def _handle_insert_entity_sql(self, cnx, sql, attrs): |
400 def _handle_insert_entity_sql(self, cnx, sql, attrs): |
428 # We have to overwrite the source given in parameters |
401 # We have to overwrite the source given in parameters |
429 # as here, we directly use the system source |
402 # as here, we directly use the system source |
430 attrs['asource'] = self.system_source.uri |
403 attrs['asource'] = self.system_source.uri |
431 self._append_to_entities(sql, attrs) |
404 self._sql_eids[sql].append(attrs) |
432 |
405 |
433 def _handle_is_relation_sql(self, cnx, sql, attrs): |
406 def _handle_is_relation_sql(self, cnx, sql, attrs): |
434 self._append_to_entities(sql, attrs) |
407 self._append_to_entities(sql, attrs) |
435 |
408 |
436 def _handle_is_instance_of_sql(self, cnx, sql, attrs): |
409 def _handle_is_instance_of_sql(self, cnx, sql, attrs): |