[fti] Improve big table reindexation
* Slice the reindexing in batches of 1000 entities.
* Make the output more verbose.
Closes #3621392
# copyright 2003-2015 LOGILAB S.A. (Paris, FRANCE), all rights reserved.# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr## This file is part of CubicWeb.## CubicWeb is free software: you can redistribute it and/or modify it under the# terms of the GNU Lesser General Public License as published by the Free# Software Foundation, either version 2.1 of the License, or (at your option)# any later version.## CubicWeb is distributed in the hope that it will be useful, but WITHOUT# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more# details.## You should have received a copy of the GNU Lesser General Public License along# with CubicWeb. If not, see <http://www.gnu.org/licenses/>."""Postgres specific store"""importthreadingimportwarningsimportcPickleimportos.pathasospfromStringIOimportStringIOfromtimeimportasctimefromdatetimeimportdate,datetime,timefromcollectionsimportdefaultdictfrombase64importb64encodefromcubicweb.utilsimportmake_uidfromcubicweb.server.sqlutilsimportSQL_PREFIXfromcubicweb.dataimport.storesimportNoHookRQLObjectStoredef_import_statements(sql_connect,statements,nb_threads=3,dump_output_dir=None,support_copy_from=True,encoding='utf-8'):""" Import a bunch of sql statements, using different threads. """try:chunksize=(len(statements)/nb_threads)+1threads=[]foriinxrange(nb_threads):chunks=statements[i*chunksize:(i+1)*chunksize]thread=threading.Thread(target=_execmany_thread,args=(sql_connect,chunks,dump_output_dir,support_copy_from,encoding))thread.start()threads.append(thread)fortinthreads:t.join()exceptException:print'Error in import statements'def_execmany_thread_not_copy_from(cu,statement,data,table=None,columns=None,encoding='utf-8'):""" Execute thread without copy from """cu.executemany(statement,data)def_execmany_thread_copy_from(cu,statement,data,table,columns,encoding='utf-8'):""" Execute thread with copy from """buf=_create_copyfrom_buffer(data,columns,encoding=encoding)ifbufisNone:_execmany_thread_not_copy_from(cu,statement,data)else:ifcolumnsisNone:cu.copy_from(buf,table,null='NULL')else:cu.copy_from(buf,table,null='NULL',columns=columns)def_execmany_thread(sql_connect,statements,dump_output_dir=None,support_copy_from=True,encoding='utf-8'):""" Execute sql statement. If 'INSERT INTO', try to use 'COPY FROM' command, or fallback to execute_many. """ifsupport_copy_from:execmany_func=_execmany_thread_copy_fromelse:execmany_func=_execmany_thread_not_copy_fromcnx=sql_connect()cu=cnx.cursor()try:forstatement,datainstatements:table=Nonecolumns=Nonetry:ifnotstatement.startswith('INSERT INTO'):cu.executemany(statement,data)continuetable=statement.split()[2]ifisinstance(data[0],(tuple,list)):columns=Noneelse:columns=list(data[0])execmany_func(cu,statement,data,table,columns,encoding)exceptException:print'unable to copy data into table %s'%table# Error in import statement, save data in dump_output_dirifdump_output_dirisnotNone:pdata={'data':data,'statement':statement,'time':asctime(),'columns':columns}filename=make_uid()try:withopen(osp.join(dump_output_dir,'%s.pickle'%filename),'w')asfobj:fobj.write(cPickle.dumps(pdata))exceptIOError:print'ERROR while pickling in',dump_output_dir,filename+'.pickle'passcnx.rollback()raisefinally:cnx.commit()cu.close()def_copyfrom_buffer_convert_None(value,**opts):'''Convert None value to "NULL"'''return'NULL'def_copyfrom_buffer_convert_number(value,**opts):'''Convert a number into its string representation'''returnstr(value)def_copyfrom_buffer_convert_string(value,**opts):'''Convert string value. Recognized keywords: :encoding: resulting string encoding (default: utf-8) '''encoding=opts.get('encoding','utf-8')escape_chars=((u'\\',ur'\\'),(u'\t',u'\\t'),(u'\r',u'\\r'),(u'\n',u'\\n'))forchar,replaceinescape_chars:value=value.replace(char,replace)ifisinstance(value,unicode):value=value.encode(encoding)returnvaluedef_copyfrom_buffer_convert_date(value,**opts):'''Convert date into "YYYY-MM-DD"'''# Do not use strftime, as it yields issue with date < 1900# (http://bugs.python.org/issue1777412)return'%04d-%02d-%02d'%(value.year,value.month,value.day)def_copyfrom_buffer_convert_datetime(value,**opts):'''Convert date into "YYYY-MM-DD HH:MM:SS.UUUUUU"'''# Do not use strftime, as it yields issue with date < 1900# (http://bugs.python.org/issue1777412)return'%s%s'%(_copyfrom_buffer_convert_date(value,**opts),_copyfrom_buffer_convert_time(value,**opts))def_copyfrom_buffer_convert_time(value,**opts):'''Convert time into "HH:MM:SS.UUUUUU"'''return'%02d:%02d:%02d.%06d'%(value.hour,value.minute,value.second,value.microsecond)# (types, converter) list._COPYFROM_BUFFER_CONVERTERS=[(type(None),_copyfrom_buffer_convert_None),((long,int,float),_copyfrom_buffer_convert_number),(basestring,_copyfrom_buffer_convert_string),(datetime,_copyfrom_buffer_convert_datetime),(date,_copyfrom_buffer_convert_date),(time,_copyfrom_buffer_convert_time),]def_create_copyfrom_buffer(data,columns=None,**convert_opts):""" Create a StringIO buffer for 'COPY FROM' command. Deals with Unicode, Int, Float, Date... (see ``converters``) :data: a sequence/dict of tuples :columns: list of columns to consider (default to all columns) :converter_opts: keyword arguements given to converters """# Create a list rather than directly create a StringIO# to correctly write lines separated by '\n' in a single steprows=[]ifcolumnsisNone:ifisinstance(data[0],(tuple,list)):columns=range(len(data[0]))elifisinstance(data[0],dict):columns=data[0].keys()else:raiseValueError('Could not get columns: you must provide columns.')forrowindata:# Iterate over the different columns and the different values# and try to convert them to a correct datatype.# If an error is raised, do not continue.formatted_row=[]forcolincolumns:try:value=row[col]exceptKeyError:warnings.warn(u"Column %s is not accessible in row %s"%(col,row),RuntimeWarning)# XXX 'value' set to None so that the import does not end in# error.# Instead, the extra keys are set to NULL from the# database point of view.value=Nonefortypes,converterin_COPYFROM_BUFFER_CONVERTERS:ifisinstance(value,types):value=converter(value,**convert_opts)breakelse:raiseValueError("Unsupported value type %s"%type(value))# We push the value to the new formatted row# if the value is not None and could be converted to a string.formatted_row.append(value)rows.append('\t'.join(formatted_row))returnStringIO('\n'.join(rows))classSQLGenObjectStore(NoHookRQLObjectStore):"""Controller of the data import process. This version is based on direct insertions throught SQL command (COPY FROM or execute many). >>> store = SQLGenObjectStore(cnx) >>> store.create_entity('Person', ...) >>> store.flush() """def__init__(self,cnx,dump_output_dir=None,nb_threads_statement=3):""" Initialize a SQLGenObjectStore. Parameters: - cnx: connection on the cubicweb instance - dump_output_dir: a directory to dump failed statements for easier recovery. Default is None (no dump). - nb_threads_statement: number of threads used for SQL insertion (default is 3). """super(SQLGenObjectStore,self).__init__(cnx)### hijack default sourceself.source=SQLGenSourceWrapper(self.source,cnx.vreg.schema,dump_output_dir=dump_output_dir,nb_threads_statement=nb_threads_statement)### XXX This is done in super().__init__(), but should be### redone here to link to the correct sourceself.add_relation=self.source.add_relationself.indexes_etypes={}defflush(self):"""Flush data to the database"""self.source.flush()defrelate(self,subj_eid,rtype,obj_eid,**kwargs):ifsubj_eidisNoneorobj_eidisNone:return# XXX Could subjtype be inferred ?self.source.add_relation(self._cnx,subj_eid,rtype,obj_eid,self.rschema(rtype).inlined,**kwargs)ifself.rschema(rtype).symmetric:self.source.add_relation(self._cnx,obj_eid,rtype,subj_eid,self.rschema(rtype).inlined,**kwargs)defdrop_indexes(self,etype):"""Drop indexes for a given entity type"""ifetypenotinself.indexes_etypes:cu=self._cnx.cnxset.cudefindex_to_attr(index):"""turn an index name to (database) attribute name"""returnindex.replace(etype.lower(),'').replace('idx','').strip('_')indices=[(index,index_to_attr(index))forindexinself.source.dbhelper.list_indices(cu,etype)# Do not consider 'cw_etype_pkey' indexifnotindex.endswith('key')]self.indexes_etypes[etype]=indicesforindex,attrinself.indexes_etypes[etype]:self._cnx.system_sql('DROP INDEX %s'%index)defcreate_indexes(self,etype):"""Recreate indexes for a given entity type"""forindex,attrinself.indexes_etypes.get(etype,[]):sql='CREATE INDEX %s ON cw_%s(%s)'%(index,etype,attr)self._cnx.system_sql(sql)############################################################################# SQL Source ########################################################################################################################################classSQLGenSourceWrapper(object):def__init__(self,system_source,schema,dump_output_dir=None,nb_threads_statement=3):self.system_source=system_sourceself._sql=threading.local()# Explicitely backport attributes from system sourceself._storage_handler=self.system_source._storage_handlerself.preprocess_entity=self.system_source.preprocess_entityself.sqlgen=self.system_source.sqlgenself.uri=self.system_source.uriself.eid=self.system_source.eid# Directory to write temporary filesself.dump_output_dir=dump_output_dir# Allow to execute code with SQLite backend that does# not support (yet...) copy_from# XXX Should be dealt with in logilab.databasespcfrom=system_source.dbhelper.dbapi_module.support_copy_fromself.support_copy_from=spcfromself.dbencoding=system_source.dbhelper.dbencodingself.nb_threads_statement=nb_threads_statement# initialize thread-local data for main threadself.init_thread_locals()self._inlined_rtypes_cache={}self._fill_inlined_rtypes_cache(schema)self.schema=schemaself.do_fti=Falsedef_fill_inlined_rtypes_cache(self,schema):cache=self._inlined_rtypes_cacheforeschemainschema.entities():forrschemaineschema.ordered_relations():ifrschema.inlined:cache[eschema.type]=SQL_PREFIX+rschema.typedefinit_thread_locals(self):"""initializes thread-local data"""self._sql.entities=defaultdict(list)self._sql.relations={}self._sql.inlined_relations={}# keep track, for each eid of the corresponding data dictself._sql.eid_insertdicts={}defflush(self):print'starting flush'_entities_sql=self._sql.entities_relations_sql=self._sql.relations_inlined_relations_sql=self._sql.inlined_relations_insertdicts=self._sql.eid_insertdictstry:# try, for each inlined_relation, to find if we're also creating# the host entity (i.e. the subject of the relation).# In that case, simply update the insert dict and remove# the need to make the# UPDATE statementforstatement,datalistin_inlined_relations_sql.iteritems():new_datalist=[]# for a given inlined relation,# browse each couple to be insertedfordataindatalist:keys=list(data)# For inlined relations, it exists only two case:# (rtype, cw_eid) or (cw_eid, rtype)ifkeys[0]=='cw_eid':rtype=keys[1]else:rtype=keys[0]updated_eid=data['cw_eid']ifupdated_eidin_insertdicts:_insertdicts[updated_eid][rtype]=data[rtype]else:# could not find corresponding insert dict, keep the# UPDATE querynew_datalist.append(data)_inlined_relations_sql[statement]=new_datalist_import_statements(self.system_source.get_connection,_entities_sql.items()+_relations_sql.items()+_inlined_relations_sql.items(),dump_output_dir=self.dump_output_dir,nb_threads=self.nb_threads_statement,support_copy_from=self.support_copy_from,encoding=self.dbencoding)finally:_entities_sql.clear()_relations_sql.clear()_insertdicts.clear()_inlined_relations_sql.clear()defadd_relation(self,cnx,subject,rtype,object,inlined=False,**kwargs):ifinlined:_sql=self._sql.inlined_relationsdata={'cw_eid':subject,SQL_PREFIX+rtype:object}subjtype=kwargs.get('subjtype')ifsubjtypeisNone:# Try to infer ittargets=[t.typefortinself.schema.rschema(rtype).subjects()]iflen(targets)==1:subjtype=targets[0]else:raiseValueError('You should give the subject etype for ''inlined relation %s'', as it cannot be inferred: ''this type is given as keyword argument ''``subjtype``'%rtype)statement=self.sqlgen.update(SQL_PREFIX+subjtype,data,['cw_eid'])else:_sql=self._sql.relationsdata={'eid_from':subject,'eid_to':object}statement=self.sqlgen.insert('%s_relation'%rtype,data)ifstatementin_sql:_sql[statement].append(data)else:_sql[statement]=[data]defadd_entity(self,cnx,entity):withself._storage_handler(entity,'added'):attrs=self.preprocess_entity(entity)rtypes=self._inlined_rtypes_cache.get(entity.cw_etype,())ifisinstance(rtypes,str):rtypes=(rtypes,)forrtypeinrtypes:ifrtypenotinattrs:attrs[rtype]=Nonesql=self.sqlgen.insert(SQL_PREFIX+entity.cw_etype,attrs)self._sql.eid_insertdicts[entity.eid]=attrsself._append_to_entities(sql,attrs)def_append_to_entities(self,sql,attrs):self._sql.entities[sql].append(attrs)def_handle_insert_entity_sql(self,cnx,sql,attrs):# We have to overwrite the source given in parameters# as here, we directly use the system sourceattrs['asource']=self.system_source.uriself._append_to_entities(sql,attrs)def_handle_is_relation_sql(self,cnx,sql,attrs):self._append_to_entities(sql,attrs)def_handle_is_instance_of_sql(self,cnx,sql,attrs):self._append_to_entities(sql,attrs)def_handle_source_relation_sql(self,cnx,sql,attrs):self._append_to_entities(sql,attrs)# add_info is _copypasted_ from the one in NativeSQLSource. We want it# there because it will use the _handlers of the SQLGenSourceWrapper, which# are not like the ones in the native source.defadd_info(self,cnx,entity,source,extid):"""add type and source info for an eid into the system table"""# begin by inserting eid/type/source/extid into the entities tableifextidisnotNone:assertisinstance(extid,str)extid=b64encode(extid)attrs={'type':entity.cw_etype,'eid':entity.eid,'extid':extid,'asource':source.uri}self._handle_insert_entity_sql(cnx,self.sqlgen.insert('entities',attrs),attrs)# insert core relations: is, is_instance_of and cw_sourcetry:self._handle_is_relation_sql(cnx,'INSERT INTO is_relation(eid_from,eid_to) VALUES (%s,%s)',(entity.eid,eschema_eid(cnx,entity.e_schema)))exceptIndexError:# during schema serialization, skippasselse:foreschemainentity.e_schema.ancestors()+[entity.e_schema]:self._handle_is_relation_sql(cnx,'INSERT INTO is_instance_of_relation(eid_from,eid_to) VALUES (%s,%s)',(entity.eid,eschema_eid(cnx,eschema)))if'CWSource'inself.schemaandsource.eidisnotNone:# else, cw < 3.10self._handle_is_relation_sql(cnx,'INSERT INTO cw_source_relation(eid_from,eid_to) VALUES (%s,%s)',(entity.eid,source.eid))# now we can update the full text indexifself.do_ftiandself.need_fti_indexation(entity.cw_etype):self.index_entity(cnx,entity=entity)