# HG changeset patch # User sylvain.thenault@logilab.fr # Date 1238849044 -7200 # Node ID 1457a545af0358d8a3250e5b43eba883e17c021f # Parent 232e16835fff2539b7fc4ab9fd4719862f01aa16 more fix of cross relation handling diff -r 232e16835fff -r 1457a545af03 server/msplanner.py --- a/server/msplanner.py Sat Apr 04 13:21:56 2009 +0200 +++ b/server/msplanner.py Sat Apr 04 14:44:04 2009 +0200 @@ -223,11 +223,13 @@ self.needsplit = False self.temptable = None self.finaltable = None + # shortcuts self._schema = plan.schema self._session = plan.session self._repo = self._session.repo self._solutions = rqlst.solutions self._solindices = range(len(self._solutions)) + self.system_source = self._repo.system_source # source : {term: [solution index, ]} self.sourcesterms = self._sourcesterms = {} # source : {relation: set(child variable and constant)} @@ -270,12 +272,12 @@ def part_sources(self): if self._sourcesterms: return tuple(sorted(self._sourcesterms)) - return (self._repo.system_source,) + return (self.system_source,) @property @cached def _sys_source_set(self): - return frozenset((self._repo.system_source, solindex) + return frozenset((self.system_source, solindex) for solindex in self._solindices) @cached @@ -309,7 +311,7 @@ source = self._session.source_from_eid(eid) if vrels and not any(source.support_relation(r.r_type) for r in vrels): - self._set_source_for_term(repo.system_source, varobj) + self._set_source_for_term(self.system_source, varobj) else: self._set_source_for_term(source, varobj) continue @@ -317,7 +319,7 @@ if not rels and not varobj.stinfo['typerels']: # (rare) case where the variable has no type specified nor # relation accessed ex. "Any MAX(X)" - self._set_source_for_term(repo.system_source, varobj) + self._set_source_for_term(self.system_source, varobj) continue for i, sol in enumerate(self._solutions): vartype = sol[varname] @@ -344,7 +346,7 @@ for vconsts in self.rqlst.stinfo['rewritten'].itervalues(): const = vconsts[0] source = self._session.source_from_eid(const.eval(self.plan.args)) - if source is repo.system_source: + if source is self.system_source: for const in vconsts: self._set_source_for_term(source, const) elif source in self._sourcesterms: @@ -357,8 +359,8 @@ # doesn't actually comes from it so we get a changes # that allequals will return True as expected when # computing needsplit - if repo.system_source in sourcesterms: - self._set_source_for_term(repo.system_source, const) + if self.system_source in sourcesterms: + self._set_source_for_term(self.system_source, const) # add source for relations rschema = self._schema.rschema termssources = {} @@ -400,7 +402,7 @@ def _handle_cross_relation(self, rel, relsources, termssources): for source in relsources: if rel.r_type in source.cross_relations: - ssource = self._repo.system_source + ssource = self.system_source crossvars = set(x.variable for x in rel.get_nodes(VariableRef)) for const in rel.get_nodes(Constant): if source.uri != 'system' and not const in self._sourcesterms.get(source, ()): @@ -520,9 +522,9 @@ # if the term is a not invariant variable, we should filter out # source where the relation is a cross relation from invalid # sources - invalid_sources = frozenset([(s, solidx) for s, solidx in invalid_sources - if not (s in self._crossrelations and - rel in self._crossrelations[s])]) + invalid_sources = frozenset((s, solidx) for s, solidx in invalid_sources + if not (s in self._crossrelations and + rel in self._crossrelations[s])) if invalid_sources: self._remove_sources(term, invalid_sources) termsources -= invalid_sources @@ -620,7 +622,10 @@ select = self.rqlst rschema = self._schema.rschema for source in self.part_sources: - sourceterms = self._sourcesterms[source] + try: + sourceterms = self._sourcesterms[source] + except KeyError: + continue # already proceed while sourceterms: # take a term randomly, and all terms supporting the # same solutions @@ -646,25 +651,66 @@ # go to the next iteration directly! continue if not sourceterms: - del self._sourcesterms[source] - # suppose this is a final step until the contrary is proven - final = scope is select + try: + del self._sourcesterms[source] + except KeyError: + # XXX already cleaned + pass # set of terms which should be additionaly selected when # possible needsel = set() - # add attribute variables and mark variables which should be - # additionaly selected when possible - for var in select.defined_vars.itervalues(): - if not var in terms: - stinfo = var.stinfo - for ovar, rtype in stinfo['attrvars']: - if ovar in terms: + if not self._sourcesterms: + terms += scope.defined_vars.values() + scope.aliases.values() + final = True + else: + # suppose this is a final step until the contrary is proven + final = scope is select + # add attribute variables and mark variables which should be + # additionaly selected when possible + for var in select.defined_vars.itervalues(): + if not var in terms: + stinfo = var.stinfo + for ovar, rtype in stinfo['attrvars']: + if ovar in terms: + needsel.add(var.name) + terms.append(var) + break + else: needsel.add(var.name) - terms.append(var) + final = False + # check where all relations are supported by the sources + for rel in scope.iget_nodes(Relation): + if rel.is_types_restriction(): + continue + # take care not overwriting the existing "source" identifier + for _source in sources: + if not _source.support_relation(rel.r_type) or ( + self.crossed_relation(_source, rel) and not rel in terms): + for vref in rel.iget_nodes(VariableRef): + needsel.add(vref.name) + final = False break else: - needsel.add(var.name) - final = False + if not scope is select: + self._exists_relation(rel, terms, needsel) + # if relation is supported by all sources and some of + # its lhs/rhs variable isn't in "terms", and the + # other end *is* in "terms", mark it have to be + # selected + if source.uri != 'system' and not rschema(rel.r_type).is_final(): + lhs, rhs = rel.get_variable_parts() + try: + lhsvar = lhs.variable + except AttributeError: + lhsvar = lhs + try: + rhsvar = rhs.variable + except AttributeError: + rhsvar = rhs + if lhsvar in terms and not rhsvar in terms: + needsel.add(lhsvar.name) + elif rhsvar in terms and not lhsvar in terms: + needsel.add(rhsvar.name) if final and source.uri != 'system': # check rewritten constants for vconsts in select.stinfo['rewritten'].itervalues(): @@ -683,41 +729,6 @@ final = False break break - # check where all relations are supported by the sources - for rel in scope.iget_nodes(Relation): - if rel.is_types_restriction(): - continue - # take care not overwriting the existing "source" identifier - for _source in sources: - if not _source.support_relation(rel.r_type): - for vref in rel.iget_nodes(VariableRef): - needsel.add(vref.name) - final = False - break - elif self.crossed_relation(_source, rel) and not rel in terms: - final = False - break - else: - if not scope is select: - self._exists_relation(rel, terms, needsel) - # if relation is supported by all sources and some of - # its lhs/rhs variable isn't in "terms", and the - # other end *is* in "terms", mark it have to be - # selected - if source.uri != 'system' and not rschema(rel.r_type).is_final(): - lhs, rhs = rel.get_variable_parts() - try: - lhsvar = lhs.variable - except AttributeError: - lhsvar = lhs - try: - rhsvar = rhs.variable - except AttributeError: - rhsvar = rhs - if lhsvar in terms and not rhsvar in terms: - needsel.add(lhsvar.name) - elif rhsvar in terms and not lhsvar in terms: - needsel.add(rhsvar.name) if final: self._cleanup_sourcesterms(sources, solindices) steps.append((sources, terms, solindices, scope, needsel, final) @@ -759,75 +770,93 @@ secondchoice = None if len(self._sourcesterms) > 1: # priority to variable from subscopes - for var in sourceterms: - if not var.scope is self.rqlst: - if isinstance(var, Variable): - return var, sourceterms.pop(var) - secondchoice = var + for term in sourceterms: + if not term.scope is self.rqlst: + if isinstance(term, Variable): + return term, sourceterms.pop(term) + secondchoice = term else: - # priority to variable outer scope - for var in sourceterms: - if var.scope is self.rqlst: - if isinstance(var, Variable): - return var, sourceterms.pop(var) - secondchoice = var + # priority to variable from outer scope + for term in sourceterms: + if term.scope is self.rqlst: + if isinstance(term, Variable): + return term, sourceterms.pop(term) + secondchoice = term if secondchoice is not None: return secondchoice, sourceterms.pop(secondchoice) - # priority to variable - for var in sourceterms: - if isinstance(var, Variable): - return var, sourceterms.pop(var) - # whatever - var = iter(sourceterms).next() - return var, sourceterms.pop(var) + # priority to variable with the less solutions supported and with the + # most valuable refs + variables = sorted([(var, sols) for (var, sols) in sourceterms.items() + if isinstance(var, Variable)], + key=lambda (v, s): (len(s), -v.valuable_references())) + if variables: + var = variables[0][0] + return var, sourceterms.pop(var) + # priority to constant + for term in sourceterms: + if isinstance(term, Constant): + return term, sourceterms.pop(term) + # whatever (relation) + term = iter(sourceterms).next() + return term, sourceterms.pop(term) def _expand_sources(self, selected_source, term, solindices): """return all sources supporting given term / solindices""" sources = [selected_source] sourcesterms = self._sourcesterms - for source in sourcesterms: + for source in sourcesterms.keys(): if source is selected_source: continue if not (term in sourcesterms[source] and solindices.issubset(sourcesterms[source][term])): continue sources.append(source) - if source.uri != 'system': + if source.uri != 'system' or not (isinstance(term, Variable) and not term in self._linkedterms): termsolindices = sourcesterms[source][term] termsolindices -= solindices if not termsolindices: - del sourcesterms[source][term] + del sourcesterms[source][term] + if not sourcesterms[source]: + del sourcesterms[source] return sources def _expand_terms(self, term, sources, sourceterms, scope, solindices): terms = [term] sources = sorted(sources) + sourcesterms = self._sourcesterms nbunlinked = 1 linkedterms = self._linkedterms # term has to belong to the same scope if there is more # than the system source remaining - if len(self._sourcesterms) > 1 and not scope is self.rqlst: + if len(sourcesterms) > 1 and not scope is self.rqlst: candidates = (t for t in sourceterms.keys() if scope is t.scope) else: candidates = sourceterms #.iterkeys() # we only want one unlinked term in each generated query candidates = [t for t in candidates - if isinstance(t, Constant) or + if isinstance(t, (Constant, Relation)) or (solindices.issubset(sourceterms[t]) and t in linkedterms)] - accept_term = lambda x: (not any(s for s in sources if not x in self._sourcesterms[s]) - and any(t for t in terms if t in linkedterms.get(x, ()))) - source_cross_rels = {} + cross_rels = {} for source in sources: - source_cross_rels.update(self._crossrelations.get(source, {})) - if isinstance(term, Relation) and term in source_cross_rels: - cross_terms = source_cross_rels.pop(term) + cross_rels.update(self._crossrelations.get(source, {})) + exclude = {} + for rel, crossvars in cross_rels.iteritems(): + vars = [t for t in crossvars if isinstance(t, Variable)] + try: + exclude[vars[0]] = vars[1] + exclude[vars[1]] = vars[0] + except IndexError: + pass + accept_term = lambda x: (not any(s for s in sources if not x in sourcesterms.get(s, ())) + and any(t for t in terms if t in linkedterms.get(x, ())) + and not exclude.get(x) in terms) + if isinstance(term, Relation) and term in cross_rels: + cross_terms = cross_rels.pop(term) base_accept_term = accept_term accept_term = lambda x: (base_accept_term(x) or x in cross_terms) for refed in cross_terms: if not refed in candidates: - candidates.append(refed) - else: - cross_terms = () + terms.append(refed) # repeat until no term can't be added, since addition of a new # term may permit to another one to be added modified = True @@ -846,32 +875,34 @@ terms.append(term) candidates.remove(term) modified = True - for source in sources: - sourceterms = self._sourcesterms[source] - # terms should be deleted once all possible solutions - # indices have been consumed - try: - sourceterms[term] -= solindices - if not sourceterms[term]: - del sourceterms[term] - except KeyError: - assert term in cross_terms + self._cleanup_sourcesterms(sources, solindices, term) return terms - def _cleanup_sourcesterms(self, sources, solindices): - """on final parts, remove solutions so we know they are already processed""" + def _cleanup_sourcesterms(self, sources, solindices, term=None): + """remove solutions so we know they are already processed""" for source in sources: try: sourceterms = self._sourcesterms[source] except KeyError: continue - for term, termsolindices in sourceterms.items(): - if isinstance(term, Relation) and self.crossed_relation(source, term): - continue - termsolindices -= solindices - if not termsolindices: - del sourceterms[term] - + if term is None: + for term, termsolindices in sourceterms.items(): + if isinstance(term, Relation) and self.crossed_relation(source, term): + continue + termsolindices -= solindices + if not termsolindices: + del sourceterms[term] + else: + try: + sourceterms[term] -= solindices + if not sourceterms[term]: + del sourceterms[term] + except KeyError: + pass + #assert term in cross_terms + if not sourceterms: + del self._sourcesterms[source] + def merge_input_maps(self, allsolindices): """inputmaps is a dictionary with tuple of solution indices as key with an associated input map as value. This function compute for each @@ -1016,7 +1047,7 @@ byinputmap = {} for ppi in cppis: select = ppi.rqlst - if sources != (plan.session.repo.system_source,): + if sources != (ppi.system_source,): add_types_restriction(self.schema, select) # part plan info for subqueries inputmap = self._ppi_subqueries(ppi) @@ -1104,8 +1135,15 @@ inputmap = subinputmap else: inputmap.update(subinputmap) - steps.append(ppi.build_final_part(minrqlst, solindices, inputmap, - sources, insertedvars)) + if inputmap and len(sources) > 1: + sources.remove(ppi.system_source) + steps.append(ppi.build_final_part(minrqlst, solindices, None, + sources, insertedvars)) + steps.append(ppi.build_final_part(minrqlst, solindices, inputmap, + [ppi.system_source], insertedvars)) + else: + steps.append(ppi.build_final_part(minrqlst, solindices, inputmap, + sources, insertedvars)) else: table = '_T%s%s' % (''.join(sorted(v._ms_table_key() for v in terms)), ''.join(sorted(str(i) for i in solindices))) diff -r 232e16835fff -r 1457a545af03 server/test/unittest_msplanner.py --- a/server/test/unittest_msplanner.py Sat Apr 04 13:21:56 2009 +0200 +++ b/server/test/unittest_msplanner.py Sat Apr 04 14:44:04 2009 +0200 @@ -351,7 +351,6 @@ True) def test_simplified_var_3(self): - self.set_debug(True) repo._type_source_cache[999999] = ('Note', 'cards', 999999) repo._type_source_cache[999998] = ('State', 'cards', 999998) self._test('Any S,T WHERE S eid %(s)s, N eid %(n)s, N type T, N is Note, S is State', @@ -1240,7 +1239,6 @@ {'x': 999999}) def test_not_relation_need_split(self): - ueid = self.session.user.eid self._test('Any SN WHERE NOT X in_state S, S name SN', [('FetchStep', [('Any SN,S WHERE S name SN, S is State', [{'S': 'State', 'SN': 'String'}])], @@ -1471,18 +1469,23 @@ ], {'x': 999999,}) - def test_crossed_relation_eid_2_needattr_XXXFIXME(self): + def test_crossed_relation_eid_2_needattr(self): repo._type_source_cache[999999] = ('Note', 'cards', 999999) - self._test('Any Y,T WHERE X eid %(x)s, X multisource_crossed_rel Y, Y type T', + self._test('Any Y,T WHERE X eid %(x)s, X multisource_crossed_rel Y, Y type T', [('FetchStep', [('Any Y,T WHERE Y type T, Y is Note', [{'T': 'String', 'Y': 'Note'}])], [self.rql, self.system], None, {'T': 'table0.C1', 'Y': 'table0.C0', 'Y.type': 'table0.C1'}, []), - ('OneFetchStep', [('Any Y,T WHERE 999999 multisource_crossed_rel Y, Y type T, Y is Note', - [{'T': 'String', 'Y': 'Note'}])], - None, None, [self.rql, self.system], - {'T': 'table0.C1', 'Y': 'table0.C0', 'Y.type': 'table0.C1'}, - []) - ], + ('UnionStep', None, None, + [('OneFetchStep', [('Any Y,T WHERE 999999 multisource_crossed_rel Y, Y type T, Y is Note', + [{'T': 'String', 'Y': 'Note'}])], + None, None, [self.rql], None, + []), + ('OneFetchStep', [('Any Y,T WHERE 999999 multisource_crossed_rel Y, Y type T, Y is Note', + [{'T': 'String', 'Y': 'Note'}])], + None, None, [self.system], + {'T': 'table0.C1', 'Y': 'table0.C0', 'Y.type': 'table0.C1'}, + [])] + )], {'x': 999999,}) def test_crossed_relation_eid_not_1(self): @@ -1511,12 +1514,18 @@ ('FetchStep', [('Any Y,T WHERE Y type T, Y is Note', [{'T': 'String', 'Y': 'Note'}])], [self.rql, self.system], None, {'T': 'table1.C1', 'Y': 'table1.C0', 'Y.type': 'table1.C1'}, []), - ('OneFetchStep', [('Any X,Y,T WHERE X multisource_crossed_rel Y, Y type T, X type T, X is Note, Y is Note', - [{'T': 'String', 'X': 'Note', 'Y': 'Note'}])], - None, None, [self.rql, self.system], - {'T': 'table1.C1', 'X': 'table0.C0', 'X.type': 'table0.C1', - 'Y': 'table1.C0', 'Y.type': 'table1.C1'}, - [])], + ('UnionStep', None, None, + [('OneFetchStep', [('Any X,Y,T WHERE X multisource_crossed_rel Y, Y type T, X type T, X is Note, Y is Note', + [{'T': 'String', 'X': 'Note', 'Y': 'Note'}])], + None, None, [self.rql], None, + []), + ('OneFetchStep', [('Any X,Y,T WHERE X multisource_crossed_rel Y, Y type T, X type T, X is Note, Y is Note', + [{'T': 'String', 'X': 'Note', 'Y': 'Note'}])], + None, None, [self.system], + {'T': 'table1.C1', 'X': 'table0.C0', 'X.type': 'table0.C1', + 'Y': 'table1.C0', 'Y.type': 'table1.C1'}, + [])] + )], {'x': 999999,}) # edition queries tests ################################################### @@ -1918,7 +1927,7 @@ # IMO this is normal, unless we introduce a special case for the # identity relation. BUT I think it's better to leave it as is and to # explain constraint propagation rules, and so why this should be - # wrapped in exists() is used in multi-source + # wrapped in exists() if used in multi-source self.skip('take a look at me if you wish') self._test('Any B,U,UL GROUPBY B,U,UL WHERE B created_by U?, B is File ' 'WITH U,UL BEING (Any U,UL WHERE ME eid %(x)s, (U identity ME ' @@ -2016,26 +2025,27 @@ def test_version_crossed_depends_on_1(self): self.repo._type_source_cache[999999] = ('Note', 'cards', 999999) self._test('Any X,AD,AE WHERE E eid %(x)s, E multisource_crossed_rel X, X in_state AD, AD name AE', - [('FetchStep', [('Any X,AD,AE WHERE X in_state AD, AD name AE, AD is State, X is Note', - [{'AD': 'State', 'AE': 'String', 'X': 'Note'}])], - [self.rql, self.rql2, self.system], - None, {'AD': 'table0.C1', 'AD.name': 'table0.C2', - 'AE': 'table0.C2', 'X': 'table0.C0'}, - []), - ('FetchStep', [('Any X WHERE 999999 multisource_crossed_rel X, X is Note', - [{'X': 'Note'}])], - [self.rql, self.system], None, {'X': 'table1.C0'}, - []), - ('OneFetchStep', [('Any X,AD,AE WHERE AD name AE, AD is State, X is Note, X identity A', - [{'A': 'Note', 'AD': 'State', 'AE': 'String', 'X': 'Note'}])], - None, None, [self.system], - {'A': 'table0.C0', 'AD': 'table0.C1', 'AD.name': 'table0.C2', - 'AE': 'table0.C2', 'X': 'table1.C0'}, - [])], - {'x': 999999}) + [('FetchStep', [('Any X,AD,AE WHERE X in_state AD, AD name AE, AD is State, X is Note', + [{'AD': 'State', 'AE': 'String', 'X': 'Note'}])], + [self.rql, self.rql2, self.system], + None, {'AD': 'table0.C1', 'AD.name': 'table0.C2', + 'AE': 'table0.C2', 'X': 'table0.C0'}, + []), + ('UnionStep', None, None, + [('OneFetchStep', [('Any X,AD,AE WHERE 999999 multisource_crossed_rel X, AD name AE, AD is State, X is Note', + [{'AD': 'State', 'AE': 'String', 'X': 'Note'}])], + None, None, [self.rql], None, + []), + ('OneFetchStep', [('Any X,AD,AE WHERE 999999 multisource_crossed_rel X, AD name AE, AD is State, X is Note', + [{'AD': 'State', 'AE': 'String', 'X': 'Note'}])], + None, None, [self.system], + {'AD': 'table0.C1', 'AD.name': 'table0.C2', + 'AE': 'table0.C2', 'X': 'table0.C0'}, + [])] + )], + {'x': 999999}) def test_version_crossed_depends_on_2(self): - self.set_debug(True) self.repo._type_source_cache[999999] = ('Note', 'system', 999999) self._test('Any X,AD,AE WHERE E eid %(x)s, E multisource_crossed_rel X, X in_state AD, AD name AE', [('FetchStep', [('Any X,AD,AE WHERE X in_state AD, AD name AE, AD is State, X is Note', @@ -2051,9 +2061,34 @@ [])], {'x': 999999}) - def test_version_crossed_depends_on_3_XXXFIXME(self): + def test_version_crossed_depends_on_3(self): self._test('Any X,AD,AE WHERE E multisource_crossed_rel X, X in_state AD, AD name AE, E is Note', - []) + [('FetchStep', [('Any X,AD,AE WHERE X in_state AD, AD name AE, AD is State, X is Note', + [{'AD': 'State', 'AE': 'String', 'X': 'Note'}])], + [self.rql, self.rql2, self.system], + None, {'AD': 'table0.C1', 'AD.name': 'table0.C2', + 'AE': 'table0.C2', 'X': 'table0.C0'}, + []), + ('FetchStep', [('Any E WHERE E is Note', [{'E': 'Note'}])], + [self.rql, self.rql2, self.system], + None, {'E': 'table1.C0'}, + []), + ('UnionStep', None, None, + [('OneFetchStep', [('Any X,AD,AE WHERE E multisource_crossed_rel X, AD name AE, AD is State, E is Note, X is Note', + [{'AD': 'State', 'AE': 'String', 'E': 'Note', 'X': 'Note'}])], + None, None, [self.rql, self.rql2], None, + []), + ('OneFetchStep', [('Any X,AD,AE WHERE E multisource_crossed_rel X, AD name AE, AD is State, E is Note, X is Note', + [{'AD': 'State', 'AE': 'String', 'E': 'Note', 'X': 'Note'}])], + None, None, [self.system], + {'AD': 'table0.C1', + 'AD.name': 'table0.C2', + 'AE': 'table0.C2', + 'E': 'table1.C0', + 'X': 'table0.C0'}, + [])] + )] + ) if __name__ == '__main__':