schema.py
branchreldefsecurity
changeset 3877 7ca53fc72a0a
parent 3827 c7142a4e3470
child 3890 d7a270f50f54
--- a/schema.py	Wed Nov 18 09:16:38 2009 +0100
+++ b/schema.py	Thu Nov 19 12:55:47 2009 +0100
@@ -20,7 +20,8 @@
 from logilab.common.compat import any
 
 from yams import BadSchemaDefinition, buildobjs as ybo
-from yams.schema import Schema, ERSchema, EntitySchema, RelationSchema
+from yams.schema import Schema, ERSchema, EntitySchema, RelationSchema, \
+     RelationDefinitionSchema, PermissionMixIn
 from yams.constraints import (BaseConstraint, StaticVocabularyConstraint,
                               FormatConstraint)
 from yams.reader import (CONSTRAINTS, PyFileReader, SchemaLoader,
@@ -127,7 +128,7 @@
 ERSchema.display_name = ERSchema_display_name
 
 @cached
-def ERSchema_get_groups(self, action):
+def get_groups(self, action):
     """return the groups authorized to perform <action> on entities of
     this type
 
@@ -140,28 +141,13 @@
     assert action in self.ACTIONS, action
     #assert action in self._groups, '%s %s' % (self, action)
     try:
-        return frozenset(g for g in self._groups[action] if isinstance(g, basestring))
+        return frozenset(g for g in self.permissions[action] if isinstance(g, basestring))
     except KeyError:
         return ()
-ERSchema.get_groups = ERSchema_get_groups
-
-def ERSchema_set_groups(self, action, groups):
-    """set the groups allowed to perform <action> on entities of this type. Don't
-    change rql expressions for the same action.
-
-    :type action: str
-    :param action: the name of a permission
-
-    :type groups: list or tuple
-    :param groups: names of the groups granted to do the given action
-    """
-    assert action in self.ACTIONS, action
-    clear_cache(self, 'ERSchema_get_groups')
-    self._groups[action] = tuple(groups) + self.get_rqlexprs(action)
-ERSchema.set_groups = ERSchema_set_groups
+PermissionMixIn.get_groups = get_groups
 
 @cached
-def ERSchema_get_rqlexprs(self, action):
+def get_rqlexprs(self, action):
     """return the rql expressions representing queries to check the user is allowed
     to perform <action> on entities of this type
 
@@ -174,27 +160,13 @@
     assert action in self.ACTIONS, action
     #assert action in self._rqlexprs, '%s %s' % (self, action)
     try:
-        return tuple(g for g in self._groups[action] if not isinstance(g, basestring))
+        return tuple(g for g in self.permissions[action] if not isinstance(g, basestring))
     except KeyError:
         return ()
-ERSchema.get_rqlexprs = ERSchema_get_rqlexprs
-
-def ERSchema_set_rqlexprs(self, action, rqlexprs):
-    """set the rql expression allowing to perform <action> on entities of this type. Don't
-    change groups for the same action.
-
-    :type action: str
-    :param action: the name of a permission
+PermissionMixIn.get_rqlexprs = get_rqlexprs
 
-    :type rqlexprs: list or tuple
-    :param rqlexprs: the rql expressions allowing the given action
-    """
-    assert action in self.ACTIONS, action
-    clear_cache(self, 'ERSchema_get_rqlexprs')
-    self._groups[action] = tuple(self.get_groups(action)) + tuple(rqlexprs)
-ERSchema.set_rqlexprs = ERSchema_set_rqlexprs
-
-def ERSchema_set_permissions(self, action, permissions):
+orig_set_action_permissions = PermissionMixIn.set_action_permissions
+def set_action_permissions(self, action, permissions):
     """set the groups and rql expressions allowing to perform <action> on
     entities of this type
 
@@ -204,22 +176,12 @@
     :type permissions: tuple
     :param permissions: the groups and rql expressions allowing the given action
     """
-    assert action in self.ACTIONS, action
-    clear_cache(self, 'ERSchema_get_rqlexprs')
-    clear_cache(self, 'ERSchema_get_groups')
-    self._groups[action] = tuple(permissions)
-ERSchema.set_permissions = ERSchema_set_permissions
+    orig_set_action_permissions(self, action, tuple(permissions))
+    clear_cache(self, 'get_rqlexprs')
+    clear_cache(self, 'get_groups')
+PermissionMixIn.set_action_permissions = set_action_permissions
 
-def ERSchema_has_perm(self, session, action, *args, **kwargs):
-    """return true if the action is granted globaly or localy"""
-    try:
-        self.check_perm(session, action, *args, **kwargs)
-        return True
-    except Unauthorized:
-        return False
-ERSchema.has_perm = ERSchema_has_perm
-
-def ERSchema_has_local_role(self, action):
+def has_local_role(self, action):
     """return true if the action *may* be granted localy (eg either rql
     expressions or the owners group are used in security definition)
 
@@ -230,9 +192,83 @@
     if self.get_rqlexprs(action):
         return True
     if action in ('update', 'delete'):
-        return self.has_group(action, 'owners')
+        return 'owners' in self.get_groups(action)
     return False
-ERSchema.has_local_role = ERSchema_has_local_role
+PermissionMixIn.has_local_role = has_local_role
+
+def may_have_permission(self, action, req):
+    if action != 'read' and not (self.has_local_role('read') or
+                                 self.has_perm(req, 'read')):
+        return False
+    return self.has_local_role(action) or self.has_perm(req, action)
+PermissionMixIn.may_have_permission = may_have_permission
+
+def has_perm(self, session, action, **kwargs):
+    """return true if the action is granted globaly or localy"""
+    try:
+        self.check_perm(session, action, **kwargs)
+        return True
+    except Unauthorized:
+        return False
+PermissionMixIn.has_perm = has_perm
+
+def check_perm(self, session, action, **kwargs):
+    # NB: session may be a server session or a request object check user is
+    # in an allowed group, if so that's enough internal sessions should
+    # always stop there
+    groups = self.get_groups(action)
+    if session.user.matching_groups(groups):
+        return
+    # if 'owners' in allowed groups, check if the user actually owns this
+    # object, if so that's enough
+    if 'owners' in groups and 'eid' in kwargs and session.user.owns(kwargs['eid']):
+        return
+    # else if there is some rql expressions, check them
+    if any(rqlexpr.check(session, **kwargs)
+           for rqlexpr in self.get_rqlexprs(action)):
+        return
+    raise Unauthorized(action, str(self))
+PermissionMixIn.check_perm = check_perm
+
+
+RelationDefinitionSchema._RPROPERTIES['eid'] = None
+
+def rql_expression(self, expression, mainvars=None, eid=None):
+    """rql expression factory"""
+    if self.rtype.final:
+        return ERQLExpression(expression, mainvars, eid)
+    return RRQLExpression(expression, mainvars, eid)
+RelationDefinitionSchema.rql_expression = rql_expression
+
+orig_check_permission_definitions = RelationDefinitionSchema.check_permission_definitions
+def check_permission_definitions(self):
+    orig_check_permission_definitions(self)
+    schema = self.subject.schema
+    for action, groups in self.permissions.iteritems():
+        for group_or_rqlexpr in groups:
+            if action == 'read' and \
+                   isinstance(group_or_rqlexpr, RQLExpression):
+                msg = "can't use rql expression for read permission of %s"
+                raise BadSchemaDefinition(msg % self)
+            elif self.final and isinstance(group_or_rqlexpr, RRQLExpression):
+                if schema.reading_from_database:
+                    # we didn't have final relation earlier, so turn
+                    # RRQLExpression into ERQLExpression now
+                    rqlexpr = group_or_rqlexpr
+                    newrqlexprs = [x for x in self.get_rqlexprs(action)
+                                   if not x is rqlexpr]
+                    newrqlexprs.append(ERQLExpression(rqlexpr.expression,
+                                                      rqlexpr.mainvars,
+                                                      rqlexpr.eid))
+                    self.set_rqlexprs(action, newrqlexprs)
+                else:
+                    msg = "can't use RRQLExpression on %s, use an ERQLExpression"
+                    raise BadSchemaDefinition(msg % self)
+            elif not self.final and \
+                     isinstance(group_or_rqlexpr, ERQLExpression):
+                msg = "can't use ERQLExpression on %s, use a RRQLExpression"
+                raise BadSchemaDefinition(msg % self)
+RelationDefinitionSchema.check_permission_definitions = check_permission_definitions
 
 
 def system_etypes(schema):
@@ -256,8 +292,8 @@
             eid = getattr(edef, 'eid', None)
         self.eid = eid
         # take care: no _groups attribute when deep-copying
-        if getattr(self, '_groups', None):
-            for groups in self._groups.itervalues():
+        if getattr(self, 'permissions', None):
+            for groups in self.permissions.itervalues():
                 for group_or_rqlexpr in groups:
                     if isinstance(group_or_rqlexpr, RRQLExpression):
                         msg = "can't use RRQLExpression on an entity type, use an ERQLExpression (%s)"
@@ -304,7 +340,7 @@
             if rschema.final:
                 if rschema == 'has_text':
                     has_has_text = True
-                elif self.rproperty(rschema, 'fulltextindexed'):
+                elif self.rdef(rschema).get('fulltextindexed'):
                     may_need_has_text = True
             elif rschema.fulltext_container:
                 if rschema.fulltext_container == 'subject':
@@ -329,32 +365,12 @@
         """return True if this entity type is used to build the schema"""
         return self.type in SCHEMA_TYPES
 
-    def check_perm(self, session, action, eid=None):
-        # NB: session may be a server session or a request object
-        user = session.user
-        # check user is in an allowed group, if so that's enough
-        # internal sessions should always stop there
-        if user.matching_groups(self.get_groups(action)):
-            return
-        # if 'owners' in allowed groups, check if the user actually owns this
-        # object, if so that's enough
-        if eid is not None and 'owners' in self.get_groups(action) and \
-               user.owns(eid):
-            return
-        # else if there is some rql expressions, check them
-        if any(rqlexpr.check(session, eid)
-               for rqlexpr in self.get_rqlexprs(action)):
-            return
-        raise Unauthorized(action, str(self))
-
     def rql_expression(self, expression, mainvars=None, eid=None):
         """rql expression factory"""
         return ERQLExpression(expression, mainvars, eid)
 
 
 class CubicWebRelationSchema(RelationSchema):
-    RelationSchema._RPROPERTIES['eid'] = None
-    _perms_checked = False
 
     def __init__(self, schema=None, rdef=None, eid=None, **kwargs):
         if rdef is not None:
@@ -369,73 +385,52 @@
     def meta(self):
         return self.type in META_RTYPES
 
-    def update(self, subjschema, objschema, rdef):
-        super(CubicWebRelationSchema, self).update(subjschema, objschema, rdef)
-        if not self._perms_checked and self._groups:
-            for action, groups in self._groups.iteritems():
-                for group_or_rqlexpr in groups:
-                    if action == 'read' and \
-                           isinstance(group_or_rqlexpr, RQLExpression):
-                        msg = "can't use rql expression for read permission of "\
-                              "a relation type (%s)"
-                        raise BadSchemaDefinition(msg % self.type)
-                    elif self.final and isinstance(group_or_rqlexpr, RRQLExpression):
-                        if self.schema.reading_from_database:
-                            # we didn't have final relation earlier, so turn
-                            # RRQLExpression into ERQLExpression now
-                            rqlexpr = group_or_rqlexpr
-                            newrqlexprs = [x for x in self.get_rqlexprs(action) if not x is rqlexpr]
-                            newrqlexprs.append(ERQLExpression(rqlexpr.expression,
-                                                              rqlexpr.mainvars,
-                                                              rqlexpr.eid))
-                            self.set_rqlexprs(action, newrqlexprs)
-                        else:
-                            msg = "can't use RRQLExpression on a final relation "\
-                                  "type (eg attribute relation), use an ERQLExpression (%s)"
-                            raise BadSchemaDefinition(msg % self.type)
-                    elif not self.final and \
-                             isinstance(group_or_rqlexpr, ERQLExpression):
-                        msg = "can't use ERQLExpression on a relation type, use "\
-                              "a RRQLExpression (%s)"
-                        raise BadSchemaDefinition(msg % self.type)
-            self._perms_checked = True
-
-    def cardinality(self, subjtype, objtype, target):
-        card = self.rproperty(subjtype, objtype, 'cardinality')
-        return (target == 'subject' and card[0]) or \
-               (target == 'object' and card[1])
-
     def schema_relation(self):
         """return True if this relation type is used to build the schema"""
         return self.type in SCHEMA_TYPES
 
-    def physical_mode(self):
-        """return an appropriate mode for physical storage of this relation type:
-        * 'subjectinline' if every possible subject cardinalities are 1 or ?
-        * 'objectinline' if 'subjectinline' mode is not possible but every
-          possible object cardinalities are 1 or ?
-        * None if neither 'subjectinline' and 'objectinline'
-        """
-        assert not self.final
-        return self.inlined and 'subjectinline' or None
+    def may_have_permission(self, action, req, eschema=None, role=None):
+        if eschema is not None:
+            for tschema in rschema.targets(eschema, role):
+                rdef = rschema.role_rdef(eschema, tschema, role)
+                if rdef.may_have_permission(action, req):
+                    return True
+        else:
+            for rdef in self.rdefs.itervalues():
+                if rdef.may_have_permission(action, req):
+                    return True
+        return False
 
-    def check_perm(self, session, action, *args, **kwargs):
-        # NB: session may be a server session or a request object check user is
-        # in an allowed group, if so that's enough internal sessions should
-        # always stop there
-        if session.user.matching_groups(self.get_groups(action)):
-            return
-        # else if there is some rql expressions, check them
-        if any(rqlexpr.check(session, *args, **kwargs)
-               for rqlexpr in self.get_rqlexprs(action)):
-            return
-        raise Unauthorized(action, str(self))
+    def has_perm(self, session, action, **kwargs):
+        """return true if the action is granted globaly or localy"""
+        if 'fromeid' in kwargs:
+            subjtype = session.describe(kwargs['fromeid'])
+        else:
+            subjtype = None
+        if 'toeid' in kwargs:
+            objtype = session.describe(kwargs['toeid'])
+        else:
+            objtype = Nono
+        if objtype and subjtype:
+            return self.rdef(subjtype, objtype).has_perm(session, action, **kwargs)
+        elif subjtype:
+            for tschema in rschema.targets(subjtype, 'subject'):
+                rdef = rschema.rdef(subjtype, tschema)
+                if not rdef.has_perm(action, req, **kwargs):
+                    return False
+        elif objtype:
+            for tschema in rschema.targets(objtype, 'object'):
+                rdef = rschema.rdef(tschema, objtype)
+                if not rdef.has_perm(action, req, **kwargs):
+                    return False
+        else:
+            for rdef in self.rdefs.itervalues():
+                if not rdef.has_perm(action, req, **kwargs):
+                    return False
 
-    def rql_expression(self, expression, mainvars=None, eid=None):
-        """rql expression factory"""
-        if self.final:
-            return ERQLExpression(expression, mainvars, eid)
-        return RRQLExpression(expression, mainvars, eid)
+    @deprecated('use .rdef(subjtype, objtype).role_cardinality(role)')
+    def cardinality(self, subjtype, objtype, target):
+        return self.rdef(subjtype, objtype).role_cardinality(target)
 
 
 class CubicWebSchema(Schema):
@@ -460,13 +455,10 @@
         ybo.register_base_types(self)
         rschema = self.add_relation_type(ybo.RelationType('eid'))
         rschema.final = True
-        rschema.set_default_groups()
         rschema = self.add_relation_type(ybo.RelationType('has_text'))
         rschema.final = True
-        rschema.set_default_groups()
         rschema = self.add_relation_type(ybo.RelationType('identity'))
         rschema.final = False
-        rschema.set_default_groups()
 
     def add_entity_type(self, edef):
         edef.name = edef.name.encode()
@@ -508,11 +500,10 @@
         rdef.name = rdef.name.lower()
         rdef.subject = bw_normalize_etype(rdef.subject)
         rdef.object = bw_normalize_etype(rdef.object)
-        if super(CubicWebSchema, self).add_relation_def(rdef):
+        rdefs = super(CubicWebSchema, self).add_relation_def(rdef)
+        if rdefs:
             try:
-                self._eid_index[rdef.eid] = (self.eschema(rdef.subject),
-                                             self.rschema(rdef.name),
-                                             self.eschema(rdef.object))
+                self._eid_index[rdef.eid] = rdefs
             except AttributeError:
                 pass # not a serialized schema
 
@@ -523,7 +514,9 @@
 
     def del_relation_def(self, subjtype, rtype, objtype):
         for k, v in self._eid_index.items():
-            if v == (subjtype, rtype, objtype):
+            if not isinstance(v, RelationDefinitionSchema):
+                continue
+            if v.subject == subjtype and v.rtype == rtype and v.object == objtype:
                 del self._eid_index[k]
                 break
         super(CubicWebSchema, self).del_relation_def(subjtype, rtype, objtype)
@@ -654,6 +647,11 @@
     def __repr__(self):
         return '%s(%s)' % (self.__class__.__name__, self.full_rql)
 
+    def __cmp__(self, other):
+        if hasattr(other, 'expression'):
+            return cmp(other.expression, self.expression)
+        return False
+
     def __deepcopy__(self, memo):
         return self.__class__(self.expression, self.mainvars)
     def __getstate__(self):
@@ -755,7 +753,7 @@
                 for eaction, var, col in has_perm_defs:
                     for i in xrange(len(rset)):
                         eschema = get_eschema(rset.description[i][col])
-                        eschema.check_perm(session, eaction, rset[i][col])
+                        eschema.check_perm(session, eaction, eid=rset[i][col])
                 if self.eid is not None:
                     session.local_perm_cache[key] = True
                 return True