# copyright 2003-2010 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.
"""Defines the central class for the CubicWeb RQL server: the repository.

The repository is an abstraction allowing execution of rql queries against
data sources. Most of the work is actually done in helper classes. The
repository mainly:

* brings these classes all together to provide a single access
  point to a cubicweb instance.
* handles session management
* provides a method for pyro registration, to call if pyro is enabled
"""
from __future__ import with_statement

__docformat__ = "restructuredtext en"

import sys
import threading
import Queue
from os.path import join
from datetime import datetime
from time import time, localtime, strftime

from logilab.common.decorators import cached
from logilab.common.compat import any
from logilab.common import flatten

from yams import BadSchemaDefinition
from yams.schema import role_name
from rql import RQLSyntaxError

from cubicweb import (CW_SOFTWARE_ROOT, CW_MIGRATION_MAP, QueryError,
                      UnknownEid, AuthenticationError, ExecutionError,
                      ETypeNotSupportedBySources, MultiSourcesError,
                      BadConnectionId, Unauthorized, ValidationError,
                      RepositoryError, UniqueTogetherError, typed_eid, onevent)
from cubicweb import cwvreg, schema, server
from cubicweb.server import utils, hook, pool, querier, sources
from cubicweb.server.session import Session, InternalSession, InternalManager, \
     security_enabled
from cubicweb.server.ssplanner import EditedEntity


def del_existing_rel_if_needed(session, eidfrom, rtype, eidto):
    """delete an existing relation when adding a new one if the cardinality
    is '1' or '?'

    This has to be done once the new relation has been inserted, to avoid
    having an entity without the relation for some time.

    This kind of behaviour has to be done in the repository so we don't have
    hook ordering hazards.
    """
    # skip that for internal session or if integrity explicitly disabled
    #
    # XXX we should imo rely on the orm to first fetch the existing entity if
    # any, then delete it.
    if session.is_internal_session \
           or not session.is_hook_category_activated('activeintegrity'):
        return
    card = session.schema_rproperty(rtype, eidfrom, eidto, 'cardinality')
    # one may be tempted to check for neweids, but this may cause more than one
    # relation even with '1?' cardinality if those relations are added in the
    # same transaction where the entity is being created. This never occurs from
    # the web interface but may occur during tests or dbapi connections (though
    # not expected for this). So: don't do it, we pretend to ensure repository
    # consistency.
    #
    # notes:
    # * inlined relations will be implicitly deleted for the subject entity
    # * we don't want read permissions to be applied but we want delete
    #   permission to be checked
    if card[0] in '1?' and not session.repo.schema.rschema(rtype).inlined:
        with security_enabled(session, read=False):
            session.execute('DELETE X %s Y WHERE X eid %%(x)s, '
                            'NOT Y eid %%(y)s' % rtype,
                            {'x': eidfrom, 'y': eidto})
    if card[1] in '1?':
        with security_enabled(session, read=False):
            session.execute('DELETE X %s Y WHERE Y eid %%(y)s, '
                            'NOT X eid %%(x)s' % rtype,
                            {'x': eidfrom, 'y': eidto})
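
# Illustrative sketch (not part of the original module): for a relation whose
# subject cardinality is '1' or '?', such as `in_state`, setting a new target
# first inserts the new relation, then the helper above removes the previous
# one (RQL and eids below are hypothetical):
#
#   session.execute('SET X in_state S WHERE X eid %(x)s, S eid %(s)s',
#                   {'x': 1234, 's': 5678})
#   # the repository then calls
#   # del_existing_rel_if_needed(session, 1234, 'in_state', 5678),
#   # deleting any other in_state relation from entity 1234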

class Repository(object):
    """a repository provides access to a set of persistent storages for
    entities and relations

    XXX protect pyro access
    """

    def __init__(self, config, vreg=None):
        self.config = config
        if vreg is None:
            vreg = cwvreg.CubicWebVRegistry(config)
        self.vreg = vreg
        self.pyro_registered = False
        self.info('starting repository from %s', self.config.apphome)
        # dictionary of opened sessions
        self._sessions = {}
        # list of functions to be called at regular interval
        self._looping_tasks = []
        # list of running threads
        self._running_threads = []
        # initial schema, should be built or replaced later
        self.schema = schema.CubicWebSchema(config.appid)
        self.vreg.schema = self.schema # until the actual schema is loaded...
        # querier helper, needs to be created after sources initialization
        self.querier = querier.QuerierHelper(self, self.schema)
        # sources
        self.sources = []
        self.sources_by_uri = {}
        # shutdown flag
        self.shutting_down = False
        # FIXME: store additional sources info in the system database ?
        # FIXME: sources should be ordered (add_entity priority)
        for uri, source_config in config.sources().items():
            if uri == 'admin':
                # not an actual source
                continue
            source = self.get_source(uri, source_config)
            self.sources_by_uri[uri] = source
            if config.source_enabled(uri):
                self.sources.append(source)
        self.system_source = self.sources_by_uri['system']
        # ensure system source is the first one
        self.sources.remove(self.system_source)
        self.sources.insert(0, self.system_source)
        # cache eid -> type / source
        self._type_source_cache = {}
        # cache (extid, source uri) -> eid
        self._extid_cache = {}
        # open some connections pools
        if config.open_connections_pools:
            self.open_connections_pools()

        @onevent('after-registry-reload', self)
        def fix_user_classes(self):
            usercls = self.vreg['etypes'].etype_class('CWUser')
            for session in self._sessions.values():
                if not isinstance(session.user, InternalManager):
                    session.user.__class__ = usercls

    def open_connections_pools(self):
        config = self.config
        self._available_pools = Queue.Queue()
        self._available_pools.put_nowait(pool.ConnectionsPool(self.sources))
        if config.quick_start:
            # quick start, usually only to get a minimal repository to get cubes
            # information (eg dump/restore/...)
            config._cubes = ()
            # only load hooks and entity classes in the registry
            config.cube_appobject_path = set(('hooks', 'entities'))
            config.cubicweb_appobject_path = set(('hooks', 'entities'))
            self.set_schema(config.load_schema())
            config['connections-pool-size'] = 1
            # will be reinitialized later from cubes found in the database
            config._cubes = None
        elif config.creating:
            # repository creation
            config.bootstrap_cubes()
            self.set_schema(config.load_schema(), resetvreg=False)
            # need to load the Any and CWUser entity types
            etdirectory = join(CW_SOFTWARE_ROOT, 'entities')
            self.vreg.init_registration([etdirectory])
            for modname in ('__init__', 'authobjs', 'wfobjs'):
                self.vreg.load_file(join(etdirectory, '%s.py' % modname),
                                    'cubicweb.entities.%s' % modname)
            hooksdirectory = join(CW_SOFTWARE_ROOT, 'hooks')
            self.vreg.load_file(join(hooksdirectory, 'metadata.py'),
                                'cubicweb.hooks.metadata')
        elif config.read_instance_schema:
            # normal start: load the instance schema from the database
            self.fill_schema()
        else:
            # test start: use the file system schema (quicker)
            self.warning("using the filesystem instance's schema")
            config.bootstrap_cubes()
            self.set_schema(config.load_schema())
        if not config.creating:
            if 'CWProperty' in self.schema:
                self.vreg.init_properties(self.properties())
            # call sources' init method to complete their initialisation if
            # needed (for instance looking for persistent configuration using an
            # internal session, which is not possible until pools have been
            # initialized)
            for source in self.sources:
                source.init()
        else:
            # call init_creating so that for instance the native source can
            # configure tsearch according to the postgres version
            for source in self.sources:
                source.init_creating()
        # close the initialization pool and reopen fresh ones for proper
        # initialization now that we know the cubes
        self._get_pool().close(True)
        # list of available pools (we can't iterate on a Queue instance)
        self.pools = []
        for i in xrange(config['connections-pool-size']):
            self.pools.append(pool.ConnectionsPool(self.sources))
            self._available_pools.put_nowait(self.pools[-1])
        if config.quick_start:
            config.init_cubes(self.get_cubes())
        self.hm = hook.HooksManager(self.vreg)
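
    # Illustrative sketch (not from the original source): connections pools
    # rotate through the Queue above; callers borrow one and must give it
    # back, as get_option_value() below does:
    #
    #   pool = self._get_pool()
    #   try:
    #       ...  # use pool.connection(uri)
    #   finally:
    #       self._free_pool(pool)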

    # internals ###############################################################

    def get_source(self, uri, source_config):
        source_config['uri'] = uri
        return sources.get_source(source_config, self.schema, self)

    def set_schema(self, schema, resetvreg=True, rebuildinfered=True):
        if rebuildinfered:
            schema.rebuild_infered_relations()
        self.info('set schema %s %#x', schema.name, id(schema))
        if resetvreg:
            if self.config._cubes is None:
                self.config.init_cubes(self.get_cubes())
            # trigger full reload of all appobjects
            self.vreg.set_schema(schema)
        else:
            self.vreg._set_schema(schema)
        self.querier.set_schema(schema)
        # don't use self.sources, we may want to give the schema even to
        # disabled sources
        for source in self.sources_by_uri.values():
            source.set_schema(schema)
        self.schema = schema

    def fill_schema(self):
        """load the schema from the repository"""
        from cubicweb.server.schemaserial import deserialize_schema
        self.info('loading schema from the repository')
        appschema = schema.CubicWebSchema(self.config.appid)
        self.set_schema(self.config.load_bootstrap_schema(), resetvreg=False)
        self.debug('deserializing db schema into %s %#x',
                   appschema.name, id(appschema))
        session = self.internal_session()
        try:
            try:
                deserialize_schema(appschema, session)
            except BadSchemaDefinition:
                raise
            except Exception, ex:
                import traceback
                traceback.print_exc()
                raise Exception('Is the database initialised ? (cause: %s)' %
                                (ex.args and ex.args[0].strip() or 'unknown')), \
                                None, sys.exc_info()[-1]
        finally:
            session.close()
        self.set_schema(appschema)

    def start_looping_tasks(self):
        if not (self.config.creating or self.config.repairing
                or self.config.quick_start):
            # call instance level initialisation hooks
            self.hm.call_hooks('server_startup', repo=self)
            # register a task to cleanup expired sessions
            self.cleanup_session_time = self.config['cleanup-session-time'] or 60 * 60 * 24
            assert self.cleanup_session_time > 0
            cleanup_session_interval = min(60*60, self.cleanup_session_time / 3)
            self.looping_task(cleanup_session_interval, self.clean_sessions)
        assert isinstance(self._looping_tasks, list), 'already started'
        for i, (interval, func, args) in enumerate(self._looping_tasks):
            self._looping_tasks[i] = task = utils.LoopTask(interval, func, args)
            self.info('starting task %s with interval %.2fs', task.name,
                      interval)
            task.start()
        # ensure no tasks will be further added
        self._looping_tasks = tuple(self._looping_tasks)

    def looping_task(self, interval, func, *args):
        """register a function to be called every `interval` seconds.

        looping tasks can only be registered during repository initialization,
        once done this method will fail.
        """
        try:
            self._looping_tasks.append((interval, func, args))
        except AttributeError:
            raise RuntimeError("can't add looping task once the repository is started")
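
    # Illustrative sketch (not from the original source): a 'server_startup'
    # hook would typically register a periodic task like this, before
    # start_looping_tasks() freezes the task list (the callback below is
    # hypothetical):
    #
    #   def log_session_count(repo):
    #       repo.info('%s open sessions', len(repo._sessions))
    #
    #   repo.looping_task(60 * 10, log_session_count, repo)  # every 10 minutes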

    def threaded_task(self, func):
        """start function in a separate thread"""
        t = utils.RepoThread(func, self._running_threads)
        t.start()

    #@locked
    def _get_pool(self):
        try:
            return self._available_pools.get(True, timeout=5)
        except Queue.Empty:
            raise Exception('no pool available after 5 secs, probably either a '
                            'bug in code (too many uncommitted/rolled back '
                            'connections) or too much load on the server (in '
                            'which case you can try to set a bigger '
                            'connections pool size)')

    def _free_pool(self, pool):
        self._available_pools.put_nowait(pool)

    def pinfo(self):
        # XXX: session.pool is accessed from a local storage, would be
        #      interesting to see if there is a pool set in any thread
        #      specific data
        return '%s: %s (%s)' % (self._available_pools.qsize(),
                                ','.join(session.user.login
                                         for session in self._sessions.values()
                                         if session.pool),
                                threading.currentThread())

    def shutdown(self):
        """called on server stop event to properly close opened sessions and
        connections
        """
        assert not self.shutting_down, 'already shutting down'
        self.shutting_down = True
        self.system_source.shutdown()
        if isinstance(self._looping_tasks, tuple): # if tasks have been started
            for looptask in self._looping_tasks:
                self.info('canceling task %s...', looptask.name)
                looptask.cancel()
                looptask.join()
                self.info('task %s finished', looptask.name)
        for thread in self._running_threads:
            self.info('waiting thread %s...', thread.getName())
            thread.join()
            self.info('thread %s finished', thread.getName())
        if not (self.config.creating or self.config.repairing
                or self.config.quick_start):
            self.hm.call_hooks('server_shutdown', repo=self)
        self.close_sessions()
        while not self._available_pools.empty():
            pool = self._available_pools.get_nowait()
            try:
                pool.close(True)
            except:
                self.exception('error while closing %s' % pool)
                continue
        if self.pyro_registered:
            pyro_unregister(self.config)
        hits, misses = self.querier.cache_hit, self.querier.cache_miss
        try:
            self.info('rql st cache hit/miss: %s/%s (%s%% hits)', hits, misses,
                      (hits * 100) / (hits + misses))
            hits, misses = self.system_source.cache_hit, self.system_source.cache_miss
            self.info('sql cache hit/miss: %s/%s (%s%% hits)', hits, misses,
                      (hits * 100) / (hits + misses))
            nocache = self.system_source.no_cache
            self.info('sql cache usage: %s/%s (%s%%)', hits + misses, nocache,
                      ((hits + misses) * 100) / (hits + misses + nocache))
        except ZeroDivisionError:
            pass

    def _login_from_email(self, login):
        session = self.internal_session()
        try:
            rset = session.execute('Any L WHERE U login L, U primary_email M, '
                                   'M address %(login)s', {'login': login},
                                   build_descr=False)
            if rset.rowcount == 1:
                login = rset[0][0]
        finally:
            session.close()
        return login

    def authenticate_user(self, session, login, **kwargs):
        """validate login / password, raise AuthenticationError on failure,
        return the associated CWUser instance on success
        """
        if self.vreg.config['allow-email-login'] and '@' in login:
            login = self._login_from_email(login)
        for source in self.sources:
            if source.support_entity('CWUser'):
                try:
                    eid = source.authenticate(session, login, **kwargs)
                    break
                except AuthenticationError:
                    continue
        else:
            raise AuthenticationError('authentication failed with all sources')
        cwuser = self._build_user(session, eid)
        if self.config.consider_user_state and \
               not cwuser.cw_adapt_to('IWorkflowable').state in cwuser.AUTHENTICABLE_STATES:
            raise AuthenticationError('user is not in authenticable state')
        return cwuser

    def _build_user(self, session, eid):
        """return a CWUser entity for the user with the given eid"""
        cls = self.vreg['etypes'].etype_class('CWUser')
        rql = cls.fetch_rql(session.user, ['X eid %(x)s'])
        rset = session.execute(rql, {'x': eid})
        assert len(rset) == 1, rset
        cwuser = rset.get_entity(0, 0)
        # pylint: disable-msg=W0104
        # prefetch / cache cwuser's groups and properties. This is especially
        # useful for internal sessions to avoid security insertions
        cwuser.groups
        cwuser.properties
        return cwuser
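
    # Illustrative sketch (not from the original source): authenticating a
    # user against the sources, following the same pattern connect() uses
    # below (credentials hypothetical):
    #
    #   session = self.internal_session()
    #   try:
    #       user = self.authenticate_user(session, u'admin', password='xxx')
    #   finally:
    #       session.close()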
eid"""cls=self.vreg['etypes'].etype_class('CWUser')rql=cls.fetch_rql(session.user,['X eid %(x)s'])rset=session.execute(rql,{'x':eid})assertlen(rset)==1,rsetcwuser=rset.get_entity(0,0)# pylint: disable-msg=W0104# prefetch / cache cwuser's groups and properties. This is especially# useful for internal sessions to avoid security insertionscwuser.groupscwuser.propertiesreturncwuser# public (dbapi) interface ################################################defstats(self):# XXX restrict to managers session?"""Return a dictionary containing some statistics about the repository resources usage. This is a public method, not requiring a session id. """results={}querier=self.queriersource=self.system_sourceforsize,maxsize,hits,misses,titlein((len(querier._rql_cache),self.config['rql-cache-size'],querier.cache_hit,querier.cache_miss,'rqlt_st'),(len(source._cache),self.config['rql-cache-size'],source.cache_hit,source.cache_miss,'sql'),):results['%s_cache_size'%title]='%s / %s'%(size,maxsize)results['%s_cache_hit'%title]=hitsresults['%s_cache_miss'%title]=missesresults['%s_cache_hit_percent'%title]=(hits*100)/(hits+misses)results['sql_no_cache']=self.system_source.no_cacheresults['nb_open_sessions']=len(self._sessions)results['nb_active_threads']=threading.activeCount()results['looping_tasks']=', '.join(str(t)fortinself._looping_tasks)results['available_pools']=self._available_pools.qsize()results['threads']=', '.join(sorted(str(t)fortinthreading.enumerate()))returnresultsdefget_schema(self):"""Return the instance schema. This is a public method, not requiring a session id. """try:# necessary to support pickling used by pyroself.schema.__hashmode__='pickle'returnself.schemafinally:self.schema.__hashmode__=Nonedefget_cubes(self):"""Return the list of cubes used by this instance. This is a public method, not requiring a session id. """versions=self.get_versions(not(self.config.creatingorself.config.repairingorself.config.quick_startorself.config.mode=='test'))cubes=list(versions)cubes.remove('cubicweb')returncubesdefget_option_value(self,option,foreid=None):"""Return the value for `option` in the configuration. If `foreid` is specified, the actual repository to which this entity belongs is derefenced and the option value retrieved from it. This is a public method, not requiring a session id. """# XXX we may want to check we don't give sensible informationifforeidisNone:returnself.config[option]_,sourceuri,extid=self.type_and_source_from_eid(foreid)ifsourceuri=='system':returnself.config[option]pool=self._get_pool()try:returnpool.connection(sourceuri).get_option_value(option,extid)finally:self._free_pool(pool)@cacheddefget_versions(self,checkversions=False):"""Return the a dictionary containing cubes used by this instance as key with their version as value, including cubicweb version. This is a public method, not requiring a session id. """fromlogilab.common.changelogimportVersionvcconf={}session=self.internal_session()try:forpk,versioninsession.execute('Any K,V WHERE P is CWProperty, P value V, P pkey K, ''P pkey ~="system.version.%"',build_descr=False):cube=pk.split('.')[-1]# XXX cubicweb migrationifcubeinCW_MIGRATION_MAP:cube=CW_MIGRATION_MAP[cube]version=Version(version)vcconf[cube]=versionifcheckversions:ifcube!='cubicweb':fsversion=self.config.cube_version(cube)else:fsversion=self.config.cubicweb_version()ifversion<fsversion:msg=('instance has %s version %s but %s ''is installed. 
Run "cubicweb-ctl upgrade".')raiseExecutionError(msg%(cube,version,fsversion))finally:session.close()returnvcconf@cacheddefsource_defs(self):"""Return the a dictionary containing source uris as value and a dictionary describing each source as value. This is a public method, not requiring a session id. """sources=self.config.sources().copy()# remove manager informationsources.pop('admin',None)# remove sensitive informationforuri,sourcedefinsources.iteritems():sourcedef=sourcedef.copy()self.sources_by_uri[uri].remove_sensitive_information(sourcedef)sources[uri]=sourcedefreturnsourcesdefproperties(self):"""Return a result set containing system wide properties. This is a public method, not requiring a session id. """session=self.internal_session()try:# don't use session.execute, we don't want rset.req setreturnself.querier.execute(session,'Any K,V WHERE P is CWProperty,''P pkey K, P value V, NOT P for_user U',build_descr=False)finally:session.close()# XXX protect this method: anonymous should be allowed and registration# pluggeddefregister_user(self,login,password,email=None,**kwargs):"""check a user with the given login exists, if not create it with the given password. This method is designed to be used for anonymous registration on public web site. """session=self.internal_session()# for consistency, keep same error as unique check hook (although not required)errmsg=session._('the value "%s" is already used, use another one')try:if(session.execute('CWUser X WHERE X login %(login)s',{'login':login},build_descr=False)orsession.execute('CWUser X WHERE X use_email C, C address %(login)s',{'login':login},build_descr=False)):qname=role_name('login','subject')raiseValidationError(None,{qname:errmsg%login})# we have to create the useruser=self.vreg['etypes'].etype_class('CWUser')(session)ifisinstance(password,unicode):# password should *always* be utf8 encodedpassword=password.encode('UTF8')kwargs['login']=loginkwargs['upassword']=passwordself.glob_add_entity(session,EditedEntity(user,**kwargs))session.execute('SET X in_group G WHERE X eid %(x)s, G name "users"',{'x':user.eid})ifemailor'@'inlogin:d={'login':login,'email':emailorlogin}ifsession.execute('EmailAddress X WHERE X address %(email)s',d,build_descr=False):qname=role_name('address','subject')raiseValidationError(None,{qname:errmsg%d['email']})session.execute('INSERT EmailAddress X: X address %(email)s, ''U primary_email X, U use_email X ''WHERE U login %(login)s',d,build_descr=False)session.commit()finally:session.close()returnTruedefconnect(self,login,**kwargs):"""open a connection for a given user base_url may be needed to send mails cnxtype indicate if this is a pyro connection or a in-memory connection raise `AuthenticationError` if the authentication failed raise `ConnectionError` if we can't open a connection """# use an internal connectionsession=self.internal_session()# try to get a user objectcnxprops=kwargs.pop('cnxprops',None)try:user=self.authenticate_user(session,login,**kwargs)finally:session.close()session=Session(user,self,cnxprops)user._cw=user.cw_rset.req=sessionuser.cw_clear_relation_cache()self._sessions[session.id]=sessionself.info('opened session %s for user %s',session.id,login)self.hm.call_hooks('session_open',session)# commit session at this point in case write operation has been done# during `session_open` hookssession.commit()returnsession.iddefexecute(self,sessionid,rqlstring,args=None,build_descr=True,txid=None):"""execute a RQL query * rqlstring should be an unicode string or a plain ascii string * args the 

    def describe(self, sessionid, eid, txid=None):
        """return a tuple (type, source, extid) for the entity with id <eid>"""
        session = self._get_session(sessionid, setpool=True, txid=txid)
        try:
            return self.type_and_source_from_eid(eid, session)
        finally:
            session.reset_pool()

    def check_session(self, sessionid):
        """raise `BadConnectionId` if the connection is no more valid, else
        return its latest activity timestamp.
        """
        return self._get_session(sessionid, setpool=False).timestamp

    def get_shared_data(self, sessionid, key, default=None, pop=False,
                        txdata=False):
        """return the value associated to key in the session's data dictionary
        or in the session's transaction's data if `txdata` is true.

        If pop is True, the value will be removed from the dictionary.

        If key isn't defined in the dictionary, the value specified by the
        `default` argument will be returned.
        """
        session = self._get_session(sessionid, setpool=False)
        return session.get_shared_data(key, default, pop, txdata)

    def set_shared_data(self, sessionid, key, value, txdata=False):
        """set value associated to `key` in shared data

        if `txdata` is true, the value will be added to the repository
        session's transaction's data, which are cleared on commit/rollback of
        the current transaction.
        """
        session = self._get_session(sessionid, setpool=False)
        session.set_shared_data(key, value, txdata)

    def commit(self, sessionid, txid=None):
        """commit transaction for the session with the given id"""
        self.debug('begin commit for session %s', sessionid)
        try:
            session = self._get_session(sessionid)
            session.set_tx_data(txid)
            return session.commit()
        except (ValidationError, Unauthorized):
            raise
        except:
            self.exception('unexpected error')
            raise

    def rollback(self, sessionid, txid=None):
        """rollback transaction for the session with the given id"""
        self.debug('begin rollback for session %s', sessionid)
        try:
            session = self._get_session(sessionid)
            session.set_tx_data(txid)
            session.rollback()
        except:
            self.exception('unexpected error')
            raise

    def close(self, sessionid, txid=None, checkshuttingdown=True):
        """close the session with the given id"""
        session = self._get_session(sessionid, setpool=True, txid=txid,
                                    checkshuttingdown=checkshuttingdown)
        # operations uncommitted before close are rolled back before the hook
        # is called
        session.rollback(reset_pool=False)
        self.hm.call_hooks('session_close', session)
        # commit session at this point in case write operations have been done
        # during `session_close` hooks
        session.commit()
        session.close()
        del self._sessions[sessionid]
        self.info('closed session %s for user %s', sessionid,
                  session.user.login)

    def user_info(self, sessionid, props=None):
        """this method should be used by clients to:
        * check session id validity
        * update user information on each user's request (i.e. groups and
          custom properties)
        """
        session = self._get_session(sessionid, setpool=False)
        if props is not None:
            self.set_session_props(sessionid, props)
        user = session.user
        return user.eid, user.login, user.groups, user.properties
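
    # Illustrative sketch (not from the original source): shared data let a
    # client stash values server side, optionally scoped to the current
    # transaction (key name hypothetical):
    #
    #   repo.set_shared_data(sessionid, 'page-size', 20)
    #   repo.get_shared_data(sessionid, 'page-size', default=10)  # -> 20
    #   repo.get_shared_data(sessionid, 'page-size', pop=True)    # -> 20, removed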

    def set_session_props(self, sessionid, props):
        """this method should be used by clients to:
        * check session id validity
        * update user information on each user's request (i.e. groups and
          custom properties)
        """
        session = self._get_session(sessionid, setpool=False)
        for prop, value in props.items():
            session.change_property(prop, value)

    def undoable_transactions(self, sessionid, ueid=None, txid=None,
                              **actionfilters):
        """See :class:`cubicweb.dbapi.Connection.undoable_transactions`"""
        session = self._get_session(sessionid, setpool=True, txid=txid)
        try:
            return self.system_source.undoable_transactions(session, ueid,
                                                            **actionfilters)
        finally:
            session.reset_pool()

    def transaction_info(self, sessionid, txuuid, txid=None):
        """See :class:`cubicweb.dbapi.Connection.transaction_info`"""
        session = self._get_session(sessionid, setpool=True, txid=txid)
        try:
            return self.system_source.tx_info(session, txuuid)
        finally:
            session.reset_pool()

    def transaction_actions(self, sessionid, txuuid, public=True, txid=None):
        """See :class:`cubicweb.dbapi.Connection.transaction_actions`"""
        session = self._get_session(sessionid, setpool=True, txid=txid)
        try:
            return self.system_source.tx_actions(session, txuuid, public)
        finally:
            session.reset_pool()

    def undo_transaction(self, sessionid, txuuid, txid=None):
        """See :class:`cubicweb.dbapi.Connection.undo_transaction`"""
        session = self._get_session(sessionid, setpool=True, txid=txid)
        try:
            return self.system_source.undo_transaction(session, txuuid)
        finally:
            session.reset_pool()

    # public (inter-repository) interface #####################################

    def entities_modified_since(self, etypes, mtime):
        """function designed to be called from an external repository which
        is using this one as a rql source for synchronization, returning a
        3-tuple containing:
        * the local date
        * list of (etype, eid) of entities of the given types which have been
          modified since the given timestamp (actually entities whose full text
          index content has changed)
        * list of (etype, eid) of entities of the given types which have been
          deleted since the given timestamp
        """
        session = self.internal_session()
        updatetime = datetime.now()
        try:
            modentities, delentities = self.system_source.modified_entities(
                session, etypes, mtime)
            return updatetime, modentities, delentities
        finally:
            session.close()

    # session handling ########################################################

    def close_sessions(self):
        """close every opened session"""
        for sessionid in self._sessions.keys():
            try:
                self.close(sessionid, checkshuttingdown=False)
            except:
                self.exception('error while closing session %s' % sessionid)

    def clean_sessions(self):
        """close sessions not used since an amount of time specified in the
        configuration
        """
        mintime = time() - self.cleanup_session_time
        self.debug('cleaning session unused since %s',
                   strftime('%T', localtime(mintime)))
        nbclosed = 0
        for session in self._sessions.values():
            if session.timestamp < mintime:
                self.close(session.id)
                nbclosed += 1
        return nbclosed

    def internal_session(self, cnxprops=None):
        """return a dbapi-like connection/cursor using an internal user which
        has all rights on the repository. You'll *have to* commit/rollback or
        close (rollback implicitly) the session once the job's done, else
        you'll leak connections pools up to the point where no more pool is
        available, causing an irremediable freeze...
        """
        session = InternalSession(self, cnxprops)
        session.set_pool()
        return session
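
    # Illustrative sketch (not from the original source): the pattern used
    # throughout this module to honour the warning above:
    #
    #   session = self.internal_session()
    #   try:
    #       session.execute('Any X WHERE X is CWUser')
    #       session.commit()
    #   finally:
    #       session.close()  # implicitly rolls back if not committed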
"""session=InternalSession(self,cnxprops)session.set_pool()returnsessiondef_get_session(self,sessionid,setpool=False,txid=None,checkshuttingdown=True):"""return the user associated to the given session identifier"""ifcheckshuttingdownandself.shutting_down:raiseException('Repository is shutting down')try:session=self._sessions[sessionid]exceptKeyError:raiseBadConnectionId('No such session %s'%sessionid)ifsetpool:session.set_tx_data(txid)# must be done before set_poolsession.set_pool()returnsession# data sources handling #################################################### * correspondance between eid and (type, source)# * correspondance between eid and local id (i.e. specific to a given source)deftype_and_source_from_eid(self,eid,session=None):"""return a tuple (type, source, extid) for the entity with id <eid>"""try:eid=typed_eid(eid)exceptValueError:raiseUnknownEid(eid)try:returnself._type_source_cache[eid]exceptKeyError:ifsessionisNone:session=self.internal_session()reset_pool=Trueelse:reset_pool=Falsetry:etype,uri,extid=self.system_source.eid_type_source(session,eid)finally:ifreset_pool:session.reset_pool()self._type_source_cache[eid]=(etype,uri,extid)ifuri!='system':self._extid_cache[(extid,uri)]=eidreturnetype,uri,extiddefclear_caches(self,eids):etcache=self._type_source_cacheextidcache=self._extid_cacherqlcache=self.querier._rql_cacheforeidineids:try:etype,uri,extid=etcache.pop(typed_eid(eid))# may be a string in some casesrqlcache.pop('%s X WHERE X eid %s'%(etype,eid),None)extidcache.pop((extid,uri),None)exceptKeyError:etype=Nonerqlcache.pop('Any X WHERE X eid %s'%eid,None)forsourceinself.sources:source.clear_eid_cache(eid,etype)deftype_from_eid(self,eid,session=None):"""return the type of the entity with id <eid>"""returnself.type_and_source_from_eid(eid,session)[0]defsource_from_eid(self,eid,session=None):"""return the source for the given entity's eid"""returnself.sources_by_uri[self.type_and_source_from_eid(eid,session)[1]]defquerier_cache_key(self,session,rql,args,eidkeys):cachekey=[rql]forkeyinsorted(eidkeys):try:etype=self.type_from_eid(args[key],session)exceptKeyError:raiseQueryError('bad cache key %s (no value)'%key)exceptTypeError:raiseQueryError('bad cache key %s (value: %r)'%(key,args[key]))cachekey.append(etype)# ensure eid is correctly typed in argsargs[key]=typed_eid(args[key])returntuple(cachekey)defeid2extid(self,source,eid,session=None):"""get local id from an eid"""etype,uri,extid=self.type_and_source_from_eid(eid,session)ifsource.uri!=uri:# eid not from the given sourceraiseUnknownEid(eid)returnextiddefextid2eid(self,source,extid,etype,session=None,insert=True,recreate=False):"""get eid from a local id. 

    def extid2eid(self, source, extid, etype, session=None, insert=True,
                  recreate=False):
        """get the eid from a local id. An eid is allocated if no record is
        found"""
        cachekey = (extid, source.uri)
        try:
            return self._extid_cache[cachekey]
        except KeyError:
            pass
        reset_pool = False
        if session is None:
            session = self.internal_session()
            reset_pool = True
        eid = self.system_source.extid2eid(session, source, extid)
        if eid is not None:
            self._extid_cache[cachekey] = eid
            self._type_source_cache[eid] = (etype, source.uri, extid)
            # XXX used with extlite (eg vcsfile), probably not needed anymore
            if recreate:
                entity = source.before_entity_insertion(session, extid, etype, eid)
                entity._cw_recreating = True
                if source.should_call_hooks:
                    self.hm.call_hooks('before_add_entity', session, entity=entity)
                # XXX add fti op ?
                source.after_entity_insertion(session, extid, entity)
                if source.should_call_hooks:
                    self.hm.call_hooks('after_add_entity', session, entity=entity)
            if reset_pool:
                session.reset_pool()
            return eid
        if not insert:
            return
        # no link between extid and eid, create one using an internal session
        # since the current session user may not have the permissions required
        # to do the necessary stuff and we don't want to commit the user
        # session.
        #
        # Moreover, even if the session is already an internal session but is
        # processing a commit, we have to use another one
        if not session.is_internal_session:
            session = self.internal_session()
            reset_pool = True
        try:
            eid = self.system_source.create_eid(session)
            self._extid_cache[cachekey] = eid
            self._type_source_cache[eid] = (etype, source.uri, extid)
            entity = source.before_entity_insertion(session, extid, etype, eid)
            if source.should_call_hooks:
                self.hm.call_hooks('before_add_entity', session, entity=entity)
            # XXX call add_info with complete=False ?
            self.add_info(session, entity, source, extid)
            source.after_entity_insertion(session, extid, entity)
            if source.should_call_hooks:
                self.hm.call_hooks('after_add_entity', session, entity=entity)
            else:
                # minimal meta-data
                session.execute('SET X is E WHERE X eid %(x)s, E name %(name)s',
                                {'x': entity.eid, 'name': entity.__regid__})
            session.commit(reset_pool)
            return eid
        except:
            session.rollback(reset_pool)
            raise

    def add_info(self, session, entity, source, extid=None, complete=True):
        """add type and source info for an eid into the system table,
        and index the entity with the full text index
        """
        # begin by inserting eid/type/source/extid into the entities table
        hook.set_operation(session, 'neweids', entity.eid,
                           hook.CleanupNewEidsCacheOp)
        self.system_source.add_info(session, entity, source, extid, complete)

    def delete_info(self, session, entity, sourceuri, extid):
        """called by an external source when some entity known by the system
        source has been deleted in the external source
        """
        # mark eid as being deleted in session info and setup cache update
        # operation
        hook.set_operation(session, 'pendingeids', entity.eid,
                           hook.CleanupDeletedEidsCacheOp)
        self._delete_info(session, entity, sourceuri, extid)

    def _delete_info(self, session, entity, sourceuri, extid):
        # attributes=None, relations=None):
        """delete system information on deletion of an entity:
        * delete all remaining relations from/to this entity
        * call delete info on the system source which will transfer the record
          from the entities table to the deleted_entities table
        """
        pendingrtypes = session.transaction_data.get('pendingrtypes', ())
        # delete remaining relations: if the user can delete the entity, he can
        # delete all its relations without security checking
        with security_enabled(session, read=False, write=False):
            eid = entity.eid
            for rschema, _, role in entity.e_schema.relation_definitions():
                rtype = rschema.type
                if rtype in schema.VIRTUAL_RTYPES or rtype in pendingrtypes:
                    continue
                if role == 'subject':
                    # don't skip inlined relations so they are regularly
                    # deleted and so hooks are correctly called
                    rql = 'DELETE X %s Y WHERE X eid %%(x)s' % rtype
                else:
                    rql = 'DELETE Y %s X WHERE X eid %%(x)s' % rtype
                session.execute(rql, {'x': eid}, build_descr=False)
        self.system_source.delete_info(session, entity, sourceuri, extid)
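
    # Illustrative sketch (not from the original source): an external source
    # maps its own identifiers to repository eids through extid2eid; on first
    # sight of an extid a fresh eid is allocated and recorded, then reused
    # (source and names hypothetical):
    #
    #   eid = repo.extid2eid(extsource, 'item-42', 'ExternalItem', session)
    #   repo.extid2eid(extsource, 'item-42', 'ExternalItem', session) == eid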

    def locate_relation_source(self, session, subject, rtype, object):
        subjsource = self.source_from_eid(subject, session)
        objsource = self.source_from_eid(object, session)
        if subjsource is not objsource:
            source = self.system_source
            if not (subjsource.may_cross_relation(rtype)
                    and objsource.may_cross_relation(rtype)):
                raise MultiSourcesError(
                    "relation %s can't be crossed among sources" % rtype)
        elif not subjsource.support_relation(rtype):
            source = self.system_source
        else:
            source = subjsource
        if not source.support_relation(rtype, True):
            raise MultiSourcesError(
                "source %s doesn't support write of %s relation" % (
                source.uri, rtype))
        return source

    def locate_etype_source(self, etype):
        for source in self.sources:
            if source.support_entity(etype, 1):
                return source
        else:
            raise ETypeNotSupportedBySources(etype)

    def init_entity_caches(self, session, entity, source):
        """add entity to the session entities cache and repo's extid cache.
        Return the entity's ext id if the source isn't the system source.
        """
        session.set_entity_cache(entity)
        suri = source.uri
        if suri == 'system':
            extid = None
        else:
            extid = source.get_extid(entity)
            self._extid_cache[(str(extid), suri)] = entity.eid
        self._type_source_cache[entity.eid] = (entity.__regid__, suri, extid)
        return extid

    def glob_add_entity(self, session, edited):
        """add an entity to the repository

        the entity eid should originally be None and a unique eid is assigned
        to the entity instance
        """
        entity = edited.entity
        entity._cw_is_saved = False # entity has an eid but is not yet saved
        # init edited_attributes before calling before_add_entity hooks
        entity.cw_edited = edited
        eschema = entity.e_schema
        source = self.locate_etype_source(entity.__regid__)
        # allocate an eid to the entity before calling hooks
        entity.eid = self.system_source.create_eid(session)
        # set caches asap
        extid = self.init_entity_caches(session, entity, source)
        if server.DEBUG & server.DBG_REPO:
            print 'ADD entity', self, entity.__regid__, entity.eid, \
                  entity.cw_attr_cache
        relations = []
        if source.should_call_hooks:
            self.hm.call_hooks('before_add_entity', session, entity=entity)
        for attr in edited.iterkeys():
            rschema = eschema.subjrels[attr]
            if not rschema.final: # inlined relation
                relations.append((attr, edited[attr]))
        edited.set_defaults()
        if session.is_hook_category_activated('integrity'):
            edited.check(creation=True)
        try:
            source.add_entity(session, entity)
        except UniqueTogetherError, exc:
            etype, rtypes = exc.args
            problems = {}
            for col in rtypes:
                problems[col] = session._('violates unique_together constraints (%s)') % (
                    ','.join(rtypes))
            raise ValidationError(entity.eid, problems)
        self.add_info(session, entity, source, extid, complete=False)
        edited.saved = True
        # prefill entity relation caches
        for rschema in eschema.subject_relations():
            rtype = str(rschema)
            if rtype in schema.VIRTUAL_RTYPES:
                continue
            if rschema.final:
                entity.cw_attr_cache.setdefault(rtype, None)
            else:
                entity.cw_set_relation_cache(rtype, 'subject',
                                             session.empty_rset())
        for rschema in eschema.object_relations():
            rtype = str(rschema)
            if rtype in schema.VIRTUAL_RTYPES:
                continue
            entity.cw_set_relation_cache(rtype, 'object', session.empty_rset())
        # set inlined relation cache before call to after_add_entity
        for attr, value in relations:
            session.update_rel_cache_add(entity.eid, attr, value)
            del_existing_rel_if_needed(session, entity.eid, attr, value)
        # trigger after_add_entity after after_add_relation
        if source.should_call_hooks:
            self.hm.call_hooks('after_add_entity', session, entity=entity)
            # call hooks for inlined relations
            for attr, value in relations:
                self.hm.call_hooks('before_add_relation', session,
                                   eidfrom=entity.eid, rtype=attr, eidto=value)
                self.hm.call_hooks('after_add_relation', session,
                                   eidfrom=entity.eid, rtype=attr, eidto=value)
        return entity.eid
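
    # Illustrative sketch (not from the original source): creating an entity
    # through glob_add_entity, mirroring what register_user() above does
    # (attribute values hypothetical):
    #
    #   user = self.vreg['etypes'].etype_class('CWUser')(session)
    #   eid = self.glob_add_entity(session, EditedEntity(user, login=u'jdoe',
    #                                                    upassword='secret'))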

    def glob_update_entity(self, session, edited):
        """replace an entity in the repository

        the type and the eid of an entity must not be changed
        """
        entity = edited.entity
        if server.DEBUG & server.DBG_REPO:
            print 'UPDATE entity', entity.__regid__, entity.eid, \
                  entity.cw_attr_cache, edited
        hm = self.hm
        eschema = entity.e_schema
        session.set_entity_cache(entity)
        orig_edited = getattr(entity, 'cw_edited', None)
        entity.cw_edited = edited
        try:
            only_inline_rels, need_fti_update = True, False
            relations = []
            source = self.source_from_eid(entity.eid, session)
            for attr in list(edited):
                if attr == 'eid':
                    continue
                rschema = eschema.subjrels[attr]
                if rschema.final:
                    if getattr(eschema.rdef(attr), 'fulltextindexed', False):
                        need_fti_update = True
                    only_inline_rels = False
                else:
                    # inlined relation
                    previous_value = entity.related(attr) or None
                    if previous_value is not None:
                        previous_value = previous_value[0][0] # got a result set
                        if previous_value == entity.cw_attr_cache[attr]:
                            previous_value = None
                        elif source.should_call_hooks:
                            hm.call_hooks('before_delete_relation', session,
                                          eidfrom=entity.eid, rtype=attr,
                                          eidto=previous_value)
                    relations.append((attr, edited[attr], previous_value))
            if source.should_call_hooks:
                # call hooks for inlined relations
                for attr, value, _t in relations:
                    hm.call_hooks('before_add_relation', session,
                                  eidfrom=entity.eid, rtype=attr, eidto=value)
                if not only_inline_rels:
                    hm.call_hooks('before_update_entity', session,
                                  entity=entity)
            if session.is_hook_category_activated('integrity'):
                edited.check()
            try:
                source.update_entity(session, entity)
                edited.saved = True
            except UniqueTogetherError, exc:
                etype, rtypes = exc.args
                problems = {}
                for col in rtypes:
                    problems[col] = session._('violates unique_together constraints (%s)') % (
                        ','.join(rtypes))
                raise ValidationError(entity.eid, problems)
            self.system_source.update_info(session, entity, need_fti_update)
            if source.should_call_hooks:
                if not only_inline_rels:
                    hm.call_hooks('after_update_entity', session,
                                  entity=entity)
                for attr, value, prevvalue in relations:
                    # if the relation is already cached, update the existing
                    # cache
                    relcache = entity.cw_relation_cached(attr, 'subject')
                    if prevvalue is not None:
                        hm.call_hooks('after_delete_relation', session,
                                      eidfrom=entity.eid, rtype=attr,
                                      eidto=prevvalue)
                        if relcache is not None:
                            session.update_rel_cache_del(entity.eid, attr,
                                                         prevvalue)
                    del_existing_rel_if_needed(session, entity.eid, attr,
                                               value)
                    if relcache is not None:
                        session.update_rel_cache_add(entity.eid, attr, value)
                    else:
                        entity.cw_set_relation_cache(attr, 'subject',
                                                     session.eid_rset(value))
                    hm.call_hooks('after_add_relation', session,
                                  eidfrom=entity.eid, rtype=attr, eidto=value)
        finally:
            if orig_edited is not None:
                entity.cw_edited = orig_edited

    def glob_delete_entity(self, session, eid):
        """delete an entity and all related entities from the repository"""
        entity = session.entity_from_eid(eid)
        etype, sourceuri, extid = self.type_and_source_from_eid(eid, session)
        if server.DEBUG & server.DBG_REPO:
            print 'DELETE entity', etype, eid
        source = self.sources_by_uri[sourceuri]
        if source.should_call_hooks:
            self.hm.call_hooks('before_delete_entity', session, entity=entity)
        self._delete_info(session, entity, sourceuri, extid)
        source.delete_entity(session, entity)
        if source.should_call_hooks:
            self.hm.call_hooks('after_delete_entity', session, entity=entity)
        # don't clear the cache here, this is done in a hook on commit

    def glob_add_relation(self, session, subject, rtype, object):
        """add a relation to the repository"""
        if server.DEBUG & server.DBG_REPO:
            print 'ADD relation', subject, rtype, object
        source = self.locate_relation_source(session, subject, rtype, object)
        if source.should_call_hooks:
            del_existing_rel_if_needed(session, subject, rtype, object)
            self.hm.call_hooks('before_add_relation', session,
                               eidfrom=subject, rtype=rtype, eidto=object)
        source.add_relation(session, subject, rtype, object)
        rschema = self.schema.rschema(rtype)
        session.update_rel_cache_add(subject, rtype, object, rschema.symmetric)
        if source.should_call_hooks:
            self.hm.call_hooks('after_add_relation', session,
                               eidfrom=subject, rtype=rtype, eidto=object)
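
    # Illustrative sketch (not from the original source): for a hook-aware
    # source, glob_add_relation runs in this order (eids and relation type
    # hypothetical):
    #
    #   repo.glob_add_relation(session, 1234, 'knows', 5678)
    #   # 1. del_existing_rel_if_needed  (enforce '1'/'?' cardinality)
    #   # 2. 'before_add_relation' hooks
    #   # 3. source.add_relation + relation cache update
    #   # 4. 'after_add_relation' hooks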

    def glob_delete_relation(self, session, subject, rtype, object):
        """delete a relation from the repository"""
        if server.DEBUG & server.DBG_REPO:
            print 'DELETE relation', subject, rtype, object
        source = self.locate_relation_source(session, subject, rtype, object)
        if source.should_call_hooks:
            self.hm.call_hooks('before_delete_relation', session,
                               eidfrom=subject, rtype=rtype, eidto=object)
        source.delete_relation(session, subject, rtype, object)
        rschema = self.schema.rschema(rtype)
        session.update_rel_cache_del(subject, rtype, object, rschema.symmetric)
        if rschema.symmetric:
            # on a symmetric relation, we can't know in which direction it's
            # stored so try to delete both
            source.delete_relation(session, object, rtype, subject)
        if source.should_call_hooks:
            self.hm.call_hooks('after_delete_relation', session,
                               eidfrom=subject, rtype=rtype, eidto=object)

    # pyro handling ###########################################################

    def pyro_register(self, host=''):
        """register the repository as a pyro object"""
        from logilab.common import pyro_ext as pyro
        config = self.config
        appid = '%s.%s' % pyro.ns_group_and_id(
            config['pyro-instance-id'] or config.appid,
            config['pyro-ns-group'])
        # ensure config['pyro-instance-id'] is a fully qualified pyro name
        config['pyro-instance-id'] = appid
        daemon = pyro.register_object(self, appid,
                                      daemonhost=config['pyro-host'],
                                      nshost=config['pyro-ns-host'])
        self.info('repository registered as a pyro object %s', appid)
        self.pyro_registered = True
        return daemon

    # multi-sources planner helpers ###########################################

    @cached
    def rel_type_sources(self, rtype):
        return tuple([source for source in self.sources
                      if source.support_relation(rtype)
                      or rtype in source.dont_cross_relations])

    @cached
    def can_cross_relation(self, rtype):
        return tuple([source for source in self.sources
                      if source.support_relation(rtype)
                      and rtype in source.cross_relations])

    @cached
    def is_multi_sources_relation(self, rtype):
        return any(source for source in self.sources
                   if source is not self.system_source
                   and source.support_relation(rtype))


def pyro_unregister(config):
    """unregister the repository from the pyro name server"""
    from logilab.common.pyro_ext import ns_unregister
    appid = config['pyro-instance-id'] or config.appid
    ns_unregister(appid, config['pyro-ns-group'], config['pyro-ns-host'])


from logging import getLogger
from cubicweb import set_log_methods
set_log_methods(Repository, getLogger('cubicweb.repository'))
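
# Illustrative sketch (not part of the original module): bringing a repository
# up from a server configuration and opening a client session; the instance
# id and credentials below are hypothetical.
#
#   from cubicweb.server.serverconfig import ServerConfiguration
#   config = ServerConfiguration('myinstance')
#   repo = Repository(config)
#   sessionid = repo.connect(u'admin', password='admin')
#   ...
#   repo.close(sessionid)
#   repo.shutdown()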