diff -r 000000000000 -r b97547f5f1fa common/selectors.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/common/selectors.py Wed Nov 05 15:52:50 2008 +0100 @@ -0,0 +1,413 @@ +"""This file contains some basic selectors required by application objects. + +A selector is responsible to score how well an object may be used with a +given result set (publishing time selection) + +:organization: Logilab +:copyright: 2001-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved. +:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr +""" + +__docformat__ = "restructuredtext en" + +from logilab.common.compat import all + +from cubicweb import Unauthorized +from cubicweb.cwvreg import DummyCursorError +from cubicweb.vregistry import chainall, chainfirst +from cubicweb.cwconfig import CubicWebConfiguration +from cubicweb.schema import split_expression + + +def lltrace(selector): + # don't wrap selectors if not in development mode + if CubicWebConfiguration.mode == 'installed': + return selector + def traced(cls, *args, **kwargs): + ret = selector(cls, *args, **kwargs) + cls.lldebug('selector %s returned %s for %s', selector.__name__, ret, cls) + return ret + return traced + +# very basic selectors ######################################################## + +def yes_selector(cls, *args, **kwargs): + """accept everything""" + return 1 + +@lltrace +def norset_selector(cls, req, rset, *args, **kwargs): + """accept no result set""" + if rset is None: + return 1 + return 0 + +@lltrace +def rset_selector(cls, req, rset, *args, **kwargs): + """accept result set, whatever the number of result""" + if rset is not None: + return 1 + return 0 + +@lltrace +def anyrset_selector(cls, req, rset, *args, **kwargs): + """accept any non empty result set""" + if rset and rset.rowcount: # XXX if rset is not None and rset.rowcount > 0: + return 1 + return 0 + +@lltrace +def emptyrset_selector(cls, req, rset, *args, **kwargs): + """accept empty result set""" + if rset is not None and rset.rowcount == 0: + return 1 + return 0 + +@lltrace +def onelinerset_selector(cls, req, rset, row=None, *args, **kwargs): + """accept result set with a single line of result""" + if rset is not None and (row is not None or rset.rowcount == 1): + return 1 + return 0 + +@lltrace +def twolinerset_selector(cls, req, rset, *args, **kwargs): + """accept result set with at least two lines of result""" + if rset is not None and rset.rowcount > 1: + return 1 + return 0 + +@lltrace +def twocolrset_selector(cls, req, rset, *args, **kwargs): + """accept result set with at least one line and two columns of result""" + if rset is not None and rset.rowcount > 0 and len(rset.rows[0]) > 1: + return 1 + return 0 + +@lltrace +def largerset_selector(cls, req, rset, *args, **kwargs): + """accept result sets with more rows than the page size + """ + if rset is None or len(rset) <= req.property_value('navigation.page-size'): + return 0 + return 1 + +@lltrace +def sortedrset_selector(cls, req, rset, row=None, col=None): + """accept sorted result set""" + rqlst = rset.syntax_tree() + if len(rqlst.children) > 1 or not rqlst.children[0].orderby: + return 0 + return 2 + +@lltrace +def oneetyperset_selector(cls, req, rset, *args, **kwargs): + """accept result set where entities in the first columns are all of the + same type + """ + if len(rset.column_types(0)) != 1: + return 0 + return 1 + +@lltrace +def multitype_selector(cls, req, rset, **kwargs): + """accepts resultsets containing several entity types""" + if rset: + etypes = rset.column_types(0) + if len(etypes) > 1: + return 1 + return 0 + +@lltrace +def searchstate_selector(cls, req, rset, row=None, col=None, **kwargs): + """extend the anyrset_selector by checking if the current search state + is in a .search_states attribute of the wrapped class + + search state should be either 'normal' or 'linksearch' (eg searching for an + object to create a relation with another) + """ + try: + if not req.search_state[0] in cls.search_states: + return 0 + except AttributeError: + return 1 # class don't care about search state, accept it + return 1 + +@lltrace +def anonymous_selector(cls, req, *args, **kwargs): + """accept if user is anonymous""" + if req.cnx.anonymous_connection: + return 1 + return 0 + +@lltrace +def not_anonymous_selector(cls, req, *args, **kwargs): + """accept if user is anonymous""" + return not anonymous_selector(cls, req, *args, **kwargs) + + +# not so basic selectors ###################################################### + +@lltrace +def req_form_params_selector(cls, req, *args, **kwargs): + """check if parameters specified by the form_params attribute on + the wrapped class are specified in request form parameters + """ + score = 0 + for param in cls.form_params: + val = req.form.get(param) + if not val: + return 0 + score += 1 + return score + 1 + +@lltrace +def kwargs_selector(cls, req, *args, **kwargs): + """check if arguments specified by the expected_kwargs attribute on + the wrapped class are specified in given named parameters + """ + values = [] + for arg in cls.expected_kwargs: + if not arg in kwargs: + return 0 + return 1 + +@lltrace +def etype_form_selector(cls, req, *args, **kwargs): + """check etype presence in request form *and* accepts conformance""" + if 'etype' not in req.form and 'etype' not in kwargs: + return 0 + try: + etype = req.form['etype'] + except KeyError: + etype = kwargs['etype'] + # value is a list or a tuple if web request form received several + # values for etype parameter + assert isinstance(etype, basestring), "got multiple etype parameters in req.form" + if 'Any' in cls.accepts: + return 1 + # no Any found, we *need* exact match + if etype not in cls.accepts: + return 0 + # exact match must return a greater value than 'Any'-match + return 2 + +@lltrace +def _nfentity_selector(cls, req, rset, row=None, col=None, **kwargs): + """accept non final entities + if row is not specified, use the first one + if col is not specified, use the first one + """ + etype = rset.description[row or 0][col or 0] + if etype is None: # outer join + return 0 + if cls.schema.eschema(etype).is_final(): + return 0 + return 1 + +@lltrace +def _rqlcondition_selector(cls, req, rset, row=None, col=None, **kwargs): + """accept single entity result set if the entity match an rql condition + """ + if cls.condition: + eid = rset[row or 0][col or 0] + if 'U' in frozenset(split_expression(cls.condition)): + rql = 'Any X WHERE X eid %%(x)s, U eid %%(u)s, %s' % cls.condition + else: + rql = 'Any X WHERE X eid %%(x)s, %s' % cls.condition + try: + return len(req.execute(rql, {'x': eid, 'u': req.user.eid}, 'x')) + except Unauthorized: + return 0 + + return 1 + +@lltrace +def _interface_selector(cls, req, rset, row=None, col=None, **kwargs): + """accept uniform result sets, and apply the following rules: + + * wrapped class must have a accepts_interfaces attribute listing the + accepted ORed interfaces + * if row is None, return the sum of values returned by the method + for each entity's class in the result set. If any score is 0, + return 0. + * if row is specified, return the value returned by the method with + the entity's class of this row + """ + score = 0 + # check 'accepts' to give priority to more specific classes + if row is None: + for etype in rset.column_types(col or 0): + eclass = cls.vreg.etype_class(etype) + escore = 0 + for iface in cls.accepts_interfaces: + escore += iface.is_implemented_by(eclass) + if not escore: + return 0 + score += escore + if eclass.id in getattr(cls, 'accepts', ()): + score += 2 + return score + 1 + etype = rset.description[row][col or 0] + if etype is None: # outer join + return 0 + eclass = cls.vreg.etype_class(etype) + for iface in cls.accepts_interfaces: + score += iface.is_implemented_by(eclass) + if score: + if eclass.id in getattr(cls, 'accepts', ()): + score += 2 + else: + score += 1 + return score + +@lltrace +def score_entity_selector(cls, req, rset, row=None, col=None, **kwargs): + if row is None: + rows = xrange(rset.rowcount) + else: + rows = (row,) + for row in rows: + try: + score = cls.score_entity(rset.get_entity(row, col or 0)) + except DummyCursorError: + # get a dummy cursor error, that means we are currently + # using a dummy rset to list possible views for an entity + # type, not for an actual result set. In that case, we + # don't care of the value, consider the object as selectable + return 1 + if not score: + return 0 + return 1 + +@lltrace +def accept_rset_selector(cls, req, rset, row=None, col=None, **kwargs): + """simply delegate to cls.accept_rset method""" + return cls.accept_rset(req, rset, row=row, col=col) + +@lltrace +def but_etype_selector(cls, req, rset, row=None, col=None, **kwargs): + """restrict the searchstate_accept_one_selector to exclude entity's type + refered by the .etype attribute + """ + if rset.description[row or 0][col or 0] == cls.etype: + return 0 + return 1 + +@lltrace +def etype_rtype_selector(cls, req, rset, row=None, col=None, **kwargs): + """only check if the user has read access on the entity's type refered + by the .etype attribute and on the relations's type refered by the + .rtype attribute if set. + """ + schema = cls.schema + perm = getattr(cls, 'require_permission', 'read') + if hasattr(cls, 'etype'): + eschema = schema.eschema(cls.etype) + if not (eschema.has_perm(req, perm) or eschema.has_local_role(perm)): + return 0 + if hasattr(cls, 'rtype'): + if not schema.rschema(cls.rtype).has_perm(req, perm): + return 0 + return 1 + +@lltrace +def accept_rtype_selector(cls, req, rset, row=None, col=None, **kwargs): + if hasattr(cls, 'rtype'): + if row is None: + for etype in rset.column_types(col or 0): + if not cls.relation_possible(etype): + return 0 + elif not cls.relation_possible(rset.description[row][col or 0]): + return 0 + return 1 + +@lltrace +def one_has_relation_selector(cls, req, rset, row=None, col=None, **kwargs): + """check if the user has read access on the relations's type refered by the + .rtype attribute of the class, and if at least one entity type in the + result set has this relation. + """ + schema = cls.schema + perm = getattr(cls, 'require_permission', 'read') + if not schema.rschema(cls.rtype).has_perm(req, perm): + return 0 + if row is None: + for etype in rset.column_types(col or 0): + if cls.relation_possible(etype): + return 1 + elif cls.relation_possible(rset.description[row][col or 0]): + return 1 + return 0 + +@lltrace +def in_group_selector(cls, req, rset=None, row=None, col=None, **kwargs): + """select according to user's groups""" + if not cls.require_groups: + return 1 + user = req.user + if user is None: + return int('guests' in cls.require_groups) + score = 0 + if 'owners' in cls.require_groups and rset: + if row is not None: + eid = rset[row][col or 0] + if user.owns(eid): + score = 1 + else: + score = all(user.owns(r[col or 0]) for r in rset) + score += user.matching_groups(cls.require_groups) + if score: + # add 1 so that an object with one matching group take priority + # on an object without require_groups + return score + 1 + return 0 + +@lltrace +def add_etype_selector(cls, req, rset, row=None, col=None, **kwargs): + """only check if the user has add access on the entity's type refered + by the .etype attribute. + """ + if not cls.schema.eschema(cls.etype).has_perm(req, 'add'): + return 0 + return 1 + +@lltrace +def contextprop_selector(cls, req, rset, row=None, col=None, context=None, + **kwargs): + propval = req.property_value('%s.%s.context' % (cls.__registry__, cls.id)) + if not propval: + propval = cls.context + if context is not None and propval is not None and context != propval: + return 0 + return 1 + +@lltrace +def primaryview_selector(cls, req, rset, row=None, col=None, view=None, + **kwargs): + if view is not None and not view.is_primary(): + return 0 + return 1 + + +# compound selectors ########################################################## + +nfentity_selector = chainall(anyrset_selector, _nfentity_selector) +interface_selector = chainall(nfentity_selector, _interface_selector) + +accept_selector = chainall(nfentity_selector, accept_rset_selector) +accept_one_selector = chainall(onelinerset_selector, accept_selector) + +rqlcondition_selector = chainall(nfentity_selector, + onelinerset_selector, + _rqlcondition_selector) + +searchstate_accept_selector = chainall(anyrset_selector, searchstate_selector, + accept_selector) +searchstate_accept_one_selector = chainall(anyrset_selector, searchstate_selector, + accept_selector, rqlcondition_selector) +searchstate_accept_one_but_etype_selector = chainall(searchstate_accept_one_selector, + but_etype_selector) + +__all__ = [name for name in globals().keys() if name.endswith('selector')] +__all__ += ['chainall', 'chainfirst']