# HG changeset patch # User Sylvain Thénault # Date 1392296308 -3600 # Node ID 79d34ba48612048142a920e8f8810852cadb0fa0 # Parent 643b19d79e4a07e2180c20fbc66a9403736776a5 [CWEP002] refactor rql read security checking Split 'check_read_perms' into 'check_relations_perms' which checks relations 'read' permissions and 'get_local_checks' which build dictionary of local security checks (rql expression) for variables. This allows to check relations 'read' permissions earlier in the process and so to prepare insertion of the rql rewriter: we want to check permissions of the computed relation, not permissions of relations introduced by the associated rule, to conform to the CWEP. Related to #3546717 diff -r 643b19d79e4a -r 79d34ba48612 server/querier.py --- a/server/querier.py Mon Jun 16 10:22:24 2014 +0200 +++ b/server/querier.py Thu Feb 13 13:58:28 2014 +0100 @@ -28,6 +28,7 @@ from yams import BASE_TYPES from cubicweb import ValidationError, Unauthorized, UnknownEid +from cubicweb.rqlrewrite import RQLRelationRewriter from cubicweb import Binary, server from cubicweb.rset import ResultSet @@ -72,7 +73,44 @@ except AttributeError: return cnx.entity_metas(term.eval(args))['type'] -def check_read_access(cnx, rqlst, solution, args): +def check_relations_read_access(cnx, select, args): + """Raise :exc:`Unauthorized` if the given user doesn't have credentials to + read relations used in the givel syntaxt tree + """ + # use `term_etype` since we've to deal with rewritten constants here, + # when used as an external source by another repository. + # XXX what about local read security w/ those rewritten constants... + # XXX constants can also happen in some queries generated by req.find() + DBG = (server.DEBUG & server.DBG_SEC) and 'read' in server._SECURITY_CAPS + schema = cnx.repo.schema + user = cnx.user + if select.where is not None: + for rel in select.where.iget_nodes(Relation): + for solution in select.solutions: + # XXX has_text may have specific perm ? + if rel.r_type in READ_ONLY_RTYPES: + continue + rschema = schema.rschema(rel.r_type) + if rschema.final: + eschema = schema.eschema(term_etype(cnx, rel.children[0], + solution, args)) + rdef = eschema.rdef(rschema) + else: + rdef = rschema.rdef(term_etype(cnx, rel.children[0], + solution, args), + term_etype(cnx, rel.children[1].children[0], + solution, args)) + if not user.matching_groups(rdef.get_groups('read')): + if DBG: + print ('check_read_access: %s %s does not match %s' % + (rdef, user.groups, rdef.get_groups('read'))) + # XXX rqlexpr not allowed + raise Unauthorized('read', rel.r_type) + if DBG: + print ('check_read_access: %s %s matches %s' % + (rdef, user.groups, rdef.get_groups('read'))) + +def get_local_checks(cnx, rqlst, solution): """Check that the given user has credentials to access data read by the query and return a dict defining necessary "local checks" (i.e. rql expression in read permission defined in the schema) where no group grants @@ -80,50 +118,27 @@ Returned dictionary's keys are variable names and values the rql expressions for this variable (with the given solution). + + Raise :exc:`Unauthorized` if access is known to be defined, i.e. if there is + no matching group and no local permissions. """ - # use `term_etype` since we've to deal with rewritten constants here, - # when used as an external source by another repository. - # XXX what about local read security w/ those rewritten constants... DBG = (server.DEBUG & server.DBG_SEC) and 'read' in server._SECURITY_CAPS schema = cnx.repo.schema - if rqlst.where is not None: - for rel in rqlst.where.iget_nodes(Relation): - # XXX has_text may have specific perm ? - if rel.r_type in READ_ONLY_RTYPES: - continue - rschema = schema.rschema(rel.r_type) - if rschema.final: - eschema = schema.eschema(term_etype(cnx, rel.children[0], - solution, args)) - rdef = eschema.rdef(rschema) - else: - rdef = rschema.rdef(term_etype(cnx, rel.children[0], - solution, args), - term_etype(cnx, rel.children[1].children[0], - solution, args)) - if not cnx.user.matching_groups(rdef.get_groups('read')): - if DBG: - print ('check_read_access: %s %s does not match %s' % - (rdef, cnx.user.groups, rdef.get_groups('read'))) - # XXX rqlexpr not allowed - raise Unauthorized('read', rel.r_type) - if DBG: - print ('check_read_access: %s %s matches %s' % - (rdef, cnx.user.groups, rdef.get_groups('read'))) + user = cnx.user localchecks = {} # iterate on defined_vars and not on solutions to ignore column aliases for varname in rqlst.defined_vars: eschema = schema.eschema(solution[varname]) if eschema.final: continue - if not cnx.user.matching_groups(eschema.get_groups('read')): + if not user.matching_groups(eschema.get_groups('read')): erqlexprs = eschema.get_rqlexprs('read') if not erqlexprs: ex = Unauthorized('read', solution[varname]) ex.var = varname if DBG: print ('check_read_access: %s %s %s %s' % - (varname, eschema, cnx.user.groups, eschema.get_groups('read'))) + (varname, eschema, user.groups, eschema.get_groups('read'))) raise ex # don't insert security on variable only referenced by 'NOT X relation Y' or # 'NOT EXISTS(X relation Y)' @@ -133,7 +148,8 @@ if (not schema.rschema(r.r_type).final and ((isinstance(r.parent, Exists) and r.parent.neged(strict=True)) or isinstance(r.parent, Not)))]) - != len(varinfo['relations'])): + != + len(varinfo['relations'])): localchecks[varname] = erqlexprs return localchecks @@ -258,7 +274,7 @@ newsolutions = [] for solution in rqlst.solutions: try: - localcheck = check_read_access(cnx, rqlst, solution, self.args) + localcheck = get_local_checks(cnx, rqlst, solution) except Unauthorized as ex: msg = 'remove %s from solutions since %s has no %s access to %s' msg %= (solution, cnx.user.login, ex.args[0], ex.args[1]) @@ -573,6 +589,7 @@ if cnx.read_security: for select in rqlst.children: check_no_password_selected(select) + check_relations_read_access(cnx, select, args) # on select query, always copy the cached rqlst so we don't have to # bother modifying it. This is not necessary on write queries since # a new syntax tree is built from them. diff -r 643b19d79e4a -r 79d34ba48612 server/test/unittest_security.py --- a/server/test/unittest_security.py Mon Jun 16 10:22:24 2014 +0200 +++ b/server/test/unittest_security.py Thu Feb 13 13:58:28 2014 +0100 @@ -22,7 +22,7 @@ from cubicweb.devtools.testlib import CubicWebTC from cubicweb import Unauthorized, ValidationError, QueryError, Binary from cubicweb.schema import ERQLExpression -from cubicweb.server.querier import check_read_access +from cubicweb.server.querier import get_local_checks, check_relations_read_access from cubicweb.server.utils import _CRYPTO_CTX @@ -37,18 +37,33 @@ class LowLevelSecurityFunctionTC(BaseSecurityTC): - def test_check_read_access(self): - rql = u'Personne U where U nom "managers"' + def test_check_relation_read_access(self): + rql = u'Personne U WHERE U nom "managers"' + rqlst = self.repo.vreg.rqlhelper.parse(rql).children[0] + nom = self.repo.schema['Personne'].rdef('nom') + with self.temporary_permissions((nom, {'read': ('users', 'managers')})): + with self.admin_access.repo_cnx() as cnx: + self.repo.vreg.solutions(cnx, rqlst, None) + check_relations_read_access(cnx, rqlst, {}) + with self.new_access('anon').repo_cnx() as cnx: + self.assertRaises(Unauthorized, + check_relations_read_access, + cnx, rqlst, {}) + self.assertRaises(Unauthorized, cnx.execute, rql) + + def test_get_local_checks(self): + rql = u'Personne U WHERE U nom "managers"' rqlst = self.repo.vreg.rqlhelper.parse(rql).children[0] with self.temporary_permissions(Personne={'read': ('users', 'managers')}): with self.admin_access.repo_cnx() as cnx: self.repo.vreg.solutions(cnx, rqlst, None) solution = rqlst.solutions[0] - check_read_access(cnx, rqlst, solution, {}) + localchecks = get_local_checks(cnx, rqlst, solution) + self.assertEqual({}, localchecks) with self.new_access('anon').repo_cnx() as cnx: self.assertRaises(Unauthorized, - check_read_access, - cnx, rqlst, solution, {}) + get_local_checks, + cnx, rqlst, solution) self.assertRaises(Unauthorized, cnx.execute, rql) def test_upassword_not_selectable(self):