"""This file contains some basic selectors required by application objects.A selector is responsible to score how well an object may be used with agiven result set (publishing time selection)If you have trouble with selectors, especially if the objet (typicallya view or a component) you want to use is not selected and you want toknow which one(s) of its selectors fail (e.g. returns 0), you can use`traced_selection` or even direclty `TRACED_OIDS`.`TRACED_OIDS` is a tuple of traced object ids. The special value'all' may be used to log selectors for all objects.For instance, say that the following code yields a `NoSelectableObject`exception:: self.view('calendar', myrset)You can log the selectors involved for *calendar* by replacing the lineabove by:: # in Python2.5 from cubicweb.common.selectors import traced_selection with traced_selection(): self.view('calendar', myrset) # in Python2.4 from cubicweb.common import selectors selectors.TRACED_OIDS = ('calendar',) self.view('calendar', myrset) selectors.TRACED_OIDS = ():organization: Logilab:copyright: 2001-2009 LOGILAB S.A. (Paris, FRANCE), all rights reserved.:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr"""__docformat__="restructuredtext en"importloggingfromlogilab.common.compatimportallfromlogilab.common.deprecationimportdeprecated_functionfromcubicwebimportUnauthorized,NoSelectableObject,rolefromcubicweb.cwvregimportDummyCursorErrorfromcubicweb.vregistryimportchainall,chainfirst,NoSelectableObjectfromcubicweb.cwconfigimportCubicWebConfigurationfromcubicweb.schemaimportsplit_expression# helpers for debugging selectorsSELECTOR_LOGGER=logging.getLogger('cubicweb.selectors')TRACED_OIDS=()deflltrace(selector):# don't wrap selectors if not in development modeifCubicWebConfiguration.mode=='installed':returnselectordeftraced(cls,*args,**kwargs):ret=selector(cls,*args,**kwargs)ifTRACED_OIDS=='all'orcls.idinTRACED_OIDS:SELECTOR_LOGGER.critical('selector %s returned %s for %s',selector.__name__,ret,cls)returnrettraced.__name__=selector.__name__returntracedclasstraced_selection(object):"""selector debugging helper. Typical usage is : >>> with traced_selection(): ... # some code in which you want to debug selectors ... # for all objects or >>> with traced_selection( ('oid1', 'oid2') ): ... # some code in which you want to debug selectors ... # for objects with id 'oid1' and 'oid2' """def__init__(self,traced='all'):self.traced=traceddef__enter__(self):globalTRACED_OIDSTRACED_OIDS=self.traceddef__exit__(self,exctype,exc,traceback):globalTRACED_OIDSTRACED_OIDS=()returntracebackisNone# very basic selectors ########################################################defyes(cls,*args,**kwargs):"""accept everything"""return1yes_selector=deprecated_function(yes)@lltracedefnone_rset(cls,req,rset,*args,**kwargs):"""accept no result set"""ifrsetisNone:return1return0norset_selector=deprecated_function(none_rset)@lltracedefany_rset(cls,req,rset,*args,**kwargs):"""accept result set, whatever the number of result"""ifrsetisnotNone:return1return0rset_selector=deprecated_function(any_rset)@lltracedefnonempty_rset(cls,req,rset,*args,**kwargs):"""accept any non empty result set"""ifrsetisnotNoneandrset.rowcount:return1return0anyrset_selector=deprecated_function(nonempty_rset)@lltracedefempty_rset(cls,req,rset,*args,**kwargs):"""accept empty result set"""ifrsetisnotNoneandrset.rowcount==0:return1return0emptyrset_selector=deprecated_function(empty_rset)@lltracedefone_line_rset(cls,req,rset,row=None,*args,**kwargs):"""accept result set with a single line of result"""ifrsetisnotNoneand(rowisnotNoneorrset.rowcount==1):return1return0onelinerset_selector=deprecated_function(one_line_rset)@lltracedeftwo_lines_rset(cls,req,rset,*args,**kwargs):"""accept result set with *at least* two lines of result"""ifrsetisnotNoneandrset.rowcount>1:return1return0twolinerset_selector=deprecated_function(two_lines_rset)@lltracedeftwo_cols_rset(cls,req,rset,*args,**kwargs):"""accept result set with at least one line and two columns of result"""ifrsetisnotNoneandrset.rowcount>0andlen(rset.rows[0])>1:return1return0twocolrset_selector=deprecated_function(two_cols_rset)@lltracedefpaginated_rset(cls,req,rset,*args,**kwargs):"""accept result sets with more rows than the page size """page_size=kwargs.get('page_size')ifpage_sizeisNone:page_size=req.form.get('page_size')ifpage_sizeisNone:page_size=req.property_value('navigation.page-size')else:page_size=int(page_size)ifrsetisNoneorlen(rset)<=page_size:return0return1largerset_selector=deprecated_function(paginated_rset)@lltracedefsorted_rset(cls,req,rset,row=None,col=None,**kwargs):"""accept sorted result set"""rqlst=rset.syntax_tree()iflen(rqlst.children)>1ornotrqlst.children[0].orderby:return0return2sortedrset_selector=deprecated_function(sorted_rset)@lltracedefone_etype_rset(cls,req,rset,*args,**kwargs):"""accept result set where entities in the first columns are all of the same type """iflen(rset.column_types(0))!=1:return0return1oneetyperset_selector=deprecated_function(one_etype_rset)@lltracedeftwo_etypes_rset(cls,req,rset,**kwargs):"""accepts resultsets containing several entity types"""ifrset:etypes=rset.column_types(0)iflen(etypes)>1:return1return0multitype_selector=deprecated_function(two_etypes_rset)@lltracedefmatch_search_state(cls,req,rset,row=None,col=None,**kwargs):"""checks if the current search state is in a .search_states attribute of the wrapped class search state should be either 'normal' or 'linksearch' (eg searching for an object to create a relation with another) """try:ifnotreq.search_state[0]incls.search_states:return0exceptAttributeError:return1# class doesn't care about search state, accept itreturn1searchstate_selector=deprecated_function(match_search_state)@lltracedefanonymous_user(cls,req,*args,**kwargs):"""accept if user is anonymous"""ifreq.cnx.anonymous_connection:return1return0anonymous_selector=deprecated_function(anonymous_user)@lltracedefauthenticated_user(cls,req,*args,**kwargs):"""accept if user is authenticated"""returnnotanonymous_user(cls,req,*args,**kwargs)not_anonymous_selector=deprecated_function(authenticated_user)@lltracedefmatch_form_params(cls,req,*args,**kwargs):"""check if parameters specified by the form_params attribute on the wrapped class are specified in request form parameters """score=0forparamincls.form_params:val=req.form.get(param)ifnotval:return0score+=1returnscore+1req_form_params_selector=deprecated_function(match_form_params)@lltracedefmatch_kwargs(cls,req,*args,**kwargs):"""check if arguments specified by the expected_kwargs attribute on the wrapped class are specified in given named parameters """values=[]forargincls.expected_kwargs:ifnotarginkwargs:return0return1kwargs_selector=deprecated_function(match_kwargs)# not so basic selectors ######################################################@lltracedefaccept_etype(cls,req,*args,**kwargs):"""check etype presence in request form *and* accepts conformance"""if'etype'notinreq.formand'etype'notinkwargs:return0try:etype=req.form['etype']exceptKeyError:etype=kwargs['etype']# value is a list or a tuple if web request form received several# values for etype parameterassertisinstance(etype,basestring),"got multiple etype parameters in req.form"if'Any'incls.accepts:return1# no Any found, we *need* exact matchifetypenotincls.accepts:return0# exact match must return a greater value than 'Any'-matchreturn2etype_form_selector=deprecated_function(accept_etype)@lltracedef_non_final_entity(cls,req,rset,row=None,col=None,**kwargs):"""accept non final entities if row is not specified, use the first one if col is not specified, use the first one """etype=rset.description[rowor0][color0]ifetypeisNone:# outer joinreturn0ifcls.schema.eschema(etype).is_final():return0return1_nfentity_selector=deprecated_function(_non_final_entity)@lltracedef_rql_condition(cls,req,rset,row=None,col=None,**kwargs):"""accept single entity result set if the entity match an rql condition """ifcls.condition:eid=rset[rowor0][color0]if'U'infrozenset(split_expression(cls.condition)):rql='Any X WHERE X eid %%(x)s, U eid %%(u)s, %s'%cls.conditionelse:rql='Any X WHERE X eid %%(x)s, %s'%cls.conditiontry:returnlen(req.execute(rql,{'x':eid,'u':req.user.eid},'x'))exceptUnauthorized:return0return1_rqlcondition_selector=deprecated_function(_rql_condition)@lltracedef_implement_interface(cls,req,rset,row=None,col=None,**kwargs):"""accept uniform result sets, and apply the following rules: * wrapped class must have a accepts_interfaces attribute listing the accepted ORed interfaces * if row is None, return the sum of values returned by the method for each entity's class in the result set. If any score is 0, return 0. * if row is specified, return the value returned by the method with the entity's class of this row """# XXX this selector can be refactored : extract the code testing# for entity schema / interface compliancescore=0# check 'accepts' to give priority to more specific classesifrowisNone:foretypeinrset.column_types(color0):eclass=cls.vreg.etype_class(etype)escore=0forifaceincls.accepts_interfaces:escore+=iface.is_implemented_by(eclass)ifnotescore:return0score+=escoreaccepts=set(getattr(cls,'accepts',()))# if accepts is defined on the vobject, eclass must matchifaccepts:eschema=eclass.e_schemaetypes=set([eschema]+eschema.ancestors())ifaccepts&etypes:score+=2elif'Any'notinaccepts:return0returnscore+1etype=rset.description[row][color0]ifetypeisNone:# outer joinreturn0eclass=cls.vreg.etype_class(etype)forifaceincls.accepts_interfaces:score+=iface.is_implemented_by(eclass)ifscore:accepts=set(getattr(cls,'accepts',()))# if accepts is defined on the vobject, eclass must matchifaccepts:eschema=eclass.e_schemaetypes=set([eschema]+eschema.ancestors())ifaccepts&etypes:score+=1elif'Any'notinaccepts:return0score+=1returnscore_interface_selector=deprecated_function(_implement_interface)@lltracedefscore_entity_selector(cls,req,rset,row=None,col=None,**kwargs):ifrowisNone:rows=xrange(rset.rowcount)else:rows=(row,)forrowinrows:try:score=cls.score_entity(rset.get_entity(row,color0))exceptDummyCursorError:# get a dummy cursor error, that means we are currently# using a dummy rset to list possible views for an entity# type, not for an actual result set. In that case, we# don't care of the value, consider the object as selectablereturn1ifnotscore:return0return1@lltracedefaccept_rset(cls,req,rset,row=None,col=None,**kwargs):"""simply delegate to cls.accept_rset method"""returncls.accept_rset(req,rset,row=row,col=col)accept_rset_selector=deprecated_function(accept_rset)@lltracedefbut_etype(cls,req,rset,row=None,col=None,**kwargs):"""restrict the searchstate_accept_one_selector to exclude entity's type refered by the .etype attribute """ifrset.description[rowor0][color0]==cls.etype:return0return1but_etype_selector=deprecated_function(but_etype)@lltracedefetype_rtype_selector(cls,req,rset,row=None,col=None,**kwargs):"""only check if the user has read access on the entity's type refered by the .etype attribute and on the relations's type refered by the .rtype attribute if set. """schema=cls.schemaperm=getattr(cls,'require_permission','read')ifhasattr(cls,'etype'):eschema=schema.eschema(cls.etype)ifnot(eschema.has_perm(req,perm)oreschema.has_local_role(perm)):return0ifhasattr(cls,'rtype'):rschema=schema.rschema(cls.rtype)ifnot(rschema.has_perm(req,perm)orrschema.has_local_role(perm)):return0return1@lltracedefhas_relation(cls,req,rset,row=None,col=None,**kwargs):"""check if the user has read access on the relations's type refered by the .rtype attribute of the class, and if all entities types in the result set has this relation. """ifhasattr(cls,'rtype'):rschema=cls.schema.rschema(cls.rtype)perm=getattr(cls,'require_permission','read')ifnot(rschema.has_perm(req,perm)orrschema.has_local_role(perm)):return0ifrowisNone:foretypeinrset.column_types(color0):ifnotcls.relation_possible(etype):return0elifnotcls.relation_possible(rset.description[row][color0]):return0return1accept_rtype_selector=deprecated_function(has_relation)@lltracedefone_has_relation(cls,req,rset,row=None,col=None,**kwargs):"""check if the user has read access on the relations's type refered by the .rtype attribute of the class, and if at least one entity type in the result set has this relation. """rschema=cls.schema.rschema(cls.rtype)perm=getattr(cls,'require_permission','read')ifnot(rschema.has_perm(req,perm)orrschema.has_local_role(perm)):return0ifrowisNone:foretypeinrset.column_types(color0):ifcls.relation_possible(etype):return1elifcls.relation_possible(rset.description[row][color0]):return1return0one_has_relation_selector=deprecated_function(one_has_relation)@lltracedefhas_related_entities(cls,req,rset,row=None,col=None,**kwargs):returnbool(rset.get_entity(rowor0,color0).related(cls.rtype,role(cls)))@lltracedefmatch_user_group(cls,req,rset=None,row=None,col=None,**kwargs):"""select according to user's groups"""ifnotcls.require_groups:return1user=req.userifuserisNone:returnint('guests'incls.require_groups)score=0if'owners'incls.require_groupsandrset:ifrowisnotNone:eid=rset[row][color0]ifuser.owns(eid):score=1else:score=all(user.owns(r[color0])forrinrset)score+=user.matching_groups(cls.require_groups)ifscore:# add 1 so that an object with one matching group take priority# on an object without require_groupsreturnscore+1return0in_group_selector=deprecated_function(match_user_group)@lltracedefuser_can_add_etype(cls,req,rset,row=None,col=None,**kwargs):"""only check if the user has add access on the entity's type refered by the .etype attribute. """ifnotcls.schema.eschema(cls.etype).has_perm(req,'add'):return0return1add_etype_selector=deprecated_function(user_can_add_etype)@lltracedefmatch_context_prop(cls,req,rset,row=None,col=None,context=None,**kwargs):propval=req.property_value('%s.%s.context'%(cls.__registry__,cls.id))ifnotpropval:propval=cls.contextifcontextisnotNoneandpropvalandcontext!=propval:return0return1contextprop_selector=deprecated_function(match_context_prop)@lltracedefprimary_view(cls,req,rset,row=None,col=None,view=None,**kwargs):ifviewisnotNoneandnotview.is_primary():return0return1primaryview_selector=deprecated_function(primary_view)defappobject_selectable(registry,oid):"""return a selector that will have a positive score if an object for the given registry and object id is selectable for the input context """@lltracedefselector(cls,req,rset,*args,**kwargs):try:cls.vreg.select_object(registry,oid,req,rset,*args,**kwargs)return1exceptNoSelectableObject:return0returnselector# compound selectors ##########################################################non_final_entity=chainall(nonempty_rset,_non_final_entity)non_final_entity.__name__='non_final_entity'nfentity_selector=deprecated_function(non_final_entity)implement_interface=chainall(non_final_entity,_implement_interface)implement_interface.__name__='implement_interface'interface_selector=deprecated_function(implement_interface)accept=chainall(non_final_entity,accept_rset)accept.__name__='accept'accept_selector=deprecated_function(accept)accept_one=chainall(one_line_rset,accept)accept_one.__name__='accept_one'accept_one_selector=deprecated_function(accept_one)rql_condition=chainall(non_final_entity,one_line_rset,_rql_condition)rql_condition.__name__='rql_condition'rqlcondition_selector=deprecated_function(rql_condition)searchstate_accept=chainall(nonempty_rset,match_search_state,accept)searchstate_accept.__name__='searchstate_accept'searchstate_accept_selector=deprecated_function(searchstate_accept)searchstate_accept_one=chainall(one_line_rset,match_search_state,accept,_rql_condition)searchstate_accept_one.__name__='searchstate_accept_one'searchstate_accept_one_selector=deprecated_function(searchstate_accept_one)searchstate_accept_one_but_etype=chainall(searchstate_accept_one,but_etype)searchstate_accept_one_but_etype.__name__='searchstate_accept_one_but_etype'searchstate_accept_one_but_etype_selector=deprecated_function(searchstate_accept_one_but_etype)