[session] use a dedicated class to track cnxset
We introduce a new CnxSetTracker to track the `cnxset` used by Transactions and
to allow waiting for them. This new class relies on neither thread IDs nor
thread joining to work. This allows using multiple transactions per thread and
a single transaction in multiple threads.
The class itself is fully thread-safe, but the Transaction is still not
thread-safe.
The old _threads_in_transaction attribute is dropped in favor of new logic
based on this object. The registration of the cnxset in use is now done by the
Transaction itself. tx.cnxset is a property handling the consistency of its
value with the CnxSetTracker instance.
Note: The CnxSetTracker instance only tracks transaction ids, not the
transactions themselves, so no reference cycles are created.
# copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.
"""Integrity checking tool for instances:

* integrity of a CubicWeb repository. Hum actually only the system database is
  checked.
"""

__docformat__ = "restructuredtext en"

import sys
from datetime import datetime

from logilab.common.shellutils import ProgressBar

from cubicweb.schema import PURE_VIRTUAL_RTYPES, VIRTUAL_RTYPES
from cubicweb.server.sqlutils import SQL_PREFIX


def notify_fixed(fix):
    """Finish the current stderr report line, tagging it as fixed when `fix`
    is true.
    """
    sys.stderr.write(' [FIXED]\n' if fix else '\n')


def _eid_exists(session, sqlcursor, eid):
    """Look `eid` up in the `entities` table, then in its source, without
    touching any cache; return True or False.
    """
    sqlcursor.execute('SELECT type, source FROM entities WHERE eid=%s' % eid)
    try:
        etype, source = sqlcursor.fetchone()
    except Exception:
        # no usable row for this eid in the entities table
        return False
    if source and source != 'system':
        try:
            # query on eid *and* etype, attempting to check that the entity
            # has not been replaced by another one after an old dump restore
            found = bool(session.execute(
                'Any X WHERE X is %s, X eid %%(x)s' % etype, {'x': eid}))
        except Exception:  # TypeResolverError, Unauthorized...
            found = False
        return found
    sqlcursor.execute('SELECT * FROM %s%s WHERE %seid=%s'
                      % (SQL_PREFIX, etype, SQL_PREFIX, eid))
    nbrows = len(sqlcursor.fetchall())
    if nbrows == 0:
        return False
    if nbrows > 1:
        warning = (' More than one entity with eid %s exists in source !\n'
                   ' WARNING : Unable to fix this, do it yourself !\n')
        sys.stderr.write(warning % eid)
    return True


def has_eid(session, sqlcursor, eid, eids):
    """Return true if `eid` is a valid eid.

    `eids` is a dictionary used as a cache: a previously computed answer is
    returned as is, and a newly computed one is stored in it.
    """
    if eid in eids:
        return eids[eid]
    valid = _eid_exists(session, sqlcursor, eid)
    eids[eid] = valid
    return valid


# XXX move to yams?
def etype_fti_containers(eschema, _done=None):
    """Yield the entity schemas acting as full-text containers for `eschema`.

    `eschema` itself is yielded when it declares no full-text container;
    otherwise its containers are walked recursively. `_done` holds the
    schemas already visited so that none is walked twice.
    """
    if _done is None:
        _done = set()
    _done.add(eschema)
    containers = tuple(eschema.fulltext_containers())
    if not containers:
        yield eschema
        return
    for rschema, role in containers:
        if role == 'object':
            candidates = rschema.objects(eschema)
        else:
            candidates = rschema.subjects(eschema)
        for candidate in candidates:
            if candidate in _done:
                continue
            _done.add(candidate)
            for container in etype_fti_containers(candidate, _done):
                yield container


def reindex_entities(schema, session, withpb=True, etypes=None):
    """Reindex entities of the repository in the full-text index.

    When `etypes` is None, every non final entity type having indexable
    attributes is considered (through its full-text containers) and the whole
    index table is emptied first; otherwise only index rows of the given types
    are removed. A progress bar is displayed when `withpb` is true.
    """
    # NOTE(review): the original comment here talked about deactivating the
    # modification_date hook; nothing in this function does so -- confirm.
    repo = session.repo
    source = repo.system_source
    syscursor = session.cnxset['system']
    dbhelper = source.dbhelper
    if not dbhelper.has_fti_table(syscursor):
        print('no text index table')
        dbhelper.init_fti(syscursor)
    source.do_fti = True  # ensure full-text indexation is activated
    if etypes is not None:
        print('Reindexing entities of type %s'
              % ', '.join(sorted(str(e) for e in etypes)))
        # clear fti table first. Use subquery for sql compatibility
        quoted = ','.join("'%s'" % etype for etype in etypes)
        session.system_sql("DELETE FROM %s WHERE EXISTS(SELECT 1 FROM ENTITIES "
                           "WHERE eid=%s AND type IN (%s))"
                           % (dbhelper.fti_table, dbhelper.fti_uid_attr,
                              quoted))
    else:
        print('Reindexing entities')
        etypes = set()
        for eschema in schema.entities():
            if eschema.final:
                continue
            # indexable_attributes() is a generator, consume it to test it
            if not tuple(eschema.indexable_attributes()):
                continue
            etypes.update(etype_fti_containers(eschema))
        # clear fti table first
        session.system_sql('DELETE FROM %s' % dbhelper.fti_table)
    if withpb:
        pb = ProgressBar(len(etypes) + 1)
        pb.update()
    # reindex entities by running, for each type, the rql queries provided by
    # its entity class and feeding the results to the system source
    for eschema in etypes:
        etype_class = session.vreg['etypes'].etype_class(str(eschema))
        for fti_rql in etype_class.cw_fti_index_rql_queries(session):
            rset = session.execute(fti_rql)
            source.fti_index_entities(session, rset.entities())
            # clear entity cache to avoid high memory consumption on big
            # tables
            session.drop_entity_cache()
        if withpb:
            pb.update()


def check_schema(schema, session, eids, fix=1):
    """Check the serialized schema: report constraints which should be unique
    on a relation definition but appear several times.
    """
    print('Checking serialized schema')
    unique_constraints = ('SizeConstraint', 'FormatConstraint',
                          'VocabularyConstraint', 'RQLVocabularyConstraint')
    rql = ('Any COUNT(X),RN,SN,ON,CTN GROUPBY RN,SN,ON,CTN ORDERBY 1 '
           'WHERE X is CWConstraint, R constrained_by X, '
           'R relation_type RT, RT name RN, R from_entity ST, ST name SN, '
           'R to_entity OT, OT name ON, X cstrtype CT, CT name CTN')
    for count, rn, sn, on, cstrname in session.execute(rql):
        if count == 1 or cstrname not in unique_constraints:
            continue
        print("ERROR: got %s%r constraints on relation %s.%s.%s"
              % (count, cstrname, sn, rn, on))
        if fix:
            print('dunno how to fix, do it yourself')


def check_text_index(schema, session, eids, fix=1):
    """Check all entities registered in the text index; when `fix` is true,
    index rows whose eid is unknown are deleted.
    """
    print('Checking text index')
    msg = ' Entity with eid %s exists in the text index but in no source (autofix will remove from text index)'
    cursor = session.system_sql('SELECT uid FROM appears;')
    for row in cursor.fetchall():
        eid = row[0]
        if has_eid(session, cursor, eid, eids):
            continue
        sys.stderr.write(msg % eid)
        if fix:
            session.system_sql('DELETE FROM appears WHERE uid=%s;' % eid)
        notify_fixed(fix)


def check_entities(schema, session, eids, fix=1):
    """Check all entities registered in the repo system table.

    Successively checks: rows of `entities` without matching entity, missing
    `cw_source`, `is` and `is_instance_of` relations, then rows of entity
    tables that are unknown to the `entities` table.
    """
    print('Checking entities system table')
    # system table but no source
    msg = ' Entity with eid %s exists in the system table but in no source (autofix will delete the entity)'
    cursor = session.system_sql('SELECT eid FROM entities;')
    for row in cursor.fetchall():
        eid = row[0]
        if has_eid(session, cursor, eid, eids):
            continue
        sys.stderr.write(msg % eid)
        if fix:
            session.system_sql('DELETE FROM entities WHERE eid=%s;' % eid)
        notify_fixed(fix)
    # source in entities, but no relation cw_source
    applcwversion = session.repo.get_versions().get('cubicweb')
    if applcwversion >= (3, 13, 1):  # entities.asource appeared in 3.13.1
        cursor = session.system_sql(
            'SELECT e.eid FROM entities as e, cw_CWSource as s '
            'WHERE s.cw_name=e.asource AND '
            'NOT EXISTS(SELECT 1 FROM cw_source_relation as cs '
            ' WHERE cs.eid_from=e.eid AND cs.eid_to=s.cw_eid) '
            'ORDER BY e.eid')
        msg = (' Entity with eid %s refers to source in entities table, '
               'but is missing relation cw_source (autofix will create the relation)\n')
        for row in cursor.fetchall():
            sys.stderr.write(msg % row[0])
        if fix:
            # a single statement recreates every missing relation at once
            session.system_sql(
                'INSERT INTO cw_source_relation (eid_from, eid_to) '
                'SELECT e.eid, s.cw_eid FROM entities as e, cw_CWSource as s '
                'WHERE s.cw_name=e.asource AND NOT EXISTS(SELECT 1 FROM cw_source_relation as cs '
                ' WHERE cs.eid_from=e.eid AND cs.eid_to=s.cw_eid)')
            notify_fixed(True)
    # inconsistencies for 'is'
    msg = ' %s #%s is missing relation "is" (autofix will create the relation)\n'
    cursor = session.system_sql(
        'SELECT e.type, e.eid FROM entities as e, cw_CWEType as s '
        'WHERE s.cw_name=e.type AND NOT EXISTS(SELECT 1 FROM is_relation as cs '
        ' WHERE cs.eid_from=e.eid AND cs.eid_to=s.cw_eid) '
        'ORDER BY e.eid')
    for row in cursor.fetchall():
        sys.stderr.write(msg % row)
    if fix:
        session.system_sql(
            'INSERT INTO is_relation (eid_from, eid_to) '
            'SELECT e.eid, s.cw_eid FROM entities as e, cw_CWEType as s '
            'WHERE s.cw_name=e.type AND NOT EXISTS(SELECT 1 FROM is_relation as cs '
            ' WHERE cs.eid_from=e.eid AND cs.eid_to=s.cw_eid)')
        notify_fixed(True)
    # inconsistencies for 'is_instance_of'
    msg = ' %s #%s is missing relation "is_instance_of" (autofix will create the relation)\n'
    cursor = session.system_sql(
        'SELECT e.type, e.eid FROM entities as e, cw_CWEType as s '
        'WHERE s.cw_name=e.type AND NOT EXISTS(SELECT 1 FROM is_instance_of_relation as cs '
        ' WHERE cs.eid_from=e.eid AND cs.eid_to=s.cw_eid) '
        'ORDER BY e.eid')
    for row in cursor.fetchall():
        sys.stderr.write(msg % row)
    if fix:
        session.system_sql(
            'INSERT INTO is_instance_of_relation (eid_from, eid_to) '
            'SELECT e.eid, s.cw_eid FROM entities as e, cw_CWEType as s '
            'WHERE s.cw_name=e.type AND NOT EXISTS(SELECT 1 FROM is_instance_of_relation as cs '
            ' WHERE cs.eid_from=e.eid AND cs.eid_to=s.cw_eid)')
        notify_fixed(True)
    print('Checking entities tables')
    msg = ' Entity with eid %s exists in the %s table but not in the system table (autofix will delete the entity)'
    column = SQL_PREFIX + 'eid'
    for eschema in schema.entities():
        if eschema.final:
            continue
        table = SQL_PREFIX + eschema.type
        cursor = session.system_sql('SELECT %s FROM %s;' % (column, table))
        for row in cursor.fetchall():
            eid = row[0]
            # eids is full since everything has been fetched from the entities
            # table above, no need to call has_eid
            if eids.get(eid):
                continue
            sys.stderr.write(msg % (eid, eschema.type))
            if fix:
                session.system_sql('DELETE FROM %s WHERE %s=%s;'
                                   % (table, column, eid))
            notify_fixed(fix)


def bad_related_msg(rtype, target, eid, fix):
    """Report on stderr a relation `rtype` whose `target` ('subject' or
    'object' in this module) refers to the unknown `eid`.
    """
    template = ' A relation %s with %s eid %s exists but no such entity in sources'
    sys.stderr.write(template % (rtype, target, eid))
    notify_fixed(fix)


def check_relations(schema, session, eids, fix=1):
    """Check that eids referenced by relations are registered in the repo
    system table.

    Inlined relations are read from the subject entity table and reset to NULL
    when fixing; other relations are read from their `<rtype>_relation` table,
    whose offending rows are deleted when fixing.
    """
    print('Checking relations')
    for rschema in schema.relations():
        if rschema.final or rschema.type in PURE_VIRTUAL_RTYPES:
            continue
        if rschema.inlined:
            column = SQL_PREFIX + str(rschema)
            for subjtype in rschema.subjects():
                table = SQL_PREFIX + str(subjtype)
                cursor = session.system_sql(
                    'SELECT %s FROM %s WHERE %s IS NOT NULL;'
                    % (column, table, column))
                for row in cursor.fetchall():
                    eid = row[0]
                    if has_eid(session, cursor, eid, eids):
                        continue
                    bad_related_msg(rschema, 'object', eid, fix)
                    if fix:
                        session.system_sql(
                            'UPDATE %s SET %s=NULL WHERE %s=%s;'
                            % (table, column, column, eid))
            continue
        try:
            cursor = session.system_sql('SELECT eid_from FROM %s_relation;'
                                        % rschema)
        except Exception as ex:
            # usually because table doesn't exist
            print('ERROR %s' % (ex,))
            continue
        for row in cursor.fetchall():
            eid = row[0]
            if has_eid(session, cursor, eid, eids):
                continue
            bad_related_msg(rschema, 'subject', eid, fix)
            if fix:
                session.system_sql(
                    'DELETE FROM %s_relation WHERE eid_from=%s;'
                    % (rschema, eid))
        cursor = session.system_sql('SELECT eid_to FROM %s_relation;'
                                    % rschema)
        for row in cursor.fetchall():
            eid = row[0]
            if has_eid(session, cursor, eid, eids):
                continue
            bad_related_msg(rschema, 'object', eid, fix)
            if fix:
                session.system_sql(
                    'DELETE FROM %s_relation WHERE eid_to=%s;'
                    % (rschema, eid))


def check_mandatory_relations(schema, session, eids, fix=1):
    """Check entities missing some mandatory relation; when `fix` is true the
    offending entities are deleted.
    """
    print('Checking mandatory relations')
    msg = '%s #%s is missing mandatory %s relation %s (autofix will delete the entity)'
    for rschema in schema.relations():
        if (rschema.final or rschema in PURE_VIRTUAL_RTYPES
                or rschema in ('is', 'is_instance_of')):
            continue
        # entity types for which the relation is mandatory, as subject and as
        # object (cardinality '1' or '+')
        smandatory = set()
        omandatory = set()
        for rdef in rschema.rdefs.itervalues():
            subjcard, objcard = rdef.cardinality[0], rdef.cardinality[1]
            if subjcard in '1+':
                smandatory.add(rdef.subject)
            if objcard in '1+':
                omandatory.add(rdef.object)
        plan = (('subject', smandatory, 'Any X WHERE NOT X %s Y, X is %s'),
                ('object', omandatory, 'Any X WHERE NOT Y %s X, X is %s'))
        for role, etypes, rqltemplate in plan:
            for etype in etypes:
                rql = rqltemplate % (rschema, etype)
                for entity in session.execute(rql).entities():
                    sys.stderr.write(msg % (entity.__regid__, entity.eid,
                                            role, rschema))
                    if fix:
                        # if entity.cw_describe()['source']['uri'] == 'system': XXX
                        entity.cw_delete()  # XXX this is BRUTAL!
                    notify_fixed(fix)


def check_mandatory_attributes(schema, session, eids, fix=1):
    """Check for entities stored in the system source missing some mandatory
    attribute; when `fix` is true the offending entities are deleted.
    """
    print('Checking mandatory attributes')
    msg = '%s #%s is missing mandatory attribute %s (autofix will delete the entity)'
    for rschema in schema.relations():
        if not rschema.final or rschema in VIRTUAL_RTYPES:
            continue
        for rdef in rschema.rdefs.itervalues():
            if rdef.cardinality[0] not in '1+':
                continue
            rql = ('Any X WHERE X %s NULL, X is %s, X cw_source S, S name "system"'
                   % (rschema, rdef.subject))
            for entity in session.execute(rql).entities():
                sys.stderr.write(msg % (entity.__regid__, entity.eid, rschema))
                if fix:
                    entity.cw_delete()
                notify_fixed(fix)


def check_metadata(schema, session, eids, fix=1):
    """Check entities have their required metadata (creation and modification
    dates); when `fix` is true a missing date is set to the current time.

    FIXME: rewrite using RQL queries ?
    """
    print('Checking metadata')
    typescursor = session.system_sql("SELECT DISTINCT type FROM entities;")
    eidcolumn = SQL_PREFIX + 'eid'
    msg = ' %s with eid %s has no %s (autofix will set it to now)'
    for (etype,) in typescursor.fetchall():
        table = SQL_PREFIX + etype
        defaults = (('creation_date', datetime.now()),
                    ('modification_date', datetime.now()))
        for rel, default in defaults:
            column = SQL_PREFIX + rel
            missing = session.system_sql(
                "SELECT %s FROM %s WHERE %s is NULL"
                % (eidcolumn, table, column))
            for (eid,) in missing.fetchall():
                sys.stderr.write(msg % (etype, eid, rel))
                if fix:
                    session.system_sql(
                        "UPDATE %s SET %s=%%(v)s WHERE %s=%s ;"
                        % (table, column, eidcolumn, eid),
                        {'v': default})
                notify_fixed(fix)


def check(repo, cnx, checks, reindex, fix, withpb=True):
    """Check integrity of instance's repository.

    Each name in `checks` selects the module-level function `check_<name>`,
    run with security disabled on the session bound to `cnx`. Changes are
    committed when `fix` is true, otherwise a warning is printed. When
    `reindex` is true the full-text index is rebuilt afterwards.
    """
    session = repo._get_session(cnx.sessionid, setcnxset=True)
    # yo, launch checks
    if checks:
        eids_cache = {}
        # ensure no read security
        with session.security_enabled(read=False, write=False):
            for name in checks:
                checker = globals()['check_%s' % name]
                checker(repo.schema, session, eids_cache, fix=fix)
        if fix:
            session.commit()
        else:
            print('')
            print('WARNING: Diagnostic run, nothing has been corrected')
    if reindex:
        session.rollback()
        session.set_cnxset()
        reindex_entities(repo.schema, session, withpb=withpb)
        session.commit()