# HG changeset patch # User Sylvain Thénault # Date 1390324290 -3600 # Node ID 3e7cad3967c5e2bd51c4b1e1482b4c11da6b7930 # Parent 0636c49602594374ae8c6ebd577eab687c287efa [multi-sources-removal] Drop the "true" multi-sources planner Only keep copy-based sources. This will soon allow for greater code simplification. Related to #2919300 diff -r 0636c4960259 -r 3e7cad3967c5 dataimport.py --- a/dataimport.py Thu Jun 27 08:30:46 2013 +0200 +++ b/dataimport.py Tue Jan 21 18:11:30 2014 +0100 @@ -971,7 +971,6 @@ self._storage_handler = self.system_source._storage_handler self.preprocess_entity = self.system_source.preprocess_entity self.sqlgen = self.system_source.sqlgen - self.copy_based_source = self.system_source.copy_based_source self.uri = self.system_source.uri self.eid = self.system_source.eid # Directory to write temporary files @@ -1125,9 +1124,8 @@ if extid is not None: assert isinstance(extid, str) extid = b64encode(extid) - uri = 'system' if source.copy_based_source else source.uri attrs = {'type': entity.cw_etype, 'eid': entity.eid, 'extid': extid, - 'source': uri, 'asource': source.uri, 'mtime': datetime.utcnow()} + 'source': 'system', 'asource': source.uri, 'mtime': datetime.utcnow()} self._handle_insert_entity_sql(session, self.sqlgen.insert('entities', attrs), attrs) # insert core relations: is, is_instance_of and cw_source try: diff -r 0636c4960259 -r 3e7cad3967c5 devtools/repotest.py --- a/devtools/repotest.py Thu Jun 27 08:30:46 2013 +0200 +++ b/devtools/repotest.py Tue Jan 21 18:11:30 2014 +0100 @@ -382,16 +382,6 @@ return _orig_select_principal(scope, relations, _sort=lambda rels: sorted(rels, key=sort_key)) -try: - from cubicweb.server.msplanner import PartPlanInformation -except ImportError: - class PartPlanInformation(object): - def merge_input_maps(self, *args, **kwargs): - pass - def _choose_term(self, sourceterms): - pass -_orig_merge_input_maps = PartPlanInformation.merge_input_maps -_orig_choose_term = PartPlanInformation._choose_term def _merge_input_maps(*args, **kwargs): return sorted(_orig_merge_input_maps(*args, **kwargs)) @@ -411,7 +401,6 @@ return x.value return _orig_choose_term(self, source, DumbOrderedDict2(sourceterms, get_key)) - def _ordered_iter_relations(stinfo): return sorted(_orig_iter_relations(stinfo), key=lambda x:x.r_type) @@ -422,8 +411,6 @@ ExecutionPlan._check_permissions = _check_permissions ExecutionPlan.tablesinorder = None ExecutionPlan.init_temp_table = _init_temp_table - PartPlanInformation.merge_input_maps = _merge_input_maps - PartPlanInformation._choose_term = _choose_term def undo_monkey_patch(): rqlrewrite.iter_relations = _orig_iter_relations @@ -431,5 +418,3 @@ rqlrewrite.RQLRewriter.build_variantes = _orig_build_variantes ExecutionPlan._check_permissions = _orig_check_permissions ExecutionPlan.init_temp_table = _orig_init_temp_table - PartPlanInformation.merge_input_maps = _orig_merge_input_maps - PartPlanInformation._choose_term = _orig_choose_term diff -r 0636c4960259 -r 3e7cad3967c5 hooks/__init__.py --- a/hooks/__init__.py Thu Jun 27 08:30:46 2013 +0200 +++ b/hooks/__init__.py Tue Jan 21 18:11:30 2014 +0100 @@ -59,7 +59,7 @@ def update_feeds(repo): # don't iter on repo.sources which doesn't include copy based # sources (the one we're looking for) - # take a list to avoid iterating on a dictionary which size may + # take a list to avoid iterating on a dictionary whose size may # change for source in list(repo.sources_by_eid.values()): if (not source.copy_based_source @@ -83,8 +83,8 @@ def __call__(self): def expire_dataimports(repo=self.repo): - for source in repo.sources_by_eid.itervalues(): - if (not source.copy_based_source + for uri, source in repo.sources_by_uri.iteritems(): + if (uri == 'system' or not repo.config.source_enabled(source)): continue session = repo.internal_session() diff -r 0636c4960259 -r 3e7cad3967c5 hooks/metadata.py --- a/hooks/metadata.py Thu Jun 27 08:30:46 2013 +0200 +++ b/hooks/metadata.py Tue Jan 21 18:11:30 2014 +0100 @@ -159,11 +159,8 @@ extid = entity.cw_metainformation()['extid'] repo._type_source_cache[entity.eid] = ( entity.cw_etype, self.newsource.uri, None, self.newsource.uri) - if self.oldsource.copy_based_source: - uri = 'system' - else: - uri = self.oldsource.uri - repo._extid_cache[(extid, uri)] = -entity.eid + repo._extid_cache[(extid, 'system')] = -entity.eid + class ChangeEntitySourceDeleteHook(MetaDataHook): """support for moving an entity from an external source by watching 'Any @@ -197,16 +194,6 @@ syssource = newsource.repo_source oldsource = self._cw.entity_from_eid(schange[self.eidfrom]) entity = self._cw.entity_from_eid(self.eidfrom) - # copy entity if necessary - if not oldsource.repo_source.copy_based_source: - entity.complete(skip_bytes=False, skip_pwd=False) - if not entity.creation_date: - entity.cw_attr_cache['creation_date'] = datetime.now() - if not entity.modification_date: - entity.cw_attr_cache['modification_date'] = datetime.now() - entity.cw_attr_cache['cwuri'] = u'%s%s' % (self._cw.base_url(), entity.eid) - entity.cw_edited = EditedEntity(entity, **entity.cw_attr_cache) - syssource.add_entity(self._cw, entity) # we don't want the moved entity to be reimported later. To # distinguish this state, the trick is to change the associated # record in the 'entities' system table with eid=-eid while leaving diff -r 0636c4960259 -r 3e7cad3967c5 hooks/syncsources.py --- a/hooks/syncsources.py Thu Jun 27 08:30:46 2013 +0200 +++ b/hooks/syncsources.py Tue Jan 21 18:11:30 2014 +0100 @@ -93,10 +93,7 @@ def precommit_event(self): source = self.session.repo.sources_by_uri[self.oldname] - if source.copy_based_source: - sql = 'UPDATE entities SET asource=%(newname)s WHERE asource=%(oldname)s' - else: - sql = 'UPDATE entities SET source=%(newname)s, asource=%(newname)s WHERE source=%(oldname)s' + sql = 'UPDATE entities SET asource=%(newname)s WHERE asource=%(oldname)s' self.session.system_sql(sql, {'oldname': self.oldname, 'newname': self.newname}) @@ -109,11 +106,6 @@ repo.sources_by_uri[self.newname] = source repo._type_source_cache.clear() clear_cache(repo, 'source_defs') - if not source.copy_based_source: - repo._extid_cache.clear() - repo._clear_planning_caches() - for cnxset in repo.cnxsets: - cnxset.source_cnxs[self.oldname] = cnxset.source_cnxs.pop(self.oldname) class SourceUpdatedHook(SourceHook): diff -r 0636c4960259 -r 3e7cad3967c5 misc/scripts/drop_external_entities.py --- a/misc/scripts/drop_external_entities.py Thu Jun 27 08:30:46 2013 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,23 +0,0 @@ -from cubicweb import UnknownEid -source, = __args__ - -sql("DELETE FROM entities WHERE type='Int'") - -ecnx = session.cnxset.connection(source) -for e in rql('Any X WHERE X cw_source S, S name %(name)s', {'name': source}).entities(): - meta = e.cw_metainformation() - assert meta['source']['uri'] == source - try: - suri = ecnx.describe(meta['extid'])[1] - except UnknownEid: - print 'cant describe', e.cw_etype, e.eid, meta - continue - if suri != 'system': - try: - print 'deleting', e.cw_etype, e.eid, suri, e.dc_title().encode('utf8') - repo.delete_info(session, e, suri, scleanup=e.eid) - except UnknownEid: - print ' cant delete', e.cw_etype, e.eid, meta - - -commit() diff -r 0636c4960259 -r 3e7cad3967c5 server/msplanner.py --- a/server/msplanner.py Thu Jun 27 08:30:46 2013 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,1822 +0,0 @@ -# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved. -# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr -# -# This file is part of CubicWeb. -# -# CubicWeb is free software: you can redistribute it and/or modify it under the -# terms of the GNU Lesser General Public License as published by the Free -# Software Foundation, either version 2.1 of the License, or (at your option) -# any later version. -# -# CubicWeb is distributed in the hope that it will be useful, but WITHOUT -# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS -# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more -# details. -# -# You should have received a copy of the GNU Lesser General Public License along -# with CubicWeb. If not, see . -"""plan execution of rql queries on multiple sources - -the best way to understand what are we trying to acheive here is to read the -unit-tests in unittest_msplanner.py - - -What you need to know -~~~~~~~~~~~~~~~~~~~~~ -1. The system source is expected to support every entity and relation types - -2. Given "X relation Y": - - * if relation, X and Y types are supported by the external source, we suppose - by default that X and Y should both come from the same source as the - relation. You can specify otherwise by adding relation into the - "cross_relations" set in the source's mapping file and it that case, we'll - consider that we can also find in the system source some relation between - X and Y coming from different sources. - - * if "relation" isn't supported by the external source but X or Y - types (or both) are, we suppose by default that can find in the system - source some relation where X and/or Y come from the external source. You - can specify otherwise by adding relation into the "dont_cross_relations" - set in the source's mapping file and it that case, we'll consider that we - can only find in the system source some relation between X and Y coming - the system source. - - -Implementation -~~~~~~~~~~~~~~ -XXX explain algorithm - - -Exemples of multi-sources query execution -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -For a system source and a ldap user source (only CWUser and its attributes -is supported, no group or such): - -:CWUser X: -1. fetch CWUser X from both sources and return concatenation of results - -:CWUser X WHERE X in_group G, G name 'users': -* catch 1 - 1. fetch CWUser X from both sources, store concatenation of results into a - temporary table - 2. return the result of TMP X WHERE X in_group G, G name 'users' from the - system source -* catch 2 - 1. return the result of CWUser X WHERE X in_group G, G name 'users' from system - source, that's enough (optimization of the sql querier will avoid join on - CWUser, so we will directly get local eids) - -:CWUser X,L WHERE X in_group G, X login L, G name 'users': -1. fetch Any X,L WHERE X is CWUser, X login L from both sources, store - concatenation of results into a temporary table -2. return the result of Any X, L WHERE X is TMP, X login LX in_group G, - G name 'users' from the system source - - -:Any X WHERE X owned_by Y: -* catch 1 - 1. fetch CWUser X from both sources, store concatenation of results into a - temporary table - 2. return the result of Any X WHERE X owned_by Y, Y is TMP from the system - source -* catch 2 - 1. return the result of Any X WHERE X owned_by Y from system source, that's - enough (optimization of the sql querier will avoid join on CWUser, so we - will directly get local eids) -""" - -__docformat__ = "restructuredtext en" - -from itertools import imap, ifilterfalse - -from logilab.common.compat import any -from logilab.common.decorators import cached -from logilab.common.deprecation import deprecated - -from rql import BadRQLQuery -from rql.stmts import Union, Select -from rql.nodes import (VariableRef, Comparison, Relation, Constant, Variable, - Not, Exists, SortTerm, Function) - -from cubicweb import server -from cubicweb.utils import make_uid -from cubicweb.rqlrewrite import add_types_restriction, cleanup_solutions -from cubicweb.server.ssplanner import SSPlanner, OneFetchStep -from cubicweb.server.mssteps import * - -Variable._ms_table_key = lambda x: x.name -Relation._ms_table_key = lambda x: x.r_type -# str() Constant.value to ensure generated table name won't be unicode -Constant._ms_table_key = lambda x: str(x.value) - -Variable._ms_may_be_processed = lambda x, terms, linkedterms: any( - t for t in terms if t in linkedterms.get(x, ())) -Relation._ms_may_be_processed = lambda x, terms, linkedterms: all( - getattr(hs, 'variable', hs) in terms for hs in x.get_variable_parts()) - -def ms_scope(term): - rel = None - scope = term.scope - if isinstance(term, Variable) and len(term.stinfo['relations']) == 1: - rel = iter(term.stinfo['relations']).next().relation() - elif isinstance(term, Constant): - rel = term.relation() - elif isinstance(term, Relation): - rel = term - if rel is not None and ( - rel.r_type != 'identity' and rel.scope is scope - and isinstance(rel.parent, Exists) and rel.parent.neged(strict=True)): - return scope.parent.scope - return scope - -def need_intersect(select, getrschema): - for rel in select.iget_nodes(Relation): - if isinstance(rel.parent, Exists) and rel.parent.neged(strict=True) and not rel.is_types_restriction(): - rschema = getrschema(rel.r_type) - if not rschema.final: - # if one of the relation's variable is ambiguous but not - # invariant, an intersection will be necessary - for vref in rel.get_nodes(VariableRef): - var = vref.variable - if (var.valuable_references() == 1 - and len(var.stinfo['possibletypes']) > 1): - return True - return False - -def neged_relation(rel): - parent = rel.parent - return isinstance(parent, Not) or (isinstance(parent, Exists) and - isinstance(parent.parent, Not)) - -def need_source_access_relation(vargraph): - if not vargraph: - return False - # check vargraph contains some other relation than the identity relation - # test of key nature since it may be a variable name (don't care about that) - # or a 2-uple (var1, var2) associated to the relation to traverse to go from - # var1 to var2 - return any(key for key, val in vargraph.iteritems() - if isinstance(key, tuple) and val != 'identity') - -def need_aggr_step(select, sources, stepdefs=None): - """return True if a temporary table is necessary to store some partial - results to execute the given query - """ - if len(sources) == 1: - # can do everything at once with a single source - return False - if select.orderby or select.groupby or select.has_aggregat: - # if more than one source, we need a temp table to deal with sort / - # groups / aggregat if : - # * the rqlst won't be splitted (in the other case the last query - # using partial temporary table can do sort/groups/aggregat without - # the need for a later AggrStep) - # * the rqlst is splitted in multiple steps and there are more than one - # final step - if stepdefs is None: - return True - has_one_final = False - fstepsolindices = set() - for stepdef in stepdefs: - if stepdef[-1]: - if has_one_final or frozenset(stepdef[2]) != fstepsolindices: - return True - has_one_final = True - else: - fstepsolindices.update(stepdef[2]) - return False - -def select_group_sort(select): # XXX something similar done in rql2sql - # add variables used in groups and sort terms to the selection - # if necessary - if select.groupby: - for vref in select.groupby: - if not vref in select.selection: - select.append_selected(vref.copy(select)) - for sortterm in select.orderby: - for vref in sortterm.iget_nodes(VariableRef): - if not vref in select.get_selected_variables(): - # we can't directly insert sortterm.term because it references - # a variable of the select before the copy. - # XXX if constant term are used to define sort, their value - # may necessite a decay - select.append_selected(vref.copy(select)) - if select.groupby and not vref in select.groupby: - select.add_group_var(vref.copy(select)) - -def allequals(solutions): - """return true if all solutions are identical""" - sol = solutions.next() - noconstsol = None - for sol_ in solutions: - if sol_ != sol: - return False - return True - -# XXX move functions below to rql ############################################## - -def is_ancestor(n1, n2): - """return True if n2 is a parent scope of n1""" - p = n1.parent - while p is not None: - if p is n2: - return True - p = p.parent - return False - -def copy_node(newroot, node, subparts=()): - newnode = node.__class__(*node.initargs(newroot)) - for part in subparts: - newnode.append(part) - return newnode - -def used_in_outer_scope(var, scope): - """return true if the variable is used in an outer scope of the given scope - """ - for rel in var.stinfo['relations']: - rscope = ms_scope(rel) - if not rscope is scope and is_ancestor(scope, rscope): - return True - return False - -################################################################################ - -class PartPlanInformation(object): - """regroups necessary information to execute some part of a "global" rql - query ("global" means as received by the querier, which may result in - several internal queries, e.g. parts, due to security insertions). Actually - a PPI is created for each subquery and for each query in a union. - - It exposes as well some methods helping in executing this part on a - multi-sources repository, modifying its internal structure during the - process. - - :attr plan: - the execution plan - :attr rqlst: - the original rql syntax tree handled by this part - - :attr needsplit: - bool telling if the query has to be split into multiple steps for - execution or if it can be executed at once - - :attr temptable: - a SQL temporary table name or None, if necessary to handle aggregate / - sorting for this part of the query - - :attr finaltable: - a SQL table name or None, if results for this part of the query should be - written into a temporary table (usually shared by multiple PPI) - - :attr sourcesterms: - a dictionary {source : {term: set([solution index, ])}} telling for each - source which terms are supported for which solutions. A "term" may be - either a rql Variable, Constant or Relation node. - """ - def __init__(self, plan, rqlst, rqlhelper=None): - self.plan = plan - self.rqlst = rqlst - self.needsplit = False - self.temptable = None - self.finaltable = None - # shortcuts - self._schema = plan.schema - self._session = plan.session - self._repo = self._session.repo - self._solutions = rqlst.solutions - self._solindices = range(len(self._solutions)) - self.system_source = self._repo.system_source - # source : {term: [solution index, ]} - self.sourcesterms = self._sourcesterms = {} - # source : {relation: set(child variable and constant)} - self._crossrelations = {} - # term : set(sources) - self._discarded_sources = {} - # dictionary of variables and constants which are linked to each other - # using a non final relation supported by multiple sources (crossed or - # not). - self._linkedterms = {} - # processing - termssources = self._compute_sourcesterms() - self._remove_invalid_sources(termssources) - self._compute_needsplit() - # after initialisation, .sourcesterms contains the same thing as - # ._sourcesterms though during plan construction, ._sourcesterms will - # be modified while .sourcesterms will be kept unmodified - self.sourcesterms = {} - for k, v in self._sourcesterms.iteritems(): - self.sourcesterms[k] = {} - for k2, v2 in v.iteritems(): - self.sourcesterms[k][k2] = v2.copy() - # cleanup linked var - for var, linkedrelsinfo in self._linkedterms.iteritems(): - self._linkedterms[var] = frozenset(x[0] for x in linkedrelsinfo) - # map output of a step to input of a following step - self._inputmaps = {} - # record input map conflicts to resolve them on final step generation - self._conflicts = [] - if rqlhelper is not None: # else test - self._insert_identity_variable = rqlhelper._annotator.rewrite_shared_optional - if server.DEBUG & server.DBG_MS: - print 'sourcesterms:' - self._debug_sourcesterms() - - def _debug_sourcesterms(self): - for source in self._sourcesterms: - print '-', source - for term, sols in self._sourcesterms[source].items(): - print ' -', term, id(term), ':', sols - - def copy_solutions(self, solindices): - return [self._solutions[solidx].copy() for solidx in solindices] - - @property - @cached - def part_sources(self): - if self._sourcesterms: - return tuple(sorted(self._sourcesterms)) - return (self.system_source,) - - @property - @cached - def _sys_source_set(self): - return frozenset((self.system_source, solindex) - for solindex in self._solindices) - - @cached - def _norel_support_set(self, relation): - """return a set of (source, solindex) where source doesn't support the - relation - """ - return frozenset((source, solidx) for source in self._repo.sources - for solidx in self._solindices - if not ((source.support_relation(relation.r_type)) - or relation.r_type in source.dont_cross_relations)) - - def _compute_sourcesterms(self): - """compute for each term (variable, rewritten constant, relation) and - for each solution in the rqlst which sources support them - """ - repo = self._repo - eschema = self._schema.eschema - sourcesterms = self._sourcesterms - # find for each source which variable/solution are supported - for varname, varobj in self.rqlst.defined_vars.items(): - # if variable has an eid specified, we can get its source directly - # NOTE: use uidrel and not constnode to deal with "X eid IN(1,2,3,4)" - if varobj.stinfo['uidrel'] is not None: - rel = varobj.stinfo['uidrel'] - hasrel = len(varobj.stinfo['relations']) > 1 - for const in rel.children[1].get_nodes(Constant): - eid = const.eval(self.plan.args) - source = self._session.source_from_eid(eid) - if (source is self.system_source - or (hasrel and varobj._q_invariant and - not any(source.support_relation(r.r_type) - for r in varobj.stinfo['relations'] - if not r is rel))): - self._set_source_for_term(self.system_source, varobj) - else: - self._set_source_for_term(source, varobj) - continue - rels = varobj.stinfo['relations'] - if not rels and varobj.stinfo['typerel'] is None: - # (rare) case where the variable has no type specified nor - # relation accessed ex. "Any MAX(X)" - self._set_source_for_term(self.system_source, varobj) - continue - for i, sol in enumerate(self._solutions): - vartype = sol[varname] - # skip final variable - if eschema(vartype).final: - break - for source in repo.sources: - if source.support_entity(vartype): - # the source support the entity type, though we will - # actually have to fetch from it only if - # * the variable isn't invariant - # * at least one supported relation specified - if not varobj._q_invariant or \ - any(imap(source.support_relation, - (r.r_type for r in rels if r.r_type not in ('identity', 'eid')))): - sourcesterms.setdefault(source, {}).setdefault(varobj, set()).add(i) - # if variable is not invariant and is used by a relation - # not supported by this source, we'll have to split the - # query - if not varobj._q_invariant and any(ifilterfalse( - source.support_relation, (r.r_type for r in rels))): - self.needsplit = True - # add source for rewritten constants to sourcesterms - self._const_vars = {} - for vconsts in self.rqlst.stinfo['rewritten'].itervalues(): - # remember those consts come from the same variable - for const in vconsts: - self._const_vars[const] = vconsts - source = self._session.source_from_eid(const.eval(self.plan.args)) - if source is self.system_source: - for const in vconsts: - self._set_source_for_term(source, const) - elif not self._sourcesterms: - for const in vconsts: - self._set_source_for_term(source, const) - elif source in self._sourcesterms: - source_scopes = frozenset(ms_scope(t) for t in self._sourcesterms[source]) - for const in vconsts: - if ms_scope(const) in source_scopes: - self._set_source_for_term(source, const) - # if system source is used, add every rewritten constant - # to its supported terms even when associated entity - # doesn't actually come from it so we get a changes that - # allequals will return True as expected when computing - # needsplit - # check const is used in a relation restriction - if const.relation() and self.system_source in sourcesterms: - self._set_source_for_term(self.system_source, const) - # add source for relations - rschema = self._schema.rschema - termssources = {} - sourcerels = [] - for rel in self.rqlst.iget_nodes(Relation): - # process non final relations only - # note: don't try to get schema for 'is' relation (not available - # during bootstrap) - if rel.r_type == 'cw_source': - sourcerels.append(rel) - if not (rel.is_types_restriction() or rschema(rel.r_type).final): - # nothing to do if relation is not supported by multiple sources - # or if some source has it listed in its cross_relations - # attribute - # - # XXX code below don't deal if some source allow relation - # crossing but not another one - relsources = [s for s in repo.rel_type_sources(rel.r_type) - if s is self.system_source - or s in self._sourcesterms] - if len(relsources) < 2: - # filter out sources being there because they have this - # relation in their dont_cross_relations attribute - relsources = [source for source in relsources - if source.support_relation(rel.r_type)] - if relsources: - # this means the relation is using a variable inlined as - # a constant and another unsupported variable, in which - # case we put the relation in sourcesterms - self._sourcesterms.setdefault(relsources[0], {})[rel] = set(self._solindices) - continue - lhs, rhs = rel.get_variable_parts() - lhsv, rhsv = getattr(lhs, 'variable', lhs), getattr(rhs, 'variable', rhs) - # update dictionary of sources supporting lhs and rhs vars - if not lhsv in termssources: - termssources[lhsv] = self._term_sources(lhs) - if not rhsv in termssources: - termssources[rhsv] = self._term_sources(rhs) - self._handle_cross_relation(rel, relsources, termssources) - self._linkedterms.setdefault(lhsv, set()).add((rhsv, rel)) - self._linkedterms.setdefault(rhsv, set()).add((lhsv, rel)) - # extract information from cw_source relation - for srel in sourcerels: - vref = srel.children[1].children[0] - sourceeids, sourcenames = [], [] - if isinstance(vref, Constant): - # simplified variable - sourceeids = None, (vref.eval(self.plan.args),) - var = vref - else: - var = vref.variable - for rel in var.stinfo['relations'] - var.stinfo['rhsrelations']: - # skip neged eid relation since it's the kind of query - # generated when clearing old value of '?1" relation, - # cw_source included. See - # unittest_ldapuser.test_copy_to_system_source - if rel.r_type == 'name' or \ - (rel.r_type == 'eid' and not rel.neged(strict=True)): - if rel.r_type == 'eid': - slist = sourceeids - else: - slist = sourcenames - sources = [cst.eval(self.plan.args) - for cst in rel.children[1].get_nodes(Constant)] - if sources: - if slist: - # don't attempt to do anything - sourcenames = sourceeids = None - break - slist[:] = (rel, sources) - if sourceeids: - rel, values = sourceeids - sourcesdict = self._repo.sources_by_eid - elif sourcenames: - rel, values = sourcenames - sourcesdict = self._repo.sources_by_uri - else: - sourcesdict = None - if sourcesdict is not None: - lhs = srel.children[0] - try: - sources = [sourcesdict[key] for key in values] - except KeyError: - raise BadRQLQuery('source conflict for term %s' % lhs.as_string()) - if isinstance(lhs, Constant): - source = self._session.source_from_eid(lhs.eval(self.plan.args)) - if not source in sources: - raise BadRQLQuery('source conflict for term %s' % lhs.as_string()) - else: - lhs = getattr(lhs, 'variable', lhs) - invariant = getattr(lhs, '_q_invariant', False) - # XXX NOT NOT - neged = srel.neged(traverse_scope=True) or (rel and rel.neged(strict=True)) - has_copy_based_source = False - sources_ = [] - for source in sources: - if source.copy_based_source: - has_copy_based_source = True - if not self.system_source in sources_: - sources_.append(self.system_source) - else: - sources_.append(source) - sources = sources_ - if neged: - for source in sources: - if invariant and source is self.system_source: - continue - self._remove_source_term(source, lhs) - self._discarded_sources.setdefault(lhs, set()).add(source) - usesys = self.system_source not in sources - else: - for source, terms in sourcesterms.items(): - if lhs in terms and not source in sources: - if invariant and source is self.system_source: - continue - self._remove_source_term(source, lhs) - self._discarded_sources.setdefault(lhs, set()).add(source) - usesys = self.system_source in sources - if rel is None or (len(var.stinfo['relations']) == 2 and - not var.stinfo['selected']): - self._remove_source_term(self.system_source, var) - if not (has_copy_based_source or len(sources) > 1 - or usesys or invariant): - if rel is None: - srel.parent.remove(srel) - else: - self.rqlst.undefine_variable(var) - self._remove_source_term(self.system_source, srel) - return termssources - - def _handle_cross_relation(self, rel, relsources, termssources): - for source in relsources: - if rel.r_type in source.cross_relations: - ssource = self.system_source - crossvars = set(x.variable for x in rel.get_nodes(VariableRef)) - for const in rel.get_nodes(Constant): - if source.uri != 'system' and not const in self._sourcesterms.get(source, ()): - continue - crossvars.add(const) - self._crossrelations.setdefault(source, {})[rel] = crossvars - if len(crossvars) < 2: - # this means there is a constant in the relation which is - # not supported by the source, so we can stop here - continue - self._sourcesterms.setdefault(ssource, {})[rel] = set(self._solindices) - solindices = None - for term in crossvars: - if len(termssources[term]) == 1 and iter(termssources[term]).next()[0].uri == 'system': - for ov in crossvars: - if ov is not term and (isinstance(ov, Constant) or ov._q_invariant): - ssset = frozenset((ssource,)) - self._remove_sources(ov, termssources[ov] - ssset) - break - if solindices is None: - solindices = set(sol for s, sol in termssources[term] - if s is source) - else: - solindices &= set(sol for s, sol in termssources[term] - if s is source) - else: - self._sourcesterms.setdefault(source, {})[rel] = solindices - - def _remove_invalid_sources(self, termssources): - """removes invalid sources from `sourcesterms` member according to - traversed relations and their properties (which sources support them, - can they cross sources, etc...) - """ - for term in self._linkedterms: - self._remove_sources_until_stable(term, termssources) - if len(self._sourcesterms) > 1 and hasattr(self.plan.rqlst, 'main_relations'): - # the querier doesn't annotate write queries, need to do it here - self.plan.annotate_rqlst() - # insert/update/delete queries, we may get extra information from - # the main relation (eg relations to the left of the WHERE - if self.plan.rqlst.TYPE == 'insert': - inserted = dict((vref.variable, etype) - for etype, vref in self.plan.rqlst.main_variables) - else: - inserted = {} - repo = self._repo - rschema = self._schema.rschema - for rel in self.plan.rqlst.main_relations: - if not rschema(rel.r_type).final: - # nothing to do if relation is not supported by multiple sources - if len(repo.rel_type_sources(rel.r_type)) < 2: - continue - lhs, rhs = rel.get_variable_parts() - try: - lhsv = self._extern_term(lhs, termssources, inserted) - rhsv = self._extern_term(rhs, termssources, inserted) - except KeyError: - continue - self._remove_term_sources(lhsv, rel, rhsv, termssources) - self._remove_term_sources(rhsv, rel, lhsv, termssources) - - def _extern_term(self, term, termssources, inserted): - var = term.variable - if var.stinfo['constnode']: - termv = var.stinfo['constnode'] - termssources[termv] = self._term_sources(termv) - elif var in inserted: - termv = var - source = self._repo.locate_etype_source(inserted[var]) - termssources[termv] = set((source, solindex) - for solindex in self._solindices) - else: - termv = self.rqlst.defined_vars[var.name] - if not termv in termssources: - termssources[termv] = self._term_sources(termv) - return termv - - def _remove_sources_until_stable(self, term, termssources): - sourcesterms = self._sourcesterms - for oterm, rel in self._linkedterms.get(term, ()): - tscope = ms_scope(term) - otscope = ms_scope(oterm) - rscope = ms_scope(rel) - if not tscope is otscope and rscope.neged(strict=True): - # can't get information from relation inside a NOT exists - # where terms don't belong to the same scope - continue - need_ancestor_scope = False - if not (tscope is rscope and otscope is rscope): - if rel.ored(): - continue - if rel.ored(traverse_scope=True): - # if relation has some OR as parent, constraints should only - # propagate from parent scope to child scope, nothing else - need_ancestor_scope = True - relsources = self._repo.rel_type_sources(rel.r_type) - if neged_relation(rel) and ( - len(relsources) < 2 - or not isinstance(oterm, Variable) - or oterm.valuable_references() != 1 - or any(sourcesterms[source][term] != sourcesterms[source][oterm] - for source in relsources - if term in sourcesterms.get(source, ()) - and oterm in sourcesterms.get(source, ()))): - # neged relation doesn't allow to infer term sources unless - # we're on a multisource relation for a term only used by this - # relation (eg "Any X WHERE NOT X multisource_rel Y" and over is - # Y) - continue - # compute invalid sources for terms and remove them - if not need_ancestor_scope or is_ancestor(tscope, otscope): - self._remove_term_sources(term, rel, oterm, termssources) - if not need_ancestor_scope or is_ancestor(otscope, tscope): - self._remove_term_sources(oterm, rel, term, termssources) - - def _remove_term_sources(self, term, rel, oterm, termssources): - """remove invalid sources for term according to oterm's sources and the - relation between those two terms. - """ - norelsup = self._norel_support_set(rel) - termsources = termssources[term] - invalid_sources = termsources - (termssources[oterm] | norelsup) - if invalid_sources and self._repo.can_cross_relation(rel.r_type): - invalid_sources -= self._sys_source_set - if invalid_sources and isinstance(term, Variable) \ - and self._need_ext_source_access(term, rel): - # if the term is a not invariant variable, we should filter out - # source where the relation is a cross relation from invalid - # sources - invalid_sources = frozenset((s, solidx) for s, solidx in invalid_sources - if not (s in self._crossrelations and - rel in self._crossrelations[s])) - if invalid_sources: - self._remove_sources(term, invalid_sources) - discarded = self._discarded_sources.get(term) - if discarded is not None and not any(x[0] for x in (termsources-invalid_sources) - if not x[0] in discarded): - raise BadRQLQuery('relation %s cant be crossed but %s and %s should ' - 'come from difference sources' % - (rel.r_type, term.as_string(), oterm.as_string())) - # if term is a rewritten const, we can apply the same changes to - # all other consts inserted from the same original variable - for const in self._const_vars.get(term, ()): - if const is not term: - self._remove_sources(const, invalid_sources) - termsources -= invalid_sources - self._remove_sources_until_stable(term, termssources) - if isinstance(oterm, Constant): - self._remove_sources(oterm, invalid_sources) - - def _compute_needsplit(self): - """tell according to sourcesterms if the rqlst has to be splitted for - execution among multiple sources - - the execution has to be split if - * a source support an entity (non invariant) but doesn't support a - relation on it - * a source support an entity which is accessed by an optional relation - * there is more than one source and either all sources'supported - variable/solutions are not equivalent or multiple variables have to - be fetched from some source - """ - # NOTE: < 2 since may be 0 on queries such as Any X WHERE X eid 2 - if len(self._sourcesterms) < 2: - self.needsplit = False - # if this is not the system source but we have only constant terms - # and no relation (other than eid), apply query on the system source - # - # testing for rqlst with nothing in vargraph nor defined_vars is the - # simplest way the check the condition explained below - if not self.system_source in self._sourcesterms and \ - not self.rqlst.defined_vars and \ - not need_source_access_relation(self.rqlst.vargraph): - self._sourcesterms = {self.system_source: {}} - elif not self.needsplit: - if not allequals(self._sourcesterms.itervalues()): - for source, terms in self._sourcesterms.iteritems(): - if source is self.system_source: - continue - if any(x for x in terms if not isinstance(x, Constant)): - self.needsplit = True - return - self._sourcesterms = {self.system_source: {}} - self.needsplit = False - else: - sample = self._sourcesterms.itervalues().next() - if len(sample) > 1: - for term in sample: - # need split if unlinked variable - if isinstance(term, Variable) and not term in self._linkedterms: - self.needsplit = True - break - else: - # need split if there are some cross relation on non - # invariant variable or if the variable is used in - # multi-sources relation - if self._crossrelations: - for reldict in self._crossrelations.itervalues(): - for rel, terms in reldict.iteritems(): - for term in terms: - if isinstance(term, Variable) \ - and self._need_ext_source_access(term, rel): - self.needsplit = True - return - else: - # remove sources only accessing to constant nodes - for source, terms in self._sourcesterms.items(): - if source is self.system_source: - continue - if not any(x for x in terms if not isinstance(x, Constant)): - del self._sourcesterms[source] - if len(self._sourcesterms) < 2: - self.needsplit = False - - @cached - def _need_ext_source_access(self, var, rel): - if not var._q_invariant: - return True - if any(r for x, r in self._linkedterms[var] - if not r is rel and self._repo.is_multi_sources_relation(r.r_type)): - return True - return False - - def _set_source_for_term(self, source, term): - self._sourcesterms.setdefault(source, {})[term] = set(self._solindices) - - def _term_sources(self, term): - """returns possible sources for terms `term`""" - if isinstance(term, Constant): - source = self._session.source_from_eid(term.eval(self.plan.args)) - return set((source, solindex) for solindex in self._solindices) - else: - var = getattr(term, 'variable', term) - sources = [source for source, varobjs in self.sourcesterms.iteritems() - if var in varobjs] - return set((source, solindex) for source in sources - for solindex in self.sourcesterms[source][var]) - - def _remove_sources(self, term, sources): - """removes invalid sources (`sources`) from `sourcesterms` - - :param sources: the list of sources to remove - :param term: the analyzed term - """ - sourcesterms = self._sourcesterms - for source, solindex in sources: - try: - sourcesterms[source][term].remove(solindex) - except KeyError: - import rql.base as rqlb - assert isinstance(term, (rqlb.BaseNode, Variable)), repr(term) - continue # may occur with subquery column alias - if not sourcesterms[source][term]: - self._remove_source_term(source, term) - - def _remove_source_term(self, source, term): - try: - poped = self._sourcesterms[source].pop(term, None) - except KeyError: - pass - else: - if not self._sourcesterms[source]: - del self._sourcesterms[source] - - def crossed_relation(self, source, relation): - return relation in self._crossrelations.get(source, ()) - - def part_steps(self): - """precompute necessary part steps before generating actual rql for - each step. This is necessary to know if an aggregate step will be - necessary or not. - """ - steps = [] - select = self.rqlst - rschema = self._schema.rschema - for source in self.part_sources: - try: - sourceterms = self._sourcesterms[source] - except KeyError: - continue # already proceed - while sourceterms: - # take a term randomly, and all terms supporting the - # same solutions - term, solindices = self._choose_term(source, sourceterms) - if source.uri == 'system': - # ensure all variables are available for the latest step - # (missing one will be available from temporary tables - # of previous steps) - scope = select - terms = scope.defined_vars.values() + scope.aliases.values() - sourceterms.clear() - sources = [source] - else: - scope = ms_scope(term) - # find which sources support the same term and solutions - sources = self._expand_sources(source, term, solindices) - # no try to get as much terms as possible - terms = self._expand_terms(term, sources, sourceterms, - scope, solindices) - if len(terms) == 1 and isinstance(terms[0], Constant): - # we can't generate anything interesting with a single - # constant term (will generate an empty "Any" query), - # go to the next iteration directly! - continue - if not sourceterms: - try: - del self._sourcesterms[source] - except KeyError: - # XXX already cleaned - pass - # set of terms which should be additionaly selected when - # possible - needsel = set() - if not self._sourcesterms and scope is select: - terms += scope.defined_vars.values() + scope.aliases.values() - if isinstance(term, Relation) and len(sources) > 1: - variants = set() - partterms = [term] - for vref in term.get_nodes(VariableRef): - if not vref.variable._q_invariant: - variants.add(vref.name) - if len(variants) == 2: - # we need an extra-step to fetch relations from each source - # before a join with prefetched inputs - # (see test_crossed_relation_noeid_needattr in - # unittest_msplanner / unittest_multisources) - lhs, rhs = term.get_variable_parts() - steps.append( (sources, [term, getattr(lhs, 'variable', lhs), - getattr(rhs, 'variable', rhs)], - solindices, scope, variants, False) ) - sources = [self.system_source] - final = True - else: - # suppose this is a final step until the contrary is proven - final = scope is select - # add attribute variables and mark variables which should be - # additionaly selected when possible - for var in select.defined_vars.itervalues(): - if not var in terms: - stinfo = var.stinfo - for ovar, rtype in stinfo.get('attrvars', ()): - if ovar in terms: - needsel.add(var.name) - terms.append(var) - break - else: - needsel.add(var.name) - final = False - # check all relations are supported by the sources - for rel in scope.iget_nodes(Relation): - if rel.is_types_restriction(): - continue - # take care not overwriting the existing "source" identifier - for _source in sources: - if not _source.support_relation(rel.r_type) or ( - self.crossed_relation(_source, rel) and not rel in terms): - for vref in rel.iget_nodes(VariableRef): - needsel.add(vref.name) - final = False - break - else: - if not scope is select: - self._exists_relation(rel, terms, needsel, source) - # if relation is supported by all sources and some of - # its lhs/rhs variable isn't in "terms", and the - # other end *is* in "terms", mark it have to be - # selected - if source.uri != 'system' and not rschema(rel.r_type).final: - lhs, rhs = rel.get_variable_parts() - try: - lhsvar = lhs.variable - except AttributeError: - lhsvar = lhs - try: - rhsvar = rhs.variable - except AttributeError: - rhsvar = rhs - try: - if lhsvar in terms and not rhsvar in terms: - needsel.add(lhsvar.name) - elif rhsvar in terms and not lhsvar in terms: - needsel.add(rhsvar.name) - except AttributeError: - continue # not an attribute, no selection needed - if final and source.uri != 'system': - # check rewritten constants - for vconsts in select.stinfo['rewritten'].itervalues(): - const = vconsts[0] - eid = const.eval(self.plan.args) - _source = self._session.source_from_eid(eid) - if len(sources) > 1 or not _source in sources: - # if there is some rewriten constant used by a not - # neged relation while there are some source not - # supporting the associated entity, this step can't - # be final (unless the relation is explicitly in - # `terms`, eg cross relations) - for c in vconsts: - rel = c.relation() - if rel is None or not (rel in terms or neged_relation(rel)): - final = False - break - break - if final: - self._cleanup_sourcesterms(sources, solindices) - steps.append((sources, terms, solindices, scope, needsel, final) - ) - if not steps[-1][-1]: - # add a final step - terms = select.defined_vars.values() + select.aliases.values() - steps.append( ([self.system_source], terms, set(self._solindices), - select, set(), True) ) - return steps - - def _exists_relation(self, rel, terms, needsel, source): - rschema = self._schema.rschema(rel.r_type) - lhs, rhs = rel.get_variable_parts() - try: - lhsvar, rhsvar = lhs.variable, rhs.variable - except AttributeError: - pass - else: - # supported relation with at least one end supported, check the - # other end is in as well. If not this usually means the - # variable is refed by an outer scope and should be substituted - # using an 'identity' relation (else we'll get a conflict of - # temporary tables) - relscope = ms_scope(rel) - lhsscope = ms_scope(lhsvar) - rhsscope = ms_scope(rhsvar) - if rhsvar in terms and not lhsvar in terms and lhsscope is lhsvar.stmt: - self._identity_substitute(rel, lhsvar, terms, needsel, relscope) - elif lhsvar in terms and not rhsvar in terms and rhsscope is rhsvar.stmt: - self._identity_substitute(rel, rhsvar, terms, needsel, relscope) - elif self.crossed_relation(source, rel): - if lhsscope is not relscope: - self._identity_substitute(rel, lhsvar, terms, needsel, - relscope, lhsscope) - if rhsscope is not relscope: - self._identity_substitute(rel, rhsvar, terms, needsel, - relscope, rhsscope) - - def _identity_substitute(self, relation, var, terms, needsel, exist, - idrelscope=None): - newvar = self._insert_identity_variable(exist, var, idrelscope) - # ensure relation is using '=' operator, else we rely on a - # sqlgenerator side effect (it won't insert an inequality operator - # in this case) - relation.children[1].operator = '=' - terms.append(newvar) - needsel.add(newvar.name) - - def _choose_term(self, source, sourceterms): - """pick one term among terms supported by a source, which will be used - as a base to generate an execution step - """ - secondchoice = None - if len(self._sourcesterms) > 1: - # first, return non invariant variable of crossed relation, then the - # crossed relation itself - for term in sourceterms: - if (isinstance(term, Relation) - and self.crossed_relation(source, term) - and not ms_scope(term) is self.rqlst): - for vref in term.get_variable_parts(): - try: - var = vref.variable - except AttributeError: - # Constant - continue - if ((len(var.stinfo['relations']) > 1 or var.stinfo['selected']) - and var in sourceterms): - return var, sourceterms.pop(var) - return term, sourceterms.pop(term) - # priority to variable from subscopes - for term in sourceterms: - if not ms_scope(term) is self.rqlst: - if isinstance(term, Variable): - return term, sourceterms.pop(term) - secondchoice = term - else: - # priority to variable from outer scope - for term in sourceterms: - if ms_scope(term) is self.rqlst: - if isinstance(term, Variable): - return term, sourceterms.pop(term) - secondchoice = term - if secondchoice is not None: - return secondchoice, sourceterms.pop(secondchoice) - # priority to variable with the less solutions supported and with the - # most valuable refs. Add variable name for test predictability - variables = sorted([(var, sols) for (var, sols) in sourceterms.items() - if isinstance(var, Variable)], - key=lambda (v, s): (len(s), -v.valuable_references(), v.name)) - if variables: - var = variables[0][0] - return var, sourceterms.pop(var) - # priority to constant - for term in sourceterms: - if isinstance(term, Constant): - return term, sourceterms.pop(term) - # whatever (relation) - term = iter(sourceterms).next() - return term, sourceterms.pop(term) - - def _expand_sources(self, selected_source, term, solindices): - """return all sources supporting given term / solindices""" - sources = [selected_source] - sourcesterms = self._sourcesterms - for source in list(sourcesterms): - if source is selected_source: - continue - if not (term in sourcesterms[source] and - solindices.issubset(sourcesterms[source][term])): - continue - sources.append(source) - if source.uri != 'system' or not (isinstance(term, Variable) and not term in self._linkedterms): - termsolindices = sourcesterms[source][term] - termsolindices -= solindices - if not termsolindices: - del sourcesterms[source][term] - if not sourcesterms[source]: - del sourcesterms[source] - return sources - - def _expand_terms(self, term, sources, sourceterms, scope, solindices): - terms = [term] - sources = sorted(sources) - sourcesterms = self._sourcesterms - linkedterms = self._linkedterms - # term has to belong to the same scope if there is more - # than the system source remaining - if len(sourcesterms) > 1 and not scope is self.rqlst: - candidates = (t for t in sourceterms if scope is ms_scope(t)) - else: - candidates = sourceterms - # we only want one unlinked term in each generated query - candidates = [t for t in candidates - if isinstance(t, (Constant, Relation)) or - (solindices.issubset(sourceterms[t]) and t in linkedterms)] - cross_rels = {} - for source in sources: - cross_rels.update(self._crossrelations.get(source, {})) - exclude = {} - for crossvars in cross_rels.itervalues(): - vars = [t for t in crossvars if isinstance(t, Variable)] - try: - exclude[vars[0]] = vars[1] - exclude[vars[1]] = vars[0] - except IndexError: - pass - accept_term = lambda x: (not any(s for s in sources - if not x in sourcesterms.get(s, ())) - and x._ms_may_be_processed(terms, linkedterms) - and not exclude.get(x) in terms) - if isinstance(term, Relation) and term in cross_rels: - cross_terms = cross_rels.pop(term) - base_accept_term = accept_term - accept_term = lambda x: (base_accept_term(x) or x in cross_terms) - for refed in cross_terms: - if not refed in candidates: - terms.append(refed) - # repeat until no term can't be added, since addition of a new - # term may permit to another one to be added - modified = True - while modified and candidates: - modified = False - for term in candidates[:]: - if isinstance(term, Constant): - termsources = set(x[0] for x in self._term_sources(term)) - # ensure system source is there for constant - if self.system_source in sources: - termsources.add(self.system_source) - if sorted(termsources) != sources: - continue - terms.append(term) - candidates.remove(term) - modified = True - del sourceterms[term] - elif accept_term(term): - terms.append(term) - candidates.remove(term) - modified = True - self._cleanup_sourcesterms(sources, solindices, term) - return terms - - def _cleanup_sourcesterms(self, sources, solindices, term=None): - """remove solutions so we know they are already processed""" - for source in sources: - try: - sourceterms = self._sourcesterms[source] - except KeyError: - continue - if term is None: - for term, termsolindices in sourceterms.items(): - if isinstance(term, Relation) and self.crossed_relation(source, term): - continue - termsolindices -= solindices - if not termsolindices: - del sourceterms[term] - else: - try: - sourceterms[term] -= solindices - if not sourceterms[term]: - del sourceterms[term] - except KeyError: - pass - #assert term in cross_terms - if not sourceterms: - del self._sourcesterms[source] - - def merge_input_maps(self, allsolindices, complete=True): - """inputmaps is a dictionary with tuple of solution indices as key with - an associated input map as value. This function compute for each - solution its necessary input map and return them grouped - - ex: - inputmaps = {(0, 1, 2): {'A': 't1.login1', 'U': 't1.C0', 'U.login': 't1.login1'}, - (1,): {'X': 't2.C0', 'T': 't2.C1'}} - return : [([1], {'A': 't1.login1', 'U': 't1.C0', 'U.login': 't1.login1', - 'X': 't2.C0', 'T': 't2.C1'}), - ([0,2], {'A': 't1.login1', 'U': 't1.C0', 'U.login': 't1.login1'})] - """ - if not self._inputmaps: - return [(allsolindices, None)] - _allsolindices = allsolindices.copy() - mapbysol = {} - # compute a single map for each solution - for solindices, basemap in self._inputmaps.iteritems(): - for solindex in solindices: - if not (complete or solindex in allsolindices): - continue - solmap = mapbysol.setdefault(solindex, {}) - solmap.update(basemap) - try: - _allsolindices.remove(solindex) - except KeyError: - continue # already removed - # group results by identical input map - result = [] - for solindex, solmap in mapbysol.iteritems(): - for solindices, commonmap in result: - if commonmap == solmap: - solindices.append(solindex) - break - else: - result.append( ([solindex], solmap) ) - if _allsolindices: - result.append( (list(_allsolindices), None) ) - return result - - def build_final_part(self, select, solindices, inputmap, sources, - insertedvars): - solutions = [self._solutions[i] for i in solindices] - if self._conflicts and inputmap: - for varname, mappedto in self._conflicts: - var = select.defined_vars[varname] - newvar = select.make_variable() - # XXX should use var.scope but scope hasn't been computed yet - select.add_relation(var, 'identity', newvar) - for sol in solutions: - sol[newvar.name] = sol[varname] - inputmap[newvar.name] = mappedto - rqlst = self.plan.finalize(select, solutions, insertedvars) - if self.temptable is None and self.finaltable is None: - return OneFetchStep(self.plan, rqlst, sources, inputmap=inputmap) - table = self.temptable or self.finaltable - return FetchStep(self.plan, rqlst, sources, table, True, inputmap) - - def build_non_final_part(self, select, solindices, sources, insertedvars, - table): - """non final step, will have to store results in a temporary table""" - inputmapkey = tuple(sorted(solindices)) - solutions = [self._solutions[i] for i in solindices] - # XXX be smarter vs rql comparison - idx_key = (select.as_string(), inputmapkey, - tuple(sorted(sources)), tuple(sorted(insertedvars))) - try: - # if a similar step has already been process, simply backport its - # input map - step = self.plan.ms_steps_idx[idx_key] - except KeyError: - # processing needed - rqlst = self.plan.finalize(select, solutions, insertedvars) - step = FetchStep(self.plan, rqlst, sources, table, False) - self.plan.ms_steps_idx[idx_key] = step - self.plan.add_step(step) - # update input map for following steps, according to processed solutions - inputmap = self._inputmaps.setdefault(inputmapkey, {}) - for varname, mapping in step.outputmap.iteritems(): - if varname in inputmap and not '.' in varname and \ - not (mapping == inputmap[varname] or - self._schema.eschema(solutions[0][varname]).final): - self._conflicts.append((varname, inputmap[varname])) - inputmap.update(step.outputmap) - - -@deprecated('[3.18] old multi-source system will go away in the next version') -class MSPlanner(SSPlanner): - """MultiSourcesPlanner: build execution plan for rql queries - - decompose the RQL query according to sources'schema - """ - - def build_select_plan(self, plan, rqlst): - """build execution plan for a SELECT RQL query - - the rqlst should not be tagged at this point - """ - # preprocess deals with security insertion and returns a new syntax tree - # which have to be executed to fulfill the query: according - # to permissions for variable's type, different rql queries may have to - # be executed - plan.preprocess(rqlst) - if server.DEBUG & server.DBG_MS: - print '-'*80 - print 'PLANNING', rqlst - ppis = [PartPlanInformation(plan, select, self.rqlhelper) - for select in rqlst.children] - plan.ms_steps_idx = {} - steps = self._union_plan(plan, ppis) - if server.DEBUG & server.DBG_MS: - from pprint import pprint - for step in plan.steps: - pprint(step.test_repr()) - pprint(steps[0].test_repr()) - return steps - - def _ppi_subqueries(self, ppi): - # part plan info for subqueries - plan = ppi.plan - inputmap = {} - for subquery in ppi.rqlst.with_[:]: - sppis = [PartPlanInformation(plan, select) - for select in subquery.query.children] - for sppi in sppis: - if sppi.needsplit or sppi.part_sources != ppi.part_sources: - temptable = plan.make_temp_table_name('T%s' % make_uid(id(subquery))) - sstep = self._union_plan(plan, sppis, temptable)[0] - break - else: - sstep = None - if sstep is not None: - ppi.rqlst.with_.remove(subquery) - for i, colalias in enumerate(subquery.aliases): - inputmap[colalias.name] = '%s.C%s' % (temptable, i) - ppi.plan.add_step(sstep) - return inputmap - - def _union_plan(self, plan, ppis, temptable=None): - tosplit, cango, allsources = [], {}, set() - for planinfo in ppis: - if planinfo.needsplit: - tosplit.append(planinfo) - else: - cango.setdefault(planinfo.part_sources, []).append(planinfo) - for source in planinfo.part_sources: - allsources.add(source) - # first add steps for query parts which doesn't need to splitted - steps = [] - for sources, cppis in cango.iteritems(): - byinputmap = {} - for ppi in cppis: - select = ppi.rqlst - if sources != (ppi.system_source,): - add_types_restriction(self.schema, select) - # part plan info for subqueries - inputmap = self._ppi_subqueries(ppi) - aggrstep = need_aggr_step(select, sources) - if aggrstep: - atemptable = plan.make_temp_table_name('T%s' % make_uid(id(select))) - sunion = Union() - sunion.append(select) - selected = select.selection[:] - select_group_sort(select) - step = AggrStep(plan, selected, select, atemptable, temptable) - step.set_limit_offset(select.limit, select.offset) - select.limit = None - select.offset = 0 - fstep = FetchStep(plan, sunion, sources, atemptable, True, inputmap) - step.children.append(fstep) - steps.append(step) - else: - byinputmap.setdefault(tuple(inputmap.iteritems()), []).append( (select) ) - for inputmap, queries in byinputmap.iteritems(): - inputmap = dict(inputmap) - sunion = Union() - for select in queries: - sunion.append(select) - if temptable: - steps.append(FetchStep(plan, sunion, sources, temptable, True, inputmap)) - else: - steps.append(OneFetchStep(plan, sunion, sources, inputmap)) - # then add steps for splitted query parts - for planinfo in tosplit: - steps.append(self.split_part(planinfo, temptable)) - if len(steps) > 1: - if temptable: - step = UnionFetchStep(plan) - else: - step = UnionStep(plan) - step.children = steps - return (step,) - return steps - - # internal methods for multisources decomposition ######################### - - def split_part(self, ppi, temptable): - ppi.finaltable = temptable - plan = ppi.plan - select = ppi.rqlst - subinputmap = self._ppi_subqueries(ppi) - stepdefs = ppi.part_steps() - if need_aggr_step(select, ppi.part_sources, stepdefs): - atemptable = plan.make_temp_table_name('T%s' % make_uid(id(select))) - selection = select.selection[:] - select_group_sort(select) - else: - atemptable = None - selection = select.selection - ppi.temptable = atemptable - vfilter = TermsFiltererVisitor(self.schema, ppi) - steps = [] - multifinal = len([x for x in stepdefs if x[-1]]) >= 2 - for sources, terms, solindices, scope, needsel, final in stepdefs: - # extract an executable query using only the specified terms - if sources[0].uri == 'system': - # in this case we have to merge input maps before call to - # filter so already processed restriction are correctly - # removed - solsinputmaps = ppi.merge_input_maps( - solindices, complete=not (final and multifinal)) - for solindices, inputmap in solsinputmaps: - minrqlst, insertedvars = vfilter.filter( - sources, terms, scope, set(solindices), needsel, final) - if inputmap is None: - inputmap = subinputmap - else: - inputmap.update(subinputmap) - steps.append(ppi.build_final_part(minrqlst, solindices, inputmap, - sources, insertedvars)) - else: - # this is a final part (i.e. retreiving results for the - # original query part) if all term / sources have been - # treated or if this is the last shot for used solutions - minrqlst, insertedvars = vfilter.filter( - sources, terms, scope, solindices, needsel, final) - if final: - solsinputmaps = ppi.merge_input_maps( - solindices, complete=not (final and multifinal)) - if len(solsinputmaps) > 1: - refrqlst = minrqlst - for solindices, inputmap in solsinputmaps: - if inputmap is None: - inputmap = subinputmap - else: - inputmap.update(subinputmap) - if len(solsinputmaps) > 1: - minrqlst = refrqlst.copy() - sources = sources[:] - if inputmap and len(sources) > 1: - sources.remove(ppi.system_source) - steps.append(ppi.build_final_part(minrqlst, solindices, None, - sources, insertedvars)) - steps.append(ppi.build_final_part(minrqlst, solindices, inputmap, - [ppi.system_source], insertedvars)) - else: - steps.append(ppi.build_final_part(minrqlst, solindices, inputmap, - sources, insertedvars)) - else: - table = plan.make_temp_table_name('T%s' % make_uid(id(select))) - ppi.build_non_final_part(minrqlst, solindices, sources, - insertedvars, table) - # finally: join parts, deal with aggregat/group/sorts if necessary - if atemptable is not None: - step = AggrStep(plan, selection, select, atemptable, temptable) - step.children = steps - elif len(steps) > 1: - getrschema = self.schema.rschema - if need_intersect(select, getrschema) or any(need_intersect(select, getrschema) - for step in steps - for select in step.union.children): - if temptable: - raise NotImplementedError('oops') # IntersectFetchStep(plan) - else: - step = IntersectStep(plan) - else: - if temptable: - step = UnionFetchStep(plan) - else: - step = UnionStep(plan) - step.children = steps - else: - step = steps[0] - if select.limit is not None or select.offset: - step.set_limit_offset(select.limit, select.offset) - return step - - -class UnsupportedBranch(Exception): - pass - - -class TermsFiltererVisitor(object): - def __init__(self, schema, ppi): - self.schema = schema - self.ppi = ppi - self.skip = {} - self.hasaggrstep = self.ppi.temptable - self.extneedsel = frozenset(vref.name for sortterm in ppi.rqlst.orderby - for vref in sortterm.iget_nodes(VariableRef)) - - def _rqlst_accept(self, rqlst, node, newroot, terms, setfunc=None): - try: - newrestr, node_ = node.accept(self, newroot, terms[:]) - except UnsupportedBranch: - return rqlst - if setfunc is not None and newrestr is not None: - setfunc(newrestr) - if not node_ is node: - rqlst = node.parent - return rqlst - - def filter(self, sources, terms, rqlst, solindices, needsel, final): - if server.DEBUG & server.DBG_MS: - print 'filter', final and 'final' or '', sources, terms, rqlst, solindices, needsel - newroot = Select() - self.sources = sorted(sources) - self.terms = terms - self.solindices = solindices - self.final = final - self._pending_vrefs = [] - # terms which appear in unsupported branches - needsel |= self.extneedsel - self.needsel = needsel - # terms which appear in supported branches - self.mayneedsel = set() - # new inserted variables - self.insertedvars = [] - # other structures (XXX document) - self.mayneedvar, self.hasvar = {}, {} - self.use_only_defined = False - self.scopes = {rqlst: newroot} - self.current_scope = rqlst - if rqlst.where: - rqlst = self._rqlst_accept(rqlst, rqlst.where, newroot, terms, - newroot.set_where) - if isinstance(rqlst, Select): - self.use_only_defined = True - if rqlst.groupby: - groupby = [] - for node in rqlst.groupby: - rqlst = self._rqlst_accept(rqlst, node, newroot, terms, - groupby.append) - if groupby: - newroot.set_groupby(groupby) - if rqlst.having: - having = [] - for node in rqlst.having: - rqlst = self._rqlst_accept(rqlst, node, newroot, terms, - having.append) - if having: - newroot.set_having(having) - if final and rqlst.orderby and not self.hasaggrstep: - orderby = [] - for node in rqlst.orderby: - rqlst = self._rqlst_accept(rqlst, node, newroot, terms, - orderby.append) - if orderby: - newroot.set_orderby(orderby) - elif rqlst.orderby: - for sortterm in rqlst.orderby: - if any(f for f in sortterm.iget_nodes(Function) if f.name == 'FTIRANK'): - newnode, oldnode = sortterm.accept(self, newroot, terms) - if newnode is not None: - newroot.add_sort_term(newnode) - self.process_selection(newroot, terms, rqlst) - elif not newroot.where: - # no restrictions have been copied, just select terms and add - # type restriction (done later by add_types_restriction) - for v in terms: - if not isinstance(v, Variable): - continue - newroot.append_selected(VariableRef(newroot.get_variable(v.name))) - solutions = self.ppi.copy_solutions(solindices) - cleanup_solutions(newroot, solutions) - newroot.set_possible_types(solutions) - if final: - if self.hasaggrstep: - self.add_necessary_selection(newroot, self.mayneedsel & self.extneedsel) - newroot.distinct = rqlst.distinct - else: - self.add_necessary_selection(newroot, self.mayneedsel & self.needsel) - # insert vars to fetch constant values when needed - for (varname, rschema), reldefs in self.mayneedvar.iteritems(): - for rel, ored in reldefs: - if not (varname, rschema) in self.hasvar: - self.hasvar[(varname, rschema)] = None # just to avoid further insertion - cvar = newroot.make_variable() - for sol in newroot.solutions: - sol[cvar.name] = rschema.objects(sol[varname])[0] - # if the current restriction is not used in a OR branch, - # we can keep it, else we have to drop the constant - # restriction (or we may miss some results) - if not ored: - rel = rel.copy(newroot) - newroot.add_restriction(rel) - # add a relation to link the variable - newroot.remove_node(rel.children[1]) - cmp = Comparison('=') - rel.append(cmp) - cmp.append(VariableRef(cvar)) - self.insertedvars.append((varname, rschema, cvar.name)) - newroot.append_selected(VariableRef(newroot.get_variable(cvar.name))) - # NOTE: even if the restriction is done by this query, we have - # to let it in the original rqlst so that it appears anyway in - # the "final" query, else we may change the meaning of the query - # if there are NOT somewhere : - # 'NOT X relation Y, Y name "toto"' means X WHERE X isn't related - # to Y whose name is toto while - # 'NOT X relation Y' means X WHERE X has no 'relation' (whatever Y) - elif ored: - newroot.remove_node(rel) - add_types_restriction(self.schema, rqlst, newroot, solutions) - if server.DEBUG & server.DBG_MS: - print '--->', newroot - return newroot, self.insertedvars - - def visit_and(self, node, newroot, terms): - subparts = [] - for i in xrange(len(node.children)): - child = node.children[i] - try: - newchild, child_ = child.accept(self, newroot, terms) - if not child_ is child: - node = child_.parent - if newchild is None: - continue - subparts.append(newchild) - except UnsupportedBranch: - continue - if not subparts: - return None, node - if len(subparts) == 1: - return subparts[0], node - return copy_node(newroot, node, subparts), node - - visit_or = visit_and - - def _relation_supported(self, relation): - rtype = relation.r_type - for source in self.sources: - if not source.support_relation(rtype) or ( - rtype in source.cross_relations and not relation in self.terms): - return False - if not self.final and not relation in self.terms: - rschema = self.schema.rschema(relation.r_type) - if not rschema.final: - for term in relation.get_nodes((VariableRef, Constant)): - term = getattr(term, 'variable', term) - termsources = sorted(set(x[0] for x in self.ppi._term_sources(term))) - if termsources and termsources != self.sources: - return False - return True - - def visit_relation(self, node, newroot, terms): - if not node.is_types_restriction(): - if not node in terms and node in self.skip and self.solindices.issubset(self.skip[node]): - return None, node - if not self._relation_supported(node): - raise UnsupportedBranch() - # don't copy type restriction unless this is the only supported relation - # for the lhs variable, else they'll be reinserted later as needed (in - # other cases we may copy a type restriction while the variable is not - # actually used) - elif not (node.neged(strict=True) or - any(self._relation_supported(rel) - for rel in node.children[0].variable.stinfo['relations'])): - return self.visit_default(node, newroot, terms) - else: - raise UnsupportedBranch() - rschema = self.schema.rschema(node.r_type) - self._pending_vrefs = [] - try: - res = self.visit_default(node, newroot, terms)[0] - except Exception: - # when a relation isn't supported, we should dereference potentially - # introduced variable refs - for vref in self._pending_vrefs: - vref.unregister_reference() - raise - ored = node.ored() - if rschema.final or rschema.inlined: - vrefs = node.children[1].get_nodes(VariableRef) - if not vrefs: - if not ored: - self.skip.setdefault(node, set()).update(self.solindices) - else: - self.mayneedvar.setdefault((node.children[0].name, rschema), []).append( (res, ored) ) - else: - assert len(vrefs) == 1 - vref = vrefs[0] - # XXX check operator ? - self.hasvar[(node.children[0].name, rschema)] = vref - if self._may_skip_attr_rel(rschema, node, vref, ored, terms, res): - self.skip.setdefault(node, set()).update(self.solindices) - elif not ored: - self.skip.setdefault(node, set()).update(self.solindices) - return res, node - - def _may_skip_attr_rel(self, rschema, rel, vref, ored, terms, res): - var = vref.variable - if ored: - return False - if var.name in self.extneedsel or var.stinfo['selected']: - return False - if not var in terms or used_in_outer_scope(var, self.current_scope): - return False - if any(v for v, _ in var.stinfo.get('attrvars', ()) if not v in terms): - return False - return True - - def visit_exists(self, node, newroot, terms): - newexists = node.__class__() - self.scopes = {node: newexists} - subparts, node = self._visit_children(node, newroot, terms) - if not subparts: - return None, node - newexists.set_where(subparts[0]) - return newexists, node - - def visit_not(self, node, newroot, terms): - subparts, node = self._visit_children(node, newroot, terms) - if not subparts: - return None, node - return copy_node(newroot, node, subparts), node - - def visit_group(self, node, newroot, terms): - if not self.final: - return None, node - return self.visit_default(node, newroot, terms) - - def visit_variableref(self, node, newroot, terms): - if self.use_only_defined: - if not node.variable.name in newroot.defined_vars: - raise UnsupportedBranch(node.name) - elif not node.variable in terms: - raise UnsupportedBranch(node.name) - self.mayneedsel.add(node.name) - # set scope so we can insert types restriction properly - newvar = newroot.get_variable(node.name) - newvar.stinfo['scope'] = self.scopes.get(node.variable.scope, newroot) - vref = VariableRef(newvar) - self._pending_vrefs.append(vref) - return vref, node - - def visit_constant(self, node, newroot, terms): - return copy_node(newroot, node), node - - def visit_comparison(self, node, newroot, terms): - subparts, node = self._visit_children(node, newroot, terms) - copy = copy_node(newroot, node, subparts) - # ignore comparison operator when fetching non final query - if not self.final and isinstance(node.children[0], VariableRef): - copy.operator = '=' - return copy, node - - def visit_function(self, node, newroot, terms): - if node.name == 'FTIRANK': - # FTIRANK is somewhat special... Rank function should be included in - # the same query has the has_text relation, potentially added to - # selection for latter usage - if not self.hasaggrstep and self.final and node not in self.skip: - return self.visit_default(node, newroot, terms) - elif any(s for s in self.sources if s.uri != 'system'): - return None, node - # p = node.parent - # while p is not None and not isinstance(p, SortTerm): - # p = p.parent - # if isinstance(p, SortTerm): - if not self.hasaggrstep and self.final and node in self.skip: - return Constant(self.skip[node], 'Int'), node - # XXX only if not yet selected - newroot.append_selected(node.copy(newroot)) - self.skip[node] = len(newroot.selection) - return None, node - return self.visit_default(node, newroot, terms) - - def visit_default(self, node, newroot, terms): - subparts, node = self._visit_children(node, newroot, terms) - return copy_node(newroot, node, subparts), node - - visit_mathexpression = visit_constant = visit_default - - def visit_sortterm(self, node, newroot, terms): - subparts, node = self._visit_children(node, newroot, terms) - if not subparts: - return None, node - return copy_node(newroot, node, subparts), node - - def _visit_children(self, node, newroot, terms): - subparts = [] - for i in xrange(len(node.children)): - child = node.children[i] - newchild, child_ = child.accept(self, newroot, terms) - if not child is child_: - node = child_.parent - if newchild is not None: - subparts.append(newchild) - return subparts, node - - def process_selection(self, newroot, terms, rqlst): - if self.final: - for term in rqlst.selection: - newroot.append_selected(term.copy(newroot)) - for vref in term.get_nodes(VariableRef): - self.needsel.add(vref.name) - return - for term in rqlst.selection: - vrefs = term.get_nodes(VariableRef) - if vrefs: - supportedvars = [] - for vref in vrefs: - var = vref.variable - if var in terms: - supportedvars.append(vref) - continue - else: - self.needsel.add(vref.name) - break - else: - for vref in vrefs: - newroot.append_selected(vref.copy(newroot)) - supportedvars = [] - for vref in supportedvars: - if not vref in newroot.get_selected_variables(): - newroot.append_selected(VariableRef(newroot.get_variable(vref.name))) - elif term in self.terms: - newroot.append_selected(term.copy(newroot)) - - def add_necessary_selection(self, newroot, terms): - selected = tuple(newroot.get_selected_variables()) - for varname in terms: - var = newroot.defined_vars[varname] - for vref in var.references(): - rel = vref.relation() - if rel is None and vref in selected: - # already selected - break - else: - selvref = VariableRef(var) - newroot.append_selected(selvref) - if newroot.groupby: - newroot.add_group_var(VariableRef(selvref.variable, noautoref=1)) diff -r 0636c4960259 -r 3e7cad3967c5 server/mssteps.py --- a/server/mssteps.py Thu Jun 27 08:30:46 2013 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,309 +0,0 @@ -# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved. -# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr -# -# This file is part of CubicWeb. -# -# CubicWeb is free software: you can redistribute it and/or modify it under the -# terms of the GNU Lesser General Public License as published by the Free -# Software Foundation, either version 2.1 of the License, or (at your option) -# any later version. -# -# CubicWeb is distributed in the hope that it will be useful, but WITHOUT -# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS -# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more -# details. -# -# You should have received a copy of the GNU Lesser General Public License along -# with CubicWeb. If not, see . -"""Defines the diferent querier steps usable in plans. - -FIXME : this code needs refactoring. Some problems : -* get data from the parent plan, the latest step, temporary table... -* each step has is own members (this is not necessarily bad, but a bit messy - for now) -""" -__docformat__ = "restructuredtext en" - -from rql.nodes import VariableRef, Variable, Function - -from cubicweb.server.ssplanner import (LimitOffsetMixIn, Step, OneFetchStep, - varmap_test_repr, offset_result) - -AGGR_TRANSFORMS = {'COUNT':'SUM', 'MIN':'MIN', 'MAX':'MAX', 'SUM': 'SUM'} - -class remove_and_restore_clauses(object): - def __init__(self, union, keepgroup): - self.union = union - self.keepgroup = keepgroup - self.clauses = None - - def __enter__(self): - self.clauses = clauses = [] - for select in self.union.children: - if self.keepgroup: - having, orderby = select.having, select.orderby - select.having, select.orderby = (), () - clauses.append( (having, orderby) ) - else: - groupby, having, orderby = select.groupby, select.having, select.orderby - select.groupby, select.having, select.orderby = (), (), () - clauses.append( (groupby, having, orderby) ) - - def __exit__(self, exctype, exc, traceback): - for i, select in enumerate(self.union.children): - if self.keepgroup: - select.having, select.orderby = self.clauses[i] - else: - select.groupby, select.having, select.orderby = self.clauses[i] - - -class FetchStep(OneFetchStep): - """step consisting in fetching data from sources, and storing result in - a temporary table - """ - def __init__(self, plan, union, sources, table, keepgroup, inputmap=None): - OneFetchStep.__init__(self, plan, union, sources) - # temporary table to store step result - self.table = table - # should groupby clause be kept or not - self.keepgroup = keepgroup - # variables mapping to use as input - self.inputmap = inputmap - # output variable mapping - srqlst = union.children[0] # sample select node - # add additional information to the output mapping - self.outputmap = plan.init_temp_table(table, srqlst.selection, - srqlst.solutions[0]) - for vref in srqlst.selection: - if not isinstance(vref, VariableRef): - continue - var = vref.variable - if var.stinfo.get('attrvars'): - for lhsvar, rtype in var.stinfo['attrvars']: - if lhsvar.name in srqlst.defined_vars: - key = '%s.%s' % (lhsvar.name, rtype) - self.outputmap[key] = self.outputmap[var.name] - else: - rschema = self.plan.schema.rschema - for rel in var.stinfo['rhsrelations']: - if rschema(rel.r_type).inlined: - lhsvar = rel.children[0] - if lhsvar.name in srqlst.defined_vars: - key = '%s.%s' % (lhsvar.name, rel.r_type) - self.outputmap[key] = self.outputmap[var.name] - - def execute(self): - """execute this step""" - self.execute_children() - plan = self.plan - plan.create_temp_table(self.table) - union = self.union - with remove_and_restore_clauses(union, self.keepgroup): - for source in self.sources: - source.flying_insert(self.table, plan.session, union, plan.args, - self.inputmap) - - def mytest_repr(self): - """return a representation of this step suitable for test""" - with remove_and_restore_clauses(self.union, self.keepgroup): - try: - inputmap = varmap_test_repr(self.inputmap, self.plan.tablesinorder) - outputmap = varmap_test_repr(self.outputmap, self.plan.tablesinorder) - except AttributeError: - inputmap = self.inputmap - outputmap = self.outputmap - return (self.__class__.__name__, - sorted((r.as_string(kwargs=self.plan.args), r.solutions) - for r in self.union.children), - sorted(self.sources), inputmap, outputmap) - - -class AggrStep(LimitOffsetMixIn, Step): - """step consisting in making aggregat from temporary data in the system - source - """ - def __init__(self, plan, selection, select, table, outputtable=None): - Step.__init__(self, plan) - # original selection - self.selection = selection - # original Select RQL tree - self.select = select - # table where are located temporary results - self.table = table - # optional table where to write results - self.outputtable = outputtable - if outputtable is not None: - plan.init_temp_table(outputtable, selection, select.solutions[0]) - - #self.inputmap = inputmap - - def mytest_repr(self): - """return a representation of this step suitable for test""" - try: - # rely on a monkey patch (cf unittest_querier) - table = self.plan.tablesinorder[self.table] - outputtable = self.outputtable and self.plan.tablesinorder[self.outputtable] - except AttributeError: - # not monkey patched - table = self.table - outputtable = self.outputtable - sql = self.get_sql().replace(self.table, table) - return (self.__class__.__name__, sql, outputtable) - - def execute(self): - """execute this step""" - self.execute_children() - sql = self.get_sql() - if self.outputtable: - self.plan.create_temp_table(self.outputtable) - sql = 'INSERT INTO %s %s' % (self.outputtable, sql) - self.plan.syssource.doexec(self.plan.session, sql, self.plan.args) - else: - return self.plan.sqlexec(sql, self.plan.args) - - def get_sql(self): - self.inputmap = inputmap = self.children[-1].outputmap - dbhelper=self.plan.syssource.dbhelper - # get the select clause - clause = [] - for i, term in enumerate(self.selection): - try: - var_name = inputmap[term.as_string()] - except KeyError: - var_name = 'C%s' % i - if isinstance(term, Function): - # we have to translate some aggregat function - # (for instance COUNT -> SUM) - orig_name = term.name - try: - term.name = AGGR_TRANSFORMS[term.name] - # backup and reduce children - orig_children = term.children - term.children = [VariableRef(Variable(var_name))] - clause.append(term.accept(self)) - # restaure the tree XXX necessary? - term.name = orig_name - term.children = orig_children - except KeyError: - clause.append(var_name) - else: - clause.append(var_name) - for vref in term.iget_nodes(VariableRef): - inputmap[vref.name] = var_name - # XXX handle distinct with non selected sort term - if self.select.distinct: - sql = ['SELECT DISTINCT %s' % ', '.join(clause)] - else: - sql = ['SELECT %s' % ', '.join(clause)] - sql.append("FROM %s" % self.table) - # get the group/having clauses - if self.select.groupby: - clause = [inputmap[var.name] for var in self.select.groupby] - grouped = set(var.name for var in self.select.groupby) - sql.append('GROUP BY %s' % ', '.join(clause)) - else: - grouped = None - if self.select.having: - clause = [term.accept(self) for term in self.select.having] - sql.append('HAVING %s' % ', '.join(clause)) - # get the orderby clause - if self.select.orderby: - clause = [] - for sortterm in self.select.orderby: - sqlterm = sortterm.term.accept(self) - if sortterm.asc: - clause.append(sqlterm) - else: - clause.append('%s DESC' % sqlterm) - if grouped is not None: - for vref in sortterm.iget_nodes(VariableRef): - if not vref.name in grouped: - sql[-1] += ', ' + self.inputmap[vref.name] - grouped.add(vref.name) - sql = dbhelper.sql_add_order_by(' '.join(sql), - clause, - None, False, - self.limit or self.offset) - else: - sql = ' '.join(sql) - clause = None - - sql = dbhelper.sql_add_limit_offset(sql, self.limit, self.offset, clause) - return sql - - def visit_function(self, function): - """generate SQL name for a function""" - try: - return self.children[0].outputmap[str(function)] - except KeyError: - return '%s(%s)' % (function.name, - ','.join(c.accept(self) for c in function.children)) - - def visit_variableref(self, variableref): - """get the sql name for a variable reference""" - try: - return self.inputmap[variableref.name] - except KeyError: # XXX duh? explain - return variableref.variable.name - - def visit_constant(self, constant): - """generate SQL name for a constant""" - assert constant.type == 'Int' - return str(constant.value) - - -class UnionStep(LimitOffsetMixIn, Step): - """union results of child in-memory steps (e.g. OneFetchStep / AggrStep)""" - - def execute(self): - """execute this step""" - result = [] - limit = olimit = self.limit - offset = self.offset - assert offset != 0 - if offset is not None: - limit = limit + offset - for step in self.children: - if limit is not None: - if offset is None: - limit = olimit - len(result) - step.set_limit_offset(limit, None) - result_ = step.execute() - if offset is not None: - offset, result_ = offset_result(offset, result_) - result += result_ - if limit is not None: - if len(result) >= olimit: - return result[:olimit] - return result - - def mytest_repr(self): - """return a representation of this step suitable for test""" - return (self.__class__.__name__, self.limit, self.offset) - - -class IntersectStep(UnionStep): - """return intersection of results of child in-memory steps (e.g. OneFetchStep / AggrStep)""" - - def execute(self): - """execute this step""" - result = set() - for step in self.children: - result &= frozenset(step.execute()) - result = list(result) - if self.offset: - result = result[self.offset:] - if self.limit: - result = result[:self.limit] - return result - - -class UnionFetchStep(Step): - """union results of child steps using temporary tables (e.g. FetchStep)""" - - def execute(self): - """execute this step""" - self.execute_children() - - -__all__ = ('FetchStep', 'AggrStep', 'UnionStep', 'UnionFetchStep', 'IntersectStep') diff -r 0636c4960259 -r 3e7cad3967c5 server/querier.py --- a/server/querier.py Thu Jun 27 08:30:46 2013 +0200 +++ b/server/querier.py Tue Jan 21 18:11:30 2014 +0100 @@ -36,7 +36,7 @@ from cubicweb.server.rqlannotation import SQLGenAnnotator, set_qdata from cubicweb.server.ssplanner import READ_ONLY_RTYPES, add_types_restriction from cubicweb.server.edition import EditedEntity - +from cubicweb.server.ssplanner import SSPlanner ETYPE_PYOBJ_MAP[Binary] = 'Bytes' @@ -520,23 +520,10 @@ self._parse = rqlhelper.parse self._annotate = rqlhelper.annotate # rql planner - if len(repo.sources) < 2: - from cubicweb.server.ssplanner import SSPlanner - self._planner = SSPlanner(schema, rqlhelper) - else: - from cubicweb.server.msplanner import MSPlanner - self._planner = MSPlanner(schema, rqlhelper) + self._planner = SSPlanner(schema, rqlhelper) # sql generation annotator self.sqlgen_annotate = SQLGenAnnotator(schema).annotate - def set_planner(self): - if len(self._repo.sources) < 2: - from cubicweb.server.ssplanner import SSPlanner - self._planner = SSPlanner(self.schema, self._repo.vreg.rqlhelper) - else: - from cubicweb.server.msplanner import MSPlanner - self._planner = MSPlanner(self.schema, self._repo.vreg.rqlhelper) - def parse(self, rql, annotate=False): """return a rql syntax tree for the given rql""" try: diff -r 0636c4960259 -r 3e7cad3967c5 server/repository.py --- a/server/repository.py Thu Jun 27 08:30:46 2013 +0200 +++ b/server/repository.py Tue Jan 21 18:11:30 2014 +0100 @@ -283,14 +283,14 @@ self.sources_by_eid[sourceent.eid] = self.system_source self.system_source.init(True, sourceent) continue - self.add_source(sourceent, add_to_cnxsets=False) + self.add_source(sourceent) def _clear_planning_caches(self): for cache in ('source_defs', 'is_multi_sources_relation', 'can_cross_relation', 'rel_type_sources'): clear_cache(self, cache) - def add_source(self, sourceent, add_to_cnxsets=True): + def add_source(self, sourceent): source = self.get_source(sourceent.type, sourceent.name, sourceent.host_config, sourceent.eid) self.sources_by_eid[sourceent.eid] = source @@ -301,14 +301,6 @@ # internal session, which is not possible until connections sets have been # initialized) source.init(True, sourceent) - if not source.copy_based_source: - warn('[3.18] old multi-source system will go away in the next version', - DeprecationWarning) - self.sources.append(source) - self.querier.set_planner() - if add_to_cnxsets: - for cnxset in self.cnxsets: - cnxset.add_source(source) else: source.init(False, sourceent) self._clear_planning_caches() @@ -316,11 +308,6 @@ def remove_source(self, uri): source = self.sources_by_uri.pop(uri) del self.sources_by_eid[source.eid] - if self.config.source_enabled(source) and not source.copy_based_source: - self.sources.remove(source) - self.querier.set_planner() - for cnxset in self.cnxsets: - cnxset.remove_source(source) self._clear_planning_caches() def get_source(self, type, uri, source_config, eid=None): @@ -1087,7 +1074,7 @@ 6. unless source's :attr:`should_call_hooks` tell otherwise, 'before_add_entity' hooks are called """ - uri = 'system' if source.copy_based_source else source.uri + uri = 'system' cachekey = (extid, uri) try: return self._extid_cache[cachekey] @@ -1300,8 +1287,7 @@ if suri == 'system': extid = None else: - if source.copy_based_source: - suri = 'system' + suri = 'system' extid = source.get_extid(entity) self._extid_cache[(str(extid), suri)] = entity.eid self._type_source_cache[entity.eid] = (entity.cw_etype, suri, extid, diff -r 0636c4960259 -r 3e7cad3967c5 server/sources/__init__.py --- a/server/sources/__init__.py Thu Jun 27 08:30:46 2013 +0200 +++ b/server/sources/__init__.py Tue Jan 21 18:11:30 2014 +0100 @@ -64,9 +64,6 @@ class AbstractSource(object): """an abstract class for sources""" - # does the source copy data into the system source, or is it a *true* source - # (i.e. entities are not stored physically here) - copy_based_source = False # boolean telling if modification hooks should be called when something is # modified in this source diff -r 0636c4960259 -r 3e7cad3967c5 server/sources/datafeed.py --- a/server/sources/datafeed.py Thu Jun 27 08:30:46 2013 +0200 +++ b/server/sources/datafeed.py Tue Jan 21 18:11:30 2014 +0100 @@ -35,7 +35,6 @@ class DataFeedSource(AbstractSource): - copy_based_source = True use_cwuri_as_url = True options = ( diff -r 0636c4960259 -r 3e7cad3967c5 server/sources/native.py --- a/server/sources/native.py Thu Jun 27 08:30:46 2013 +0200 +++ b/server/sources/native.py Tue Jan 21 18:11:30 2014 +0100 @@ -968,7 +968,7 @@ if extid is not None: assert isinstance(extid, str) extid = b64encode(extid) - uri = 'system' if source.copy_based_source else source.uri + uri = 'system' attrs = {'type': entity.cw_etype, 'eid': entity.eid, 'extid': extid, 'source': uri, 'asource': source.uri, 'mtime': datetime.utcnow()} self._handle_insert_entity_sql(session, self.sqlgen.insert('entities', attrs), attrs)