def check_doubles_not_none(buckets):
    """Extract the keys that have more than one item in their bucket."""
    return [(k, len(v)) for k, v in buckets.items()
            if k is not None and len(v) > 1]
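
# Illustrative example (hypothetical data): with buckets such as
# {'dupe': [1, 2], None: [3, 4], 'uniq': [5]}, the None key is skipped and
# check_doubles_not_none(buckets) returns [('dupe', 2)].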


# sql generator utility functions #############################################


def _import_statements(sql_connect, statements, nb_threads=3,
                       dump_output_dir=None,
                       support_copy_from=True, encoding='utf-8'):
    """
    Import a bunch of sql statements, using different threads.
    """
    try:
        chunksize = (len(statements) / nb_threads) + 1
        threads = []
        for i in xrange(nb_threads):
            chunks = statements[i*chunksize:(i+1)*chunksize]
            thread = threading.Thread(target=_execmany_thread,
                                      args=(sql_connect, chunks,
                                            dump_output_dir,
                                            support_copy_from,
                                            encoding))
            thread.start()
            threads.append(thread)
        for t in threads:
            t.join()
    except Exception:
        print 'Error in import statements'
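
# Usage sketch (hypothetical statement and data): 'statements' is a list of
# (sql, data) pairs, such as those accumulated by SQLGenSourceWrapper.flush()
# below, e.g.
#
#   statements = [('INSERT INTO cw_Person ( cw_name ) VALUES ( %(cw_name)s )',
#                  [{'cw_name': u'Alice'}, {'cw_name': u'Bob'}])]
#   _import_statements(source.get_connection, statements, nb_threads=2)
#
# Each thread receives a contiguous chunk of roughly len(statements)/nb_threads
# pairs and runs _execmany_thread on it with its own connection.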

def _execmany_thread_not_copy_from(cu, statement, data, table=None,
                                   columns=None, encoding='utf-8'):
    """Execute the statement with executemany, without 'COPY FROM'."""
    cu.executemany(statement, data)

def _execmany_thread_copy_from(cu, statement, data, table,
                               columns, encoding='utf-8'):
    """Execute the statement with 'COPY FROM', falling back to executemany
    when the data cannot be serialized into a copy buffer."""
    buf = _create_copyfrom_buffer(data, columns, encoding)
    if buf is None:
        _execmany_thread_not_copy_from(cu, statement, data)
    else:
        if columns is None:
            cu.copy_from(buf, table, null='NULL')
        else:
            cu.copy_from(buf, table, null='NULL', columns=columns)
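
# Behavioral note (assuming a psycopg2-style cursor): copy_from reads the
# buffer as tab-separated rows, and since null='NULL' is passed, the literal
# string 'NULL' written by _create_copyfrom_buffer is loaded as SQL NULL.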

def _execmany_thread(sql_connect, statements, dump_output_dir=None,
                     support_copy_from=True, encoding='utf-8'):
    """
    Execute SQL statements. For 'INSERT INTO' statements, try to use the
    'COPY FROM' command, or fall back to executemany.
    """
    if support_copy_from:
        execmany_func = _execmany_thread_copy_from
    else:
        execmany_func = _execmany_thread_not_copy_from
    cnx = sql_connect()
    cu = cnx.cursor()
    try:
        for statement, data in statements:
            table = None
            columns = None
            try:
                if not statement.startswith('INSERT INTO'):
                    cu.executemany(statement, data)
                    continue
                table = statement.split()[2]
                if isinstance(data[0], (tuple, list)):
                    columns = None
                else:
                    columns = list(data[0])
                execmany_func(cu, statement, data, table, columns, encoding)
            except Exception:
                print 'unable to copy data into table %s' % table
                # Error in import statement, save data in dump_output_dir
                if dump_output_dir is not None:
                    pdata = {'data': data, 'statement': statement,
                             'time': asctime(), 'columns': columns}
                    filename = make_uid()
                    try:
                        with open(osp.join(dump_output_dir,
                                           '%s.pickle' % filename), 'wb') as fobj:
                            fobj.write(cPickle.dumps(pdata))
                    except IOError:
                        print 'ERROR while pickling in', dump_output_dir, filename+'.pickle'
                cnx.rollback()
                raise
    finally:
        cnx.commit()
        cu.close()
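
# Recovery sketch (hypothetical path): a statement dumped above can be
# inspected and replayed later, e.g.
#
#   with open('/path/to/dumps/<uid>.pickle', 'rb') as fobj:
#       pdata = cPickle.load(fobj)
#   # pdata keys: 'statement', 'data', 'columns', 'time'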

def _create_copyfrom_buffer(data, columns, encoding='utf-8', replace_sep=None):
    """
    Create a StringIO buffer for the 'COPY FROM' command.
    Deals with Unicode, int, float, date...
    """
    # Create a list rather than directly creating a StringIO,
    # to write lines separated by '\n' in a single step
    rows = []
    if isinstance(data[0], (tuple, list)):
        columns = range(len(data[0]))
    for row in data:
        # Iterate over the different columns and the different values
        # and try to convert them to a correct datatype.
        # If an error is raised, do not continue.
        formatted_row = []
        for col in columns:
            value = row[col]
            if value is None:
                value = 'NULL'
            elif isinstance(value, (long, int, float)):
                value = str(value)
            elif isinstance(value, (str, unicode)):
                # Remove separators used in string formatting
                for _char in (u'\t', u'\r', u'\n'):
                    if _char in value:
                        # If a replace_sep is given, replace
                        # the separator instead of returning None
                        # (and thus avoid an empty buffer)
                        if replace_sep:
                            value = value.replace(_char, replace_sep)
                        else:
                            return
                value = value.replace('\\', r'\\')
                if value is None:
                    return
                if isinstance(value, unicode):
                    value = value.encode(encoding)
            elif isinstance(value, (date, datetime)):
                # Do not use strftime, as it raises issues
                # with dates before 1900
                value = '%04d-%02d-%02d' % (value.year,
                                            value.month,
                                            value.day)
            else:
                return None
            # Push the value to the new formatted row: at this point it is
            # not None and has been converted to a string.
            formatted_row.append(value)
        rows.append('\t'.join(formatted_row))
    return StringIO('\n'.join(rows))
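
# Illustrative example (hypothetical data): tuple rows use positional columns
# and are serialized as one tab-separated line per row, e.g.
#
#   buf = _create_copyfrom_buffer([(1, u'alice'), (2, None)], None)
#   # buf.getvalue() == '1\talice\n2\tNULL'
#
# A string value containing '\t', '\r' or '\n' makes the function return None
# (unless replace_sep is given), which triggers the executemany fallback.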


# object stores #################################################################

class ObjectStore(object):

    # [...]

    def gen_created_by(self, entity):
        return self.session.user.eid

    def gen_owned_by(self, entity):
        return self.session.user.eid


###########################################################################
## SQL object store #######################################################
###########################################################################
class SQLGenObjectStore(NoHookRQLObjectStore):
    """Controller of the data import process. This version is based
    on direct insertions through SQL commands (COPY FROM or executemany).

    >>> store = SQLGenObjectStore(session)
    >>> store.create_entity('Person', ...)
    >>> store.flush()
    """

    def __init__(self, session, dump_output_dir=None, nb_threads_statement=3):
        """
        Initialize a SQLGenObjectStore.

        Parameters:

        - session: session on the cubicweb instance
        - dump_output_dir: a directory to dump failed statements
          for easier recovery. Default is None (no dump).
        - nb_threads_statement: number of threads used
          for SQL insertion (default is 3).
        """
        super(SQLGenObjectStore, self).__init__(session)
        ### hijack default source
        self.source = SQLGenSourceWrapper(
            self.source, session.vreg.schema,
            dump_output_dir=dump_output_dir,
            nb_threads_statement=nb_threads_statement)
        ### XXX This is done in super().__init__(), but should be
        ### redone here to link to the correct source
        self.add_relation = self.source.add_relation
        self.indexes_etypes = {}

    def flush(self):
        """Flush data to the database"""
        self.source.flush()

    def relate(self, subj_eid, rtype, obj_eid, subjtype=None):
        if subj_eid is None or obj_eid is None:
            return
        # XXX Could subjtype be inferred?
        self.source.add_relation(self.session, subj_eid, rtype, obj_eid,
                                 self.rschema(rtype).inlined, subjtype)

    def drop_indexes(self, etype):
        """Drop indexes for a given entity type"""
        if etype not in self.indexes_etypes:
            cu = self.session.cnxset['system']
            def index_to_attr(index):
                """turn an index name into a (database) attribute name"""
                return index.replace(etype.lower(), '').replace('idx', '').strip('_')
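            # e.g. (assuming the default index naming scheme) for etype
            # 'Person', an index named 'person_firstname_idx' maps to the
            # attribute 'firstname'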
            indices = [(index, index_to_attr(index))
                       for index in self.source.dbhelper.list_indices(cu, etype)
                       # Do not consider 'cw_etype_pkey' index
                       if not index.endswith('key')]
            self.indexes_etypes[etype] = indices
        for index, attr in self.indexes_etypes[etype]:
            self.session.system_sql('DROP INDEX %s' % index)

    def create_indexes(self, etype):
        """Recreate indexes for a given entity type"""
        for index, attr in self.indexes_etypes.get(etype, []):
            sql = 'CREATE INDEX %s ON cw_%s(%s)' % (index, etype, attr)
            self.session.system_sql(sql)
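
    # Usage sketch (hypothetical etype): around a massive import, one might
    #
    #   store.drop_indexes('Person')
    #   ... create and flush many Person entities ...
    #   store.create_indexes('Person')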


###########################################################################
## SQL Source #############################################################
###########################################################################

class SQLGenSourceWrapper(object):

    def __init__(self, system_source, schema,
                 dump_output_dir=None, nb_threads_statement=3):
        self.system_source = system_source
        self._sql = threading.local()
        # Explicitly backport attributes from the system source
        self._storage_handler = self.system_source._storage_handler
        self.preprocess_entity = self.system_source.preprocess_entity
        self.sqlgen = self.system_source.sqlgen
        self.copy_based_source = self.system_source.copy_based_source
        self.uri = self.system_source.uri
        self.eid = self.system_source.eid
        # Directory to write temporary files
        self.dump_output_dir = dump_output_dir
        # Allow executing code with the SQLite backend, which does
        # not support (yet...) copy_from
        # XXX Should be dealt with in logilab.database
        spcfrom = system_source.dbhelper.dbapi_module.support_copy_from
        self.support_copy_from = spcfrom
        self.dbencoding = system_source.dbhelper.dbencoding
        self.nb_threads_statement = nb_threads_statement
        # initialize thread-local data for the main thread
        self.init_thread_locals()
        self._inlined_rtypes_cache = {}
        self._fill_inlined_rtypes_cache(schema)
        self.schema = schema
        self.do_fti = False

    def _fill_inlined_rtypes_cache(self, schema):
        cache = self._inlined_rtypes_cache
        for eschema in schema.entities():
            for rschema in eschema.ordered_relations():
                if rschema.inlined:
                    # accumulate every inlined rtype of the etype,
                    # not only the last one seen
                    cache.setdefault(eschema.type, []).append(
                        SQL_PREFIX + rschema.type)

    def init_thread_locals(self):
        """initializes thread-local data"""
        self._sql.entities = defaultdict(list)
        self._sql.relations = {}
        self._sql.inlined_relations = {}
        # keep track, for each eid, of the corresponding data dict
        self._sql.eid_insertdicts = {}

    def flush(self):
        print 'starting flush'
        _entities_sql = self._sql.entities
        _relations_sql = self._sql.relations
        _inlined_relations_sql = self._sql.inlined_relations
        _insertdicts = self._sql.eid_insertdicts
        try:
            # Try, for each inlined relation, to find out if we are also
            # creating the host entity (i.e. the subject of the relation).
            # In that case, simply update the insert dict and drop the
            # UPDATE statement.
            for statement, datalist in _inlined_relations_sql.iteritems():
                new_datalist = []
                # for a given inlined relation,
                # browse each couple to be inserted
                for data in datalist:
                    keys = list(data)
                    # For inlined relations, there are only two possible
                    # key layouts: (rtype, cw_eid) or (cw_eid, rtype)
                    if keys[0] == 'cw_eid':
                        rtype = keys[1]
                    else:
                        rtype = keys[0]
                    updated_eid = data['cw_eid']
                    if updated_eid in _insertdicts:
                        _insertdicts[updated_eid][rtype] = data[rtype]
                    else:
                        # could not find the corresponding insert dict,
                        # keep the UPDATE query
                        new_datalist.append(data)
                _inlined_relations_sql[statement] = new_datalist
            _import_statements(self.system_source.get_connection,
                               _entities_sql.items()
                               + _relations_sql.items()
                               + _inlined_relations_sql.items(),
                               dump_output_dir=self.dump_output_dir,
                               nb_threads=self.nb_threads_statement,
                               support_copy_from=self.support_copy_from,
                               encoding=self.dbencoding)
        except Exception:
            print 'failed to flush'
        finally:
            _entities_sql.clear()
            _relations_sql.clear()
            _insertdicts.clear()
            _inlined_relations_sql.clear()
        print 'flush done'
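
    # Illustrative example (hypothetical eids and rtype): if entity 12 is
    # created in this batch and an inlined relation sets cw_author on it,
    # the pending UPDATE data {'cw_eid': 12, 'cw_author': 34} is folded into
    # the INSERT dict of entity 12 rather than executed as a separate UPDATE.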

    def add_relation(self, session, subject, rtype, object,
                     inlined=False, subjtype=None):
        if inlined:
            _sql = self._sql.inlined_relations
            data = {'cw_eid': subject, SQL_PREFIX + rtype: object}
            if subjtype is None:
                # Try to infer it
                targets = [t.type for t in
                           self.schema.rschema(rtype).targets()]
                if len(targets) == 1:
                    subjtype = targets[0]
                else:
                    raise ValueError('You should give the subject etype for '
                                     'inlined relation %s'
                                     ', as it cannot be inferred' % rtype)
            statement = self.sqlgen.update(SQL_PREFIX + subjtype,
                                           data, ['cw_eid'])
        else:
            _sql = self._sql.relations
            data = {'eid_from': subject, 'eid_to': object}
            statement = self.sqlgen.insert('%s_relation' % rtype, data)
        if statement in _sql:
            _sql[statement].append(data)
        else:
            _sql[statement] = [data]
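
    # Illustrative example (hypothetical relation type): a non-inlined call
    #
    #   wrapper.add_relation(session, 12, 'knows', 34)
    #
    # queues {'eid_from': 12, 'eid_to': 34} under the INSERT statement that
    # sqlgen generates for the 'knows_relation' table; an inlined relation
    # instead queues an UPDATE on the subject's cw_<subjtype> table.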

    def add_entity(self, session, entity):
        with self._storage_handler(entity, 'added'):
            attrs = self.preprocess_entity(entity)
            rtypes = self._inlined_rtypes_cache.get(entity.__regid__, ())
            if isinstance(rtypes, str):
                rtypes = (rtypes,)
            for rtype in rtypes:
                if rtype not in attrs:
                    attrs[rtype] = None
            sql = self.sqlgen.insert(SQL_PREFIX + entity.__regid__, attrs)
            self._sql.eid_insertdicts[entity.eid] = attrs
            self._append_to_entities(sql, attrs)

    def _append_to_entities(self, sql, attrs):
        self._sql.entities[sql].append(attrs)

    def _handle_insert_entity_sql(self, session, sql, attrs):
        # We have to overwrite the source given in parameters
        # as here, we directly use the system source
        attrs['source'] = 'system'
        attrs['asource'] = self.system_source.uri
        self._append_to_entities(sql, attrs)

    def _handle_is_relation_sql(self, session, sql, attrs):
        self._append_to_entities(sql, attrs)

    def _handle_is_instance_of_sql(self, session, sql, attrs):
        self._append_to_entities(sql, attrs)

    def _handle_source_relation_sql(self, session, sql, attrs):
        self._append_to_entities(sql, attrs)

    # XXX add_info is similar to the one in NativeSQLSource. It is rewritten
    # here to correctly use the _handle_xxx methods of the SQLGenSourceWrapper.
    # This part should be rewritten in a clearer way.
    def add_info(self, session, entity, source, extid, complete):
        """add type and source info for an eid into the system table"""
        # begin by inserting eid/type/source/extid into the entities table
        if extid is not None:
            assert isinstance(extid, str)
            extid = b64encode(extid)
        uri = 'system' if source.copy_based_source else source.uri
        attrs = {'type': entity.__regid__, 'eid': entity.eid, 'extid': extid,
                 'source': uri, 'asource': source.uri, 'mtime': datetime.utcnow()}
        self._handle_insert_entity_sql(session, self.sqlgen.insert('entities', attrs), attrs)
        # insert core relations: is, is_instance_of and cw_source
        try:
            self._handle_is_relation_sql(session, 'INSERT INTO is_relation(eid_from,eid_to) VALUES (%s,%s)',
                                         (entity.eid, eschema_eid(session, entity.e_schema)))
        except IndexError:
            # during schema serialization, skip
            pass
        else:
            for eschema in entity.e_schema.ancestors() + [entity.e_schema]:
                self._handle_is_relation_sql(session,
                                             'INSERT INTO is_instance_of_relation(eid_from,eid_to) VALUES (%s,%s)',
                                             (entity.eid, eschema_eid(session, eschema)))
        if 'CWSource' in self.schema and source.eid is not None:  # else, cw < 3.10
            self._handle_is_relation_sql(session, 'INSERT INTO cw_source_relation(eid_from,eid_to) VALUES (%s,%s)',
                                         (entity.eid, source.eid))
        # now we can update the full text index
        if self.do_fti and self.need_fti_indexation(entity.__regid__):
            if complete:
                entity.complete(entity.e_schema.indexable_attributes())
            self.index_entity(session, entity=entity)