# -*- coding: utf-8 -*-
"""This module provides tools to import tabular data.

:organization: Logilab
:copyright: 2001-2010 LOGILAB S.A. (Paris, FRANCE), license is LGPL v2.
:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
:license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses


Example of use (run this with `cubicweb-ctl shell instance import-script.py`):

.. sourcecode:: python

  from cubicweb.devtools.dataimport import *
  # define data generators
  GENERATORS = []

  USERS = [('Prenom', 'firstname', ()),
           ('Nom', 'surname', ()),
           ('Identifiant', 'login', ()),
           ]

  def gen_users(ctl):
      for row in ctl.get_data('utilisateurs'):
          entity = mk_entity(row, USERS)
          entity['upassword'] = u'motdepasse'
          ctl.check('login', entity['login'], None)
          ctl.store.add('CWUser', entity)
          email = {'address': row['email']}
          ctl.store.add('EmailAddress', email)
          ctl.store.relate(entity['eid'], 'use_email', email['eid'])
          ctl.store.rql('SET U in_group G WHERE G name "users", U eid %(x)s', {'x':entity['eid']})

  CHK = [('login', check_doubles, 'Utilisateurs Login',
          'Deux utilisateurs ne devraient pas avoir le même login.'),
         ]

  GENERATORS.append( (gen_users, CHK) )

  # create controller
  ctl = CWImportController(RQLObjectStore())
  ctl.askerror = 1
  ctl.generators = GENERATORS
  ctl.store._checkpoint = checkpoint
  ctl.store._rql = rql
  ctl.data['utilisateurs'] = lazytable(utf8csvreader(open('users.csv')))
  # run
  ctl.run()
  sys.exit(0)


.. BUG a file with a single column triggers a parsing problem
.. TODO rollback()
"""
__docformat__ = "restructuredtext en"

import sys
import csv
import traceback
import os.path as osp
from StringIO import StringIO
from copy import copy

from logilab.common import shellutils
from logilab.common.date import strptime
from logilab.common.decorators import cached
from logilab.common.deprecation import deprecated


def ucsvreader_pb(filepath, encoding='utf-8', separator=',', quote='"',
                  skipfirst=False, withpb=True):
    """same as `ucsvreader` but a progress bar is displayed as we iter on rows

    :param filepath: path of the csv file to read
    :param encoding: encoding used to decode each cell
    :param separator: csv delimiter
    :param quote: csv quote character
    :param skipfirst: when true, the first row is not yielded
    :param withpb: when false, no progress bar is displayed
    :raise Exception: if `filepath` doesn't exist
    """
    if not osp.exists(filepath):
        raise Exception("file doesn't exists: %s" % filepath)
    # Count physical lines in pure python rather than spawning `wc -l` through
    # a shell: no quoting issue with exotic file names, and no dependency on
    # a unix tool being installed.
    stream = open(filepath)
    try:
        rowcount = sum(1 for _line in stream)
    finally:
        stream.close()
    if skipfirst:
        rowcount -= 1
    if withpb:
        pb = shellutils.ProgressBar(rowcount, 50)
    stream = open(filepath)
    try:
        for urow in ucsvreader(stream, encoding, separator, quote, skipfirst):
            yield urow
            if withpb:
                pb.update()
    finally:
        # also executed when the generator is closed before exhaustion
        stream.close()
    print(' %s rows imported' % rowcount)


def ucsvreader(stream, encoding='utf-8', separator=',', quote='"',
               skipfirst=False):
    """A csv reader that accepts files with any encoding and outputs unicode
    strings

    :param stream: iterable of csv lines (typically an opened file)
    :param encoding: encoding used to decode each cell
    :param separator: csv delimiter
    :param quote: csv quote character
    :param skipfirst: when true, the first row is silently dropped
    :return: generator of rows, each row being a list of decoded cells
    """
    it = iter(csv.reader(stream, delimiter=separator, quotechar=quote))
    if skipfirst:
        try:
            next(it)
        except StopIteration:
            # empty input: nothing to skip, nothing to yield
            return
    for row in it:
        yield [item.decode(encoding) for item in row]


def commit_every(nbit, store, it):
    """Yield items of `it`, calling `store.checkpoint()` each time `nbit`
    items have been yielded, and once more when `it` is exhausted.

    :param nbit: number of items between two checkpoints; None disables
      checkpointing altogether, 0 only keeps the final checkpoint
    :param store: object providing a `checkpoint()` method
    :param it: the iterable to walk through
    """
    for i, x in enumerate(it):
        yield x
        # `i` is 0-based, hence the + 1 to checkpoint after the nbit-th item
        if nbit and (i + 1) % nbit == 0:
            store.checkpoint()
    if nbit is not None:
        store.checkpoint()


def lazytable(reader):
    """The first row is taken to be the header of the table and
    used to output a dict for each row of data.

    >>> data = lazytable(utf8csvreader(open(filename)))
    """
    try:
        header = next(reader)
    except StopIteration:
        # no header at all: empty table
        return
    for row in reader:
        yield dict(zip(header, row))


def mk_entity(row, map):
    """Return a dict made from sanitized mapped values.

    `map` is a list of `(source key, destination key, functions)` triples. For
    each of them the value found in `row` under the source key goes through
    every function in turn; the chain stops as soon as a function returns None
    (the destination key is then kept, with None as value).

    ValueError is raised on unexpected values found in checkers

    >>> row = {'myname': u'dupont'}
    >>> map = [('myname', u'name', (capitalize_if_unicase,))]
    >>> mk_entity(row, map)
    {'name': u'Dupont'}
    >>> row = {'myname': u'dupont', 'optname': u''}
    >>> map = [('myname', u'name', (capitalize_if_unicase,)),
    ...        ('optname', u'MARKER', (optional,))]
    >>> mk_entity(row, map) == {'name': u'Dupont', 'MARKER': None}
    True
    """
    res = {}
    assert isinstance(row, dict)
    assert isinstance(map, list)
    for src, dest, funcs in map:
        assert not (required in funcs and optional in funcs), \
               "optional and required checks are exclusive"
        res[dest] = row[src]
        try:
            for func in funcs:
                res[dest] = func(res[dest])
                if res[dest] is None:
                    break
        except ValueError as err:
            # tell which field of the row is faulty
            raise ValueError('error with %r field: %s' % (src, err))
    return res


# user interactions ############################################################

def tell(msg):
    """Default reporting function: print `msg` on the standard output."""
    print(msg)


def confirm(question):
    """A confirm function that asks for yes/no/abort and exits on abort."""
    answer = shellutils.ASK.ask(question, ('Y', 'n', 'abort'), 'Y')
    if answer == 'abort':
        sys.exit(1)
    return answer == 'Y'


class catch_error(object):
    """Context manager recording exceptions raised in its body on an import
    controller instead of letting them propagate.

    Exceptions are only swallowed when the controller's `catcherrors` is true;
    KeyboardInterrupt and SystemExit are never swallowed.
    """

    def __init__(self, ctl, key='unexpected error', msg=None):
        self.ctl = ctl
        self.key = key
        self.msg = msg

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        if type is not None:
            if issubclass(type, (KeyboardInterrupt, SystemExit)):
                return # re-raise
            if self.ctl.catcherrors:
                # forward the message given at construction time, so it gets
                # recorded along with the traceback
                self.ctl.record_error(self.key, self.msg, type, value,
                                      traceback)
                return True # silent


# base sanitizing/coercing functions ###########################################

def optional(value):
    """validation error will not been raised if you add this checker in chain

    Return `value` if it's true, else None (which stops the checkers chain in
    `mk_entity`).
    """
    if value:
        return value
    return None


def required(value):
    """raise ValueError if value is empty

    This check should be often found in last position in the chain.
    """
    if value:
        return value
    raise ValueError("required")


def todatetime(format='%d/%m/%Y'):
    """return a transformation function to turn string input value into a
    `datetime.datetime` instance, using given format.

    Follow it by `todate` or `totime` functions from `logilab.common.date` if
    you want a `date`/`time` instance instead of `datetime`.
    """
    def coerce(value):
        return strptime(value, format)
    return coerce


def call_transform_method(methodname, *args, **kwargs):
    """return value returned by calling the given method on input"""
    def coerce(value):
        return getattr(value, methodname)(*args, **kwargs)
    return coerce


def call_check_method(methodname, *args, **kwargs):
    """check value returned by calling the given method on input is true,
    else raise ValueError
    """
    def check(value):
        if getattr(value, methodname)(*args, **kwargs):
            return value
        raise ValueError('%s not verified on %r' % (methodname, value))
    return check


# base integrity checking functions ############################################

def check_doubles(buckets):
    """Extract the keys that have more than one item in their bucket.

    Return a list of `(key, number of items)` couples.
    """
    return [(k, len(v)) for k, v in buckets.items() if len(v) > 1]


def check_doubles_not_none(buckets):
    """Extract the keys that have more than one item in their bucket, the
    None key being ignored.

    Return a list of `(key, number of items)` couples.
    """
    return [(k, len(v)) for k, v in buckets.items()
            if k is not None and len(v) > 1]


# object stores #################################################################

class ObjectStore(object):
    """Store objects in memory for *faster* validation (development mode)

    But it will not enforce the constraints of the schema and hence will miss
    some problems

    >>> store = ObjectStore()
    >>> user = {'login': 'johndoe'}
    >>> store.add('CWUser', user)
    >>> group = {'name': 'unknown'}
    >>> store.add('CWGroup', group)
    >>> store.relate(user['eid'], 'in_group', group['eid'])
    (0, 'in_group', 1)
    """

    def __init__(self):
        self.items = []         # every added item, in insertion order
        self.eids = {}          # eid -> item
        self.types = {}         # entity type -> list of eids
        self.relations = set()  # (eid from, relation type, eid to) triples
        self.indexes = {}       # index name -> {key: list of values}
        self._rql = None
        self._checkpoint = None

    def _put(self, type, item):
        """Store `item` and return its eid (its position in `self.items`)."""
        self.items.append(item)
        return len(self.items) - 1

    def add(self, type, item):
        """Add `item` (a dict) as an entity of the given type; the allocated
        eid is set on the item under the 'eid' key.
        """
        # `type` is the entity type here, not the builtin: use __class__ to
        # describe the faulty item
        assert isinstance(item, dict), \
               'item is not a dict but a %s' % item.__class__
        eid = item['eid'] = self._put(type, item)
        self.eids[eid] = item
        self.types.setdefault(type, []).append(eid)

    def relate(self, eid_from, rtype, eid_to, inlined=False):
        """Add new relation (reverse type support is available) and return it
        as a `(eid from, relation type, eid to)` tuple

        >>> eid_from, eid_to = 1, 2
        >>> self.relate(eid_from, 'in_group', eid_to)
        (1, 'in_group', 2)
        >>> self.relate(eid_from, 'reverse_in_group', eid_to)
        (2, 'in_group', 1)
        """
        if rtype.startswith('reverse_'):
            eid_from, eid_to = eid_to, eid_from
            rtype = rtype[8:] # strip the 'reverse_' prefix
        relation = eid_from, rtype, eid_to
        self.relations.add(relation)
        return relation

    def build_index(self, name, type, func=None):
        """Build index `name` on entities of the given type.

        `func` computes the index key from an item; items are indexed by
        their eid when it's missing or not callable.
        """
        index = {}
        if func is None or not callable(func):
            func = lambda x: x['eid']
        for eid in self.types[type]:
            index.setdefault(func(self.eids[eid]), []).append(eid)
        assert index, "new index '%s' cannot be empty" % name
        self.indexes[name] = index

    def build_rqlindex(self, name, type, key, rql, rql_params=False, func=None):
        """build an index by rql query

        rql should return eid in first column
        ctl.store.build_rqlindex('index_name', 'users', 'login',
                                 'Any U WHERE U is CWUser')

        NOTE: `func` is accepted but ignored, entities are always indexed on
        their `key` attribute.
        """
        rset = self.rql(rql, rql_params or {})
        # create the bucket up-front so an empty result set triggers the
        # assertion below rather than a KeyError
        eids = self.types.setdefault(type, [])
        known = set(eids)
        for entity in rset.entities():
            getattr(entity, key) # autopopulate entity with key attribute
            self.eids[entity.eid] = dict(entity)
            if entity.eid not in known:
                known.add(entity.eid)
                eids.append(entity.eid)
        assert eids, "new index type '%s' cannot be empty (0 record found)" % type
        # Build index with specified key
        func = lambda x: x[key]
        self.build_index(name, type, func)

    def fetch(self, name, key, unique=False, decorator=None):
        """Return what index `name` holds for `key` (an empty list if the key
        is unknown).

        decorator is a callable method or an iterator of callable methods
        (usually a lambda function), each one being applied in turn to the
        result

            decorator=lambda x: x[:1] (first value is returned)

        If `unique` is true, exactly one value is expected and it is returned
        instead of a list.

        We can use validation check function available in _entity
        """
        eids = self.indexes[name].get(key, [])
        if decorator is not None:
            if not hasattr(decorator, '__iter__'):
                decorator = (decorator,)
            for f in decorator:
                eids = f(eids)
        if unique:
            assert len(eids) == 1, \
                   u'expected a single one value for key "%s" in index "%s". Got %i' % (key, name, len(eids))
            eids = eids[0] # FIXME maybe it's better to keep an iterator here ?
        return eids

    def find(self, type, key, value):
        """Yield items of the given type whose `key` entry equals `value`."""
        for idx in self.types[type]:
            item = self.items[idx]
            if item[key] == value:
                yield item

    def rql(self, *args):
        """Forward to the `_rql` callable if one has been set, else do
        nothing and return None.
        """
        if self._rql is not None:
            return self._rql(*args)

    def checkpoint(self):
        """Nothing to commit for an in-memory store."""
        pass

    @property
    def nb_inserted_entities(self):
        return len(self.eids)

    @property
    def nb_inserted_types(self):
        return len(self.types)

    @property
    def nb_inserted_relations(self):
        return len(self.relations)

    @deprecated('[3.6] get_many() deprecated. Use fetch() instead')
    def get_many(self, name, key):
        return self.fetch(name, key, unique=False)

    @deprecated('[3.6] get_one() deprecated. Use fetch(..., unique=True) instead')
    def get_one(self, name, key):
        return self.fetch(name, key, unique=True)


class RQLObjectStore(ObjectStore):
    """ObjectStore that works with an actual RQL repository (production mode)"""
    _rql = None # bw compat

    def __init__(self, session=None, checkpoint=None):
        """
        :param session: a session (anything with a `set_pool` method) or a
          connection (in which case `session.request()` is used)
        :param checkpoint: callable used to commit; defaults to the `commit`
          method of the connection / session
        """
        ObjectStore.__init__(self)
        if session is not None:
            if not hasattr(session, 'set_pool'):
                # connection
                cnx = session
                session = session.request()
                session.set_pool = lambda: None
                checkpoint = checkpoint or cnx.commit
            else:
                session.set_pool()
            self.session = session
            self._checkpoint = checkpoint or session.commit
        elif checkpoint is not None:
            self._checkpoint = checkpoint
            # XXX .session

    def checkpoint(self):
        """Commit, then call `set_pool()` on the session again."""
        self._checkpoint()
        self.session.set_pool()

    def rql(self, *args):
        """Execute a query using the `_rql` callable if one has been set,
        else the session.
        """
        if self._rql is not None:
            return self._rql(*args)
        return self.session.execute(*args)

    def create_entity(self, *args, **kwargs):
        """Create an entity through the session and register it in the store;
        the first positional argument is the entity type.
        """
        entity = self.session.create_entity(*args, **kwargs)
        self.eids[entity.eid] = entity
        self.types.setdefault(args[0], []).append(entity.eid)
        return entity

    def _put(self, type, item):
        """Insert `item` using an INSERT query built from its keys and return
        the first cell of the first result row.
        """
        query = ('INSERT %s X: ' % type) + ', '.join('X %s%%(%s)s' % (k, k)
                                                     for k in item)
        return self.rql(query, item)[0][0]

    def relate(self, eid_from, rtype, eid_to, inlined=False):
        """Add new relation (reverse type support is available), both in
        memory and using a SET query; return the relation like the base class.
        """
        # if reverse relation is found, eids are exchanged
        relation = super(RQLObjectStore, self).relate(eid_from, rtype, eid_to)
        eid_from, rtype, eid_to = relation
        self.rql('SET X %s Y WHERE X eid %%(x)s, Y eid %%(y)s' % rtype,
                 {'x': int(eid_from), 'y': int(eid_to)}, ('x', 'y'))
        return relation


# the import controller ########################################################

def _split_error(error):
    """Return a `(title, details)` couple for a value of
    `CWImportController.errors`, which is either:

    * a `(help, failures)` tuple set by `run` when an integrity check fails
    * a list of log entries appended by `record_error`
    """
    if isinstance(error, tuple):
        helpmsg, failures = error
        return helpmsg, sorted(failures)
    # recorded exceptions are kept in the order they occurred
    return 'Unexpected errors', list(error)


class CWImportController(object):
    """Controller of the data import process.

    >>> ctl = CWImportController(store)
    >>> ctl.generators = list_of_data_generators
    >>> ctl.data = dict_of_data_tables
    >>> ctl.run()
    """

    def __init__(self, store, askerror=0, catcherrors=None, tell=tell,
                 commitevery=50):
        """
        :param store: the object store to fill
        :param askerror: 0 to never display errors at the end of `run`, 2 to
          always display them, any other true value to ask the user first
        :param catcherrors: when true, exceptions raised by generators are
          recorded instead of being propagated; defaults to `askerror`
        :param tell: callable used to report messages
        :param commitevery: number of rows between two commits in
          `iter_and_commit`
        """
        self.store = store
        self.generators = None
        self.data = {}
        self.errors = None
        # so `check` is usable before `run` has been called
        self._checks = {}
        self.askerror = askerror
        if catcherrors is None:
            catcherrors = askerror
        self.catcherrors = catcherrors
        self.commitevery = commitevery # set to None to do a single commit
        self._tell = tell

    def check(self, type, key, value):
        """Register `value` in the `key` bucket of check `type`; buckets are
        given to the matching check function once the generator is done.
        """
        self._checks.setdefault(type, {}).setdefault(key, []).append(value)

    def check_map(self, entity, key, map, default):
        """Replace `entity[key]` by its image in `map`; when there's none,
        register the unmapped value as a `key` check and use `default`.
        """
        try:
            entity[key] = map[entity[key]]
        except KeyError:
            self.check(key, entity[key], None)
            entity[key] = default

    def record_error(self, key, msg=None, type=None, value=None, tb=None):
        """Print the traceback of an exception and log it under `key`.

        The exception being handled is used unless `type`, `value` and `tb`
        are given. The log entry is the list of traceback lines, or a
        `(msg, traceback lines)` tuple when `msg` is given.
        """
        tmp = StringIO()
        if type is None:
            traceback.print_exc(file=tmp)
        else:
            traceback.print_exception(type, value, tb, file=tmp)
        report = tmp.getvalue()
        print(report)
        if self.errors is None:
            # called before `run`, e.g. from a `catch_error` block
            self.errors = {}
        # use a list to avoid counting a <nb lines> errors instead of one
        errorlog = self.errors.setdefault(key, [])
        if msg is None:
            errorlog.append(report.splitlines())
        else:
            errorlog.append((msg, report.splitlines()))

    def run(self):
        """Call each generator, run its integrity checks and checkpoint the
        store, then report a summary (and errors, according to `askerror`).
        """
        self.errors = {}
        for func, checks in self.generators:
            self._checks = {}
            func_name = func.__name__[4:] # XXX
            self.tell("Import '%s'..." % func_name)
            try:
                func(self)
            except Exception:
                # KeyboardInterrupt / SystemExit are deliberately not caught,
                # so the user can still abort the import
                if self.catcherrors:
                    self.record_error(func_name,
                                      'While calling %s' % func.__name__)
                else:
                    raise
            for key, func, title, help in checks:
                buckets = self._checks.get(key)
                if buckets:
                    err = func(buckets)
                    if err:
                        self.errors[title] = (help, err)
            self.store.checkpoint()
        nberrors = sum(len(_split_error(error)[1])
                       for error in self.errors.values())
        self.tell('\nImport completed: %i entities, %i types, %i relations and %i errors'
                  % (self.store.nb_inserted_entities,
                     self.store.nb_inserted_types,
                     self.store.nb_inserted_relations,
                     nberrors))
        if self.errors:
            if self.askerror == 2 or (self.askerror and confirm('Display errors ?')):
                from pprint import pformat
                for errkey, error in self.errors.items():
                    title, details = _split_error(error)
                    self.tell("\n%s (%s): %d\n" % (title, errkey, len(details)))
                    self.tell(pformat(details))

    def get_data(self, key):
        """Return the data table registered under `key`, None if missing."""
        return self.data.get(key)

    def index(self, name, key, value, unique=False):
        """create a new index

        If unique is set to True, only first occurence will be kept not the
        following ones
        """
        if unique:
            try:
                if value in self.store.indexes[name][key]:
                    return
            except KeyError:
                # we're sure that one is the first occurence; so continue...
                pass
        self.store.indexes.setdefault(name, {}).setdefault(key, []).append(value)

    def tell(self, msg):
        """Report `msg` using the function given at construction time."""
        self._tell(msg)

    def iter_and_commit(self, datakey):
        """iter rows, triggering commit every self.commitevery iterations"""
        return commit_every(self.commitevery, self.store, self.get_data(datakey))


# imports below are only needed by the stores bypassing hooks; they are kept
# at this point of the module, as they originally were
from datetime import datetime
from cubicweb.schema import META_RTYPES, VIRTUAL_RTYPES


class NoHookRQLObjectStore(RQLObjectStore):
    """ObjectStore that works with an actual RQL repository (production mode),
    inserting entities and relations directly through the system source.
    """
    _rql = None # bw compat

    def __init__(self, session, metagen=None, baseurl=None):
        """
        :param session: the repository session to use
        :param metagen: generator of entities' meta-data; a `MetaGenerator`
          is created when missing
        :param baseurl: base url given to the default `MetaGenerator`
        """
        super(NoHookRQLObjectStore, self).__init__(session)
        self.source = session.repo.system_source
        self.rschema = session.repo.schema.rschema
        self.add_relation = self.source.add_relation
        if metagen is None:
            metagen = MetaGenerator(session, baseurl)
        self.metagen = metagen
        self._nb_inserted_entities = 0
        self._nb_inserted_types = 0 # NOTE: never incremented by this class
        self._nb_inserted_relations = 0
        # instance attribute taking over the inherited `rql` method
        self.rql = session.unsafe_execute

    def create_entity(self, etype, **kwargs):
        """Insert an entity of the given type in the system source, with its
        meta-data, and return it.

        Values having an `eid` attribute are replaced by that eid.
        """
        for k, v in kwargs.items():
            kwargs[k] = getattr(v, 'eid', v)
        entity, rels = self.metagen.base_etype_dicts(etype)
        # base_etype_dicts result is cached: work on a copy
        entity = copy(entity)
        entity._related_cache = {}
        self.metagen.init_entity(entity)
        entity.update(kwargs)
        session = self.session
        self.source.add_entity(session, entity)
        self.source.add_info(session, entity, self.source, complete=False)
        for rtype, targeteids in rels.items():
            # targeteids may be a single eid or a list of eids
            inlined = self.rschema(rtype).inlined
            try:
                targeteids = iter(targeteids)
            except TypeError:
                # single eid; testing this apart from the insertion avoids
                # hiding a TypeError raised by add_relation itself
                targeteids = (targeteids,)
            for targeteid in targeteids:
                self.add_relation(session, entity.eid, rtype, targeteid,
                                  inlined)
        self._nb_inserted_entities += 1
        return entity

    def relate(self, eid_from, rtype, eid_to, inlined=False):
        """Insert a relation in the system source (reverse types are *not*
        supported here).

        `inlined` is only there for signature compatibility with other
        stores, the value found in the schema is used.
        """
        assert not rtype.startswith('reverse_')
        self.add_relation(self.session, eid_from, rtype, eid_to,
                          self.rschema(rtype).inlined)
        self._nb_inserted_relations += 1

    @property
    def nb_inserted_entities(self):
        return self._nb_inserted_entities

    @property
    def nb_inserted_types(self):
        return self._nb_inserted_types

    @property
    def nb_inserted_relations(self):
        return self._nb_inserted_relations

    def _put(self, type, item):
        raise RuntimeError('use create entity')


class MetaGenerator(object):
    """Generate values of meta attributes / relations (those listed in
    `META_RTYPES`) for entities created by `NoHookRQLObjectStore`.

    Each value is computed by the `gen_<relation type>` method.
    """

    def __init__(self, session, baseurl=None):
        """
        :param session: the repository session
        :param baseurl: url used as prefix of generated `cwuri`; taken from
          the instance configuration when missing
        """
        self.session = session
        self.source = session.repo.system_source
        # same timestamp for every entity created using this generator
        self.time = datetime.now()
        if baseurl is None:
            config = session.vreg.config
            baseurl = config['base-url'] or config.default_base_url()
        if not baseurl.endswith('/'):
            baseurl += '/'
        self.baseurl = baseurl
        # attributes/relations shared by all entities of the same type
        self.etype_attrs = []
        self.etype_rels = []
        # attributes/relations specific to each entity
        self.entity_attrs = ['eid', 'cwuri']
        #self.entity_rels = [] XXX not handled (YAGNI?)
        schema = session.vreg.schema
        rschema = schema.rschema
        for rtype in META_RTYPES:
            if rtype in ('eid', 'cwuri') or rtype in VIRTUAL_RTYPES:
                continue
            if rschema(rtype).final:
                self.etype_attrs.append(rtype)
            else:
                self.etype_rels.append(rtype)
        if not schema._eid_index:
            # test schema loaded from the fs
            self.gen_is = self.test_gen_is
            self.gen_is_instance_of = self.test_gen_is_instanceof

    @cached
    def base_etype_dicts(self, etype):
        """Return an `(entity, relations)` couple for the given entity type,
        holding values shared by every entity of this type.

        The result is cached: callers must copy the entity before use.
        """
        entity = self.session.vreg['etypes'].etype_class(etype)(self.session)
        # entity are "surface" copied, avoid shared dict between copies
        del entity.cw_extra_kwargs
        for attr in self.etype_attrs:
            entity[attr] = self.generate(entity, attr)
        rels = {}
        for rel in self.etype_rels:
            rels[rel] = self.generate(entity, rel)
        return entity, rels

    def init_entity(self, entity):
        """Set attributes specific to each entity (eid and cwuri)."""
        for attr in self.entity_attrs:
            entity[attr] = self.generate(entity, attr)
        entity.eid = entity['eid']

    def generate(self, entity, rtype):
        """Return the value of `rtype` for `entity`, as computed by the
        `gen_<rtype>` method.
        """
        return getattr(self, 'gen_%s' % rtype)(entity)

    def gen_eid(self, entity):
        return self.source.create_eid(self.session)

    def gen_cwuri(self, entity):
        return u'%seid/%s' % (self.baseurl, entity['eid'])

    def gen_creation_date(self, entity):
        return self.time

    def gen_modification_date(self, entity):
        return self.time

    def gen_is(self, entity):
        return entity.e_schema.eid

    def gen_is_instance_of(self, entity):
        """Return eids of the entity's type and of each of its ancestors."""
        eids = []
        for eschema in entity.e_schema.ancestors() + [entity.e_schema]:
            eids.append(eschema.eid)
        return eids

    def gen_created_by(self, entity):
        return self.session.user.eid

    def gen_owned_by(self, entity):
        return self.session.user.eid

    # implementations of gen_is / gen_is_instance_of to use during test where
    # schema has been loaded from the fs (hence entity type schema eids are not
    # known)
    def test_gen_is(self, entity):
        from cubicweb.hooks.metadata import eschema_eid
        return eschema_eid(self.session, entity.e_schema)

    def test_gen_is_instanceof(self, entity):
        from cubicweb.hooks.metadata import eschema_eid
        eids = []
        for eschema in entity.e_schema.ancestors() + [entity.e_schema]:
            eids.append(eschema_eid(self.session, eschema))
        return eids


################################################################################

utf8csvreader = deprecated('[3.6] use ucsvreader instead')(ucsvreader)

@deprecated('[3.6] use required')
def nonempty(value):
    return required(value)

@deprecated("[3.6] use call_check_method('isdigit')")
def alldigits(txt):
    """Return `txt` if it's only made of digits, else an empty string."""
    if txt.isdigit():
        return txt
    else:
        return u''

@deprecated("[3.7] too specific, will move away, copy me")
def capitalize_if_unicase(txt):
    """Capitalize `txt` if it's all upper case or all lower case."""
    if txt.isupper() or txt.islower():
        return txt.capitalize()
    return txt

@deprecated("[3.7] too specific, will move away, copy me")
def yesno(value):
    """simple heuristic that returns boolean value

    >>> yesno("Yes")
    True
    >>> yesno("oui")
    True
    >>> yesno("1")
    True
    >>> yesno("11")
    True
    >>> yesno("")
    False
    >>> yesno("Non")
    False
    >>> yesno("blablabla")
    False
    """
    if value:
        return value.lower()[0] in 'yo1'
    return False

@deprecated("[3.7] use call_check_method('isalpha')")
def isalpha(value):
    if value.isalpha():
        return value
    raise ValueError("not all characters in the string alphabetic")

@deprecated("[3.7] use call_transform_method('upper')")
def uppercase(txt):
    return txt.upper()

@deprecated("[3.7] use call_transform_method('lower')")
def lowercase(txt):
    return txt.lower()

@deprecated("[3.7] use call_transform_method('replace', ' ', '')")
def no_space(txt):
    return txt.replace(' ', '')

@deprecated("[3.7] use call_transform_method('replace', u'\xa0', '')")
def no_uspace(txt):
    return txt.replace(u'\xa0', '')

@deprecated("[3.7] use call_transform_method('replace', '-', '')")
def no_dash(txt):
    return txt.replace('-', '')

@deprecated("[3.7] use call_transform_method('strip')")
def strip(txt):
    return txt.strip()

@deprecated("[3.7] use call_transform_method('replace', ',', '.'), float")
def decimal(value):
    """Turn a string using a comma as decimal separator into a float."""
    return float(value.replace(',', '.'))

@deprecated('[3.7] use int builtin')
def integer(value):
    return int(value)