[CWEP002] refactor rql read security checking
Split 'check_read_perms' into 'check_relations_perms' which checks relations
'read' permissions and 'get_local_checks' which build dictionary of local
security checks (rql expression) for variables.
This allows to check relations 'read' permissions earlier in the process and so
to prepare insertion of the rql rewriter: we want to check permissions of the
computed relation, not permissions of relations introduced by the associated
rule, to conform to the CWEP.
Related to #3546717
--- a/server/querier.py Mon Jun 16 10:22:24 2014 +0200
+++ b/server/querier.py Thu Feb 13 13:58:28 2014 +0100
@@ -28,6 +28,7 @@
from yams import BASE_TYPES
from cubicweb import ValidationError, Unauthorized, UnknownEid
+from cubicweb.rqlrewrite import RQLRelationRewriter
from cubicweb import Binary, server
from cubicweb.rset import ResultSet
@@ -72,7 +73,44 @@
except AttributeError:
return cnx.entity_metas(term.eval(args))['type']
-def check_read_access(cnx, rqlst, solution, args):
+def check_relations_read_access(cnx, select, args):
+ """Raise :exc:`Unauthorized` if the given user doesn't have credentials to
+ read relations used in the givel syntaxt tree
+ """
+ # use `term_etype` since we've to deal with rewritten constants here,
+ # when used as an external source by another repository.
+ # XXX what about local read security w/ those rewritten constants...
+ # XXX constants can also happen in some queries generated by req.find()
+ DBG = (server.DEBUG & server.DBG_SEC) and 'read' in server._SECURITY_CAPS
+ schema = cnx.repo.schema
+ user = cnx.user
+ if select.where is not None:
+ for rel in select.where.iget_nodes(Relation):
+ for solution in select.solutions:
+ # XXX has_text may have specific perm ?
+ if rel.r_type in READ_ONLY_RTYPES:
+ continue
+ rschema = schema.rschema(rel.r_type)
+ if rschema.final:
+ eschema = schema.eschema(term_etype(cnx, rel.children[0],
+ solution, args))
+ rdef = eschema.rdef(rschema)
+ else:
+ rdef = rschema.rdef(term_etype(cnx, rel.children[0],
+ solution, args),
+ term_etype(cnx, rel.children[1].children[0],
+ solution, args))
+ if not user.matching_groups(rdef.get_groups('read')):
+ if DBG:
+ print ('check_read_access: %s %s does not match %s' %
+ (rdef, user.groups, rdef.get_groups('read')))
+ # XXX rqlexpr not allowed
+ raise Unauthorized('read', rel.r_type)
+ if DBG:
+ print ('check_read_access: %s %s matches %s' %
+ (rdef, user.groups, rdef.get_groups('read')))
+
+def get_local_checks(cnx, rqlst, solution):
"""Check that the given user has credentials to access data read by the
query and return a dict defining necessary "local checks" (i.e. rql
expression in read permission defined in the schema) where no group grants
@@ -80,50 +118,27 @@
Returned dictionary's keys are variable names and values the rql expressions
for this variable (with the given solution).
+
+ Raise :exc:`Unauthorized` if access is known to be defined, i.e. if there is
+ no matching group and no local permissions.
"""
- # use `term_etype` since we've to deal with rewritten constants here,
- # when used as an external source by another repository.
- # XXX what about local read security w/ those rewritten constants...
DBG = (server.DEBUG & server.DBG_SEC) and 'read' in server._SECURITY_CAPS
schema = cnx.repo.schema
- if rqlst.where is not None:
- for rel in rqlst.where.iget_nodes(Relation):
- # XXX has_text may have specific perm ?
- if rel.r_type in READ_ONLY_RTYPES:
- continue
- rschema = schema.rschema(rel.r_type)
- if rschema.final:
- eschema = schema.eschema(term_etype(cnx, rel.children[0],
- solution, args))
- rdef = eschema.rdef(rschema)
- else:
- rdef = rschema.rdef(term_etype(cnx, rel.children[0],
- solution, args),
- term_etype(cnx, rel.children[1].children[0],
- solution, args))
- if not cnx.user.matching_groups(rdef.get_groups('read')):
- if DBG:
- print ('check_read_access: %s %s does not match %s' %
- (rdef, cnx.user.groups, rdef.get_groups('read')))
- # XXX rqlexpr not allowed
- raise Unauthorized('read', rel.r_type)
- if DBG:
- print ('check_read_access: %s %s matches %s' %
- (rdef, cnx.user.groups, rdef.get_groups('read')))
+ user = cnx.user
localchecks = {}
# iterate on defined_vars and not on solutions to ignore column aliases
for varname in rqlst.defined_vars:
eschema = schema.eschema(solution[varname])
if eschema.final:
continue
- if not cnx.user.matching_groups(eschema.get_groups('read')):
+ if not user.matching_groups(eschema.get_groups('read')):
erqlexprs = eschema.get_rqlexprs('read')
if not erqlexprs:
ex = Unauthorized('read', solution[varname])
ex.var = varname
if DBG:
print ('check_read_access: %s %s %s %s' %
- (varname, eschema, cnx.user.groups, eschema.get_groups('read')))
+ (varname, eschema, user.groups, eschema.get_groups('read')))
raise ex
# don't insert security on variable only referenced by 'NOT X relation Y' or
# 'NOT EXISTS(X relation Y)'
@@ -133,7 +148,8 @@
if (not schema.rschema(r.r_type).final
and ((isinstance(r.parent, Exists) and r.parent.neged(strict=True))
or isinstance(r.parent, Not)))])
- != len(varinfo['relations'])):
+ !=
+ len(varinfo['relations'])):
localchecks[varname] = erqlexprs
return localchecks
@@ -258,7 +274,7 @@
newsolutions = []
for solution in rqlst.solutions:
try:
- localcheck = check_read_access(cnx, rqlst, solution, self.args)
+ localcheck = get_local_checks(cnx, rqlst, solution)
except Unauthorized as ex:
msg = 'remove %s from solutions since %s has no %s access to %s'
msg %= (solution, cnx.user.login, ex.args[0], ex.args[1])
@@ -573,6 +589,7 @@
if cnx.read_security:
for select in rqlst.children:
check_no_password_selected(select)
+ check_relations_read_access(cnx, select, args)
# on select query, always copy the cached rqlst so we don't have to
# bother modifying it. This is not necessary on write queries since
# a new syntax tree is built from them.
--- a/server/test/unittest_security.py Mon Jun 16 10:22:24 2014 +0200
+++ b/server/test/unittest_security.py Thu Feb 13 13:58:28 2014 +0100
@@ -22,7 +22,7 @@
from cubicweb.devtools.testlib import CubicWebTC
from cubicweb import Unauthorized, ValidationError, QueryError, Binary
from cubicweb.schema import ERQLExpression
-from cubicweb.server.querier import check_read_access
+from cubicweb.server.querier import get_local_checks, check_relations_read_access
from cubicweb.server.utils import _CRYPTO_CTX
@@ -37,18 +37,33 @@
class LowLevelSecurityFunctionTC(BaseSecurityTC):
- def test_check_read_access(self):
- rql = u'Personne U where U nom "managers"'
+ def test_check_relation_read_access(self):
+ rql = u'Personne U WHERE U nom "managers"'
+ rqlst = self.repo.vreg.rqlhelper.parse(rql).children[0]
+ nom = self.repo.schema['Personne'].rdef('nom')
+ with self.temporary_permissions((nom, {'read': ('users', 'managers')})):
+ with self.admin_access.repo_cnx() as cnx:
+ self.repo.vreg.solutions(cnx, rqlst, None)
+ check_relations_read_access(cnx, rqlst, {})
+ with self.new_access('anon').repo_cnx() as cnx:
+ self.assertRaises(Unauthorized,
+ check_relations_read_access,
+ cnx, rqlst, {})
+ self.assertRaises(Unauthorized, cnx.execute, rql)
+
+ def test_get_local_checks(self):
+ rql = u'Personne U WHERE U nom "managers"'
rqlst = self.repo.vreg.rqlhelper.parse(rql).children[0]
with self.temporary_permissions(Personne={'read': ('users', 'managers')}):
with self.admin_access.repo_cnx() as cnx:
self.repo.vreg.solutions(cnx, rqlst, None)
solution = rqlst.solutions[0]
- check_read_access(cnx, rqlst, solution, {})
+ localchecks = get_local_checks(cnx, rqlst, solution)
+ self.assertEqual({}, localchecks)
with self.new_access('anon').repo_cnx() as cnx:
self.assertRaises(Unauthorized,
- check_read_access,
- cnx, rqlst, solution, {})
+ get_local_checks,
+ cnx, rqlst, solution)
self.assertRaises(Unauthorized, cnx.execute, rql)
def test_upassword_not_selectable(self):