# copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr## This file is part of CubicWeb.## CubicWeb is free software: you can redistribute it and/or modify it under the# terms of the GNU Lesser General Public License as published by the Free# Software Foundation, either version 2.1 of the License, or (at your option)# any later version.## CubicWeb is distributed in the hope that it will be useful, but WITHOUT# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more# details.## You should have received a copy of the GNU Lesser General Public License along# with CubicWeb. If not, see <http://www.gnu.org/licenses/>."""unit tests for selectors mechanism"""fromoperatorimporteq,lt,le,gtfromcontextlibimportcontextmanagerfromsix.movesimportrangefromlogilab.common.testlibimportTestCase,unittest_mainfromlogilab.common.decoratorsimportclear_cachefromcubicwebimportBinaryfromcubicweb.devtools.testlibimportCubicWebTCfromcubicweb.predicatesimport(is_instance,adaptable,match_kwargs,match_user_groups,multi_lines_rset,score_entity,is_in_state,rql_condition,relation_possible,match_form_params,paginated_rset)fromcubicweb.selectorsimporton_transition# XXX on_transition is deprecatedfromcubicweb.viewimportEntityAdapterfromcubicweb.webimportactionclassImplementsTC(CubicWebTC):deftest_etype_priority(self):withself.admin_access.web_request()asreq:f=req.create_entity('FakeFile',data_name=u'hop.txt',data=Binary(b'hop'),data_format=u'text/plain')rset=f.as_rset()anyscore=is_instance('Any')(f.__class__,req,rset=rset)idownscore=adaptable('IDownloadable')(f.__class__,req,rset=rset)self.assertTrue(idownscore>anyscore,(idownscore,anyscore))filescore=is_instance('FakeFile')(f.__class__,req,rset=rset)self.assertTrue(filescore>idownscore,(filescore,idownscore))deftest_etype_inheritance_no_yams_inheritance(self):cls=self.vreg['etypes'].etype_class('Personne')withself.admin_access.web_request()asreq:self.assertFalse(is_instance('Societe').score_class(cls,req))deftest_yams_inheritance(self):cls=self.vreg['etypes'].etype_class('Transition')withself.admin_access.web_request()asreq:self.assertEqual(is_instance('BaseTransition').score_class(cls,req),3)deftest_outer_join(self):withself.admin_access.web_request()asreq:rset=req.execute('Any U,B WHERE B? bookmarked_by U, U login "anon"')self.assertEqual(is_instance('Bookmark')(None,req,rset=rset,row=0,col=1),0)classWorkflowSelectorTC(CubicWebTC):defsetUp(self):super(WorkflowSelectorTC,self).setUp()# enable debug mode to state/transition validation on the flyself.vreg.config.debugmode=TruedeftearDown(self):self.vreg.config.debugmode=Falsesuper(WorkflowSelectorTC,self).tearDown()defsetup_database(self):withself.admin_access.shell()asshell:wf=shell.add_workflow("wf_test",'StateFull',default=True)created=wf.add_state('created',initial=True)validated=wf.add_state('validated')abandoned=wf.add_state('abandoned')wf.add_transition('validate',created,validated,('managers',))wf.add_transition('forsake',(created,validated,),abandoned,('managers',))@contextmanagerdefstatefull_stuff(self):withself.admin_access.web_request()asreq:wf_entity=req.create_entity('StateFull',name=u'')rset=wf_entity.as_rset()adapter=wf_entity.cw_adapt_to('IWorkflowable')req.cnx.commit()self.assertEqual(adapter.state,'created')yieldreq,wf_entity,rset,adapterdeftest_is_in_state(self):withself.statefull_stuff()as(req,wf_entity,rset,adapter):forstatein('created','validated','abandoned'):selector=is_in_state(state)self.assertEqual(selector(None,req,rset=rset),state=="created")adapter.fire_transition('validate')req.cnx.commit();wf_entity.cw_clear_all_caches()self.assertEqual(adapter.state,'validated')clear_cache(rset,'get_entity')selector=is_in_state('created')self.assertEqual(selector(None,req,rset=rset),0)selector=is_in_state('validated')self.assertEqual(selector(None,req,rset=rset),1)selector=is_in_state('validated','abandoned')self.assertEqual(selector(None,req,rset=rset),1)selector=is_in_state('abandoned')self.assertEqual(selector(None,req,rset=rset),0)adapter.fire_transition('forsake')req.cnx.commit();wf_entity.cw_clear_all_caches()self.assertEqual(adapter.state,'abandoned')clear_cache(rset,'get_entity')selector=is_in_state('created')self.assertEqual(selector(None,req,rset=rset),0)selector=is_in_state('validated')self.assertEqual(selector(None,req,rset=rset),0)selector=is_in_state('validated','abandoned')self.assertEqual(selector(None,req,rset=rset),1)self.assertEqual(adapter.state,'abandoned')self.assertEqual(selector(None,req,rset=rset),1)deftest_is_in_state_unvalid_names(self):withself.statefull_stuff()as(req,wf_entity,rset,adapter):selector=is_in_state("unknown")withself.assertRaises(ValueError)ascm:selector(None,req,rset=rset)self.assertEqual(str(cm.exception),"wf_test: unknown state(s): unknown")selector=is_in_state("weird","unknown","created","weird")withself.assertRaises(ValueError)ascm:selector(None,req,rset=rset)self.assertEqual(str(cm.exception),"wf_test: unknown state(s): unknown,weird")deftest_on_transition(self):withself.statefull_stuff()as(req,wf_entity,rset,adapter):fortransitionin('validate','forsake'):selector=on_transition(transition)self.assertEqual(selector(None,req,rset=rset),0)adapter.fire_transition('validate')req.cnx.commit();wf_entity.cw_clear_all_caches()self.assertEqual(adapter.state,'validated')clear_cache(rset,'get_entity')selector=on_transition("validate")self.assertEqual(selector(None,req,rset=rset),1)selector=on_transition("validate","forsake")self.assertEqual(selector(None,req,rset=rset),1)selector=on_transition("forsake")self.assertEqual(selector(None,req,rset=rset),0)adapter.fire_transition('forsake')req.cnx.commit();wf_entity.cw_clear_all_caches()self.assertEqual(adapter.state,'abandoned')clear_cache(rset,'get_entity')selector=on_transition("validate")self.assertEqual(selector(None,req,rset=rset),0)selector=on_transition("validate","forsake")self.assertEqual(selector(None,req,rset=rset),1)selector=on_transition("forsake")self.assertEqual(selector(None,req,rset=rset),1)deftest_on_transition_unvalid_names(self):withself.statefull_stuff()as(req,wf_entity,rset,adapter):selector=on_transition("unknown")withself.assertRaises(ValueError)ascm:selector(None,req,rset=rset)self.assertEqual(str(cm.exception),"wf_test: unknown transition(s): unknown")selector=on_transition("weird","unknown","validate","weird")withself.assertRaises(ValueError)ascm:selector(None,req,rset=rset)self.assertEqual(str(cm.exception),"wf_test: unknown transition(s): unknown,weird")deftest_on_transition_with_no_effect(self):"""selector will not be triggered with `change_state()`"""withself.statefull_stuff()as(req,wf_entity,rset,adapter):adapter.change_state('validated')req.cnx.commit();wf_entity.cw_clear_all_caches()self.assertEqual(adapter.state,'validated')selector=on_transition("validate")self.assertEqual(selector(None,req,rset=rset),0)selector=on_transition("validate","forsake")self.assertEqual(selector(None,req,rset=rset),0)selector=on_transition("forsake")self.assertEqual(selector(None,req,rset=rset),0)classRelationPossibleTC(CubicWebTC):deftest_rqlst_1(self):withself.admin_access.web_request()asreq:selector=relation_possible('in_group')select=self.vreg.parse(req,'Any X WHERE X is CWUser').children[0]score=selector(None,req,rset=1,select=select,filtered_variable=select.defined_vars['X'])self.assertEqual(score,1)deftest_rqlst_2(self):withself.admin_access.web_request()asreq:selector=relation_possible('in_group')select=self.vreg.parse(req,'Any 1, COUNT(X) WHERE X is CWUser, X creation_date XD, ''Y creation_date YD, Y is CWGroup ''HAVING DAY(XD)=DAY(YD)').children[0]score=selector(None,req,rset=1,select=select,filtered_variable=select.defined_vars['X'])self.assertEqual(score,1)deftest_ambiguous(self):# Ambiguous relations are :# (Service, fabrique_par, Personne) and (Produit, fabrique_par, Usine)# There used to be a crash here with a bad rdef choice in the strict# checking case.selector=relation_possible('fabrique_par',role='object',target_etype='Personne',strict=True)withself.admin_access.web_request()asreq:usine=req.create_entity('Usine',lieu=u'here')score=selector(None,req,rset=usine.as_rset())self.assertEqual(0,score)classMatchUserGroupsTC(CubicWebTC):deftest_owners_group(self):"""tests usage of 'owners' group with match_user_group"""classSomeAction(action.Action):__regid__='yo'category='foo'__select__=match_user_groups('owners')self.vreg._loadedmods[__name__]={}self.vreg.register(SomeAction)SomeAction.__registered__(self.vreg['actions'])self.assertTrue(SomeActioninself.vreg['actions']['yo'],self.vreg['actions'])try:withself.admin_access.web_request()asreq:self.create_user(req,'john')# login as a simple userjohn_access=self.new_access('john')withjohn_access.web_request()asreq:# it should not be possible to use SomeAction not owned objectsrset=req.execute('Any G WHERE G is CWGroup, G name "managers"')self.assertFalse('yo'indict(self.pactions(req,rset)))# insert a new card, and check that we can use SomeAction on our objectreq.execute('INSERT Card C: C title "zoubidou"')req.cnx.commit()withjohn_access.web_request()asreq:rset=req.execute('Card C WHERE C title "zoubidou"')self.assertTrue('yo'indict(self.pactions(req,rset)),self.pactions(req,rset))# make sure even managers can't use the actionwithself.admin_access.web_request()asreq:rset=req.execute('Card C WHERE C title "zoubidou"')self.assertFalse('yo'indict(self.pactions(req,rset)))finally:delself.vreg[SomeAction.__registry__][SomeAction.__regid__]classMultiLinesRsetTC(CubicWebTC):defsetup_database(self):withself.admin_access.web_request()asreq:req.execute('INSERT CWGroup G: G name "group1"')req.execute('INSERT CWGroup G: G name "group2"')req.cnx.commit()deftest_default_op_in_selector(self):withself.admin_access.web_request()asreq:rset=req.execute('Any G WHERE G is CWGroup')expected=len(rset)selector=multi_lines_rset(expected)self.assertEqual(selector(None,req,rset=rset),1)self.assertEqual(selector(None,req,None),0)selector=multi_lines_rset(expected+1)self.assertEqual(selector(None,req,rset=rset),0)self.assertEqual(selector(None,req,None),0)selector=multi_lines_rset(expected-1)self.assertEqual(selector(None,req,rset=rset),0)self.assertEqual(selector(None,req,None),0)deftest_without_rset(self):withself.admin_access.web_request()asreq:rset=req.execute('Any G WHERE G is CWGroup')expected=len(rset)selector=multi_lines_rset(expected)self.assertEqual(selector(None,req,None),0)selector=multi_lines_rset(expected+1)self.assertEqual(selector(None,req,None),0)selector=multi_lines_rset(expected-1)self.assertEqual(selector(None,req,None),0)deftest_with_operators(self):withself.admin_access.web_request()asreq:rset=req.execute('Any G WHERE G is CWGroup')expected=len(rset)# Format 'expected', 'operator', 'assert'testdata=((expected,eq,1),(expected+1,eq,0),(expected-1,eq,0),(expected,le,1),(expected+1,le,1),(expected-1,le,0),(expected-1,gt,1),(expected,gt,0),(expected+1,gt,0),(expected+1,lt,1),(expected,lt,0),(expected-1,lt,0))for(expected,operator,assertion)intestdata:selector=multi_lines_rset(expected,operator)yieldself.assertEqual,selector(None,req,rset=rset),assertionclassMatchKwargsTC(TestCase):deftest_match_kwargs_default(self):selector=match_kwargs(set(('a','b')))self.assertEqual(selector(None,None,a=1,b=2),2)self.assertEqual(selector(None,None,a=1),0)self.assertEqual(selector(None,None,c=1),0)self.assertEqual(selector(None,None,a=1,c=1),0)deftest_match_kwargs_any(self):selector=match_kwargs(set(('a','b')),mode='any')self.assertEqual(selector(None,None,a=1,b=2),2)self.assertEqual(selector(None,None,a=1),1)self.assertEqual(selector(None,None,c=1),0)self.assertEqual(selector(None,None,a=1,c=1),1)classScoreEntityTC(CubicWebTC):deftest_intscore_entity_selector(self):withself.admin_access.web_request()asreq:rset=req.execute('Any E WHERE E eid 1')selector=score_entity(lambdax:None)self.assertEqual(selector(None,req,rset=rset),0)selector=score_entity(lambdax:"something")self.assertEqual(selector(None,req,rset=rset),1)selector=score_entity(lambdax:object)self.assertEqual(selector(None,req,rset=rset),1)rset=req.execute('Any G LIMIT 2 WHERE G is CWGroup')selector=score_entity(lambdax:10)self.assertEqual(selector(None,req,rset=rset),20)selector=score_entity(lambdax:10,mode='any')self.assertEqual(selector(None,req,rset=rset),10)deftest_rql_condition_entity(self):withself.admin_access.web_request()asreq:selector=rql_condition('X identity U')rset=req.user.as_rset()self.assertEqual(selector(None,req,rset=rset),1)self.assertEqual(selector(None,req,entity=req.user),1)self.assertEqual(selector(None,req),0)deftest_rql_condition_user(self):withself.admin_access.web_request()asreq:selector=rql_condition('U login "admin"',user_condition=True)self.assertEqual(selector(None,req),1)selector=rql_condition('U login "toto"',user_condition=True)self.assertEqual(selector(None,req),0)classAdaptablePredicateTC(CubicWebTC):deftest_multiple_entity_types_rset(self):classCWUserIWhatever(EntityAdapter):__regid__='IWhatever'__select__=is_instance('CWUser')classCWGroupIWhatever(EntityAdapter):__regid__='IWhatever'__select__=is_instance('CWGroup')withself.temporary_appobjects(CWUserIWhatever,CWGroupIWhatever):withself.admin_access.web_request()asreq:selector=adaptable('IWhatever')rset=req.execute('Any X WHERE X is IN(CWGroup, CWUser)')self.assertTrue(selector(None,req,rset=rset))classMatchFormParamsTC(CubicWebTC):"""tests for match_form_params predicate"""deftest_keyonly_match(self):"""test standard usage: ``match_form_params('param1', 'param2')`` ``param1`` and ``param2`` must be specified in request's form. """web_request=self.admin_access.web_requestvid_selector=match_form_params('vid')vid_subvid_selector=match_form_params('vid','subvid')# no parameter => KO,KOwithweb_request()asreq:self.assertEqual(vid_selector(None,req),0)self.assertEqual(vid_subvid_selector(None,req),0)# one expected parameter found => OK,KOwithweb_request(vid='foo')asreq:self.assertEqual(vid_selector(None,req),1)self.assertEqual(vid_subvid_selector(None,req),0)# all expected parameters found => OK,OKwithweb_request(vid='foo',subvid='bar')asreq:self.assertEqual(vid_selector(None,req),1)self.assertEqual(vid_subvid_selector(None,req),2)deftest_keyvalue_match_one_parameter(self):"""test dict usage: ``match_form_params(param1=value1)`` ``param1`` must be specified in the request's form and its value must be ``value1``. """web_request=self.admin_access.web_request# test both positional and named parametersvid_selector=match_form_params(vid='foo')# no parameter => should failwithweb_request()asreq:self.assertEqual(vid_selector(None,req),0)# expected parameter found with expected value => OKwithweb_request(vid='foo',subvid='bar')asreq:self.assertEqual(vid_selector(None,req),1)# expected parameter found but value is incorrect => KOwithweb_request(vid='bar')asreq:self.assertEqual(vid_selector(None,req),0)deftest_keyvalue_match_two_parameters(self):"""test dict usage: ``match_form_params(param1=value1, param2=value2)`` ``param1`` and ``param2`` must be specified in the request's form and their respective value must be ``value1`` and ``value2``. """web_request=self.admin_access.web_requestvid_subvid_selector=match_form_params(vid='list',subvid='tsearch')# missing one expected parameter => KOwithweb_request(vid='list')asreq:self.assertEqual(vid_subvid_selector(None,req),0)# expected parameters found but values are incorrect => KOwithweb_request(vid='list',subvid='foo')asreq:self.assertEqual(vid_subvid_selector(None,req),0)# expected parameters found and values are correct => OKwithweb_request(vid='list',subvid='tsearch')asreq:self.assertEqual(vid_subvid_selector(None,req),2)deftest_keyvalue_multiple_match(self):"""test dict usage with multiple values i.e. as in ``match_form_params(param1=('value1', 'value2'))`` ``param1`` must be specified in the request's form and its value must be either ``value1`` or ``value2``. """web_request=self.admin_access.web_requestvid_subvid_selector=match_form_params(vid='list',subvid=('tsearch','listitem'))# expected parameters found and values correct => OKwithweb_request(vid='list',subvid='tsearch')asreq:self.assertEqual(vid_subvid_selector(None,req),2)withweb_request(vid='list',subvid='listitem')asreq:self.assertEqual(vid_subvid_selector(None,req),2)# expected parameters found but values are incorrect => OKwithweb_request(vid='list',subvid='foo')asreq:self.assertEqual(vid_subvid_selector(None,req),0)deftest_invalid_calls(self):"""checks invalid calls raise a ValueError"""# mixing named and positional arguments should failwithself.assertRaises(ValueError)ascm:match_form_params('list',x='1',y='2')self.assertEqual(str(cm.exception),"match_form_params() can't be called with both ""positional and named arguments")# using a dict as first and unique argument should failwithself.assertRaises(ValueError)ascm:match_form_params({'x':1})self.assertEqual(str(cm.exception),"match_form_params() positional arguments must be strings")classPaginatedTC(CubicWebTC):"""tests for paginated_rset predicate"""defsetup_database(self):withself.admin_access.repo_cnx()ascnx:foriinrange(30):cnx.create_entity('CWGroup',name=u"group%d"%i)cnx.commit()deftest_paginated_rset(self):default_nb_pages=1web_request=self.admin_access.web_requestwithweb_request()asreq:rset=req.execute('Any G WHERE G is CWGroup')self.assertEqual(len(rset),34)withweb_request(vid='list',page_size='10')asreq:self.assertEqual(paginated_rset()(None,req,rset),default_nb_pages)withweb_request(vid='list',page_size='20')asreq:self.assertEqual(paginated_rset()(None,req,rset),default_nb_pages)withweb_request(vid='list',page_size='50')asreq:self.assertEqual(paginated_rset()(None,req,rset),0)withweb_request(vid='list',page_size='10/')asreq:self.assertEqual(paginated_rset()(None,req,rset),0)withweb_request(vid='list',page_size='.1')asreq:self.assertEqual(paginated_rset()(None,req,rset),0)withweb_request(vid='list',page_size='not_an_int')asreq:self.assertEqual(paginated_rset()(None,req,rset),0)if__name__=='__main__':unittest_main()