# -*- coding: iso-8859-1 -*-"""This modules defines func / methods for creating test repositories:organization: Logilab:copyright: 2001-2009 LOGILAB S.A. (Paris, FRANCE), license is LGPL v2.:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr:license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses"""__docformat__="restructuredtext en"fromrandomimportrandint,choicefromcopyimportdeepcopyfromdatetimeimportdatetime,date,time#timedeltafromdecimalimportDecimalfromyams.constraintsimport(SizeConstraint,StaticVocabularyConstraint,IntervalBoundConstraint)fromrql.utilsimportdecompose_b26asbase_decompose_b26fromcubicwebimportBinaryfromcubicweb.schemaimportRQLConstraintdefdecompose_b26(index,ascii=False):"""return a letter (base-26) decomposition of index"""ifascii:returnbase_decompose_b26(index)returnbase_decompose_b26(index,u'�abcdefghijklmnopqrstuvwxyz')defget_choices(eschema,attrname):"""returns possible choices for 'attrname' if attrname doesn't have ChoiceConstraint, return None """forcstineschema.constraints(attrname):ifisinstance(cst,StaticVocabularyConstraint):returncst.vocabulary()returnNonedefget_max_length(eschema,attrname):"""returns the maximum length allowed for 'attrname'"""forcstineschema.constraints(attrname):ifisinstance(cst,SizeConstraint)andcst.max:returncst.maxreturn300#raise AttributeError('No Size constraint on attribute "%s"' % attrname)defget_bounds(eschema,attrname):forcstineschema.constraints(attrname):ifisinstance(cst,IntervalBoundConstraint):returncst.minvalue,cst.maxvaluereturnNone,None_GENERATED_VALUES={}class_ValueGenerator(object):"""generates integers / dates / strings / etc. to fill a DB table"""def__init__(self,eschema,choice_func=None):"""<choice_func> is a function that returns a list of possible choices for a given entity type and an attribute name. It should looks like : def values_for(etype, attrname): # some stuff ... return alist_of_acceptable_values # or None """self.e_schema=eschemaself.choice_func=choice_funcdef_generate_value(self,attrname,index,**kwargs):ifnotself.e_schema.has_unique_values(attrname):returnself.__generate_value(attrname,index,**kwargs)value=self.__generate_value(attrname,index,**kwargs)whilevaluein_GENERATED_VALUES.get((self.e_schema.type,attrname),()):index+=1value=self.__generate_value(attrname,index,**kwargs)_GENERATED_VALUES.setdefault((self.e_schema.type,attrname),set()).add(value)returnvaluedef__generate_value(self,attrname,index,**kwargs):"""generates a consistent value for 'attrname'"""attrtype=str(self.e_schema.destination(attrname)).lower()# Before calling generate_%s functions, try to find values domainetype=self.e_schema.typeifself.choice_funcisnotNone:values_domain=self.choice_func(etype,attrname)ifvalues_domainisnotNone:returnchoice(values_domain)gen_func=getattr(self,'generate_%s_%s'%(self.e_schema.type,attrname),None)ifgen_funcisNone:gen_func=getattr(self,'generate_Any_%s'%attrname,None)ifgen_funcisnotNone:returngen_func(index,**kwargs)# If no specific values domain, then generate a dummy valuegen_func=getattr(self,'generate_%s'%(attrtype))returngen_func(attrname,index,**kwargs)defgenerate_choice(self,attrname,index):"""generates a consistent value for 'attrname' if it's a choice"""choices=get_choices(self.e_schema,attrname)ifchoicesisNone:returnNonereturnunicode(choice(choices))# FIXMEdefgenerate_string(self,attrname,index,format=None):"""generates a consistent value for 'attrname' if it's a string"""# First try to get choiceschoosed=self.generate_choice(attrname,index)ifchoosedisnotNone:returnchoosed# All other case, generate a default stringattrlength=get_max_length(self.e_schema,attrname)num_len=numlen(index)ifnum_len>=attrlength:ascii=self.e_schema.rproperty(attrname,'internationalizable')return('&'+decompose_b26(index,ascii))[:attrlength]# always use plain text when no format is specifiedattrprefix=attrname[:max(attrlength-num_len-1,0)]ifformat=='text/html':value=u'<span>�%s<b>%d</b></span>'%(attrprefix,index)elifformat=='text/rest':value=u"""title-----* %s* %d* �&"""%(attrprefix,index)else:value=u'�&%s%d'%(attrprefix,index)returnvalue[:attrlength]defgenerate_password(self,attrname,index):"""generates a consistent value for 'attrname' if it's a password"""returnu'toto'defgenerate_integer(self,attrname,index):"""generates a consistent value for 'attrname' if it's an integer"""choosed=self.generate_choice(attrname,index)ifchoosedisnotNone:returnchoosedminvalue,maxvalue=get_bounds(self.e_schema,attrname)ifmaxvalueisnotNoneandmaxvalue<=0andminvalueisNone:minvalue=maxvalue-index# i.e. randint(-index, 0)else:maxvalue=maxvalueorindexreturnrandint(minvalueor0,maxvalue)generate_int=generate_integerdefgenerate_float(self,attrname,index):"""generates a consistent value for 'attrname' if it's a float"""returnfloat(randint(-index,index))defgenerate_decimal(self,attrname,index):"""generates a consistent value for 'attrname' if it's a float"""returnDecimal(str(self.generate_float(attrname,index)))defgenerate_date(self,attrname,index):"""generates a random date (format is 'yyyy-mm-dd')"""returndate(randint(2000,2004),randint(1,12),randint(1,28))defgenerate_time(self,attrname,index):"""generates a random time (format is ' HH:MM')"""returntime(11,index%60)#'11:%02d' % (index % 60)defgenerate_datetime(self,attrname,index):"""generates a random date (format is 'yyyy-mm-dd HH:MM')"""returndatetime(randint(2000,2004),randint(1,12),randint(1,28),11,index%60)defgenerate_bytes(self,attrname,index,format=None):# modpython wayfakefile=Binary("%s%s"%(attrname,index))fakefile.filename=u"file_%s"%attrnamefakefile.value=fakefile.getvalue()returnfakefiledefgenerate_boolean(self,attrname,index):"""generates a consistent value for 'attrname' if it's a boolean"""returnindex%2==0defgenerate_Any_data_format(self,index,**kwargs):# data_format attribute of Image/File has no vocabulary constraint, we# need this method else stupid values will be set which make mtconverter# raise exceptionreturnu'application/octet-stream'defgenerate_Any_content_format(self,index,**kwargs):# content_format attribute of EmailPart has no vocabulary constraint, we# need this method else stupid values will be set which make mtconverter# raise exceptionreturnu'text/plain'defgenerate_Image_data_format(self,index,**kwargs):# data_format attribute of Image/File has no vocabulary constraint, we# need this method else stupid values will be set which make mtconverter# raise exceptionreturnu'image/png'classautoextend(type):def__new__(mcs,name,bases,classdict):forattrname,attrvalueinclassdict.items():ifcallable(attrvalue):ifattrname.startswith('generate_')and \attrvalue.func_code.co_argcount<2:raiseTypeError('generate_xxx must accept at least 1 argument')setattr(_ValueGenerator,attrname,attrvalue)returntype.__new__(mcs,name,bases,classdict)classValueGenerator(_ValueGenerator):__metaclass__=autoextenddef_default_choice_func(etype,attrname):"""default choice_func for insert_entity_queries"""returnNonedefinsert_entity_queries(etype,schema,vreg,entity_num,choice_func=_default_choice_func):"""returns a list of 'add entity' queries (couples query, args) :type etype: str :param etype: the entity's type :type schema: cubicweb.schema.Schema :param schema: the instance schema :type entity_num: int :param entity_num: the number of entities to insert XXX FIXME: choice_func is here for *historical* reasons, it should probably replaced by a nicer way to specify choices :type choice_func: function :param choice_func: a function that takes an entity type, an attrname and returns acceptable values for this attribute """# XXX HACK, remove or fix asapifetypein(('String','Int','Float','Boolean','Date','CWGroup','CWUser')):return[]queries=[]forindexinxrange(entity_num):restrictions=[]args={}forattrname,valueinmake_entity(etype,schema,vreg,index,choice_func).items():restrictions.append('X %s%%(%s)s'%(attrname,attrname))args[attrname]=valueifrestrictions:queries.append(('INSERT %s X: %s'%(etype,', '.join(restrictions)),args))assertnot'eid'inargs,argselse:queries.append(('INSERT %s X'%etype,{}))returnqueriesdefmake_entity(etype,schema,vreg,index=0,choice_func=_default_choice_func,form=False):"""generates a random entity and returns it as a dict by default, generate an entity to be inserted in the repository elif form, generate an form dictionnary to be given to a web controller """eschema=schema.eschema(etype)valgen=ValueGenerator(eschema,choice_func)entity={}# preprocessing to deal with _format fieldsattributes=[]relatedfields={}forrschema,attrschemaineschema.attribute_definitions():attrname=rschema.typeifattrname=='eid':# don't specify eids !continueifattrname.endswith('_format')andattrname[:-7]ineschema.subject_relations():relatedfields[attrname[:-7]]=attrschemaelse:attributes.append((attrname,attrschema))forattrname,attrschemainattributes:ifattrnameinrelatedfields:# first generate a format and record itformat=valgen._generate_value(attrname+'_format',index)entity[attrname+'_format']=format# then a value coherent with this formatvalue=valgen._generate_value(attrname,index,format=format)else:value=valgen._generate_value(attrname,index)ifform:# need to encode valuesifattrschema.type=='Bytes':# twisted wayfakefile=valuefilename=value.filenamevalue=(filename,u"text/plain",fakefile)elifattrschema.type=='Date':value=value.strftime(vreg.property_value('ui.date-format'))elifattrschema.type=='Datetime':value=value.strftime(vreg.property_value('ui.datetime-format'))elifattrschema.type=='Time':value=value.strftime(vreg.property_value('ui.time-format'))elifattrschema.type=='Float':fmt=vreg.property_value('ui.float-format')value=fmt%valueelse:value=unicode(value)entity[attrname]=valuereturnentitydefselect(constraints,cursor,selectvar='O'):"""returns list of eids matching <constraints> <selectvar> should be either 'O' or 'S' to match schema definitions """try:rset=cursor.execute('Any %s WHERE %s'%(selectvar,constraints))except:print"could restrict eid_list with given constraints (%r)"%constraintsreturn[]returnset(eidforeid,inrset.rows)defmake_relations_queries(schema,edict,cursor,ignored_relations=(),existingrels=None):"""returns a list of generated RQL queries for relations :param schema: The instance schema :param e_dict: mapping between etypes and eids :param ignored_relations: list of relations to ignore (i.e. don't try to generate insert queries for these relations) """gen=RelationsQueriesGenerator(schema,cursor,existingrels)returngen.compute_queries(edict,ignored_relations)classRelationsQueriesGenerator(object):rql_tmpl='SET S %s O WHERE S eid %%(subjeid)s, O eid %%(objeid)s'def__init__(self,schema,cursor,existing=None):self.schema=schemaself.cursor=cursorself.existingrels=existingor{}defcompute_queries(self,edict,ignored_relations):queries=[]# 1/ skip final relations and explictly ignored relationsrels=[rschemaforrschemainself.schema.relations()ifnot(rschema.finalorrschemainignored_relations)]# for each relation# 2/ take each possible couple (subj, obj)# 3/ analyze cardinality of relation# a/ if relation is mandatory, insert one relation# b/ else insert N relations where N is the mininum# of 20 and the number of existing targetable entitiesforrschemainrels:sym=set()sedict=deepcopy(edict)oedict=deepcopy(edict)delayed=[]# for each couple (subjschema, objschema), insert relationsforsubj,objinrschema.iter_rdefs():sym.add((subj,obj))ifrschema.symetricand(obj,subj)insym:continuesubjcard,objcard=rschema.rproperty(subj,obj,'cardinality')# process mandatory relations firstifsubjcardin'1+'orobjcardin'1+':queries+=self.make_relation_queries(sedict,oedict,rschema,subj,obj)else:delayed.append((subj,obj))forsubj,objindelayed:queries+=self.make_relation_queries(sedict,oedict,rschema,subj,obj)returnqueriesdefqargs(self,subjeids,objeids,subjcard,objcard,subjeid,objeid):ifsubjcardin'?1':subjeids.remove(subjeid)ifobjcardin'?1':objeids.remove(objeid)return{'subjeid':subjeid,'objeid':objeid}defmake_relation_queries(self,sedict,oedict,rschema,subj,obj):subjcard,objcard=rschema.rproperty(subj,obj,'cardinality')subjeids=sedict.get(subj,frozenset())used=self.existingrels[rschema.type]preexisting_subjrels=set(subjforsubj,objinused)preexisting_objrels=set(objforsubj,objinused)# if there are constraints, only select appropriate objeidsq=self.rql_tmpl%rschema.typeconstraints=[cforcinrschema.rproperty(subj,obj,'constraints')ifisinstance(c,RQLConstraint)]ifconstraints:restrictions=', '.join(c.restrictionforcinconstraints)q+=', %s'%restrictions# restrict object eids if possible# XXX the attempt to restrict below in completely wrong# disabling it for nowobjeids=select(restrictions,self.cursor)else:objeids=oedict.get(obj,frozenset())## objeids = oedict.get(obj, frozenset())ifsubjcardin'?1'orobjcardin'?1':forsubjeid,objeidinused:ifsubjcardin'?1'andsubjeidinsubjeids:subjeids.remove(subjeid)# XXX why?#if objeid in objeids:# objeids.remove(objeid)ifobjcardin'?1'andobjeidinobjeids:objeids.remove(objeid)# XXX why?#if subjeid in subjeids:# subjeids.remove(subjeid)ifnotsubjeids:check_card_satisfied(objcard,objeids,subj,rschema,obj)returnifnotobjeids:check_card_satisfied(subjcard,subjeids,subj,rschema,obj)returnifsubjcardin'?1+':forsubjeidintuple(subjeids):# do not insert relation if this entity already has a relationifsubjeidinpreexisting_subjrels:continueobjeid=choose_eid(objeids,subjeid)ifobjeidisNoneor(subjeid,objeid)inused:continueyieldq,self.qargs(subjeids,objeids,subjcard,objcard,subjeid,objeid)used.add((subjeid,objeid))ifnotobjeids:check_card_satisfied(subjcard,subjeids,subj,rschema,obj)breakelifobjcardin'?1+':forobjeidintuple(objeids):# do not insert relation if this entity already has a relationifobjeidinpreexisting_objrels:continuesubjeid=choose_eid(subjeids,objeid)ifsubjeidisNoneor(subjeid,objeid)inused:continueyieldq,self.qargs(subjeids,objeids,subjcard,objcard,subjeid,objeid)used.add((subjeid,objeid))ifnotsubjeids:check_card_satisfied(objcard,objeids,subj,rschema,obj)breakelse:# FIXME: 20 should be read from configsubjeidsiter=[choice(tuple(subjeids))foriinxrange(min(len(subjeids),20))]objeidsiter=[choice(tuple(objeids))foriinxrange(min(len(objeids),20))]forsubjeid,objeidinzip(subjeidsiter,objeidsiter):ifsubjeid!=objeidandnot(subjeid,objeid)inused:used.add((subjeid,objeid))yieldq,self.qargs(subjeids,objeids,subjcard,objcard,subjeid,objeid)defcheck_card_satisfied(card,remaining,subj,rschema,obj):ifcardin'1+'andremaining:raiseException("can't satisfy cardinality %s for relation %s%s%s"%(card,subj,rschema,obj))defchoose_eid(values,avoid):values=tuple(values)iflen(values)==1andvalues[0]==avoid:returnNoneobjeid=choice(values)whileobjeid==avoid:# avoid infinite recursion like in X comment Xobjeid=choice(values)returnobjeid# UTILITIES FUNCS ##############################################################defmake_tel(num_tel):"""takes an integer, converts is as a string and inserts white spaces each 2 chars (french notation) """num_list=list(str(num_tel))forindexin(6,4,2):num_list.insert(index,' ')return''.join(num_list)defnumlen(number):"""returns the number's length"""returnlen(str(number))