--- a/cubicweb/test/unittest_rqlrewrite.py Tue Sep 12 09:48:02 2017 +0200
+++ b/cubicweb/test/unittest_rqlrewrite.py Tue Sep 12 09:49:30 2017 +0200
@@ -18,15 +18,17 @@
from six import string_types
-from logilab.common.testlib import unittest_main, TestCase
from logilab.common.testlib import mock_object
+from logilab.common.decorators import monkeypatch
from yams import BadSchemaDefinition
from yams.buildobjs import RelationDefinition
from rql import parse, nodes, RQLHelper
from cubicweb import Unauthorized, rqlrewrite, devtools
+from cubicweb.rqlrewrite import RQLRewriter
from cubicweb.schema import RRQLExpression, ERQLExpression
from cubicweb.devtools import repotest
+from cubicweb.devtools.testlib import CubicWebTC, TestCase
def setUpModule(*args):
@@ -40,46 +42,65 @@
'has_text': 'fti'})
repotest.do_monkey_patch()
+
def tearDownModule(*args):
repotest.undo_monkey_patch()
global rqlhelper, schema
del rqlhelper, schema
+
def eid_func_map(eid):
return {1: 'CWUser',
2: 'Card',
3: 'Affaire'}[eid]
+
def _prepare_rewriter(rewriter_cls, kwargs):
class FakeVReg:
schema = schema
+
@staticmethod
def solutions(sqlcursor, rqlst, kwargs):
rqlhelper.compute_solutions(rqlst, {'eid': eid_func_map}, kwargs=kwargs)
+
class rqlhelper:
@staticmethod
def annotate(rqlst):
rqlhelper.annotate(rqlst)
- @staticmethod
- def simplify(mainrqlst, needcopy=False):
- rqlhelper.simplify(rqlst, needcopy)
+
return rewriter_cls(mock_object(vreg=FakeVReg, user=(mock_object(eid=1))))
+
def rewrite(rqlst, snippets_map, kwargs, existingvars=None):
rewriter = _prepare_rewriter(rqlrewrite.RQLRewriter, kwargs)
+ # turn {(V1, V2): constraints} into [(varmap, constraints)]
snippets = []
+ snippet_varmap = {}
for v, exprs in sorted(snippets_map.items()):
- rqlexprs = [isinstance(snippet, string_types)
- and mock_object(snippet_rqlst=parse(u'Any X WHERE '+snippet).children[0],
- expression=u'Any X WHERE '+snippet)
- or snippet
- for snippet in exprs]
- snippets.append((dict([v]), rqlexprs))
+ rqlexprs = []
+ varmap = dict([v])
+ for snippet in exprs:
+ # when the same snippet is impacting several variables, group them
+ # unless there is some conflicts on the snippet's variable name (we
+ # only want that for constraint on relations using both S and O)
+ if snippet in snippet_varmap and not (
+ set(varmap.values()) & set(snippet_varmap[snippet].values())):
+ snippet_varmap[snippet].update(varmap)
+ continue
+ snippet_varmap[snippet] = varmap
+ if isinstance(snippet, string_types):
+ snippet = mock_object(snippet_rqlst=parse(u'Any X WHERE ' + snippet).children[0],
+ expression=u'Any X WHERE ' + snippet)
+ rqlexprs.append(snippet)
+ if rqlexprs:
+ snippets.append((varmap, rqlexprs))
+
rqlhelper.compute_solutions(rqlst.children[0], {'eid': eid_func_map}, kwargs=kwargs)
rewriter.rewrite(rqlst.children[0], snippets, kwargs, existingvars)
check_vrefs(rqlst.children[0])
return rewriter.rewritten
+
def check_vrefs(node):
vrefmaps = {}
selects = []
@@ -88,14 +109,15 @@
try:
vrefmaps[stmt].setdefault(vref.name, set()).add(vref)
except KeyError:
- vrefmaps[stmt] = {vref.name: set( (vref,) )}
+ vrefmaps[stmt] = {vref.name: set((vref,))}
selects.append(stmt)
assert node in selects, (node, selects)
for stmt in selects:
for var in stmt.defined_vars.values():
assert var.stinfo['references']
vrefmap = vrefmaps[stmt]
- assert not (var.stinfo['references'] ^ vrefmap[var.name]), (node.as_string(), var, var.stinfo['references'], vrefmap[var.name])
+ assert not (var.stinfo['references'] ^ vrefmap[var.name]), (
+ node.as_string(), var, var.stinfo['references'], vrefmap[var.name])
class RQLRewriteTC(TestCase):
@@ -109,72 +131,81 @@
def test_base_var(self):
constraint = ('X in_state S, U in_group G, P require_state S,'
- 'P name "read", P require_group G')
+ 'P name "read", P require_group G')
rqlst = parse(u'Card C')
rewrite(rqlst, {('C', 'X'): (constraint,)}, {})
- self.assertEqual(rqlst.as_string(),
- u'Any C WHERE C is Card, B eid %(D)s, '
- 'EXISTS(C in_state A, B in_group E, F require_state A, '
- 'F name "read", F require_group E, A is State, E is CWGroup, F is CWPermission)')
+ self.assertEqual(
+ rqlst.as_string(),
+ u'Any C WHERE C is Card, B eid %(D)s, '
+ 'EXISTS(C in_state A, B in_group E, F require_state A, '
+ 'F name "read", F require_group E, A is State, E is CWGroup, F is CWPermission)')
def test_multiple_var(self):
card_constraint = ('X in_state S, U in_group G, P require_state S,'
'P name "read", P require_group G')
affaire_constraints = ('X ref LIKE "PUBLIC%"', 'U in_group G, G name "public"')
- kwargs = {'u':2}
+ kwargs = {'u': 2}
rqlst = parse(u'Any S WHERE S documented_by C, C eid %(u)s')
rewrite(rqlst, {('C', 'X'): (card_constraint,), ('S', 'X'): affaire_constraints},
kwargs)
self.assertMultiLineEqual(
rqlst.as_string(),
u'Any S WHERE S documented_by C, C eid %(u)s, B eid %(D)s, '
- 'EXISTS(C in_state A, B in_group E, F require_state A, '
- 'F name "read", F require_group E, A is State, E is CWGroup, F is CWPermission), '
- '(EXISTS(S ref LIKE "PUBLIC%")) OR (EXISTS(B in_group G, G name "public", G is CWGroup)), '
- 'S is Affaire')
+ 'EXISTS(C in_state A, B in_group E, F require_state A, '
+ 'F name "read", F require_group E, A is State, E is CWGroup, F is CWPermission), '
+ '(EXISTS(S ref LIKE "PUBLIC%")) '
+ 'OR (EXISTS(B in_group G, G name "public", G is CWGroup)), '
+ 'S is Affaire')
self.assertIn('D', kwargs)
def test_or(self):
- constraint = '(X identity U) OR (X in_state ST, CL identity U, CL in_state ST, ST name "subscribed")'
+ constraint = (
+ '(X identity U) OR '
+ '(X in_state ST, CL identity U, CL in_state ST, ST name "subscribed")'
+ )
rqlst = parse(u'Any S WHERE S owned_by C, C eid %(u)s, S is in (CWUser, CWGroup)')
- rewrite(rqlst, {('C', 'X'): (constraint,)}, {'u':1})
- self.assertEqual(rqlst.as_string(),
- 'Any S WHERE S owned_by C, C eid %(u)s, S is IN(CWUser, CWGroup), A eid %(B)s, '
- 'EXISTS((C identity A) OR (C in_state D, E identity A, '
- 'E in_state D, D name "subscribed"), D is State, E is CWUser)')
+ rewrite(rqlst, {('C', 'X'): (constraint,)}, {'u': 1})
+ self.assertEqual(
+ rqlst.as_string(),
+ 'Any S WHERE S owned_by C, C eid %(u)s, S is IN(CWUser, CWGroup), A eid %(B)s, '
+ 'EXISTS((C identity A) OR (C in_state D, E identity A, '
+ 'E in_state D, D name "subscribed"), D is State, E is CWUser)')
def test_simplified_rqlst(self):
constraint = ('X in_state S, U in_group G, P require_state S,'
- 'P name "read", P require_group G')
- rqlst = parse(u'Any 2') # this is the simplified rql st for Any X WHERE X eid 12
+ 'P name "read", P require_group G')
+ rqlst = parse(u'Any 2') # this is the simplified rql st for Any X WHERE X eid 12
rewrite(rqlst, {('2', 'X'): (constraint,)}, {})
- self.assertEqual(rqlst.as_string(),
- u'Any 2 WHERE B eid %(C)s, '
- 'EXISTS(2 in_state A, B in_group D, E require_state A, '
- 'E name "read", E require_group D, A is State, D is CWGroup, E is CWPermission)')
+ self.assertEqual(
+ rqlst.as_string(),
+ u'Any 2 WHERE B eid %(C)s, '
+ 'EXISTS(2 in_state A, B in_group D, E require_state A, '
+ 'E name "read", E require_group D, A is State, D is CWGroup, E is CWPermission)')
def test_optional_var_1(self):
constraint = ('X in_state S, U in_group G, P require_state S,'
- 'P name "read", P require_group G')
+ 'P name "read", P require_group G')
rqlst = parse(u'Any A,C WHERE A documented_by C?')
rewrite(rqlst, {('C', 'X'): (constraint,)}, {})
- self.assertEqual(rqlst.as_string(),
- u'Any A,C WHERE A documented_by C?, A is Affaire '
- 'WITH C BEING '
- '(Any C WHERE EXISTS(C in_state B, D in_group F, G require_state B, G name "read", '
- 'G require_group F), D eid %(A)s, C is Card)')
+ self.assertEqual(
+ rqlst.as_string(),
+ u'Any A,C WHERE A documented_by C?, A is Affaire '
+ 'WITH C BEING '
+ '(Any C WHERE EXISTS(C in_state B, D in_group F, G require_state B, G name "read", '
+ 'G require_group F), D eid %(A)s, C is Card)')
def test_optional_var_2(self):
constraint = ('X in_state S, U in_group G, P require_state S,'
- 'P name "read", P require_group G')
+ 'P name "read", P require_group G')
rqlst = parse(u'Any A,C,T WHERE A documented_by C?, C title T')
rewrite(rqlst, {('C', 'X'): (constraint,)}, {})
- self.assertEqual(rqlst.as_string(),
- u'Any A,C,T WHERE A documented_by C?, A is Affaire '
- 'WITH C,T BEING '
- '(Any C,T WHERE C title T, EXISTS(C in_state B, D in_group F, '
- 'G require_state B, G name "read", G require_group F), '
- 'D eid %(A)s, C is Card)')
+ self.assertEqual(
+ rqlst.as_string(),
+ u'Any A,C,T WHERE A documented_by C?, A is Affaire '
+ 'WITH C,T BEING '
+ '(Any C,T WHERE C title T, EXISTS(C in_state B, D in_group F, '
+ 'G require_state B, G name "read", G require_group F), '
+ 'D eid %(A)s, C is Card)')
def test_optional_var_3(self):
constraint1 = ('X in_state S, U in_group G, P require_state S,'
@@ -182,12 +213,14 @@
constraint2 = 'X in_state S, S name "public"'
rqlst = parse(u'Any A,C,T WHERE A documented_by C?, C title T')
rewrite(rqlst, {('C', 'X'): (constraint1, constraint2)}, {})
- self.assertEqual(rqlst.as_string(),
- u'Any A,C,T WHERE A documented_by C?, A is Affaire '
- 'WITH C,T BEING (Any C,T WHERE C title T, '
- '(EXISTS(C in_state B, D in_group F, G require_state B, G name "read", G require_group F)) '
- 'OR (EXISTS(C in_state E, E name "public")), '
- 'D eid %(A)s, C is Card)')
+ self.assertEqual(
+ rqlst.as_string(),
+ u'Any A,C,T WHERE A documented_by C?, A is Affaire '
+ 'WITH C,T BEING (Any C,T WHERE C title T, '
+ '(EXISTS(C in_state B, D in_group F, G require_state B, '
+ 'G name "read", G require_group F)) '
+ 'OR (EXISTS(C in_state E, E name "public")), '
+ 'D eid %(A)s, C is Card)')
def test_optional_var_4(self):
constraint1 = 'A created_by U, X documented_by A'
@@ -197,29 +230,38 @@
rewrite(rqlst, {('LA', 'X'): (constraint1, constraint2),
('X', 'X'): (constraint3,),
('Y', 'X'): (constraint3,)}, {})
- self.assertEqual(rqlst.as_string(),
- u'Any X,LA,Y WHERE LA? documented_by X, LA concerne Y, B eid %(C)s, '
- 'EXISTS(X created_by B), EXISTS(Y created_by B), '
- 'X is Card, Y is IN(Division, Note, Societe) '
- 'WITH LA BEING (Any LA WHERE (EXISTS(A created_by B, LA documented_by A)) OR (EXISTS(E created_by B, LA concerne E)), '
- 'B eid %(D)s, LA is Affaire)')
-
+ self.assertEqual(
+ rqlst.as_string(),
+ u'Any X,LA,Y WHERE LA? documented_by X, LA concerne Y, B eid %(C)s, '
+ 'EXISTS(X created_by B), EXISTS(Y created_by B), '
+ 'X is Card, Y is IN(Division, Note, Societe) '
+ 'WITH LA BEING (Any LA WHERE (EXISTS(A created_by B, LA documented_by A)) '
+ 'OR (EXISTS(E created_by B, LA concerne E)), '
+ 'B eid %(D)s, LA is Affaire)')
def test_ambiguous_optional_same_exprs(self):
"""See #3013535"""
# see test of the same name in RewriteFullTC: original problem is
# unreproducible here because it actually lies in
# RQLRewriter.insert_local_checks
- rqlst = parse(u'Any A,AR,X,CD WHERE A concerne X?, A ref AR, A eid %(a)s, X creation_date CD')
- rewrite(rqlst, {('X', 'X'): ('X created_by U',),}, {'a': 3})
- self.assertEqual(rqlst.as_string(),
- u'Any A,AR,X,CD WHERE A concerne X?, A ref AR, A eid %(a)s WITH X,CD BEING (Any X,CD WHERE X creation_date CD, EXISTS(X created_by B), B eid %(A)s, X is IN(Division, Note, Societe))')
+ rqlst = parse(u'Any A,AR,X,CD WHERE A concerne X?, A ref AR, '
+ 'A eid %(a)s, X creation_date CD')
+ rewrite(rqlst, {('X', 'X'): ('X created_by U',)}, {'a': 3})
+ self.assertEqual(
+ rqlst.as_string(),
+ u'Any A,AR,X,CD WHERE A concerne X?, A ref AR, A eid %(a)s '
+ 'WITH X,CD BEING (Any X,CD WHERE X creation_date CD, '
+ 'EXISTS(X created_by B), B eid %(A)s, X is IN(Division, Note, Societe))')
def test_ambiguous_optional_same_exprs_constant(self):
- rqlst = parse(u'Any A,AR,X WHERE A concerne X?, A ref AR, A eid %(a)s, X creation_date TODAY')
- rewrite(rqlst, {('X', 'X'): ('X created_by U',),}, {'a': 3})
- self.assertEqual(rqlst.as_string(),
- u'Any A,AR,X WHERE A concerne X?, A ref AR, A eid %(a)s WITH X BEING (Any X WHERE X creation_date TODAY, EXISTS(X created_by B), B eid %(A)s, X is IN(Division, Note, Societe))')
+ rqlst = parse(u'Any A,AR,X WHERE A concerne X?, A ref AR, '
+ 'A eid %(a)s, X creation_date TODAY')
+ rewrite(rqlst, {('X', 'X'): ('X created_by U',)}, {'a': 3})
+ self.assertEqual(
+ rqlst.as_string(),
+ u'Any A,AR,X WHERE A concerne X?, A ref AR, A eid %(a)s '
+ 'WITH X BEING (Any X WHERE X creation_date TODAY, '
+ 'EXISTS(X created_by B), B eid %(A)s, X is IN(Division, Note, Societe))')
def test_optional_var_inlined(self):
c1 = ('X require_permission P')
@@ -229,12 +271,13 @@
('A', 'X'): (c2,),
}, {})
# XXX suboptimal
- self.assertEqual(rqlst.as_string(),
- "Any C,A,R WITH A,C,R BEING "
- "(Any A,C,R WHERE A? inlined_card C, A ref R, "
- "(A is NULL) OR (EXISTS(A inlined_card B, B require_permission D, "
- "B is Card, D is CWPermission)), "
- "A is Affaire, C is Card, EXISTS(C require_permission E, E is CWPermission))")
+ self.assertEqual(
+ rqlst.as_string(),
+ "Any C,A,R WITH A,C,R BEING "
+ "(Any A,C,R WHERE A? inlined_card C, A ref R, "
+ "(A is NULL) OR (EXISTS(A inlined_card B, B require_permission D, "
+ "B is Card, D is CWPermission)), "
+ "A is Affaire, C is Card, EXISTS(C require_permission E, E is CWPermission))")
# def test_optional_var_inlined_has_perm(self):
# c1 = ('X require_permission P')
@@ -249,7 +292,8 @@
def test_optional_var_inlined_imbricated_error(self):
c1 = ('X require_permission P')
c2 = ('X inlined_card O, O require_permission P')
- rqlst = parse(u'Any C,A,R,A2,R2 WHERE A? inlined_card C, A ref R,A2? inlined_card C, A2 ref R2')
+ rqlst = parse(u'Any C,A,R,A2,R2 WHERE A? inlined_card C, A ref R,'
+ 'A2? inlined_card C, A2 ref R2')
self.assertRaises(BadSchemaDefinition,
rewrite, rqlst, {('C', 'X'): (c1,),
('A', 'X'): (c2,),
@@ -258,7 +302,6 @@
def test_optional_var_inlined_linked(self):
c1 = ('X require_permission P')
- c2 = ('X inlined_card O, O require_permission P')
rqlst = parse(u'Any A,W WHERE A inlined_card C?, C inlined_note N, '
'N inlined_affaire W')
rewrite(rqlst, {('C', 'X'): (c1,)}, {})
@@ -295,6 +338,7 @@
self.assertEqual(rqlst.as_string(),
'Any C WHERE C in_state STATE?, C is Card, '
'EXISTS(STATE name "hop"), STATE is State')
+
def test_relation_optimization_2_rhs(self):
snippet = ('TW? subworkflow_exit X, TW name "hop"')
rqlst = parse(u'SubWorkflowExitPoint EXIT WHERE C? subworkflow_exit EXIT')
@@ -362,10 +406,11 @@
def test_unsupported_constraint_2(self):
trinfo_constraint = ('X wf_info_for Y, Y require_permission P, P name "read"')
rqlst = parse(u'Any U,T WHERE U is CWUser, T wf_info_for U')
- rewrite(rqlst, {('T', 'X'): (trinfo_constraint, 'X wf_info_for Y, Y in_group G, G name "managers"')}, {})
+ rewrite(rqlst, {('T', 'X'): (trinfo_constraint,
+ 'X wf_info_for Y, Y in_group G, G name "managers"')}, {})
self.assertEqual(rqlst.as_string(),
u'Any U,T WHERE U is CWUser, T wf_info_for U, '
- 'EXISTS(U in_group B, B name "managers", B is CWGroup), T is TrInfo')
+ u'EXISTS(U in_group B, B name "managers", B is CWGroup), T is TrInfo')
def test_unsupported_constraint_3(self):
self.skipTest('raise unauthorized for now')
@@ -379,17 +424,20 @@
constraint = ('X concerne Y')
rqlst = parse(u'Affaire X')
rewrite(rqlst, {('X', 'X'): (constraint,)}, {})
- self.assertEqual(rqlst.as_string(),
- u"Any X WHERE X is Affaire, ((EXISTS(X concerne A, A is Division)) OR (EXISTS(X concerne C, C is Societe))) OR (EXISTS(X concerne B, B is Note))")
+ self.assertEqual(
+ rqlst.as_string(),
+ u"Any X WHERE X is Affaire, ((EXISTS(X concerne A, A is Division)) "
+ "OR (EXISTS(X concerne C, C is Societe))) OR (EXISTS(X concerne B, B is Note))")
def test_add_ambiguity_outerjoin(self):
constraint = ('X concerne Y')
rqlst = parse(u'Any X,C WHERE X? documented_by C')
rewrite(rqlst, {('X', 'X'): (constraint,)}, {})
# ambiguity are kept in the sub-query, no need to be resolved using OR
- self.assertEqual(rqlst.as_string(),
- u"Any X,C WHERE X? documented_by C, C is Card WITH X BEING (Any X WHERE EXISTS(X concerne A), X is Affaire)")
-
+ self.assertEqual(
+ rqlst.as_string(),
+ u"Any X,C WHERE X? documented_by C, C is Card "
+ "WITH X BEING (Any X WHERE EXISTS(X concerne A), X is Affaire)")
def test_rrqlexpr_nonexistant_subject_1(self):
constraint = RRQLExpression('S owned_by U')
@@ -418,26 +466,33 @@
'Any C WHERE C is Card, B eid %(D)s, EXISTS(A owned_by B, A is Card)')
rqlst = parse(u'Card C')
rewrite(rqlst, {('C', 'S'): (constraint,)}, {}, 'SOU')
- self.assertEqual(rqlst.as_string(),
- 'Any C WHERE C is Card, A eid %(B)s, EXISTS(C owned_by A, D owned_by A, D is Card)')
+ self.assertEqual(
+ rqlst.as_string(),
+ 'Any C WHERE C is Card, A eid %(B)s, EXISTS(C owned_by A, D owned_by A, D is Card)')
def test_rrqlexpr_nonexistant_subject_3(self):
constraint = RRQLExpression('U in_group G, G name "users"')
rqlst = parse(u'Card C')
rewrite(rqlst, {('C', 'S'): (constraint,)}, {}, 'SU')
- self.assertEqual(rqlst.as_string(),
- u'Any C WHERE C is Card, A eid %(B)s, EXISTS(A in_group D, D name "users", D is CWGroup)')
+ self.assertEqual(
+ rqlst.as_string(),
+ u'Any C WHERE C is Card, A eid %(B)s, '
+ 'EXISTS(A in_group D, D name "users", D is CWGroup)')
def test_rrqlexpr_nonexistant_subject_4(self):
constraint = RRQLExpression('U in_group G, G name "users", S owned_by U')
rqlst = parse(u'Card C')
rewrite(rqlst, {('C', 'S'): (constraint,)}, {}, 'SU')
- self.assertEqual(rqlst.as_string(),
- u'Any C WHERE C is Card, A eid %(B)s, EXISTS(A in_group D, D name "users", C owned_by A, D is CWGroup)')
+ self.assertEqual(
+ rqlst.as_string(),
+ u'Any C WHERE C is Card, A eid %(B)s, '
+ 'EXISTS(A in_group D, D name "users", C owned_by A, D is CWGroup)')
rqlst = parse(u'Card C')
rewrite(rqlst, {('C', 'S'): (constraint,)}, {}, 'OU')
- self.assertEqual(rqlst.as_string(),
- u'Any C WHERE C is Card, A eid %(B)s, EXISTS(A in_group D, D name "users", D is CWGroup)')
+ self.assertEqual(
+ rqlst.as_string(),
+ u'Any C WHERE C is Card, A eid %(B)s, '
+ 'EXISTS(A in_group D, D name "users", D is CWGroup)')
def test_rrqlexpr_nonexistant_subject_5(self):
constraint = RRQLExpression('S owned_by Z, O owned_by Z, O is Card')
@@ -450,22 +505,28 @@
constraint = ERQLExpression('X owned_by Z, Z login "hop"', 'X')
rqlst = parse(u'Affaire A WHERE NOT EXISTS(A documented_by C)')
rewrite(rqlst, {('C', 'X'): (constraint,)}, {}, 'X')
- self.assertEqual(rqlst.as_string(),
- u'Any A WHERE NOT EXISTS(A documented_by C, EXISTS(C owned_by B, B login "hop", B is CWUser), C is Card), A is Affaire')
+ self.assertEqual(
+ rqlst.as_string(),
+ u'Any A WHERE NOT EXISTS(A documented_by C, '
+ 'EXISTS(C owned_by B, B login "hop", B is CWUser), C is Card), A is Affaire')
def test_rqlexpr_not_relation_1_2(self):
constraint = ERQLExpression('X owned_by Z, Z login "hop"', 'X')
rqlst = parse(u'Affaire A WHERE NOT EXISTS(A documented_by C)')
rewrite(rqlst, {('A', 'X'): (constraint,)}, {}, 'X')
- self.assertEqual(rqlst.as_string(),
- u'Any A WHERE NOT EXISTS(A documented_by C, C is Card), A is Affaire, EXISTS(A owned_by B, B login "hop", B is CWUser)')
+ self.assertEqual(
+ rqlst.as_string(),
+ u'Any A WHERE NOT EXISTS(A documented_by C, C is Card), A is Affaire, '
+ 'EXISTS(A owned_by B, B login "hop", B is CWUser)')
def test_rqlexpr_not_relation_2(self):
constraint = ERQLExpression('X owned_by Z, Z login "hop"', 'X')
rqlst = rqlhelper.parse(u'Affaire A WHERE NOT A documented_by C', annotate=False)
rewrite(rqlst, {('C', 'X'): (constraint,)}, {}, 'X')
- self.assertEqual(rqlst.as_string(),
- u'Any A WHERE NOT EXISTS(A documented_by C, EXISTS(C owned_by B, B login "hop", B is CWUser), C is Card), A is Affaire')
+ self.assertEqual(
+ rqlst.as_string(),
+ u'Any A WHERE NOT EXISTS(A documented_by C, '
+ 'EXISTS(C owned_by B, B login "hop", B is CWUser), C is Card), A is Affaire')
def test_rqlexpr_multiexpr_outerjoin(self):
c1 = ERQLExpression('X owned_by Z, Z login "hop"', 'X')
@@ -473,11 +534,12 @@
c3 = ERQLExpression('X owned_by Z, Z login "momo"', 'X')
rqlst = rqlhelper.parse(u'Any A WHERE A documented_by C?', annotate=False)
rewrite(rqlst, {('C', 'X'): (c1, c2, c3)}, {}, 'X')
- self.assertEqual(rqlst.as_string(),
- u'Any A WHERE A documented_by C?, A is Affaire '
- 'WITH C BEING (Any C WHERE ((EXISTS(C owned_by B, B login "hop")) '
- 'OR (EXISTS(C owned_by D, D login "momo"))) '
- 'OR (EXISTS(C owned_by A, A login "hip")), C is Card)')
+ self.assertEqual(
+ rqlst.as_string(),
+ u'Any A WHERE A documented_by C?, A is Affaire '
+ 'WITH C BEING (Any C WHERE ((EXISTS(C owned_by B, B login "hop")) '
+ 'OR (EXISTS(C owned_by D, D login "momo"))) '
+ 'OR (EXISTS(C owned_by A, A login "hip")), C is Card)')
def test_multiple_erql_one_bad(self):
#: reproduce bug #2236985
@@ -503,7 +565,36 @@
'Any O WHERE S use_email O, S is CWUser, O is EmailAddress, '
'EXISTS(NOT S in_group A, A name "guests", A is CWGroup)')
-from cubicweb.devtools.testlib import CubicWebTC
+ def test_ambiguous_constraint_not_exists(self):
+ state_constraint = (
+ 'NOT EXISTS(A require_permission S) '
+ 'OR EXISTS(B require_permission S, B is Card, O name "state1")'
+ 'OR EXISTS(C require_permission S, C is Note, O name "state2")'
+ )
+ rqlst = parse(u'Any P WHERE NOT P require_state S')
+ rewrite(rqlst, {('P', 'S'): (state_constraint,), ('S', 'O'): (state_constraint,)}, {})
+ self.assertMultiLineEqual(
+ rqlst.as_string(),
+ u'Any P WHERE NOT P require_state S, '
+ '((NOT EXISTS(A require_permission P, A is IN(Card, Note)))'
+ ' OR (EXISTS(B require_permission P, B is Card, S name "state1")))'
+ ' OR (EXISTS(C require_permission P, C is Note, S name "state2")), '
+ 'P is CWPermission, S is State')
+
+ def test_ambiguous_using_is_in_function(self):
+ state_constraint = (
+ 'NOT EXISTS(A require_permission S) '
+ 'OR EXISTS(B require_permission S, B is IN (Card, Note), O name "state1")'
+ )
+ rqlst = parse(u'Any P WHERE NOT P require_state S')
+ rewrite(rqlst, {('P', 'S'): (state_constraint,), ('S', 'O'): (state_constraint,)}, {})
+ self.assertMultiLineEqual(
+ rqlst.as_string(),
+ u'Any P WHERE NOT P require_state S, '
+ '(NOT EXISTS(A require_permission P, A is IN(Card, Note))) '
+ 'OR (EXISTS(B require_permission P, B is IN(Card, Note), S name "state1")), '
+ 'P is CWPermission, S is State')
+
class RewriteFullTC(CubicWebTC):
appid = 'data-rewrite'
@@ -512,7 +603,7 @@
if args is None:
args = {}
querier = self.repo.querier
- union = parse(rql) # self.vreg.parse(rql, annotate=True)
+ union = parse(rql) # self.vreg.parse(rql, annotate=True)
with self.admin_access.repo_cnx() as cnx:
self.vreg.solutions(cnx, union, args)
querier._annotate(union)
@@ -546,7 +637,6 @@
union = self.process('Any A,AR,X,CD WHERE A concerne X?, A ref AR, X creation_date CD')
self.assertEqual(union.as_string(), 'not generated today')
-
def test_xxxx(self):
edef1 = self.schema['Societe']
edef2 = self.schema['Division']
@@ -564,12 +654,11 @@
def test_question_mark_attribute_snippet(self):
# see #3661918
- from cubicweb.rqlrewrite import RQLRewriter
- from logilab.common.decorators import monkeypatch
repotest.undo_monkey_patch()
orig_insert_snippets = RQLRewriter.insert_snippets
# patch insert_snippets and not rewrite, insert_snippets is already
# monkey patches (see above setupModule/repotest)
+
@monkeypatch(RQLRewriter)
def insert_snippets(self, snippets, varexistsmap=None):
# crash occurs if snippets are processed in a specific order, force
@@ -632,21 +721,21 @@
'C role D, D name "illustrator")',
rqlst.as_string())
-
def test_rewrite2(self):
rules = {'illustrator_of': 'C is Contribution, C contributor S, '
- 'C manifestation O, C role R, R name "illustrator"'}
+ 'C manifestation O, C role R, R name "illustrator"'}
rqlst = rqlhelper.parse(u'Any A,B WHERE A illustrator_of B, C require_permission R, S'
'require_state O')
rule_rewrite(rqlst, rules)
- self.assertEqual('Any A,B WHERE C require_permission R, S require_state O, '
- 'D is Contribution, D contributor A, D manifestation B, D role E, '
- 'E name "illustrator"',
- rqlst.as_string())
+ self.assertEqual(
+ 'Any A,B WHERE C require_permission R, S require_state O, '
+ 'D is Contribution, D contributor A, D manifestation B, D role E, '
+ 'E name "illustrator"',
+ rqlst.as_string())
def test_rewrite3(self):
rules = {'illustrator_of': 'C is Contribution, C contributor S, '
- 'C manifestation O, C role R, R name "illustrator"'}
+ 'C manifestation O, C role R, R name "illustrator"'}
rqlst = rqlhelper.parse(u'Any A,B WHERE E require_permission T, A illustrator_of B')
rule_rewrite(rqlst, rules)
self.assertEqual('Any A,B WHERE E require_permission T, '
@@ -656,7 +745,7 @@
def test_rewrite4(self):
rules = {'illustrator_of': 'C is Contribution, C contributor S, '
- 'C manifestation O, C role R, R name "illustrator"'}
+ 'C manifestation O, C role R, R name "illustrator"'}
rqlst = rqlhelper.parse(u'Any A,B WHERE C require_permission R, A illustrator_of B')
rule_rewrite(rqlst, rules)
self.assertEqual('Any A,B WHERE C require_permission R, '
@@ -666,7 +755,7 @@
def test_rewrite5(self):
rules = {'illustrator_of': 'C is Contribution, C contributor S, '
- 'C manifestation O, C role R, R name "illustrator"'}
+ 'C manifestation O, C role R, R name "illustrator"'}
rqlst = rqlhelper.parse(u'Any A,B WHERE C require_permission R, A illustrator_of B, '
'S require_state O')
rule_rewrite(rqlst, rules)
@@ -678,7 +767,7 @@
# Tests for the with clause
def test_rewrite_with(self):
rules = {'illustrator_of': 'C is Contribution, C contributor S, '
- 'C manifestation O, C role R, R name "illustrator"'}
+ 'C manifestation O, C role R, R name "illustrator"'}
rqlst = rqlhelper.parse(u'Any A,B WITH A, B BEING(Any X, Y WHERE X illustrator_of Y)')
rule_rewrite(rqlst, rules)
self.assertEqual('Any A,B WITH A,B BEING '
@@ -688,8 +777,9 @@
def test_rewrite_with2(self):
rules = {'illustrator_of': 'C is Contribution, C contributor S, '
- 'C manifestation O, C role R, R name "illustrator"'}
- rqlst = rqlhelper.parse(u'Any A,B WHERE T require_permission C WITH A, B BEING(Any X, Y WHERE X illustrator_of Y)')
+ 'C manifestation O, C role R, R name "illustrator"'}
+ rqlst = rqlhelper.parse(u'Any A,B WHERE T require_permission C '
+ 'WITH A, B BEING(Any X, Y WHERE X illustrator_of Y)')
rule_rewrite(rqlst, rules)
self.assertEqual('Any A,B WHERE T require_permission C '
'WITH A,B BEING (Any X,Y WHERE A is Contribution, '
@@ -707,32 +797,34 @@
def test_rewrite_with4(self):
rules = {'illustrator_of': 'C is Contribution, C contributor S, '
- 'C manifestation O, C role R, R name "illustrator"'}
+ 'C manifestation O, C role R, R name "illustrator"'}
rqlst = rqlhelper.parse(u'Any A,B WHERE A illustrator_of B '
- 'WITH A, B BEING(Any X, Y WHERE X illustrator_of Y)')
+ 'WITH A, B BEING(Any X, Y WHERE X illustrator_of Y)')
rule_rewrite(rqlst, rules)
- self.assertEqual('Any A,B WHERE C is Contribution, '
- 'C contributor A, C manifestation B, C role D, '
- 'D name "illustrator" WITH A,B BEING '
- '(Any X,Y WHERE A is Contribution, A contributor X, '
- 'A manifestation Y, A role B, B name "illustrator")',
- rqlst.as_string())
+ self.assertEqual(
+ 'Any A,B WHERE C is Contribution, '
+ 'C contributor A, C manifestation B, C role D, '
+ 'D name "illustrator" WITH A,B BEING '
+ '(Any X,Y WHERE A is Contribution, A contributor X, '
+ 'A manifestation Y, A role B, B name "illustrator")',
+ rqlst.as_string())
# Tests for the union
def test_rewrite_union(self):
rules = {'illustrator_of': 'C is Contribution, C contributor S, '
- 'C manifestation O, C role R, R name "illustrator"'}
+ 'C manifestation O, C role R, R name "illustrator"'}
rqlst = rqlhelper.parse(u'(Any A,B WHERE A illustrator_of B) UNION'
'(Any X,Y WHERE X is CWUser, Z manifestation Y)')
rule_rewrite(rqlst, rules)
- self.assertEqual('(Any A,B WHERE C is Contribution, '
- 'C contributor A, C manifestation B, C role D, '
- 'D name "illustrator") UNION (Any X,Y WHERE X is CWUser, Z manifestation Y)',
- rqlst.as_string())
+ self.assertEqual(
+ '(Any A,B WHERE C is Contribution, '
+ 'C contributor A, C manifestation B, C role D, '
+ 'D name "illustrator") UNION (Any X,Y WHERE X is CWUser, Z manifestation Y)',
+ rqlst.as_string())
def test_rewrite_union2(self):
rules = {'illustrator_of': 'C is Contribution, C contributor S, '
- 'C manifestation O, C role R, R name "illustrator"'}
+ 'C manifestation O, C role R, R name "illustrator"'}
rqlst = rqlhelper.parse(u'(Any Y WHERE Y match W) UNION '
'(Any A WHERE A illustrator_of B) UNION '
'(Any Y WHERE Y is ArtWork)')
@@ -746,9 +838,9 @@
# Tests for the exists clause
def test_rewrite_exists(self):
rules = {'illustrator_of': 'C is Contribution, C contributor S, '
- 'C manifestation O, C role R, R name "illustrator"'}
+ 'C manifestation O, C role R, R name "illustrator"'}
rqlst = rqlhelper.parse(u'(Any A,B WHERE A illustrator_of B, '
- 'EXISTS(B is ArtWork))')
+ 'EXISTS(B is ArtWork))')
rule_rewrite(rqlst, rules)
self.assertEqual('Any A,B WHERE EXISTS(B is ArtWork), '
'C is Contribution, C contributor A, C manifestation B, C role D, '
@@ -757,7 +849,7 @@
def test_rewrite_exists2(self):
rules = {'illustrator_of': 'C is Contribution, C contributor S, '
- 'C manifestation O, C role R, R name "illustrator"'}
+ 'C manifestation O, C role R, R name "illustrator"'}
rqlst = rqlhelper.parse(u'(Any A,B WHERE B contributor A, EXISTS(A illustrator_of W))')
rule_rewrite(rqlst, rules)
self.assertEqual('Any A,B WHERE B contributor A, '
@@ -767,7 +859,7 @@
def test_rewrite_exists3(self):
rules = {'illustrator_of': 'C is Contribution, C contributor S, '
- 'C manifestation O, C role R, R name "illustrator"'}
+ 'C manifestation O, C role R, R name "illustrator"'}
rqlst = rqlhelper.parse(u'(Any A,B WHERE A illustrator_of B, EXISTS(A illustrator_of W))')
rule_rewrite(rqlst, rules)
self.assertEqual('Any A,B WHERE EXISTS(C is Contribution, C contributor A, '
@@ -779,13 +871,14 @@
# Test for GROUPBY
def test_rewrite_groupby(self):
rules = {'participated_in': 'S contributor O'}
- rqlst = rqlhelper.parse(u'Any SUM(SA) GROUPBY S WHERE P participated_in S, P manifestation SA')
+ rqlst = rqlhelper.parse(u'Any SUM(SA) GROUPBY S '
+ 'WHERE P participated_in S, P manifestation SA')
rule_rewrite(rqlst, rules)
self.assertEqual('Any SUM(SA) GROUPBY S WHERE P manifestation SA, P contributor S',
rqlst.as_string())
-class RQLRelationRewriterTC(CubicWebTC):
+class RQLRelationRewriterCWTC(CubicWebTC):
appid = 'data-rewrite'
@@ -794,8 +887,8 @@
art = cnx.create_entity('ArtWork', name=u'Les travailleurs de la Mer')
role = cnx.create_entity('Role', name=u'illustrator')
vic = cnx.create_entity('Person', name=u'Victor Hugo')
- contrib = cnx.create_entity('Contribution', code=96, contributor=vic,
- manifestation=art, role=role)
+ cnx.create_entity('Contribution', code=96, contributor=vic,
+ manifestation=art, role=role)
rset = cnx.execute('Any X WHERE X illustrator_of S')
self.assertEqual([u'Victor Hugo'],
[result.name for result in rset.entities()])
@@ -816,4 +909,5 @@
if __name__ == '__main__':
- unittest_main()
+ import unittest
+ unittest.main()