server/msplanner.py
changeset 1231 1457a545af03
parent 1230 232e16835fff
child 1237 c836bdb3b17b
equal deleted inserted replaced
1230:232e16835fff 1231:1457a545af03
   221         self.plan = plan
   221         self.plan = plan
   222         self.rqlst = rqlst
   222         self.rqlst = rqlst
   223         self.needsplit = False
   223         self.needsplit = False
   224         self.temptable = None
   224         self.temptable = None
   225         self.finaltable = None
   225         self.finaltable = None
       
   226         # shortcuts
   226         self._schema = plan.schema
   227         self._schema = plan.schema
   227         self._session = plan.session
   228         self._session = plan.session
   228         self._repo = self._session.repo
   229         self._repo = self._session.repo
   229         self._solutions = rqlst.solutions
   230         self._solutions = rqlst.solutions
   230         self._solindices = range(len(self._solutions))
   231         self._solindices = range(len(self._solutions))
       
   232         self.system_source = self._repo.system_source
   231         # source : {term: [solution index, ]}
   233         # source : {term: [solution index, ]}
   232         self.sourcesterms = self._sourcesterms = {}
   234         self.sourcesterms = self._sourcesterms = {}
   233         # source : {relation: set(child variable and constant)}
   235         # source : {relation: set(child variable and constant)}
   234         self._crossrelations = {}
   236         self._crossrelations = {}
   235         # dictionary of variables and constants which are linked to each other
   237         # dictionary of variables and constants which are linked to each other
   268     @property
   270     @property
   269     @cached
   271     @cached
   270     def part_sources(self):
   272     def part_sources(self):
   271         if self._sourcesterms:
   273         if self._sourcesterms:
   272             return tuple(sorted(self._sourcesterms))
   274             return tuple(sorted(self._sourcesterms))
   273         return (self._repo.system_source,)
   275         return (self.system_source,)
   274     
   276     
   275     @property
   277     @property
   276     @cached
   278     @cached
   277     def _sys_source_set(self):
   279     def _sys_source_set(self):
   278         return frozenset((self._repo.system_source, solindex)
   280         return frozenset((self.system_source, solindex)
   279                          for solindex in self._solindices)        
   281                          for solindex in self._solindices)        
   280        
   282        
   281     @cached
   283     @cached
   282     def _norel_support_set(self, relation):
   284     def _norel_support_set(self, relation):
   283         """return a set of (source, solindex) where source doesn't support the
   285         """return a set of (source, solindex) where source doesn't support the
   307                     for const in rel.children[1].get_nodes(Constant):
   309                     for const in rel.children[1].get_nodes(Constant):
   308                         eid = const.eval(self.plan.args)
   310                         eid = const.eval(self.plan.args)
   309                         source = self._session.source_from_eid(eid)
   311                         source = self._session.source_from_eid(eid)
   310                         if vrels and not any(source.support_relation(r.r_type)
   312                         if vrels and not any(source.support_relation(r.r_type)
   311                                              for r in vrels):
   313                                              for r in vrels):
   312                             self._set_source_for_term(repo.system_source, varobj)
   314                             self._set_source_for_term(self.system_source, varobj)
   313                         else:
   315                         else:
   314                             self._set_source_for_term(source, varobj)
   316                             self._set_source_for_term(source, varobj)
   315                 continue
   317                 continue
   316             rels = varobj.stinfo['relations']
   318             rels = varobj.stinfo['relations']
   317             if not rels and not varobj.stinfo['typerels']:
   319             if not rels and not varobj.stinfo['typerels']:
   318                 # (rare) case where the variable has no type specified nor
   320                 # (rare) case where the variable has no type specified nor
   319                 # relation accessed ex. "Any MAX(X)"
   321                 # relation accessed ex. "Any MAX(X)"
   320                 self._set_source_for_term(repo.system_source, varobj)
   322                 self._set_source_for_term(self.system_source, varobj)
   321                 continue
   323                 continue
   322             for i, sol in enumerate(self._solutions):
   324             for i, sol in enumerate(self._solutions):
   323                 vartype = sol[varname]
   325                 vartype = sol[varname]
   324                 # skip final variable
   326                 # skip final variable
   325                 if eschema(vartype).is_final():
   327                 if eschema(vartype).is_final():
   342                             self.needsplit = True               
   344                             self.needsplit = True               
   343         # add source for rewritten constants to sourcesterms
   345         # add source for rewritten constants to sourcesterms
   344         for vconsts in self.rqlst.stinfo['rewritten'].itervalues():
   346         for vconsts in self.rqlst.stinfo['rewritten'].itervalues():
   345             const = vconsts[0]
   347             const = vconsts[0]
   346             source = self._session.source_from_eid(const.eval(self.plan.args))
   348             source = self._session.source_from_eid(const.eval(self.plan.args))
   347             if source is repo.system_source:
   349             if source is self.system_source:
   348                 for const in vconsts:
   350                 for const in vconsts:
   349                     self._set_source_for_term(source, const)
   351                     self._set_source_for_term(source, const)
   350             elif source in self._sourcesterms:
   352             elif source in self._sourcesterms:
   351                 source_scopes = frozenset(t.scope for t in self._sourcesterms[source])
   353                 source_scopes = frozenset(t.scope for t in self._sourcesterms[source])
   352                 for const in vconsts:
   354                 for const in vconsts:
   355                         # if system source is used, add every rewritten constant
   357                         # if system source is used, add every rewritten constant
   356                         # to its supported terms even when associated entity
   358                         # to its supported terms even when associated entity
   357                         # doesn't actually comes from it so we get a changes
   359                         # doesn't actually comes from it so we get a changes
   358                         # that allequals will return True as expected when
   360                         # that allequals will return True as expected when
   359                         # computing needsplit
   361                         # computing needsplit
   360                         if repo.system_source in sourcesterms:
   362                         if self.system_source in sourcesterms:
   361                             self._set_source_for_term(repo.system_source, const)
   363                             self._set_source_for_term(self.system_source, const)
   362         # add source for relations
   364         # add source for relations
   363         rschema = self._schema.rschema
   365         rschema = self._schema.rschema
   364         termssources = {}
   366         termssources = {}
   365         for rel in self.rqlst.iget_nodes(Relation):
   367         for rel in self.rqlst.iget_nodes(Relation):
   366             # process non final relations only
   368             # process non final relations only
   398         return termssources
   400         return termssources
   399             
   401             
   400     def _handle_cross_relation(self, rel, relsources, termssources):
   402     def _handle_cross_relation(self, rel, relsources, termssources):
   401         for source in relsources:
   403         for source in relsources:
   402             if rel.r_type in source.cross_relations:
   404             if rel.r_type in source.cross_relations:
   403                 ssource = self._repo.system_source
   405                 ssource = self.system_source
   404                 crossvars = set(x.variable for x in rel.get_nodes(VariableRef))
   406                 crossvars = set(x.variable for x in rel.get_nodes(VariableRef))
   405                 for const in rel.get_nodes(Constant):
   407                 for const in rel.get_nodes(Constant):
   406                     if source.uri != 'system' and not const in self._sourcesterms.get(source, ()):
   408                     if source.uri != 'system' and not const in self._sourcesterms.get(source, ()):
   407                         continue
   409                         continue
   408                     crossvars.add(const)
   410                     crossvars.add(const)
   518             if invalid_sources and isinstance(term, Variable) \
   520             if invalid_sources and isinstance(term, Variable) \
   519                    and self._need_ext_source_access(term, rel):
   521                    and self._need_ext_source_access(term, rel):
   520                 # if the term is a not invariant variable, we should filter out
   522                 # if the term is a not invariant variable, we should filter out
   521                 # source where the relation is a cross relation from invalid
   523                 # source where the relation is a cross relation from invalid
   522                 # sources
   524                 # sources
   523                 invalid_sources = frozenset([(s, solidx) for s, solidx in invalid_sources
   525                 invalid_sources = frozenset((s, solidx) for s, solidx in invalid_sources
   524                                              if not (s in self._crossrelations and
   526                                             if not (s in self._crossrelations and
   525                                                      rel in self._crossrelations[s])])
   527                                                     rel in self._crossrelations[s]))
   526         if invalid_sources:
   528         if invalid_sources:
   527             self._remove_sources(term, invalid_sources)
   529             self._remove_sources(term, invalid_sources)
   528             termsources -= invalid_sources
   530             termsources -= invalid_sources
   529             self._remove_sources_until_stable(term, termssources)
   531             self._remove_sources_until_stable(term, termssources)
   530         
   532         
   618         """
   620         """
   619         steps = []
   621         steps = []
   620         select = self.rqlst
   622         select = self.rqlst
   621         rschema = self._schema.rschema
   623         rschema = self._schema.rschema
   622         for source in self.part_sources:
   624         for source in self.part_sources:
   623             sourceterms = self._sourcesterms[source]
   625             try:
       
   626                 sourceterms = self._sourcesterms[source]
       
   627             except KeyError:
       
   628                 continue # already proceed
   624             while sourceterms:
   629             while sourceterms:
   625                 # take a term randomly, and all terms supporting the
   630                 # take a term randomly, and all terms supporting the
   626                 # same solutions
   631                 # same solutions
   627                 term, solindices = self._choose_term(sourceterms)
   632                 term, solindices = self._choose_term(sourceterms)
   628                 if source.uri == 'system':
   633                 if source.uri == 'system':
   644                         # we can't generate anything interesting with a single
   649                         # we can't generate anything interesting with a single
   645                         # constant term (will generate an empty "Any" query),
   650                         # constant term (will generate an empty "Any" query),
   646                         # go to the next iteration directly!
   651                         # go to the next iteration directly!
   647                         continue
   652                         continue
   648                     if not sourceterms:
   653                     if not sourceterms:
   649                         del self._sourcesterms[source]
   654                          try:
   650                 # suppose this is a final step until the contrary is proven
   655                              del self._sourcesterms[source]
   651                 final = scope is select
   656                          except KeyError:
       
   657                              # XXX already cleaned
       
   658                              pass
   652                 # set of terms which should be additionaly selected when
   659                 # set of terms which should be additionaly selected when
   653                 # possible
   660                 # possible
   654                 needsel = set()
   661                 needsel = set()
   655                 # add attribute variables and mark variables which should be
   662                 if not self._sourcesterms:
   656                 # additionaly selected when possible
   663                     terms += scope.defined_vars.values() + scope.aliases.values()
   657                 for var in select.defined_vars.itervalues():
   664                     final = True
   658                     if not var in terms:
   665                 else:
   659                         stinfo = var.stinfo
   666                     # suppose this is a final step until the contrary is proven
   660                         for ovar, rtype in stinfo['attrvars']:
   667                     final = scope is select
   661                             if ovar in terms:
   668                     # add attribute variables and mark variables which should be
       
   669                     # additionaly selected when possible
       
   670                     for var in select.defined_vars.itervalues():
       
   671                         if not var in terms:
       
   672                             stinfo = var.stinfo
       
   673                             for ovar, rtype in stinfo['attrvars']:
       
   674                                 if ovar in terms:
       
   675                                     needsel.add(var.name)
       
   676                                     terms.append(var)
       
   677                                     break
       
   678                             else:
   662                                 needsel.add(var.name)
   679                                 needsel.add(var.name)
   663                                 terms.append(var)
   680                                 final = False
       
   681                     # check where all relations are supported by the sources
       
   682                     for rel in scope.iget_nodes(Relation):
       
   683                         if rel.is_types_restriction():
       
   684                             continue
       
   685                         # take care not overwriting the existing "source" identifier
       
   686                         for _source in sources:
       
   687                             if not _source.support_relation(rel.r_type) or (
       
   688                                 self.crossed_relation(_source, rel) and not rel in terms):
       
   689                                 for vref in rel.iget_nodes(VariableRef):
       
   690                                     needsel.add(vref.name)
       
   691                                 final = False
   664                                 break
   692                                 break
   665                         else:
   693                         else:
   666                             needsel.add(var.name)
   694                             if not scope is select:
   667                             final = False
   695                                 self._exists_relation(rel, terms, needsel)
       
   696                             # if relation is supported by all sources and some of
       
   697                             # its lhs/rhs variable isn't in "terms", and the
       
   698                             # other end *is* in "terms", mark it have to be
       
   699                             # selected
       
   700                             if source.uri != 'system' and not rschema(rel.r_type).is_final():
       
   701                                 lhs, rhs = rel.get_variable_parts()
       
   702                                 try:
       
   703                                     lhsvar = lhs.variable
       
   704                                 except AttributeError:
       
   705                                     lhsvar = lhs
       
   706                                 try:
       
   707                                     rhsvar = rhs.variable
       
   708                                 except AttributeError:
       
   709                                     rhsvar = rhs
       
   710                                 if lhsvar in terms and not rhsvar in terms:
       
   711                                     needsel.add(lhsvar.name)
       
   712                                 elif rhsvar in terms and not lhsvar in terms:
       
   713                                     needsel.add(rhsvar.name)
   668                 if final and source.uri != 'system':
   714                 if final and source.uri != 'system':
   669                     # check rewritten constants
   715                     # check rewritten constants
   670                     for vconsts in select.stinfo['rewritten'].itervalues():
   716                     for vconsts in select.stinfo['rewritten'].itervalues():
   671                         const = vconsts[0]
   717                         const = vconsts[0]
   672                         eid = const.eval(self.plan.args)
   718                         eid = const.eval(self.plan.args)
   681                                 rel = c.relation()
   727                                 rel = c.relation()
   682                                 if rel is None or not (rel in terms or rel.neged(strict=True)):
   728                                 if rel is None or not (rel in terms or rel.neged(strict=True)):
   683                                     final = False
   729                                     final = False
   684                                     break
   730                                     break
   685                             break
   731                             break
   686                 # check where all relations are supported by the sources
       
   687                 for rel in scope.iget_nodes(Relation):
       
   688                     if rel.is_types_restriction():
       
   689                         continue
       
   690                     # take care not overwriting the existing "source" identifier
       
   691                     for _source in sources:
       
   692                         if not _source.support_relation(rel.r_type):
       
   693                             for vref in rel.iget_nodes(VariableRef):
       
   694                                 needsel.add(vref.name)
       
   695                             final = False
       
   696                             break
       
   697                         elif self.crossed_relation(_source, rel) and not rel in terms:
       
   698                             final = False
       
   699                             break
       
   700                     else:
       
   701                         if not scope is select:
       
   702                             self._exists_relation(rel, terms, needsel)
       
   703                         # if relation is supported by all sources and some of
       
   704                         # its lhs/rhs variable isn't in "terms", and the
       
   705                         # other end *is* in "terms", mark it have to be
       
   706                         # selected
       
   707                         if source.uri != 'system' and not rschema(rel.r_type).is_final():
       
   708                             lhs, rhs = rel.get_variable_parts()
       
   709                             try:
       
   710                                 lhsvar = lhs.variable
       
   711                             except AttributeError:
       
   712                                 lhsvar = lhs
       
   713                             try:
       
   714                                 rhsvar = rhs.variable
       
   715                             except AttributeError:
       
   716                                 rhsvar = rhs
       
   717                             if lhsvar in terms and not rhsvar in terms:
       
   718                                 needsel.add(lhsvar.name)
       
   719                             elif rhsvar in terms and not lhsvar in terms:
       
   720                                 needsel.add(rhsvar.name)
       
   721                 if final:
   732                 if final:
   722                     self._cleanup_sourcesterms(sources, solindices)
   733                     self._cleanup_sourcesterms(sources, solindices)
   723                 steps.append((sources, terms, solindices, scope, needsel, final)
   734                 steps.append((sources, terms, solindices, scope, needsel, final)
   724                              )
   735                              )
   725         return steps
   736         return steps
   757         as a base to generate an execution step
   768         as a base to generate an execution step
   758         """
   769         """
   759         secondchoice = None
   770         secondchoice = None
   760         if len(self._sourcesterms) > 1:
   771         if len(self._sourcesterms) > 1:
   761             # priority to variable from subscopes
   772             # priority to variable from subscopes
   762             for var in sourceterms:
   773             for term in sourceterms:
   763                 if not var.scope is self.rqlst:
   774                 if not term.scope is self.rqlst:
   764                     if isinstance(var, Variable):
   775                     if isinstance(term, Variable):
   765                         return var, sourceterms.pop(var)
   776                         return term, sourceterms.pop(term)
   766                     secondchoice = var
   777                     secondchoice = term
   767         else:
   778         else:
   768             # priority to variable outer scope
   779             # priority to variable from outer scope
   769             for var in sourceterms:
   780             for term in sourceterms:
   770                 if var.scope is self.rqlst:
   781                 if term.scope is self.rqlst:
   771                     if isinstance(var, Variable):
   782                     if isinstance(term, Variable):
   772                         return var, sourceterms.pop(var)
   783                         return term, sourceterms.pop(term)
   773                     secondchoice = var
   784                     secondchoice = term
   774         if secondchoice is not None:
   785         if secondchoice is not None:
   775             return secondchoice, sourceterms.pop(secondchoice)
   786             return secondchoice, sourceterms.pop(secondchoice)
   776         # priority to variable
   787         # priority to variable with the less solutions supported and with the
   777         for var in sourceterms:
   788         # most valuable refs
   778             if isinstance(var, Variable):
   789         variables = sorted([(var, sols) for (var, sols) in sourceterms.items()
   779                 return var, sourceterms.pop(var)
   790                             if isinstance(var, Variable)],
   780         # whatever
   791                            key=lambda (v, s): (len(s), -v.valuable_references()))
   781         var = iter(sourceterms).next()
   792         if variables:
   782         return var, sourceterms.pop(var)
   793             var = variables[0][0]
       
   794             return var, sourceterms.pop(var)
       
   795         # priority to constant
       
   796         for term in sourceterms:
       
   797             if isinstance(term, Constant):
       
   798                 return term, sourceterms.pop(term)
       
   799         # whatever (relation)
       
   800         term = iter(sourceterms).next()
       
   801         return term, sourceterms.pop(term)
   783             
   802             
   784     def _expand_sources(self, selected_source, term, solindices):
   803     def _expand_sources(self, selected_source, term, solindices):
   785         """return all sources supporting given term / solindices"""
   804         """return all sources supporting given term / solindices"""
   786         sources = [selected_source]
   805         sources = [selected_source]
   787         sourcesterms = self._sourcesterms
   806         sourcesterms = self._sourcesterms
   788         for source in sourcesterms:
   807         for source in sourcesterms.keys():
   789             if source is selected_source:
   808             if source is selected_source:
   790                 continue
   809                 continue
   791             if not (term in sourcesterms[source] and 
   810             if not (term in sourcesterms[source] and 
   792                     solindices.issubset(sourcesterms[source][term])):
   811                     solindices.issubset(sourcesterms[source][term])):
   793                 continue
   812                 continue
   794             sources.append(source)
   813             sources.append(source)
   795             if source.uri != 'system':
   814             if source.uri != 'system' or not (isinstance(term, Variable) and not term in self._linkedterms):
   796                 termsolindices = sourcesterms[source][term]
   815                 termsolindices = sourcesterms[source][term]
   797                 termsolindices -= solindices
   816                 termsolindices -= solindices
   798                 if not termsolindices:
   817                 if not termsolindices:
   799                     del sourcesterms[source][term]                
   818                     del sourcesterms[source][term]
       
   819                     if not sourcesterms[source]:
       
   820                         del sourcesterms[source]
   800         return sources
   821         return sources
   801             
   822             
   802     def _expand_terms(self, term, sources, sourceterms, scope, solindices):
   823     def _expand_terms(self, term, sources, sourceterms, scope, solindices):
   803         terms = [term]
   824         terms = [term]
   804         sources = sorted(sources)
   825         sources = sorted(sources)
       
   826         sourcesterms = self._sourcesterms
   805         nbunlinked = 1
   827         nbunlinked = 1
   806         linkedterms = self._linkedterms
   828         linkedterms = self._linkedterms
   807         # term has to belong to the same scope if there is more
   829         # term has to belong to the same scope if there is more
   808         # than the system source remaining
   830         # than the system source remaining
   809         if len(self._sourcesterms) > 1 and not scope is self.rqlst:
   831         if len(sourcesterms) > 1 and not scope is self.rqlst:
   810             candidates = (t for t in sourceterms.keys() if scope is t.scope)
   832             candidates = (t for t in sourceterms.keys() if scope is t.scope)
   811         else:
   833         else:
   812             candidates = sourceterms #.iterkeys()
   834             candidates = sourceterms #.iterkeys()
   813         # we only want one unlinked term in each generated query
   835         # we only want one unlinked term in each generated query
   814         candidates = [t for t in candidates
   836         candidates = [t for t in candidates
   815                       if isinstance(t, Constant) or
   837                       if isinstance(t, (Constant, Relation)) or
   816                       (solindices.issubset(sourceterms[t]) and t in linkedterms)]
   838                       (solindices.issubset(sourceterms[t]) and t in linkedterms)]
   817         accept_term = lambda x: (not any(s for s in sources if not x in self._sourcesterms[s])
   839         cross_rels = {}
   818                                 and any(t for t in terms if t in linkedterms.get(x, ())))
       
   819         source_cross_rels = {}
       
   820         for source in sources:
   840         for source in sources:
   821             source_cross_rels.update(self._crossrelations.get(source, {}))
   841             cross_rels.update(self._crossrelations.get(source, {}))
   822         if isinstance(term, Relation) and term in source_cross_rels:
   842         exclude = {}
   823             cross_terms = source_cross_rels.pop(term)
   843         for rel, crossvars in cross_rels.iteritems():
       
   844             vars = [t for t in crossvars if isinstance(t, Variable)]
       
   845             try:
       
   846                 exclude[vars[0]] = vars[1]
       
   847                 exclude[vars[1]] = vars[0]
       
   848             except IndexError:
       
   849                 pass
       
   850         accept_term = lambda x: (not any(s for s in sources if not x in sourcesterms.get(s, ()))
       
   851                                  and any(t for t in terms if t in linkedterms.get(x, ()))
       
   852                                  and not exclude.get(x) in terms)
       
   853         if isinstance(term, Relation) and term in cross_rels:
       
   854             cross_terms = cross_rels.pop(term)
   824             base_accept_term = accept_term
   855             base_accept_term = accept_term
   825             accept_term = lambda x: (base_accept_term(x) or x in cross_terms)
   856             accept_term = lambda x: (base_accept_term(x) or x in cross_terms)
   826             for refed in cross_terms:
   857             for refed in cross_terms:
   827                 if not refed in candidates:
   858                 if not refed in candidates:
   828                     candidates.append(refed)
   859                     terms.append(refed)
   829         else:
       
   830             cross_terms = ()
       
   831         # repeat until no term can't be added, since addition of a new
   860         # repeat until no term can't be added, since addition of a new
   832         # term may permit to another one to be added
   861         # term may permit to another one to be added
   833         modified = True
   862         modified = True
   834         while modified and candidates:
   863         while modified and candidates:
   835             modified = False
   864             modified = False
   844                     del sourceterms[term]
   873                     del sourceterms[term]
   845                 elif accept_term(term):
   874                 elif accept_term(term):
   846                     terms.append(term)
   875                     terms.append(term)
   847                     candidates.remove(term)
   876                     candidates.remove(term)
   848                     modified = True
   877                     modified = True
   849                     for source in sources:
   878                     self._cleanup_sourcesterms(sources, solindices, term)
   850                         sourceterms = self._sourcesterms[source]
       
   851                         # terms should be deleted once all possible solutions
       
   852                         # indices have been consumed
       
   853                         try:
       
   854                             sourceterms[term] -= solindices
       
   855                             if not sourceterms[term]:
       
   856                                 del sourceterms[term]
       
   857                         except KeyError:
       
   858                             assert term in cross_terms
       
   859         return terms
   879         return terms
   860     
   880     
   861     def _cleanup_sourcesterms(self, sources, solindices):
   881     def _cleanup_sourcesterms(self, sources, solindices, term=None):
   862         """on final parts, remove solutions so we know they are already processed"""
   882         """remove solutions so we know they are already processed"""
   863         for source in sources:
   883         for source in sources:
   864             try:
   884             try:
   865                 sourceterms = self._sourcesterms[source]
   885                 sourceterms = self._sourcesterms[source]
   866             except KeyError:
   886             except KeyError:
   867                 continue
   887                 continue
   868             for term, termsolindices in sourceterms.items():
   888             if term is None:
   869                 if isinstance(term, Relation) and self.crossed_relation(source, term):
   889                 for term, termsolindices in sourceterms.items():
   870                     continue
   890                     if isinstance(term, Relation) and self.crossed_relation(source, term):
   871                 termsolindices -= solindices
   891                         continue
   872                 if not termsolindices:
   892                     termsolindices -= solindices
   873                     del sourceterms[term]
   893                     if not termsolindices:
   874                     
   894                         del sourceterms[term]
       
   895             else:
       
   896                 try:
       
   897                     sourceterms[term] -= solindices
       
   898                     if not sourceterms[term]:
       
   899                         del sourceterms[term]
       
   900                 except KeyError:
       
   901                     pass
       
   902                     #assert term in cross_terms
       
   903             if not sourceterms:
       
   904                 del self._sourcesterms[source]
       
   905                 
   875     def merge_input_maps(self, allsolindices):
   906     def merge_input_maps(self, allsolindices):
   876         """inputmaps is a dictionary with tuple of solution indices as key with
   907         """inputmaps is a dictionary with tuple of solution indices as key with
   877         an associated input map as value. This function compute for each
   908         an associated input map as value. This function compute for each
   878         solution its necessary input map and return them grouped
   909         solution its necessary input map and return them grouped
   879 
   910 
  1014         steps = []
  1045         steps = []
  1015         for sources, cppis in cango.iteritems():
  1046         for sources, cppis in cango.iteritems():
  1016             byinputmap = {}
  1047             byinputmap = {}
  1017             for ppi in cppis:
  1048             for ppi in cppis:
  1018                 select = ppi.rqlst
  1049                 select = ppi.rqlst
  1019                 if sources != (plan.session.repo.system_source,):
  1050                 if sources != (ppi.system_source,):
  1020                     add_types_restriction(self.schema, select)
  1051                     add_types_restriction(self.schema, select)
  1021                 # part plan info for subqueries
  1052                 # part plan info for subqueries
  1022                 inputmap = self._ppi_subqueries(ppi)
  1053                 inputmap = self._ppi_subqueries(ppi)
  1023                 aggrstep = need_aggr_step(select, sources)
  1054                 aggrstep = need_aggr_step(select, sources)
  1024                 if aggrstep:
  1055                 if aggrstep:
  1102                     for solindices, inputmap in solsinputmaps:
  1133                     for solindices, inputmap in solsinputmaps:
  1103                         if inputmap is None:
  1134                         if inputmap is None:
  1104                             inputmap = subinputmap
  1135                             inputmap = subinputmap
  1105                         else:
  1136                         else:
  1106                             inputmap.update(subinputmap)
  1137                             inputmap.update(subinputmap)
  1107                         steps.append(ppi.build_final_part(minrqlst, solindices, inputmap,
  1138                         if inputmap and len(sources) > 1:
  1108                                                           sources, insertedvars))
  1139                             sources.remove(ppi.system_source)
       
  1140                             steps.append(ppi.build_final_part(minrqlst, solindices, None,
       
  1141                                                               sources, insertedvars))
       
  1142                             steps.append(ppi.build_final_part(minrqlst, solindices, inputmap,
       
  1143                                                               [ppi.system_source], insertedvars))
       
  1144                         else:
       
  1145                             steps.append(ppi.build_final_part(minrqlst, solindices, inputmap,
       
  1146                                                               sources, insertedvars))
  1109                 else:
  1147                 else:
  1110                     table = '_T%s%s' % (''.join(sorted(v._ms_table_key() for v in terms)),
  1148                     table = '_T%s%s' % (''.join(sorted(v._ms_table_key() for v in terms)),
  1111                                         ''.join(sorted(str(i) for i in solindices)))
  1149                                         ''.join(sorted(str(i) for i in solindices)))
  1112                     ppi.build_non_final_part(minrqlst, solindices, sources,
  1150                     ppi.build_non_final_part(minrqlst, solindices, sources,
  1113                                              insertedvars, table)
  1151                                              insertedvars, table)