"""some utilities to ease repository testingThis module contains functions to initialize a new repository.:organization: Logilab:copyright: 2003-2009 LOGILAB S.A. (Paris, FRANCE), all rights reserved.:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr"""__docformat__="restructuredtext en"frompprintimportpprintdeftuplify(list):foriinrange(len(list)):iftype(list[i])isnottype(()):list[i]=tuple(list[i])returnlistdefsnippet_cmp(a,b):a=(a[0],[e.expressionforeina[1]])b=(b[0],[e.expressionforeinb[1]])returncmp(a,b)deftest_plan(self,rql,expected,kwargs=None):plan=self._prepare_plan(rql,kwargs)self.planner.build_plan(plan)try:self.assertEquals(len(plan.steps),len(expected),'expected %s steps, got %s'%(len(expected),len(plan.steps)))# step order is importantfori,stepinenumerate(plan.steps):compare_steps(self,step.test_repr(),expected[i])exceptAssertionError:pprint([step.test_repr()forstepinplan.steps])raisedefcompare_steps(self,step,expected):try:self.assertEquals(step[0],expected[0],'expected step type %s, got %s'%(expected[0],step[0]))iflen(step)>2andisinstance(step[1],list)andisinstance(expected[1],list):queries,equeries=step[1],expected[1]self.assertEquals(len(queries),len(equeries),'expected %s queries, got %s'%(len(equeries),len(queries)))fori,(rql,sol)inenumerate(queries):self.assertEquals(rql,equeries[i][0])self.assertEquals(sol,equeries[i][1])idx=2else:idx=1self.assertEquals(step[idx:-1],expected[idx:-1],'expected step characteristic \n%s\n, got\n%s'%(expected[1:-1],step[1:-1]))self.assertEquals(len(step[-1]),len(expected[-1]),'got %s child steps, expected %s'%(len(step[-1]),len(expected[-1])))exceptAssertionError:print'error on step ',pprint(step[:-1])raisechildren=step[-1]ifstep[0]in('UnionFetchStep','UnionStep'):# sort childrenchildren=sorted(children)expectedchildren=sorted(expected[-1])else:expectedchildren=expected[-1]fori,substepinenumerate(children):compare_steps(self,substep,expectedchildren[i])classDumbOrderedDict(list):def__iter__(self):returnself.iterkeys()def__contains__(self,key):returnkeyinself.iterkeys()def__getitem__(self,key):forkey_,valueinself.iteritems():ifkey==key_:returnvalueraiseKeyError(key)defiterkeys(self):return(xforx,yinlist.__iter__(self))defiteritems(self):return(xforxinlist.__iter__(self))fromlogilab.common.testlibimportTestCasefromrqlimportRQLHelperfromcubicweb.devtools.fakeimportFakeRepo,FakeSessionfromcubicweb.serverimportset_debugfromcubicweb.server.querierimportQuerierHelperfromcubicweb.server.sessionimportSessionfromcubicweb.server.sources.rql2sqlimportremove_unused_solutionsclassRQLGeneratorTC(TestCase):schema=None# set this in concret testdefsetUp(self):self.rqlhelper=RQLHelper(self.schema,special_relations={'eid':'uid','has_text':'fti'})self.qhelper=QuerierHelper(FakeRepo(self.schema),self.schema)ExecutionPlan._check_permissions=_dummy_check_permissionsrqlannotation._select_principal=_select_principaldeftearDown(self):ExecutionPlan._check_permissions=_orig_check_permissionsrqlannotation._select_principal=_orig_select_principaldef_prepare(self,rql):#print '******************** prepare', rqlunion=self.rqlhelper.parse(rql)#print '********* parsed', union.as_string()self.rqlhelper.compute_solutions(union)#print '********* solutions', solutionsself.rqlhelper.simplify(union)#print '********* simplified', union.as_string()plan=self.qhelper.plan_factory(union,{},FakeSession())plan.preprocess(union)forselectinunion.children:select.solutions.sort()#print '********* ppsolutions', solutionsreturnunionclassBaseQuerierTC(TestCase):repo=None# set this in concret testdefsetUp(self):self.o=self.repo.querierself.session=self.repo._sessions.values()[0]self.ueid=self.session.user.eidassertself.ueid!=-1self.repo._type_source_cache={}# clear cacheself.pool=self.session.set_pool()self.maxeid=self.get_max_eid()do_monkey_patch()defget_max_eid(self):returnself.session.unsafe_execute('Any MAX(X)')[0][0]defcleanup(self):self.session.unsafe_execute('DELETE Any X WHERE X eid > %s'%self.maxeid)deftearDown(self):undo_monkey_patch()self.session.rollback()self.cleanup()self.commit()self.repo._free_pool(self.pool)assertself.session.user.eid!=-1defset_debug(self,debug):set_debug(debug)def_rqlhelper(self):rqlhelper=self.o._rqlhelper# reset uid_func so it don't try to get type from eidsrqlhelper._analyser.uid_func=Nonerqlhelper._analyser.uid_func_mapping={}returnrqlhelperdef_prepare_plan(self,rql,kwargs=None):rqlhelper=self._rqlhelper()rqlst=rqlhelper.parse(rql)rqlhelper.compute_solutions(rqlst,kwargs=kwargs)rqlhelper.simplify(rqlst)forselectinrqlst.children:select.solutions.sort()returnself.o.plan_factory(rqlst,kwargs,self.session)def_prepare(self,rql,kwargs=None):plan=self._prepare_plan(rql,kwargs)plan.preprocess(plan.rqlst)rqlst=plan.rqlst.children[0]rqlst.solutions=remove_unused_solutions(rqlst,rqlst.solutions,{},self.repo.schema)[0]returnrqlstdef_user_session(self,groups=('guests',),ueid=None):# use self.session.user.eid to get correct owned_by relation, unless explicit eidifueidisNone:ueid=self.session.user.eidu=self.repo._build_user(self.session,ueid)u._groups=set(groups)s=Session(u,self.repo)s._threaddata.pool=self.poolreturnu,sdefexecute(self,rql,args=None,eid_key=None,build_descr=True):returnself.o.execute(self.session,rql,args,eid_key,build_descr)defcommit(self):self.session.commit()self.session.set_pool()classBasePlannerTC(BaseQuerierTC):def_prepare_plan(self,rql,kwargs=None):rqlst=self.o.parse(rql,annotate=True)self.o.solutions(self.session,rqlst,kwargs)ifrqlst.TYPE=='select':self.o._rqlhelper.annotate(rqlst)forselectinrqlst.children:select.solutions.sort()else:rqlst.solutions.sort()returnself.o.plan_factory(rqlst,kwargs,self.session)# monkey patch some methods to get predicatable results #######################fromcubicweb.server.rqlrewriteimportRQLRewriter_orig_insert_snippets=RQLRewriter.insert_snippets_orig_build_variantes=RQLRewriter.build_variantesdef_insert_snippets(self,snippets,varexistsmap=None):_orig_insert_snippets(self,sorted(snippets,snippet_cmp),varexistsmap)def_build_variantes(self,newsolutions):variantes=_orig_build_variantes(self,newsolutions)sortedvariantes=[]forvarianteinvariantes:orderedkeys=sorted((k[1],k[2],v)fork,vinvariante.iteritems())variante=DumbOrderedDict(sorted(variante.iteritems(),lambdaa,b:cmp((a[0][1],a[0][2],a[1]),(b[0][1],b[0][2],b[1]))))sortedvariantes.append((orderedkeys,variante))return[vforok,vinsorted(sortedvariantes)]fromcubicweb.server.querierimportExecutionPlan_orig_check_permissions=ExecutionPlan._check_permissions_orig_init_temp_table=ExecutionPlan.init_temp_tabledef_check_permissions(*args,**kwargs):res,restricted=_orig_check_permissions(*args,**kwargs)res=DumbOrderedDict(sorted(res.iteritems(),lambdaa,b:cmp(a[1],b[1])))returnres,restricteddef_dummy_check_permissions(self,rqlst):return{():rqlst.solutions},set()def_init_temp_table(self,table,selection,solution):ifself.tablesinorderisNone:tablesinorder=self.tablesinorder={}else:tablesinorder=self.tablesinorderifnottableintablesinorder:tablesinorder[table]='table%s'%len(tablesinorder)return_orig_init_temp_table(self,table,selection,solution)fromcubicweb.serverimportrqlannotation_orig_select_principal=rqlannotation._select_principaldef_select_principal(scope,relations):return_orig_select_principal(scope,relations,_sort=lambdarels:sorted(rels,key=lambdax:x.r_type))try:fromcubicweb.server.msplannerimportPartPlanInformationexceptImportError:classPartPlanInformation(object):defmerge_input_maps(*args):passdef_choose_var(self,sourcevars):pass_orig_merge_input_maps=PartPlanInformation.merge_input_maps_orig_choose_var=PartPlanInformation._choose_vardef_merge_input_maps(*args):returnsorted(_orig_merge_input_maps(*args))def_choose_var(self,sourcevars):# predictable order for test purposedefget_key(x):try:# variablereturnx.nameexceptAttributeError:try:# relationreturnx.r_typeexceptAttributeError:# constreturnx.valuevarsinorder=sorted(sourcevars,key=get_key)iflen(self._sourcesvars)>1:forvarinvarsinorder:ifnotvar.scopeisself.rqlst:returnvar,sourcevars.pop(var)else:forvarinvarsinorder:ifvar.scopeisself.rqlst:returnvar,sourcevars.pop(var)var=varsinorder[0]returnvar,sourcevars.pop(var)defdo_monkey_patch():RQLRewriter.insert_snippets=_insert_snippetsRQLRewriter.build_variantes=_build_variantesExecutionPlan._check_permissions=_check_permissionsExecutionPlan.tablesinorder=NoneExecutionPlan.init_temp_table=_init_temp_tablePartPlanInformation.merge_input_maps=_merge_input_mapsPartPlanInformation._choose_var=_choose_vardefundo_monkey_patch():RQLRewriter.insert_snippets=_orig_insert_snippetsRQLRewriter.build_variantes=_orig_build_variantesExecutionPlan._check_permissions=_orig_check_permissionsExecutionPlan.init_temp_table=_orig_init_temp_tablePartPlanInformation.merge_input_maps=_orig_merge_input_mapsPartPlanInformation._choose_var=_orig_choose_var