65 rqlhelper.simplify(rqlst, needcopy) |
65 rqlhelper.simplify(rqlst, needcopy) |
66 return rewriter_cls(mock_object(vreg=FakeVReg, user=(mock_object(eid=1)))) |
66 return rewriter_cls(mock_object(vreg=FakeVReg, user=(mock_object(eid=1)))) |
67 |
67 |
68 def rewrite(rqlst, snippets_map, kwargs, existingvars=None): |
68 def rewrite(rqlst, snippets_map, kwargs, existingvars=None): |
69 rewriter = _prepare_rewriter(rqlrewrite.RQLRewriter, kwargs) |
69 rewriter = _prepare_rewriter(rqlrewrite.RQLRewriter, kwargs) |
|
70 # turn {(V1, V2): constraints} into [(varmap, constraints)] |
70 snippets = [] |
71 snippets = [] |
|
72 snippet_varmap = {} |
71 for v, exprs in sorted(snippets_map.items()): |
73 for v, exprs in sorted(snippets_map.items()): |
72 rqlexprs = [isinstance(snippet, string_types) |
74 rqlexprs = [] |
73 and mock_object(snippet_rqlst=parse(u'Any X WHERE '+snippet).children[0], |
75 varmap = dict([v]) |
74 expression=u'Any X WHERE '+snippet) |
76 for snippet in exprs: |
75 or snippet |
77 # when the same snippet is impacting several variables, group them |
76 for snippet in exprs] |
78 # unless there is some conflicts on the snippet's variable name (we |
77 snippets.append((dict([v]), rqlexprs)) |
79 # only want that for constraint on relations using both S and O) |
|
80 if snippet in snippet_varmap and not ( |
|
81 set(varmap.values()) & set(snippet_varmap[snippet].values())): |
|
82 snippet_varmap[snippet].update(varmap) |
|
83 continue |
|
84 snippet_varmap[snippet] = varmap |
|
85 if isinstance(snippet, string_types): |
|
86 snippet = mock_object(snippet_rqlst=parse(u'Any X WHERE ' + snippet).children[0], |
|
87 expression=u'Any X WHERE ' + snippet) |
|
88 rqlexprs.append(snippet) |
|
89 if rqlexprs: |
|
90 snippets.append((varmap, rqlexprs)) |
|
91 |
78 rqlhelper.compute_solutions(rqlst.children[0], {'eid': eid_func_map}, kwargs=kwargs) |
92 rqlhelper.compute_solutions(rqlst.children[0], {'eid': eid_func_map}, kwargs=kwargs) |
79 rewriter.rewrite(rqlst.children[0], snippets, kwargs, existingvars) |
93 rewriter.rewrite(rqlst.children[0], snippets, kwargs, existingvars) |
80 check_vrefs(rqlst.children[0]) |
94 check_vrefs(rqlst.children[0]) |
81 return rewriter.rewritten |
95 return rewriter.rewritten |
82 |
96 |
500 rqlst = parse(u'Any O WHERE S use_email O, S is CWUser, O is_instance_of EmailAddress') |
514 rqlst = parse(u'Any O WHERE S use_email O, S is CWUser, O is_instance_of EmailAddress') |
501 rewrite(rqlst, {('S', 'X'): (user_expr,)}, {}) |
515 rewrite(rqlst, {('S', 'X'): (user_expr,)}, {}) |
502 self.assertEqual(rqlst.as_string(), |
516 self.assertEqual(rqlst.as_string(), |
503 'Any O WHERE S use_email O, S is CWUser, O is EmailAddress, ' |
517 'Any O WHERE S use_email O, S is CWUser, O is EmailAddress, ' |
504 'EXISTS(NOT S in_group A, A name "guests", A is CWGroup)') |
518 'EXISTS(NOT S in_group A, A name "guests", A is CWGroup)') |
|
519 |
|
520 def test_ambiguous_constraint_not_exists(self): |
|
521 state_constraint = ( |
|
522 'NOT EXISTS(A require_permission S) ' |
|
523 'OR EXISTS(B require_permission S, B is Card, O name "state1")' |
|
524 'OR EXISTS(C require_permission S, C is Note, O name "state2")' |
|
525 ) |
|
526 rqlst = parse(u'Any P WHERE NOT P require_state S') |
|
527 rewrite(rqlst, {('P', 'S'): (state_constraint,), ('S', 'O'): (state_constraint,)}, {}) |
|
528 self.assertMultiLineEqual( |
|
529 rqlst.as_string(), |
|
530 u'Any P WHERE NOT P require_state S, ' |
|
531 'EXISTS(((NOT EXISTS(A require_permission P, A is IN(Card, Note)))' |
|
532 ' OR (EXISTS(B require_permission P, B is Card, S name "state1")))' |
|
533 ' OR (EXISTS(C require_permission P, C is Note, S name "state2"))), ' |
|
534 'P is CWPermission, S is State') |
|
535 |
|
536 def test_ambiguous_using_is_in_function(self): |
|
537 state_constraint = ( |
|
538 'NOT EXISTS(A require_permission S) ' |
|
539 'OR EXISTS(B require_permission S, B is IN (Card, Note), O name "state1")' |
|
540 ) |
|
541 rqlst = parse(u'Any P WHERE NOT P require_state S') |
|
542 rewrite(rqlst, {('P', 'S'): (state_constraint,), ('S', 'O'): (state_constraint,)}, {}) |
|
543 self.assertMultiLineEqual( |
|
544 rqlst.as_string(), |
|
545 u'Any P WHERE NOT P require_state S, ' |
|
546 'EXISTS((NOT EXISTS(A require_permission P, A is IN(Card, Note))) ' |
|
547 'OR (EXISTS(B require_permission P, B is IN(Card, Note), S name "state1"))), ' |
|
548 'P is CWPermission, S is State') |
505 |
549 |
506 from cubicweb.devtools.testlib import CubicWebTC |
550 from cubicweb.devtools.testlib import CubicWebTC |
507 |
551 |
508 class RewriteFullTC(CubicWebTC): |
552 class RewriteFullTC(CubicWebTC): |
509 appid = 'data-rewrite' |
553 appid = 'data-rewrite' |