--- a/cubicweb/test/unittest_rqlrewrite.py Thu Apr 20 18:05:06 2017 +0200
+++ b/cubicweb/test/unittest_rqlrewrite.py Tue Apr 25 17:35:29 2017 +0200
@@ -20,13 +20,16 @@
from logilab.common.testlib import unittest_main, TestCase
from logilab.common.testlib import mock_object
+from logilab.common.decorators import monkeypatch
from yams import BadSchemaDefinition
from yams.buildobjs import RelationDefinition
from rql import parse, nodes, RQLHelper
from cubicweb import Unauthorized, rqlrewrite, devtools
+from cubicweb.rqlrewrite import RQLRewriter
from cubicweb.schema import RRQLExpression, ERQLExpression
from cubicweb.devtools import repotest
+from cubicweb.devtools.testlib import CubicWebTC
def setUpModule(*args):
@@ -40,22 +43,27 @@
'has_text': 'fti'})
repotest.do_monkey_patch()
+
def tearDownModule(*args):
repotest.undo_monkey_patch()
global rqlhelper, schema
del rqlhelper, schema
+
def eid_func_map(eid):
return {1: 'CWUser',
2: 'Card',
3: 'Affaire'}[eid]
+
def _prepare_rewriter(rewriter_cls, kwargs):
class FakeVReg:
schema = schema
+
@staticmethod
def solutions(sqlcursor, rqlst, kwargs):
rqlhelper.compute_solutions(rqlst, {'eid': eid_func_map}, kwargs=kwargs)
+
class rqlhelper:
@staticmethod
def annotate(rqlst):
@@ -65,6 +73,7 @@
rqlhelper.simplify(rqlst, needcopy)
return rewriter_cls(mock_object(vreg=FakeVReg, user=(mock_object(eid=1))))
+
def rewrite(rqlst, snippets_map, kwargs, existingvars=None):
rewriter = _prepare_rewriter(rqlrewrite.RQLRewriter, kwargs)
# turn {(V1, V2): constraints} into [(varmap, constraints)]
@@ -94,6 +103,7 @@
check_vrefs(rqlst.children[0])
return rewriter.rewritten
+
def check_vrefs(node):
vrefmaps = {}
selects = []
@@ -102,14 +112,15 @@
try:
vrefmaps[stmt].setdefault(vref.name, set()).add(vref)
except KeyError:
- vrefmaps[stmt] = {vref.name: set( (vref,) )}
+ vrefmaps[stmt] = {vref.name: set((vref,))}
selects.append(stmt)
assert node in selects, (node, selects)
for stmt in selects:
for var in stmt.defined_vars.values():
assert var.stinfo['references']
vrefmap = vrefmaps[stmt]
- assert not (var.stinfo['references'] ^ vrefmap[var.name]), (node.as_string(), var, var.stinfo['references'], vrefmap[var.name])
+ assert not (var.stinfo['references'] ^ vrefmap[var.name]), (
+ node.as_string(), var, var.stinfo['references'], vrefmap[var.name])
class RQLRewriteTC(TestCase):
@@ -123,72 +134,81 @@
def test_base_var(self):
constraint = ('X in_state S, U in_group G, P require_state S,'
- 'P name "read", P require_group G')
+ 'P name "read", P require_group G')
rqlst = parse(u'Card C')
rewrite(rqlst, {('C', 'X'): (constraint,)}, {})
- self.assertEqual(rqlst.as_string(),
- u'Any C WHERE C is Card, B eid %(D)s, '
- 'EXISTS(C in_state A, B in_group E, F require_state A, '
- 'F name "read", F require_group E, A is State, E is CWGroup, F is CWPermission)')
+ self.assertEqual(
+ rqlst.as_string(),
+ u'Any C WHERE C is Card, B eid %(D)s, '
+ 'EXISTS(C in_state A, B in_group E, F require_state A, '
+ 'F name "read", F require_group E, A is State, E is CWGroup, F is CWPermission)')
def test_multiple_var(self):
card_constraint = ('X in_state S, U in_group G, P require_state S,'
'P name "read", P require_group G')
affaire_constraints = ('X ref LIKE "PUBLIC%"', 'U in_group G, G name "public"')
- kwargs = {'u':2}
+ kwargs = {'u': 2}
rqlst = parse(u'Any S WHERE S documented_by C, C eid %(u)s')
rewrite(rqlst, {('C', 'X'): (card_constraint,), ('S', 'X'): affaire_constraints},
kwargs)
self.assertMultiLineEqual(
rqlst.as_string(),
u'Any S WHERE S documented_by C, C eid %(u)s, B eid %(D)s, '
- 'EXISTS(C in_state A, B in_group E, F require_state A, '
- 'F name "read", F require_group E, A is State, E is CWGroup, F is CWPermission), '
- '(EXISTS(S ref LIKE "PUBLIC%")) OR (EXISTS(B in_group G, G name "public", G is CWGroup)), '
- 'S is Affaire')
+ 'EXISTS(C in_state A, B in_group E, F require_state A, '
+ 'F name "read", F require_group E, A is State, E is CWGroup, F is CWPermission), '
+ '(EXISTS(S ref LIKE "PUBLIC%")) '
+ 'OR (EXISTS(B in_group G, G name "public", G is CWGroup)), '
+ 'S is Affaire')
self.assertIn('D', kwargs)
def test_or(self):
- constraint = '(X identity U) OR (X in_state ST, CL identity U, CL in_state ST, ST name "subscribed")'
+ constraint = (
+ '(X identity U) OR '
+ '(X in_state ST, CL identity U, CL in_state ST, ST name "subscribed")'
+ )
rqlst = parse(u'Any S WHERE S owned_by C, C eid %(u)s, S is in (CWUser, CWGroup)')
- rewrite(rqlst, {('C', 'X'): (constraint,)}, {'u':1})
- self.assertEqual(rqlst.as_string(),
- 'Any S WHERE S owned_by C, C eid %(u)s, S is IN(CWUser, CWGroup), A eid %(B)s, '
- 'EXISTS((C identity A) OR (C in_state D, E identity A, '
- 'E in_state D, D name "subscribed"), D is State, E is CWUser)')
+ rewrite(rqlst, {('C', 'X'): (constraint,)}, {'u': 1})
+ self.assertEqual(
+ rqlst.as_string(),
+ 'Any S WHERE S owned_by C, C eid %(u)s, S is IN(CWUser, CWGroup), A eid %(B)s, '
+ 'EXISTS((C identity A) OR (C in_state D, E identity A, '
+ 'E in_state D, D name "subscribed"), D is State, E is CWUser)')
def test_simplified_rqlst(self):
constraint = ('X in_state S, U in_group G, P require_state S,'
- 'P name "read", P require_group G')
- rqlst = parse(u'Any 2') # this is the simplified rql st for Any X WHERE X eid 12
+ 'P name "read", P require_group G')
+ rqlst = parse(u'Any 2') # this is the simplified rql st for Any X WHERE X eid 12
rewrite(rqlst, {('2', 'X'): (constraint,)}, {})
- self.assertEqual(rqlst.as_string(),
- u'Any 2 WHERE B eid %(C)s, '
- 'EXISTS(2 in_state A, B in_group D, E require_state A, '
- 'E name "read", E require_group D, A is State, D is CWGroup, E is CWPermission)')
+ self.assertEqual(
+ rqlst.as_string(),
+ u'Any 2 WHERE B eid %(C)s, '
+ 'EXISTS(2 in_state A, B in_group D, E require_state A, '
+ 'E name "read", E require_group D, A is State, D is CWGroup, E is CWPermission)')
def test_optional_var_1(self):
constraint = ('X in_state S, U in_group G, P require_state S,'
- 'P name "read", P require_group G')
+ 'P name "read", P require_group G')
rqlst = parse(u'Any A,C WHERE A documented_by C?')
rewrite(rqlst, {('C', 'X'): (constraint,)}, {})
- self.assertEqual(rqlst.as_string(),
- u'Any A,C WHERE A documented_by C?, A is Affaire '
- 'WITH C BEING '
- '(Any C WHERE EXISTS(C in_state B, D in_group F, G require_state B, G name "read", '
- 'G require_group F), D eid %(A)s, C is Card)')
+ self.assertEqual(
+ rqlst.as_string(),
+ u'Any A,C WHERE A documented_by C?, A is Affaire '
+ 'WITH C BEING '
+ '(Any C WHERE EXISTS(C in_state B, D in_group F, G require_state B, G name "read", '
+ 'G require_group F), D eid %(A)s, C is Card)')
def test_optional_var_2(self):
constraint = ('X in_state S, U in_group G, P require_state S,'
- 'P name "read", P require_group G')
+ 'P name "read", P require_group G')
rqlst = parse(u'Any A,C,T WHERE A documented_by C?, C title T')
rewrite(rqlst, {('C', 'X'): (constraint,)}, {})
- self.assertEqual(rqlst.as_string(),
- u'Any A,C,T WHERE A documented_by C?, A is Affaire '
- 'WITH C,T BEING '
- '(Any C,T WHERE C title T, EXISTS(C in_state B, D in_group F, '
- 'G require_state B, G name "read", G require_group F), '
- 'D eid %(A)s, C is Card)')
+ self.assertEqual(
+ rqlst.as_string(),
+ u'Any A,C,T WHERE A documented_by C?, A is Affaire '
+ 'WITH C,T BEING '
+ '(Any C,T WHERE C title T, EXISTS(C in_state B, D in_group F, '
+ 'G require_state B, G name "read", G require_group F), '
+ 'D eid %(A)s, C is Card)')
def test_optional_var_3(self):
constraint1 = ('X in_state S, U in_group G, P require_state S,'
@@ -196,12 +216,14 @@
constraint2 = 'X in_state S, S name "public"'
rqlst = parse(u'Any A,C,T WHERE A documented_by C?, C title T')
rewrite(rqlst, {('C', 'X'): (constraint1, constraint2)}, {})
- self.assertEqual(rqlst.as_string(),
- u'Any A,C,T WHERE A documented_by C?, A is Affaire '
- 'WITH C,T BEING (Any C,T WHERE C title T, '
- '(EXISTS(C in_state B, D in_group F, G require_state B, G name "read", G require_group F)) '
- 'OR (EXISTS(C in_state E, E name "public")), '
- 'D eid %(A)s, C is Card)')
+ self.assertEqual(
+ rqlst.as_string(),
+ u'Any A,C,T WHERE A documented_by C?, A is Affaire '
+ 'WITH C,T BEING (Any C,T WHERE C title T, '
+ '(EXISTS(C in_state B, D in_group F, G require_state B, '
+ 'G name "read", G require_group F)) '
+ 'OR (EXISTS(C in_state E, E name "public")), '
+ 'D eid %(A)s, C is Card)')
def test_optional_var_4(self):
constraint1 = 'A created_by U, X documented_by A'
@@ -211,29 +233,38 @@
rewrite(rqlst, {('LA', 'X'): (constraint1, constraint2),
('X', 'X'): (constraint3,),
('Y', 'X'): (constraint3,)}, {})
- self.assertEqual(rqlst.as_string(),
- u'Any X,LA,Y WHERE LA? documented_by X, LA concerne Y, B eid %(C)s, '
- 'EXISTS(X created_by B), EXISTS(Y created_by B), '
- 'X is Card, Y is IN(Division, Note, Societe) '
- 'WITH LA BEING (Any LA WHERE (EXISTS(A created_by B, LA documented_by A)) OR (EXISTS(E created_by B, LA concerne E)), '
- 'B eid %(D)s, LA is Affaire)')
-
+ self.assertEqual(
+ rqlst.as_string(),
+ u'Any X,LA,Y WHERE LA? documented_by X, LA concerne Y, B eid %(C)s, '
+ 'EXISTS(X created_by B), EXISTS(Y created_by B), '
+ 'X is Card, Y is IN(Division, Note, Societe) '
+ 'WITH LA BEING (Any LA WHERE (EXISTS(A created_by B, LA documented_by A)) '
+ 'OR (EXISTS(E created_by B, LA concerne E)), '
+ 'B eid %(D)s, LA is Affaire)')
def test_ambiguous_optional_same_exprs(self):
"""See #3013535"""
# see test of the same name in RewriteFullTC: original problem is
# unreproducible here because it actually lies in
# RQLRewriter.insert_local_checks
- rqlst = parse(u'Any A,AR,X,CD WHERE A concerne X?, A ref AR, A eid %(a)s, X creation_date CD')
- rewrite(rqlst, {('X', 'X'): ('X created_by U',),}, {'a': 3})
- self.assertEqual(rqlst.as_string(),
- u'Any A,AR,X,CD WHERE A concerne X?, A ref AR, A eid %(a)s WITH X,CD BEING (Any X,CD WHERE X creation_date CD, EXISTS(X created_by B), B eid %(A)s, X is IN(Division, Note, Societe))')
+ rqlst = parse(u'Any A,AR,X,CD WHERE A concerne X?, A ref AR, '
+ 'A eid %(a)s, X creation_date CD')
+ rewrite(rqlst, {('X', 'X'): ('X created_by U',)}, {'a': 3})
+ self.assertEqual(
+ rqlst.as_string(),
+ u'Any A,AR,X,CD WHERE A concerne X?, A ref AR, A eid %(a)s '
+ 'WITH X,CD BEING (Any X,CD WHERE X creation_date CD, '
+ 'EXISTS(X created_by B), B eid %(A)s, X is IN(Division, Note, Societe))')
def test_ambiguous_optional_same_exprs_constant(self):
- rqlst = parse(u'Any A,AR,X WHERE A concerne X?, A ref AR, A eid %(a)s, X creation_date TODAY')
- rewrite(rqlst, {('X', 'X'): ('X created_by U',),}, {'a': 3})
- self.assertEqual(rqlst.as_string(),
- u'Any A,AR,X WHERE A concerne X?, A ref AR, A eid %(a)s WITH X BEING (Any X WHERE X creation_date TODAY, EXISTS(X created_by B), B eid %(A)s, X is IN(Division, Note, Societe))')
+ rqlst = parse(u'Any A,AR,X WHERE A concerne X?, A ref AR, '
+ 'A eid %(a)s, X creation_date TODAY')
+ rewrite(rqlst, {('X', 'X'): ('X created_by U',)}, {'a': 3})
+ self.assertEqual(
+ rqlst.as_string(),
+ u'Any A,AR,X WHERE A concerne X?, A ref AR, A eid %(a)s '
+ 'WITH X BEING (Any X WHERE X creation_date TODAY, '
+ 'EXISTS(X created_by B), B eid %(A)s, X is IN(Division, Note, Societe))')
def test_optional_var_inlined(self):
c1 = ('X require_permission P')
@@ -243,12 +274,13 @@
('A', 'X'): (c2,),
}, {})
# XXX suboptimal
- self.assertEqual(rqlst.as_string(),
- "Any C,A,R WITH A,C,R BEING "
- "(Any A,C,R WHERE A? inlined_card C, A ref R, "
- "(A is NULL) OR (EXISTS(A inlined_card B, B require_permission D, "
- "B is Card, D is CWPermission)), "
- "A is Affaire, C is Card, EXISTS(C require_permission E, E is CWPermission))")
+ self.assertEqual(
+ rqlst.as_string(),
+ "Any C,A,R WITH A,C,R BEING "
+ "(Any A,C,R WHERE A? inlined_card C, A ref R, "
+ "(A is NULL) OR (EXISTS(A inlined_card B, B require_permission D, "
+ "B is Card, D is CWPermission)), "
+ "A is Affaire, C is Card, EXISTS(C require_permission E, E is CWPermission))")
# def test_optional_var_inlined_has_perm(self):
# c1 = ('X require_permission P')
@@ -263,7 +295,8 @@
def test_optional_var_inlined_imbricated_error(self):
c1 = ('X require_permission P')
c2 = ('X inlined_card O, O require_permission P')
- rqlst = parse(u'Any C,A,R,A2,R2 WHERE A? inlined_card C, A ref R,A2? inlined_card C, A2 ref R2')
+ rqlst = parse(u'Any C,A,R,A2,R2 WHERE A? inlined_card C, A ref R,'
+ 'A2? inlined_card C, A2 ref R2')
self.assertRaises(BadSchemaDefinition,
rewrite, rqlst, {('C', 'X'): (c1,),
('A', 'X'): (c2,),
@@ -272,7 +305,6 @@
def test_optional_var_inlined_linked(self):
c1 = ('X require_permission P')
- c2 = ('X inlined_card O, O require_permission P')
rqlst = parse(u'Any A,W WHERE A inlined_card C?, C inlined_note N, '
'N inlined_affaire W')
rewrite(rqlst, {('C', 'X'): (c1,)}, {})
@@ -309,6 +341,7 @@
self.assertEqual(rqlst.as_string(),
'Any C WHERE C in_state STATE?, C is Card, '
'EXISTS(STATE name "hop"), STATE is State')
+
def test_relation_optimization_2_rhs(self):
snippet = ('TW? subworkflow_exit X, TW name "hop"')
rqlst = parse(u'SubWorkflowExitPoint EXIT WHERE C? subworkflow_exit EXIT')
@@ -376,10 +409,11 @@
def test_unsupported_constraint_2(self):
trinfo_constraint = ('X wf_info_for Y, Y require_permission P, P name "read"')
rqlst = parse(u'Any U,T WHERE U is CWUser, T wf_info_for U')
- rewrite(rqlst, {('T', 'X'): (trinfo_constraint, 'X wf_info_for Y, Y in_group G, G name "managers"')}, {})
+ rewrite(rqlst, {('T', 'X'): (trinfo_constraint,
+ 'X wf_info_for Y, Y in_group G, G name "managers"')}, {})
self.assertEqual(rqlst.as_string(),
u'Any U,T WHERE U is CWUser, T wf_info_for U, '
- 'EXISTS(U in_group B, B name "managers", B is CWGroup), T is TrInfo')
+ u'EXISTS(U in_group B, B name "managers", B is CWGroup), T is TrInfo')
def test_unsupported_constraint_3(self):
self.skipTest('raise unauthorized for now')
@@ -393,17 +427,20 @@
constraint = ('X concerne Y')
rqlst = parse(u'Affaire X')
rewrite(rqlst, {('X', 'X'): (constraint,)}, {})
- self.assertEqual(rqlst.as_string(),
- u"Any X WHERE X is Affaire, ((EXISTS(X concerne A, A is Division)) OR (EXISTS(X concerne C, C is Societe))) OR (EXISTS(X concerne B, B is Note))")
+ self.assertEqual(
+ rqlst.as_string(),
+ u"Any X WHERE X is Affaire, ((EXISTS(X concerne A, A is Division)) "
+ "OR (EXISTS(X concerne C, C is Societe))) OR (EXISTS(X concerne B, B is Note))")
def test_add_ambiguity_outerjoin(self):
constraint = ('X concerne Y')
rqlst = parse(u'Any X,C WHERE X? documented_by C')
rewrite(rqlst, {('X', 'X'): (constraint,)}, {})
# ambiguity are kept in the sub-query, no need to be resolved using OR
- self.assertEqual(rqlst.as_string(),
- u"Any X,C WHERE X? documented_by C, C is Card WITH X BEING (Any X WHERE EXISTS(X concerne A), X is Affaire)")
-
+ self.assertEqual(
+ rqlst.as_string(),
+ u"Any X,C WHERE X? documented_by C, C is Card "
+ "WITH X BEING (Any X WHERE EXISTS(X concerne A), X is Affaire)")
def test_rrqlexpr_nonexistant_subject_1(self):
constraint = RRQLExpression('S owned_by U')
@@ -432,26 +469,33 @@
'Any C WHERE C is Card, B eid %(D)s, EXISTS(A owned_by B, A is Card)')
rqlst = parse(u'Card C')
rewrite(rqlst, {('C', 'S'): (constraint,)}, {}, 'SOU')
- self.assertEqual(rqlst.as_string(),
- 'Any C WHERE C is Card, A eid %(B)s, EXISTS(C owned_by A, D owned_by A, D is Card)')
+ self.assertEqual(
+ rqlst.as_string(),
+ 'Any C WHERE C is Card, A eid %(B)s, EXISTS(C owned_by A, D owned_by A, D is Card)')
def test_rrqlexpr_nonexistant_subject_3(self):
constraint = RRQLExpression('U in_group G, G name "users"')
rqlst = parse(u'Card C')
rewrite(rqlst, {('C', 'S'): (constraint,)}, {}, 'SU')
- self.assertEqual(rqlst.as_string(),
- u'Any C WHERE C is Card, A eid %(B)s, EXISTS(A in_group D, D name "users", D is CWGroup)')
+ self.assertEqual(
+ rqlst.as_string(),
+ u'Any C WHERE C is Card, A eid %(B)s, '
+ 'EXISTS(A in_group D, D name "users", D is CWGroup)')
def test_rrqlexpr_nonexistant_subject_4(self):
constraint = RRQLExpression('U in_group G, G name "users", S owned_by U')
rqlst = parse(u'Card C')
rewrite(rqlst, {('C', 'S'): (constraint,)}, {}, 'SU')
- self.assertEqual(rqlst.as_string(),
- u'Any C WHERE C is Card, A eid %(B)s, EXISTS(A in_group D, D name "users", C owned_by A, D is CWGroup)')
+ self.assertEqual(
+ rqlst.as_string(),
+ u'Any C WHERE C is Card, A eid %(B)s, '
+ 'EXISTS(A in_group D, D name "users", C owned_by A, D is CWGroup)')
rqlst = parse(u'Card C')
rewrite(rqlst, {('C', 'S'): (constraint,)}, {}, 'OU')
- self.assertEqual(rqlst.as_string(),
- u'Any C WHERE C is Card, A eid %(B)s, EXISTS(A in_group D, D name "users", D is CWGroup)')
+ self.assertEqual(
+ rqlst.as_string(),
+ u'Any C WHERE C is Card, A eid %(B)s, '
+ 'EXISTS(A in_group D, D name "users", D is CWGroup)')
def test_rrqlexpr_nonexistant_subject_5(self):
constraint = RRQLExpression('S owned_by Z, O owned_by Z, O is Card')
@@ -464,22 +508,28 @@
constraint = ERQLExpression('X owned_by Z, Z login "hop"', 'X')
rqlst = parse(u'Affaire A WHERE NOT EXISTS(A documented_by C)')
rewrite(rqlst, {('C', 'X'): (constraint,)}, {}, 'X')
- self.assertEqual(rqlst.as_string(),
- u'Any A WHERE NOT EXISTS(A documented_by C, EXISTS(C owned_by B, B login "hop", B is CWUser), C is Card), A is Affaire')
+ self.assertEqual(
+ rqlst.as_string(),
+ u'Any A WHERE NOT EXISTS(A documented_by C, '
+ 'EXISTS(C owned_by B, B login "hop", B is CWUser), C is Card), A is Affaire')
def test_rqlexpr_not_relation_1_2(self):
constraint = ERQLExpression('X owned_by Z, Z login "hop"', 'X')
rqlst = parse(u'Affaire A WHERE NOT EXISTS(A documented_by C)')
rewrite(rqlst, {('A', 'X'): (constraint,)}, {}, 'X')
- self.assertEqual(rqlst.as_string(),
- u'Any A WHERE NOT EXISTS(A documented_by C, C is Card), A is Affaire, EXISTS(A owned_by B, B login "hop", B is CWUser)')
+ self.assertEqual(
+ rqlst.as_string(),
+ u'Any A WHERE NOT EXISTS(A documented_by C, C is Card), A is Affaire, '
+ 'EXISTS(A owned_by B, B login "hop", B is CWUser)')
def test_rqlexpr_not_relation_2(self):
constraint = ERQLExpression('X owned_by Z, Z login "hop"', 'X')
rqlst = rqlhelper.parse(u'Affaire A WHERE NOT A documented_by C', annotate=False)
rewrite(rqlst, {('C', 'X'): (constraint,)}, {}, 'X')
- self.assertEqual(rqlst.as_string(),
- u'Any A WHERE NOT EXISTS(A documented_by C, EXISTS(C owned_by B, B login "hop", B is CWUser), C is Card), A is Affaire')
+ self.assertEqual(
+ rqlst.as_string(),
+ u'Any A WHERE NOT EXISTS(A documented_by C, '
+ 'EXISTS(C owned_by B, B login "hop", B is CWUser), C is Card), A is Affaire')
def test_rqlexpr_multiexpr_outerjoin(self):
c1 = ERQLExpression('X owned_by Z, Z login "hop"', 'X')
@@ -487,11 +537,12 @@
c3 = ERQLExpression('X owned_by Z, Z login "momo"', 'X')
rqlst = rqlhelper.parse(u'Any A WHERE A documented_by C?', annotate=False)
rewrite(rqlst, {('C', 'X'): (c1, c2, c3)}, {}, 'X')
- self.assertEqual(rqlst.as_string(),
- u'Any A WHERE A documented_by C?, A is Affaire '
- 'WITH C BEING (Any C WHERE ((EXISTS(C owned_by B, B login "hop")) '
- 'OR (EXISTS(C owned_by D, D login "momo"))) '
- 'OR (EXISTS(C owned_by A, A login "hip")), C is Card)')
+ self.assertEqual(
+ rqlst.as_string(),
+ u'Any A WHERE A documented_by C?, A is Affaire '
+ 'WITH C BEING (Any C WHERE ((EXISTS(C owned_by B, B login "hop")) '
+ 'OR (EXISTS(C owned_by D, D login "momo"))) '
+ 'OR (EXISTS(C owned_by A, A login "hip")), C is Card)')
def test_multiple_erql_one_bad(self):
#: reproduce bug #2236985
@@ -547,7 +598,6 @@
'OR (EXISTS(B require_permission P, B is IN(Card, Note), S name "state1")), '
'P is CWPermission, S is State')
-from cubicweb.devtools.testlib import CubicWebTC
class RewriteFullTC(CubicWebTC):
appid = 'data-rewrite'
@@ -556,7 +606,7 @@
if args is None:
args = {}
querier = self.repo.querier
- union = parse(rql) # self.vreg.parse(rql, annotate=True)
+ union = parse(rql) # self.vreg.parse(rql, annotate=True)
with self.admin_access.repo_cnx() as cnx:
self.vreg.solutions(cnx, union, args)
querier._annotate(union)
@@ -590,7 +640,6 @@
union = self.process('Any A,AR,X,CD WHERE A concerne X?, A ref AR, X creation_date CD')
self.assertEqual(union.as_string(), 'not generated today')
-
def test_xxxx(self):
edef1 = self.schema['Societe']
edef2 = self.schema['Division']
@@ -608,12 +657,11 @@
def test_question_mark_attribute_snippet(self):
# see #3661918
- from cubicweb.rqlrewrite import RQLRewriter
- from logilab.common.decorators import monkeypatch
repotest.undo_monkey_patch()
orig_insert_snippets = RQLRewriter.insert_snippets
# patch insert_snippets and not rewrite, insert_snippets is already
# monkey patches (see above setupModule/repotest)
+
@monkeypatch(RQLRewriter)
def insert_snippets(self, snippets, varexistsmap=None):
# crash occurs if snippets are processed in a specific order, force
@@ -676,21 +724,21 @@
'C role D, D name "illustrator")',
rqlst.as_string())
-
def test_rewrite2(self):
rules = {'illustrator_of': 'C is Contribution, C contributor S, '
- 'C manifestation O, C role R, R name "illustrator"'}
+ 'C manifestation O, C role R, R name "illustrator"'}
rqlst = rqlhelper.parse(u'Any A,B WHERE A illustrator_of B, C require_permission R, S'
'require_state O')
rule_rewrite(rqlst, rules)
- self.assertEqual('Any A,B WHERE C require_permission R, S require_state O, '
- 'D is Contribution, D contributor A, D manifestation B, D role E, '
- 'E name "illustrator"',
- rqlst.as_string())
+ self.assertEqual(
+ 'Any A,B WHERE C require_permission R, S require_state O, '
+ 'D is Contribution, D contributor A, D manifestation B, D role E, '
+ 'E name "illustrator"',
+ rqlst.as_string())
def test_rewrite3(self):
rules = {'illustrator_of': 'C is Contribution, C contributor S, '
- 'C manifestation O, C role R, R name "illustrator"'}
+ 'C manifestation O, C role R, R name "illustrator"'}
rqlst = rqlhelper.parse(u'Any A,B WHERE E require_permission T, A illustrator_of B')
rule_rewrite(rqlst, rules)
self.assertEqual('Any A,B WHERE E require_permission T, '
@@ -700,7 +748,7 @@
def test_rewrite4(self):
rules = {'illustrator_of': 'C is Contribution, C contributor S, '
- 'C manifestation O, C role R, R name "illustrator"'}
+ 'C manifestation O, C role R, R name "illustrator"'}
rqlst = rqlhelper.parse(u'Any A,B WHERE C require_permission R, A illustrator_of B')
rule_rewrite(rqlst, rules)
self.assertEqual('Any A,B WHERE C require_permission R, '
@@ -710,7 +758,7 @@
def test_rewrite5(self):
rules = {'illustrator_of': 'C is Contribution, C contributor S, '
- 'C manifestation O, C role R, R name "illustrator"'}
+ 'C manifestation O, C role R, R name "illustrator"'}
rqlst = rqlhelper.parse(u'Any A,B WHERE C require_permission R, A illustrator_of B, '
'S require_state O')
rule_rewrite(rqlst, rules)
@@ -722,7 +770,7 @@
# Tests for the with clause
def test_rewrite_with(self):
rules = {'illustrator_of': 'C is Contribution, C contributor S, '
- 'C manifestation O, C role R, R name "illustrator"'}
+ 'C manifestation O, C role R, R name "illustrator"'}
rqlst = rqlhelper.parse(u'Any A,B WITH A, B BEING(Any X, Y WHERE X illustrator_of Y)')
rule_rewrite(rqlst, rules)
self.assertEqual('Any A,B WITH A,B BEING '
@@ -732,8 +780,9 @@
def test_rewrite_with2(self):
rules = {'illustrator_of': 'C is Contribution, C contributor S, '
- 'C manifestation O, C role R, R name "illustrator"'}
- rqlst = rqlhelper.parse(u'Any A,B WHERE T require_permission C WITH A, B BEING(Any X, Y WHERE X illustrator_of Y)')
+ 'C manifestation O, C role R, R name "illustrator"'}
+ rqlst = rqlhelper.parse(u'Any A,B WHERE T require_permission C '
+ 'WITH A, B BEING(Any X, Y WHERE X illustrator_of Y)')
rule_rewrite(rqlst, rules)
self.assertEqual('Any A,B WHERE T require_permission C '
'WITH A,B BEING (Any X,Y WHERE A is Contribution, '
@@ -751,32 +800,34 @@
def test_rewrite_with4(self):
rules = {'illustrator_of': 'C is Contribution, C contributor S, '
- 'C manifestation O, C role R, R name "illustrator"'}
+ 'C manifestation O, C role R, R name "illustrator"'}
rqlst = rqlhelper.parse(u'Any A,B WHERE A illustrator_of B '
- 'WITH A, B BEING(Any X, Y WHERE X illustrator_of Y)')
+ 'WITH A, B BEING(Any X, Y WHERE X illustrator_of Y)')
rule_rewrite(rqlst, rules)
- self.assertEqual('Any A,B WHERE C is Contribution, '
- 'C contributor A, C manifestation B, C role D, '
- 'D name "illustrator" WITH A,B BEING '
- '(Any X,Y WHERE A is Contribution, A contributor X, '
- 'A manifestation Y, A role B, B name "illustrator")',
- rqlst.as_string())
+ self.assertEqual(
+ 'Any A,B WHERE C is Contribution, '
+ 'C contributor A, C manifestation B, C role D, '
+ 'D name "illustrator" WITH A,B BEING '
+ '(Any X,Y WHERE A is Contribution, A contributor X, '
+ 'A manifestation Y, A role B, B name "illustrator")',
+ rqlst.as_string())
# Tests for the union
def test_rewrite_union(self):
rules = {'illustrator_of': 'C is Contribution, C contributor S, '
- 'C manifestation O, C role R, R name "illustrator"'}
+ 'C manifestation O, C role R, R name "illustrator"'}
rqlst = rqlhelper.parse(u'(Any A,B WHERE A illustrator_of B) UNION'
'(Any X,Y WHERE X is CWUser, Z manifestation Y)')
rule_rewrite(rqlst, rules)
- self.assertEqual('(Any A,B WHERE C is Contribution, '
- 'C contributor A, C manifestation B, C role D, '
- 'D name "illustrator") UNION (Any X,Y WHERE X is CWUser, Z manifestation Y)',
- rqlst.as_string())
+ self.assertEqual(
+ '(Any A,B WHERE C is Contribution, '
+ 'C contributor A, C manifestation B, C role D, '
+ 'D name "illustrator") UNION (Any X,Y WHERE X is CWUser, Z manifestation Y)',
+ rqlst.as_string())
def test_rewrite_union2(self):
rules = {'illustrator_of': 'C is Contribution, C contributor S, '
- 'C manifestation O, C role R, R name "illustrator"'}
+ 'C manifestation O, C role R, R name "illustrator"'}
rqlst = rqlhelper.parse(u'(Any Y WHERE Y match W) UNION '
'(Any A WHERE A illustrator_of B) UNION '
'(Any Y WHERE Y is ArtWork)')
@@ -790,9 +841,9 @@
# Tests for the exists clause
def test_rewrite_exists(self):
rules = {'illustrator_of': 'C is Contribution, C contributor S, '
- 'C manifestation O, C role R, R name "illustrator"'}
+ 'C manifestation O, C role R, R name "illustrator"'}
rqlst = rqlhelper.parse(u'(Any A,B WHERE A illustrator_of B, '
- 'EXISTS(B is ArtWork))')
+ 'EXISTS(B is ArtWork))')
rule_rewrite(rqlst, rules)
self.assertEqual('Any A,B WHERE EXISTS(B is ArtWork), '
'C is Contribution, C contributor A, C manifestation B, C role D, '
@@ -801,7 +852,7 @@
def test_rewrite_exists2(self):
rules = {'illustrator_of': 'C is Contribution, C contributor S, '
- 'C manifestation O, C role R, R name "illustrator"'}
+ 'C manifestation O, C role R, R name "illustrator"'}
rqlst = rqlhelper.parse(u'(Any A,B WHERE B contributor A, EXISTS(A illustrator_of W))')
rule_rewrite(rqlst, rules)
self.assertEqual('Any A,B WHERE B contributor A, '
@@ -811,7 +862,7 @@
def test_rewrite_exists3(self):
rules = {'illustrator_of': 'C is Contribution, C contributor S, '
- 'C manifestation O, C role R, R name "illustrator"'}
+ 'C manifestation O, C role R, R name "illustrator"'}
rqlst = rqlhelper.parse(u'(Any A,B WHERE A illustrator_of B, EXISTS(A illustrator_of W))')
rule_rewrite(rqlst, rules)
self.assertEqual('Any A,B WHERE EXISTS(C is Contribution, C contributor A, '
@@ -823,13 +874,14 @@
# Test for GROUPBY
def test_rewrite_groupby(self):
rules = {'participated_in': 'S contributor O'}
- rqlst = rqlhelper.parse(u'Any SUM(SA) GROUPBY S WHERE P participated_in S, P manifestation SA')
+ rqlst = rqlhelper.parse(u'Any SUM(SA) GROUPBY S '
+ 'WHERE P participated_in S, P manifestation SA')
rule_rewrite(rqlst, rules)
self.assertEqual('Any SUM(SA) GROUPBY S WHERE P manifestation SA, P contributor S',
rqlst.as_string())
-class RQLRelationRewriterTC(CubicWebTC):
+class RQLRelationRewriterCWTC(CubicWebTC):
appid = 'data-rewrite'
@@ -838,8 +890,8 @@
art = cnx.create_entity('ArtWork', name=u'Les travailleurs de la Mer')
role = cnx.create_entity('Role', name=u'illustrator')
vic = cnx.create_entity('Person', name=u'Victor Hugo')
- contrib = cnx.create_entity('Contribution', code=96, contributor=vic,
- manifestation=art, role=role)
+ cnx.create_entity('Contribution', code=96, contributor=vic,
+ manifestation=art, role=role)
rset = cnx.execute('Any X WHERE X illustrator_of S')
self.assertEqual([u'Victor Hugo'],
[result.name for result in rset.entities()])