# HG changeset patch
# User Vincent Michel
# Date 1355490494 -3600
# Node ID 7ee0752178e5548c09792ba0573512ec01072631
# Parent 7e415f4571550178d6795901458181d76600a62d
[dataimport] Add SQL store for faster import - works ONLY with Postgres for now, as it requires the "COPY FROM" command - closes #2410822

This store will use:

- COPY FROM for massive insertions.
- executemany for updates.

The API of this store is similar to the other stores.
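A typical import with this store looks like the following sketch (the
`Person` entity type and its `name` attribute are illustrative only, not
part of this patch; `session` is a regular repository session)::

    from cubicweb.dataimport import SQLGenObjectStore

    store = SQLGenObjectStore(session, dump_output_dir='/tmp/dumps')
    for name in (u'alice', u'bob'):
        store.create_entity('Person', name=name)  # only buffered in memory
    store.flush()  # statements are actually sent to the database here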
+ """ + if support_copy_from: + execmany_func = _execmany_thread_copy_from + else: + execmany_func = _execmany_thread_not_copy_from + cnx = sql_connect() + cu = cnx.cursor() + try: + for statement, data in statements: + table = None + columns = None + try: + if not statement.startswith('INSERT INTO'): + cu.executemany(statement, data) + continue + table = statement.split()[2] + if isinstance(data[0], (tuple, list)): + columns = None + else: + columns = data[0].keys() + execmany_func(cu, statement, data, table, columns, encoding) + except Exception: + print 'unable to copy data into table %s', table + # Error in import statement, save data in dump_output_dir + if dump_output_dir is not None: + pdata = {'data': data, 'statement': statement, + 'time': asctime(), 'columns': columns} + filename = make_uid() + try: + with open(osp.join(dump_output_dir, + '%s.pickle' % filename), 'w') as fobj: + fobj.write(cPickle.dumps(pdata)) + except IOError: + print 'ERROR while pickling in', dump_output_dir, filename+'.pickle' + pass + cnx.rollback() + raise + finally: + cnx.commit() + cu.close() + +def _create_copyfrom_buffer(data, columns, encoding='utf-8'): + """ + Create a StringIO buffer for 'COPY FROM' command. + Deals with Unicode, Int, Float, Date... + """ + # Create a list rather than directly create a StringIO + # to correctly write lines separated by '\n' in a single step + rows = [] + if isinstance(data[0], (tuple, list)): + columns = range(len(data[0])) + for row in data: + # Iterate over the different columns and the different values + # and try to convert them to a correct datatype. + # If an error is raised, do not continue. + formatted_row = [] + for col in columns: + value = row[col] + if value is None: + value = 'NULL' + elif isinstance(value, (long, int, float)): + value = str(value) + elif isinstance(value, (str, unicode)): + # Remove separators used in string formatting + if u'\t' in value or u'\r' in value or u'\n' in value: + return + value = value.replace('\\', r'\\') + if not value: + return + if isinstance(value, unicode): + value = value.encode(encoding) + elif isinstance(value, (date, datetime)): + # Do not use strftime, as it yields issue + # with date < 1900 + value = '%04d-%02d-%02d' % (value.year, + value.month, + value.day) + else: + return None + # We push the value to the new formatted row + # if the value is not None and could be converted to a string. + formatted_row.append(value) + rows.append('\t'.join(formatted_row)) + return StringIO('\n'.join(rows)) + + # object stores ################################################################# class ObjectStore(object): @@ -753,3 +896,261 @@ return self.session.user.eid def gen_owned_by(self, entity): return self.session.user.eid + + +########################################################################### +## SQL object store ####################################################### +########################################################################### +class SQLGenObjectStore(NoHookRQLObjectStore): + """Controller of the data import process. This version is based + on direct insertions throught SQL command (COPY FROM or execute many). + + >>> store = SQLGenObjectStore(session) + >>> store.create_entity('Person', ...) + >>> store.flush() + """ + + def __init__(self, session, dump_output_dir=None, nb_threads_statement=3): + """ + Initialize a SQLGenObjectStore. + + Parameters: + + - session: session on the cubicweb instance + - dump_output_dir: a directory to dump failed statements + for easier recovery. 
diff -r 7e415f457155 -r 7ee0752178e5 doc/3.16.rst
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/doc/3.16.rst	Fri Dec 14 14:08:14 2012 +0100
@@ -0,0 +1,21 @@
+What's new in CubicWeb 3.16?
+============================
+
+New functionalities
+--------------------
+
+* Add a new dataimport store (`SQLGenObjectStore`). This store enables a fast
+  import of data (entity creation, link creation) in CubicWeb, by directly
+  flushing information to SQL. This may only be used with PostgreSQL, as it
+  requires the 'COPY FROM' command.
+
+API changes
+-----------
+
+Unintrusive API changes
+-----------------------
+
+
+
+User interface changes
+----------------------
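When a statement fails during a flush, its data is pickled under
`dump_output_dir` (see `_execmany_thread` above), so a recovery pass can
reload and replay it later. A minimal sketch, assuming a DB-API connection
`cnx` and a hypothetical dump file path::

    import cPickle

    with open('/tmp/dumps/<uid>.pickle') as fobj:  # hypothetical path
        pdata = cPickle.load(fobj)
    cu = cnx.cursor()
    cu.executemany(pdata['statement'], pdata['data'])
    cnx.commit()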
diff -r 7e415f457155 -r 7ee0752178e5 doc/book/en/devrepo/dataimport.rst
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/doc/book/en/devrepo/dataimport.rst	Fri Dec 14 14:08:14 2012 +0100
@@ -0,0 +1,58 @@
+.. -*- coding: utf-8 -*-
+
+.. _dataimport:
+
+Dataimport
+==========
+
+*CubicWeb* is designed to manipulate huge amounts of data, and provides helper
+functions to do so. These functions insert data at different levels of the
+*CubicWeb* API, allowing different speed/security tradeoffs. Stores that keep
+all the *CubicWeb* hooks and security checks will be slower, but possible
+insertion errors (bad data types, integrity errors, ...) will be raised.
+
+These dataimport functions are provided in the file `dataimport.py`.
+
+All the stores have the following API::
+
+    >>> store = ObjectStore()
+    >>> user = store.create_entity('CWUser', login=u'johndoe')
+    >>> group = store.create_entity('CWGroup', name=u'unknown')
+    >>> store.relate(user.eid, 'in_group', group.eid)
+
+
+ObjectStore
+-----------
+
+This store keeps objects in memory for *faster* validation. It may be useful
+in development mode. However, as it does not enforce the constraints of the
+schema, it may miss some problems.
+
+
+
+RQLObjectStore
+--------------
+
+This store works with an actual RQL repository, and it may be used in production mode.
+
+
+NoHookRQLObjectStore
+--------------------
+
+This store works similarly to the *RQLObjectStore* but bypasses some *CubicWeb* hooks to be faster.
+
+
+SQLGenObjectStore
+-----------------
+
+This store relies on *COPY FROM*/executemany SQL commands to push data directly,
+rather than going through the whole *CubicWeb* API. For now, **it only works with
+PostgreSQL**, as it requires the *COPY FROM* command.
+
+The API is similar to the other stores, but **it requires a flush** after some
+imports to copy the data into the database (these flushes may happen several times
+during the process, or only once at the end if there is no memory issue)::
+
+    >>> store = SQLGenObjectStore(session)
+    >>> store.create_entity('Person', ...)
+    >>> store.flush()
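In practice, flushing in fixed-size batches bounds memory usage. A sketch
(the batch size and the `Person` entities are arbitrary choices, not
requirements of the API)::

    store = SQLGenObjectStore(session)
    for i, name in enumerate(names):
        store.create_entity('Person', name=name)
        if i % 10000 == 0:  # flush every 10000 entities
            store.flush()
    store.flush()  # final flush for the remaining buffered data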
diff -r 7e415f457155 -r 7ee0752178e5 server/sources/native.py
--- a/server/sources/native.py	Mon Nov 26 12:52:33 2012 +0100
+++ b/server/sources/native.py	Fri Dec 14 14:08:14 2012 +0100
@@ -961,6 +961,14 @@
             cnx.commit()
         return eid
 
+    def _handle_is_relation_sql(self, session, sql, attrs):
+        """ Handler for specific is_relation sql that may be
+        overridden in some stores"""
+        self.doexec(session, sql % attrs)
+
+    _handle_insert_entity_sql = doexec
+    _handle_is_instance_of_sql = _handle_source_relation_sql = _handle_is_relation_sql
+
     def add_info(self, session, entity, source, extid, complete):
         """add type and source info for an eid into the system table"""
         # begin by inserting eid/type/source/extid into the entities table
@@ -970,21 +978,22 @@
         uri = 'system' if source.copy_based_source else source.uri
         attrs = {'type': entity.__regid__, 'eid': entity.eid, 'extid': extid,
                  'source': uri, 'asource': source.uri, 'mtime': datetime.utcnow()}
-        self.doexec(session, self.sqlgen.insert('entities', attrs), attrs)
+        self._handle_insert_entity_sql(session, self.sqlgen.insert('entities', attrs), attrs)
         # insert core relations: is, is_instance_of and cw_source
         try:
-            self.doexec(session, 'INSERT INTO is_relation(eid_from,eid_to) VALUES (%s,%s)'
-                        % (entity.eid, eschema_eid(session, entity.e_schema)))
+            self._handle_is_relation_sql(session, 'INSERT INTO is_relation(eid_from,eid_to) VALUES (%s,%s)',
+                                         (entity.eid, eschema_eid(session, entity.e_schema)))
         except IndexError:
             # during schema serialization, skip
             pass
         else:
             for eschema in entity.e_schema.ancestors() + [entity.e_schema]:
-                self.doexec(session, 'INSERT INTO is_instance_of_relation(eid_from,eid_to) VALUES (%s,%s)'
-                            % (entity.eid, eschema_eid(session, eschema)))
+                self._handle_is_relation_sql(session,
+                                             'INSERT INTO is_instance_of_relation(eid_from,eid_to) VALUES (%s,%s)',
+                                             (entity.eid, eschema_eid(session, eschema)))
         if 'CWSource' in self.schema and source.eid is not None: # else, cw < 3.10
-            self.doexec(session, 'INSERT INTO cw_source_relation(eid_from,eid_to) '
-                        'VALUES (%s,%s)' % (entity.eid, source.eid))
+            self._handle_is_relation_sql(session, 'INSERT INTO cw_source_relation(eid_from,eid_to) VALUES (%s,%s)',
+                                         (entity.eid, source.eid))
         # now we can update the full text index
         if self.do_fti and self.need_fti_indexation(entity.__regid__):
             if complete:
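Note that `SQLGenObjectStore` also exposes `drop_indexes`/`create_indexes`
(see dataimport.py above), which may bracket a large import to avoid index
maintenance costs; a sketch with the same hypothetical `Person` type::

    store.drop_indexes('Person')
    # ... massive create_entity() / relate() / flush() cycles ...
    store.create_indexes('Person')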