[rql2sql test] sqlserver use 0/1 for booleans, not FALSE/TRUE
# -*- coding: iso-8859-1 -*-# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr## This file is part of CubicWeb.## CubicWeb is free software: you can redistribute it and/or modify it under the# terms of the GNU Lesser General Public License as published by the Free# Software Foundation, either version 2.1 of the License, or (at your option)# any later version.## CubicWeb is distributed in the hope that it will be useful, but WITHOUT# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more# details.## You should have received a copy of the GNU Lesser General Public License along# with CubicWeb. If not, see <http://www.gnu.org/licenses/>."""This modules defines func / methods for creating test repositories"""__docformat__="restructuredtext en"fromrandomimportrandint,choicefromcopyimportdeepcopyfromdatetimeimportdatetime,date,time,timedeltafromdecimalimportDecimalfromlogilab.commonimportattrdictfromyams.constraintsimport(SizeConstraint,StaticVocabularyConstraint,IntervalBoundConstraint,BoundaryConstraint,Attribute,actual_value)fromrql.utilsimportdecompose_b26asbase_decompose_b26fromcubicwebimportBinaryfromcubicweb.schemaimportRQLConstraintdefcustom_range(start,stop,step):whilestart<stop:yieldstartstart+=stepdefdecompose_b26(index,ascii=False):"""return a letter (base-26) decomposition of index"""ifascii:returnbase_decompose_b26(index)returnbase_decompose_b26(index,u'�abcdefghijklmnopqrstuvwxyz')defget_max_length(eschema,attrname):"""returns the maximum length allowed for 'attrname'"""forcstineschema.rdef(attrname).constraints:ifisinstance(cst,SizeConstraint)andcst.max:returncst.maxreturn300#raise AttributeError('No Size constraint on attribute "%s"' % attrname)_GENERATED_VALUES={}class_ValueGenerator(object):"""generates integers / dates / strings / etc. to fill a DB table"""def__init__(self,eschema,choice_func=None):"""<choice_func> is a function that returns a list of possible choices for a given entity type and an attribute name. It should looks like : def values_for(etype, attrname): # some stuff ... return alist_of_acceptable_values # or None """self.choice_func=choice_funcself.eschema=eschemadefgenerate_attribute_value(self,entity,attrname,index=1,**kwargs):ifattrnameinentity:returnentity[attrname]eschema=self.eschemaifnoteschema.has_unique_values(attrname):value=self.__generate_value(entity,attrname,index,**kwargs)else:value=self.__generate_value(entity,attrname,index,**kwargs)whilevaluein_GENERATED_VALUES.get((eschema,attrname),()):index+=1value=self.__generate_value(entity,attrname,index,**kwargs)_GENERATED_VALUES.setdefault((eschema,attrname),set()).add(value)entity[attrname]=valuereturnvaluedef__generate_value(self,entity,attrname,index,**kwargs):"""generates a consistent value for 'attrname'"""eschema=self.eschemaattrtype=str(eschema.destination(attrname)).lower()# Before calling generate_%s functions, try to find values domainifself.choice_funcisnotNone:values_domain=self.choice_func(eschema,attrname)ifvalues_domainisnotNone:returnchoice(values_domain)gen_func=getattr(self,'generate_%s_%s'%(eschema,attrname),getattr(self,'generate_Any_%s'%attrname,None))ifgen_funcisnotNone:returngen_func(entity,index,**kwargs)# If no specific values domain, then generate a dummy valuegen_func=getattr(self,'generate_%s'%(attrtype))returngen_func(entity,attrname,index,**kwargs)defgenerate_string(self,entity,attrname,index,format=None):"""generates a consistent value for 'attrname' if it's a string"""# First try to get choiceschoosed=self.get_choice(entity,attrname)ifchoosedisnotNone:returnchoosed# All other case, generate a default stringattrlength=get_max_length(self.eschema,attrname)num_len=numlen(index)ifnum_len>=attrlength:ascii=self.eschema.rdef(attrname).internationalizablereturn('&'+decompose_b26(index,ascii))[:attrlength]# always use plain text when no format is specifiedattrprefix=attrname[:max(attrlength-num_len-1,0)]ifformat=='text/html':value=u'<span>�%s<b>%d</b></span>'%(attrprefix,index)elifformat=='text/rest':value=u"""title-----* %s* %d* �&"""%(attrprefix,index)else:value=u'�&%s%d'%(attrprefix,index)returnvalue[:attrlength]defgenerate_password(self,entity,attrname,index):"""generates a consistent value for 'attrname' if it's a password"""returnu'toto'defgenerate_integer(self,entity,attrname,index):"""generates a consistent value for 'attrname' if it's an integer"""returnself._constrained_generate(entity,attrname,0,1,index)generate_int=generate_integerdefgenerate_float(self,entity,attrname,index):"""generates a consistent value for 'attrname' if it's a float"""returnself._constrained_generate(entity,attrname,0.0,1.0,index)defgenerate_decimal(self,entity,attrname,index):"""generates a consistent value for 'attrname' if it's a float"""returnDecimal(str(self.generate_float(entity,attrname,index)))defgenerate_datetime(self,entity,attrname,index):"""generates a random date (format is 'yyyy-mm-dd HH:MM')"""base=datetime(randint(2000,2004),randint(1,12),randint(1,28),11,index%60)returnself._constrained_generate(entity,attrname,base,timedelta(hours=1),index)generate_tzdatetime=generate_datetime# XXX implementation should add a timezonedefgenerate_date(self,entity,attrname,index):"""generates a random date (format is 'yyyy-mm-dd')"""base=date(randint(2000,2010),1,1)+timedelta(randint(1,365))returnself._constrained_generate(entity,attrname,base,timedelta(days=1),index)defgenerate_interval(self,entity,attrname,index):"""generates a random date (format is 'yyyy-mm-dd')"""base=timedelta(randint(1,365))returnself._constrained_generate(entity,attrname,base,timedelta(days=1),index)defgenerate_time(self,entity,attrname,index):"""generates a random time (format is ' HH:MM')"""returntime(11,index%60)#'11:%02d' % (index % 60)generate_tztime=generate_time# XXX implementation should add a timezonedefgenerate_bytes(self,entity,attrname,index,format=None):fakefile=Binary("%s%s"%(attrname,index))fakefile.filename=u"file_%s"%attrnamereturnfakefiledefgenerate_boolean(self,entity,attrname,index):"""generates a consistent value for 'attrname' if it's a boolean"""returnindex%2==0def_constrained_generate(self,entity,attrname,base,step,index):choosed=self.get_choice(entity,attrname)ifchoosedisnotNone:returnchoosed# ensure index > 0index+=1minvalue,maxvalue=self.get_bounds(entity,attrname)ifmaxvalueisNone:ifminvalueisnotNone:base=max(minvalue,base)maxvalue=base+index*stepifminvalueisNone:minvalue=maxvalue-(index*step)# i.e. randint(-index, 0)returnchoice(list(custom_range(minvalue,maxvalue,step)))def_actual_boundary(self,entity,attrname,boundary):ifisinstance(boundary,Attribute):# ensure we've a value for this attributeentity[attrname]=None# infinite loop safety beltifnotboundary.attrinentity:self.generate_attribute_value(entity,boundary.attr)boundary=actual_value(boundary,entity)returnboundarydefget_bounds(self,entity,attrname):minvalue=maxvalue=Noneforcstinself.eschema.rdef(attrname).constraints:ifisinstance(cst,IntervalBoundConstraint):minvalue=self._actual_boundary(entity,attrname,cst.minvalue)maxvalue=self._actual_boundary(entity,attrname,cst.maxvalue)elifisinstance(cst,BoundaryConstraint):ifcst.operator[0]=='<':maxvalue=self._actual_boundary(entity,attrname,cst.boundary)else:minvalue=self._actual_boundary(entity,attrname,cst.boundary)returnminvalue,maxvaluedefget_choice(self,entity,attrname):"""generates a consistent value for 'attrname' if it has some static vocabulary set, else return None. """forcstinself.eschema.rdef(attrname).constraints:ifisinstance(cst,StaticVocabularyConstraint):returnunicode(choice(cst.vocabulary()))returnNone# XXX nothing to do heredefgenerate_Any_data_format(self,entity,index,**kwargs):# data_format attribute of File has no vocabulary constraint, we# need this method else stupid values will be set which make mtconverter# raise exceptionreturnu'application/octet-stream'defgenerate_Any_content_format(self,entity,index,**kwargs):# content_format attribute of EmailPart has no vocabulary constraint, we# need this method else stupid values will be set which make mtconverter# raise exceptionreturnu'text/plain'classautoextend(type):def__new__(mcs,name,bases,classdict):forattrname,attrvalueinclassdict.items():ifcallable(attrvalue):ifattrname.startswith('generate_')and \attrvalue.func_code.co_argcount<2:raiseTypeError('generate_xxx must accept at least 1 argument')setattr(_ValueGenerator,attrname,attrvalue)returntype.__new__(mcs,name,bases,classdict)classValueGenerator(_ValueGenerator):__metaclass__=autoextenddef_default_choice_func(etype,attrname):"""default choice_func for insert_entity_queries"""returnNonedefinsert_entity_queries(etype,schema,vreg,entity_num,choice_func=_default_choice_func):"""returns a list of 'add entity' queries (couples query, args) :type etype: str :param etype: the entity's type :type schema: cubicweb.schema.Schema :param schema: the instance schema :type entity_num: int :param entity_num: the number of entities to insert XXX FIXME: choice_func is here for *historical* reasons, it should probably replaced by a nicer way to specify choices :type choice_func: function :param choice_func: a function that takes an entity type, an attrname and returns acceptable values for this attribute """queries=[]forindexinxrange(entity_num):restrictions=[]args={}forattrname,valueinmake_entity(etype,schema,vreg,index,choice_func).items():restrictions.append('X %s%%(%s)s'%(attrname,attrname))args[attrname]=valueifrestrictions:queries.append(('INSERT %s X: %s'%(etype,', '.join(restrictions)),args))assertnot'eid'inargs,argselse:queries.append(('INSERT %s X'%etype,{}))returnqueriesdefmake_entity(etype,schema,vreg,index=0,choice_func=_default_choice_func,form=False):"""generates a random entity and returns it as a dict by default, generate an entity to be inserted in the repository elif form, generate an form dictionnary to be given to a web controller """eschema=schema.eschema(etype)valgen=ValueGenerator(eschema,choice_func)entity=attrdict()# preprocessing to deal with _format fieldsattributes=[]relatedfields={}forrschema,attrschemaineschema.attribute_definitions():attrname=rschema.typeifattrname=='eid':# don't specify eids !continueifattrname.endswith('_format')andattrname[:-7]ineschema.subject_relations():relatedfields[attrname[:-7]]=attrschemaelse:attributes.append((attrname,attrschema))forattrname,attrschemainattributes:ifattrnameinrelatedfields:# first generate a format and record itformat=valgen.generate_attribute_value(entity,attrname+'_format',index)# then a value coherent with this formatvalue=valgen.generate_attribute_value(entity,attrname,index,format=format)else:value=valgen.generate_attribute_value(entity,attrname,index)ifform:# need to encode valuesifattrschema.type=='Bytes':# twisted wayfakefile=valuefilename=value.filenamevalue=(filename,u"text/plain",fakefile)elifattrschema.type=='Date':value=value.strftime(vreg.property_value('ui.date-format'))elifattrschema.type=='Datetime':value=value.strftime(vreg.property_value('ui.datetime-format'))elifattrschema.type=='Time':value=value.strftime(vreg.property_value('ui.time-format'))elifattrschema.type=='Float':fmt=vreg.property_value('ui.float-format')value=fmt%valueelse:value=unicode(value)returnentitydefselect(constraints,cursor,selectvar='O',objtype=None):"""returns list of eids matching <constraints> <selectvar> should be either 'O' or 'S' to match schema definitions """try:rql='Any %s WHERE %s'%(selectvar,constraints)ifobjtype:rql+=', %s is %s'%(selectvar,objtype)rset=cursor.execute(rql)except:print"could restrict eid_list with given constraints (%r)"%constraintsreturn[]returnset(eidforeid,inrset.rows)defmake_relations_queries(schema,edict,cursor,ignored_relations=(),existingrels=None):"""returns a list of generated RQL queries for relations :param schema: The instance schema :param e_dict: mapping between etypes and eids :param ignored_relations: list of relations to ignore (i.e. don't try to generate insert queries for these relations) """gen=RelationsQueriesGenerator(schema,cursor,existingrels)returngen.compute_queries(edict,ignored_relations)defcomposite_relation(rschema):forobjinrschema.objects():ifobj.rdef(rschema,'object').composite=='subject':returnTrueforobjinrschema.subjects():ifobj.rdef(rschema,'subject').composite=='object':returnTruereturnFalseclassRelationsQueriesGenerator(object):rql_tmpl='SET S %s O WHERE S eid %%(subjeid)s, O eid %%(objeid)s'def__init__(self,schema,cursor,existing=None):self.schema=schemaself.cursor=cursorself.existingrels=existingor{}defcompute_queries(self,edict,ignored_relations):queries=[]# 1/ skip final relations and explictly ignored relationsrels=sorted([rschemaforrschemainself.schema.relations()ifnot(rschema.finalorrschemainignored_relations)],key=lambdax:notcomposite_relation(x))# for each relation# 2/ take each possible couple (subj, obj)# 3/ analyze cardinality of relation# a/ if relation is mandatory, insert one relation# b/ else insert N relations where N is the mininum# of 20 and the number of existing targetable entitiesforrschemainrels:sym=set()sedict=deepcopy(edict)oedict=deepcopy(edict)delayed=[]# for each couple (subjschema, objschema), insert relationsforsubj,objinrschema.rdefs:sym.add((subj,obj))ifrschema.symmetricand(obj,subj)insym:continuesubjcard,objcard=rschema.rdef(subj,obj).cardinality# process mandatory relations firstifsubjcardin'1+'orobjcardin'1+'orcomposite_relation(rschema):forquery,argsinself.make_relation_queries(sedict,oedict,rschema,subj,obj):yieldquery,argselse:delayed.append((subj,obj))forsubj,objindelayed:forquery,argsinself.make_relation_queries(sedict,oedict,rschema,subj,obj):yieldquery,argsdefqargs(self,subjeids,objeids,subjcard,objcard,subjeid,objeid):ifsubjcardin'?1+':subjeids.remove(subjeid)ifobjcardin'?1+':objeids.remove(objeid)return{'subjeid':subjeid,'objeid':objeid}defmake_relation_queries(self,sedict,oedict,rschema,subj,obj):rdef=rschema.rdef(subj,obj)subjcard,objcard=rdef.cardinalitysubjeids=sedict.get(subj,frozenset())used=self.existingrels[rschema.type]preexisting_subjrels=set(subjforsubj,objinused)preexisting_objrels=set(objforsubj,objinused)# if there are constraints, only select appropriate objeidsq=self.rql_tmpl%rschema.typeconstraints=[cforcinrdef.constraintsifisinstance(c,RQLConstraint)]ifconstraints:restrictions=', '.join(c.expressionforcinconstraints)q+=', %s'%restrictions# restrict object eids if possible# XXX the attempt to restrict below in completely wrong# disabling it for nowobjeids=select(restrictions,self.cursor,objtype=obj)else:objeids=oedict.get(obj,frozenset())ifsubjcardin'?1'orobjcardin'?1':forsubjeid,objeidinused:ifsubjcardin'?1'andsubjeidinsubjeids:subjeids.remove(subjeid)# XXX why?#if objeid in objeids:# objeids.remove(objeid)ifobjcardin'?1'andobjeidinobjeids:objeids.remove(objeid)# XXX why?#if subjeid in subjeids:# subjeids.remove(subjeid)ifnotsubjeids:check_card_satisfied(objcard,objeids,subj,rschema,obj)returnifnotobjeids:check_card_satisfied(subjcard,subjeids,subj,rschema,obj)returnifsubjcardin'?1+':forsubjeidintuple(subjeids):# do not insert relation if this entity already has a relationifsubjeidinpreexisting_subjrels:continueobjeid=choose_eid(objeids,subjeid)ifobjeidisNoneor(subjeid,objeid)inused:continueyieldq,self.qargs(subjeids,objeids,subjcard,objcard,subjeid,objeid)used.add((subjeid,objeid))ifnotobjeids:check_card_satisfied(subjcard,subjeids,subj,rschema,obj)breakelifobjcardin'?1+':forobjeidintuple(objeids):# do not insert relation if this entity already has a relationifobjeidinpreexisting_objrels:continuesubjeid=choose_eid(subjeids,objeid)ifsubjeidisNoneor(subjeid,objeid)inused:continueyieldq,self.qargs(subjeids,objeids,subjcard,objcard,subjeid,objeid)used.add((subjeid,objeid))ifnotsubjeids:check_card_satisfied(objcard,objeids,subj,rschema,obj)breakelse:# FIXME: 20 should be read from configsubjeidsiter=[choice(tuple(subjeids))foriinxrange(min(len(subjeids),20))]objeidsiter=[choice(tuple(objeids))foriinxrange(min(len(objeids),20))]forsubjeid,objeidinzip(subjeidsiter,objeidsiter):ifsubjeid!=objeidandnot(subjeid,objeid)inused:used.add((subjeid,objeid))yieldq,self.qargs(subjeids,objeids,subjcard,objcard,subjeid,objeid)defcheck_card_satisfied(card,remaining,subj,rschema,obj):ifcardin'1+'andremaining:raiseException("can't satisfy cardinality %s for relation %s%s%s"%(card,subj,rschema,obj))defchoose_eid(values,avoid):values=tuple(values)iflen(values)==1andvalues[0]==avoid:returnNoneobjeid=choice(values)whileobjeid==avoid:# avoid infinite recursion like in X comment Xobjeid=choice(values)returnobjeid# UTILITIES FUNCS ##############################################################defmake_tel(num_tel):"""takes an integer, converts is as a string and inserts white spaces each 2 chars (french notation) """num_list=list(str(num_tel))forindexin(6,4,2):num_list.insert(index,' ')return''.join(num_list)defnumlen(number):"""returns the number's length"""returnlen(str(number))