# -*- coding: utf-8 -*-
"""This module provides tools to import tabular data.

:organization: Logilab
:copyright: 2001-2010 LOGILAB S.A. (Paris, FRANCE), license is LGPL v2.
:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
:license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses


Example of use (run this with `cubicweb-ctl shell instance import-script.py`):

.. sourcecode:: python

  from cubicweb.devtools.dataimport import *
  # define data generators
  GENERATORS = []

  USERS = [('Prenom', 'firstname', ()),
           ('Nom', 'surname', ()),
           ('Identifiant', 'login', ()),
           ]

  def gen_users(ctl):
      for row in ctl.get_data('utilisateurs'):
          entity = mk_entity(row, USERS)
          entity['upassword'] = u'motdepasse'
          ctl.check('login', entity['login'], None)
          ctl.store.add('CWUser', entity)
          email = {'address': row['email']}
          ctl.store.add('EmailAddress', email)
          ctl.store.relate(entity['eid'], 'use_email', email['eid'])
          ctl.store.rql('SET U in_group G WHERE G name "users", U eid %(x)s',
                        {'x':entity['eid']})

  CHK = [('login', check_doubles, 'Utilisateurs Login',
          'Deux utilisateurs ne devraient pas avoir le même login.'),
         ]

  GENERATORS.append( (gen_users, CHK) )

  # create controller
  ctl = CWImportController(RQLObjectStore())
  ctl.askerror = 1
  ctl.generators = GENERATORS
  ctl.store._checkpoint = checkpoint
  ctl.store._rql = rql
  ctl.data['utilisateurs'] = lazytable(utf8csvreader(open('users.csv')))
  # run
  ctl.run()
  sys.exit(0)


.. BUG: a file with a single column causes a parsing problem
.. TODO rollback()
"""
__docformat__ = "restructuredtext en"

import sys
import csv
import traceback
import os.path as osp
from StringIO import StringIO

from logilab.common import shellutils
from logilab.common.deprecation import deprecated


def ucsvreader_pb(filepath, encoding='utf-8', separator=',', quote='"',
                  skipfirst=False, withpb=True):
    """Same as `ucsvreader`, but a progress bar is displayed while iterating.

    :param filepath: path of the csv file to read
    :param encoding: encoding used to decode every cell
    :param separator: csv delimiter
    :param quote: csv quote character
    :param skipfirst: if true, the first row (header) is not yielded
    :param withpb: if false, no progress bar is displayed
    :raise Exception: if `filepath` does not exist
    """
    if not osp.exists(filepath):
        raise Exception("file doesn't exists: %s" % filepath)
    # Count the lines in Python rather than through a `wc -l "<path>"` shell
    # command: that broke (or executed arbitrary commands) on file names
    # holding quotes, and miscounted files lacking a trailing newline.
    stream = open(filepath)
    try:
        rowcount = sum(1 for _line in stream)
    finally:
        stream.close()
    if skipfirst:
        rowcount -= 1
    if withpb:
        pb = shellutils.ProgressBar(rowcount, 50)
    stream = open(filepath)
    try:
        for urow in ucsvreader(stream, encoding, separator, quote, skipfirst):
            yield urow
            if withpb:
                pb.update()
    finally:
        # also runs when the consumer abandons the generator early
        stream.close()
    print(' %s rows imported' % rowcount)


def ucsvreader(stream, encoding='utf-8', separator=',', quote='"',
               skipfirst=False):
    """A csv reader that accepts files with any encoding and outputs
    unicode strings.

    :param stream: iterable of encoded lines (typically an open file)
    :param skipfirst: if true, the first row (header) is dropped
    :return: a generator of rows, each row being a list of decoded cells
    """
    it = iter(csv.reader(stream, delimiter=separator, quotechar=quote))
    if skipfirst:
        it.next()
    for row in it:
        yield [item.decode(encoding) for item in row]

utf8csvreader = deprecated('use ucsvreader instead')(ucsvreader)


def commit_every(nbit, store, it):
    """Yield the items of `it`, calling `store.checkpoint()` every `nbit`
    items and once more when the iteration is over.

    If `nbit` is None, no checkpoint is triggered at all.
    """
    for i, x in enumerate(it):
        yield x
        # `i` is 0-based. The previous test (`i % nbit`) was inverted and
        # triggered a checkpoint on nearly every row.
        if nbit is not None and (i + 1) % nbit == 0:
            store.checkpoint()
    if nbit is not None:
        store.checkpoint()


def lazytable(reader):
    """The first row is taken to be the header of the table and
    used to output a dict for each row of data.

    >>> data = lazytable(utf8csvreader(open(filename)))
    """
    header = reader.next()
    for row in reader:
        yield dict(zip(header, row))


def mk_entity(row, map):
    """Return a dict made from sanitized mapped values.

    `map` is a list of `(source key, destination key, checkers)` triples;
    every checker is called in turn on the value read from `row`.

    AssertionError is raised on unexpected values found in checkers, unless
    the `optional` checker is part of the chain, in which case the field is
    left out of the result.

    >>> row = {'myname': u'dupont'}
    >>> map = [('myname', u'name', (capitalize_if_unicase,))]
    >>> mk_entity(row, map)
    {'name': u'Dupont'}
    """
    res = {}
    assert isinstance(row, dict)
    assert isinstance(map, list)
    for src, dest, funcs in map:
        assert not (required in funcs and optional in funcs), \
               "optional and required checks are exclusive"
        res[dest] = row[src]
        try:
            for func in funcs:
                res[dest] = func(res[dest])
            # NOTE(review): `== False` also matches 0 and 0.0, so a numeric
            # zero is reported as undetermined -- confirm this is intended.
            if res[dest] is None or res[dest] == False:
                raise AssertionError('undetermined value')
        except AssertionError as err:
            if optional in funcs:
                # Forget this field if exception is coming from optional function
                del res[dest]
            else:
                raise AssertionError('error with "%s" field: %s' % (src, err))
    return res


# user interactions ############################################################

def tell(msg):
    """Default output function: print `msg` on the standard output."""
    print(msg)


def confirm(question):
    """A confirm function that asks for yes/no/abort and exits on abort."""
    answer = shellutils.ASK.ask(question, ('Y', 'n', 'abort'), 'Y')
    if answer == 'abort':
        sys.exit(1)
    return answer == 'Y'


class catch_error(object):
    """Context manager recording on the controller the exception raised in
    its body, under `key`.

    The exception is silenced only when the controller's `catcherrors` is
    true; KeyboardInterrupt and SystemExit are always propagated.
    """
    def __init__(self, ctl, key='unexpected error', msg=None):
        self.ctl = ctl
        self.key = key
        # NOTE(review): `msg` is stored but not forwarded to record_error
        self.msg = msg

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        if type is not None:
            if issubclass(type, (KeyboardInterrupt, SystemExit)):
                return # re-raise
            if self.ctl.catcherrors:
                self.ctl.record_error(self.key, None, type, value, traceback)
                return True # silent


# base sanitizing functions ####################################################

def capitalize_if_unicase(txt):
    """Capitalize `txt` if it is all upper case or all lower case."""
    if txt.isupper() or txt.islower():
        return txt.capitalize()
    return txt

def uppercase(txt):
    return txt.upper()

def lowercase(txt):
    return txt.lower()

def no_space(txt):
    return txt.replace(' ', '')

def no_uspace(txt):
    """Remove non-breaking spaces (U+00A0)."""
    return txt.replace(u'\xa0', '')

def no_dash(txt):
    return txt.replace('-', '')

def decimal(value):
    """Cast to float, replacing ',' by '.' first to cope with locales using
    a comma as decimal separator.

    :raise AssertionError: if the value can't be converted
    """
    value = value.replace(',', '.')
    try:
        return float(value)
    except Exception as err:
        raise AssertionError(err)

def integer(value):
    """Cast to int.

    :raise AssertionError: if the value can't be converted
    """
    try:
        return int(value)
    except Exception as err:
        raise AssertionError(err)

def strip(txt):
    return txt.strip()

def yesno(value):
    """Return True if `value` starts with 'y', 'o' or '1' (case insensitive).

    An empty value is False (it used to raise IndexError).
    """
    if not value:
        return False
    return value.lower()[0] in 'yo1'

def isalpha(value):
    """Return `value` unchanged, raising AssertionError unless it is made of
    alphabetic characters only.
    """
    if value.isalpha():
        return value
    raise AssertionError("not all characters in the string alphabetic")

def optional(value):
    """Marker checker: when it is part of a chain, `mk_entity` drops the
    field instead of raising on a validation error.
    """
    return value

def required(value):
    """Raise AssertionError if value is empty.

    This check should be often found in last position in the chain.
    """
    if bool(value):
        return value
    raise AssertionError("required")

@deprecated('use required(value)')
def nonempty(value):
    return required(value)

@deprecated('use integer(value)')
def alldigits(txt):
    if txt.isdigit():
        return txt
    else:
        return u''


# base integrity checking functions ############################################

def check_doubles(buckets):
    """Extract the keys that have more than one item in their bucket."""
    return [(key, len(value)) for key, value in buckets.items()
            if len(value) > 1]

def check_doubles_not_none(buckets):
    """Extract the keys, None excepted, that have more than one item in
    their bucket.
    """
    return [(key, len(value)) for key, value in buckets.items()
            if key is not None and len(value) > 1]


# object stores #################################################################

class ObjectStore(object):
    """Store objects in memory for *faster* validation (development mode)

    But it will not enforce the constraints of the schema and hence will miss
    some problems

    >>> store = ObjectStore()
    >>> user = {'login': 'johndoe'}
    >>> store.add('CWUser', user)
    >>> group = {'name': 'unknown'}
    >>> store.add('CWUser', group)
    >>> store.relate(user['eid'], 'in_group', group['eid'])
    (0, 'in_group', 1)
    """
    def __init__(self):
        self.items = []
        self.eids = {}          # eid -> item
        self.types = {}         # type name -> list of eids
        self.relations = set()  # (eid_from, rtype, eid_to) triples
        self.indexes = {}       # index name -> {key: [eids]}
        # optional hooks, used by rql() / checkpoint() when set
        self._rql = None
        self._checkpoint = None

    def _put(self, type, item):
        """Store `item` and return its eid (here its position in `items`)."""
        self.items.append(item)
        return len(self.items) - 1

    def add(self, type, item):
        """Add `item` (a dict) as an entity of `type`; its eid is written
        into `item['eid']`.
        """
        # `type` shadows the builtin here, hence `__class__`: calling
        # `type(item)` would call the type *name* and raise TypeError.
        assert isinstance(item, dict), \
               'item is not a dict but a %s' % item.__class__
        eid = item['eid'] = self._put(type, item)
        self.eids[eid] = item
        self.types.setdefault(type, []).append(eid)

    def relate(self, eid_from, rtype, eid_to):
        """Add new relation (reverse type support is available)

        >>> eid_from, eid_to = 1, 2
        >>> self.relate(eid_from, 'in_group', eid_to)
        (1, 'in_group', 2)
        >>> self.relate(eid_from, 'reverse_in_group', eid_to)
        (2, 'in_group', 1)
        """
        if rtype.startswith('reverse_'):
            eid_from, eid_to = eid_to, eid_from
            rtype = rtype[8:]
        relation = eid_from, rtype, eid_to
        self.relations.add(relation)
        return relation

    def build_index(self, name, type, func=None):
        """Build index `name` over the entities of `type`.

        `func` computes the index key of an entity; by default (or if it
        isn't callable) the entity's eid is used.
        """
        index = {}
        if func is None or not callable(func):
            func = lambda x: x['eid']
        for eid in self.types[type]:
            index.setdefault(func(self.eids[eid]), []).append(eid)
        assert index, "new index '%s' cannot be empty" % name
        self.indexes[name] = index

    def build_rqlindex(self, name, type, key, rql, rql_params=False, func=None):
        """build an index by rql query

        rql should return eid in first column
        ctl.store.build_rqlindex('index_name', 'users', 'login',
                                 'Any U WHERE U is CWUser')
        """
        rset = self.rql(rql, rql_params or {})
        for entity in rset.entities():
            getattr(entity, key) # autopopulate entity with key attribute
            self.eids[entity.eid] = dict(entity)
            if entity.eid not in self.types.setdefault(type, []):
                self.types[type].append(entity.eid)
        assert self.types[type], "new index type '%s' cannot be empty (0 record found)" % type
        # Build index with specified key
        # NOTE(review): the `func` argument is overridden, hence ignored
        func = lambda x: x[key]
        self.build_index(name, type, func)

    @deprecated('get_many() deprecated. Use fetch() instead')
    def get_many(self, name, key):
        return self.fetch(name, key, unique=False)

    @deprecated('get_one() deprecated. Use fetch(..., unique=True) instead')
    def get_one(self, name, key):
        return self.fetch(name, key, unique=True)

    def fetch(self, name, key, unique=False, decorator=None):
        """Return the eids stored under `key` in index `name`.

        decorator is a callable method or an iterator of callable methods
        (usually a lambda function) applied in turn to the list of eids:
        decorator=lambda x: x[:1] (first value is returned)

        If `unique` is true, exactly one eid is expected and it is returned
        on its own instead of in a list.
        """
        eids = self.indexes[name].get(key, [])
        if decorator is not None:
            if not hasattr(decorator, '__iter__'):
                decorator = (decorator,)
            for f in decorator:
                eids = f(eids)
        if unique:
            assert len(eids) == 1, u'expected a single one value for key "%s" in index "%s". Got %i' % (key, name, len(eids))
            eids = eids[0] # FIXME maybe it's better to keep an iterator here ?
        return eids

    def find(self, type, key, value):
        """Yield the items of `type` whose `key` attribute equals `value`."""
        for idx in self.types[type]:
            item = self.items[idx]
            if item[key] == value:
                yield item

    def rql(self, *args):
        """Call the `_rql` hook if one has been set, else do nothing."""
        if self._rql is not None:
            return self._rql(*args)

    def checkpoint(self):
        """Call the `_checkpoint` hook if one has been set, else do nothing.

        The hook used to be ignored although it can be set in the same way
        as `_rql`.
        """
        if self._checkpoint is not None:
            return self._checkpoint()


class RQLObjectStore(ObjectStore):
    """ObjectStore that works with an actual RQL repository (production mode)"""
    _rql = None # bw compat

    def __init__(self, session=None, checkpoint=None):
        ObjectStore.__init__(self)
        if session is not None:
            if not hasattr(session, 'set_pool'):
                # connection
                cnx = session
                session = session.request()
                session.set_pool = lambda : None
                checkpoint = checkpoint or cnx.commit
            self.session = session
            self.checkpoint = checkpoint or session.commit
        elif checkpoint is not None:
            self.checkpoint = checkpoint

    def rql(self, *args):
        """Execute a query through the `_rql` hook if set, else through the
        session.
        """
        if self._rql is not None:
            return self._rql(*args)
        self.session.set_pool()
        return self.session.execute(*args)

    def create_entity(self, *args, **kwargs):
        """Create an entity through the session and register it in the
        store; the first positional argument is used as the type name.
        """
        self.session.set_pool()
        entity = self.session.create_entity(*args, **kwargs)
        self.eids[entity.eid] = entity
        self.types.setdefault(args[0], []).append(entity.eid)
        return entity

    def _put(self, type, item):
        """INSERT `item` in the repository; return the first cell of the
        first row of the result (used as eid by `add`).
        """
        query = ('INSERT %s X: ' % type) + ', '.join(['X %s%%(%s)s' % (key, key)
                                                      for key in item])
        return self.rql(query, item)[0][0]

    def relate(self, eid_from, rtype, eid_to):
        # if reverse relation is found, eids are exchanged
        eid_from, rtype, eid_to = super(RQLObjectStore, self).relate(
            eid_from, rtype, eid_to)
        self.rql('SET X %s Y WHERE X eid %%(x)s, Y eid %%(y)s' % rtype,
                 {'x': int(eid_from), 'y': int(eid_to)}, ('x', 'y'))


# the import controller ########################################################

class CWImportController(object):
    """Controller of the data import process.

    >>> ctl = CWImportController(store)
    >>> ctl.generators = list_of_data_generators
    >>> ctl.data = dict_of_data_tables
    >>> ctl.run()
    """

    def __init__(self, store, askerror=0, catcherrors=None, tell=tell,
                 commitevery=50):
        self.store = store
        self.generators = None
        self.data = {}
        self.errors = None
        # 2: always display errors; other true value: ask first (see run())
        self.askerror = askerror
        if catcherrors is None:
            catcherrors = askerror
        self.catcherrors = catcherrors
        self.commitevery = commitevery # set to None to do a single commit
        self._tell = tell

    def check(self, type, key, value):
        """Put `value` in bucket `key` of check `type`, for the integrity
        checks executed by run() once the current generator is done.
        """
        self._checks.setdefault(type, {}).setdefault(key, []).append(value)

    def check_map(self, entity, key, map, default):
        """Replace `entity[key]` by its image through `map`; an unmapped
        value is registered with check() and replaced by `default`.
        """
        try:
            entity[key] = map[entity[key]]
        except KeyError:
            self.check(key, entity[key], None)
            entity[key] = default

    def record_error(self, key, msg=None, type=None, value=None, tb=None):
        """Print the traceback of an exception and log it under `key`.

        The exception being handled is used unless `type`, `value` and `tb`
        are given.
        """
        tmp = StringIO()
        if type is None:
            traceback.print_exc(file=tmp)
        else:
            traceback.print_exception(type, value, tb, file=tmp)
        print(tmp.getvalue())
        # use a list to avoid counting a <nb lines> errors instead of one
        errorlog = self.errors.setdefault(key, [])
        if msg is None:
            errorlog.append(tmp.getvalue().splitlines())
        else:
            errorlog.append((msg, tmp.getvalue().splitlines()))

    @staticmethod
    def _error_summary(error):
        """Return `(title, problems)` for a value of the `errors` dict."""
        if isinstance(error, tuple):
            # (help, problems) stored by an integrity check
            return error
        # list of entries appended by record_error()
        return 'Recorded exceptions', error

    def run(self):
        """Call every generator, run its integrity checks, checkpoint the
        store and report.
        """
        self.errors = {}
        for func, checks in self.generators:
            self._checks = {}
            func_name = func.__name__[4:] # XXX
            self.tell("Import '%s'..." % func_name)
            try:
                func(self)
            except Exception:
                # not a bare except: KeyboardInterrupt and SystemExit must
                # stop the import, as they do in catch_error
                if self.catcherrors:
                    self.record_error(func_name, 'While calling %s' % func.__name__)
                else:
                    raise
            for key, checker, title, help in checks:
                buckets = self._checks.get(key)
                if buckets:
                    err = checker(buckets)
                    if err:
                        self.errors[title] = (help, err)
        self.store.checkpoint()
        # `errors` holds both (help, problems) tuples and record_error()
        # logs; indexing every value with [1] crashed on the latter
        nberrors = sum(len(self._error_summary(err)[1])
                       for err in self.errors.values())
        self.tell('\nImport completed: %i entities, %i types, %i relations and %i errors'
                  % (len(self.store.eids), len(self.store.types),
                     len(self.store.relations), nberrors))
        if self.errors:
            if self.askerror == 2 or (self.askerror and confirm('Display errors ?')):
                from pprint import pformat
                for errkey, error in self.errors.items():
                    title, problems = self._error_summary(error)
                    self.tell("\n%s (%s): %d\n" % (title, errkey, len(problems)))
                    if isinstance(error, tuple):
                        problems = sorted(problems)
                    # recorded exceptions are kept in order of occurrence
                    self.tell(pformat(problems))

    def get_data(self, key):
        """Return the data table registered under `key`, or None."""
        return self.data.get(key)

    def index(self, name, key, value, unique=False):
        """create a new index

        If unique is set to True, only first occurence will be kept not the
        following ones
        """
        if unique:
            try:
                if value in self.store.indexes[name][key]:
                    return
            except KeyError:
                # we're sure that one is the first occurence; so continue...
                pass
        self.store.indexes.setdefault(name, {}).setdefault(key, []).append(value)

    def tell(self, msg):
        self._tell(msg)

    def iter_and_commit(self, datakey):
        """iter rows, triggering commit every self.commitevery iterations"""
        return commit_every(self.commitevery, self.store, self.get_data(datakey))