[reledit] do NOT peruse the likely ?vid=edition part
"""Repository users' and internal' sessions.:organization: Logilab:copyright: 2001-2009 LOGILAB S.A. (Paris, FRANCE), license is LGPL v2.:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr:license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses"""__docformat__="restructuredtext en"importsysimportthreadingfromtimeimporttimefromlogilab.common.deprecationimportobsoletefromrql.nodesimportVariableRef,Function,ETYPE_PYOBJ_MAP,etype_from_pyobjfromyamsimportBASE_TYPESfromcubicwebimportRequestSessionMixIn,Binary,UnknownEidfromcubicweb.dbapiimportConnectionPropertiesfromcubicweb.utilsimportmake_uidfromcubicweb.server.rqlrewriteimportRQLRewriterETYPE_PYOBJ_MAP[Binary]='Bytes'defis_final(rqlst,variable,args):# try to find if this is a final var or notforselectinrqlst.children:forsolinselect.solutions:etype=variable.get_type(sol,args)ifetypeisNone:continueifetypeinBASE_TYPES:returnTruereturnFalsedef_make_description(selected,args,solution):"""return a description for a result set"""description=[]forterminselected:description.append(term.get_type(solution,args))returndescriptionfromrqlimportstmtsasserthasattr(stmts.Union,'get_variable_variables'),"You need RQL > 0.18.3"classSession(RequestSessionMixIn):"""tie session id, user, connections pool and other session data all together """def__init__(self,user,repo,cnxprops=None,_id=None):super(Session,self).__init__(repo.vreg)self.id=_idormake_uid(user.login.encode('UTF8'))cnxprops=cnxpropsorConnectionProperties('inmemory')self.user=userself.repo=repoself.cnxtype=cnxprops.cnxtypeself.creation=time()self.timestamp=self.creationself.is_internal_session=Falseself.is_super_session=False# short cut to querier .execute methodself._execute=repo.querier.execute# shared data, used to communicate extra information between the client# and the rql serverself.data={}# i18n initializationself.set_language(cnxprops.lang)# internalsself._threaddata=threading.local()self._threads_in_transaction=set()self._closed=Falsedef__str__(self):return'<%ssession %s (%s 0x%x)>'%(self.cnxtype,self.user.login,self.id,id(self))# resource accessors ######################################################defactual_session(self):"""return the original parent session if any, else self"""returnselfdefetype_class(self,etype):"""return an entity class for the given entity type"""returnself.vreg.etype_class(etype)defsystem_sql(self,sql,args=None):"""return a sql cursor on the system database"""ifnotsql.split(None,1)[0].upper()=='SELECT':self.mode='write'returnself.pool.source('system').doexec(self,sql,args)defset_language(self,language):"""i18n configuration for translation"""vreg=self.vreglanguage=languageorself.user.property_value('ui.language')try:self._=self.__=vreg.config.translations[language]exceptKeyError:language=vreg.property_value('ui.language')try:self._=self.__=vreg.config.translations[language]exceptKeyError:self._=self.__=unicodeself.lang=languagedefchange_property(self,prop,value):assertprop=='lang'# this is the only one changeable property for nowself.set_language(value)# connection management ###################################################defget_mode(self):returngetattr(self._threaddata,'mode','read')defset_mode(self,value):self._threaddata.mode=valuemode=property(get_mode,set_mode,doc='transaction mode (read/write), resetted to read on ''commit / rollback')defget_commit_state(self):returngetattr(self._threaddata,'commit_state',None)defset_commit_state(self,value):self._threaddata.commit_state=valuecommit_state=property(get_commit_state,set_commit_state)@propertydefpool(self):"""connections pool, set according to transaction mode for each query"""returngetattr(self._threaddata,'pool',None)defset_pool(self):"""the session need a pool to execute some queries"""ifself._closed:raiseException('try to set pool on a closed session')ifself.poolisNone:# get pool first to avoid race-conditionself._threaddata.pool=pool=self.repo._get_pool()try:pool.pool_set()except:self._threaddata.pool=Noneself.repo._free_pool(pool)raiseself._threads_in_transaction.add(threading.currentThread())returnself._threaddata.pooldefreset_pool(self):"""the session is no longer using its pool, at least for some time"""# pool may be none if no operation has been done since last commit# or rollbackifself.poolisnotNoneandself.mode=='read':# even in read mode, we must release the current transactionpool=self.pooltry:self._threads_in_transaction.remove(threading.currentThread())exceptKeyError:passpool.pool_reset()self._threaddata.pool=None# free pool once everything is done to avoid race-conditionself.repo._free_pool(pool)def_touch(self):"""update latest session usage timestamp and reset mode to read"""self.timestamp=time()self.local_perm_cache.clear()self._threaddata.mode='read'# shared data handling ###################################################defget_shared_data(self,key,default=None,pop=False):"""return value associated to `key` in session data"""ifpop:returnself.data.pop(key,default)else:returnself.data.get(key,default)defset_shared_data(self,key,value,querydata=False):"""set value associated to `key` in session data"""ifquerydata:self.transaction_data[key]=valueelse:self.data[key]=value# request interface #######################################################defset_entity_cache(self,entity):# no entity cache in the server, too high risk of inconsistency# between pre/post hookspassdefentity_cache(self,eid):raiseKeyError(eid)defbase_url(self):url=self.repo.config['base-url']ifnoturl:try:url=self.repo.config.default_base_url()exceptAttributeError:# default_base_url() might not be availableself.warning('missing base-url definition in server config')url=u''returnurldeffrom_controller(self):"""return the id (string) of the controller issuing the request (no sense here, always return 'view') """return'view'defsource_defs(self):returnself.repo.source_defs()defdescribe(self,eid):"""return a tuple (type, sourceuri, extid) for the entity with id <eid>"""returnself.repo.type_and_source_from_eid(eid,self)# db-api like interface ###################################################defsource_from_eid(self,eid):"""return the source where the entity with id <eid> is located"""returnself.repo.source_from_eid(eid,self)defdecorate_rset(self,rset,propagate=False):rset.vreg=self.vregrset.req=propagateandselforself.actual_session()returnrset@propertydefsuper_session(self):try:csession=self._threaddata.childsessionexceptAttributeError:ifself.is_super_session:csession=selfelse:csession=ChildSession(self)self._threaddata.childsession=csession# need shared pool setself.set_pool()returncsessiondefunsafe_execute(self,rql,kwargs=None,eid_key=None,build_descr=False,propagate=False):"""like .execute but with security checking disabled (this method is internal to the server, it's not part of the db-api) if `propagate` is true, the super_session will be attached to the result set instead of the parent session, hence further query done through entities fetched from this result set will bypass security as well """returnself.super_session.execute(rql,kwargs,eid_key,build_descr,propagate)@propertydefcursor(self):"""return a rql cursor"""returnselfdefexecute(self,rql,kwargs=None,eid_key=None,build_descr=True,propagate=False):"""db-api like method directly linked to the querier execute method Becare that unlike actual cursor.execute, `build_descr` default to false """rset=self._execute(self,rql,kwargs,eid_key,build_descr)returnself.decorate_rset(rset,propagate)defcommit(self,reset_pool=True):"""commit the current session's transaction"""ifself.poolisNone:assertnotself.pending_operationsself.transaction_data.clear()self._touch()self.debug('commit session %s done (no db activity)',self.id)returnifself.commit_state:return# on rollback, an operation should have the following state# information:# - processed by the precommit/commit event or not# - if processed, is it the failed operationtry:fortrstatein('precommit','commit'):processed=[]self.commit_state=trstatetry:whileself.pending_operations:operation=self.pending_operations.pop(0)operation.processed=trstateprocessed.append(operation)operation.handle_event('%s_event'%trstate)self.pending_operations[:]=processedself.debug('%s session %s done',trstate,self.id)except:self.exception('error while %sing',trstate)operation.failed=Trueforoperationinprocessed:operation.handle_event('revert%s_event'%trstate)self.rollback(reset_pool)raiseself.pool.commit()finally:self._touch()self.commit_state=Noneself.pending_operations[:]=[]self.transaction_data.clear()ifreset_pool:self.reset_pool()defrollback(self,reset_pool=True):"""rollback the current session's transaction"""ifself.poolisNone:assertnotself.pending_operationsself.transaction_data.clear()self._touch()self.debug('rollback session %s done (no db activity)',self.id)returntry:whileself.pending_operations:try:operation=self.pending_operations.pop(0)operation.handle_event('rollback_event')except:self.critical('rollback error',exc_info=sys.exc_info())continueself.pool.rollback()self.debug('rollback for session %s done',self.id)finally:self._touch()self.pending_operations[:]=[]self.transaction_data.clear()ifreset_pool:self.reset_pool()defclose(self):"""do not close pool on session close, since they are shared now"""self._closed=True# copy since _threads_in_transaction maybe modified while waitingforthreadinself._threads_in_transaction.copy():ifthreadisthreading.currentThread():continueself.info('waiting for thread %s',thread)# do this loop/break instead of a simple join(10) in case thread is# the main thread (in which case it will be removed from# self._threads_in_transaction but still be alive...)foriinxrange(10):thread.join(1)ifnot(thread.isAlive()andthreadinself._threads_in_transaction):breakelse:self.error('thread %s still alive after 10 seconds, will close ''session anyway',thread)self.rollback()# transaction data/operations management ##################################@propertydeftransaction_data(self):try:returnself._threaddata.transaction_dataexceptAttributeError:self._threaddata.transaction_data={}returnself._threaddata.transaction_data@propertydefpending_operations(self):try:returnself._threaddata.pending_operationsexceptAttributeError:self._threaddata.pending_operations=[]returnself._threaddata.pending_operationsdefadd_operation(self,operation,index=None):"""add an observer"""assertself.commit_state!='commit'ifindexisnotNone:self.pending_operations.insert(index,operation)else:self.pending_operations.append(operation)# querier helpers #########################################################@propertydefrql_rewriter(self):try:returnself._threaddata._rewriterexceptAttributeError:self._threaddata._rewriter=RQLRewriter(self.repo.querier,self)returnself._threaddata._rewriterdefbuild_description(self,rqlst,args,result):"""build a description for a given result"""iflen(rqlst.children)==1andlen(rqlst.children[0].solutions)==1:# easy, all lines are identicalselected=rqlst.children[0].selectionsolution=rqlst.children[0].solutions[0]description=_make_description(selected,args,solution)return[tuple(description)]*len(result)# hard, delegate the work :o)returnself.manual_build_descr(rqlst,args,result)defmanual_build_descr(self,rqlst,args,result):"""build a description for a given result by analysing each row XXX could probably be done more efficiently during execution of query """# not so easy, looks for variable which changes from one solution# to anotherunstables=rqlst.get_variable_variables()basedescription=[]todetermine=[]selected=rqlst.children[0].selection# sample selectionfori,terminenumerate(selected):ifisinstance(term,Function)andterm.descr().rtypeisnotNone:basedescription.append(term.get_type(term.descr().rtype,args))continueforvrefinterm.get_nodes(VariableRef):ifvref.nameinunstables:basedescription.append(None)todetermine.append((i,is_final(rqlst,vref.variable,args)))breakelse:# sample etypeetype=rqlst.children[0].solutions[0]basedescription.append(term.get_type(etype,args))ifnottodetermine:return[tuple(basedescription)]*len(result)returnself._build_descr(result,basedescription,todetermine)def_build_descr(self,result,basedescription,todetermine):description=[]etype_from_eid=self.describeforrowinresult:row_descr=basedescriptionforindex,isfinalintodetermine:value=row[index]ifvalueisNone:# None value inserted by an outer join, no typerow_descr[index]=Nonecontinueifisfinal:row_descr[index]=etype_from_pyobj(value)else:try:row_descr[index]=etype_from_eid(value)[0]exceptUnknownEid:self.critical('wrong eid %s in repository, should check database'%value)row_descr[index]=row[index]=Nonedescription.append(tuple(row_descr))returndescription@obsolete('use direct access to session.transaction_data')defquery_data(self,key,default=None,setdefault=False,pop=False):ifsetdefault:assertnotpopreturnself.transaction_data.setdefault(key,default)ifpop:returnself.transaction_data.pop(key,default)else:returnself.transaction_data.get(key,default)@obsolete('use entity_from_eid(eid, etype=None)')defentity(self,eid):"""return a result set for the given eid"""returnself.eid_rset(eid).get_entity(0,0)classChildSession(Session):"""child (or internal) session are used to hijack the security system """cnxtype='inmemory'def__init__(self,parent_session):self.id=Noneself.is_internal_session=Falseself.is_super_session=True# session which has created this oneself.parent_session=parent_sessionself.user=InternalManager()self.repo=parent_session.repoself.vreg=parent_session.vregself.data=parent_session.dataself.encoding=parent_session.encodingself.lang=parent_session.langself._=self.__=parent_session._# short cut to querier .execute methodself._execute=self.repo.querier.execute@propertydefsuper_session(self):returnselfdefget_mode(self):returnself.parent_session.modedefset_mode(self,value):self.parent_session.set_mode(value)mode=property(get_mode,set_mode)defget_commit_state(self):returnself.parent_session.commit_statedefset_commit_state(self,value):self.parent_session.set_commit_state(value)commit_state=property(get_commit_state,set_commit_state)@propertydefpool(self):returnself.parent_session.pool@propertydefpending_operations(self):returnself.parent_session.pending_operations@propertydeftransaction_data(self):returnself.parent_session.transaction_datadefset_pool(self):"""the session need a pool to execute some queries"""self.parent_session.set_pool()defreset_pool(self):"""the session has no longer using its pool, at least for some time """self.parent_session.reset_pool()defactual_session(self):"""return the original parent session if any, else self"""returnself.parent_sessiondefcommit(self,reset_pool=True):"""commit the current session's transaction"""self.parent_session.commit(reset_pool)defrollback(self,reset_pool=True):"""rollback the current session's transaction"""self.parent_session.rollback(reset_pool)defclose(self):"""do not close pool on session close, since they are shared now"""self.rollback()defuser_data(self):"""returns a dictionnary with this user's information"""returnself.parent_session.user_data()classInternalSession(Session):"""special session created internaly by the repository"""def__init__(self,repo,cnxprops=None):super(InternalSession,self).__init__(_IMANAGER,repo,cnxprops,_id='internal')self.cnxtype='inmemory'self.is_internal_session=Trueself.is_super_session=True@propertydefsuper_session(self):returnselfclassInternalManager(object):"""a manager user with all access rights used internally for task such as bootstrapping the repository or creating regular users according to repository content """def__init__(self):self.eid=-1self.login=u'__internal_manager__'self.properties={}defmatching_groups(self,groups):return1defis_in_group(self,group):returnTruedefowns(self,eid):returnTruedefhas_permission(self,pname,contexteid=None):returnTruedefproperty_value(self,key):ifkey=='ui.language':return'en'returnNone_IMANAGER=InternalManager()fromloggingimportgetLoggerfromcubicwebimportset_log_methodsset_log_methods(Session,getLogger('cubicweb.session'))