B [schema update] may remove has_text unexpectedly; pass another argument to fix it
"""Defines the central class for the CubicWeb RQL server: the repository.The repository is an abstraction allowing execution of rql queries againstdata sources. Most of the work is actually done in helper classes. Therepository mainly:* brings these classes all together to provide a single access point to a cubicweb instance.* handles session management* provides method for pyro registration, to call if pyro is enabled:organization: Logilab:copyright: 2001-2009 LOGILAB S.A. (Paris, FRANCE), license is LGPL v2.:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr:license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses"""__docformat__="restructuredtext en"importsysimportQueuefromos.pathimportjoin,existsfromdatetimeimportdatetimefromtimeimporttime,localtime,strftimefromlogilab.common.decoratorsimportcachedfromyamsimportBadSchemaDefinitionfromrqlimportRQLSyntaxErrorfromcubicwebimport(CW_SOFTWARE_ROOT,UnknownEid,AuthenticationError,ETypeNotSupportedBySources,RTypeNotSupportedBySources,BadConnectionId,Unauthorized,ValidationError,ExecutionError,typed_eid,CW_MIGRATION_MAP)fromcubicweb.cwvregimportCubicWebRegistryfromcubicweb.schemaimportVIRTUAL_RTYPES,CubicWebSchemafromcubicwebimportserverfromcubicweb.server.utilsimportRepoThread,LoopTaskfromcubicweb.server.poolimportConnectionsPool,LateOperation,SingleLastOperationfromcubicweb.server.sessionimportSession,InternalSessionfromcubicweb.server.querierimportQuerierHelperfromcubicweb.server.sourcesimportget_sourcefromcubicweb.server.hooksmanagerimportHooksManagerfromcubicweb.server.hookhelperimportrpropertyclassCleanupEidTypeCacheOp(SingleLastOperation):"""on rollback of a insert query or commit of delete query, we have to clear repository's cache from no more valid entries NOTE: querier's rqlst/solutions cache may have been polluted too with queries such as Any X WHERE X eid 32 if 32 has been rollbacked however generated queries are unpredictable and analysing all the cache probably too 
expensive. Notice that there is no pb when using args to specify eids instead of giving them into the rql string. """defcommit_event(self):"""the observed connections pool has been rollbacked, remove inserted eid from repository type/source cache """try:self.repo.clear_caches(self.session.transaction_data['pendingeids'])exceptKeyError:passdefrollback_event(self):"""the observed connections pool has been rollbacked, remove inserted eid from repository type/source cache """try:self.repo.clear_caches(self.session.transaction_data['neweids'])exceptKeyError:passclassFTIndexEntityOp(LateOperation):"""operation to delay entity full text indexation to commit since fti indexing may trigger discovery of other entities, it should be triggered on precommit, not commit, and this should be done after other precommit operation which may add relations to the entity """defprecommit_event(self):session=self.sessionentity=self.entityifentity.eidinsession.transaction_data.get('pendingeids',()):return# entity added and deleted in the same transactionsession.repo.system_source.fti_unindex_entity(session,entity.eid)forcontainerinentity.fti_containers():session.repo.index_entity(session,container)defcommit_event(self):passdefdel_existing_rel_if_needed(session,eidfrom,rtype,eidto):"""delete existing relation when adding a new one if card is 1 or ? have to be done once the new relation has been inserted to avoid having an entity without a relation for some time this kind of behaviour has to be done in the repository so we don't have hooks order hazardness """# skip delete queries (only?) if session is an internal session. This is# hooks responsability to ensure they do not violate relation's cardinalityifsession.is_super_session:returncard=rproperty(session,rtype,eidfrom,eidto,'cardinality')# one may be tented to check for neweids but this may cause more than one# relation even with '1?' cardinality if thoses relations are added in the# same transaction where the entity is being created. 
This never occurs from# the web interface but may occurs during test or dbapi connection (though# not expected for this). So: don't do it, we pretend to ensure repository# consistency.ifcard[0]in'1?':rschema=session.repo.schema.rschema(rtype)ifnotrschema.inlined:session.unsafe_execute('DELETE X %s Y WHERE X eid %%(x)s, NOT Y eid %%(y)s'%rtype,{'x':eidfrom,'y':eidto},'x')ifcard[1]in'1?':session.unsafe_execute('DELETE X %s Y WHERE NOT X eid %%(x)s, Y eid %%(y)s'%rtype,{'x':eidfrom,'y':eidto},'y')classRepository(object):"""a repository provides access to a set of persistent storages for entities and relations XXX protect pyro access """def__init__(self,config,vreg=None,debug=False):self.config=configifvregisNone:vreg=CubicWebRegistry(config,debug)self.vreg=vregself.pyro_registered=Falseself.info('starting repository from %s',self.config.apphome)# dictionary of opened sessionsself._sessions={}# list of functions to be called at regular intervalself._looping_tasks=[]# list of running threadsself._running_threads=[]# initial schema, should be build or replaced latterself.schema=CubicWebSchema(config.appid)# querier helper, need to be created after sources initializationself.querier=QuerierHelper(self,self.schema)# should we reindex in changes?self.do_fti=notconfig['delay-full-text-indexation']# sourcesself.sources=[]self.sources_by_uri={}# FIXME: store additional sources info in the system database ?# FIXME: sources should be ordered (add_entity priority)foruri,source_configinconfig.sources().items():ifuri=='admin':# not an actual sourcecontinuesource=self.get_source(uri,source_config)self.sources_by_uri[uri]=sourceself.sources.append(source)self.system_source=self.sources_by_uri['system']# ensure system source is the first oneself.sources.remove(self.system_source)self.sources.insert(0,self.system_source)# cache eid -> type / sourceself._type_source_cache={}# cache (extid, source uri) -> eidself._extid_cache={}# create the hooks managerself.hm=HooksManager(self.schema)# 
open some connections poolsself._available_pools=Queue.Queue()self._available_pools.put_nowait(ConnectionsPool(self.sources))ifconfig.read_instance_schema:# normal start: load the instance schema from the databaseself.fill_schema()elifconfig.bootstrap_schema:# usually during repository creationself.warning("set fs instance'schema as bootstrap schema")config.bootstrap_cubes()self.set_bootstrap_schema(self.config.load_schema())# need to load the Any and CWUser entity typesself.vreg.schema=self.schemaetdirectory=join(CW_SOFTWARE_ROOT,'entities')self.vreg.init_registration([etdirectory])self.vreg.load_file(join(etdirectory,'__init__.py'),'cubicweb.entities.__init__')self.vreg.load_file(join(etdirectory,'authobjs.py'),'cubicweb.entities.authobjs')else:# test start: use the file system schema (quicker)self.warning("set fs instance'schema")config.bootstrap_cubes()self.set_schema(self.config.load_schema())ifnotconfig.creating:if'CWProperty'inself.schema:self.vreg.init_properties(self.properties())# call source's init method to complete their initialisation if# needed (for instance looking for persistent configuration using an# internal session, which is not possible until pools have been# initialized)forsourceinself.sources:source.init()else:# call init_creating so for instance native source can configurate# tsearch according to postgres versionforsourceinself.sources:source.init_creating()# close initialization pool and reopen fresh ones for proper# initialization now that we know cubesself._get_pool().close(True)# list of available pools (we can't iterated on Queue instance)self.pools=[]foriinxrange(config['connections-pool-size']):self.pools.append(ConnectionsPool(self.sources))self._available_pools.put_nowait(self.pools[-1])self._shutting_down=Falseifnot(config.creatingorconfig.repairing):# call instance level initialisation hooksself.hm.call_hooks('server_startup',repo=self)# register a task to cleanup expired 
sessionself.looping_task(self.config['session-time']/3.,self.clean_sessions)# internals ###############################################################defget_source(self,uri,source_config):source_config['uri']=urireturnget_source(source_config,self.schema,self)defset_schema(self,schema,resetvreg=True):schema.rebuild_infered_relations()self.info('set schema %s%#x',schema.name,id(schema))self.debug(', '.join(sorted(str(e)foreinschema.entities())))self.querier.set_schema(schema)forsourceinself.sources:source.set_schema(schema)self.schema=schemaifresetvreg:# full reload of all appobjectsself.vreg.reset()self.vreg.set_schema(schema)self.hm.set_schema(schema)self.hm.register_system_hooks(self.config)# instance specific hooksifself.config.instance_hooks:self.info('loading instance hooks')self.hm.register_hooks(self.config.load_hooks(self.vreg))deffill_schema(self):"""lod schema from the repository"""fromcubicweb.server.schemaserialimportdeserialize_schemaself.info('loading schema from the repository')appschema=CubicWebSchema(self.config.appid)self.set_bootstrap_schema(self.config.load_bootstrap_schema())self.debug('deserializing db schema into %s%#x',appschema.name,id(appschema))session=self.internal_session()try:try:deserialize_schema(appschema,session)exceptBadSchemaDefinition:raiseexceptException,ex:importtracebacktraceback.print_exc()raiseException('Is the database initialised ? 
(cause: %s)'%(ex.argsandex.args[0].strip()or'unknown')), \None,sys.exc_info()[-1]self.info('set the actual schema')# XXX have to do this since CWProperty isn't in the bootstrap schema# it'll be redone in set_schemaself.set_bootstrap_schema(appschema)# 2.49 migrationifexists(join(self.config.apphome,'vc.conf')):session.set_pool()ifnot'template'infile(join(self.config.apphome,'vc.conf')).read():# remaning from cubicweb < 2.38...session.execute('DELETE CWProperty X WHERE X pkey "system.version.template"')session.commit()finally:session.close()self.config.init_cubes(self.get_cubes())self.set_schema(appschema)defset_bootstrap_schema(self,schema):"""disable hooks when setting a bootstrap schema, but restore the configuration for the next time """config=self.config# XXX refactorconfig.core_hooks=Falseconfig.usergroup_hooks=Falseconfig.schema_hooks=Falseconfig.notification_hooks=Falseconfig.instance_hooks=Falseself.set_schema(schema,resetvreg=False)config.core_hooks=Trueconfig.usergroup_hooks=Trueconfig.schema_hooks=Trueconfig.notification_hooks=Trueconfig.instance_hooks=Truedefstart_looping_tasks(self):assertisinstance(self._looping_tasks,list),'already started'fori,(interval,func)inenumerate(self._looping_tasks):self._looping_tasks[i]=task=LoopTask(interval,func)self.info('starting task %s with interval %.2fs',task.name,interval)task.start()# ensure no tasks will be further addedself._looping_tasks=tuple(self._looping_tasks)deflooping_task(self,interval,func):"""register a function to be called every `interval` seconds. looping tasks can only be registered during repository initialization, once done this method will fail. 
"""try:self._looping_tasks.append((interval,func))exceptAttributeError:raiseRuntimeError("can't add looping task once the repository is started")defthreaded_task(self,func):"""start function in a separated thread"""t=RepoThread(func,self._running_threads)t.start()#@lockeddef_get_pool(self):try:returnself._available_pools.get(True,timeout=5)exceptQueue.Empty:raiseException('no pool available after 5 secs, probably either a ''bug in code (to many uncommited/rollbacked ''connections) or to much load on the server (in ''which case you can try to set a bigger ''connections pools size)')def_free_pool(self,pool):self._available_pools.put_nowait(pool)defpinfo(self):# XXX: session.pool is accessed from a local storage, would be interesting# to see if there is a pool set in any thread specific data)importthreadingreturn'%s: %s (%s)'%(self._available_pools.qsize(),','.join(session.user.loginforsessioninself._sessions.values()ifsession.pool),threading.currentThread())defshutdown(self):"""called on server stop event to properly close opened sessions and connections """self._shutting_down=Trueifisinstance(self._looping_tasks,tuple):# if tasks have been startedforlooptaskinself._looping_tasks:self.info('canceling task %s...',looptask.name)looptask.cancel()looptask.join()self.info('task %s finished',looptask.name)forthreadinself._running_threads:self.info('waiting thread %s...',thread.name)thread.join()self.info('thread %s finished',thread.name)self.hm.call_hooks('server_shutdown',repo=self)self.close_sessions()whilenotself._available_pools.empty():pool=self._available_pools.get_nowait()try:pool.close(True)except:self.exception('error while closing %s'%pool)continueifself.pyro_registered:pyro_unregister(self.config)hits,misses=self.querier.cache_hit,self.querier.cache_misstry:self.info('rqlt st cache hit/miss: %s/%s (%s%% hits)',hits,misses,(hits*100)/(hits+misses))hits,misses=self.system_source.cache_hit,self.system_source.cache_missself.info('sql cache hit/miss: %s/%s (%s%% 
hits)',hits,misses,(hits*100)/(hits+misses))nocache=self.system_source.no_cacheself.info('sql cache usage: %s/%s (%s%%)',hits+misses,nocache,((hits+misses)*100)/(hits+misses+nocache))exceptZeroDivisionError:passdef_login_from_email(self,login):session=self.internal_session()try:rset=session.execute('Any L WHERE U login L, U primary_email M, ''M address %(login)s',{'login':login})ifrset.rowcount==1:login=rset[0][0]finally:session.close()returnlogindefauthenticate_user(self,session,login,password):"""validate login / password, raise AuthenticationError on failure return associated CWUser instance on success """ifself.vreg.config['allow-email-login']and'@'inlogin:login=self._login_from_email(login)forsourceinself.sources:ifsource.support_entity('CWUser'):try:eid=source.authenticate(session,login,password)breakexceptAuthenticationError:continueelse:raiseAuthenticationError('authentication failed with all sources')cwuser=self._build_user(session,eid)ifself.config.consider_user_stateand \notcwuser.stateincwuser.AUTHENTICABLE_STATES:raiseAuthenticationError('user is not in authenticable state')returncwuserdef_build_user(self,session,eid):"""return a CWUser entity for user with the given eid"""cls=self.vreg.etype_class('CWUser')rql=cls.fetch_rql(session.user,['X eid %(x)s'])rset=session.execute(rql,{'x':eid},'x')assertlen(rset)==1,rsetcwuser=rset.get_entity(0,0)# pylint: disable-msg=W0104# prefetch / cache cwuser's groups and properties. This is especially# useful for internal sessions to avoid security insertionscwuser.groupscwuser.propertiesreturncwuser# public (dbapi) interface ################################################defget_schema(self):"""return the instance schema. This is a public method, not requiring a session id """try:# necessary to support pickling used by pyroself.schema.__hashmode__='pickle'returnself.schemafinally:self.schema.__hashmode__=Nonedefget_cubes(self):"""return the list of cubes used by this instance. 
This is a public method, not requiring a session id. """versions=self.get_versions(not(self.config.creatingorself.config.repairing))cubes=list(versions)cubes.remove('cubicweb')returncubes@cacheddefget_versions(self,checkversions=False):"""return the a dictionary containing cubes used by this instance as key with their version as value, including cubicweb version. This is a public method, not requiring a session id. """fromlogilab.common.changelogimportVersionvcconf={}session=self.internal_session()try:forpk,versioninsession.execute('Any K,V WHERE P is CWProperty, P value V, P pkey K, ''P pkey ~="system.version.%"',build_descr=False):cube=pk.split('.')[-1]# XXX cubicweb migrationifcubeinCW_MIGRATION_MAP:cube=CW_MIGRATION_MAP[cube]version=Version(version)vcconf[cube]=versionifcheckversions:ifcube!='cubicweb':fsversion=self.config.cube_version(cube)else:fsversion=self.config.cubicweb_version()ifversion<fsversion:msg=('instance has %s version %s but %s ''is installed. Run "cubicweb-ctl upgrade".')raiseExecutionError(msg%(cube,version,fsversion))finally:session.close()returnvcconf@cacheddefsource_defs(self):sources=self.config.sources().copy()# remove manager informationsources.pop('admin',None)# remove sensitive informationforuri,sourcedefinsources.iteritems():sourcedef=sourcedef.copy()self.sources_by_uri[uri].remove_sensitive_information(sourcedef)sources[uri]=sourcedefreturnsourcesdefproperties(self):"""return a result set containing system wide properties"""session=self.internal_session()try:returnsession.execute('Any K,V WHERE P is CWProperty,''P pkey K, P value V, NOT P for_user U',build_descr=False)finally:session.close()defregister_user(self,login,password,email=None,**kwargs):"""check a user with the given login exists, if not create it with the given password. This method is designed to be used for anonymous registration on public web site. 
"""# XXX should not be called from web interfacesession=self.internal_session()# for consistency, keep same error as unique check hook (although not required)errmsg=session._('the value "%s" is already used, use another one')try:if(session.execute('CWUser X WHERE X login %(login)s',{'login':login})orsession.execute('CWUser X WHERE X use_email C, C address %(login)s',{'login':login})):raiseValidationError(None,{'login':errmsg%login})# we have to create the useruser=self.vreg.etype_class('CWUser')(session,None)ifisinstance(password,unicode):# password should *always* be utf8 encodedpassword=password.encode('UTF8')kwargs['login']=loginkwargs['upassword']=passworduser.update(kwargs)self.glob_add_entity(session,user)session.execute('SET X in_group G WHERE X eid %(x)s, G name "users"',{'x':user.eid})ifemailor'@'inlogin:d={'login':login,'email':emailorlogin}ifsession.execute('EmailAddress X WHERE X address %(email)s',d):raiseValidationError(None,{'address':errmsg%d['email']})session.execute('INSERT EmailAddress X: X address %(email)s, ''U primary_email X, U use_email X WHERE U login %(login)s',d)session.commit()finally:session.close()returnTruedefconnect(self,login,password,cnxprops=None):"""open a connection for a given user base_url may be needed to send mails cnxtype indicate if this is a pyro connection or a in-memory connection raise `AuthenticationError` if the authentication failed raise `ConnectionError` if we can't open a connection """# use an internal connectionsession=self.internal_session()# try to get a user objecttry:user=self.authenticate_user(session,login,password)finally:session.close()session=Session(user,self,cnxprops)user.req=user.rset.req=sessionuser.clear_related_cache()self._sessions[session.id]=sessionself.info('opened %s',session)self.hm.call_hooks('session_open',session=session)# commit session at this point in case write operation has been done# during `session_open` 
hookssession.commit()returnsession.iddefexecute(self,sessionid,rqlstring,args=None,eid_key=None,build_descr=True):"""execute a RQL query * rqlstring should be an unicode string or a plain ascii string * args the optional parameters used in the query * build_descr is a flag indicating if the description should be built on select queries """session=self._get_session(sessionid,setpool=True)try:try:returnself.querier.execute(session,rqlstring,args,eid_key,build_descr)except(Unauthorized,RQLSyntaxError):raiseexceptValidationError,ex:# need ValidationError normalization here so error may pass# through pyroifhasattr(ex.entity,'eid'):ex.entity=ex.entity.eid# error raised by yamsargs=list(ex.args)args[0]=ex.entityex.args=tuple(args)raiseexcept:# FIXME: check error to catch internal errorsself.exception('unexpected error')raisefinally:session.reset_pool()defdescribe(self,sessionid,eid):"""return a tuple (type, source, extid) for the entity with id <eid>"""session=self._get_session(sessionid,setpool=True)try:returnself.type_and_source_from_eid(eid,session)finally:session.reset_pool()defcheck_session(self,sessionid):"""raise `BadSessionId` if the connection is no more valid"""self._get_session(sessionid,setpool=False)defget_shared_data(self,sessionid,key,default=None,pop=False):"""return the session's data dictionary"""session=self._get_session(sessionid,setpool=False)returnsession.get_shared_data(key,default,pop)defset_shared_data(self,sessionid,key,value,querydata=False):"""set value associated to `key` in shared data if `querydata` is true, the value will be added to the repository session's query data which are cleared on commit/rollback of the current transaction, and won't be available through the connexion, only on the repository side. 
"""session=self._get_session(sessionid,setpool=False)session.set_shared_data(key,value,querydata)defcommit(self,sessionid):"""commit transaction for the session with the given id"""self.debug('begin commit for session %s',sessionid)try:self._get_session(sessionid).commit()except(ValidationError,Unauthorized):raiseexcept:self.exception('unexpected error')raisedefrollback(self,sessionid):"""commit transaction for the session with the given id"""self.debug('begin rollback for session %s',sessionid)try:self._get_session(sessionid).rollback()except:self.exception('unexpected error')raisedefclose(self,sessionid,checkshuttingdown=True):"""close the session with the given id"""session=self._get_session(sessionid,setpool=True,checkshuttingdown=checkshuttingdown)# operation uncommited before close are rollbacked before hook is calledsession.rollback()self.hm.call_hooks('session_close',session=session)# commit session at this point in case write operation has been done# during `session_close` hookssession.commit()session.close()delself._sessions[sessionid]self.info('closed session %s for user %s',sessionid,session.user.login)defuser_info(self,sessionid,props=None):"""this method should be used by client to: * check session id validity * update user information on each user's request (i.e. groups and custom properties) """session=self._get_session(sessionid,setpool=False)ifpropsisnotNone:self.set_session_props(sessionid,props)user=session.userreturnuser.eid,user.login,user.groups,user.propertiesdefset_session_props(self,sessionid,props):"""this method should be used by client to: * check session id validity * update user information on each user's request (i.e. 
groups and custom properties) """session=self._get_session(sessionid,setpool=False)# update session propertiesforprop,valueinprops.items():session.change_property(prop,value)# public (inter-repository) interface #####################################defentities_modified_since(self,etypes,mtime):"""function designed to be called from an external repository which is using this one as a rql source for synchronization, and return a 3-uple containing : * the local date * list of (etype, eid) of entities of the given types which have been modified since the given timestamp (actually entities whose full text index content has changed) * list of (etype, eid) of entities of the given types which have been deleted since the given timestamp """session=self.internal_session()updatetime=datetime.now()try:modentities,delentities=self.system_source.modified_entities(session,etypes,mtime)returnupdatetime,modentities,delentitiesfinally:session.close()# session handling ########################################################defclose_sessions(self):"""close every opened sessions"""forsessionidinself._sessions.keys():try:self.close(sessionid,checkshuttingdown=False)except:self.exception('error while closing session %s'%sessionid)defclean_sessions(self):"""close sessions not used since an amount of time specified in the configuration """mintime=time()-self.config['session-time']self.debug('cleaning session unused since %s',strftime('%T',localtime(mintime)))nbclosed=0forsessioninself._sessions.values():ifsession.timestamp<mintime:self.close(session.id)nbclosed+=1returnnbcloseddefinternal_session(self,cnxprops=None):"""return a dbapi like connection/cursor using internal user which have every rights on the repository. You'll *have to* commit/rollback or close (rollback implicitly) the session once the job's done, else you'll leak connections pool up to the time where no more pool is available, causing irremediable freeze... 
"""session=InternalSession(self,cnxprops)session.set_pool()returnsessiondef_get_session(self,sessionid,setpool=False,checkshuttingdown=True):"""return the user associated to the given session identifier"""ifcheckshuttingdownandself._shutting_down:raiseException('Repository is shutting down')try:session=self._sessions[sessionid]exceptKeyError:raiseBadConnectionId('No such session %s'%sessionid)ifsetpool:session.set_pool()returnsession# data sources handling #################################################### * correspondance between eid and (type, source)# * correspondance between eid and local id (i.e. specific to a given source)# * searchable text indexesdeftype_and_source_from_eid(self,eid,session=None):"""return a tuple (type, source, extid) for the entity with id <eid>"""try:eid=typed_eid(eid)exceptValueError:raiseUnknownEid(eid)try:returnself._type_source_cache[eid]exceptKeyError:ifsessionisNone:session=self.internal_session()reset_pool=Trueelse:reset_pool=Falsetry:etype,uri,extid=self.system_source.eid_type_source(session,eid)finally:ifreset_pool:session.reset_pool()self._type_source_cache[eid]=(etype,uri,extid)ifuri!='system':self._extid_cache[(extid,uri)]=eidreturnetype,uri,extiddefclear_caches(self,eids):etcache=self._type_source_cacheextidcache=self._extid_cacherqlcache=self.querier._rql_cacheforeidineids:try:etype,uri,extid=etcache.pop(typed_eid(eid))# may be a string in some casesrqlcache.pop('%s X WHERE X eid %s'%(etype,eid),None)extidcache.pop((extid,uri),None)exceptKeyError:etype=Nonerqlcache.pop('Any X WHERE X eid %s'%eid,None)forsourceinself.sources:source.clear_eid_cache(eid,etype)deftype_from_eid(self,eid,session=None):"""return the type of the entity with id <eid>"""returnself.type_and_source_from_eid(eid,session)[0]defsource_from_eid(self,eid,session=None):"""return the source for the given entity's eid"""returnself.sources_by_uri[self.type_and_source_from_eid(eid,session)[1]]defeid2extid(self,source,eid,session=None):"""get local id from an 
eid"""etype,uri,extid=self.type_and_source_from_eid(eid,session)ifsource.uri!=uri:# eid not from the given sourceraiseUnknownEid(eid)returnextiddefextid2eid(self,source,extid,etype,session=None,insert=True,recreate=False):"""get eid from a local id. An eid is attributed if no record is found"""cachekey=(extid,source.uri)try:returnself._extid_cache[cachekey]exceptKeyError:passreset_pool=FalseifsessionisNone:session=self.internal_session()reset_pool=Trueeid=self.system_source.extid2eid(session,source,extid)ifeidisnotNone:self._extid_cache[cachekey]=eidself._type_source_cache[eid]=(etype,source.uri,extid)ifrecreate:entity=source.before_entity_insertion(session,extid,etype,eid)entity._cw_recreating=Trueifsource.should_call_hooks:self.hm.call_hooks('before_add_entity',etype,session,entity)# XXX add fti op ?source.after_entity_insertion(session,extid,entity)ifsource.should_call_hooks:self.hm.call_hooks('after_add_entity',etype,session,entity)ifreset_pool:session.reset_pool()returneidifnotinsert:return# no link between extid and eid, create one using an internal session# since the current session user may not have required permissions to# do necessary stuff and we don't want to commit user session.## Moreover, even if session is already an internal session but is# processing a commit, we have to use another oneifnotsession.is_internal_session:session=self.internal_session()reset_pool=Truetry:eid=self.system_source.create_eid(session)self._extid_cache[cachekey]=eidself._type_source_cache[eid]=(etype,source.uri,extid)entity=source.before_entity_insertion(session,extid,etype,eid)ifsource.should_call_hooks:self.hm.call_hooks('before_add_entity',etype,session,entity)# XXX call add_info with complete=False ?self.add_info(session,entity,source,extid)source.after_entity_insertion(session,extid,entity)ifsource.should_call_hooks:self.hm.call_hooks('after_add_entity',etype,session,entity)else:# minimal meta-datasession.execute('SET X is E WHERE X eid %(x)s, E name 
%(name)s',{'x':entity.eid,'name':entity.id},'x')session.commit(reset_pool)returneidexcept:session.rollback(reset_pool)raisedefadd_info(self,session,entity,source,extid=None,complete=True):"""add type and source info for an eid into the system table, and index the entity with the full text index """# begin by inserting eid/type/source/extid into the entities tableself.system_source.add_info(session,entity,source,extid)ifcomplete:entity.complete(entity.e_schema.indexable_attributes())new=session.transaction_data.setdefault('neweids',set())new.add(entity.eid)# now we can update the full text indexifself.do_fti:FTIndexEntityOp(session,entity=entity)CleanupEidTypeCacheOp(session)defdelete_info(self,session,eid):self._prepare_delete_info(session,eid)self._delete_info(session,eid)def_prepare_delete_info(self,session,eid):"""prepare the repository for deletion of an entity: * update the fti * mark eid as being deleted in session info * setup cache update operation """self.system_source.fti_unindex_entity(session,eid)pending=session.transaction_data.setdefault('pendingeids',set())pending.add(eid)CleanupEidTypeCacheOp(session)def_delete_info(self,session,eid):"""delete system information on deletion of an entity: * delete all relations on this entity * transfer record from the entities table to the deleted_entities table """etype,uri,extid=self.type_and_source_from_eid(eid,session)self._clear_eid_relations(session,etype,eid)self.system_source.delete_info(session,eid,etype,uri,extid)def_clear_eid_relations(self,session,etype,eid):"""when a entity is deleted, build and execute rql query to delete all its relations """rql=[]eschema=self.schema.eschema(etype)forrschema,targetschemas,xineschema.relation_definitions():rtype=rschema.typeifrtypeinVIRTUAL_RTYPES:continuevar='%s%s'%(rtype.upper(),x.upper())ifx=='subject':# don't skip inlined relation so they are regularly# deleted and so hooks are correctly calledrql.append('X %s%s'%(rtype,var))else:rql.append('%s%s 
X'%(var,rtype))rql='DELETE %s WHERE X eid %%(x)s'%','.join(rql)# unsafe_execute since we suppose that if user can delete the entity,# he can delete all its relations without security checkingsession.unsafe_execute(rql,{'x':eid},'x',build_descr=False)defindex_entity(self,session,entity):"""full text index a modified entity"""alreadydone=session.transaction_data.setdefault('indexedeids',set())ifentity.eidinalreadydone:self.info('skipping reindexation of %s, already done',entity.eid)returnalreadydone.add(entity.eid)self.system_source.fti_index_entity(session,entity)deflocate_relation_source(self,session,subject,rtype,object):subjsource=self.source_from_eid(subject,session)objsource=self.source_from_eid(object,session)ifnot(subjsourceisobjsourceandsubjsource.support_relation(rtype,1)):source=self.system_sourceifnotsource.support_relation(rtype,1):raiseRTypeNotSupportedBySources(rtype)else:source=subjsourcereturnsourcedeflocate_etype_source(self,etype):forsourceinself.sources:ifsource.support_entity(etype,1):returnsourceelse:raiseETypeNotSupportedBySources(etype)defglob_add_entity(self,session,entity):"""add an entity to the repository the entity eid should originaly be None and a unique eid is assigned to the entity instance """entity=entity.pre_add_hook()eschema=entity.e_schemaetype=str(eschema)source=self.locate_etype_source(etype)# attribute an eid to the entity before calling hooksentity.set_eid(self.system_source.create_eid(session))ifserver.DEBUG&server.DBG_REPO:print'ADD entity',etype,entity.eid,dict(entity)entity._is_saved=False# entity has an eid but is not yet savedrelations=[]# if inlined relations are specified, fill entity's related cache to# avoid unnecessary queriesforattrinentity.keys():rschema=eschema.subject_relation(attr)ifnotrschema.is_final():# inlined 
relationentity.set_related_cache(attr,'subject',entity.req.eid_rset(entity[attr]))relations.append((attr,entity[attr]))ifsource.should_call_hooks:self.hm.call_hooks('before_add_entity',etype,session,entity)entity.set_defaults()entity.check(creation=True)source.add_entity(session,entity)ifsource.uri!='system':extid=source.get_extid(entity)self._extid_cache[(str(extid),source.uri)]=entity.eidelse:extid=Noneself.add_info(session,entity,source,extid,complete=False)entity._is_saved=True# entity has an eid and is saved# trigger after_add_entity after after_add_relationifsource.should_call_hooks:self.hm.call_hooks('after_add_entity',etype,session,entity)# call hooks for inlined relationsforattr,valueinrelations:self.hm.call_hooks('before_add_relation',attr,session,entity.eid,attr,value)self.hm.call_hooks('after_add_relation',attr,session,entity.eid,attr,value)returnentity.eiddefglob_update_entity(self,session,entity):"""replace an entity in the repository the type and the eid of an entity must not be changed """etype=str(entity.e_schema)ifserver.DEBUG&server.DBG_REPO:print'UPDATE entity',etype,entity.eid,dict(entity)entity.check()eschema=entity.e_schemaonly_inline_rels,need_fti_update=True,Falserelations=[]forattrinentity.keys():ifattr=='eid':continuerschema=eschema.subject_relation(attr)ifrschema.is_final():ifeschema.rproperty(attr,'fulltextindexed'):need_fti_update=Trueonly_inline_rels=Falseelse:# inlined relationprevious_value=entity.related(attr)ifprevious_value:previous_value=previous_value[0][0]# got a result setself.hm.call_hooks('before_delete_relation',attr,session,entity.eid,attr,previous_value)entity.set_related_cache(attr,'subject',entity.req.eid_rset(entity[attr]))relations.append((attr,entity[attr],previous_value))source=self.source_from_eid(entity.eid,session)ifsource.should_call_hooks:# call hooks for inlined 
relationsforattr,value,_inrelations:self.hm.call_hooks('before_add_relation',attr,session,entity.eid,attr,value)ifnotonly_inline_rels:self.hm.call_hooks('before_update_entity',etype,session,entity)source.update_entity(session,entity)ifnotonly_inline_rels:ifneed_fti_updateandself.do_fti:# reindex the entity only if this query is updating at least# one indexable attributeFTIndexEntityOp(session,entity=entity)ifsource.should_call_hooks:self.hm.call_hooks('after_update_entity',etype,session,entity)ifsource.should_call_hooks:forattr,value,prevvalueinrelations:ifprevvalue:self.hm.call_hooks('after_delete_relation',attr,session,entity.eid,attr,prevvalue)del_existing_rel_if_needed(session,entity.eid,attr,value)self.hm.call_hooks('after_add_relation',attr,session,entity.eid,attr,value)defglob_delete_entity(self,session,eid):"""delete an entity and all related entities from the repository"""# call delete_info before hooksself._prepare_delete_info(session,eid)etype,uri,extid=self.type_and_source_from_eid(eid,session)ifserver.DEBUG&server.DBG_REPO:print'DELETE entity',etype,eidsource=self.sources_by_uri[uri]ifsource.should_call_hooks:self.hm.call_hooks('before_delete_entity',etype,session,eid)self._delete_info(session,eid)source.delete_entity(session,etype,eid)ifsource.should_call_hooks:self.hm.call_hooks('after_delete_entity',etype,session,eid)# don't clear cache here this is done in a hook on commitdefglob_add_relation(self,session,subject,rtype,object):"""add a relation to the repository"""ifserver.DEBUG&server.DBG_REPO:print'ADD 
relation',subject,rtype,objectsource=self.locate_relation_source(session,subject,rtype,object)ifsource.should_call_hooks:del_existing_rel_if_needed(session,subject,rtype,object)self.hm.call_hooks('before_add_relation',rtype,session,subject,rtype,object)source.add_relation(session,subject,rtype,object)ifsource.should_call_hooks:self.hm.call_hooks('after_add_relation',rtype,session,subject,rtype,object)defglob_delete_relation(self,session,subject,rtype,object):"""delete a relation from the repository"""ifserver.DEBUG&server.DBG_REPO:print'DELETE relation',subject,rtype,objectsource=self.locate_relation_source(session,subject,rtype,object)ifsource.should_call_hooks:self.hm.call_hooks('before_delete_relation',rtype,session,subject,rtype,object)source.delete_relation(session,subject,rtype,object)ifself.schema.rschema(rtype).symetric:# on symetric relation, we can't now in which sense it's# stored so try to delete bothsource.delete_relation(session,object,rtype,subject)ifsource.should_call_hooks:self.hm.call_hooks('after_delete_relation',rtype,session,subject,rtype,object)# pyro handling ###########################################################defpyro_register(self,host=''):"""register the repository as a pyro object"""fromPyroimportcoreport=self.config['pyro-port']nshost,nsgroup=self.config['pyro-ns-host'],self.config['pyro-ns-group']nsgroup=':'+nsgroupcore.initServer(banner=0)daemon=core.Daemon(host=host,port=port)daemon.useNameServer(self.pyro_nameserver(nshost,nsgroup))# use Delegation approachimpl=core.ObjBase()impl.delegateTo(self)nsid=self.config['pyro-id']orself.config.appiddaemon.connect(impl,'%s.%s'%(nsgroup,nsid))msg='repository registered as a pyro object using group %s and id %s'self.info(msg,nsgroup,nsid)self.pyro_registered=Truereturndaemondefpyro_nameserver(self,host=None,group=None):"""locate and bind the the name server to the daemon"""fromPyroimportnaming,errors# locate the name 
servernameserver=naming.NameServerLocator().getNS(host)ifgroupisnotNone:# make sure our namespace group existstry:nameserver.createGroup(group)excepterrors.NamingError:passreturnnameserver# multi-sources planner helpers ###########################################@cacheddefrel_type_sources(self,rtype):return[sourceforsourceinself.sourcesifsource.support_relation(rtype)orrtypeinsource.dont_cross_relations]@cacheddefcan_cross_relation(self,rtype):return[sourceforsourceinself.sourcesifsource.support_relation(rtype)andrtypeinsource.cross_relations]@cacheddefis_multi_sources_relation(self,rtype):returnany(sourceforsourceinself.sourcesifnotsourceisself.system_sourceandsource.support_relation(rtype))defpyro_unregister(config):"""unregister the repository from the pyro name server"""nshost,nsgroup=config['pyro-ns-host'],config['pyro-ns-group']appid=config['pyro-id']orconfig.appidfromPyroimportcore,naming,errorscore.initClient(banner=False)try:nameserver=naming.NameServerLocator().getNS(nshost)excepterrors.PyroError,ex:# name server not respondingconfig.error('can\'t locate pyro name server: %s',ex)returntry:nameserver.unregister(':%s.%s'%(nsgroup,appid))config.info('%s unregistered from pyro name server',appid)excepterrors.NamingError:config.warning('%s already unregistered from pyro name server',appid)fromloggingimportgetLoggerfromcubicwebimportset_log_methodsset_log_methods(Repository,getLogger('cubicweb.repository'))