[CWEP002] refactor rql read security checking
authorSylvain Thénault <sylvain.thenault@logilab.fr>
Thu, 13 Feb 2014 13:58:28 +0100
changeset 9954 79d34ba48612
parent 9953 643b19d79e4a
child 9955 60a9cd1b3a4b
[CWEP002] refactor rql read security checking Split 'check_read_perms' into 'check_relations_perms' which checks relations 'read' permissions and 'get_local_checks' which build dictionary of local security checks (rql expression) for variables. This allows to check relations 'read' permissions earlier in the process and so to prepare insertion of the rql rewriter: we want to check permissions of the computed relation, not permissions of relations introduced by the associated rule, to conform to the CWEP. Related to #3546717
server/querier.py
server/test/unittest_security.py
--- a/server/querier.py	Mon Jun 16 10:22:24 2014 +0200
+++ b/server/querier.py	Thu Feb 13 13:58:28 2014 +0100
@@ -28,6 +28,7 @@
 from yams import BASE_TYPES
 
 from cubicweb import ValidationError, Unauthorized, UnknownEid
+from cubicweb.rqlrewrite import RQLRelationRewriter
 from cubicweb import Binary, server
 from cubicweb.rset import ResultSet
 
@@ -72,7 +73,44 @@
     except AttributeError:
         return cnx.entity_metas(term.eval(args))['type']
 
-def check_read_access(cnx, rqlst, solution, args):
+def check_relations_read_access(cnx, select, args):
+    """Raise :exc:`Unauthorized` if the given user doesn't have credentials to
+    read relations used in the givel syntaxt tree
+    """
+    # use `term_etype` since we've to deal with rewritten constants here,
+    # when used as an external source by another repository.
+    # XXX what about local read security w/ those rewritten constants...
+    # XXX constants can also happen in some queries generated by req.find()
+    DBG = (server.DEBUG & server.DBG_SEC) and 'read' in server._SECURITY_CAPS
+    schema = cnx.repo.schema
+    user = cnx.user
+    if select.where is not None:
+        for rel in select.where.iget_nodes(Relation):
+            for solution in select.solutions:
+                # XXX has_text may have specific perm ?
+                if rel.r_type in READ_ONLY_RTYPES:
+                    continue
+                rschema = schema.rschema(rel.r_type)
+                if rschema.final:
+                    eschema = schema.eschema(term_etype(cnx, rel.children[0],
+                                             solution, args))
+                    rdef = eschema.rdef(rschema)
+                else:
+                    rdef = rschema.rdef(term_etype(cnx, rel.children[0],
+                                                   solution, args),
+                                        term_etype(cnx, rel.children[1].children[0],
+                                                   solution, args))
+                if not user.matching_groups(rdef.get_groups('read')):
+                    if DBG:
+                        print ('check_read_access: %s %s does not match %s' %
+                               (rdef, user.groups, rdef.get_groups('read')))
+                    # XXX rqlexpr not allowed
+                    raise Unauthorized('read', rel.r_type)
+                if DBG:
+                    print ('check_read_access: %s %s matches %s' %
+                           (rdef, user.groups, rdef.get_groups('read')))
+
+def get_local_checks(cnx, rqlst, solution):
     """Check that the given user has credentials to access data read by the
     query and return a dict defining necessary "local checks" (i.e. rql
     expression in read permission defined in the schema) where no group grants
@@ -80,50 +118,27 @@
 
     Returned dictionary's keys are variable names and values the rql expressions
     for this variable (with the given solution).
+
+    Raise :exc:`Unauthorized` if access is known to be defined, i.e. if there is
+    no matching group and no local permissions.
     """
-    # use `term_etype` since we've to deal with rewritten constants here,
-    # when used as an external source by another repository.
-    # XXX what about local read security w/ those rewritten constants...
     DBG = (server.DEBUG & server.DBG_SEC) and 'read' in server._SECURITY_CAPS
     schema = cnx.repo.schema
-    if rqlst.where is not None:
-        for rel in rqlst.where.iget_nodes(Relation):
-            # XXX has_text may have specific perm ?
-            if rel.r_type in READ_ONLY_RTYPES:
-                continue
-            rschema = schema.rschema(rel.r_type)
-            if rschema.final:
-                eschema = schema.eschema(term_etype(cnx, rel.children[0],
-                                                    solution, args))
-                rdef = eschema.rdef(rschema)
-            else:
-                rdef = rschema.rdef(term_etype(cnx, rel.children[0],
-                                               solution, args),
-                                    term_etype(cnx, rel.children[1].children[0],
-                                               solution, args))
-            if not cnx.user.matching_groups(rdef.get_groups('read')):
-                if DBG:
-                    print ('check_read_access: %s %s does not match %s' %
-                           (rdef, cnx.user.groups, rdef.get_groups('read')))
-                # XXX rqlexpr not allowed
-                raise Unauthorized('read', rel.r_type)
-            if DBG:
-                print ('check_read_access: %s %s matches %s' %
-                       (rdef, cnx.user.groups, rdef.get_groups('read')))
+    user = cnx.user
     localchecks = {}
     # iterate on defined_vars and not on solutions to ignore column aliases
     for varname in rqlst.defined_vars:
         eschema = schema.eschema(solution[varname])
         if eschema.final:
             continue
-        if not cnx.user.matching_groups(eschema.get_groups('read')):
+        if not user.matching_groups(eschema.get_groups('read')):
             erqlexprs = eschema.get_rqlexprs('read')
             if not erqlexprs:
                 ex = Unauthorized('read', solution[varname])
                 ex.var = varname
                 if DBG:
                     print ('check_read_access: %s %s %s %s' %
-                           (varname, eschema, cnx.user.groups, eschema.get_groups('read')))
+                           (varname, eschema, user.groups, eschema.get_groups('read')))
                 raise ex
             # don't insert security on variable only referenced by 'NOT X relation Y' or
             # 'NOT EXISTS(X relation Y)'
@@ -133,7 +148,8 @@
                      if (not schema.rschema(r.r_type).final
                          and ((isinstance(r.parent, Exists) and r.parent.neged(strict=True))
                               or isinstance(r.parent, Not)))])
-                != len(varinfo['relations'])):
+                !=
+                len(varinfo['relations'])):
                 localchecks[varname] = erqlexprs
     return localchecks
 
@@ -258,7 +274,7 @@
         newsolutions = []
         for solution in rqlst.solutions:
             try:
-                localcheck = check_read_access(cnx, rqlst, solution, self.args)
+                localcheck = get_local_checks(cnx, rqlst, solution)
             except Unauthorized as ex:
                 msg = 'remove %s from solutions since %s has no %s access to %s'
                 msg %= (solution, cnx.user.login, ex.args[0], ex.args[1])
@@ -573,6 +589,7 @@
             if cnx.read_security:
                 for select in rqlst.children:
                     check_no_password_selected(select)
+                    check_relations_read_access(cnx, select, args)
             # on select query, always copy the cached rqlst so we don't have to
             # bother modifying it. This is not necessary on write queries since
             # a new syntax tree is built from them.
--- a/server/test/unittest_security.py	Mon Jun 16 10:22:24 2014 +0200
+++ b/server/test/unittest_security.py	Thu Feb 13 13:58:28 2014 +0100
@@ -22,7 +22,7 @@
 from cubicweb.devtools.testlib import CubicWebTC
 from cubicweb import Unauthorized, ValidationError, QueryError, Binary
 from cubicweb.schema import ERQLExpression
-from cubicweb.server.querier import check_read_access
+from cubicweb.server.querier import get_local_checks, check_relations_read_access
 from cubicweb.server.utils import _CRYPTO_CTX
 
 
@@ -37,18 +37,33 @@
 
 class LowLevelSecurityFunctionTC(BaseSecurityTC):
 
-    def test_check_read_access(self):
-        rql = u'Personne U where U nom "managers"'
+    def test_check_relation_read_access(self):
+        rql = u'Personne U WHERE U nom "managers"'
+        rqlst = self.repo.vreg.rqlhelper.parse(rql).children[0]
+        nom = self.repo.schema['Personne'].rdef('nom')
+        with self.temporary_permissions((nom, {'read': ('users', 'managers')})):
+            with self.admin_access.repo_cnx() as cnx:
+                self.repo.vreg.solutions(cnx, rqlst, None)
+                check_relations_read_access(cnx, rqlst, {})
+            with self.new_access('anon').repo_cnx() as cnx:
+                self.assertRaises(Unauthorized,
+                                  check_relations_read_access,
+                                  cnx, rqlst, {})
+                self.assertRaises(Unauthorized, cnx.execute, rql)
+
+    def test_get_local_checks(self):
+        rql = u'Personne U WHERE U nom "managers"'
         rqlst = self.repo.vreg.rqlhelper.parse(rql).children[0]
         with self.temporary_permissions(Personne={'read': ('users', 'managers')}):
             with self.admin_access.repo_cnx() as cnx:
                 self.repo.vreg.solutions(cnx, rqlst, None)
                 solution = rqlst.solutions[0]
-                check_read_access(cnx, rqlst, solution, {})
+                localchecks = get_local_checks(cnx, rqlst, solution)
+                self.assertEqual({}, localchecks)
             with self.new_access('anon').repo_cnx() as cnx:
                 self.assertRaises(Unauthorized,
-                                  check_read_access,
-                                  cnx, rqlst, solution, {})
+                                  get_local_checks,
+                                  cnx, rqlst, solution)
                 self.assertRaises(Unauthorized, cnx.execute, rql)
 
     def test_upassword_not_selectable(self):