server/msplanner.py
branch stable
changeset 6794 140d42b41b31
parent 6759 5d016d5bacca
child 6943 406a41c25e13
equal deleted inserted replaced
6793:308cf1eaf576 6794:140d42b41b31
   822             except KeyError:
   822             except KeyError:
   823                 continue # already proceed
   823                 continue # already proceed
   824             while sourceterms:
   824             while sourceterms:
   825                 # take a term randomly, and all terms supporting the
   825                 # take a term randomly, and all terms supporting the
   826                 # same solutions
   826                 # same solutions
   827                 term, solindices = self._choose_term(sourceterms)
   827                 term, solindices = self._choose_term(source, sourceterms)
   828                 if source.uri == 'system':
   828                 if source.uri == 'system':
   829                     # ensure all variables are available for the latest step
   829                     # ensure all variables are available for the latest step
   830                     # (missing one will be available from temporary tables
   830                     # (missing one will be available from temporary tables
   831                     # of previous steps)
   831                     # of previous steps)
   832                     scope = select
   832                     scope = select
   852                             # XXX already cleaned
   852                             # XXX already cleaned
   853                             pass
   853                             pass
   854                 # set of terms which should be additionally selected when
   854                 # set of terms which should be additionally selected when
   855                 # possible
   855                 # possible
   856                 needsel = set()
   856                 needsel = set()
   857                 if not self._sourcesterms:
   857                 if not self._sourcesterms and scope is select:
   858                     terms += scope.defined_vars.values() + scope.aliases.values()
   858                     terms += scope.defined_vars.values() + scope.aliases.values()
   859                     if isinstance(term, Relation) and len(sources) > 1:
   859                     if isinstance(term, Relation) and len(sources) > 1:
   860                         variants = set()
   860                         variants = set()
   861                         partterms = [term]
   861                         partterms = [term]
   862                         for vref in term.get_nodes(VariableRef):
   862                         for vref in term.get_nodes(VariableRef):
   865                         if len(variants) == 2:
   865                         if len(variants) == 2:
   866                             # we need an extra-step to fetch relations from each source
   866                             # we need an extra-step to fetch relations from each source
   867                             # before a join with prefetched inputs
   867                             # before a join with prefetched inputs
   868                             # (see test_crossed_relation_noeid_needattr in
   868                             # (see test_crossed_relation_noeid_needattr in
   869                             #  unittest_msplanner / unittest_multisources)
   869                             #  unittest_msplanner / unittest_multisources)
   870                             needsel2 = needsel.copy()
       
   871                             needsel2.update(variants)
       
   872                             lhs, rhs = term.get_variable_parts()
   870                             lhs, rhs = term.get_variable_parts()
   873                             steps.append( (sources, [term, getattr(lhs, 'variable', lhs),
   871                             steps.append( (sources, [term, getattr(lhs, 'variable', lhs),
   874                                                      getattr(rhs, 'variable', rhs)],
   872                                                      getattr(rhs, 'variable', rhs)],
   875                                            solindices, scope,
   873                                            solindices, scope, variants, False) )
   876                                            needsel2, False) )
       
   877                             sources = [self.system_source]
   874                             sources = [self.system_source]
   878                     final = True
   875                     final = True
   879                 else:
   876                 else:
   880                     # suppose this is a final step until the contrary is proven
   877                     # suppose this is a final step until the contrary is proven
   881                     final = scope is select
   878                     final = scope is select
   904                                     needsel.add(vref.name)
   901                                     needsel.add(vref.name)
   905                                 final = False
   902                                 final = False
   906                                 break
   903                                 break
   907                         else:
   904                         else:
   908                             if not scope is select:
   905                             if not scope is select:
   909                                 self._exists_relation(rel, terms, needsel)
   906                                 self._exists_relation(rel, terms, needsel, source)
   910                             # if relation is supported by all sources and some of
   907                             # if relation is supported by all sources and some of
   911                             # its lhs/rhs variable isn't in "terms", and the
   908                             # its lhs/rhs variable isn't in "terms", and the
   912                             # other end *is* in "terms", mark it as having to be
   909                             # other end *is* in "terms", mark it as having to be
   913                             # selected
   910                             # selected
   914                             if source.uri != 'system' and not rschema(rel.r_type).final:
   911                             if source.uri != 'system' and not rschema(rel.r_type).final:
   948                             break
   945                             break
   949                 if final:
   946                 if final:
   950                     self._cleanup_sourcesterms(sources, solindices)
   947                     self._cleanup_sourcesterms(sources, solindices)
   951                 steps.append((sources, terms, solindices, scope, needsel, final)
   948                 steps.append((sources, terms, solindices, scope, needsel, final)
   952                              )
   949                              )
       
   950         if not steps[-1][-1]:
       
   951             # add a final step
       
   952             terms = select.defined_vars.values() + select.aliases.values()
       
   953             steps.append( ([self.system_source], terms, set(self._solindices),
       
   954                            select, set(), True) )
   953         return steps
   955         return steps
   954 
   956 
   955     def _exists_relation(self, rel, terms, needsel):
   957     def _exists_relation(self, rel, terms, needsel, source):
   956         rschema = self._schema.rschema(rel.r_type)
   958         rschema = self._schema.rschema(rel.r_type)
   957         lhs, rhs = rel.get_variable_parts()
   959         lhs, rhs = rel.get_variable_parts()
   958         try:
   960         try:
   959             lhsvar, rhsvar = lhs.variable, rhs.variable
   961             lhsvar, rhsvar = lhs.variable, rhs.variable
   960         except AttributeError:
   962         except AttributeError:
   963             # supported relation with at least one end supported, check the
   965             # supported relation with at least one end supported, check the
   964             # other end is in as well. If not this usually means the
   966             # other end is in as well. If not this usually means the
   965             # variable is referenced by an outer scope and should be substituted
   967             # variable is referenced by an outer scope and should be substituted
   966             # using an 'identity' relation (else we'll get a conflict of
   968             # using an 'identity' relation (else we'll get a conflict of
   967             # temporary tables)
   969             # temporary tables)
   968             if rhsvar in terms and not lhsvar in terms and ms_scope(lhsvar) is lhsvar.stmt:
   970             relscope = ms_scope(rel)
   969                 self._identity_substitute(rel, lhsvar, terms, needsel)
   971             lhsscope = ms_scope(lhsvar)
   970             elif lhsvar in terms and not rhsvar in terms and ms_scope(rhsvar) is rhsvar.stmt:
   972             rhsscope = ms_scope(rhsvar)
   971                 self._identity_substitute(rel, rhsvar, terms, needsel)
   973             if rhsvar in terms and not lhsvar in terms and lhsscope is lhsvar.stmt:
   972 
   974                 self._identity_substitute(rel, lhsvar, terms, needsel, relscope)
   973     def _identity_substitute(self, relation, var, terms, needsel):
   975             elif lhsvar in terms and not rhsvar in terms and rhsscope is rhsvar.stmt:
   974         newvar = self._insert_identity_variable(ms_scope(relation), var)
   976                 self._identity_substitute(rel, rhsvar, terms, needsel, relscope)
       
   977             elif self.crossed_relation(source, rel):
       
   978                 if lhsscope is not relscope:
       
   979                     self._identity_substitute(rel, lhsvar, terms, needsel,
       
   980                                               relscope, lhsscope)
       
   981                 if rhsscope is not relscope:
       
   982                     self._identity_substitute(rel, rhsvar, terms, needsel,
       
   983                                               relscope, rhsscope)
       
   984 
       
   985     def _identity_substitute(self, relation, var, terms, needsel, exist,
       
   986                              idrelscope=None):
       
   987         newvar = self._insert_identity_variable(exist, var, idrelscope)
   975         # ensure relation is using '=' operator, else we rely on a
   988         # ensure relation is using '=' operator, else we rely on a
   976         # sqlgenerator side effect (it won't insert an inequality operator
   989         # sqlgenerator side effect (it won't insert an inequality operator
   977         # in this case)
   990         # in this case)
   978         relation.children[1].operator = '='
   991         relation.children[1].operator = '='
   979         terms.append(newvar)
   992         terms.append(newvar)
   980         needsel.add(newvar.name)
   993         needsel.add(newvar.name)
   981 
   994 
   982     def _choose_term(self, sourceterms):
   995     def _choose_term(self, source, sourceterms):
   983         """pick one term among terms supported by a source, which will be used
   996         """pick one term among terms supported by a source, which will be used
   984         as a base to generate an execution step
   997         as a base to generate an execution step
   985         """
   998         """
   986         secondchoice = None
   999         secondchoice = None
   987         if len(self._sourcesterms) > 1:
  1000         if len(self._sourcesterms) > 1:
       
  1001             # first, return non invariant variable of crossed relation, then the
       
  1002             # crossed relation itself
       
  1003             for term in sourceterms:
       
  1004                 if (isinstance(term, Relation)
       
  1005                     and self.crossed_relation(source, term)
       
  1006                     and not ms_scope(term) is self.rqlst):
       
  1007                     for vref in term.get_variable_parts():
       
  1008                         try:
       
  1009                             var = vref.variable
       
  1010                         except AttributeError:
       
  1011                             # Constant
       
  1012                             continue
       
  1013                         if ((len(var.stinfo['relations']) > 1 or var.stinfo['selected'])
       
  1014                             and var in sourceterms):
       
  1015                             return var, sourceterms.pop(var)
       
  1016                     return term, sourceterms.pop(term)
   988             # priority to variable from subscopes
  1017             # priority to variable from subscopes
   989             for term in sourceterms:
  1018             for term in sourceterms:
   990                 if not ms_scope(term) is self.rqlst:
  1019                 if not ms_scope(term) is self.rqlst:
   991                     if isinstance(term, Variable):
  1020                     if isinstance(term, Variable):
   992                         return term, sourceterms.pop(term)
  1021                         return term, sourceterms.pop(term)