# HG changeset patch # User sylvain.thenault@logilab.fr # Date 1238849163 -7200 # Node ID 9f2760f15000e4dbf178bb2f93724f90e03dea6a # Parent 25cd18899830b64c1e3f4b0627a209580ee1a015# Parent e8b7c7407edfc0e16f6056879d8753f71cf2416d merge diff -r e8b7c7407edf -r 9f2760f15000 cwctl.py --- a/cwctl.py Fri Apr 03 11:04:27 2009 +0200 +++ b/cwctl.py Sat Apr 04 14:46:03 2009 +0200 @@ -619,7 +619,11 @@ config = CubicWebConfiguration.config_for(appid) config.creating = True # notice we're not starting the server config.verbosity = self.config.verbosity - config.set_sources_mode(self.config.ext_sources or ('migration',)) + try: + config.set_sources_mode(self.config.ext_sources or ('migration',)) + except AttributeError: + # not a server config + pass # get application and installed versions for the server and the componants print 'getting versions configuration from the repository...' mih = config.migration_handler() diff -r e8b7c7407edf -r 9f2760f15000 devtools/repotest.py --- a/devtools/repotest.py Fri Apr 03 11:04:27 2009 +0200 +++ b/devtools/repotest.py Sat Apr 04 14:46:03 2009 +0200 @@ -258,38 +258,38 @@ class PartPlanInformation(object): def merge_input_maps(*args): pass - def _choose_var(self, sourcevars): - pass +# def _choose_term(self, sourceterms): +# pass _orig_merge_input_maps = PartPlanInformation.merge_input_maps -_orig_choose_var = PartPlanInformation._choose_var +# _orig_choose_term = PartPlanInformation._choose_term def _merge_input_maps(*args): return sorted(_orig_merge_input_maps(*args)) -def _choose_var(self, sourcevars): - # predictable order for test purpose - def get_key(x): - try: - # variable - return x.name - except AttributeError: - try: - # relation - return x.r_type - except AttributeError: - # const - return x.value - varsinorder = sorted(sourcevars, key=get_key) - if len(self._sourcesvars) > 1: - for var in varsinorder: - if not var.scope is self.rqlst: - return var, sourcevars.pop(var) - else: - for var in varsinorder: - if var.scope is 
self.rqlst: - return var, sourcevars.pop(var) - var = varsinorder[0] - return var, sourcevars.pop(var) +# def _choose_term(self, sourceterms): +# # predictable order for test purpose +# def get_key(x): +# try: +# # variable +# return x.name +# except AttributeError: +# try: +# # relation +# return x.r_type +# except AttributeError: +# # const +# return x.value +# varsinorder = sorted(sourceterms, key=get_key) +# if len(self._sourcesterms) > 1: +# for var in varsinorder: +# if not var.scope is self.rqlst: +# return var, sourceterms.pop(var) +# else: +# for var in varsinorder: +# if var.scope is self.rqlst: +# return var, sourceterms.pop(var) +# var = varsinorder[0] +# return var, sourceterms.pop(var) def do_monkey_patch(): @@ -299,7 +299,7 @@ ExecutionPlan.tablesinorder = None ExecutionPlan.init_temp_table = _init_temp_table PartPlanInformation.merge_input_maps = _merge_input_maps - PartPlanInformation._choose_var = _choose_var + #PartPlanInformation._choose_term = _choose_term def undo_monkey_patch(): RQLRewriter.insert_snippets = _orig_insert_snippets @@ -307,5 +307,5 @@ ExecutionPlan._check_permissions = _orig_check_permissions ExecutionPlan.init_temp_table = _orig_init_temp_table PartPlanInformation.merge_input_maps = _orig_merge_input_maps - PartPlanInformation._choose_var = _orig_choose_var + #PartPlanInformation._choose_term = _orig_choose_term diff -r e8b7c7407edf -r 9f2760f15000 server/msplanner.py --- a/server/msplanner.py Fri Apr 03 11:04:27 2009 +0200 +++ b/server/msplanner.py Sat Apr 04 14:46:03 2009 +0200 @@ -1,32 +1,54 @@ """plan execution of rql queries on multiple sources -the best way to understand what are we trying to acheive here is to read -the unit-tests in unittest_querier_planner.py +the best way to understand what are we trying to acheive here is to read the +unit-tests in unittest_msplanner.py +What you need to know +~~~~~~~~~~~~~~~~~~~~~ +1. 
The system source is expected to support every entity and relation types -Split and execution specifications -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +2. Given "X relation Y": + + * if relation, X and Y types are supported by the external source, we suppose + by default that X and Y should both come from the same source as the + relation. You can specify otherwise by adding relation into the + "cross_relations" set in the source's mapping file and in that case, we'll + consider that we can also find in the system source some relation between + X and Y coming from different sources. + + * if "relation" isn't supported by the external source but X or Y + types (or both) are, we suppose by default that we can find in the system + source some relation where X and/or Y come from the external source. You + can specify otherwise by adding relation into the "dont_cross_relations" + set in the source's mapping file and in that case, we'll consider that we + can only find in the system source some relation between X and Y coming + from the system source. + + +Implementation +~~~~~~~~~~~~~~ +XXX explain algorithm + + +Examples of multi-sources query execution +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ For a system source and a ldap user source (only EUser and its attributes is supported, no group or such): - :EUser X: 1. fetch EUser X from both sources and return concatenation of results - :EUser X WHERE X in_group G, G name 'users': * catch 1 - 1. fetch EUser X from both sources, store concatenation of results - into a temporary table - 2. return the result of TMP X WHERE X in_group G, G name 'users' from - the system source - + 1. fetch EUser X from both sources, store concatenation of results into a + temporary table + 2. return the result of TMP X WHERE X in_group G, G name 'users' from the + system source * catch 2 - 1. 
return the result of EUser X WHERE X in_group G, G name 'users' - from system source, that's enough (optimization of the sql querier - will avoid join on EUser, so we will directly get local eids) - + 1. return the result of EUser X WHERE X in_group G, G name 'users' from system + source, that's enough (optimization of the sql querier will avoid join on + EUser, so we will directly get local eids) :EUser X,L WHERE X in_group G, X login L, G name 'users': 1. fetch Any X,L WHERE X is EUser, X login L from both sources, store @@ -37,15 +59,14 @@ :Any X WHERE X owned_by Y: * catch 1 - 1. fetch EUser X from both sources, store concatenation of results - into a temporary table - 2. return the result of Any X WHERE X owned_by Y, Y is TMP from - the system source - + 1. fetch EUser X from both sources, store concatenation of results into a + temporary table + 2. return the result of Any X WHERE X owned_by Y, Y is TMP from the system + source * catch 2 - 1. return the result of Any X WHERE X owned_by Y - from system source, that's enough (optimization of the sql querier - will avoid join on EUser, so we will directly get local eids) + 1. 
return the result of Any X WHERE X owned_by Y from system source, that's + enough (optimization of the sql querier will avoid join on EUser, so we + will directly get local eids) :organization: Logilab @@ -66,7 +87,8 @@ from cubicweb import server from cubicweb.common.utils import make_uid from cubicweb.server.utils import cleanup_solutions -from cubicweb.server.ssplanner import SSPlanner, OneFetchStep, add_types_restriction +from cubicweb.server.ssplanner import (SSPlanner, OneFetchStep, + add_types_restriction) from cubicweb.server.mssteps import * from cubicweb.server.sources import AbstractSource @@ -77,14 +99,6 @@ AbstractSource.dont_cross_relations = () AbstractSource.cross_relations = () - -def allequals(solutions): - """return true if all solutions are identical""" - sol = solutions.next() - for sol_ in solutions: - if sol_ != sol: - return False - return True def need_aggr_step(select, sources, stepdefs=None): """return True if a temporary table is necessary to store some partial @@ -114,24 +128,6 @@ fstepsolindices.update(stepdef[2]) return False -def copy_node(newroot, node, subparts=()): - newnode = node.__class__(*node.initargs(newroot)) - for part in subparts: - newnode.append(part) - return newnode - -def same_scope(var): - """return true if the variable is always used in the same scope""" - try: - return var.stinfo['samescope'] - except KeyError: - for rel in var.stinfo['relations']: - if not rel.scope is var.scope: - var.stinfo['samescope'] = False - return False - var.stinfo['samescope'] = True - return True - def select_group_sort(select): # XXX something similar done in rql2sql # add variables used in groups and sort terms to the selection # if necessary @@ -150,7 +146,17 @@ if select.groupby and not vref in select.groupby: select.add_group_var(vref.copy(select)) -# XXX move to rql +def allequals(solutions): + """return true if all solutions are identical""" + sol = solutions.next() + noconstsol = None + for sol_ in solutions: + if sol_ != sol: 
+ return False + return True + +# XXX move functions below to rql ############################################## + def is_ancestor(n1, n2): p = n1.parent while p is not None: @@ -159,49 +165,104 @@ p = p.parent return False +def copy_node(newroot, node, subparts=()): + newnode = node.__class__(*node.initargs(newroot)) + for part in subparts: + newnode.append(part) + return newnode + +def same_scope(var): + """return true if the variable is always used in the same scope""" + try: + return var.stinfo['samescope'] + except KeyError: + for rel in var.stinfo['relations']: + if not rel.scope is var.scope: + var.stinfo['samescope'] = False + return False + var.stinfo['samescope'] = True + return True + +################################################################################ + class PartPlanInformation(object): """regroups necessary information to execute some part of a "global" rql query ("global" means as received by the querier, which may result in - several internal queries, e.g. parts, due to security insertions) + several internal queries, e.g. parts, due to security insertions). Actually + a PPI is created for each subquery and for each query in a union. - it exposes as well some methods helping in executing this part on a + It exposes as well some methods helping in executing this part on a multi-sources repository, modifying its internal structure during the - process + process. 
- :attr solutions: a list of mappings (varname -> vartype) - :attr sourcesvars: - a dictionnary telling for each source which variable/solution are - supported, of the form {source : {varname: [solution index, ]}} + :attr plan: + the execution plan + :attr rqlst: + the original rql syntax tree handled by this part + + :attr needsplit: + bool telling if the query has to be split into multiple steps for + execution or if it can be executed at once + + :attr temptable: + a SQL temporary table name or None, if necessary to handle aggregate / + sorting for this part of the query + + :attr finaltable: + a SQL table name or None, if results for this part of the query should be + written into a temporary table (usually shared by multiple PPI) + + :attr sourcesterms: + a dictionary {source : {term: set([solution index, ])}} telling for each + source which terms are supported for which solutions. A "term" may be + either a rql Variable, Constant or Relation node. """ def __init__(self, plan, rqlst, rqlhelper=None): + self.plan = plan + self.rqlst = rqlst self.needsplit = False self.temptable = None self.finaltable = None - self.plan = plan - self.rqlst = rqlst + # shortcuts + self._schema = plan.schema self._session = plan.session + self._repo = self._session.repo self._solutions = rqlst.solutions self._solindices = range(len(self._solutions)) - # source : {var: [solution index, ]} - self.sourcesvars = self._sourcesvars = {} + self.system_source = self._repo.system_source + # source : {term: [solution index, ]} + self.sourcesterms = self._sourcesterms = {} # source : {relation: set(child variable and constant)} self._crossrelations = {} - # dictionnary of variables which are linked to each other using a non - # final relation which is supported by multiple sources - self._linkedvars = {} - self._crosslinkedvars = {} + # dictionary of variables and constants which are linked to each other + # using a non final relation supported by multiple sources (crossed or + # not). 
+ self._linkedterms = {} # processing - self._compute_sourcesvars() - self._remove_invalid_sources() + termssources = self._compute_sourcesterms() + self._remove_invalid_sources(termssources) self._compute_needsplit() - self.sourcesvars = {} - for k, v in self._sourcesvars.iteritems(): - self.sourcesvars[k] = {} + # after initialisation, .sourcesterms contains the same thing as + # ._sourcesterms though during plan construction, ._sourcesterms will + # be modified while .sourcesterms will be kept unmodified + self.sourcesterms = {} + for k, v in self._sourcesterms.iteritems(): + self.sourcesterms[k] = {} for k2, v2 in v.iteritems(): - self.sourcesvars[k][k2] = v2.copy() + self.sourcesterms[k][k2] = v2.copy() + # cleanup linked var + for var, linkedrelsinfo in self._linkedterms.iteritems(): + self._linkedterms[var] = frozenset(x[0] for x in linkedrelsinfo) + # map output of a step to input of a following step self._inputmaps = {} + # record input map conflicts to resolve them on final step generation + self._conflicts = [] if rqlhelper is not None: # else test self._insert_identity_variable = rqlhelper._annotator.rewrite_shared_optional + if server.DEBUG: + print 'sourcesterms:' + for source, terms in self.sourcesterms.items(): + print source, terms def copy_solutions(self, solindices): return [self._solutions[solidx].copy() for solidx in solindices] @@ -209,14 +270,14 @@ @property @cached def part_sources(self): - if self._sourcesvars: - return tuple(sorted(self._sourcesvars)) - return (self._session.repo.system_source,) + if self._sourcesterms: + return tuple(sorted(self._sourcesterms)) + return (self.system_source,) @property @cached def _sys_source_set(self): - return frozenset((self._session.repo.system_source, solindex) + return frozenset((self.system_source, solindex) for solindex in self._solindices) @cached @@ -224,20 +285,18 @@ """return a set of (source, solindex) where source doesn't support the relation """ - return frozenset((source, solidx) for source 
in self._session.repo.sources + return frozenset((source, solidx) for source in self._repo.sources for solidx in self._solindices - if not ((source.support_relation(relation.r_type) and - not self.crossed_relation(source, relation)) + if not ((source.support_relation(relation.r_type)) or relation.r_type in source.dont_cross_relations)) - - def _compute_sourcesvars(self): - """compute for each variable/solution in the rqlst which sources support - them + def _compute_sourcesterms(self): + """compute for each term (variable, rewritten constant, relation) and + for each solution in the rqlst which sources support them """ - repo = self._session.repo - eschema = repo.schema.eschema - sourcesvars = self._sourcesvars + repo = self._repo + eschema = self._schema.eschema + sourcesterms = self._sourcesterms # find for each source which variable/solution are supported for varname, varobj in self.rqlst.defined_vars.items(): # if variable has an eid specified, we can get its source directly @@ -245,22 +304,22 @@ if varobj.stinfo['uidrels']: vrels = varobj.stinfo['relations'] - varobj.stinfo['uidrels'] for rel in varobj.stinfo['uidrels']: - if rel.neged(strict=True) or rel.operator() != '=': + if rel.neged(strict=True) or rel.operator() != '=': continue for const in rel.children[1].get_nodes(Constant): eid = const.eval(self.plan.args) source = self._session.source_from_eid(eid) if vrels and not any(source.support_relation(r.r_type) for r in vrels): - self._set_source_for_var(repo.system_source, varobj) + self._set_source_for_term(self.system_source, varobj) else: - self._set_source_for_var(source, varobj) + self._set_source_for_term(source, varobj) continue rels = varobj.stinfo['relations'] if not rels and not varobj.stinfo['typerels']: # (rare) case where the variable has no type specified nor # relation accessed ex. 
"Any MAX(X)" - self._set_source_for_var(repo.system_source, varobj) + self._set_source_for_term(self.system_source, varobj) continue for i, sol in enumerate(self._solutions): vartype = sol[varname] @@ -276,64 +335,35 @@ if not varobj._q_invariant or \ any(imap(source.support_relation, (r.r_type for r in rels if r.r_type != 'eid'))): - sourcesvars.setdefault(source, {}).setdefault(varobj, set()).add(i) + sourcesterms.setdefault(source, {}).setdefault(varobj, set()).add(i) # if variable is not invariant and is used by a relation # not supported by this source, we'll have to split the # query if not varobj._q_invariant and any(ifilterfalse( source.support_relation, (r.r_type for r in rels))): self.needsplit = True - - def _handle_cross_relation(self, rel, relsources, vsources): - crossvars = None - for source in relsources: - if rel.r_type in source.cross_relations: - crossvars = set(x.variable for x in rel.get_nodes(VariableRef)) - crossvars.update(frozenset(x for x in rel.get_nodes(Constant))) - assert len(crossvars) == 2 - ssource = self._session.repo.system_source - needsplit = True - flag = 0 - for v in crossvars: - if isinstance(v, Constant): - allsols = set(self._solindices) - try: - self._sourcesvars[ssource][v] = allsols - except KeyError: - self._sourcesvars[ssource] = {v: allsols} - if len(vsources[v]) == 1: - if iter(vsources[v]).next()[0].uri == 'system': - flag = 1 - for ov in crossvars: - if ov is not v and (isinstance(ov, Constant) or ov._q_invariant): - ssset = frozenset((ssource,)) - self._remove_sources(ov, vsources[ov] - ssset) - else: - for ov in crossvars: - if ov is not v and (isinstance(ov, Constant) or ov._q_invariant): - needsplit = False - break - else: - continue - if not rel.neged(strict=True): - break - else: - self._crossrelations.setdefault(source, {})[rel] = crossvars - if not flag: - self._sourcesvars.setdefault(source, {})[rel] = set(self._solindices) - self._sourcesvars.setdefault(ssource, {})[rel] = set(self._solindices) - if 
needsplit: - self.needsplit = True - return crossvars is None - - def _remove_invalid_sources(self): - """removes invalid sources from `sourcesvars` member according to - traversed relations and their properties (which sources support them, - can they cross sources, etc...) - """ - repo = self._session.repo - rschema = repo.schema.rschema - vsources = {} + # add source for rewritten constants to sourcesterms + for vconsts in self.rqlst.stinfo['rewritten'].itervalues(): + const = vconsts[0] + source = self._session.source_from_eid(const.eval(self.plan.args)) + if source is self.system_source: + for const in vconsts: + self._set_source_for_term(source, const) + elif source in self._sourcesterms: + source_scopes = frozenset(t.scope for t in self._sourcesterms[source]) + for const in vconsts: + if const.scope in source_scopes: + self._set_source_for_term(source, const) + # if system source is used, add every rewritten constant + # to its supported terms even when associated entity + # doesn't actually comes from it so we get a changes + # that allequals will return True as expected when + # computing needsplit + if self.system_source in sourcesterms: + self._set_source_for_term(self.system_source, const) + # add source for relations + rschema = self._schema.rschema + termssources = {} for rel in self.rqlst.iget_nodes(Relation): # process non final relations only # note: don't try to get schema for 'is' relation (not available @@ -346,7 +376,6 @@ # XXX code below don't deal if some source allow relation # crossing but not another one relsources = repo.rel_type_sources(rel.r_type) - crossvars = None if len(relsources) < 2: # filter out sources being there because they have this # relation in their dont_cross_relations attribute @@ -355,25 +384,54 @@ if relsources: # this means the relation is using a variable inlined as # a constant and another unsupported variable, in which - # case we put the relation in sourcesvars - self._sourcesvars.setdefault(relsources[0], 
{})[rel] = set(self._solindices) + # case we put the relation in sourcesterms + self._sourcesterms.setdefault(relsources[0], {})[rel] = set(self._solindices) continue lhs, rhs = rel.get_variable_parts() lhsv, rhsv = getattr(lhs, 'variable', lhs), getattr(rhs, 'variable', rhs) - # update dictionnary of sources supporting lhs and rhs vars - if not lhsv in vsources: - vsources[lhsv] = self._term_sources(lhs) - if not rhsv in vsources: - vsources[rhsv] = self._term_sources(rhs) - if self._handle_cross_relation(rel, relsources, vsources): - self._linkedvars.setdefault(lhsv, set()).add((rhsv, rel)) - self._linkedvars.setdefault(rhsv, set()).add((lhsv, rel)) + # update dictionary of sources supporting lhs and rhs vars + if not lhsv in termssources: + termssources[lhsv] = self._term_sources(lhs) + if not rhsv in termssources: + termssources[rhsv] = self._term_sources(rhs) + self._handle_cross_relation(rel, relsources, termssources) + self._linkedterms.setdefault(lhsv, set()).add((rhsv, rel)) + self._linkedterms.setdefault(rhsv, set()).add((lhsv, rel)) + return termssources + + def _handle_cross_relation(self, rel, relsources, termssources): + for source in relsources: + if rel.r_type in source.cross_relations: + ssource = self.system_source + crossvars = set(x.variable for x in rel.get_nodes(VariableRef)) + for const in rel.get_nodes(Constant): + if source.uri != 'system' and not const in self._sourcesterms.get(source, ()): + continue + crossvars.add(const) + self._crossrelations.setdefault(source, {})[rel] = crossvars + if len(crossvars) < 2: + # this means there is a constant in the relation which is + # not supported by the source, so we can stop here + continue + self._sourcesterms.setdefault(ssource, {})[rel] = set(self._solindices) + for term in crossvars: + if len(termssources[term]) == 1 and iter(termssources[term]).next()[0].uri == 'system': + for ov in crossvars: + if ov is not term and (isinstance(ov, Constant) or ov._q_invariant): + ssset = 
frozenset((ssource,)) + self._remove_sources(ov, termssources[ov] - ssset) + break else: - self._crosslinkedvars.setdefault(lhsv, set()).add((rhsv, rel)) - self._crosslinkedvars.setdefault(rhsv, set()).add((lhsv, rel)) - for term in self._linkedvars: - self._remove_sources_until_stable(term, vsources) - if len(self._sourcesvars) > 1 and hasattr(self.plan.rqlst, 'main_relations'): + self._sourcesterms.setdefault(source, {})[rel] = set(self._solindices) + + def _remove_invalid_sources(self, termssources): + """removes invalid sources from `sourcesterms` member according to + traversed relations and their properties (which sources support them, + can they cross sources, etc...) + """ + for term in self._linkedterms: + self._remove_sources_until_stable(term, termssources) + if len(self._sourcesterms) > 1 and hasattr(self.plan.rqlst, 'main_relations'): # the querier doesn't annotate write queries, need to do it here self.plan.annotate_rqlst() # insert/update/delete queries, we may get extra information from @@ -383,6 +441,8 @@ for etype, vref in self.plan.rqlst.main_variables) else: inserted = {} + repo = self._repo + rschema = self._schema.rschema for rel in self.plan.rqlst.main_relations: if not rschema(rel.r_type).is_final(): # nothing to do if relation is not supported by multiple sources @@ -390,96 +450,88 @@ continue lhs, rhs = rel.get_variable_parts() try: - lhsv = self._extern_term(lhs, vsources, inserted) - rhsv = self._extern_term(rhs, vsources, inserted) + lhsv = self._extern_term(lhs, termssources, inserted) + rhsv = self._extern_term(rhs, termssources, inserted) except KeyError, ex: continue - norelsup = self._norel_support_set(rel) - self._remove_var_sources(lhsv, norelsup, rhsv, vsources) - self._remove_var_sources(rhsv, norelsup, lhsv, vsources) - # cleanup linked var - for var, linkedrelsinfo in self._linkedvars.iteritems(): - self._linkedvars[var] = frozenset(x[0] for x in linkedrelsinfo) - # if there are other sources than the system source, consider 
simplified - # variables'source - if self._sourcesvars and self._sourcesvars.keys() != [self._session.repo.system_source]: - # add source for rewritten constants to sourcesvars - for vconsts in self.rqlst.stinfo['rewritten'].itervalues(): - const = vconsts[0] - eid = const.eval(self.plan.args) - source = self._session.source_from_eid(eid) - if source is self._session.repo.system_source: - for const in vconsts: - self._set_source_for_var(source, const) - elif source in self._sourcesvars: - source_scopes = frozenset(v.scope for v in self._sourcesvars[source]) - for const in vconsts: - if const.scope in source_scopes: - self._set_source_for_var(source, const) - - def _extern_term(self, term, vsources, inserted): + self._remove_term_sources(lhsv, rel, rhsv, termssources) + self._remove_term_sources(rhsv, rel, lhsv, termssources) + + def _extern_term(self, term, termssources, inserted): var = term.variable if var.stinfo['constnode']: termv = var.stinfo['constnode'] - vsources[termv] = self._term_sources(termv) + termssources[termv] = self._term_sources(termv) elif var in inserted: termv = var - source = self._session.repo.locate_etype_source(inserted[var]) - vsources[termv] = set((source, solindex) for solindex in self._solindices) + source = self._repo.locate_etype_source(inserted[var]) + termssources[termv] = set((source, solindex) + for solindex in self._solindices) else: termv = self.rqlst.defined_vars[var.name] - if not termv in vsources: - vsources[termv] = self._term_sources(termv) + if not termv in termssources: + termssources[termv] = self._term_sources(termv) return termv - def _remove_sources_until_stable(self, var, vsources): - sourcesvars = self._sourcesvars - for ovar, rel in self._linkedvars.get(var, ()): - if not var.scope is ovar.scope and rel.scope.neged(strict=True): + def _remove_sources_until_stable(self, term, termssources): + sourcesterms = self._sourcesterms + for oterm, rel in self._linkedterms.get(term, ()): + if not term.scope is oterm.scope 
and rel.scope.neged(strict=True): # can't get information from relation inside a NOT exists - # where variables don't belong to the same scope + # where terms don't belong to the same scope continue need_ancestor_scope = False - if not (var.scope is rel.scope and ovar.scope is rel.scope): + if not (term.scope is rel.scope and oterm.scope is rel.scope): if rel.ored(): continue if rel.ored(traverse_scope=True): # if relation has some OR as parent, constraints should only # propagate from parent scope to child scope, nothing else need_ancestor_scope = True - relsources = self._session.repo.rel_type_sources(rel.r_type) + relsources = self._repo.rel_type_sources(rel.r_type) if rel.neged(strict=True) and ( len(relsources) < 2 - or not isinstance(ovar, Variable) - or ovar.valuable_references() != 1 - or any(sourcesvars[source][var] != sourcesvars[source][ovar] + or not isinstance(oterm, Variable) + or oterm.valuable_references() != 1 + or any(sourcesterms[source][term] != sourcesterms[source][oterm] for source in relsources - if var in sourcesvars.get(source, ()) - and ovar in sourcesvars.get(source, ()))): - # neged relation doesn't allow to infer variable sources unless we're - # on a multisource relation for a variable only used by this relation - # (eg "Any X WHERE NOT X multisource_rel Y" and over is Y), iif + if term in sourcesterms.get(source, ()) + and oterm in sourcesterms.get(source, ()))): + # neged relation doesn't allow to infer term sources unless + # we're on a multisource relation for a term only used by this + # relation (eg "Any X WHERE NOT X multisource_rel Y" and over is + # Y) continue - norelsup = self._norel_support_set(rel) - # compute invalid sources for variables and remove them - if not need_ancestor_scope or is_ancestor(var.scope, ovar.scope): - self._remove_var_sources(var, norelsup, ovar, vsources) - if not need_ancestor_scope or is_ancestor(ovar.scope, var.scope): - self._remove_var_sources(ovar, norelsup, var, vsources) + # compute invalid 
sources for terms and remove them + if not need_ancestor_scope or is_ancestor(term.scope, oterm.scope): + self._remove_term_sources(term, rel, oterm, termssources) + if not need_ancestor_scope or is_ancestor(oterm.scope, term.scope): + self._remove_term_sources(oterm, rel, term, termssources) - def _remove_var_sources(self, var, norelsup, ovar, vsources): - """remove invalid sources for var according to ovar's sources and the - relation between those two variables. + def _remove_term_sources(self, term, rel, oterm, termssources): + """remove invalid sources for term according to oterm's sources and the + relation between those two terms. """ - varsources = vsources[var] - invalid_sources = varsources - (vsources[ovar] | norelsup) + norelsup = self._norel_support_set(rel) + termsources = termssources[term] + invalid_sources = termsources - (termssources[oterm] | norelsup) + if invalid_sources and self._repo.can_cross_relation(rel.r_type): + invalid_sources -= self._sys_source_set + if invalid_sources and isinstance(term, Variable) \ + and self._need_ext_source_access(term, rel): + # if the term is a not invariant variable, we should filter out + # source where the relation is a cross relation from invalid + # sources + invalid_sources = frozenset((s, solidx) for s, solidx in invalid_sources + if not (s in self._crossrelations and + rel in self._crossrelations[s])) if invalid_sources: - self._remove_sources(var, invalid_sources) - varsources -= invalid_sources - self._remove_sources_until_stable(var, vsources) + self._remove_sources(term, invalid_sources) + termsources -= invalid_sources + self._remove_sources_until_stable(term, termssources) def _compute_needsplit(self): - """tell according to sourcesvars if the rqlst has to be splitted for + """tell according to sourcesterms if the rqlst has to be splitted for execution among multiple sources the execution has to be split if @@ -491,20 +543,43 @@ be fetched from some source """ # NOTE: < 2 since may be 0 on queries 
such as Any X WHERE X eid 2 - if len(self._sourcesvars) < 2: + if len(self._sourcesterms) < 2: self.needsplit = False elif not self.needsplit: - if not allequals(self._sourcesvars.itervalues()): + if not allequals(self._sourcesterms.itervalues()): self.needsplit = True else: - sample = self._sourcesvars.itervalues().next() - if len(sample) > 1 and any(v for v in sample - if not v in self._linkedvars - and not v in self._crosslinkedvars): - self.needsplit = True - - def _set_source_for_var(self, source, var): - self._sourcesvars.setdefault(source, {})[var] = set(self._solindices) + sample = self._sourcesterms.itervalues().next() + if len(sample) > 1: + for term in sample: + # need split if unlinked variable + if isinstance(term, Variable) and not term in self._linkedterms: + self.needsplit = True + break + else: + # need split if there are some cross relation on non + # invariant variable or if the variable is used in + # multi-sources relation + if self._crossrelations: + for reldict in self._crossrelations.itervalues(): + for rel, terms in reldict.iteritems(): + for term in terms: + if isinstance(term, Variable) \ + and self._need_ext_source_access(term, rel): + self.needsplit = True + return + + @cached + def _need_ext_source_access(self, var, rel): + if not var._q_invariant: + return True + if any(r for x, r in self._linkedterms[var] + if not r is rel and self._repo.is_multi_sources_relation(r.r_type)): + return True + return False + + def _set_source_for_term(self, source, term): + self._sourcesterms.setdefault(source, {})[term] = set(self._solindices) def _term_sources(self, term): """returns possible sources for terms `term`""" @@ -513,27 +588,27 @@ return set((source, solindex) for solindex in self._solindices) else: var = getattr(term, 'variable', term) - sources = [source for source, varobjs in self.sourcesvars.iteritems() + sources = [source for source, varobjs in self.sourcesterms.iteritems() if var in varobjs] return set((source, solindex) for source in 
sources - for solindex in self.sourcesvars[source][var]) + for solindex in self.sourcesterms[source][var]) - def _remove_sources(self, var, sources): - """removes invalid sources (`sources`) from `sourcesvars` + def _remove_sources(self, term, sources): + """removes invalid sources (`sources`) from `sourcesterms` :param sources: the list of sources to remove - :param var: the analyzed variable + :param term: the analyzed term """ - sourcesvars = self._sourcesvars + sourcesterms = self._sourcesterms for source, solindex in sources: try: - sourcesvars[source][var].remove(solindex) + sourcesterms[source][term].remove(solindex) except KeyError: return # may occur with subquery column alias - if not sourcesvars[source][var]: - del sourcesvars[source][var] - if not sourcesvars[source]: - del sourcesvars[source] + if not sourcesterms[source][term]: + del sourcesterms[source][term] + if not sourcesterms[source]: + del sourcesterms[source] def crossed_relation(self, source, relation): return relation in self._crossrelations.get(source, ()) @@ -545,45 +620,97 @@ """ steps = [] select = self.rqlst - rschema = self.plan.schema.rschema + rschema = self._schema.rschema for source in self.part_sources: - sourcevars = self._sourcesvars[source] - while sourcevars: - # take a variable randomly, and all variables supporting the + try: + sourceterms = self._sourcesterms[source] + except KeyError: + continue # already proceed + while sourceterms: + # take a term randomly, and all terms supporting the # same solutions - var, solindices = self._choose_var(sourcevars) + term, solindices = self._choose_term(sourceterms) if source.uri == 'system': # ensure all variables are available for the latest step # (missing one will be available from temporary tables # of previous steps) scope = select - variables = scope.defined_vars.values() + scope.aliases.values() - sourcevars.clear() + terms = scope.defined_vars.values() + scope.aliases.values() + sourceterms.clear() + sources = [source] else: - 
scope = var.scope - variables = self._expand_vars(var, source, sourcevars, scope, solindices) - if not sourcevars: - del self._sourcesvars[source] - # find which sources support the same variables/solutions - sources = self._expand_sources(source, variables, solindices) - # suppose this is a final step until the contrary is proven - final = scope is select - # set of variables which should be additionaly selected when + scope = term.scope + # find which sources support the same term and solutions + sources = self._expand_sources(source, term, solindices) + # no try to get as much terms as possible + terms = self._expand_terms(term, sources, sourceterms, + scope, solindices) + if len(terms) == 1 and isinstance(terms[0], Constant): + # we can't generate anything interesting with a single + # constant term (will generate an empty "Any" query), + # go to the next iteration directly! + continue + if not sourceterms: + try: + del self._sourcesterms[source] + except KeyError: + # XXX already cleaned + pass + # set of terms which should be additionaly selected when # possible needsel = set() - # add attribute variables and mark variables which should be - # additionaly selected when possible - for var in select.defined_vars.itervalues(): - if not var in variables: - stinfo = var.stinfo - for ovar, rtype in stinfo['attrvars']: - if ovar in variables: + if not self._sourcesterms: + terms += scope.defined_vars.values() + scope.aliases.values() + final = True + else: + # suppose this is a final step until the contrary is proven + final = scope is select + # add attribute variables and mark variables which should be + # additionaly selected when possible + for var in select.defined_vars.itervalues(): + if not var in terms: + stinfo = var.stinfo + for ovar, rtype in stinfo['attrvars']: + if ovar in terms: + needsel.add(var.name) + terms.append(var) + break + else: needsel.add(var.name) - variables.append(var) + final = False + # check where all relations are supported by the 
sources + for rel in scope.iget_nodes(Relation): + if rel.is_types_restriction(): + continue + # take care not overwriting the existing "source" identifier + for _source in sources: + if not _source.support_relation(rel.r_type) or ( + self.crossed_relation(_source, rel) and not rel in terms): + for vref in rel.iget_nodes(VariableRef): + needsel.add(vref.name) + final = False break else: - needsel.add(var.name) - final = False + if not scope is select: + self._exists_relation(rel, terms, needsel) + # if relation is supported by all sources and some of + # its lhs/rhs variable isn't in "terms", and the + # other end *is* in "terms", mark it have to be + # selected + if source.uri != 'system' and not rschema(rel.r_type).is_final(): + lhs, rhs = rel.get_variable_parts() + try: + lhsvar = lhs.variable + except AttributeError: + lhsvar = lhs + try: + rhsvar = rhs.variable + except AttributeError: + rhsvar = rhs + if lhsvar in terms and not rhsvar in terms: + needsel.add(lhsvar.name) + elif rhsvar in terms and not lhsvar in terms: + needsel.add(rhsvar.name) if final and source.uri != 'system': # check rewritten constants for vconsts in select.stinfo['rewritten'].itervalues(): @@ -591,62 +718,25 @@ eid = const.eval(self.plan.args) _source = self._session.source_from_eid(eid) if len(sources) > 1 or not _source in sources: - # if there is some rewriten constant used by a - # not neged relation while there are some source - # not supporting the associated entity, this step - # can't be final (unless the relation is explicitly - # in `variables`, eg cross relations) + # if there is some rewriten constant used by a not + # neged relation while there are some source not + # supporting the associated entity, this step can't + # be final (unless the relation is explicitly in + # `terms`, eg cross relations) for c in vconsts: rel = c.relation() - if rel is None or not (rel in variables or rel.neged(strict=True)): - #if rel is not None and rel.r_type == 'identity' and not 
rel.neged(strict=True): + if rel is None or not (rel in terms or rel.neged(strict=True)): final = False break break - # check where all relations are supported by the sources - for rel in scope.iget_nodes(Relation): - if rel.is_types_restriction(): - continue - # take care not overwriting the existing "source" identifier - for _source in sources: - if not _source.support_relation(rel.r_type): - for vref in rel.iget_nodes(VariableRef): - needsel.add(vref.name) - final = False - break - elif self.crossed_relation(_source, rel) and not rel in variables: - final = False - break - else: - if not scope is select: - self._exists_relation(rel, variables, needsel) - # if relation is supported by all sources and some of - # its lhs/rhs variable isn't in "variables", and the - # other end *is* in "variables", mark it have to be - # selected - if source.uri != 'system' and not rschema(rel.r_type).is_final(): - lhs, rhs = rel.get_variable_parts() - try: - lhsvar = lhs.variable - except AttributeError: - lhsvar = lhs - try: - rhsvar = rhs.variable - except AttributeError: - rhsvar = rhs - if lhsvar in variables and not rhsvar in variables: - needsel.add(lhsvar.name) - elif rhsvar in variables and not lhsvar in variables: - needsel.add(rhsvar.name) if final: - self._cleanup_sourcesvars(sources, solindices) - # XXX rename: variables may contain Relation and Constant nodes... 
- steps.append( (sources, variables, solindices, scope, needsel, - final) ) + self._cleanup_sourcesterms(sources, solindices) + steps.append((sources, terms, solindices, scope, needsel, final) + ) return steps - def _exists_relation(self, rel, variables, needsel): - rschema = self.plan.schema.rschema(rel.r_type) + def _exists_relation(self, rel, terms, needsel): + rschema = self._schema.rschema(rel.r_type) lhs, rhs = rel.get_variable_parts() try: lhsvar, rhsvar = lhs.variable, rhs.variable @@ -658,135 +748,165 @@ # variable is refed by an outer scope and should be substituted # using an 'identity' relation (else we'll get a conflict of # temporary tables) - if rhsvar in variables and not lhsvar in variables: - self._identity_substitute(rel, lhsvar, variables, needsel) - elif lhsvar in variables and not rhsvar in variables: - self._identity_substitute(rel, rhsvar, variables, needsel) + if rhsvar in terms and not lhsvar in terms: + self._identity_substitute(rel, lhsvar, terms, needsel) + elif lhsvar in terms and not rhsvar in terms: + self._identity_substitute(rel, rhsvar, terms, needsel) - def _identity_substitute(self, relation, var, variables, needsel): + def _identity_substitute(self, relation, var, terms, needsel): newvar = self._insert_identity_variable(relation.scope, var) if newvar is not None: # ensure relation is using '=' operator, else we rely on a # sqlgenerator side effect (it won't insert an inequality operator # in this case) relation.children[1].operator = '=' - variables.append(newvar) + terms.append(newvar) needsel.add(newvar.name) - def _choose_var(self, sourcevars): + def _choose_term(self, sourceterms): + """pick one term among terms supported by a source, which will be used + as a base to generate an execution step + """ secondchoice = None - if len(self._sourcesvars) > 1: + if len(self._sourcesterms) > 1: # priority to variable from subscopes - for var in sourcevars: - if not var.scope is self.rqlst: - if isinstance(var, Variable): - return 
var, sourcevars.pop(var) - secondchoice = var + for term in sourceterms: + if not term.scope is self.rqlst: + if isinstance(term, Variable): + return term, sourceterms.pop(term) + secondchoice = term else: - # priority to variable outer scope - for var in sourcevars: - if var.scope is self.rqlst: - if isinstance(var, Variable): - return var, sourcevars.pop(var) - secondchoice = var + # priority to variable from outer scope + for term in sourceterms: + if term.scope is self.rqlst: + if isinstance(term, Variable): + return term, sourceterms.pop(term) + secondchoice = term if secondchoice is not None: - return secondchoice, sourcevars.pop(secondchoice) - # priority to variable - for var in sourcevars: - if isinstance(var, Variable): - return var, sourcevars.pop(var) - # whatever - var = iter(sourcevars).next() - return var, sourcevars.pop(var) - + return secondchoice, sourceterms.pop(secondchoice) + # priority to variable with the less solutions supported and with the + # most valuable refs + variables = sorted([(var, sols) for (var, sols) in sourceterms.items() + if isinstance(var, Variable)], + key=lambda (v, s): (len(s), -v.valuable_references())) + if variables: + var = variables[0][0] + return var, sourceterms.pop(var) + # priority to constant + for term in sourceterms: + if isinstance(term, Constant): + return term, sourceterms.pop(term) + # whatever (relation) + term = iter(sourceterms).next() + return term, sourceterms.pop(term) - def _expand_vars(self, var, source, sourcevars, scope, solindices): - variables = [var] + def _expand_sources(self, selected_source, term, solindices): + """return all sources supporting given term / solindices""" + sources = [selected_source] + sourcesterms = self._sourcesterms + for source in sourcesterms.keys(): + if source is selected_source: + continue + if not (term in sourcesterms[source] and + solindices.issubset(sourcesterms[source][term])): + continue + sources.append(source) + if source.uri != 'system' or not 
(isinstance(term, Variable) and not term in self._linkedterms): + termsolindices = sourcesterms[source][term] + termsolindices -= solindices + if not termsolindices: + del sourcesterms[source][term] + if not sourcesterms[source]: + del sourcesterms[source] + return sources + + def _expand_terms(self, term, sources, sourceterms, scope, solindices): + terms = [term] + sources = sorted(sources) + sourcesterms = self._sourcesterms nbunlinked = 1 - linkedvars = self._linkedvars - # variable has to belong to the same scope if there is more + linkedterms = self._linkedterms + # term has to belong to the same scope if there is more # than the system source remaining - if len(self._sourcesvars) > 1 and not scope is self.rqlst: - candidates = (v for v in sourcevars.keys() if scope is v.scope) + if len(sourcesterms) > 1 and not scope is self.rqlst: + candidates = (t for t in sourceterms.keys() if scope is t.scope) else: - candidates = sourcevars #.iterkeys() - # we only want one unlinked variable in each generated query - candidates = [v for v in candidates - if isinstance(v, Constant) or - (solindices.issubset(sourcevars[v]) and v in linkedvars)] - accept_var = lambda x: (isinstance(x, Constant) or any(v for v in variables if v in linkedvars.get(x, ()))) - source_cross_rels = self._crossrelations.get(source, ()) - if isinstance(var, Relation) and var in source_cross_rels: - cross_vars = source_cross_rels.pop(var) - base_accept_var = accept_var - accept_var = lambda x: (base_accept_var(x) or x in cross_vars) - for refed in cross_vars: + candidates = sourceterms #.iterkeys() + # we only want one unlinked term in each generated query + candidates = [t for t in candidates + if isinstance(t, (Constant, Relation)) or + (solindices.issubset(sourceterms[t]) and t in linkedterms)] + cross_rels = {} + for source in sources: + cross_rels.update(self._crossrelations.get(source, {})) + exclude = {} + for rel, crossvars in cross_rels.iteritems(): + vars = [t for t in crossvars if 
isinstance(t, Variable)] + try: + exclude[vars[0]] = vars[1] + exclude[vars[1]] = vars[0] + except IndexError: + pass + accept_term = lambda x: (not any(s for s in sources if not x in sourcesterms.get(s, ())) + and any(t for t in terms if t in linkedterms.get(x, ())) + and not exclude.get(x) in terms) + if isinstance(term, Relation) and term in cross_rels: + cross_terms = cross_rels.pop(term) + base_accept_term = accept_term + accept_term = lambda x: (base_accept_term(x) or x in cross_terms) + for refed in cross_terms: if not refed in candidates: - candidates.append(refed) - else: - cross_vars = () - # repeat until no variable can't be added, since addition of a new - # variable may permit to another one to be added + terms.append(refed) + # repeat until no term can't be added, since addition of a new + # term may permit to another one to be added modified = True while modified and candidates: modified = False - for var in candidates[:]: - if accept_var(var): - variables.append(var) - try: - # constant nodes should be systematically deleted - if isinstance(var, Constant): - del sourcevars[var] - else: - # variable nodes should be deleted once all possible - # solutions indices have been consumed - sourcevars[var] -= solindices - if not sourcevars[var]: - del sourcevars[var] - except KeyError: - assert var in cross_vars - candidates.remove(var) + for term in candidates[:]: + if isinstance(term, Constant): + relation = term.relation() + if sorted(set(x[0] for x in self._term_sources(term))) != sources: + continue + terms.append(term) + candidates.remove(term) modified = True - return variables + del sourceterms[term] + elif accept_term(term): + terms.append(term) + candidates.remove(term) + modified = True + self._cleanup_sourcesterms(sources, solindices, term) + return terms - def _expand_sources(self, selected_source, vars, solindices): - sources = [selected_source] - sourcesvars = self._sourcesvars - for source in sourcesvars: - if source is selected_source: - 
continue - for var in vars: - if not (var in sourcesvars[source] and - solindices.issubset(sourcesvars[source][var])): - break - else: - sources.append(source) - if source.uri != 'system': - for var in vars: - varsolindices = sourcesvars[source][var] - varsolindices -= solindices - if not varsolindices: - del sourcesvars[source][var] - return sources - - def _cleanup_sourcesvars(self, sources, solindices): - """on final parts, remove solutions so we know they are already processed""" + def _cleanup_sourcesterms(self, sources, solindices, term=None): + """remove solutions so we know they are already processed""" for source in sources: try: - sourcevars = self._sourcesvars[source] + sourceterms = self._sourcesterms[source] except KeyError: continue - for var, varsolindices in sourcevars.items(): - if isinstance(var, Relation) and self.crossed_relation(source, var): - continue - varsolindices -= solindices - if not varsolindices: - del sourcevars[var] - + if term is None: + for term, termsolindices in sourceterms.items(): + if isinstance(term, Relation) and self.crossed_relation(source, term): + continue + termsolindices -= solindices + if not termsolindices: + del sourceterms[term] + else: + try: + sourceterms[term] -= solindices + if not sourceterms[term]: + del sourceterms[term] + except KeyError: + pass + #assert term in cross_terms + if not sourceterms: + del self._sourcesterms[source] + def merge_input_maps(self, allsolindices): - """inputmaps is a dictionary with tuple of solution indices as key with an - associateed input map as value. This function compute for each solution - its necessary input map and return them grouped + """inputmaps is a dictionary with tuple of solution indices as key with + an associated input map as value. 
This function compute for each + solution its necessary input map and return them grouped ex: inputmaps = {(0, 1, 2): {'A': 't1.login1', 'U': 't1.C0', 'U.login': 't1.login1'}, @@ -822,26 +942,38 @@ def build_final_part(self, select, solindices, inputmap, sources, insertedvars): - plan = self.plan - rqlst = plan.finalize(select, [self._solutions[i] for i in solindices], - insertedvars) + solutions = [self._solutions[i] for i in solindices] + if self._conflicts: + for varname, mappedto in self._conflicts: + var = select.defined_vars[varname] + newvar = select.make_variable() + # XXX should use var.scope but scope hasn't been computed yet + select.add_relation(var, 'identity', newvar) + for sol in solutions: + sol[newvar.name] = sol[varname] + inputmap[newvar.name] = mappedto + rqlst = self.plan.finalize(select, solutions, insertedvars) if self.temptable is None and self.finaltable is None: - return OneFetchStep(plan, rqlst, sources, inputmap=inputmap) + return OneFetchStep(self.plan, rqlst, sources, inputmap=inputmap) table = self.temptable or self.finaltable - return FetchStep(plan, rqlst, sources, table, True, inputmap) + return FetchStep(self.plan, rqlst, sources, table, True, inputmap) def build_non_final_part(self, select, solindices, sources, insertedvars, table): """non final step, will have to store results in a temporary table""" - plan = self.plan - rqlst = plan.finalize(select, [self._solutions[i] for i in solindices], - insertedvars) - step = FetchStep(plan, rqlst, sources, table, False) + solutions = [self._solutions[i] for i in solindices] + rqlst = self.plan.finalize(select, solutions, insertedvars) + step = FetchStep(self.plan, rqlst, sources, table, False) # update input map for following steps, according to processed solutions inputmapkey = tuple(sorted(solindices)) inputmap = self._inputmaps.setdefault(inputmapkey, {}) + for varname, mapping in step.outputmap.iteritems(): + if varname in inputmap and \ + not (mapping == inputmap[varname] or + 
self._schema.eschema(solutions[0][varname]).is_final()): + self._conflicts.append((varname, inputmap[varname])) inputmap.update(step.outputmap) - plan.add_step(step) + self.plan.add_step(step) class MSPlanner(SSPlanner): @@ -915,7 +1047,7 @@ byinputmap = {} for ppi in cppis: select = ppi.rqlst - if sources != (plan.session.repo.system_source,): + if sources != (ppi.system_source,): add_types_restriction(self.schema, select) # part plan info for subqueries inputmap = self._ppi_subqueries(ppi) @@ -972,10 +1104,10 @@ atemptable = None selection = select.selection ppi.temptable = atemptable - vfilter = VariablesFiltererVisitor(self.schema, ppi) + vfilter = TermsFiltererVisitor(self.schema, ppi) steps = [] - for sources, variables, solindices, scope, needsel, final in stepdefs: - # extract an executable query using only the specified variables + for sources, terms, solindices, scope, needsel, final in stepdefs: + # extract an executable query using only the specified terms if sources[0].uri == 'system': # in this case we have to merge input maps before call to # filter so already processed restriction are correctly @@ -983,7 +1115,7 @@ solsinputmaps = ppi.merge_input_maps(solindices) for solindices, inputmap in solsinputmaps: minrqlst, insertedvars = vfilter.filter( - sources, variables, scope, set(solindices), needsel, final) + sources, terms, scope, set(solindices), needsel, final) if inputmap is None: inputmap = subinputmap else: @@ -992,10 +1124,10 @@ sources, insertedvars)) else: # this is a final part (i.e. 
retreiving results for the - # original query part) if all variable / sources have been + # original query part) if all term / sources have been # treated or if this is the last shot for used solutions minrqlst, insertedvars = vfilter.filter( - sources, variables, scope, solindices, needsel, final) + sources, terms, scope, solindices, needsel, final) if final: solsinputmaps = ppi.merge_input_maps(solindices) for solindices, inputmap in solsinputmaps: @@ -1003,10 +1135,17 @@ inputmap = subinputmap else: inputmap.update(subinputmap) - steps.append(ppi.build_final_part(minrqlst, solindices, inputmap, - sources, insertedvars)) + if inputmap and len(sources) > 1: + sources.remove(ppi.system_source) + steps.append(ppi.build_final_part(minrqlst, solindices, None, + sources, insertedvars)) + steps.append(ppi.build_final_part(minrqlst, solindices, inputmap, + [ppi.system_source], insertedvars)) + else: + steps.append(ppi.build_final_part(minrqlst, solindices, inputmap, + sources, insertedvars)) else: - table = '_T%s%s' % (''.join(sorted(v._ms_table_key() for v in variables)), + table = '_T%s%s' % (''.join(sorted(v._ms_table_key() for v in terms)), ''.join(sorted(str(i) for i in solindices))) ppi.build_non_final_part(minrqlst, solindices, sources, insertedvars, table) @@ -1039,7 +1178,7 @@ pass -class VariablesFiltererVisitor(object): +class TermsFiltererVisitor(object): def __init__(self, schema, ppi): self.schema = schema self.ppi = ppi @@ -1048,9 +1187,9 @@ self.extneedsel = frozenset(vref.name for sortterm in ppi.rqlst.orderby for vref in sortterm.iget_nodes(VariableRef)) - def _rqlst_accept(self, rqlst, node, newroot, variables, setfunc=None): + def _rqlst_accept(self, rqlst, node, newroot, terms, setfunc=None): try: - newrestr, node_ = node.accept(self, newroot, variables[:]) + newrestr, node_ = node.accept(self, newroot, terms[:]) except UnsupportedBranch: return rqlst if setfunc is not None and newrestr is not None: @@ -1059,18 +1198,18 @@ rqlst = node.parent return 
rqlst - def filter(self, sources, variables, rqlst, solindices, needsel, final): + def filter(self, sources, terms, rqlst, solindices, needsel, final): if server.DEBUG: - print 'filter', final and 'final' or '', sources, variables, rqlst, solindices, needsel + print 'filter', final and 'final' or '', sources, terms, rqlst, solindices, needsel newroot = Select() self.sources = sorted(sources) - self.variables = variables + self.terms = terms self.solindices = solindices self.final = final - # variables which appear in unsupported branches + # terms which appear in unsupported branches needsel |= self.extneedsel self.needsel = needsel - # variables which appear in supported branches + # terms which appear in supported branches self.mayneedsel = set() # new inserted variables self.insertedvars = [] @@ -1079,36 +1218,36 @@ self.use_only_defined = False self.scopes = {rqlst: newroot} if rqlst.where: - rqlst = self._rqlst_accept(rqlst, rqlst.where, newroot, variables, + rqlst = self._rqlst_accept(rqlst, rqlst.where, newroot, terms, newroot.set_where) if isinstance(rqlst, Select): self.use_only_defined = True if rqlst.groupby: groupby = [] for node in rqlst.groupby: - rqlst = self._rqlst_accept(rqlst, node, newroot, variables, + rqlst = self._rqlst_accept(rqlst, node, newroot, terms, groupby.append) if groupby: newroot.set_groupby(groupby) if rqlst.having: having = [] for node in rqlst.having: - rqlst = self._rqlst_accept(rqlst, node, newroot, variables, + rqlst = self._rqlst_accept(rqlst, node, newroot, terms, having.append) if having: newroot.set_having(having) if final and rqlst.orderby and not self.hasaggrstep: orderby = [] for node in rqlst.orderby: - rqlst = self._rqlst_accept(rqlst, node, newroot, variables, + rqlst = self._rqlst_accept(rqlst, node, newroot, terms, orderby.append) if orderby: newroot.set_orderby(orderby) - self.process_selection(newroot, variables, rqlst) + self.process_selection(newroot, terms, rqlst) elif not newroot.where: - # no restrictions 
have been copied, just select variables and add + # no restrictions have been copied, just select terms and add # type restriction (done later by add_types_restriction) - for v in variables: + for v in terms: if not isinstance(v, Variable): continue newroot.append_selected(VariableRef(newroot.get_variable(v.name))) @@ -1156,12 +1295,12 @@ print '--->', newroot return newroot, self.insertedvars - def visit_and(self, node, newroot, variables): + def visit_and(self, node, newroot, terms): subparts = [] for i in xrange(len(node.children)): child = node.children[i] try: - newchild, child_ = child.accept(self, newroot, variables) + newchild, child_ = child.accept(self, newroot, terms) if not child_ is child: node = child_.parent if newchild is None: @@ -1180,10 +1319,10 @@ def _relation_supported(self, relation): rtype = relation.r_type for source in self.sources: - if not source.support_relation(rtype) \ - or (rtype in source.cross_relations and not relation in self.variables):#self.ppi.crossed_relation(source, relation): + if not source.support_relation(rtype) or ( + rtype in source.cross_relations and not relation in self.terms): return False - if not self.final: + if not self.final and not relation in self.terms: rschema = self.schema.rschema(relation.r_type) if not rschema.is_final(): for term in relation.get_nodes((VariableRef, Constant)): @@ -1193,7 +1332,7 @@ return False return True - def visit_relation(self, node, newroot, variables): + def visit_relation(self, node, newroot, terms): if not node.is_types_restriction(): if node in self.skip and self.solindices.issubset(self.skip[node]): if not self.schema.rschema(node.r_type).is_final(): @@ -1214,12 +1353,15 @@ # copy a type restriction while the variable is not actually used) elif not any(self._relation_supported(rel) for rel in node.children[0].variable.stinfo['relations']): - rel, node = self.visit_default(node, newroot, variables) + rel, node = self.visit_default(node, newroot, terms) return rel, node else: 
raise UnsupportedBranch() rschema = self.schema.rschema(node.r_type) - res = self.visit_default(node, newroot, variables)[0] + try: + res = self.visit_default(node, newroot, terms)[0] + except Exception, ex: + raise ored = node.ored() if rschema.is_final() or rschema.inlined: vrefs = node.children[1].get_nodes(VariableRef) @@ -1233,13 +1375,13 @@ vref = vrefs[0] # XXX check operator ? self.hasvar[(node.children[0].name, rschema)] = vref - if self._may_skip_attr_rel(rschema, node, vref, ored, variables, res): + if self._may_skip_attr_rel(rschema, node, vref, ored, terms, res): self.skip.setdefault(node, set()).update(self.solindices) elif not ored: self.skip.setdefault(node, set()).update(self.solindices) return res, node - def _may_skip_attr_rel(self, rschema, rel, vref, ored, variables, res): + def _may_skip_attr_rel(self, rschema, rel, vref, ored, terms, res): var = vref.variable if ored: return False @@ -1247,35 +1389,35 @@ return False if not same_scope(var): return False - if any(v for v,_ in var.stinfo['attrvars'] if not v.name in variables): + if any(v for v,_ in var.stinfo['attrvars'] if not v.name in terms): return False return True - def visit_exists(self, node, newroot, variables): + def visit_exists(self, node, newroot, terms): newexists = node.__class__() self.scopes = {node: newexists} - subparts, node = self._visit_children(node, newroot, variables) + subparts, node = self._visit_children(node, newroot, terms) if not subparts: return None, node newexists.set_where(subparts[0]) return newexists, node - def visit_not(self, node, newroot, variables): - subparts, node = self._visit_children(node, newroot, variables) + def visit_not(self, node, newroot, terms): + subparts, node = self._visit_children(node, newroot, terms) if not subparts: return None, node return copy_node(newroot, node, subparts), node - def visit_group(self, node, newroot, variables): + def visit_group(self, node, newroot, terms): if not self.final: return None, node - return 
self.visit_default(node, newroot, variables) + return self.visit_default(node, newroot, terms) - def visit_variableref(self, node, newroot, variables): + def visit_variableref(self, node, newroot, terms): if self.use_only_defined: if not node.variable.name in newroot.defined_vars: raise UnsupportedBranch(node.name) - elif not node.variable in variables: + elif not node.variable in terms: raise UnsupportedBranch(node.name) self.mayneedsel.add(node.name) # set scope so we can insert types restriction properly @@ -1283,28 +1425,28 @@ newvar.stinfo['scope'] = self.scopes.get(node.variable.scope, newroot) return VariableRef(newvar), node - def visit_constant(self, node, newroot, variables): + def visit_constant(self, node, newroot, terms): return copy_node(newroot, node), node - def visit_default(self, node, newroot, variables): - subparts, node = self._visit_children(node, newroot, variables) + def visit_default(self, node, newroot, terms): + subparts, node = self._visit_children(node, newroot, terms) return copy_node(newroot, node, subparts), node visit_comparison = visit_mathexpression = visit_constant = visit_function = visit_default visit_sort = visit_sortterm = visit_default - def _visit_children(self, node, newroot, variables): + def _visit_children(self, node, newroot, terms): subparts = [] for i in xrange(len(node.children)): child = node.children[i] - newchild, child_ = child.accept(self, newroot, variables) + newchild, child_ = child.accept(self, newroot, terms) if not child is child_: node = child_.parent if newchild is not None: subparts.append(newchild) return subparts, node - def process_selection(self, newroot, variables, rqlst): + def process_selection(self, newroot, terms, rqlst): if self.final: for term in rqlst.selection: newroot.append_selected(term.copy(newroot)) @@ -1317,7 +1459,7 @@ supportedvars = [] for vref in vrefs: var = vref.variable - if var in variables: + if var in terms: supportedvars.append(vref) continue else: @@ -1331,9 +1473,9 @@ if 
not vref in newroot.get_selected_variables(): newroot.append_selected(VariableRef(newroot.get_variable(vref.name))) - def add_necessary_selection(self, newroot, variables): + def add_necessary_selection(self, newroot, terms): selected = tuple(newroot.get_selected_variables()) - for varname in variables: + for varname in terms: var = newroot.defined_vars[varname] for vref in var.references(): rel = vref.relation() diff -r e8b7c7407edf -r 9f2760f15000 server/repository.py --- a/server/repository.py Fri Apr 03 11:04:27 2009 +0200 +++ b/server/repository.py Sat Apr 04 14:46:03 2009 +0200 @@ -901,11 +901,6 @@ source = subjsource return source - @cached - def rel_type_sources(self, rtype): - return [source for source in self.sources - if source.support_relation(rtype) or rtype in source.dont_cross_relations] - def locate_etype_source(self, etype): for source in self.sources: if source.support_entity(etype, 1): @@ -1099,6 +1094,26 @@ pass return nameserver + # multi-sources planner helpers ########################################### + + @cached + def rel_type_sources(self, rtype): + return [source for source in self.sources + if source.support_relation(rtype) + or rtype in source.dont_cross_relations] + + @cached + def can_cross_relation(self, rtype): + return [source for source in self.sources + if source.support_relation(rtype) + and rtype in source.cross_relations] + + @cached + def is_multi_sources_relation(self, rtype): + return any(source for source in self.sources + if not source is self.system_source + and source.support_relation(rtype)) + def pyro_unregister(config): """unregister the repository from the pyro name server""" diff -r e8b7c7407edf -r 9f2760f15000 server/test/unittest_msplanner.py --- a/server/test/unittest_msplanner.py Fri Apr 03 11:04:27 2009 +0200 +++ b/server/test/unittest_msplanner.py Sat Apr 04 14:46:03 2009 +0200 @@ -48,6 +48,12 @@ {'X': 'Societe'}, {'X': 'State'}, {'X': 'SubDivision'}, {'X': 'Tag'}, {'X': 'TrInfo'}, {'X': 'Transition'}]) +def 
clear_ms_caches(repo): + clear_cache(repo, 'rel_type_sources') + clear_cache(repo, 'can_cross_relation') + clear_cache(repo, 'is_multi_sources_relation') + # XXX source_defs + # keep cnx so it's not garbage collected and the associated session is closed repo, cnx = init_test_database('sqlite') @@ -90,6 +96,7 @@ repo.sources_by_uri['cards'] = self.sources[-1] self.rql = self.sources[-1] do_monkey_patch() + clear_ms_caches(repo) def tearDown(self): undo_monkey_patch() @@ -118,19 +125,19 @@ def _test(self, rql, *args): if len(args) == 3: - kwargs, sourcesvars, needsplit = args + kwargs, sourcesterms, needsplit = args else: - sourcesvars, needsplit = args + sourcesterms, needsplit = args kwargs = None plan = self._prepare_plan(rql, kwargs) union = plan.rqlst plan.preprocess(union) ppi = PartPlanInformation(plan, union.children[0]) - for sourcevars in ppi._sourcesvars.itervalues(): + for sourcevars in ppi._sourcesterms.itervalues(): for var in sourcevars.keys(): solindices = sourcevars.pop(var) sourcevars[var._ms_table_key()] = solindices - self.assertEquals(ppi._sourcesvars, sourcesvars) + self.assertEquals(ppi._sourcesterms, sourcesterms) self.assertEquals(ppi.needsplit, needsplit) @@ -163,7 +170,7 @@ """ ueid = self.session.user.eid self._test('Any X WHERE X eid %(x)s', {'x': ueid}, - {}, False) + {self.system: {'x': s[0]}}, False) def test_simple_invariant(self): """retrieve EUser X from system source only (X is invariant and in_group not supported by ldap source) @@ -241,8 +248,11 @@ def test_complex_optional(self): ueid = self.session.user.eid self._test('Any U WHERE WF wf_info_for X, X eid %(x)s, WF owned_by U?, WF from_state FS', {'x': ueid}, - {self.system: {'WF': s[0], 'FS': s[0], 'U': s[0], 'from_state': s[0], 'owned_by': s[0], 'wf_info_for': s[0]}}, False) - + {self.system: {'WF': s[0], 'FS': s[0], 'U': s[0], + 'from_state': s[0], 'owned_by': s[0], 'wf_info_for': s[0], + 'x': s[0]}}, + False) + def test_exists4(self): """ State S could come from both rql 
source and system source, @@ -253,29 +263,99 @@ 'EXISTS(X copain T, T login L, T login in ("comme", "cochon")) OR ' 'EXISTS(X in_state S, S name "pascontent", NOT X copain T2, T2 login "billy")', {self.system: {'X': s[0], 'S': s[0], 'T2': s[0], 'T': s[0], 'G': s[0], 'copain': s[0], 'in_group': s[0]}, - self.ldap: {'X': s[0], 'T2': s[0], 'T': s[0]}}, True) + self.ldap: {'X': s[0], 'T2': s[0], 'T': s[0]}}, + True) def test_relation_need_split(self): self._test('Any X, S WHERE X in_state S', {self.system: {'X': s[0, 1, 2], 'S': s[0, 1, 2]}, - self.rql: {'X': s[2], 'S': s[2]}}, True) + self.rql: {'X': s[2], 'S': s[2]}}, + True) + + def test_not_relation_need_split(self): + self._test('Any SN WHERE NOT X in_state S, S name SN', + {self.rql: {'X': s[2], 'S': s[0, 1, 2]}, + self.system: {'X': s[0, 1, 2], 'S': s[0, 1, 2]}}, + True) + + def test_not_relation_no_split_external(self): + repo._type_source_cache[999999] = ('Note', 'cards', 999999) + # similar to the above test but with an eid coming from the external source. 
+ # the same plan may be used, since we won't find any record in the system source + # linking 9999999 to a state + self._test('Any SN WHERE NOT X in_state S, X eid %(x)s, S name SN', + {'x': 999999}, + {self.rql: {'x': s[0], 'S': s[0]}, + self.system: {'x': s[0], 'S': s[0]}}, + False) def test_relation_restriction_ambigous_need_split(self): self._test('Any X,T WHERE X in_state S, S name "pending", T tags X', {self.system: {'X': s[0, 1, 2], 'S': s[0, 1, 2], 'T': s[0, 1, 2], 'tags': s[0, 1, 2]}, - self.rql: {'X': s[2], 'S': s[2]}}, True) + self.rql: {'X': s[2], 'S': s[2]}}, + True) def test_simplified_var(self): repo._type_source_cache[999999] = ('Note', 'cards', 999999) self._test('Any U WHERE U in_group G, (G name IN ("managers", "logilab") OR (X require_permission P?, P name "bla", P require_group G)), X eid %(x)s, U eid %(u)s', {'x': 999999, 'u': self.session.user.eid}, - {self.system: {'P': s[0], 'G': s[0], 'X': s[0], 'require_permission': s[0], 'in_group': s[0], 'P': s[0], 'require_group': s[0]}}, False) + {self.system: {'P': s[0], 'G': s[0], 'X': s[0], + 'require_permission': s[0], 'in_group': s[0], 'P': s[0], 'require_group': s[0], + 'u': s[0]}}, + False) def test_delete_relation1(self): ueid = self.session.user.eid self._test('Any X, Y WHERE X created_by Y, X eid %(x)s, NOT Y eid %(y)s', {'x': ueid, 'y': ueid}, - {self.system: {'Y': s[0], 'created_by': s[0]}}, False) + {self.system: {'Y': s[0], 'created_by': s[0], 'x': s[0]}}, + False) + + def test_crossed_relation_eid_1_needattr(self): + repo._type_source_cache[999999] = ('Note', 'system', 999999) + ueid = self.session.user.eid + self._test('Any Y,T WHERE X eid %(x)s, X multisource_crossed_rel Y, Y type T', + {'x': 999999,}, + {self.rql: {'Y': s[0]}, self.system: {'Y': s[0], 'x': s[0]}}, + True) + + def test_crossed_relation_eid_1_invariant(self): + repo._type_source_cache[999999] = ('Note', 'system', 999999) + self._test('Any Y WHERE X eid %(x)s, X multisource_crossed_rel Y', + {'x': 999999}, + 
{self.system: {'Y': s[0], 'x': s[0]}}, + False) + + def test_crossed_relation_eid_2_invariant(self): + repo._type_source_cache[999999] = ('Note', 'cards', 999999) + self._test('Any Y WHERE X eid %(x)s, X multisource_crossed_rel Y', + {'x': 999999,}, + {self.rql: {'Y': s[0], 'multisource_crossed_rel': s[0], 'x': s[0]}, + self.system: {'Y': s[0], 'multisource_crossed_rel': s[0], 'x': s[0]}}, + False) + + def test_version_crossed_depends_on_1(self): + repo._type_source_cache[999999] = ('Note', 'cards', 999999) + self._test('Any X,AD,AE WHERE E eid %(x)s, E multisource_crossed_rel X, X in_state AD, AD name AE', + {'x': 999999}, + {self.rql: {'X': s[0], 'AD': s[0], 'multisource_crossed_rel': s[0], 'x': s[0]}, + self.system: {'X': s[0], 'AD': s[0], 'multisource_crossed_rel': s[0], 'x': s[0]}}, + True) + + def test_version_crossed_depends_on_2(self): + repo._type_source_cache[999999] = ('Note', 'system', 999999) + self._test('Any X,AD,AE WHERE E eid %(x)s, E multisource_crossed_rel X, X in_state AD, AD name AE', + {'x': 999999}, + {self.rql: {'X': s[0], 'AD': s[0]}, + self.system: {'X': s[0], 'AD': s[0], 'x': s[0]}}, + True) + + def test_simplified_var_3(self): + repo._type_source_cache[999999] = ('Note', 'cards', 999999) + repo._type_source_cache[999998] = ('State', 'cards', 999998) + self._test('Any S,T WHERE S eid %(s)s, N eid %(n)s, N type T, N is Note, S is State', + {'n': 999999, 's': 999998}, + {self.rql: {'s': s[0], 'N': s[0]}}, False) @@ -1142,50 +1222,40 @@ # in the source where %(x)s is not coming from and will be removed during rql # generation for the external source self._test('Any SN WHERE NOT X in_state S, X eid %(x)s, S name SN', - [('OneFetchStep', [('Any SN WHERE NOT 5 in_state S, S name SN, S is State', [{'S': 'State', 'SN': 'String'}])], + [('OneFetchStep', [('Any SN WHERE NOT 5 in_state S, S name SN, S is State', + [{'S': 'State', 'SN': 'String'}])], None, None, [self.rql, self.system], {}, [])], {'x': ueid}) def 
test_not_relation_no_split_external(self): repo._type_source_cache[999999] = ('Note', 'cards', 999999) - # similar to the above test but with an eid coming from the external source + # similar to the above test but with an eid coming from the external source. + # the same plan may be used, since we won't find any record in the system source + # linking 9999999 to a state self._test('Any SN WHERE NOT X in_state S, X eid %(x)s, S name SN', - [('UnionStep', None, None, - [('OneFetchStep', - [('Any SN WHERE NOT 999999 in_state S, S name SN, S is State', - [{'S': 'State', 'SN': 'String'}])], - None, None, [self.rql], {}, - []), - ('OneFetchStep', - [('Any SN WHERE S name SN, S is State', - [{'S': 'State', 'SN': 'String'}])], - None, None, [self.system], {}, - [])] - )], + [('OneFetchStep', [('Any SN WHERE NOT 999999 in_state S, S name SN, S is State', + [{'S': 'State', 'SN': 'String'}])], + None, None, [self.rql, self.system], {}, [])], {'x': 999999}) def test_not_relation_need_split(self): - ueid = self.session.user.eid self._test('Any SN WHERE NOT X in_state S, S name SN', - [('FetchStep', [('Any SN,S WHERE S name SN, S is State', [{'S': 'State', 'SN': 'String'}])], + [('FetchStep', [('Any SN,S WHERE S name SN, S is State', + [{'S': 'State', 'SN': 'String'}])], [self.rql, self.system], None, {'S': 'table0.C1', 'S.name': 'table0.C0', 'SN': 'table0.C0'}, []), - ('FetchStep', [('Any X WHERE X is Note', [{'X': 'Note'}])], - [self.rql, self.system], None, {'X': 'table1.C0'}, - []), ('IntersectStep', None, None, [('OneFetchStep', + [('Any SN WHERE NOT X in_state S, S name SN, S is State, X is Note', + [{'S': 'State', 'SN': 'String', 'X': 'Note'}])], + None, None, [self.rql, self.system], {}, + []), + ('OneFetchStep', [('Any SN WHERE NOT X in_state S, S name SN, S is State, X is IN(Affaire, EUser)', [{'S': 'State', 'SN': 'String', 'X': 'Affaire'}, {'S': 'State', 'SN': 'String', 'X': 'EUser'}])], None, None, [self.system], {'S': 'table0.C1', 'S.name': 'table0.C0', 'SN': 
'table0.C0'}, - []), - ('OneFetchStep', - [('Any SN WHERE NOT X in_state S, S name SN, S is State, X is Note', - [{'S': 'State', 'SN': 'String', 'X': 'Note'}])], - None, None, [self.system], {'S': 'table0.C1', 'S.name': 'table0.C0', 'SN': 'table0.C0', - 'X': 'table1.C0'}, - [])] + []),] )]) def test_external_attributes_and_relation(self): @@ -1372,7 +1442,6 @@ def test_crossed_relation_eid_1_invariant(self): repo._type_source_cache[999999] = ('Note', 'system', 999999) - ueid = self.session.user.eid self._test('Any Y WHERE X eid %(x)s, X multisource_crossed_rel Y', [('OneFetchStep', [('Any Y WHERE 999999 multisource_crossed_rel Y', [{u'Y': 'Note'}])], None, None, [self.system], {}, []) @@ -1381,7 +1450,6 @@ def test_crossed_relation_eid_1_needattr(self): repo._type_source_cache[999999] = ('Note', 'system', 999999) - ueid = self.session.user.eid self._test('Any Y,T WHERE X eid %(x)s, X multisource_crossed_rel Y, Y type T', [('FetchStep', [('Any Y,T WHERE Y type T, Y is Note', [{'T': 'String', 'Y': 'Note'}])], [self.rql, self.system], None, @@ -1395,7 +1463,6 @@ def test_crossed_relation_eid_2_invariant(self): repo._type_source_cache[999999] = ('Note', 'cards', 999999) - ueid = self.session.user.eid self._test('Any Y WHERE X eid %(x)s, X multisource_crossed_rel Y', [('OneFetchStep', [('Any Y WHERE 999999 multisource_crossed_rel Y, Y is Note', [{'Y': 'Note'}])], None, None, [self.rql, self.system], {}, []) @@ -1404,22 +1471,25 @@ def test_crossed_relation_eid_2_needattr(self): repo._type_source_cache[999999] = ('Note', 'cards', 999999) - ueid = self.session.user.eid - self._test('Any Y,T WHERE X eid %(x)s, X multisource_crossed_rel Y, Y type T', + self._test('Any Y,T WHERE X eid %(x)s, X multisource_crossed_rel Y, Y type T', [('FetchStep', [('Any Y,T WHERE Y type T, Y is Note', [{'T': 'String', 'Y': 'Note'}])], [self.rql, self.system], None, {'T': 'table0.C1', 'Y': 'table0.C0', 'Y.type': 'table0.C1'}, []), - ('OneFetchStep', [('Any Y,T WHERE 999999 
multisource_crossed_rel Y, Y type T, Y is Note', - [{'T': 'String', 'Y': 'Note'}])], - None, None, [self.rql, self.system], - {'T': 'table0.C1', 'Y': 'table0.C0', 'Y.type': 'table0.C1'}, - []) - ], + ('UnionStep', None, None, + [('OneFetchStep', [('Any Y,T WHERE 999999 multisource_crossed_rel Y, Y type T, Y is Note', + [{'T': 'String', 'Y': 'Note'}])], + None, None, [self.rql], None, + []), + ('OneFetchStep', [('Any Y,T WHERE 999999 multisource_crossed_rel Y, Y type T, Y is Note', + [{'T': 'String', 'Y': 'Note'}])], + None, None, [self.system], + {'T': 'table0.C1', 'Y': 'table0.C0', 'Y.type': 'table0.C1'}, + [])] + )], {'x': 999999,}) def test_crossed_relation_eid_not_1(self): repo._type_source_cache[999999] = ('Note', 'system', 999999) - ueid = self.session.user.eid self._test('Any Y WHERE X eid %(x)s, NOT X multisource_crossed_rel Y', [('FetchStep', [('Any Y WHERE Y is Note', [{'Y': 'Note'}])], [self.rql, self.system], None, {'Y': 'table0.C0'}, []), @@ -1431,14 +1501,12 @@ # def test_crossed_relation_eid_not_2(self): # repo._type_source_cache[999999] = ('Note', 'cards', 999999) -# ueid = self.session.user.eid # self._test('Any Y WHERE X eid %(x)s, NOT X multisource_crossed_rel Y', # [], # {'x': 999999,}) - def test_crossed_relation_base(self): + def test_crossed_relation_base_XXXFIXME(self): repo._type_source_cache[999999] = ('Note', 'system', 999999) - ueid = self.session.user.eid self._test('Any X,Y,T WHERE X multisource_crossed_rel Y, Y type T, X type T', [('FetchStep', [('Any X,T WHERE X type T, X is Note', [{'T': 'String', 'X': 'Note'}])], [self.rql, self.system], None, @@ -1446,12 +1514,18 @@ ('FetchStep', [('Any Y,T WHERE Y type T, Y is Note', [{'T': 'String', 'Y': 'Note'}])], [self.rql, self.system], None, {'T': 'table1.C1', 'Y': 'table1.C0', 'Y.type': 'table1.C1'}, []), - ('OneFetchStep', [('Any X,Y,T WHERE X multisource_crossed_rel Y, Y type T, X type T, X is Note, Y is Note', - [{'T': 'String', 'X': 'Note', 'Y': 'Note'}])], - None, None, [self.rql, 
self.system], - {'T': 'table1.C1', 'X': 'table0.C0', 'X.type': 'table0.C1', - 'Y': 'table1.C0', 'Y.type': 'table1.C1'}, - [])], + ('UnionStep', None, None, + [('OneFetchStep', [('Any X,Y,T WHERE X multisource_crossed_rel Y, Y type T, X type T, X is Note, Y is Note', + [{'T': 'String', 'X': 'Note', 'Y': 'Note'}])], + None, None, [self.rql], None, + []), + ('OneFetchStep', [('Any X,Y,T WHERE X multisource_crossed_rel Y, Y type T, X type T, X is Note, Y is Note', + [{'T': 'String', 'X': 'Note', 'Y': 'Note'}])], + None, None, [self.system], + {'T': 'table1.C1', 'X': 'table0.C0', 'X.type': 'table0.C1', + 'Y': 'table1.C0', 'Y.type': 'table1.C1'}, + [])] + )], {'x': 999999,}) # edition queries tests ################################################### @@ -1768,8 +1842,8 @@ [('FetchStep', [('Any X,AA,AB WHERE X login AA, X modification_date AB, X is EUser', [{'AA': 'String', 'AB': 'Datetime', 'X': 'EUser'}])], - [self.ldap], None, {'AA': 'table0.C1', 'AB': 'table0.C2', - 'X': 'table0.C0', 'X.login': 'table0.C1', 'X.modification_date': 'table0.C2'}, + [self.ldap, self.system], None, {'AA': 'table0.C1', 'AB': 'table0.C2', + 'X': 'table0.C0', 'X.login': 'table0.C1', 'X.modification_date': 'table0.C2'}, []), ('OneFetchStep', [('Any X,AA,AB ORDERBY AA WHERE 999999 owned_by X, X login AA, X modification_date AB, X is EUser', @@ -1799,7 +1873,7 @@ self._test('Any X ORDERBY Z DESC WHERE X modification_date Z, E eid %(x)s, E see_also X', [('FetchStep', [('Any X,Z WHERE X modification_date Z, X is Note', [{'X': 'Note', 'Z': 'Datetime'}])], - [self.rql], None, {'X': 'table0.C0', 'X.modification_date': 'table0.C1', 'Z': 'table0.C1'}, + [self.rql, self.system], None, {'X': 'table0.C0', 'X.modification_date': 'table0.C1', 'Z': 'table0.C1'}, []), ('AggrStep', 'Any X ORDERBY Z DESC', None, None, 'table1', None, @@ -1853,7 +1927,7 @@ # IMO this is normal, unless we introduce a special case for the # identity relation. 
BUT I think it's better to leave it as is and to # explain constraint propagation rules, and so why this should be - # wrapped in exists() is used in multi-source + # wrapped in exists() if used in multi-source self.skip('take a look at me if you wish') self._test('Any B,U,UL GROUPBY B,U,UL WHERE B created_by U?, B is File ' 'WITH U,UL BEING (Any U,UL WHERE ME eid %(x)s, (U identity ME ' @@ -1881,12 +1955,11 @@ class MSPlannerTwoSameExternalSourcesTC(BasePlannerTC): """test planner related feature on a 3-sources repository: - * 2 rql source supporting Card + * 2 rql sources supporting Card """ repo = repo def setUp(self): - #_QuerierTC.setUp(self) self.o = repo.querier self.session = repo._sessions.values()[0] self.pool = self.session.set_pool() @@ -1903,7 +1976,11 @@ self.rql2 = self.sources[-1] do_monkey_patch() self.planner = MSPlanner(self.o.schema, self.o._rqlhelper) - + assert repo.sources_by_uri['cards2'].support_relation('multisource_crossed_rel') + assert 'multisource_crossed_rel' in repo.sources_by_uri['cards2'].cross_relations + assert repo.sources_by_uri['cards'].support_relation('multisource_crossed_rel') + assert 'multisource_crossed_rel' in repo.sources_by_uri['cards'].cross_relations + clear_ms_caches(repo) _test = test_plan def tearDown(self): @@ -1928,7 +2005,92 @@ {'X': 'table0.C0', 'X.title': 'table0.C1', 'XT': 'table0.C1'}, [])], {'t': 999999}) - + + def test_version_depends_on(self): + self.repo._type_source_cache[999999] = ('Note', 'cards', 999999) + self._test('Any X,AD,AE WHERE E eid %(x)s, E migrated_from X, X in_state AD, AD name AE', + [('FetchStep', [('Any X,AD,AE WHERE X in_state AD, AD name AE, AD is State, X is Note', + [{'AD': 'State', 'AE': 'String', 'X': 'Note'}])], + [self.rql, self.rql2, self.system], + None, {'AD': 'table0.C1', 'AD.name': 'table0.C2', + 'AE': 'table0.C2', 'X': 'table0.C0'}, + []), + ('OneFetchStep', [('Any X,AD,AE WHERE 999999 migrated_from X, AD name AE, AD is State, X is Note', + [{'AD': 'State', 'AE': 
'String', 'X': 'Note'}])], + None, None, [self.system], + {'AD': 'table0.C1', 'AD.name': 'table0.C2', 'AE': 'table0.C2', 'X': 'table0.C0'}, + [])], + {'x': 999999}) + + def test_version_crossed_depends_on_1(self): + self.repo._type_source_cache[999999] = ('Note', 'cards', 999999) + self._test('Any X,AD,AE WHERE E eid %(x)s, E multisource_crossed_rel X, X in_state AD, AD name AE', + [('FetchStep', [('Any X,AD,AE WHERE X in_state AD, AD name AE, AD is State, X is Note', + [{'AD': 'State', 'AE': 'String', 'X': 'Note'}])], + [self.rql, self.rql2, self.system], + None, {'AD': 'table0.C1', 'AD.name': 'table0.C2', + 'AE': 'table0.C2', 'X': 'table0.C0'}, + []), + ('UnionStep', None, None, + [('OneFetchStep', [('Any X,AD,AE WHERE 999999 multisource_crossed_rel X, AD name AE, AD is State, X is Note', + [{'AD': 'State', 'AE': 'String', 'X': 'Note'}])], + None, None, [self.rql], None, + []), + ('OneFetchStep', [('Any X,AD,AE WHERE 999999 multisource_crossed_rel X, AD name AE, AD is State, X is Note', + [{'AD': 'State', 'AE': 'String', 'X': 'Note'}])], + None, None, [self.system], + {'AD': 'table0.C1', 'AD.name': 'table0.C2', + 'AE': 'table0.C2', 'X': 'table0.C0'}, + [])] + )], + {'x': 999999}) + + def test_version_crossed_depends_on_2(self): + self.repo._type_source_cache[999999] = ('Note', 'system', 999999) + self._test('Any X,AD,AE WHERE E eid %(x)s, E multisource_crossed_rel X, X in_state AD, AD name AE', + [('FetchStep', [('Any X,AD,AE WHERE X in_state AD, AD name AE, AD is State, X is Note', + [{'AD': 'State', 'AE': 'String', 'X': 'Note'}])], + [self.rql, self.rql2, self.system], + None, {'AD': 'table0.C1', 'AD.name': 'table0.C2', + 'AE': 'table0.C2', 'X': 'table0.C0'}, + []), + ('OneFetchStep', [('Any X,AD,AE WHERE 999999 multisource_crossed_rel X, AD name AE, AD is State, X is Note', + [{'AD': 'State', 'AE': 'String', 'X': 'Note'}])], + None, None, [self.system], + {'AD': 'table0.C1', 'AD.name': 'table0.C2', 'AE': 'table0.C2', 'X': 'table0.C0'}, + [])], + {'x': 999999}) 
+ + def test_version_crossed_depends_on_3(self): + self._test('Any X,AD,AE WHERE E multisource_crossed_rel X, X in_state AD, AD name AE, E is Note', + [('FetchStep', [('Any X,AD,AE WHERE X in_state AD, AD name AE, AD is State, X is Note', + [{'AD': 'State', 'AE': 'String', 'X': 'Note'}])], + [self.rql, self.rql2, self.system], + None, {'AD': 'table0.C1', 'AD.name': 'table0.C2', + 'AE': 'table0.C2', 'X': 'table0.C0'}, + []), + ('FetchStep', [('Any E WHERE E is Note', [{'E': 'Note'}])], + [self.rql, self.rql2, self.system], + None, {'E': 'table1.C0'}, + []), + ('UnionStep', None, None, + [('OneFetchStep', [('Any X,AD,AE WHERE E multisource_crossed_rel X, AD name AE, AD is State, E is Note, X is Note', + [{'AD': 'State', 'AE': 'String', 'E': 'Note', 'X': 'Note'}])], + None, None, [self.rql, self.rql2], None, + []), + ('OneFetchStep', [('Any X,AD,AE WHERE E multisource_crossed_rel X, AD name AE, AD is State, E is Note, X is Note', + [{'AD': 'State', 'AE': 'String', 'E': 'Note', 'X': 'Note'}])], + None, None, [self.system], + {'AD': 'table0.C1', + 'AD.name': 'table0.C2', + 'AE': 'table0.C2', + 'E': 'table1.C0', + 'X': 'table0.C0'}, + [])] + )] + ) + + if __name__ == '__main__': from logilab.common.testlib import unittest_main unittest_main()