--- a/cubicweb/rqlrewrite.py Wed Apr 19 09:05:10 2017 +0200
+++ b/cubicweb/rqlrewrite.py Fri Apr 21 09:57:04 2017 +0200
@@ -649,6 +649,10 @@
# the snippet has introduced some ambiguities, we have to resolve them
# "manually"
variantes = self.build_variantes(newsolutions)
+ # if all ambiguities have been generated by variables within a "NOT
+ # EXISTS()#" or with type explicitly specified, we've nothing to change
+ if not variantes:
+ return newsolutions
# insert "is" where necessary
varexistsmap = {}
self.removing_ambiguity = True
@@ -680,21 +684,32 @@
variantes = set()
for sol in newsolutions:
variante = []
- for key, newvar in self.rewritten.items():
- variante.append((key, sol[newvar]))
- variantes.add(tuple(variante))
- # rebuild variantes as dict
- variantes = [dict(v) for v in variantes]
- # remove variable which have always the same type
- for key in self.rewritten:
- it = iter(variantes)
- etype = next(it)[key]
- for variante in it:
- if variante[key] != etype:
- break
- else:
- for variante in variantes:
- del variante[key]
+ for key, var_name in self.rewritten.items():
+ var = self.select.defined_vars[var_name]
+ # skip variable which are only in a NOT EXISTS
+ if len(var.stinfo['relations']) == 1 and isinstance(var.scope.parent, n.Not):
+ continue
+ # skip variable whose type is already explicitly specified
+ if var.stinfo['typerel']:
+ continue
+ variante.append((key, sol[var_name]))
+ if variante:
+ variantes.add(tuple(variante))
+
+ if variantes:
+ # rebuild variantes as dict
+ variantes = [dict(v) for v in variantes]
+ # remove variable which have always the same type
+ for key in self.rewritten:
+ it = iter(variantes)
+ etype = next(it)[key]
+ for variante in it:
+ if variante[key] != etype:
+ break
+ else:
+ for variante in variantes:
+ del variante[key]
+
return variantes
def _cleanup_inserted(self, node):
--- a/cubicweb/test/unittest_rqlrewrite.py Wed Apr 19 09:05:10 2017 +0200
+++ b/cubicweb/test/unittest_rqlrewrite.py Fri Apr 21 09:57:04 2017 +0200
@@ -67,14 +67,28 @@
def rewrite(rqlst, snippets_map, kwargs, existingvars=None):
rewriter = _prepare_rewriter(rqlrewrite.RQLRewriter, kwargs)
+ # turn {(V1, V2): constraints} into [(varmap, constraints)]
snippets = []
+ snippet_varmap = {}
for v, exprs in sorted(snippets_map.items()):
- rqlexprs = [isinstance(snippet, string_types)
- and mock_object(snippet_rqlst=parse(u'Any X WHERE '+snippet).children[0],
- expression=u'Any X WHERE '+snippet)
- or snippet
- for snippet in exprs]
- snippets.append((dict([v]), rqlexprs))
+ rqlexprs = []
+ varmap = dict([v])
+ for snippet in exprs:
+ # when the same snippet is impacting several variables, group them
+ # unless there is some conflicts on the snippet's variable name (we
+ # only want that for constraint on relations using both S and O)
+ if snippet in snippet_varmap and not (
+ set(varmap.values()) & set(snippet_varmap[snippet].values())):
+ snippet_varmap[snippet].update(varmap)
+ continue
+ snippet_varmap[snippet] = varmap
+ if isinstance(snippet, string_types):
+ snippet = mock_object(snippet_rqlst=parse(u'Any X WHERE ' + snippet).children[0],
+ expression=u'Any X WHERE ' + snippet)
+ rqlexprs.append(snippet)
+ if rqlexprs:
+ snippets.append((varmap, rqlexprs))
+
rqlhelper.compute_solutions(rqlst.children[0], {'eid': eid_func_map}, kwargs=kwargs)
rewriter.rewrite(rqlst.children[0], snippets, kwargs, existingvars)
check_vrefs(rqlst.children[0])
@@ -503,6 +517,36 @@
'Any O WHERE S use_email O, S is CWUser, O is EmailAddress, '
'EXISTS(NOT S in_group A, A name "guests", A is CWGroup)')
+ def test_ambiguous_constraint_not_exists(self):
+ state_constraint = (
+ 'NOT EXISTS(A require_permission S) '
+ 'OR EXISTS(B require_permission S, B is Card, O name "state1")'
+ 'OR EXISTS(C require_permission S, C is Note, O name "state2")'
+ )
+ rqlst = parse(u'Any P WHERE NOT P require_state S')
+ rewrite(rqlst, {('P', 'S'): (state_constraint,), ('S', 'O'): (state_constraint,)}, {})
+ self.assertMultiLineEqual(
+ rqlst.as_string(),
+ u'Any P WHERE NOT P require_state S, '
+ 'EXISTS(((NOT EXISTS(A require_permission P, A is IN(Card, Note)))'
+ ' OR (EXISTS(B require_permission P, B is Card, S name "state1")))'
+ ' OR (EXISTS(C require_permission P, C is Note, S name "state2"))), '
+ 'P is CWPermission, S is State')
+
+ def test_ambiguous_using_is_in_function(self):
+ state_constraint = (
+ 'NOT EXISTS(A require_permission S) '
+ 'OR EXISTS(B require_permission S, B is IN (Card, Note), O name "state1")'
+ )
+ rqlst = parse(u'Any P WHERE NOT P require_state S')
+ rewrite(rqlst, {('P', 'S'): (state_constraint,), ('S', 'O'): (state_constraint,)}, {})
+ self.assertMultiLineEqual(
+ rqlst.as_string(),
+ u'Any P WHERE NOT P require_state S, '
+ 'EXISTS((NOT EXISTS(A require_permission P, A is IN(Card, Note))) '
+ 'OR (EXISTS(B require_permission P, B is IN(Card, Note), S name "state1"))), '
+ 'P is CWPermission, S is State')
+
from cubicweb.devtools.testlib import CubicWebTC
class RewriteFullTC(CubicWebTC):