"""This file contains some basic selectors required by application objects.
A selector is responsible to score how well an object may be used with a
given result set (publishing time selection)
:organization: Logilab
:copyright: 2001-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
"""
__docformat__ = "restructuredtext en"
from logilab.common.compat import all
from cubicweb import Unauthorized
from cubicweb.cwvreg import DummyCursorError
from cubicweb.vregistry import chainall, chainfirst
from cubicweb.cwconfig import CubicWebConfiguration
from cubicweb.schema import split_expression
def lltrace(selector):
# don't wrap selectors if not in development mode
if CubicWebConfiguration.mode == 'installed':
return selector
def traced(cls, *args, **kwargs):
ret = selector(cls, *args, **kwargs)
cls.lldebug('selector %s returned %s for %s', selector.__name__, ret, cls)
return ret
return traced
# very basic selectors ########################################################
def yes_selector(cls, *args, **kwargs):
"""accept everything"""
return 1
@lltrace
def norset_selector(cls, req, rset, *args, **kwargs):
"""accept no result set"""
if rset is None:
return 1
return 0
@lltrace
def rset_selector(cls, req, rset, *args, **kwargs):
"""accept result set, whatever the number of result"""
if rset is not None:
return 1
return 0
@lltrace
def anyrset_selector(cls, req, rset, *args, **kwargs):
"""accept any non empty result set"""
if rset and rset.rowcount: # XXX if rset is not None and rset.rowcount > 0:
return 1
return 0
@lltrace
def emptyrset_selector(cls, req, rset, *args, **kwargs):
"""accept empty result set"""
if rset is not None and rset.rowcount == 0:
return 1
return 0
@lltrace
def onelinerset_selector(cls, req, rset, row=None, *args, **kwargs):
"""accept result set with a single line of result"""
if rset is not None and (row is not None or rset.rowcount == 1):
return 1
return 0
@lltrace
def twolinerset_selector(cls, req, rset, *args, **kwargs):
"""accept result set with at least two lines of result"""
if rset is not None and rset.rowcount > 1:
return 1
return 0
@lltrace
def twocolrset_selector(cls, req, rset, *args, **kwargs):
"""accept result set with at least one line and two columns of result"""
if rset is not None and rset.rowcount > 0 and len(rset.rows[0]) > 1:
return 1
return 0
@lltrace
def largerset_selector(cls, req, rset, *args, **kwargs):
"""accept result sets with more rows than the page size
"""
if rset is None or len(rset) <= req.property_value('navigation.page-size'):
return 0
return 1
@lltrace
def sortedrset_selector(cls, req, rset, row=None, col=None):
"""accept sorted result set"""
rqlst = rset.syntax_tree()
if len(rqlst.children) > 1 or not rqlst.children[0].orderby:
return 0
return 2
@lltrace
def oneetyperset_selector(cls, req, rset, *args, **kwargs):
"""accept result set where entities in the first columns are all of the
same type
"""
if len(rset.column_types(0)) != 1:
return 0
return 1
@lltrace
def multitype_selector(cls, req, rset, **kwargs):
"""accepts resultsets containing several entity types"""
if rset:
etypes = rset.column_types(0)
if len(etypes) > 1:
return 1
return 0
@lltrace
def searchstate_selector(cls, req, rset, row=None, col=None, **kwargs):
"""extend the anyrset_selector by checking if the current search state
is in a .search_states attribute of the wrapped class
search state should be either 'normal' or 'linksearch' (eg searching for an
object to create a relation with another)
"""
try:
if not req.search_state[0] in cls.search_states:
return 0
except AttributeError:
return 1 # class don't care about search state, accept it
return 1
@lltrace
def anonymous_selector(cls, req, *args, **kwargs):
"""accept if user is anonymous"""
if req.cnx.anonymous_connection:
return 1
return 0
@lltrace
def not_anonymous_selector(cls, req, *args, **kwargs):
"""accept if user is anonymous"""
return not anonymous_selector(cls, req, *args, **kwargs)
# not so basic selectors ######################################################
@lltrace
def req_form_params_selector(cls, req, *args, **kwargs):
"""check if parameters specified by the form_params attribute on
the wrapped class are specified in request form parameters
"""
score = 0
for param in cls.form_params:
val = req.form.get(param)
if not val:
return 0
score += 1
return score + 1
@lltrace
def kwargs_selector(cls, req, *args, **kwargs):
"""check if arguments specified by the expected_kwargs attribute on
the wrapped class are specified in given named parameters
"""
values = []
for arg in cls.expected_kwargs:
if not arg in kwargs:
return 0
return 1
@lltrace
def etype_form_selector(cls, req, *args, **kwargs):
"""check etype presence in request form *and* accepts conformance"""
if 'etype' not in req.form and 'etype' not in kwargs:
return 0
try:
etype = req.form['etype']
except KeyError:
etype = kwargs['etype']
# value is a list or a tuple if web request form received several
# values for etype parameter
assert isinstance(etype, basestring), "got multiple etype parameters in req.form"
if 'Any' in cls.accepts:
return 1
# no Any found, we *need* exact match
if etype not in cls.accepts:
return 0
# exact match must return a greater value than 'Any'-match
return 2
@lltrace
def _nfentity_selector(cls, req, rset, row=None, col=None, **kwargs):
"""accept non final entities
if row is not specified, use the first one
if col is not specified, use the first one
"""
etype = rset.description[row or 0][col or 0]
if etype is None: # outer join
return 0
if cls.schema.eschema(etype).is_final():
return 0
return 1
@lltrace
def _rqlcondition_selector(cls, req, rset, row=None, col=None, **kwargs):
"""accept single entity result set if the entity match an rql condition
"""
if cls.condition:
eid = rset[row or 0][col or 0]
if 'U' in frozenset(split_expression(cls.condition)):
rql = 'Any X WHERE X eid %%(x)s, U eid %%(u)s, %s' % cls.condition
else:
rql = 'Any X WHERE X eid %%(x)s, %s' % cls.condition
try:
return len(req.execute(rql, {'x': eid, 'u': req.user.eid}, 'x'))
except Unauthorized:
return 0
return 1
@lltrace
def _interface_selector(cls, req, rset, row=None, col=None, **kwargs):
"""accept uniform result sets, and apply the following rules:
* wrapped class must have a accepts_interfaces attribute listing the
accepted ORed interfaces
* if row is None, return the sum of values returned by the method
for each entity's class in the result set. If any score is 0,
return 0.
* if row is specified, return the value returned by the method with
the entity's class of this row
"""
score = 0
# check 'accepts' to give priority to more specific classes
if row is None:
for etype in rset.column_types(col or 0):
eclass = cls.vreg.etype_class(etype)
escore = 0
for iface in cls.accepts_interfaces:
escore += iface.is_implemented_by(eclass)
if not escore:
return 0
score += escore
if eclass.id in getattr(cls, 'accepts', ()):
score += 2
return score + 1
etype = rset.description[row][col or 0]
if etype is None: # outer join
return 0
eclass = cls.vreg.etype_class(etype)
for iface in cls.accepts_interfaces:
score += iface.is_implemented_by(eclass)
if score:
if eclass.id in getattr(cls, 'accepts', ()):
score += 2
else:
score += 1
return score
@lltrace
def score_entity_selector(cls, req, rset, row=None, col=None, **kwargs):
if row is None:
rows = xrange(rset.rowcount)
else:
rows = (row,)
for row in rows:
try:
score = cls.score_entity(rset.get_entity(row, col or 0))
except DummyCursorError:
# get a dummy cursor error, that means we are currently
# using a dummy rset to list possible views for an entity
# type, not for an actual result set. In that case, we
# don't care of the value, consider the object as selectable
return 1
if not score:
return 0
return 1
@lltrace
def accept_rset_selector(cls, req, rset, row=None, col=None, **kwargs):
"""simply delegate to cls.accept_rset method"""
return cls.accept_rset(req, rset, row=row, col=col)
@lltrace
def but_etype_selector(cls, req, rset, row=None, col=None, **kwargs):
"""restrict the searchstate_accept_one_selector to exclude entity's type
refered by the .etype attribute
"""
if rset.description[row or 0][col or 0] == cls.etype:
return 0
return 1
@lltrace
def etype_rtype_selector(cls, req, rset, row=None, col=None, **kwargs):
"""only check if the user has read access on the entity's type refered
by the .etype attribute and on the relations's type refered by the
.rtype attribute if set.
"""
schema = cls.schema
perm = getattr(cls, 'require_permission', 'read')
if hasattr(cls, 'etype'):
eschema = schema.eschema(cls.etype)
if not (eschema.has_perm(req, perm) or eschema.has_local_role(perm)):
return 0
if hasattr(cls, 'rtype'):
if not schema.rschema(cls.rtype).has_perm(req, perm):
return 0
return 1
@lltrace
def accept_rtype_selector(cls, req, rset, row=None, col=None, **kwargs):
if hasattr(cls, 'rtype'):
if row is None:
for etype in rset.column_types(col or 0):
if not cls.relation_possible(etype):
return 0
elif not cls.relation_possible(rset.description[row][col or 0]):
return 0
return 1
@lltrace
def one_has_relation_selector(cls, req, rset, row=None, col=None, **kwargs):
"""check if the user has read access on the relations's type refered by the
.rtype attribute of the class, and if at least one entity type in the
result set has this relation.
"""
schema = cls.schema
perm = getattr(cls, 'require_permission', 'read')
if not schema.rschema(cls.rtype).has_perm(req, perm):
return 0
if row is None:
for etype in rset.column_types(col or 0):
if cls.relation_possible(etype):
return 1
elif cls.relation_possible(rset.description[row][col or 0]):
return 1
return 0
@lltrace
def in_group_selector(cls, req, rset=None, row=None, col=None, **kwargs):
"""select according to user's groups"""
if not cls.require_groups:
return 1
user = req.user
if user is None:
return int('guests' in cls.require_groups)
score = 0
if 'owners' in cls.require_groups and rset:
if row is not None:
eid = rset[row][col or 0]
if user.owns(eid):
score = 1
else:
score = all(user.owns(r[col or 0]) for r in rset)
score += user.matching_groups(cls.require_groups)
if score:
# add 1 so that an object with one matching group take priority
# on an object without require_groups
return score + 1
return 0
@lltrace
def add_etype_selector(cls, req, rset, row=None, col=None, **kwargs):
"""only check if the user has add access on the entity's type refered
by the .etype attribute.
"""
if not cls.schema.eschema(cls.etype).has_perm(req, 'add'):
return 0
return 1
@lltrace
def contextprop_selector(cls, req, rset, row=None, col=None, context=None,
**kwargs):
propval = req.property_value('%s.%s.context' % (cls.__registry__, cls.id))
if not propval:
propval = cls.context
if context is not None and propval is not None and context != propval:
return 0
return 1
@lltrace
def primaryview_selector(cls, req, rset, row=None, col=None, view=None,
**kwargs):
if view is not None and not view.is_primary():
return 0
return 1
def appobject_selectable(registry, oid):
"""return a selector that will have a positive score if an object for the
given registry and object id is selectable for the input context
"""
@lltrace
def selector(cls, req, rset, *args, **kwargs):
try:
cls.vreg.select_object(registry, oid, req, rset, *args, **kwargs)
return 1
except NoSelectableObject:
return 0
return selector
# compound selectors ##########################################################
nfentity_selector = chainall(anyrset_selector, _nfentity_selector)
interface_selector = chainall(nfentity_selector, _interface_selector)
accept_selector = chainall(nfentity_selector, accept_rset_selector)
accept_one_selector = chainall(onelinerset_selector, accept_selector)
rqlcondition_selector = chainall(nfentity_selector,
onelinerset_selector,
_rqlcondition_selector)
searchstate_accept_selector = chainall(anyrset_selector, searchstate_selector,
accept_selector)
searchstate_accept_one_selector = chainall(anyrset_selector, searchstate_selector,
accept_selector, rqlcondition_selector)
searchstate_accept_one_but_etype_selector = chainall(searchstate_accept_one_selector,
but_etype_selector)
__all__ = [name for name in globals().keys() if name.endswith('selector')]
__all__ += ['chainall', 'chainfirst']