"""Repository users' and internal' sessions.

:organization: Logilab
:copyright: 2001-2010 LOGILAB S.A. (Paris, FRANCE), license is LGPL v2.
:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
:license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses
"""
__docformat__ = "restructuredtext en"

import sys
import threading
from time import time

from logilab.common.deprecation import deprecated
from rql.nodes import VariableRef, Function, ETYPE_PYOBJ_MAP, etype_from_pyobj
from yams import BASE_TYPES

from cubicweb import Binary, UnknownEid
from cubicweb.req import RequestSessionBase
from cubicweb.dbapi import ConnectionProperties
from cubicweb.utils import make_uid
from cubicweb.rqlrewrite import RQLRewriter

ETYPE_PYOBJ_MAP[Binary] = 'Bytes'


def is_final(rqlst, variable, args):
    """try to find if `variable` is a final variable or not

    The answer is taken from the first solution giving a non-None type for
    the variable: True if that type is in BASE_TYPES, False otherwise. If no
    solution gives a type, the function falls through and returns None.
    """
    for select in rqlst.children:
        for sol in select.solutions:
            etype = variable.get_type(sol, args)
            if etype is None:
                continue
            if etype in BASE_TYPES:
                return True
            return False


def _make_description(selected, args, solution):
    """return a description for a result set: the list of types of each
    selected term according to the given solution
    """
    return [term.get_type(solution, args) for term in selected]


class Session(RequestSessionBase):
    """tie session id, user, connections pool and other session data all
    together
    """

    def __init__(self, user, repo, cnxprops=None, _id=None):
        super(Session, self).__init__(repo.vreg)
        self.id = _id or make_uid(user.login.encode('UTF8'))
        cnxprops = cnxprops or ConnectionProperties('inmemory')
        self.user = user
        self.repo = repo
        self.cnxtype = cnxprops.cnxtype
        self.creation = time()
        self.timestamp = self.creation
        self.is_internal_session = False
        self.is_super_session = False
        self.default_mode = 'read'
        # short cut to querier .execute method
        self._execute = repo.querier.execute
        # shared data, used to communicate extra information between the client
        # and the rql server
        self.data = {}
        # i18n initialization
        self.set_language(cnxprops.lang)
        # internals
        self._threaddata = threading.local()
        self._threads_in_transaction = set()
        self._closed = False

    def __str__(self):
        return '<%ssession %s (%s 0x%x)>' % (self.cnxtype, self.user.login,
                                             self.id, id(self))

    def hijack_user(self, user):
        """return a fake request/session using specified user"""
        session = Session(user, self.repo)
        # share thread-local data (pool, transaction data...) with the
        # original session
        session._threaddata = self.actual_session()._threaddata
        return session

    def _super_call(self, __cb, *args, **kwargs):
        """call `__cb(self, *args, **kwargs)` with `is_super_session` set to
        True for the duration of the call, restoring it to False afterwards
        (unless the session already was a super session). The callback's
        return value is discarded.
        """
        if self.is_super_session:
            __cb(self, *args, **kwargs)
            return
        self.is_super_session = True
        try:
            __cb(self, *args, **kwargs)
        finally:
            self.is_super_session = False

    def add_relation(self, fromeid, rtype, toeid):
        """provide direct access to the repository method to add a relation.

        This is equivalent to the following rql query:

          SET X rtype Y WHERE X eid fromeid, Y eid toeid

        without read security check but also all the burden of rql execution.
        You may use this in hooks when you know both eids of the relation you
        want to add.
        """
        if self.vreg.schema[rtype].inlined:
            entity = self.entity_from_eid(fromeid)
            entity[rtype] = toeid
            self._super_call(self.repo.glob_update_entity,
                             entity, set((rtype,)))
        else:
            self._super_call(self.repo.glob_add_relation,
                             fromeid, rtype, toeid)

    def delete_relation(self, fromeid, rtype, toeid):
        """provide direct access to the repository method to delete a relation.

        This is equivalent to the following rql query:

          DELETE X rtype Y WHERE X eid fromeid, Y eid toeid

        without read security check but also all the burden of rql execution.
        You may use this in hooks when you know both eids of the relation you
        want to delete.
        """
        if self.vreg.schema[rtype].inlined:
            entity = self.entity_from_eid(fromeid)
            entity[rtype] = None
            self._super_call(self.repo.glob_update_entity,
                             entity, set((rtype,)))
        else:
            self._super_call(self.repo.glob_delete_relation,
                             fromeid, rtype, toeid)

    # relations cache handling #################################################

    def update_rel_cache_add(self, subject, rtype, object, symmetric=False):
        """update relation caches of both ends after `subject rtype object`
        has been added
        """
        self._update_entity_rel_cache_add(subject, rtype, 'subject', object)
        if symmetric:
            self._update_entity_rel_cache_add(object, rtype, 'subject', subject)
        else:
            self._update_entity_rel_cache_add(object, rtype, 'object', subject)

    def update_rel_cache_del(self, subject, rtype, object, symmetric=False):
        """update relation caches of both ends after `subject rtype object`
        has been deleted
        """
        self._update_entity_rel_cache_del(subject, rtype, 'subject', object)
        if symmetric:
            # mirror update_rel_cache_add: for a symmetric relation the
            # reverse link is cached on `object` with the 'subject' role and
            # targets `subject`
            self._update_entity_rel_cache_del(object, rtype, 'subject', subject)
        else:
            self._update_entity_rel_cache_del(object, rtype, 'object', subject)

    def _update_entity_rel_cache_add(self, eid, rtype, role, targeteid):
        """append `targeteid` to the cached (rtype, role) relation of entity
        `eid`; do nothing if the entity or the relation isn't cached
        """
        try:
            entity = self.entity_cache(eid)
        except KeyError:
            return
        rcache = entity.relation_cached(rtype, role)
        if rcache is not None:
            rset, entities = rcache
            # work on copies, the cache entry is replaced as a whole below
            rset = rset.copy()
            entities = list(entities)
            rset.rows.append([targeteid])
            if not isinstance(rset.description, list): # else description not set
                rset.description = list(rset.description)
            rset.description.append([self.describe(targeteid)[0]])
            targetentity = self.entity_from_eid(targeteid)
            if targetentity.cw_rset is None:
                targetentity.cw_rset = rset
                targetentity.cw_row = rset.rowcount
                targetentity.cw_col = 0
            rset.rowcount += 1
            entities.append(targetentity)
            entity._related_cache['%s_%s' % (rtype, role)] = (rset, tuple(entities))

    def _update_entity_rel_cache_del(self, eid, rtype, role, targeteid):
        """remove `targeteid` from the cached (rtype, role) relation of entity
        `eid`; do nothing if the entity or the relation isn't cached
        """
        try:
            entity = self.entity_cache(eid)
        except KeyError:
            return
        rcache = entity.relation_cached(rtype, role)
        if rcache is not None:
            rset, entities = rcache
            for idx, row in enumerate(rset.rows):
                if row[0] == targeteid:
                    break
            else:
                # this may occurs if the cache has been filed by a hook
                # after the database update
                self.debug('cache inconsistency for %s %s %s %s',
                           eid, rtype, role, targeteid)
                return
            # work on copies, the cache entry is replaced as a whole below
            rset = rset.copy()
            entities = list(entities)
            del rset.rows[idx]
            if isinstance(rset.description, list): # else description not set
                del rset.description[idx]
            del entities[idx]
            rset.rowcount -= 1
            entity._related_cache['%s_%s' % (rtype, role)] = (rset, tuple(entities))

    # resource accessors ######################################################

    def actual_session(self):
        """return the original parent session if any, else self"""
        return self

    def system_sql(self, sql, args=None, rollback_on_failure=True):
        """return a sql cursor on the system database"""
        # anything but a SELECT switches the session to write mode
        if not sql.split(None, 1)[0].upper() == 'SELECT':
            self.mode = 'write'
        return self.pool.source('system').doexec(self, sql, args,
                                                 rollback=rollback_on_failure)

    def set_language(self, language):
        """i18n configuration for translation"""
        vreg = self.vreg
        language = language or self.user.property_value('ui.language')
        try:
            gettext, pgettext = vreg.config.translations[language]
            self._ = self.__ = gettext
            self.pgettext = pgettext
        except KeyError:
            # no translation for the asked language, fallback to the
            # registry's 'ui.language' property
            language = vreg.property_value('ui.language')
            try:
                gettext, pgettext = vreg.config.translations[language]
                self._ = self.__ = gettext
                self.pgettext = pgettext
            except KeyError:
                # still no translation available: messages are left as is
                self._ = self.__ = unicode
                self.pgettext = lambda x, y: y
        self.lang = language

    def change_property(self, prop, value):
        assert prop == 'lang' # this is the only one changeable property for now
        self.set_language(value)

    def deleted_in_transaction(self, eid):
        """return true if `eid` is in the transaction's 'pendingeids'"""
        return eid in self.transaction_data.get('pendingeids', ())

    def added_in_transaction(self, eid):
        """return true if `eid` is in the transaction's 'neweids'"""
        return eid in self.transaction_data.get('neweids', ())

    def schema_rproperty(self, rtype, eidfrom, eidto, rprop):
        """return the value of property `rprop` for the definition of relation
        `rtype` between the types of entities `eidfrom` and `eidto`
        """
        rschema = self.repo.schema[rtype]
        subjtype = self.describe(eidfrom)[0]
        objtype = self.describe(eidto)[0]
        rdef = rschema.rdef(subjtype, objtype)
        return rdef.get(rprop)

    # connection management ###################################################

    def keep_pool_mode(self, mode):
        """set pool_mode, e.g. how the session will keep its pool:

        * if mode == 'write', the pool is freed after each read query, but kept
          until the transaction's end (eg commit or rollback) when a write query
          is detected (eg INSERT/SET/DELETE queries)

        * if mode == 'transaction', the pool is only freed after the
          transaction's end

        notice that a repository has a limited set of pools, and a session has to
        wait for a free pool to run any rql query (unless it already has a pool
        set).
        """
        assert mode in ('transaction', 'write')
        if mode == 'transaction':
            self.default_mode = 'transaction'
        else: # mode == 'write'
            self.default_mode = 'read'

    def get_mode(self):
        return getattr(self._threaddata, 'mode', self.default_mode)
    def set_mode(self, value):
        self._threaddata.mode = value
    mode = property(get_mode, set_mode,
                    doc='transaction mode (read/write/transaction), resetted to'
                    ' default_mode on commit / rollback')

    def get_commit_state(self):
        return getattr(self._threaddata, 'commit_state', None)
    def set_commit_state(self, value):
        self._threaddata.commit_state = value
    commit_state = property(get_commit_state, set_commit_state)

    @property
    def pool(self):
        """connections pool, set according to transaction mode for each query"""
        return getattr(self._threaddata, 'pool', None)

    def set_pool(self, checkclosed=True):
        """the session need a pool to execute some queries

        Return the pool attached to the current thread, getting one from the
        repository if necessary. Raise an Exception if `checkclosed` is true
        and the session has been closed.
        """
        if checkclosed and self._closed:
            raise Exception('try to set pool on a closed session')
        if self.pool is None:
            # get pool first to avoid race-condition
            self._threaddata.pool = pool = self.repo._get_pool()
            try:
                pool.pool_set()
            except:
                # give the pool back to the repository before propagating
                self._threaddata.pool = None
                self.repo._free_pool(pool)
                raise
            self._threads_in_transaction.add(threading.currentThread())
        return self._threaddata.pool

    def reset_pool(self, ignoremode=False):
        """the session is no longer using its pool, at least for some time"""
        # pool may be none if no operation has been done since last commit
        # or rollback
        if self.pool is not None and (ignoremode or self.mode == 'read'):
            # even in read mode, we must release the current transaction
            pool = self.pool
            # the current thread may already have been removed
            self._threads_in_transaction.discard(threading.currentThread())
            pool.pool_reset()
            self._threaddata.pool = None
            # free pool once everything is done to avoid race-condition
            self.repo._free_pool(pool)

    def _touch(self):
        """update latest session usage timestamp and reset mode to read"""
        self.timestamp = time()
        self.local_perm_cache.clear()
        self._threaddata.mode = self.default_mode

    # shared data handling ###################################################

    def get_shared_data(self, key, default=None, pop=False):
        """return value associated to `key` in session data"""
        if pop:
            return self.data.pop(key, default)
        else:
            return self.data.get(key, default)

    def set_shared_data(self, key, value, querydata=False):
        """set value associated to `key` in session data (or in transaction
        data if `querydata` is true)
        """
        if querydata:
            self.transaction_data[key] = value
        else:
            self.data[key] = value

    # request interface #######################################################

    @property
    def cursor(self):
        """return a rql cursor"""
        return self

    def set_entity_cache(self, entity):
        """add `entity` to the transaction's entity cache, unless an entity
        with the same eid is already cached
        """
        # XXX session level caching may be a pb with multiple repository
        #     instances, but 1. this is probably not the only one :$ and 2. it
        #     may be an acceptable risk. Anyway we could activate it or not
        #     according to a configuration option
        try:
            self.transaction_data['ecache'].setdefault(entity.eid, entity)
        except KeyError:
            self.transaction_data['ecache'] = ecache = {}
            ecache[entity.eid] = entity

    def entity_cache(self, eid):
        """return the cached entity with the given eid, raise KeyError if it
        isn't in the transaction's entity cache
        """
        return self.transaction_data['ecache'][eid]

    def cached_entities(self):
        """return entities in the transaction's entity cache"""
        return self.transaction_data.get('ecache', {}).values()

    def drop_entity_cache(self, eid=None):
        """drop the whole entity cache, or only the entity with the given eid
        (raising KeyError if it isn't cached)
        """
        if eid is None:
            self.transaction_data.pop('ecache', None)
        else:
            del self.transaction_data['ecache'][eid]

    def base_url(self):
        """return the 'base-url' configured for the repository, falling back
        to the configuration's default_base_url() when available
        """
        url = self.repo.config['base-url']
        if not url:
            try:
                url = self.repo.config.default_base_url()
            except AttributeError: # default_base_url() might not be available
                self.warning('missing base-url definition in server config')
                url = u''
        return url

    def from_controller(self):
        """return the id (string) of the controller issuing the request (no
        sense here, always return 'view')
        """
        return 'view'

    def source_defs(self):
        return self.repo.source_defs()

    def describe(self, eid):
        """return a tuple (type, sourceuri, extid) for the entity with id <eid>"""
        return self.repo.type_and_source_from_eid(eid, self)

    # db-api like interface ###################################################

    def source_from_eid(self, eid):
        """return the source where the entity with id <eid> is located"""
        return self.repo.source_from_eid(eid, self)

    def decorate_rset(self, rset, propagate=False):
        """attach the registry and a session to `rset`: this session if
        `propagate` is true, else the actual (parent) session
        """
        rset.vreg = self.vreg
        rset.req = self if propagate else self.actual_session()
        return rset

    @property
    def super_session(self):
        """return the session to use for security-free queries, creating and
        caching (per thread) a ChildSession if necessary
        """
        try:
            csession = self._threaddata.childsession
        except AttributeError:
            if isinstance(self, (ChildSession, InternalSession)):
                csession = self
            else:
                csession = ChildSession(self)
            self._threaddata.childsession = csession
        # need shared pool set
        self.set_pool(checkclosed=False)
        return csession

    def unsafe_execute(self, rql, kwargs=None, eid_key=None, build_descr=True,
                       propagate=False):
        """like .execute but with security checking disabled (this method is
        internal to the server, it's not part of the db-api)

        if `propagate` is true, the super_session will be attached to the result
        set instead of the parent session, hence further query done through
        entities fetched from this result set will bypass security as well
        """
        return self.super_session.execute(rql, kwargs, eid_key, build_descr,
                                          propagate)

    def execute(self, rql, kwargs=None, eid_key=None, build_descr=True,
                propagate=False):
        """db-api like method directly linked to the querier execute method

        Here `build_descr` defaults to true.
        """
        rset = self._execute(self, rql, kwargs, eid_key, build_descr)
        return self.decorate_rset(rset, propagate)

    def commit(self, reset_pool=True):
        """commit the current session's transaction

        Pending operations get 'precommit' then 'commit' events before the
        pool is committed, and 'postcommit' events afterwards. An error during
        [pre]commit triggers a rollback and is propagated; errors during
        postcommit are only logged.
        """
        if self.pool is None:
            assert not self.pending_operations
            self.transaction_data.clear()
            self._touch()
            self.debug('commit session %s done (no db activity)', self.id)
            return
        if self.commit_state:
            # a commit is already being processed
            return
        # on rollback, an operation should have the following state
        # information:
        # - processed by the precommit/commit event or not
        # - if processed, is it the failed operation
        try:
            for trstate in ('precommit', 'commit'):
                processed = []
                self.commit_state = trstate
                try:
                    while self.pending_operations:
                        operation = self.pending_operations.pop(0)
                        operation.processed = trstate
                        processed.append(operation)
                        operation.handle_event('%s_event' % trstate)
                    self.pending_operations[:] = processed
                    self.debug('%s session %s done', trstate, self.id)
                except:
                    self.exception('error while %sing', trstate)
                    # if error on [pre]commit:
                    #
                    # * set .failed = True on the operation causing the failure
                    # * call revert<event>_event on processed operations
                    # * call rollback_event on *all* operations
                    #
                    # that seems more natural than not calling rollback_event
                    # for processed operations, and allow generic rollback
                    # instead of having to implements rollback, revertprecommit
                    # and revertcommit, that will be enough in most cases.
                    operation.failed = True
                    for operation in processed:
                        operation.handle_event('revert%s_event' % trstate)
                    # XXX use slice notation since self.pending_operations is a
                    # read-only property.
                    self.pending_operations[:] = processed + self.pending_operations
                    self.rollback(reset_pool)
                    raise
            self.pool.commit()
            self.commit_state = trstate = 'postcommit'
            while self.pending_operations:
                operation = self.pending_operations.pop(0)
                operation.processed = trstate
                try:
                    operation.handle_event('%s_event' % trstate)
                except:
                    self.critical('error while %sing', trstate,
                                  exc_info=sys.exc_info())
            self.info('%s session %s done', trstate, self.id)
        finally:
            self._touch()
            self.commit_state = None
            self.pending_operations[:] = []
            self.transaction_data.clear()
            if reset_pool:
                self.reset_pool(ignoremode=True)

    def rollback(self, reset_pool=True):
        """rollback the current session's transaction

        Every pending operation gets a 'rollback_event'; errors raised by
        operations are logged and don't prevent the pool's rollback.
        """
        if self.pool is None:
            assert not self.pending_operations
            self.transaction_data.clear()
            self._touch()
            self.debug('rollback session %s done (no db activity)', self.id)
            return
        try:
            while self.pending_operations:
                try:
                    operation = self.pending_operations.pop(0)
                    operation.handle_event('rollback_event')
                except:
                    self.critical('rollback error', exc_info=sys.exc_info())
                    continue
            self.pool.rollback()
            self.debug('rollback for session %s done', self.id)
        finally:
            self._touch()
            self.pending_operations[:] = []
            self.transaction_data.clear()
            if reset_pool:
                self.reset_pool(ignoremode=True)

    def close(self):
        """do not close pool on session close, since they are shared now"""
        self._closed = True
        # copy since _threads_in_transaction maybe modified while waiting
        for thread in self._threads_in_transaction.copy():
            if thread is threading.currentThread():
                continue
            self.info('waiting for thread %s', thread)
            # do this loop/break instead of a simple join(10) in case thread is
            # the main thread (in which case it will be removed from
            # self._threads_in_transaction but still be alive...)
            for i in xrange(10):
                thread.join(1)
                if not (thread.isAlive() and
                        thread in self._threads_in_transaction):
                    break
            else:
                self.error('thread %s still alive after 10 seconds, will close '
                           'session anyway', thread)
        self.rollback()

    # transaction data/operations management ##################################

    @property
    def transaction_data(self):
        """dictionary of data attached to the current thread's transaction
        (lazily created)
        """
        try:
            return self._threaddata.transaction_data
        except AttributeError:
            self._threaddata.transaction_data = {}
            return self._threaddata.transaction_data

    @property
    def pending_operations(self):
        """list of operations pending for the current thread's transaction
        (lazily created)
        """
        try:
            return self._threaddata.pending_operations
        except AttributeError:
            self._threaddata.pending_operations = []
            return self._threaddata.pending_operations

    def add_operation(self, operation, index=None):
        """add an observer"""
        assert self.commit_state != 'commit'
        if index is not None:
            self.pending_operations.insert(index, operation)
        else:
            self.pending_operations.append(operation)

    # querier helpers #########################################################

    @property
    def rql_rewriter(self):
        """RQLRewriter instance for the current thread (lazily created)"""
        try:
            return self._threaddata._rewriter
        except AttributeError:
            self._threaddata._rewriter = RQLRewriter(self)
            return self._threaddata._rewriter

    def build_description(self, rqlst, args, result):
        """build a description for a given result"""
        if len(rqlst.children) == 1 and len(rqlst.children[0].solutions) == 1:
            # easy, all lines are identical
            selected = rqlst.children[0].selection
            solution = rqlst.children[0].solutions[0]
            description = _make_description(selected, args, solution)
            return [tuple(description)] * len(result)
        # hard, delegate the work :o)
        return self.manual_build_descr(rqlst, args, result)

    def manual_build_descr(self, rqlst, args, result):
        """build a description for a given result by analysing each row

        XXX could probably be done more efficiently during execution of query
        """
        # not so easy, looks for variable which changes from one solution
        # to another
        unstables = rqlst.get_variable_variables()
        basedescription = []
        todetermine = []
        selected = rqlst.children[0].selection # sample selection
        for i, term in enumerate(selected):
            if isinstance(term, Function) and term.descr().rtype is not None:
                basedescription.append(term.get_type(term.descr().rtype, args))
                continue
            for vref in term.get_nodes(VariableRef):
                if vref.name in unstables:
                    # type has to be determined for each row
                    basedescription.append(None)
                    todetermine.append((i, is_final(rqlst, vref.variable, args)))
                    break
            else:
                # sample etype
                etype = rqlst.children[0].solutions[0]
                basedescription.append(term.get_type(etype, args))
        if not todetermine:
            return [tuple(basedescription)] * len(result)
        return self._build_descr(result, basedescription, todetermine)

    def _build_descr(self, result, basedescription, todetermine):
        """return the list of description tuples for `result`, computing for
        each row the type of columns listed in `todetermine` (a list of
        (column index, is final) tuples); other columns are taken from
        `basedescription`
        """
        description = []
        etype_from_eid = self.describe
        for row in result:
            # work on a copy so the caller's list isn't modified
            row_descr = list(basedescription)
            for index, isfinal in todetermine:
                value = row[index]
                if value is None:
                    # None value inserted by an outer join, no type
                    row_descr[index] = None
                    continue
                if isfinal:
                    row_descr[index] = etype_from_pyobj(value)
                else:
                    try:
                        row_descr[index] = etype_from_eid(value)[0]
                    except UnknownEid:
                        self.critical('wrong eid %s in repository, should check database',
                                      value)
                        row_descr[index] = row[index] = None
            description.append(tuple(row_descr))
        return description

    # deprecated ###############################################################

    @property
    @deprecated("[3.6] use session.vreg.schema")
    def schema(self):
        return self.repo.schema

    @deprecated("[3.4] use vreg['etypes'].etype_class(etype)")
    def etype_class(self, etype):
        """return an entity class for the given entity type"""
        return self.vreg['etypes'].etype_class(etype)

    @deprecated('[3.4] use direct access to session.transaction_data')
    def query_data(self, key, default=None, setdefault=False, pop=False):
        if setdefault:
            assert not pop
            return self.transaction_data.setdefault(key, default)
        if pop:
            return self.transaction_data.pop(key, default)
        else:
            return self.transaction_data.get(key, default)

    @deprecated('[3.4] use entity_from_eid(eid, etype=None)')
    def entity(self, eid):
        """return the entity for the given eid"""
        return self.entity_from_eid(eid)


class ChildSession(Session):
    """child (or internal) session are used to hijack the security system
    """
    cnxtype = 'inmemory'

    def __init__(self, parent_session):
        # NB: Session.__init__ is deliberately not called, state is taken
        # from / shared with the parent session
        self.id = None
        self.is_internal_session = False
        self.is_super_session = True
        # session which has created this one
        self.parent_session = parent_session
        self.user = InternalManager()
        self.user.req = self # XXX remove when "vreg = user.req.vreg" hack in entity.py is gone
        self.repo = parent_session.repo
        self.vreg = parent_session.vreg
        self.data = parent_session.data
        self.encoding = parent_session.encoding
        self.lang = parent_session.lang
        self._ = self.__ = parent_session._
        # short cut to querier .execute method
        self._execute = self.repo.querier.execute

    @property
    def super_session(self):
        return self

    def get_mode(self):
        return self.parent_session.mode
    def set_mode(self, value):
        self.parent_session.set_mode(value)
    mode = property(get_mode, set_mode)

    def get_commit_state(self):
        return self.parent_session.commit_state
    def set_commit_state(self, value):
        self.parent_session.set_commit_state(value)
    commit_state = property(get_commit_state, set_commit_state)

    @property
    def pool(self):
        return self.parent_session.pool

    @property
    def pending_operations(self):
        return self.parent_session.pending_operations

    @property
    def transaction_data(self):
        return self.parent_session.transaction_data

    def set_pool(self, checkclosed=True):
        """the session need a pool to execute some queries

        Delegated to the parent session; accept the same `checkclosed`
        argument as `Session.set_pool` and return the parent's pool.
        """
        return self.parent_session.set_pool(checkclosed)

    def reset_pool(self, ignoremode=False):
        """the session is no longer using its pool, at least for some time

        Delegated to the parent session; accept the same `ignoremode`
        argument as `Session.reset_pool`.
        """
        self.parent_session.reset_pool(ignoremode)

    def actual_session(self):
        """return the original parent session if any, else self"""
        return self.parent_session

    def commit(self, reset_pool=True):
        """commit the current session's transaction"""
        self.parent_session.commit(reset_pool)

    def rollback(self, reset_pool=True):
        """rollback the current session's transaction"""
        self.parent_session.rollback(reset_pool)

    def close(self):
        """do not close pool on session close, since they are shared now"""
        self.rollback()

    def user_data(self):
        """returns a dictionnary with this user's information"""
        return self.parent_session.user_data()


class InternalSession(Session):
    """special session created internaly by the repository"""

    def __init__(self, repo, cnxprops=None):
        super(InternalSession, self).__init__(InternalManager(), repo, cnxprops,
                                              _id='internal')
        self.user.req = self # XXX remove when "vreg = user.req.vreg" hack in entity.py is gone
        self.cnxtype = 'inmemory'
        self.is_internal_session = True
        self.is_super_session = True

    @property
    def super_session(self):
        return self


class InternalManager(object):
    """a manager user with all access rights used internally for task such as
    bootstrapping the repository or creating regular users according to
    repository content
    """

    def __init__(self):
        self.eid = -1
        self.login = u'__internal_manager__'
        self.properties = {}

    def matching_groups(self, groups):
        return 1

    def is_in_group(self, group):
        return True

    def owns(self, eid):
        return True

    def has_permission(self, pname, contexteid=None):
        return True

    def property_value(self, key):
        if key == 'ui.language':
            return 'en'
        return None


from logging import getLogger
from cubicweb import set_log_methods
set_log_methods(Session, getLogger('cubicweb.session'))