--- a/schema.py Wed Nov 18 09:16:38 2009 +0100
+++ b/schema.py Thu Nov 19 12:55:47 2009 +0100
@@ -20,7 +20,8 @@
from logilab.common.compat import any
from yams import BadSchemaDefinition, buildobjs as ybo
-from yams.schema import Schema, ERSchema, EntitySchema, RelationSchema
+from yams.schema import Schema, ERSchema, EntitySchema, RelationSchema, \
+ RelationDefinitionSchema, PermissionMixIn
from yams.constraints import (BaseConstraint, StaticVocabularyConstraint,
FormatConstraint)
from yams.reader import (CONSTRAINTS, PyFileReader, SchemaLoader,
@@ -127,7 +128,7 @@
ERSchema.display_name = ERSchema_display_name
@cached
-def ERSchema_get_groups(self, action):
+def get_groups(self, action):
"""return the groups authorized to perform <action> on entities of
this type
@@ -140,28 +141,13 @@
assert action in self.ACTIONS, action
#assert action in self._groups, '%s %s' % (self, action)
try:
- return frozenset(g for g in self._groups[action] if isinstance(g, basestring))
+ return frozenset(g for g in self.permissions[action] if isinstance(g, basestring))
except KeyError:
return ()
-ERSchema.get_groups = ERSchema_get_groups
-
-def ERSchema_set_groups(self, action, groups):
- """set the groups allowed to perform <action> on entities of this type. Don't
- change rql expressions for the same action.
-
- :type action: str
- :param action: the name of a permission
-
- :type groups: list or tuple
- :param groups: names of the groups granted to do the given action
- """
- assert action in self.ACTIONS, action
- clear_cache(self, 'ERSchema_get_groups')
- self._groups[action] = tuple(groups) + self.get_rqlexprs(action)
-ERSchema.set_groups = ERSchema_set_groups
+PermissionMixIn.get_groups = get_groups
@cached
-def ERSchema_get_rqlexprs(self, action):
+def get_rqlexprs(self, action):
"""return the rql expressions representing queries to check the user is allowed
to perform <action> on entities of this type
@@ -174,27 +160,13 @@
assert action in self.ACTIONS, action
#assert action in self._rqlexprs, '%s %s' % (self, action)
try:
- return tuple(g for g in self._groups[action] if not isinstance(g, basestring))
+ return tuple(g for g in self.permissions[action] if not isinstance(g, basestring))
except KeyError:
return ()
-ERSchema.get_rqlexprs = ERSchema_get_rqlexprs
-
-def ERSchema_set_rqlexprs(self, action, rqlexprs):
- """set the rql expression allowing to perform <action> on entities of this type. Don't
- change groups for the same action.
-
- :type action: str
- :param action: the name of a permission
+PermissionMixIn.get_rqlexprs = get_rqlexprs
- :type rqlexprs: list or tuple
- :param rqlexprs: the rql expressions allowing the given action
- """
- assert action in self.ACTIONS, action
- clear_cache(self, 'ERSchema_get_rqlexprs')
- self._groups[action] = tuple(self.get_groups(action)) + tuple(rqlexprs)
-ERSchema.set_rqlexprs = ERSchema_set_rqlexprs
-
-def ERSchema_set_permissions(self, action, permissions):
+orig_set_action_permissions = PermissionMixIn.set_action_permissions
+def set_action_permissions(self, action, permissions):
"""set the groups and rql expressions allowing to perform <action> on
entities of this type
@@ -204,22 +176,12 @@
:type permissions: tuple
:param permissions: the groups and rql expressions allowing the given action
"""
- assert action in self.ACTIONS, action
- clear_cache(self, 'ERSchema_get_rqlexprs')
- clear_cache(self, 'ERSchema_get_groups')
- self._groups[action] = tuple(permissions)
-ERSchema.set_permissions = ERSchema_set_permissions
+ orig_set_action_permissions(self, action, tuple(permissions))
+ clear_cache(self, 'get_rqlexprs')
+ clear_cache(self, 'get_groups')
+PermissionMixIn.set_action_permissions = set_action_permissions
-def ERSchema_has_perm(self, session, action, *args, **kwargs):
- """return true if the action is granted globaly or localy"""
- try:
- self.check_perm(session, action, *args, **kwargs)
- return True
- except Unauthorized:
- return False
-ERSchema.has_perm = ERSchema_has_perm
-
-def ERSchema_has_local_role(self, action):
+def has_local_role(self, action):
"""return true if the action *may* be granted localy (eg either rql
expressions or the owners group are used in security definition)
@@ -230,9 +192,83 @@
if self.get_rqlexprs(action):
return True
if action in ('update', 'delete'):
- return self.has_group(action, 'owners')
+ return 'owners' in self.get_groups(action)
return False
-ERSchema.has_local_role = ERSchema_has_local_role
+PermissionMixIn.has_local_role = has_local_role
+
+def may_have_permission(self, action, req):
+ if action != 'read' and not (self.has_local_role('read') or
+ self.has_perm(req, 'read')):
+ return False
+ return self.has_local_role(action) or self.has_perm(req, action)
+PermissionMixIn.may_have_permission = may_have_permission
+
+def has_perm(self, session, action, **kwargs):
+ """return true if the action is granted globaly or localy"""
+ try:
+ self.check_perm(session, action, **kwargs)
+ return True
+ except Unauthorized:
+ return False
+PermissionMixIn.has_perm = has_perm
+
+def check_perm(self, session, action, **kwargs):
+ # NB: session may be a server session or a request object check user is
+ # in an allowed group, if so that's enough internal sessions should
+ # always stop there
+ groups = self.get_groups(action)
+ if session.user.matching_groups(groups):
+ return
+ # if 'owners' in allowed groups, check if the user actually owns this
+ # object, if so that's enough
+ if 'owners' in groups and 'eid' in kwargs and session.user.owns(kwargs['eid']):
+ return
+ # else if there is some rql expressions, check them
+ if any(rqlexpr.check(session, **kwargs)
+ for rqlexpr in self.get_rqlexprs(action)):
+ return
+ raise Unauthorized(action, str(self))
+PermissionMixIn.check_perm = check_perm
+
+
+RelationDefinitionSchema._RPROPERTIES['eid'] = None
+
+def rql_expression(self, expression, mainvars=None, eid=None):
+ """rql expression factory"""
+ if self.rtype.final:
+ return ERQLExpression(expression, mainvars, eid)
+ return RRQLExpression(expression, mainvars, eid)
+RelationDefinitionSchema.rql_expression = rql_expression
+
+orig_check_permission_definitions = RelationDefinitionSchema.check_permission_definitions
+def check_permission_definitions(self):
+ orig_check_permission_definitions(self)
+ schema = self.subject.schema
+ for action, groups in self.permissions.iteritems():
+ for group_or_rqlexpr in groups:
+ if action == 'read' and \
+ isinstance(group_or_rqlexpr, RQLExpression):
+ msg = "can't use rql expression for read permission of %s"
+ raise BadSchemaDefinition(msg % self)
+ elif self.final and isinstance(group_or_rqlexpr, RRQLExpression):
+ if schema.reading_from_database:
+ # we didn't have final relation earlier, so turn
+ # RRQLExpression into ERQLExpression now
+ rqlexpr = group_or_rqlexpr
+ newrqlexprs = [x for x in self.get_rqlexprs(action)
+ if not x is rqlexpr]
+ newrqlexprs.append(ERQLExpression(rqlexpr.expression,
+ rqlexpr.mainvars,
+ rqlexpr.eid))
+ self.set_rqlexprs(action, newrqlexprs)
+ else:
+ msg = "can't use RRQLExpression on %s, use an ERQLExpression"
+ raise BadSchemaDefinition(msg % self)
+ elif not self.final and \
+ isinstance(group_or_rqlexpr, ERQLExpression):
+ msg = "can't use ERQLExpression on %s, use a RRQLExpression"
+ raise BadSchemaDefinition(msg % self)
+RelationDefinitionSchema.check_permission_definitions = check_permission_definitions
def system_etypes(schema):
@@ -256,8 +292,8 @@
eid = getattr(edef, 'eid', None)
self.eid = eid
# take care: no _groups attribute when deep-copying
- if getattr(self, '_groups', None):
- for groups in self._groups.itervalues():
+ if getattr(self, 'permissions', None):
+ for groups in self.permissions.itervalues():
for group_or_rqlexpr in groups:
if isinstance(group_or_rqlexpr, RRQLExpression):
msg = "can't use RRQLExpression on an entity type, use an ERQLExpression (%s)"
@@ -304,7 +340,7 @@
if rschema.final:
if rschema == 'has_text':
has_has_text = True
- elif self.rproperty(rschema, 'fulltextindexed'):
+ elif self.rdef(rschema).get('fulltextindexed'):
may_need_has_text = True
elif rschema.fulltext_container:
if rschema.fulltext_container == 'subject':
@@ -329,32 +365,12 @@
"""return True if this entity type is used to build the schema"""
return self.type in SCHEMA_TYPES
- def check_perm(self, session, action, eid=None):
- # NB: session may be a server session or a request object
- user = session.user
- # check user is in an allowed group, if so that's enough
- # internal sessions should always stop there
- if user.matching_groups(self.get_groups(action)):
- return
- # if 'owners' in allowed groups, check if the user actually owns this
- # object, if so that's enough
- if eid is not None and 'owners' in self.get_groups(action) and \
- user.owns(eid):
- return
- # else if there is some rql expressions, check them
- if any(rqlexpr.check(session, eid)
- for rqlexpr in self.get_rqlexprs(action)):
- return
- raise Unauthorized(action, str(self))
-
def rql_expression(self, expression, mainvars=None, eid=None):
"""rql expression factory"""
return ERQLExpression(expression, mainvars, eid)
class CubicWebRelationSchema(RelationSchema):
- RelationSchema._RPROPERTIES['eid'] = None
- _perms_checked = False
def __init__(self, schema=None, rdef=None, eid=None, **kwargs):
if rdef is not None:
@@ -369,73 +385,52 @@
def meta(self):
return self.type in META_RTYPES
- def update(self, subjschema, objschema, rdef):
- super(CubicWebRelationSchema, self).update(subjschema, objschema, rdef)
- if not self._perms_checked and self._groups:
- for action, groups in self._groups.iteritems():
- for group_or_rqlexpr in groups:
- if action == 'read' and \
- isinstance(group_or_rqlexpr, RQLExpression):
- msg = "can't use rql expression for read permission of "\
- "a relation type (%s)"
- raise BadSchemaDefinition(msg % self.type)
- elif self.final and isinstance(group_or_rqlexpr, RRQLExpression):
- if self.schema.reading_from_database:
- # we didn't have final relation earlier, so turn
- # RRQLExpression into ERQLExpression now
- rqlexpr = group_or_rqlexpr
- newrqlexprs = [x for x in self.get_rqlexprs(action) if not x is rqlexpr]
- newrqlexprs.append(ERQLExpression(rqlexpr.expression,
- rqlexpr.mainvars,
- rqlexpr.eid))
- self.set_rqlexprs(action, newrqlexprs)
- else:
- msg = "can't use RRQLExpression on a final relation "\
- "type (eg attribute relation), use an ERQLExpression (%s)"
- raise BadSchemaDefinition(msg % self.type)
- elif not self.final and \
- isinstance(group_or_rqlexpr, ERQLExpression):
- msg = "can't use ERQLExpression on a relation type, use "\
- "a RRQLExpression (%s)"
- raise BadSchemaDefinition(msg % self.type)
- self._perms_checked = True
-
- def cardinality(self, subjtype, objtype, target):
- card = self.rproperty(subjtype, objtype, 'cardinality')
- return (target == 'subject' and card[0]) or \
- (target == 'object' and card[1])
-
def schema_relation(self):
"""return True if this relation type is used to build the schema"""
return self.type in SCHEMA_TYPES
- def physical_mode(self):
- """return an appropriate mode for physical storage of this relation type:
- * 'subjectinline' if every possible subject cardinalities are 1 or ?
- * 'objectinline' if 'subjectinline' mode is not possible but every
- possible object cardinalities are 1 or ?
- * None if neither 'subjectinline' and 'objectinline'
- """
- assert not self.final
- return self.inlined and 'subjectinline' or None
+ def may_have_permission(self, action, req, eschema=None, role=None):
+ if eschema is not None:
+ for tschema in rschema.targets(eschema, role):
+ rdef = rschema.role_rdef(eschema, tschema, role)
+ if rdef.may_have_permission(action, req):
+ return True
+ else:
+ for rdef in self.rdefs.itervalues():
+ if rdef.may_have_permission(action, req):
+ return True
+ return False
- def check_perm(self, session, action, *args, **kwargs):
- # NB: session may be a server session or a request object check user is
- # in an allowed group, if so that's enough internal sessions should
- # always stop there
- if session.user.matching_groups(self.get_groups(action)):
- return
- # else if there is some rql expressions, check them
- if any(rqlexpr.check(session, *args, **kwargs)
- for rqlexpr in self.get_rqlexprs(action)):
- return
- raise Unauthorized(action, str(self))
+ def has_perm(self, session, action, **kwargs):
+ """return true if the action is granted globaly or localy"""
+ if 'fromeid' in kwargs:
+ subjtype = session.describe(kwargs['fromeid'])
+ else:
+ subjtype = None
+ if 'toeid' in kwargs:
+ objtype = session.describe(kwargs['toeid'])
+ else:
+ objtype = Nono
+ if objtype and subjtype:
+ return self.rdef(subjtype, objtype).has_perm(session, action, **kwargs)
+ elif subjtype:
+ for tschema in rschema.targets(subjtype, 'subject'):
+ rdef = rschema.rdef(subjtype, tschema)
+ if not rdef.has_perm(action, req, **kwargs):
+ return False
+ elif objtype:
+ for tschema in rschema.targets(objtype, 'object'):
+ rdef = rschema.rdef(tschema, objtype)
+ if not rdef.has_perm(action, req, **kwargs):
+ return False
+ else:
+ for rdef in self.rdefs.itervalues():
+ if not rdef.has_perm(action, req, **kwargs):
+ return False
- def rql_expression(self, expression, mainvars=None, eid=None):
- """rql expression factory"""
- if self.final:
- return ERQLExpression(expression, mainvars, eid)
- return RRQLExpression(expression, mainvars, eid)
+ @deprecated('use .rdef(subjtype, objtype).role_cardinality(role)')
+ def cardinality(self, subjtype, objtype, target):
+ return self.rdef(subjtype, objtype).role_cardinality(target)
class CubicWebSchema(Schema):
@@ -460,13 +455,10 @@
ybo.register_base_types(self)
rschema = self.add_relation_type(ybo.RelationType('eid'))
rschema.final = True
- rschema.set_default_groups()
rschema = self.add_relation_type(ybo.RelationType('has_text'))
rschema.final = True
- rschema.set_default_groups()
rschema = self.add_relation_type(ybo.RelationType('identity'))
rschema.final = False
- rschema.set_default_groups()
def add_entity_type(self, edef):
edef.name = edef.name.encode()
@@ -508,11 +500,10 @@
rdef.name = rdef.name.lower()
rdef.subject = bw_normalize_etype(rdef.subject)
rdef.object = bw_normalize_etype(rdef.object)
- if super(CubicWebSchema, self).add_relation_def(rdef):
+ rdefs = super(CubicWebSchema, self).add_relation_def(rdef)
+ if rdefs:
try:
- self._eid_index[rdef.eid] = (self.eschema(rdef.subject),
- self.rschema(rdef.name),
- self.eschema(rdef.object))
+ self._eid_index[rdef.eid] = rdefs
except AttributeError:
pass # not a serialized schema
@@ -523,7 +514,9 @@
def del_relation_def(self, subjtype, rtype, objtype):
for k, v in self._eid_index.items():
- if v == (subjtype, rtype, objtype):
+ if not isinstance(v, RelationDefinitionSchema):
+ continue
+ if v.subject == subjtype and v.rtype == rtype and v.object == objtype:
del self._eid_index[k]
break
super(CubicWebSchema, self).del_relation_def(subjtype, rtype, objtype)
@@ -654,6 +647,11 @@
def __repr__(self):
return '%s(%s)' % (self.__class__.__name__, self.full_rql)
+ def __cmp__(self, other):
+ if hasattr(other, 'expression'):
+ return cmp(other.expression, self.expression)
+ return False
+
def __deepcopy__(self, memo):
return self.__class__(self.expression, self.mainvars)
def __getstate__(self):
@@ -755,7 +753,7 @@
for eaction, var, col in has_perm_defs:
for i in xrange(len(rset)):
eschema = get_eschema(rset.description[i][col])
- eschema.check_perm(session, eaction, rset[i][col])
+ eschema.check_perm(session, eaction, eid=rset[i][col])
if self.eid is not None:
session.local_perm_cache[key] = True
return True