diff -r 058bb3dc685f -r 0b59724cb3f2 server/querier.py --- a/server/querier.py Mon Jan 04 18:40:30 2016 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,737 +0,0 @@ -# copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved. -# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr -# -# This file is part of CubicWeb. -# -# CubicWeb is free software: you can redistribute it and/or modify it under the -# terms of the GNU Lesser General Public License as published by the Free -# Software Foundation, either version 2.1 of the License, or (at your option) -# any later version. -# -# CubicWeb is distributed in the hope that it will be useful, but WITHOUT -# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS -# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more -# details. -# -# You should have received a copy of the GNU Lesser General Public License along -# with CubicWeb. If not, see . -"""Helper classes to execute RQL queries on a set of sources, performing -security checking and data aggregation. -""" -from __future__ import print_function - -__docformat__ = "restructuredtext en" - -from itertools import repeat - -from six import text_type, string_types, integer_types -from six.moves import range - -from rql import RQLSyntaxError, CoercionError -from rql.stmts import Union -from rql.nodes import ETYPE_PYOBJ_MAP, etype_from_pyobj, Relation, Exists, Not -from yams import BASE_TYPES - -from cubicweb import ValidationError, Unauthorized, UnknownEid -from cubicweb.rqlrewrite import RQLRelationRewriter -from cubicweb import Binary, server -from cubicweb.rset import ResultSet - -from cubicweb.utils import QueryCache, RepeatList -from cubicweb.server.rqlannotation import SQLGenAnnotator, set_qdata -from cubicweb.server.ssplanner import READ_ONLY_RTYPES, add_types_restriction -from cubicweb.server.edition import EditedEntity -from cubicweb.server.ssplanner import SSPlanner -from cubicweb.statsd_logger import statsd_timeit, statsd_c - -ETYPE_PYOBJ_MAP[Binary] = 'Bytes' - - -def empty_rset(rql, args, rqlst=None): - """build an empty result set object""" - return ResultSet([], rql, args, rqlst=rqlst) - -def update_varmap(varmap, selected, table): - """return a sql schema to store RQL query result""" - for i, term in enumerate(selected): - key = term.as_string() - value = '%s.C%s' % (table, i) - if varmap.get(key, value) != value: - raise Exception('variable name conflict on %s: got %s / %s' - % (key, value, varmap)) - varmap[key] = value - -# permission utilities ######################################################## - -def check_no_password_selected(rqlst): - """check that Password entities are not selected""" - for solution in rqlst.solutions: - for var, etype in solution.items(): - if etype == 'Password': - raise Unauthorized('Password selection is not allowed (%s)' % var) - -def term_etype(cnx, term, solution, args): - """return the entity type for the given term (a VariableRef or a Constant - node) - """ - try: - return solution[term.name] - except AttributeError: - return cnx.entity_metas(term.eval(args))['type'] - -def check_relations_read_access(cnx, select, args): - """Raise :exc:`Unauthorized` if the given user doesn't have credentials to - read relations used in the given syntax tree - """ - # use `term_etype` since we've to deal with rewritten constants here, - # when used as an external source by another repository. - # XXX what about local read security w/ those rewritten constants... - # XXX constants can also happen in some queries generated by req.find() - DBG = (server.DEBUG & server.DBG_SEC) and 'read' in server._SECURITY_CAPS - schema = cnx.repo.schema - user = cnx.user - if select.where is not None: - for rel in select.where.iget_nodes(Relation): - for solution in select.solutions: - # XXX has_text may have specific perm ? - if rel.r_type in READ_ONLY_RTYPES: - continue - rschema = schema.rschema(rel.r_type) - if rschema.final: - eschema = schema.eschema(term_etype(cnx, rel.children[0], - solution, args)) - rdef = eschema.rdef(rschema) - else: - rdef = rschema.rdef(term_etype(cnx, rel.children[0], - solution, args), - term_etype(cnx, rel.children[1].children[0], - solution, args)) - if not user.matching_groups(rdef.get_groups('read')): - if DBG: - print('check_read_access: %s %s does not match %s' % - (rdef, user.groups, rdef.get_groups('read'))) - # XXX rqlexpr not allowed - raise Unauthorized('read', rel.r_type) - if DBG: - print('check_read_access: %s %s matches %s' % - (rdef, user.groups, rdef.get_groups('read'))) - -def get_local_checks(cnx, rqlst, solution): - """Check that the given user has credentials to access data read by the - query and return a dict defining necessary "local checks" (i.e. rql - expression in read permission defined in the schema) where no group grants - him the permission. - - Returned dictionary's keys are variable names and values the rql expressions - for this variable (with the given solution). - - Raise :exc:`Unauthorized` if access is known to be defined, i.e. if there is - no matching group and no local permissions. - """ - DBG = (server.DEBUG & server.DBG_SEC) and 'read' in server._SECURITY_CAPS - schema = cnx.repo.schema - user = cnx.user - localchecks = {} - # iterate on defined_vars and not on solutions to ignore column aliases - for varname in rqlst.defined_vars: - eschema = schema.eschema(solution[varname]) - if eschema.final: - continue - if not user.matching_groups(eschema.get_groups('read')): - erqlexprs = eschema.get_rqlexprs('read') - if not erqlexprs: - ex = Unauthorized('read', solution[varname]) - ex.var = varname - if DBG: - print('check_read_access: %s %s %s %s' % - (varname, eschema, user.groups, eschema.get_groups('read'))) - raise ex - # don't insert security on variable only referenced by 'NOT X relation Y' or - # 'NOT EXISTS(X relation Y)' - varinfo = rqlst.defined_vars[varname].stinfo - if varinfo['selected'] or ( - len([r for r in varinfo['relations'] - if (not schema.rschema(r.r_type).final - and ((isinstance(r.parent, Exists) and r.parent.neged(strict=True)) - or isinstance(r.parent, Not)))]) - != - len(varinfo['relations'])): - localchecks[varname] = erqlexprs - return localchecks - - -# Plans ####################################################################### - -class ExecutionPlan(object): - """the execution model of a rql query, composed of querier steps""" - - def __init__(self, querier, rqlst, args, cnx): - # original rql syntax tree - self.rqlst = rqlst - self.args = args or {} - # cnx executing the query - self.cnx = cnx - # quick reference to the system source - self.syssource = cnx.repo.system_source - # execution steps - self.steps = [] - # various resource accesors - self.querier = querier - self.schema = querier.schema - self.sqlannotate = querier.sqlgen_annotate - self.rqlhelper = cnx.vreg.rqlhelper - - def annotate_rqlst(self): - if not self.rqlst.annotated: - self.rqlhelper.annotate(self.rqlst) - - def add_step(self, step): - """add a step to the plan""" - self.steps.append(step) - - def sqlexec(self, sql, args=None): - return self.syssource.sqlexec(self.cnx, sql, args) - - def execute(self): - """execute a plan and return resulting rows""" - for step in self.steps: - result = step.execute() - # the latest executed step contains the full query result - return result - - def preprocess(self, union, security=True): - """insert security when necessary then annotate rql st for sql generation - - return rqlst to actually execute - """ - cached = None - if security and self.cnx.read_security: - # ensure security is turned of when security is inserted, - # else we may loop for ever... - if self.cnx.transaction_data.get('security-rqlst-cache'): - key = self.cache_key - else: - key = None - if key is not None and key in self.cnx.transaction_data: - cachedunion, args = self.cnx.transaction_data[key] - union.children[:] = [] - for select in cachedunion.children: - union.append(select) - union.has_text_query = cachedunion.has_text_query - args.update(self.args) - self.args = args - cached = True - else: - with self.cnx.security_enabled(read=False): - noinvariant = self._insert_security(union) - if key is not None: - self.cnx.transaction_data[key] = (union, self.args) - else: - noinvariant = () - if cached is None: - self.rqlhelper.simplify(union) - self.sqlannotate(union) - set_qdata(self.schema.rschema, union, noinvariant) - if union.has_text_query: - self.cache_key = None - - def _insert_security(self, union): - noinvariant = set() - for select in union.children[:]: - for subquery in select.with_: - self._insert_security(subquery.query) - localchecks, restricted = self._check_permissions(select) - if any(localchecks): - self.cnx.rql_rewriter.insert_local_checks( - select, self.args, localchecks, restricted, noinvariant) - return noinvariant - - def _check_permissions(self, rqlst): - """Return a dict defining "local checks", i.e. RQLExpression defined in - the schema that should be inserted in the original query, together with - a set of variable names which requires some security to be inserted. - - Solutions where a variable has a type which the user can't definitly - read are removed, else if the user *may* read it (i.e. if an rql - expression is defined for the "read" permission of the related type), - the local checks dict is updated. - - The local checks dict has entries for each different local check - necessary, with associated solutions as value, a local check being - defined by a list of 2-uple (variable name, rql expressions) for each - variable which has to be checked. Solutions which don't require local - checks will be associated to the empty tuple key. - - Note rqlst should not have been simplified at this point. - """ - cnx = self.cnx - msgs = [] - # dict(varname: eid), allowing to check rql expression for variables - # which have a known eid - varkwargs = {} - if not cnx.transaction_data.get('security-rqlst-cache'): - for var in rqlst.defined_vars.values(): - if var.stinfo['constnode'] is not None: - eid = var.stinfo['constnode'].eval(self.args) - varkwargs[var.name] = int(eid) - # dictionary of variables restricted for security reason - localchecks = {} - restricted_vars = set() - newsolutions = [] - for solution in rqlst.solutions: - try: - localcheck = get_local_checks(cnx, rqlst, solution) - except Unauthorized as ex: - msg = 'remove %s from solutions since %s has no %s access to %s' - msg %= (solution, cnx.user.login, ex.args[0], ex.args[1]) - msgs.append(msg) - LOGGER.info(msg) - else: - newsolutions.append(solution) - # try to benefit of rqlexpr.check cache for entities which - # are specified by eid in query'args - for varname, eid in varkwargs.items(): - try: - rqlexprs = localcheck.pop(varname) - except KeyError: - continue - # if entity has been added in the current transaction, the - # user can read it whatever rql expressions are associated - # to its type - if cnx.added_in_transaction(eid): - continue - for rqlexpr in rqlexprs: - if rqlexpr.check(cnx, eid): - break - else: - raise Unauthorized('No read acces on %r with eid %i.' % (var, eid)) - # mark variables protected by an rql expression - restricted_vars.update(localcheck) - # turn local check into a dict key - localcheck = tuple(sorted(localcheck.items())) - localchecks.setdefault(localcheck, []).append(solution) - # raise Unautorized exception if the user can't access to any solution - if not newsolutions: - raise Unauthorized('\n'.join(msgs)) - # if there is some message, solutions have been modified and must be - # reconsidered by the syntax treee - if msgs: - rqlst.set_possible_types(newsolutions) - return localchecks, restricted_vars - - def finalize(self, select, solutions, insertedvars): - rqlst = Union() - rqlst.append(select) - for mainvarname, rschema, newvarname in insertedvars: - nvartype = str(rschema.objects(solutions[0][mainvarname])[0]) - for sol in solutions: - sol[newvarname] = nvartype - select.clean_solutions(solutions) - add_types_restriction(self.schema, select) - self.rqlhelper.annotate(rqlst) - self.preprocess(rqlst, security=False) - return rqlst - - -class InsertPlan(ExecutionPlan): - """an execution model specific to the INSERT rql query - """ - - def __init__(self, querier, rqlst, args, cnx): - ExecutionPlan.__init__(self, querier, rqlst, args, cnx) - # save originally selected variable, we may modify this - # dictionary for substitution (query parameters) - self.selected = rqlst.selection - # list of rows of entities definition (ssplanner.EditedEntity) - self.e_defs = [[]] - # list of new relation definition (3-uple (from_eid, r_type, to_eid) - self.r_defs = set() - # indexes to track entity definitions bound to relation definitions - self._r_subj_index = {} - self._r_obj_index = {} - self._expanded_r_defs = {} - - def add_entity_def(self, edef): - """add an entity definition to build""" - self.e_defs[-1].append(edef) - - def add_relation_def(self, rdef): - """add an relation definition to build""" - self.r_defs.add(rdef) - if not isinstance(rdef[0], int): - self._r_subj_index.setdefault(rdef[0], []).append(rdef) - if not isinstance(rdef[2], int): - self._r_obj_index.setdefault(rdef[2], []).append(rdef) - - def substitute_entity_def(self, edef, edefs): - """substitute an incomplete entity definition by a list of complete - equivalents - - e.g. on queries such as :: - INSERT Personne X, Societe Y: X nom N, Y nom 'toto', X travaille Y - WHERE U login 'admin', U login N - - X will be inserted as many times as U exists, and so the X travaille Y - relations as to be added as many time as X is inserted - """ - if not edefs or not self.e_defs: - # no result, no entity will be created - self.e_defs = () - return - # first remove the incomplete entity definition - colidx = self.e_defs[0].index(edef) - for i, row in enumerate(self.e_defs[:]): - self.e_defs[i][colidx] = edefs[0] - samplerow = self.e_defs[i] - for edef_ in edefs[1:]: - row = [ed.clone() for i, ed in enumerate(samplerow) - if i != colidx] - row.insert(colidx, edef_) - self.e_defs.append(row) - # now, see if this entity def is referenced as subject in some relation - # definition - if edef in self._r_subj_index: - for rdef in self._r_subj_index[edef]: - expanded = self._expanded(rdef) - result = [] - for exp_rdef in expanded: - for edef_ in edefs: - result.append( (edef_, exp_rdef[1], exp_rdef[2]) ) - self._expanded_r_defs[rdef] = result - # and finally, see if this entity def is referenced as object in some - # relation definition - if edef in self._r_obj_index: - for rdef in self._r_obj_index[edef]: - expanded = self._expanded(rdef) - result = [] - for exp_rdef in expanded: - for edef_ in edefs: - result.append( (exp_rdef[0], exp_rdef[1], edef_) ) - self._expanded_r_defs[rdef] = result - - def _expanded(self, rdef): - """return expanded value for the given relation definition""" - try: - return self._expanded_r_defs[rdef] - except KeyError: - self.r_defs.remove(rdef) - return [rdef] - - def relation_defs(self): - """return the list for relation definitions to insert""" - for rdefs in self._expanded_r_defs.values(): - for rdef in rdefs: - yield rdef - for rdef in self.r_defs: - yield rdef - - def insert_entity_defs(self): - """return eids of inserted entities in a suitable form for the resulting - result set, e.g.: - - e.g. on queries such as :: - INSERT Personne X, Societe Y: X nom N, Y nom 'toto', X travaille Y - WHERE U login 'admin', U login N - - if there is two entities matching U, the result set will look like - [(eidX1, eidY1), (eidX2, eidY2)] - """ - cnx = self.cnx - repo = cnx.repo - results = [] - for row in self.e_defs: - results.append([repo.glob_add_entity(cnx, edef) - for edef in row]) - return results - - def insert_relation_defs(self): - cnx = self.cnx - repo = cnx.repo - edited_entities = {} - relations = {} - for subj, rtype, obj in self.relation_defs(): - # if a string is given into args instead of an int, we get it here - if isinstance(subj, string_types): - subj = int(subj) - elif not isinstance(subj, integer_types): - subj = subj.entity.eid - if isinstance(obj, string_types): - obj = int(obj) - elif not isinstance(obj, integer_types): - obj = obj.entity.eid - if repo.schema.rschema(rtype).inlined: - if subj not in edited_entities: - entity = cnx.entity_from_eid(subj) - edited = EditedEntity(entity) - edited_entities[subj] = edited - else: - edited = edited_entities[subj] - edited.edited_attribute(rtype, obj) - else: - if rtype in relations: - relations[rtype].append((subj, obj)) - else: - relations[rtype] = [(subj, obj)] - repo.glob_add_relations(cnx, relations) - for edited in edited_entities.values(): - repo.glob_update_entity(cnx, edited) - - -class QuerierHelper(object): - """helper class to execute rql queries, putting all things together""" - - def __init__(self, repo, schema): - # system info helper - self._repo = repo - # instance schema - self.set_schema(schema) - - def set_schema(self, schema): - self.schema = schema - repo = self._repo - # rql st and solution cache. - self._rql_cache = QueryCache(repo.config['rql-cache-size']) - # rql cache key cache. Don't bother using a Cache instance: we should - # have a limited number of queries in there, since there are no entries - # in this cache for user queries (which have no args) - self._rql_ck_cache = {} - # some cache usage stats - self.cache_hit, self.cache_miss = 0, 0 - # rql parsing / analysing helper - self.solutions = repo.vreg.solutions - rqlhelper = repo.vreg.rqlhelper - # set backend on the rql helper, will be used for function checking - rqlhelper.backend = repo.config.system_source_config['db-driver'] - self._parse = rqlhelper.parse - self._annotate = rqlhelper.annotate - # rql planner - self._planner = SSPlanner(schema, rqlhelper) - # sql generation annotator - self.sqlgen_annotate = SQLGenAnnotator(schema).annotate - - def parse(self, rql, annotate=False): - """return a rql syntax tree for the given rql""" - try: - return self._parse(text_type(rql), annotate=annotate) - except UnicodeError: - raise RQLSyntaxError(rql) - - def plan_factory(self, rqlst, args, cnx): - """create an execution plan for an INSERT RQL query""" - if rqlst.TYPE == 'insert': - return InsertPlan(self, rqlst, args, cnx) - return ExecutionPlan(self, rqlst, args, cnx) - - @statsd_timeit - def execute(self, cnx, rql, args=None, build_descr=True): - """execute a rql query, return resulting rows and their description in - a `ResultSet` object - - * `rql` should be a Unicode string or a plain ASCII string - * `args` the optional parameters dictionary associated to the query - * `build_descr` is a boolean flag indicating if the description should - be built on select queries (if false, the description will be en empty - list) - - on INSERT queries, there will be one row with the eid of each inserted - entity - - result for DELETE and SET queries is undefined yet - - to maximize the rql parsing/analyzing cache performance, you should - always use substitute arguments in queries (i.e. avoid query such as - 'Any X WHERE X eid 123'!) - """ - if server.DEBUG & (server.DBG_RQL | server.DBG_SQL): - if server.DEBUG & (server.DBG_MORE | server.DBG_SQL): - print('*'*80) - print('querier input', repr(rql), repr(args)) - # parse the query and binds variables - cachekey = (rql,) - try: - if args: - # search for named args in query which are eids (hence - # influencing query's solutions) - eidkeys = self._rql_ck_cache[rql] - if eidkeys: - # if there are some, we need a better cache key, eg (rql + - # entity type of each eid) - try: - cachekey = self._repo.querier_cache_key(cnx, rql, - args, eidkeys) - except UnknownEid: - # we want queries such as "Any X WHERE X eid 9999" - # return an empty result instead of raising UnknownEid - return empty_rset(rql, args) - rqlst = self._rql_cache[cachekey] - self.cache_hit += 1 - statsd_c('cache_hit') - except KeyError: - self.cache_miss += 1 - statsd_c('cache_miss') - rqlst = self.parse(rql) - try: - # compute solutions for rqlst and return named args in query - # which are eids. Notice that if you may not need `eidkeys`, we - # have to compute solutions anyway (kept as annotation on the - # tree) - eidkeys = self.solutions(cnx, rqlst, args) - except UnknownEid: - # we want queries such as "Any X WHERE X eid 9999" return an - # empty result instead of raising UnknownEid - return empty_rset(rql, args) - if args and rql not in self._rql_ck_cache: - self._rql_ck_cache[rql] = eidkeys - if eidkeys: - cachekey = self._repo.querier_cache_key(cnx, rql, args, - eidkeys) - self._rql_cache[cachekey] = rqlst - if rqlst.TYPE != 'select': - if cnx.read_security: - check_no_password_selected(rqlst) - cachekey = None - else: - if cnx.read_security: - for select in rqlst.children: - check_no_password_selected(select) - check_relations_read_access(cnx, select, args) - # on select query, always copy the cached rqlst so we don't have to - # bother modifying it. This is not necessary on write queries since - # a new syntax tree is built from them. - rqlst = rqlst.copy() - # Rewrite computed relations - rewriter = RQLRelationRewriter(cnx) - rewriter.rewrite(rqlst, args) - self._annotate(rqlst) - if args: - # different SQL generated when some argument is None or not (IS - # NULL). This should be considered when computing sql cache key - cachekey += tuple(sorted([k for k, v in args.items() - if v is None])) - # make an execution plan - plan = self.plan_factory(rqlst, args, cnx) - plan.cache_key = cachekey - self._planner.build_plan(plan) - # execute the plan - try: - results = plan.execute() - except (Unauthorized, ValidationError): - # getting an Unauthorized/ValidationError exception means the - # transaction must be rolled back - # - # notes: - # * we should not reset the connections set here, since we don't want the - # connection to loose it during processing - # * don't rollback if we're in the commit process, will be handled - # by the connection - if cnx.commit_state is None: - cnx.commit_state = 'uncommitable' - raise - # build a description for the results if necessary - descr = () - if build_descr: - if rqlst.TYPE == 'select': - # sample selection - if len(rqlst.children) == 1 and len(rqlst.children[0].solutions) == 1: - # easy, all lines are identical - selected = rqlst.children[0].selection - solution = rqlst.children[0].solutions[0] - description = _make_description(selected, args, solution) - descr = RepeatList(len(results), tuple(description)) - else: - # hard, delegate the work :o) - descr = manual_build_descr(cnx, rqlst, args, results) - elif rqlst.TYPE == 'insert': - # on insert plan, some entities may have been auto-casted, - # so compute description manually even if there is only - # one solution - basedescr = [None] * len(plan.selected) - todetermine = list(zip(range(len(plan.selected)), repeat(False))) - descr = _build_descr(cnx, results, basedescr, todetermine) - # FIXME: get number of affected entities / relations on non - # selection queries ? - # return a result set object - return ResultSet(results, rql, args, descr) - - # these are overridden by set_log_methods below - # only defining here to prevent pylint from complaining - info = warning = error = critical = exception = debug = lambda msg,*a,**kw: None - -from logging import getLogger -from cubicweb import set_log_methods -LOGGER = getLogger('cubicweb.querier') -set_log_methods(QuerierHelper, LOGGER) - - -def manual_build_descr(cnx, rqlst, args, result): - """build a description for a given result by analysing each row - - XXX could probably be done more efficiently during execution of query - """ - # not so easy, looks for variable which changes from one solution - # to another - unstables = rqlst.get_variable_indices() - basedescr = [] - todetermine = [] - for i in range(len(rqlst.children[0].selection)): - ttype = _selection_idx_type(i, rqlst, args) - if ttype is None or ttype == 'Any': - ttype = None - isfinal = True - else: - isfinal = ttype in BASE_TYPES - if ttype is None or i in unstables: - basedescr.append(None) - todetermine.append( (i, isfinal) ) - else: - basedescr.append(ttype) - if not todetermine: - return RepeatList(len(result), tuple(basedescr)) - return _build_descr(cnx, result, basedescr, todetermine) - -def _build_descr(cnx, result, basedescription, todetermine): - description = [] - entity_metas = cnx.entity_metas - todel = [] - for i, row in enumerate(result): - row_descr = basedescription[:] - for index, isfinal in todetermine: - value = row[index] - if value is None: - # None value inserted by an outer join, no type - row_descr[index] = None - continue - if isfinal: - row_descr[index] = etype_from_pyobj(value) - else: - try: - row_descr[index] = entity_metas(value)['type'] - except UnknownEid: - cnx.error('wrong eid %s in repository, you should ' - 'db-check the database' % value) - todel.append(i) - break - else: - description.append(tuple(row_descr)) - for i in reversed(todel): - del result[i] - return description - -def _make_description(selected, args, solution): - """return a description for a result set""" - description = [] - for term in selected: - description.append(term.get_type(solution, args)) - return description - -def _selection_idx_type(i, rqlst, args): - """try to return type of term at index `i` of the rqlst's selection""" - for select in rqlst.children: - term = select.selection[i] - for solution in select.solutions: - try: - ttype = term.get_type(solution, args) - if ttype is not None: - return ttype - except CoercionError: - return None