# copyright 2003-2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved.# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr## This file is part of CubicWeb.## CubicWeb is free software: you can redistribute it and/or modify it under the# terms of the GNU Lesser General Public License as published by the Free# Software Foundation, either version 2.1 of the License, or (at your option)# any later version.## CubicWeb is distributed in the hope that it will be useful, but WITHOUT# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more# details.## You should have received a copy of the GNU Lesser General Public License along# with CubicWeb. If not, see <http://www.gnu.org/licenses/>."""Defines the central class for the CubicWeb RQL server: the repository.The repository is an abstraction allowing execution of rql queries againstdata sources. Most of the work is actually done in helper classes. Therepository mainly:* brings these classes all together to provide a single access point to a cubicweb instance.* handles session management* provides method for pyro registration, to call if pyro is enabled"""__docformat__="restructuredtext en"importsysimportthreadingimportQueuefromwarningsimportwarnfromitertoolsimportchainfromos.pathimportjoinfromdatetimeimportdatetimefromtimeimporttime,localtime,strftimefromwarningsimportwarnfromlogilab.common.decoratorsimportcached,clear_cachefromlogilab.common.compatimportanyfromlogilab.commonimportflattenfromyamsimportBadSchemaDefinitionfromyams.schemaimportrole_namefromrqlimportRQLSyntaxErrorfromrql.utilsimportrqlvar_makerfromcubicwebimport(CW_SOFTWARE_ROOT,CW_MIGRATION_MAP,QueryError,UnknownEid,AuthenticationError,ExecutionError,ETypeNotSupportedBySources,MultiSourcesError,BadConnectionId,Unauthorized,ValidationError,RepositoryError,UniqueTogetherError,onevent)fromcubicwebimportcwvreg,schema,serverfromcubicweb.serverimportShuttingDown,utils,hook,pool,querier,sourcesfromcubicweb.server.sessionimportSession,InternalSession,InternalManagerfromcubicweb.server.ssplannerimportEditedEntityNO_CACHE_RELATIONS=set([('owned_by','object'),('created_by','object'),('cw_source','object'),])defprefill_entity_caches(entity):session=entity._cw# prefill entity relation cachesforrschemainentity.e_schema.subject_relations():rtype=str(rschema)ifrtypeinschema.VIRTUAL_RTYPESor(rtype,'subject')inNO_CACHE_RELATIONS:continueifrschema.final:entity.cw_attr_cache.setdefault(rtype,None)else:entity.cw_set_relation_cache(rtype,'subject',session.empty_rset())forrschemainentity.e_schema.object_relations():rtype=str(rschema)ifrtypeinschema.VIRTUAL_RTYPESor(rtype,'object')inNO_CACHE_RELATIONS:continueentity.cw_set_relation_cache(rtype,'object',session.empty_rset())defdel_existing_rel_if_needed(session,eidfrom,rtype,eidto):"""delete existing relation when adding a new one if card is 1 or ? have to be done once the new relation has been inserted to avoid having an entity without a relation for some time this kind of behaviour has to be done in the repository so we don't have hooks order hazardness """# skip that if integrity explicitly disabledifnotsession.is_hook_category_activated('activeintegrity'):returnrdef=session.rtype_eids_rdef(rtype,eidfrom,eidto)card=rdef.cardinality# one may be tented to check for neweids but this may cause more than one# relation even with '1?' cardinality if thoses relations are added in the# same transaction where the entity is being created. This never occurs from# the web interface but may occurs during test or dbapi connection (though# not expected for this). So: don't do it, we pretend to ensure repository# consistency.## notes:# * inlined relations will be implicitly deleted for the subject entity# * we don't want read permissions to be applied but we want delete# permission to be checkedifcard[0]in'1?':withsession.security_enabled(read=False):session.execute('DELETE X %s Y WHERE X eid %%(x)s, ''NOT Y eid %%(y)s'%rtype,{'x':eidfrom,'y':eidto})ifcard[1]in'1?':withsession.security_enabled(read=False):session.execute('DELETE X %s Y WHERE Y eid %%(y)s, ''NOT X eid %%(x)s'%rtype,{'x':eidfrom,'y':eidto})defpreprocess_inlined_relations(session,entity):"""when an entity is added, check if it has some inlined relation which requires to be extrated for proper call hooks """relations=[]activeintegrity=session.is_hook_category_activated('activeintegrity')eschema=entity.e_schemaforattrinentity.cw_edited:rschema=eschema.subjrels[attr]ifnotrschema.final:# inlined relationvalue=entity.cw_edited[attr]relations.append((attr,value))session.update_rel_cache_add(entity.eid,attr,value)rdef=session.rtype_eids_rdef(attr,entity.eid,value)ifrdef.cardinality[1]in'1?'andactiveintegrity:withsession.security_enabled(read=False):session.execute('DELETE X %s Y WHERE Y eid %%(y)s'%attr,{'x':entity.eid,'y':value})returnrelationsclassNullEventBus(object):defpublish(self,msg):passdefadd_subscription(self,topic,callback):passdefstart(self):passdefstop(self):passclassRepository(object):"""a repository provides access to a set of persistent storages for entities and relations XXX protect pyro access """def__init__(self,config,tasks_manager=None,vreg=None):self.config=configifvregisNone:vreg=cwvreg.CWRegistryStore(config)self.vreg=vregself._tasks_manager=tasks_managerself.pyro_registered=Falseself.pyro_uri=None# every pyro client is handled in its own thread; map these threads to# the session we opened for them so we can clean up when they go awayself._pyro_sessions={}self.app_instances_bus=NullEventBus()self.info('starting repository from %s',self.config.apphome)# dictionary of opened sessionsself._sessions={}# list of functions to be called at regular interval# list of running threadsself._running_threads=[]# initial schema, should be build or replaced latterself.schema=schema.CubicWebSchema(config.appid)self.vreg.schema=self.schema# until actual schema is loaded...# shutdown flagself.shutting_down=False# sources (additional sources info in the system database)self.system_source=self.get_source('native','system',config.sources()['system'].copy())self.sources=[self.system_source]self.sources_by_uri={'system':self.system_source}# querier helper, need to be created after sources initializationself.querier=querier.QuerierHelper(self,self.schema)# cache eid -> (type, physical source, extid, actual source)self._type_source_cache={}# cache (extid, source uri) -> eidself._extid_cache={}# open some connection setsifconfig.init_cnxset_pool:self.init_cnxset_pool()# the hooks managerself.hm=hook.HooksManager(self.vreg)# registry hook to fix user class on registry reload@onevent('after-registry-reload',self)deffix_user_classes(self):# After registery reload the 'CWUser' class used for CWEtype# changed. To any existing user object have a different class than# the new loaded one. We are hot fixing this.usercls=self.vreg['etypes'].etype_class('CWUser')forsessioninself._sessions.itervalues():ifnotisinstance(session.user,InternalManager):session.user.__class__=userclsdefinit_cnxset_pool(self):"""should be called bootstrap_repository, as this is what it does"""config=self.configself._cnxsets_pool=Queue.Queue()# 0. init a cnxset that will be used to fetch bootstrap information from# the databaseself._cnxsets_pool.put_nowait(pool.ConnectionsSet(self.sources))# 1. set used cubesifconfig.creatingornotconfig.read_instance_schema:config.bootstrap_cubes()else:self.set_schema(self.config.load_bootstrap_schema(),resetvreg=False)config.init_cubes(self.get_cubes())# 2. load schemaifconfig.quick_start:# quick start: only to get a minimal repository to get cubes# information (eg dump/restore/...)## restrict appobject_path to only load hooks and entity classes in# the registryconfig.cube_appobject_path=set(('hooks','entities'))config.cubicweb_appobject_path=set(('hooks','entities'))# limit connections pool to 1config['connections-pool-size']=1ifconfig.quick_startorconfig.creatingornotconfig.read_instance_schema:# load schema from the file systemifnotconfig.creating:self.warning("set fs instance'schema")self.set_schema(config.load_schema(expand_cubes=True))else:# normal start: load the instance schema from the databaseself.info('loading schema from the repository')self.set_schema(self.deserialize_schema())# 3. initialize data sourcesifconfig.creating:# call init_creating so that for instance native source can# configurate tsearch according to postgres versionforsourceinself.sources:source.init_creating()else:self.init_sources_from_database()if'CWProperty'inself.schema:self.vreg.init_properties(self.properties())# 4. close initialization connection set and reopen fresh ones for# proper initializationself._get_cnxset().close(True)self.cnxsets=[]# list of available cnxsets (can't iterate on a Queue)foriinxrange(config['connections-pool-size']):self.cnxsets.append(pool.ConnectionsSet(self.sources))self._cnxsets_pool.put_nowait(self.cnxsets[-1])# internals ###############################################################definit_sources_from_database(self):self.sources_by_eid={}ifself.config.quick_start \ornot'CWSource'inself.schema:# # 3.10 migrationself.system_source.init_creating()returnwithself.internal_session()assession:# FIXME: sources should be ordered (add_entity priority)forsourceentinsession.execute('Any S, SN, SA, SC WHERE S is_instance_of CWSource, ''S name SN, S type SA, S config SC').entities():ifsourceent.name=='system':self.system_source.eid=sourceent.eidself.sources_by_eid[sourceent.eid]=self.system_sourceself.system_source.init(True,sourceent)continueself.add_source(sourceent,add_to_cnxsets=False)def_clear_planning_caches(self):forcachein('source_defs','is_multi_sources_relation','can_cross_relation','rel_type_sources'):clear_cache(self,cache)defadd_source(self,sourceent,add_to_cnxsets=True):source=self.get_source(sourceent.type,sourceent.name,sourceent.host_config,sourceent.eid)self.sources_by_eid[sourceent.eid]=sourceself.sources_by_uri[sourceent.name]=sourceifself.config.source_enabled(source):# call source's init method to complete their initialisation if# needed (for instance looking for persistent configuration using an# internal session, which is not possible until connections sets have been# initialized)source.init(True,sourceent)ifnotsource.copy_based_source:warn('[3.18] old multi-source system will go away in the next version',DeprecationWarning)self.sources.append(source)self.querier.set_planner()ifadd_to_cnxsets:forcnxsetinself.cnxsets:cnxset.add_source(source)else:source.init(False,sourceent)self._clear_planning_caches()defremove_source(self,uri):source=self.sources_by_uri.pop(uri)delself.sources_by_eid[source.eid]ifself.config.source_enabled(source)andnotsource.copy_based_source:self.sources.remove(source)self.querier.set_planner()forcnxsetinself.cnxsets:cnxset.remove_source(source)self._clear_planning_caches()defget_source(self,type,uri,source_config,eid=None):# set uri and type in source config so it's available through# source_defs()source_config['uri']=urisource_config['type']=typereturnsources.get_source(type,source_config,self,eid)defset_schema(self,schema,resetvreg=True):self.info('set schema %s%#x',schema.name,id(schema))ifresetvreg:# trigger full reload of all appobjectsself.vreg.set_schema(schema)else:self.vreg._set_schema(schema)self.querier.set_schema(schema)# don't use self.sources, we may want to give schema even to disabled# sourcesforsourceinself.sources_by_uri.itervalues():source.set_schema(schema)self.schema=schemadefdeserialize_schema(self):"""load schema from the database"""fromcubicweb.server.schemaserialimportdeserialize_schemaappschema=schema.CubicWebSchema(self.config.appid)self.debug('deserializing db schema into %s%#x',appschema.name,id(appschema))withself.internal_session()assession:try:deserialize_schema(appschema,session)exceptBadSchemaDefinition:raiseexceptExceptionasex:importtracebacktraceback.print_exc()raise(Exception('Is the database initialised ? (cause: %s)'%ex),None,sys.exc_info()[-1])returnappschemadef_prepare_startup(self):"""Prepare "Repository as a server" for startup. * trigger server startup hook, * register session clean up task. """ifnot(self.config.creatingorself.config.repairingorself.config.quick_start):# call instance level initialisation hooksself.hm.call_hooks('server_startup',repo=self)# register a task to cleanup expired sessionself.cleanup_session_time=self.config['cleanup-session-time']or60*60*24assertself.cleanup_session_time>0cleanup_session_interval=min(60*60,self.cleanup_session_time/3)assertself._tasks_managerisnotNone,"This Repository is not intended to be used as a server"self._tasks_manager.add_looping_task(cleanup_session_interval,self.clean_sessions)defstart_looping_tasks(self):"""Actual "Repository as a server" startup. * trigger server startup hook, * register session clean up task, * start all tasks. XXX Other startup related stuffs are done elsewhere. In Repository XXX __init__ or in external codes (various server managers). """self._prepare_startup()assertself._tasks_managerisnotNone,"This Repository is not intended to be used as a server"self._tasks_manager.start()deflooping_task(self,interval,func,*args):"""register a function to be called every `interval` seconds. looping tasks can only be registered during repository initialization, once done this method will fail. """assertself._tasks_managerisnotNone,"This Repository is not intended to be used as a server"self._tasks_manager.add_looping_task(interval,func,*args)defthreaded_task(self,func):"""start function in a separated thread"""utils.RepoThread(func,self._running_threads).start()#@lockeddef_get_cnxset(self):try:returnself._cnxsets_pool.get(True,timeout=5)exceptQueue.Empty:raiseException('no connections set available after 5 secs, probably either a ''bug in code (too many uncommited/rolled back ''connections) or too much load on the server (in ''which case you can try to set a bigger ''connections pool size)')def_free_cnxset(self,cnxset):self._cnxsets_pool.put_nowait(cnxset)defpinfo(self):# XXX: session.cnxset is accessed from a local storage, would be interesting# to see if there is a cnxset set in any thread specific data)return'%s: %s (%s)'%(self._cnxsets_pool.qsize(),','.join(session.user.loginforsessioninself._sessions.itervalues()ifsession.cnxset),threading.currentThread())defshutdown(self):"""called on server stop event to properly close opened sessions and connections """assertnotself.shutting_down,'already shutting down'ifnot(self.config.creatingorself.config.repairingorself.config.quick_start):# then, the system source is still availableself.hm.call_hooks('before_server_shutdown',repo=self)self.shutting_down=Trueself.system_source.shutdown()ifself._tasks_managerisnotNone:self._tasks_manager.stop()ifnot(self.config.creatingorself.config.repairingorself.config.quick_start):self.hm.call_hooks('server_shutdown',repo=self)forthreadinself._running_threads:self.info('waiting thread %s...',thread.getName())thread.join()self.info('thread %s finished',thread.getName())self.close_sessions()whilenotself._cnxsets_pool.empty():cnxset=self._cnxsets_pool.get_nowait()try:cnxset.close(True)exceptException:self.exception('error while closing %s'%cnxset)continueifself.pyro_registered:ifself._use_pyrons():pyro_unregister(self.config)self.pyro_uri=Nonehits,misses=self.querier.cache_hit,self.querier.cache_misstry:self.info('rql st cache hit/miss: %s/%s (%s%% hits)',hits,misses,(hits*100)/(hits+misses))hits,misses=self.system_source.cache_hit,self.system_source.cache_missself.info('sql cache hit/miss: %s/%s (%s%% hits)',hits,misses,(hits*100)/(hits+misses))nocache=self.system_source.no_cacheself.info('sql cache usage: %s/%s (%s%%)',hits+misses,nocache,((hits+misses)*100)/(hits+misses+nocache))exceptZeroDivisionError:passdefcheck_auth_info(self,session,login,authinfo):"""validate authentication, raise AuthenticationError on failure, return associated CWUser's eid on success. """# iter on sources_by_uri then check enabled source since sources doesn't# contain copy based sourcesforsourceinself.sources_by_uri.itervalues():ifself.config.source_enabled(source)andsource.support_entity('CWUser'):try:returnsource.authenticate(session,login,**authinfo)exceptAuthenticationError:continueelse:raiseAuthenticationError('authentication failed with all sources')defauthenticate_user(self,session,login,**authinfo):"""validate login / password, raise AuthenticationError on failure return associated CWUser instance on success """eid=self.check_auth_info(session,login,authinfo)cwuser=self._build_user(session,eid)ifself.config.consider_user_stateand \notcwuser.cw_adapt_to('IWorkflowable').stateincwuser.AUTHENTICABLE_STATES:raiseAuthenticationError('user is not in authenticable state')returncwuserdef_build_user(self,session,eid):"""return a CWUser entity for user with the given eid"""cls=self.vreg['etypes'].etype_class('CWUser')st=cls.fetch_rqlst(session.user,ordermethod=None)st.add_eid_restriction(st.get_variable('X'),'x','Substitute')rset=session.execute(st.as_string(),{'x':eid})assertlen(rset)==1,rsetcwuser=rset.get_entity(0,0)# pylint: disable=W0104# prefetch / cache cwuser's groups and properties. This is especially# useful for internal sessions to avoid security insertionscwuser.groupscwuser.propertiesreturncwuser# public (dbapi) interface ################################################defstats(self):# XXX restrict to managers session?"""Return a dictionary containing some statistics about the repository resources usage. This is a public method, not requiring a session id. """results={}querier=self.queriersource=self.system_sourceforsize,maxsize,hits,misses,titlein((len(querier._rql_cache),self.config['rql-cache-size'],querier.cache_hit,querier.cache_miss,'rqlt_st'),(len(source._cache),self.config['rql-cache-size'],source.cache_hit,source.cache_miss,'sql'),):results['%s_cache_size'%title]='%s / %s'%(size,maxsize)results['%s_cache_hit'%title]=hitsresults['%s_cache_miss'%title]=missesresults['%s_cache_hit_percent'%title]=(hits*100)/(hits+misses)results['type_source_cache_size']=len(self._type_source_cache)results['extid_cache_size']=len(self._extid_cache)results['sql_no_cache']=self.system_source.no_cacheresults['nb_open_sessions']=len(self._sessions)results['nb_active_threads']=threading.activeCount()looping_tasks=self._tasks_manager._looping_tasksresults['looping_tasks']=', '.join(str(t)fortinlooping_tasks)results['available_cnxsets']=self._cnxsets_pool.qsize()results['threads']=', '.join(sorted(str(t)fortinthreading.enumerate()))returnresultsdefgc_stats(self,nmax=20):"""Return a dictionary containing some statistics about the repository memory usage. This is a public method, not requiring a session id. nmax is the max number of (most) referenced object returned as the 'referenced' result """fromcubicweb._gcdebugimportgc_infofromcubicweb.appobjectimportAppObjectfromcubicweb.rsetimportResultSetfromcubicweb.dbapiimportConnection,Cursorfromcubicweb.web.requestimportCubicWebRequestBasefromrql.stmtsimportUnionlookupclasses=(AppObject,Union,ResultSet,Connection,Cursor,CubicWebRequestBase)try:fromcubicweb.server.sessionimportSession,InternalSessionlookupclasses+=(InternalSession,Session)exceptImportError:pass# no server part installedresults={}counters,ocounters,garbage=gc_info(lookupclasses,viewreferrersclasses=())values=sorted(counters.iteritems(),key=lambdax:x[1],reverse=True)results['lookupclasses']=valuesvalues=sorted(ocounters.iteritems(),key=lambdax:x[1],reverse=True)[:nmax]results['referenced']=valuesresults['unreachable']=len(garbage)returnresultsdefget_schema(self):"""Return the instance schema. This is a public method, not requiring a session id. """returnself.schemadefget_cubes(self):"""Return the list of cubes used by this instance. This is a public method, not requiring a session id. """versions=self.get_versions(not(self.config.creatingorself.config.repairingorself.config.quick_startorself.config.mode=='test'))cubes=list(versions)cubes.remove('cubicweb')returncubesdefget_option_value(self,option,foreid=None):"""Return the value for `option` in the configuration. If `foreid` is specified, the actual repository to which this entity belongs is derefenced and the option value retrieved from it. This is a public method, not requiring a session id. """# XXX we may want to check we don't give sensible information# XXX the only cube using 'foreid', apycot, stop used this, we probably# want to drop this argumentifforeidisNone:returnself.config[option]_,sourceuri,extid,_=self.type_and_source_from_eid(foreid)ifsourceuri=='system':returnself.config[option]cnxset=self._get_cnxset()try:cnx=cnxset.connection(sourceuri)# needed to check connection is valid and usable by the current# threadnewcnx=self.sources_by_uri[sourceuri].check_connection(cnx)ifnewcnxisnotNone:cnx=newcnxreturncnx.get_option_value(option,extid)finally:self._free_cnxset(cnxset)@cacheddefget_versions(self,checkversions=False):"""Return the a dictionary containing cubes used by this instance as key with their version as value, including cubicweb version. This is a public method, not requiring a session id. """fromlogilab.common.changelogimportVersionvcconf={}withself.internal_session()assession:forpk,versioninsession.execute('Any K,V WHERE P is CWProperty, P value V, P pkey K, ''P pkey ~="system.version.%"',build_descr=False):cube=pk.split('.')[-1]# XXX cubicweb migrationifcubeinCW_MIGRATION_MAP:cube=CW_MIGRATION_MAP[cube]version=Version(version)vcconf[cube]=versionifcheckversions:ifcube!='cubicweb':fsversion=self.config.cube_version(cube)else:fsversion=self.config.cubicweb_version()ifversion<fsversion:msg=('instance has %s version %s but %s ''is installed. Run "cubicweb-ctl upgrade".')raiseExecutionError(msg%(cube,version,fsversion))returnvcconf@cacheddefsource_defs(self):"""Return the a dictionary containing source uris as value and a dictionary describing each source as value. This is a public method, not requiring a session id. """sources={}# remove sensitive informationforuri,sourceinself.sources_by_uri.iteritems():sources[uri]=source.public_configreturnsourcesdefproperties(self):"""Return a result set containing system wide properties. This is a public method, not requiring a session id. """withself.internal_session()assession:# don't use session.execute, we don't want rset.req setreturnself.querier.execute(session,'Any K,V WHERE P is CWProperty,''P pkey K, P value V, NOT P for_user U',build_descr=False)# XXX protect this method: anonymous should be allowed and registration# pluggeddefregister_user(self,login,password,email=None,**kwargs):"""check a user with the given login exists, if not create it with the given password. This method is designed to be used for anonymous registration on public web site. """withself.internal_session()assession:# for consistency, keep same error as unique check hook (although not required)errmsg=session._('the value "%s" is already used, use another one')if(session.execute('CWUser X WHERE X login %(login)s',{'login':login},build_descr=False)orsession.execute('CWUser X WHERE X use_email C, C address %(login)s',{'login':login},build_descr=False)):qname=role_name('login','subject')raiseValidationError(None,{qname:errmsg%login})# we have to create the useruser=self.vreg['etypes'].etype_class('CWUser')(session)ifisinstance(password,unicode):# password should *always* be utf8 encodedpassword=password.encode('UTF8')kwargs['login']=loginkwargs['upassword']=passwordself.glob_add_entity(session,EditedEntity(user,**kwargs))session.execute('SET X in_group G WHERE X eid %(x)s, G name "users"',{'x':user.eid})ifemailor'@'inlogin:d={'login':login,'email':emailorlogin}ifsession.execute('EmailAddress X WHERE X address %(email)s',d,build_descr=False):qname=role_name('address','subject')raiseValidationError(None,{qname:errmsg%d['email']})session.execute('INSERT EmailAddress X: X address %(email)s, ''U primary_email X, U use_email X ''WHERE U login %(login)s',d,build_descr=False)session.commit()returnTruedeffind_users(self,fetch_attrs,**query_attrs):"""yield user attributes for cwusers matching the given query_attrs (the result set cannot survive this method call) This can be used by low-privileges account (anonymous comes to mind). `fetch_attrs`: tuple of attributes to be fetched `query_attrs`: dict of attr/values to restrict the query """assertquery_attrsifnothasattr(self,'_cwuser_attrs'):cwuser=self.schema['CWUser']self._cwuser_attrs=set(str(rschema)forrschema,_eschemaincwuser.attribute_definitions()ifnotrschema.meta)cwuserattrs=self._cwuser_attrsforkinchain(fetch_attrs,query_attrs):ifknotincwuserattrs:raiseException('bad input for find_user')withself.internal_session()assession:varmaker=rqlvar_maker()vars=[(attr,varmaker.next())forattrinfetch_attrs]rql='Any %s WHERE X is CWUser, '%','.join(var[1]forvarinvars)rql+=','.join('X %s%s'%(var[0],var[1])forvarinvars)+','rset=session.execute(rql+','.join('X %s%%(%s)s'%(attr,attr)forattrinquery_attrs),query_attrs)returnrset.rowsdefconnect(self,login,**kwargs):"""open a connection for a given user raise `AuthenticationError` if the authentication failed raise `ConnectionError` if we can't open a connection """cnxprops=kwargs.pop('cnxprops',None)# use an internal connectionwithself.internal_session()assession:# try to get a user objectuser=self.authenticate_user(session,login,**kwargs)session=Session(user,self,cnxprops)ifthreading.currentThread()inself._pyro_sessions:# assume no pyro client does one get_repository followed by# multiple repo.connectassertself._pyro_sessions[threading.currentThread()]==Noneself.debug('record session %s',session)self._pyro_sessions[threading.currentThread()]=sessionuser._cw=user.cw_rset.req=sessionuser.cw_clear_relation_cache()self._sessions[session.id]=sessionself.info('opened session %s for user %s',session.id,login)self.hm.call_hooks('session_open',session)# commit session at this point in case write operation has been done# during `session_open` hookssession.commit()returnsession.iddefexecute(self,sessionid,rqlstring,args=None,build_descr=True,txid=None):"""execute a RQL query * rqlstring should be an unicode string or a plain ascii string * args the optional parameters used in the query * build_descr is a flag indicating if the description should be built on select queries """session=self._get_session(sessionid,setcnxset=True,txid=txid)try:try:rset=self.querier.execute(session,rqlstring,args,build_descr)# NOTE: the web front will (re)build it when needed# e.g in facets# Zeroed to avoid useless overhead with pyrorset._rqlst=Nonereturnrsetexcept(ValidationError,Unauthorized,RQLSyntaxError):raiseexceptException:# FIXME: check error to catch internal errorsself.exception('unexpected error while executing %s with %s',rqlstring,args)raisefinally:session.free_cnxset()defdescribe(self,sessionid,eid,txid=None):"""return a tuple `(type, physical source uri, extid, actual source uri)` for the entity of the given `eid` """session=self._get_session(sessionid,setcnxset=True,txid=txid)try:returnself.type_and_source_from_eid(eid,session)finally:session.free_cnxset()defcheck_session(self,sessionid):"""raise `BadConnectionId` if the connection is no more valid, else return its latest activity timestamp. """returnself._get_session(sessionid,setcnxset=False).timestampdefget_shared_data(self,sessionid,key,default=None,pop=False,txdata=False):"""return value associated to key in the session's data dictionary or session's transaction's data if `txdata` is true. If pop is True, value will be removed from the dictionary. If key isn't defined in the dictionary, value specified by the `default` argument will be returned. """session=self._get_session(sessionid,setcnxset=False)returnsession.get_shared_data(key,default,pop,txdata)defset_shared_data(self,sessionid,key,value,txdata=False):"""set value associated to `key` in shared data if `txdata` is true, the value will be added to the repository session's transaction's data which are cleared on commit/rollback of the current transaction. """session=self._get_session(sessionid,setcnxset=False)session.set_shared_data(key,value,txdata)defcommit(self,sessionid,txid=None):"""commit transaction for the session with the given id"""self.debug('begin commit for session %s',sessionid)try:session=self._get_session(sessionid)session.set_tx(txid)returnsession.commit()except(ValidationError,Unauthorized):raiseexceptException:self.exception('unexpected error')raisedefrollback(self,sessionid,txid=None):"""commit transaction for the session with the given id"""self.debug('begin rollback for session %s',sessionid)try:session=self._get_session(sessionid)session.set_tx(txid)session.rollback()exceptException:self.exception('unexpected error')raisedefclose(self,sessionid,txid=None,checkshuttingdown=True):"""close the session with the given id"""session=self._get_session(sessionid,setcnxset=True,txid=txid,checkshuttingdown=checkshuttingdown)# operation uncommited before close are rolled back before hook is calledsession.rollback(free_cnxset=False)self.hm.call_hooks('session_close',session)# commit session at this point in case write operation has been done# during `session_close` hookssession.commit()session.close()ifthreading.currentThread()inself._pyro_sessions:self._pyro_sessions[threading.currentThread()]=Nonedelself._sessions[sessionid]self.info('closed session %s for user %s',sessionid,session.user.login)defcall_service(self,sessionid,regid,async,**kwargs):""" See :class:`cubicweb.dbapi.Connection.call_service` and :class:`cubicweb.server.Service` """session=self._get_session(sessionid)returnself._call_service_with_session(session,regid,async,**kwargs)def_call_service_with_session(self,session,regid,async,**kwargs):ifasync:self.info('calling service %s asynchronously',regid)deftask():session.set_cnxset()try:service=session.vreg['services'].select(regid,session,**kwargs)returnservice.call(**kwargs)finally:session.rollback()# free cnxsetself.threaded_task(task)else:self.info('calling service %s synchronously',regid)session.set_cnxset()try:service=session.vreg['services'].select(regid,session,**kwargs)returnservice.call(**kwargs)finally:session.free_cnxset()defuser_info(self,sessionid,props=None):"""this method should be used by client to: * check session id validity * update user information on each user's request (i.e. groups and custom properties) """user=self._get_session(sessionid,setcnxset=False).userreturnuser.eid,user.login,user.groups,user.propertiesdefundoable_transactions(self,sessionid,ueid=None,txid=None,**actionfilters):"""See :class:`cubicweb.dbapi.Connection.undoable_transactions`"""session=self._get_session(sessionid,setcnxset=True,txid=txid)try:returnself.system_source.undoable_transactions(session,ueid,**actionfilters)finally:session.free_cnxset()deftransaction_info(self,sessionid,txuuid,txid=None):"""See :class:`cubicweb.dbapi.Connection.transaction_info`"""session=self._get_session(sessionid,setcnxset=True,txid=txid)try:returnself.system_source.tx_info(session,txuuid)finally:session.free_cnxset()deftransaction_actions(self,sessionid,txuuid,public=True,txid=None):"""See :class:`cubicweb.dbapi.Connection.transaction_actions`"""session=self._get_session(sessionid,setcnxset=True,txid=txid)try:returnself.system_source.tx_actions(session,txuuid,public)finally:session.free_cnxset()defundo_transaction(self,sessionid,txuuid,txid=None):"""See :class:`cubicweb.dbapi.Connection.undo_transaction`"""session=self._get_session(sessionid,setcnxset=True,txid=txid)try:returnself.system_source.undo_transaction(session,txuuid)finally:session.free_cnxset()# public (inter-repository) interface #####################################defentities_modified_since(self,etypes,mtime):"""function designed to be called from an external repository which is using this one as a rql source for synchronization, and return a 3-uple containing : * the local date * list of (etype, eid) of entities of the given types which have been modified since the given timestamp (actually entities whose full text index content has changed) * list of (etype, eid) of entities of the given types which have been deleted since the given timestamp """withself.internal_session()assession:updatetime=datetime.utcnow()modentities,delentities=self.system_source.modified_entities(session,etypes,mtime)returnupdatetime,modentities,delentities# session handling ########################################################defclose_sessions(self):"""close every opened sessions"""forsessionidinlist(self._sessions):try:self.close(sessionid,checkshuttingdown=False)exceptException:# XXX BaseException?self.exception('error while closing session %s'%sessionid)defclean_sessions(self):"""close sessions not used since an amount of time specified in the configuration """mintime=time()-self.cleanup_session_timeself.debug('cleaning session unused since %s',strftime('%T',localtime(mintime)))nbclosed=0forsessioninself._sessions.values():ifsession.timestamp<mintime:self.close(session.id)nbclosed+=1returnnbcloseddefinternal_session(self,cnxprops=None,safe=False):"""return a dbapi like connection/cursor using internal user which have every rights on the repository. The `safe` argument is a boolean flag telling if integrity hooks should be activated or not. *YOU HAVE TO* commit/rollback or close (rollback implicitly) the session once the job's done, else you'll leak connections set up to the time where no one is available, causing irremediable freeze... """session=InternalSession(self,cnxprops,safe)session.set_cnxset()returnsessiondef_get_session(self,sessionid,setcnxset=False,txid=None,checkshuttingdown=True):"""return the session associated with the given session identifier"""ifcheckshuttingdownandself.shutting_down:raiseShuttingDown('Repository is shutting down')try:session=self._sessions[sessionid]exceptKeyError:raiseBadConnectionId('No such session %s'%sessionid)ifsetcnxset:session.set_tx(txid)# must be done before set_cnxsetsession.set_cnxset()returnsession# data sources handling #################################################### * correspondance between eid and (type, source)# * correspondance between eid and local id (i.e. specific to a given source)deftype_and_source_from_eid(self,eid,session=None):"""return a tuple `(type, physical source uri, extid, actual source uri)` for the entity of the given `eid` """try:eid=int(eid)exceptValueError:raiseUnknownEid(eid)try:returnself._type_source_cache[eid]exceptKeyError:ifsessionisNone:session=self.internal_session()free_cnxset=Trueelse:free_cnxset=Falsetry:etype,uri,extid,auri=self.system_source.eid_type_source(session,eid)finally:iffree_cnxset:session.free_cnxset()self._type_source_cache[eid]=(etype,uri,extid,auri)ifuri!='system':self._extid_cache[(extid,uri)]=eidreturnetype,uri,extid,auridefclear_caches(self,eids):etcache=self._type_source_cacheextidcache=self._extid_cacherqlcache=self.querier._rql_cacheforeidineids:try:etype,uri,extid,auri=etcache.pop(int(eid))# may be a string in some casesrqlcache.pop(('%s X WHERE X eid %s'%(etype,eid),),None)extidcache.pop((extid,uri),None)exceptKeyError:etype=Nonerqlcache.pop(('Any X WHERE X eid %s'%eid,),None)forsourceinself.sources:source.clear_eid_cache(eid,etype)deftype_from_eid(self,eid,session=None):"""return the type of the entity with id <eid>"""returnself.type_and_source_from_eid(eid,session)[0]defsource_from_eid(self,eid,session=None):"""return the source for the given entity's eid"""returnself.sources_by_uri[self.type_and_source_from_eid(eid,session)[1]]defquerier_cache_key(self,session,rql,args,eidkeys):cachekey=[rql]forkeyinsorted(eidkeys):try:etype=self.type_from_eid(args[key],session)exceptKeyError:raiseQueryError('bad cache key %s (no value)'%key)exceptTypeError:raiseQueryError('bad cache key %s (value: %r)'%(key,args[key]))cachekey.append(etype)# ensure eid is correctly typed in argsargs[key]=int(args[key])returntuple(cachekey)defeid2extid(self,source,eid,session=None):"""get local id from an eid"""etype,uri,extid,_=self.type_and_source_from_eid(eid,session)ifsource.uri!=uri:# eid not from the given sourceraiseUnknownEid(eid)returnextiddefextid2eid(self,source,extid,etype,session=None,insert=True,complete=True,commit=True,sourceparams=None):"""Return eid from a local id. If the eid is a negative integer, that means the entity is known but has been copied back to the system source hence should be ignored. If no record is found, ie the entity is not known yet: 1. an eid is attributed 2. the source's :meth:`before_entity_insertion` method is called to build the entity instance 3. unless source's :attr:`should_call_hooks` tell otherwise, 'before_add_entity' hooks are called 4. record is added into the system source 5. the source's :meth:`after_entity_insertion` method is called to complete building of the entity instance 6. unless source's :attr:`should_call_hooks` tell otherwise, 'before_add_entity' hooks are called """uri='system'ifsource.copy_based_sourceelsesource.uricachekey=(extid,uri)try:returnself._extid_cache[cachekey]exceptKeyError:passfree_cnxset=FalseifsessionisNone:session=self.internal_session()free_cnxset=Trueeid=self.system_source.extid2eid(session,uri,extid)ifeidisnotNone:self._extid_cache[cachekey]=eidself._type_source_cache[eid]=(etype,uri,extid,source.uri)iffree_cnxset:session.free_cnxset()returneidifnotinsert:return# no link between extid and eid, create one using an internal session# since the current session user may not have required permissions to# do necessary stuff and we don't want to commit user session.## Moreover, even if session is already an internal session but is# processing a commit, we have to use another oneifnotsession.is_internal_session:session=self.internal_session()free_cnxset=Truetry:eid=self.system_source.create_eid(session)self._extid_cache[cachekey]=eidself._type_source_cache[eid]=(etype,uri,extid,source.uri)entity=source.before_entity_insertion(session,extid,etype,eid,sourceparams)ifsource.should_call_hooks:# get back a copy of operation for later restore if necessary,# see belowpending_operations=session.pending_operations[:]self.hm.call_hooks('before_add_entity',session,entity=entity)self.add_info(session,entity,source,extid,complete=complete)source.after_entity_insertion(session,extid,entity,sourceparams)ifsource.should_call_hooks:self.hm.call_hooks('after_add_entity',session,entity=entity)ifcommitorfree_cnxset:session.commit(free_cnxset)returneidexceptException:ifcommitorfree_cnxset:session.rollback(free_cnxset)else:# XXX do some cleanup manually so that the transaction has a# chance to be commited, with simply this entity discardedself._extid_cache.pop(cachekey,None)self._type_source_cache.pop(eid,None)if'entity'inlocals():hook.CleanupDeletedEidsCacheOp.get_instance(session).add_data(entity.eid)self.system_source.delete_info_multi(session,[entity],uri)ifsource.should_call_hooks:session._tx.pending_operations=pending_operationsraisedefadd_info(self,session,entity,source,extid=None,complete=True):"""add type and source info for an eid into the system table, and index the entity with the full text index """# begin by inserting eid/type/source/extid into the entities tablehook.CleanupNewEidsCacheOp.get_instance(session).add_data(entity.eid)self.system_source.add_info(session,entity,source,extid,complete)defdelete_info(self,session,entity,sourceuri,scleanup=None):"""called by external source when some entity known by the system source has been deleted in the external source """# mark eid as being deleted in session info and setup cache update# operationhook.CleanupDeletedEidsCacheOp.get_instance(session).add_data(entity.eid)self._delete_info(session,entity,sourceuri,scleanup)def_delete_info(self,session,entity,sourceuri,scleanup=None):"""delete system information on deletion of an entity: * delete all remaining relations from/to this entity * call delete info on the system source which will transfer record from the entities table to the deleted_entities table When scleanup is specified, it's expected to be the source's eid, in which case we'll specify the target's relation source so that this source is ignored. E.g. we want to delete relations stored locally, as the deletion information comes from the external source, it's its responsability to have cleaned-up its own relations. """pendingrtypes=session.transaction_data.get('pendingrtypes',())ifscleanupisnotNone:source=self.sources_by_eid[scleanup]# delete remaining relations: if user can delete the entity, he can# delete all its relations without security checkingwithsession.security_enabled(read=False,write=False):eid=entity.eidforrschema,_,roleinentity.e_schema.relation_definitions():rtype=rschema.typeifrtypeinschema.VIRTUAL_RTYPESorrtypeinpendingrtypes:continueifrole=='subject':# don't skip inlined relation so they are regularly# deleted and so hooks are correctly calledrql='DELETE X %s Y WHERE X eid %%(x)s'%rtypeelse:rql='DELETE Y %s X WHERE X eid %%(x)s'%rtypeifscleanupisnotNone:# if the relation can't be crossed, nothing to cleanup (we# would get a BadRQLQuery from the multi-sources planner).# This may still leave some junk if the mapping has changed# at some point, but one can still run db-check to catch# thoseifnotsourceinself.can_cross_relation(rtype):continue# source cleaning: only delete relations stored locally# (here, scleanuprql+=', NOT (Y cw_source S, S eid %(seid)s)'try:session.execute(rql,{'x':eid,'seid':scleanup},build_descr=False)exceptException:ifself.config.mode=='test':raiseself.exception('error while cascading delete for entity %s ''from %s. RQL: %s',entity,sourceuri,rql)self.system_source.delete_info_multi(session,[entity],sourceuri)def_delete_info_multi(self,session,entities,sourceuri,scleanup=None):"""same as _delete_info but accepts a list of entities with the same etype and belinging to the same source. """pendingrtypes=session.transaction_data.get('pendingrtypes',())ifscleanupisnotNone:source=self.sources_by_eid[scleanup]# delete remaining relations: if user can delete the entity, he can# delete all its relations without security checkingwithsession.security_enabled(read=False,write=False):in_eids=','.join([str(_e.eid)for_einentities])forrschema,_,roleinentities[0].e_schema.relation_definitions():rtype=rschema.typeifrtypeinschema.VIRTUAL_RTYPESorrtypeinpendingrtypes:continueifrole=='subject':# don't skip inlined relation so they are regularly# deleted and so hooks are correctly calledrql='DELETE X %s Y WHERE X eid IN (%s)'%(rtype,in_eids)else:rql='DELETE Y %s X WHERE X eid IN (%s)'%(rtype,in_eids)ifscleanupisnotNone:# if the relation can't be crossed, nothing to cleanup (we# would get a BadRQLQuery from the multi-sources planner).# This may still leave some junk if the mapping has changed# at some point, but one can still run db-check to catch# thoseifnotsourceinself.can_cross_relation(rtype):continue# source cleaning: only delete relations stored locallyrql+=', NOT (Y cw_source S, S eid %(seid)s)'try:session.execute(rql,{'seid':scleanup},build_descr=False)exceptValidationError:raiseexceptUnauthorized:self.exception('Unauthorized exception while cascading delete for entity %s ''from %s. RQL: %s.\nThis should not happen since security is disabled here.',entities,sourceuri,rql)raiseexceptException:ifself.config.mode=='test':raiseself.exception('error while cascading delete for entity %s ''from %s. RQL: %s',entities,sourceuri,rql)self.system_source.delete_info_multi(session,entities,sourceuri)deflocate_relation_source(self,session,subject,rtype,object):subjsource=self.source_from_eid(subject,session)objsource=self.source_from_eid(object,session)ifnotsubjsourceisobjsource:source=self.system_sourceifnot(subjsource.may_cross_relation(rtype)andobjsource.may_cross_relation(rtype)):raiseMultiSourcesError("relation %s can't be crossed among sources"%rtype)elifnotsubjsource.support_relation(rtype):source=self.system_sourceelse:source=subjsourceifnotsource.support_relation(rtype,True):raiseMultiSourcesError("source %s doesn't support write of %s relation"%(source.uri,rtype))returnsourcedeflocate_etype_source(self,etype):forsourceinself.sources:ifsource.support_entity(etype,1):returnsourceelse:raiseETypeNotSupportedBySources(etype)definit_entity_caches(self,session,entity,source):"""add entity to session entities cache and repo's extid cache. Return entity's ext id if the source isn't the system source. """session.set_entity_cache(entity)suri=source.uriifsuri=='system':extid=Noneelse:ifsource.copy_based_source:suri='system'extid=source.get_extid(entity)self._extid_cache[(str(extid),suri)]=entity.eidself._type_source_cache[entity.eid]=(entity.cw_etype,suri,extid,source.uri)returnextiddefglob_add_entity(self,session,edited):"""add an entity to the repository the entity eid should originaly be None and a unique eid is assigned to the entity instance """entity=edited.entityentity._cw_is_saved=False# entity has an eid but is not yet saved# init edited_attributes before calling before_add_entity hooksentity.cw_edited=editedsource=self.locate_etype_source(entity.cw_etype)# allocate an eid to the entity before calling hooksentity.eid=self.system_source.create_eid(session)# set caches asapextid=self.init_entity_caches(session,entity,source)ifserver.DEBUG&server.DBG_REPO:print'ADD entity',self,entity.cw_etype,entity.eid,editedprefill_entity_caches(entity)ifsource.should_call_hooks:self.hm.call_hooks('before_add_entity',session,entity=entity)relations=preprocess_inlined_relations(session,entity)edited.set_defaults()ifsession.is_hook_category_activated('integrity'):edited.check(creation=True)try:source.add_entity(session,entity)exceptUniqueTogetherErrorasexc:userhdlr=session.vreg['adapters'].select('IUserFriendlyError',session,entity=entity,exc=exc)userhdlr.raise_user_exception()self.add_info(session,entity,source,extid,complete=False)edited.saved=entity._cw_is_saved=True# trigger after_add_entity after after_add_relationifsource.should_call_hooks:self.hm.call_hooks('after_add_entity',session,entity=entity)# call hooks for inlined relationsforattr,valueinrelations:self.hm.call_hooks('before_add_relation',session,eidfrom=entity.eid,rtype=attr,eidto=value)self.hm.call_hooks('after_add_relation',session,eidfrom=entity.eid,rtype=attr,eidto=value)returnentity.eiddefglob_update_entity(self,session,edited):"""replace an entity in the repository the type and the eid of an entity must not be changed """entity=edited.entityifserver.DEBUG&server.DBG_REPO:print'UPDATE entity',entity.cw_etype,entity.eid, \entity.cw_attr_cache,editedhm=self.hmeschema=entity.e_schemasession.set_entity_cache(entity)orig_edited=getattr(entity,'cw_edited',None)entity.cw_edited=editedtry:only_inline_rels,need_fti_update=True,Falserelations=[]source=self.source_from_eid(entity.eid,session)forattrinlist(edited):ifattr=='eid':continuerschema=eschema.subjrels[attr]ifrschema.final:ifgetattr(eschema.rdef(attr),'fulltextindexed',False):need_fti_update=Trueonly_inline_rels=Falseelse:# inlined relationprevious_value=entity.related(attr)orNoneifprevious_valueisnotNone:previous_value=previous_value[0][0]# got a result setifprevious_value==entity.cw_attr_cache[attr]:previous_value=Noneelifsource.should_call_hooks:hm.call_hooks('before_delete_relation',session,eidfrom=entity.eid,rtype=attr,eidto=previous_value)relations.append((attr,edited[attr],previous_value))ifsource.should_call_hooks:# call hooks for inlined relationsforattr,value,_tinrelations:hm.call_hooks('before_add_relation',session,eidfrom=entity.eid,rtype=attr,eidto=value)ifnotonly_inline_rels:hm.call_hooks('before_update_entity',session,entity=entity)ifsession.is_hook_category_activated('integrity'):edited.check()try:source.update_entity(session,entity)edited.saved=TrueexceptUniqueTogetherErrorasexc:userhdlr=session.vreg['adapters'].select('IUserFriendlyError',session,entity=entity,exc=exc)userhdlr.raise_user_exception()self.system_source.update_info(session,entity,need_fti_update)ifsource.should_call_hooks:ifnotonly_inline_rels:hm.call_hooks('after_update_entity',session,entity=entity)forattr,value,prevvalueinrelations:# if the relation is already cached, update existant cacherelcache=entity.cw_relation_cached(attr,'subject')ifprevvalueisnotNone:hm.call_hooks('after_delete_relation',session,eidfrom=entity.eid,rtype=attr,eidto=prevvalue)ifrelcacheisnotNone:session.update_rel_cache_del(entity.eid,attr,prevvalue)del_existing_rel_if_needed(session,entity.eid,attr,value)ifrelcacheisnotNone:session.update_rel_cache_add(entity.eid,attr,value)else:entity.cw_set_relation_cache(attr,'subject',session.eid_rset(value))hm.call_hooks('after_add_relation',session,eidfrom=entity.eid,rtype=attr,eidto=value)finally:iforig_editedisnotNone:entity.cw_edited=orig_editeddefglob_delete_entities(self,session,eids):"""delete a list of entities and all related entities from the repository"""# mark eids as being deleted in session info and setup cache update# operation (register pending eids before actual deletion to avoid# multiple call to glob_delete_entities)op=hook.CleanupDeletedEidsCacheOp.get_instance(session)ifnotisinstance(eids,(set,frozenset)):warn('[3.13] eids should be given as a set',DeprecationWarning,stacklevel=2)eids=frozenset(eids)eids=eids-op._containerop._container|=eidsdata_by_etype_source={}# values are ([list of eids],# [list of extid],# [list of entities])## WARNING: the way this dictionary is populated is heavily optimized# and does not use setdefault on purpose. Unless a new release# of the Python interpreter advertises large perf improvements# in setdefault, this should not be changed without profiling.foreidineids:etype,sourceuri,extid,_=self.type_and_source_from_eid(eid,session)# XXX should cache entity's cw_metainformationentity=session.entity_from_eid(eid,etype)try:data_by_etype_source[(etype,sourceuri)].append(entity)exceptKeyError:data_by_etype_source[(etype,sourceuri)]=[entity]for(etype,sourceuri),entitiesindata_by_etype_source.iteritems():ifserver.DEBUG&server.DBG_REPO:print'DELETE entities',etype,[entity.eidforentityinentities]source=self.sources_by_uri[sourceuri]ifsource.should_call_hooks:self.hm.call_hooks('before_delete_entity',session,entities=entities)ifsession.deleted_in_transaction(source.eid):# source is being deleted, think to give scleanup argumentself._delete_info_multi(session,entities,sourceuri,scleanup=source.eid)else:self._delete_info_multi(session,entities,sourceuri)source.delete_entities(session,entities)ifsource.should_call_hooks:self.hm.call_hooks('after_delete_entity',session,entities=entities)# don't clear cache here, it is done in a hook on commitdefglob_add_relation(self,session,subject,rtype,object):"""add a relation to the repository"""self.glob_add_relations(session,{rtype:[(subject,object)]})defglob_add_relations(self,session,relations):"""add several relations to the repository relations is a dictionary rtype: [(subj_eid, obj_eid), ...] """sources={}subjects_by_types={}objects_by_types={}activintegrity=session.is_hook_category_activated('activeintegrity')forrtype,eids_subj_objinrelations.iteritems():ifserver.DEBUG&server.DBG_REPO:forsubjeid,objeidineids_subj_obj:print'ADD relation',subjeid,rtype,objeidforsubjeid,objeidineids_subj_obj:source=self.locate_relation_source(session,subjeid,rtype,objeid)ifsourcenotinsources:relations_by_rtype={}sources[source]=relations_by_rtypeelse:relations_by_rtype=sources[source]ifrtypeinrelations_by_rtype:relations_by_rtype[rtype].append((subjeid,objeid))else:relations_by_rtype[rtype]=[(subjeid,objeid)]ifnotactivintegrity:continue# take care to relation of cardinality '?1', as all eids will# be inserted later, we've remove duplicated eids since they# won't be catched by `del_existing_rel_if_needed`rdef=session.rtype_eids_rdef(rtype,subjeid,objeid)card=rdef.cardinalityifcard[0]in'?1':withsession.security_enabled(read=False):session.execute('DELETE X %s Y WHERE X eid %%(x)s, ''NOT Y eid %%(y)s'%rtype,{'x':subjeid,'y':objeid})subjects=subjects_by_types.setdefault(rdef,{})ifsubjeidinsubjects:delrelations_by_rtype[rtype][subjects[subjeid]]subjects[subjeid]=len(relations_by_rtype[rtype])-1continuesubjects[subjeid]=len(relations_by_rtype[rtype])-1ifcard[1]in'?1':withsession.security_enabled(read=False):session.execute('DELETE X %s Y WHERE Y eid %%(y)s, ''NOT X eid %%(x)s'%rtype,{'x':subjeid,'y':objeid})objects=objects_by_types.setdefault(rdef,{})ifobjeidinobjects:delrelations_by_rtype[rtype][objects[objeid]]objects[objeid]=len(relations_by_rtype[rtype])continueobjects[objeid]=len(relations_by_rtype[rtype])forsource,relations_by_rtypeinsources.iteritems():ifsource.should_call_hooks:forrtype,source_relationsinrelations_by_rtype.iteritems():self.hm.call_hooks('before_add_relation',session,rtype=rtype,eids_from_to=source_relations)forrtype,source_relationsinrelations_by_rtype.iteritems():source.add_relations(session,rtype,source_relations)rschema=self.schema.rschema(rtype)forsubjeid,objeidinsource_relations:session.update_rel_cache_add(subjeid,rtype,objeid,rschema.symmetric)ifsource.should_call_hooks:forrtype,source_relationsinrelations_by_rtype.iteritems():self.hm.call_hooks('after_add_relation',session,rtype=rtype,eids_from_to=source_relations)defglob_delete_relation(self,session,subject,rtype,object):"""delete a relation from the repository"""ifserver.DEBUG&server.DBG_REPO:print'DELETE relation',subject,rtype,objectsource=self.locate_relation_source(session,subject,rtype,object)ifsource.should_call_hooks:self.hm.call_hooks('before_delete_relation',session,eidfrom=subject,rtype=rtype,eidto=object)source.delete_relation(session,subject,rtype,object)rschema=self.schema.rschema(rtype)session.update_rel_cache_del(subject,rtype,object,rschema.symmetric)ifsource.should_call_hooks:self.hm.call_hooks('after_delete_relation',session,eidfrom=subject,rtype=rtype,eidto=object)# pyro handling ###########################################################@property@cacheddefpyro_appid(self):fromlogilab.commonimportpyro_extaspyroconfig=self.configappid='%s.%s'%pyro.ns_group_and_id(config['pyro-instance-id']orconfig.appid,config['pyro-ns-group'])# ensure config['pyro-instance-id'] is a full qualified pyro nameconfig['pyro-instance-id']=appidreturnappiddef_use_pyrons(self):"""return True if the pyro-ns-host is set to something else than NO_PYRONS, meaning we want to go through a pyro nameserver"""returnself.config['pyro-ns-host']!='NO_PYRONS'defpyro_register(self,host=''):"""register the repository as a pyro object"""fromlogilab.commonimportpyro_extaspyrodaemon=pyro.register_object(self,self.pyro_appid,daemonhost=self.config['pyro-host'],nshost=self.config['pyro-ns-host'],use_pyrons=self._use_pyrons())self.info('repository registered as a pyro object %s',self.pyro_appid)self.pyro_uri=pyro.get_object_uri(self.pyro_appid)self.info('pyro uri is: %s',self.pyro_uri)self.pyro_registered=True# register a looping task to regularly ensure we're still registered# into the pyro name serverifself._use_pyrons():self.looping_task(60*10,self._ensure_pyro_ns)pyro_sessions=self._pyro_sessions# install hacky function to free cnxsetdefhandleConnection(conn,tcpserver,sessions=pyro_sessions):sessions[threading.currentThread()]=Nonereturntcpserver.getAdapter().__class__.handleConnection(tcpserver.getAdapter(),conn,tcpserver)daemon.getAdapter().handleConnection=handleConnectiondefremoveConnection(conn,sessions=pyro_sessions):daemon.__class__.removeConnection(daemon,conn)session=sessions.pop(threading.currentThread(),None)ifsessionisNone:# client was not yet connected to the reporeturnifnotsession.closed:self.close(session.id)daemon.removeConnection=removeConnectionreturndaemondef_ensure_pyro_ns(self):ifnotself._use_pyrons():returnfromlogilab.commonimportpyro_extaspyropyro.ns_reregister(self.pyro_appid,nshost=self.config['pyro-ns-host'])self.info('repository re-registered as a pyro object %s',self.pyro_appid)# multi-sources planner helpers ###########################################@cacheddefrel_type_sources(self,rtype):warn('[3.18] old multi-source system will go away in the next version',DeprecationWarning)returntuple([sourceforsourceinself.sourcesifsource.support_relation(rtype)orrtypeinsource.dont_cross_relations])@cacheddefcan_cross_relation(self,rtype):warn('[3.18] old multi-source system will go away in the next version',DeprecationWarning)returntuple([sourceforsourceinself.sourcesifsource.support_relation(rtype)andrtypeinsource.cross_relations])@cacheddefis_multi_sources_relation(self,rtype):warn('[3.18] old multi-source system will go away in the next version',DeprecationWarning)returnany(sourceforsourceinself.sourcesifnotsourceisself.system_sourceandsource.support_relation(rtype))# these are overridden by set_log_methods below# only defining here to prevent pylint from complaininginfo=warning=error=critical=exception=debug=lambdamsg,*a,**kw:Nonedefpyro_unregister(config):"""unregister the repository from the pyro name server"""fromlogilab.common.pyro_extimportns_unregisterappid=config['pyro-instance-id']orconfig.appidns_unregister(appid,config['pyro-ns-group'],config['pyro-ns-host'])fromloggingimportgetLoggerfromcubicwebimportset_log_methodsset_log_methods(Repository,getLogger('cubicweb.repository'))