""":organization: Logilab:copyright: 2001-2009 LOGILAB S.A. (Paris, FRANCE), license is LGPL v2.:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr:license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses"""fromlogilab.common.testlibimportunittest_main,TestCasefromlogilab.common.testlibimportmock_objectfromrqlimportparse,nodes,RQLHelperfromcubicwebimportUnauthorizedfromcubicweb.rqlrewriteimportRQLRewriterfromcubicweb.devtoolsimportrepotest,TestServerConfigurationconfig=TestServerConfiguration('data/rewrite')config.bootstrap_cubes()schema=config.load_schema()schema.add_relation_def(mock_object(subject='Card',name='in_state',object='State',cardinality='1*'))rqlhelper=RQLHelper(schema,special_relations={'eid':'uid','has_text':'fti'})defsetup_module(*args):repotest.do_monkey_patch()defteardown_module(*args):repotest.undo_monkey_patch()defeid_func_map(eid):return{1:'CWUser',2:'Card'}[eid]defrewrite(rqlst,snippets_map,kwargs):classFakeVReg:schema=schema@staticmethoddefsolutions(sqlcursor,mainrqlst,kwargs):rqlhelper.compute_solutions(rqlst,{'eid':eid_func_map},kwargs=kwargs)classrqlhelper:@staticmethoddefannotate(rqlst):rqlhelper.annotate(rqlst)@staticmethoddefsimplify(mainrqlst,needcopy=False):rqlhelper.simplify(rqlst,needcopy)rewriter=RQLRewriter(mock_object(vreg=FakeVReg,user=(mock_object(eid=1))))forv,snippetsinsnippets_map.items():snippets_map[v]=[mock_object(snippet_rqlst=parse('Any X WHERE '+snippet).children[0],expression='Any X WHERE '+snippet)forsnippetinsnippets]rqlhelper.compute_solutions(rqlst.children[0],{'eid':eid_func_map},kwargs=kwargs)solutions=rqlst.children[0].solutionsrewriter.rewrite(rqlst.children[0],snippets_map.items(),solutions,kwargs)test_vrefs(rqlst.children[0])returnrewriter.rewrittendeftest_vrefs(node):vrefmap={}forvrefinnode.iget_nodes(nodes.VariableRef):vrefmap.setdefault(vref.name,set()).add(vref)forvarinnode.defined_vars.itervalues():assertnot(var.stinfo['references']^vrefmap[var.name])assert(var.stinfo['references'])classRQLRewriteTC(TestCase):"""a faire: * optimisation: detecter les relations utilisees dans les rqlexpressions qui sont presentes dans la requete de depart pour les reutiliser si possible * "has_<ACTION>_permission" ? """deftest_base_var(self):card_constraint=('X in_state S, U in_group G, P require_state S,''P name "read", P require_group G')rqlst=parse('Card C')rewrite(rqlst,{('C','X'):(card_constraint,)},{})self.failUnlessEqual(rqlst.as_string(),u"Any C WHERE C is Card, B eid %(D)s, ""EXISTS(C in_state A, B in_group E, F require_state A, ""F name 'read', F require_group E, A is State, E is CWGroup, F is CWPermission)")deftest_multiple_var(self):card_constraint=('X in_state S, U in_group G, P require_state S,''P name "read", P require_group G')affaire_constraints=('X ref LIKE "PUBLIC%"','U in_group G, G name "public"')kwargs={'u':2}rqlst=parse('Any S WHERE S documented_by C, C eid %(u)s')rewrite(rqlst,{('C','X'):(card_constraint,),('S','X'):affaire_constraints},kwargs)self.assertTextEquals(rqlst.as_string(),"Any S WHERE S documented_by C, C eid %(u)s, B eid %(D)s, ""EXISTS(C in_state A, B in_group E, F require_state A, ""F name 'read', F require_group E, A is State, E is CWGroup, F is CWPermission), ""(EXISTS(S ref LIKE 'PUBLIC%')) OR (EXISTS(B in_group G, G name 'public', G is CWGroup)), ""S is Affaire")self.failUnless('D'inkwargs)deftest_or(self):constraint='(X identity U) OR (X in_state ST, CL identity U, CL in_state ST, ST name "subscribed")'rqlst=parse('Any S WHERE S owned_by C, C eid %(u)s, S is in (CWUser, CWGroup)')rewrite(rqlst,{('C','X'):(constraint,)},{'u':1})self.failUnlessEqual(rqlst.as_string(),"Any S WHERE S owned_by C, C eid %(u)s, S is IN(CWUser, CWGroup), A eid %(B)s, ""EXISTS((C identity A) OR (C in_state D, E identity A, ""E in_state D, D name 'subscribed'), D is State, E is CWUser)")deftest_simplified_rqlst(self):card_constraint=('X in_state S, U in_group G, P require_state S,''P name "read", P require_group G')rqlst=parse('Any 2')# this is the simplified rql st for Any X WHERE X eid 12rewrite(rqlst,{('2','X'):(card_constraint,)},{})self.failUnlessEqual(rqlst.as_string(),u"Any 2 WHERE B eid %(C)s, ""EXISTS(2 in_state A, B in_group D, E require_state A, ""E name 'read', E require_group D, A is State, D is CWGroup, E is CWPermission)")deftest_optional_var(self):card_constraint=('X in_state S, U in_group G, P require_state S,''P name "read", P require_group G')rqlst=parse('Any A,C WHERE A documented_by C?')rewrite(rqlst,{('C','X'):(card_constraint,)},{})self.failUnlessEqual(rqlst.as_string(),"Any A,C WHERE A documented_by C?, A is Affaire ""WITH C BEING ""(Any C WHERE C in_state B, D in_group F, G require_state B, G name 'read', ""G require_group F, D eid %(A)s, C is Card)")rqlst=parse('Any A,C,T WHERE A documented_by C?, C title T')rewrite(rqlst,{('C','X'):(card_constraint,)},{})self.failUnlessEqual(rqlst.as_string(),"Any A,C,T WHERE A documented_by C?, A is Affaire ""WITH C,T BEING ""(Any C,T WHERE C in_state B, D in_group F, G require_state B, G name 'read', ""G require_group F, C title T, D eid %(A)s, C is Card)")deftest_relation_optimization(self):# since Card in_state State as monovalued cardinality, the in_state# relation used in the rql expression can be ignored and S replaced by# the variable from the incoming querycard_constraint=('X in_state S, U in_group G, P require_state S,''P name "read", P require_group G')rqlst=parse('Card C WHERE C in_state STATE')rewrite(rqlst,{('C','X'):(card_constraint,)},{})self.failUnlessEqual(rqlst.as_string(),u"Any C WHERE C in_state STATE, C is Card, A eid %(B)s, ""EXISTS(A in_group D, E require_state STATE, ""E name 'read', E require_group D, D is CWGroup, E is CWPermission), ""STATE is State")deftest_unsupported_constraint_1(self):# CWUser doesn't have require_permissiontrinfo_constraint=('X wf_info_for Y, Y require_permission P, P name "read"')rqlst=parse('Any U,T WHERE U is CWUser, T wf_info_for U')self.assertRaises(Unauthorized,rewrite,rqlst,{('T','X'):(trinfo_constraint,)},{})deftest_unsupported_constraint_2(self):trinfo_constraint=('X wf_info_for Y, Y require_permission P, P name "read"')rqlst=parse('Any U,T WHERE U is CWUser, T wf_info_for U')rewrite(rqlst,{('T','X'):(trinfo_constraint,'X wf_info_for Y, Y in_group G, G name "managers"')},{})self.failUnlessEqual(rqlst.as_string(),u"Any U,T WHERE U is CWUser, T wf_info_for U, ""EXISTS(U in_group B, B name 'managers', B is CWGroup), T is TrInfo")deftest_unsupported_constraint_3(self):self.skip('raise unauthorized for now')trinfo_constraint=('X wf_info_for Y, Y require_permission P, P name "read"')rqlst=parse('Any T WHERE T wf_info_for X')rewrite(rqlst,{('T','X'):(trinfo_constraint,'X in_group G, G name "managers"')},{})self.failUnlessEqual(rqlst.as_string(),u'XXX dunno what should be generated')deftest_add_ambiguity_exists(self):constraint=('X concerne Y')rqlst=parse('Affaire X')rewrite(rqlst,{('X','X'):(constraint,)},{})self.failUnlessEqual(rqlst.as_string(),u"Any X WHERE X is Affaire, ((EXISTS(X concerne A, A is Division)) OR (EXISTS(X concerne C, C is Societe))) OR (EXISTS(X concerne B, B is Note))")deftest_add_ambiguity_outerjoin(self):constraint=('X concerne Y')rqlst=parse('Any X,C WHERE X? documented_by C')rewrite(rqlst,{('X','X'):(constraint,)},{})# ambiguity are kept in the sub-query, no need to be resolved using ORself.failUnlessEqual(rqlst.as_string(),u"Any X,C WHERE X? documented_by C, C is Card WITH X BEING (Any X WHERE X concerne A, X is Affaire)")if__name__=='__main__':unittest_main()