# -*- coding: utf-8 -*-
"""This module provides tools to import tabular data.

:organization: Logilab
:copyright: 2001-2009 LOGILAB S.A. (Paris, FRANCE), license is LGPL v2.
:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
:license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses


Example of use (run this with `cubicweb-ctl shell instance import-script.py`):

.. sourcecode:: python

  from cubicweb.devtools.dataimport import *
  # define data generators
  GENERATORS = []

  USERS = [('Prenom', 'firstname', ()),
           ('Nom', 'surname', ()),
           ('Identifiant', 'login', ()),
           ]

  def gen_users(ctl):
      for row in ctl.get_data('utilisateurs'):
          entity = mk_entity(row, USERS)
          entity['upassword'] = u'motdepasse'
          ctl.check('login', entity['login'], None)
          ctl.store.add('CWUser', entity)
          email = {'address': row['email']}
          ctl.store.add('EmailAddress', email)
          ctl.store.relate(entity['eid'], 'use_email', email['eid'])
          ctl.store.rql('SET U in_group G WHERE G name "users", U eid %(x)s', {'x':entity['eid']})

  CHK = [('login', check_doubles, 'Utilisateurs Login',
          'Deux utilisateurs ne devraient pas avoir le même login.'),
         ]

  GENERATORS.append( (gen_users, CHK) )

  # create controller
  ctl = CWImportController(RQLObjectStore())
  ctl.askerror = True
  ctl.generators = GENERATORS
  ctl.store._checkpoint = checkpoint
  ctl.store._rql = rql
  ctl.data['utilisateurs'] = lazytable(utf8csvreader(open('users.csv')))
  # run
  ctl.run()
  sys.exit(0)

"""
__docformat__ = "restructuredtext en"

import sys, csv, traceback

from logilab.common import shellutils


def utf8csvreader(file, encoding='utf-8', separator=',', quote='"'):
    """Read CSV rows from *file* and yield them as lists of unicode strings.

    Every cell is decoded with *encoding* (UTF-8 by default), so files in any
    encoding are accepted as long as the right codec name is given.

    :param file: iterable of encoded lines, typically an open file
    :param encoding: codec used to decode each cell
    :param separator: field delimiter passed to ``csv.reader``
    :param quote: quote character passed to ``csv.reader``
    """
    for row in csv.reader(file, delimiter=separator, quotechar=quote):
        yield [item.decode(encoding) for item in row]


def lazytable(reader):
    """Turn an iterator of rows into an iterator of dicts.

    The first row is taken to be the header of the table and its values are
    used as keys of the dict output for each following row of data. Because
    ``zip`` is used, cells beyond the header width are dropped and missing
    trailing cells produce missing keys.

    >>> data = lazytable(utf8csvreader(open(filename)))
    """
    header = reader.next()
    for row in reader:
        yield dict(zip(header, row))


def tell(msg):
    """Default output function of the import controller: print *msg*."""
    print(msg)


# base sanitizing functions #####

def capitalize_if_unicase(txt):
    """Capitalize *txt* only if it is entirely upper- or lower-case.

    Mixed-case values are returned unchanged.
    """
    if txt.isupper() or txt.islower():
        return txt.capitalize()
    return txt


def no_space(txt):
    """Remove every regular space from *txt*."""
    return txt.replace(' ', '')


def no_uspace(txt):
    """Remove every non-breaking space (U+00A0) from *txt*."""
    return txt.replace(u'\xa0', '')


def no_dash(txt):
    """Remove every dash from *txt*."""
    return txt.replace('-', '')


def alldigits(txt):
    """Return *txt* if it only contains digits, else an empty unicode string."""
    if txt.isdigit():
        return txt
    else:
        return u''


def strip(txt):
    """Strip leading and trailing whitespace from *txt*."""
    return txt.strip()


# base checks #####

def check_doubles(buckets):
    """Extract the keys that have more than one item in their bucket.

    :param buckets: dict mapping a key to the list of values recorded for it
    :return: list of ``(key, number_of_values)`` pairs for duplicated keys
    """
    return [(key, len(value)) for key, value in buckets.items()
            if len(value) > 1]


# make entity helper #####

def mk_entity(row, map):
    """Return a dict made from sanitized mapped values.

    :param row: source dict (e.g. one item produced by `lazytable`)
    :param map: sequence of ``(source_key, dest_key, funcs)`` triples; the
                value ``row[source_key]`` is passed through each function of
                ``funcs`` in order and stored under ``dest_key``

    >>> row = {'myname': u'dupont'}
    >>> map = [('myname', u'name', (capitalize_if_unicase,))]
    >>> mk_entity(row, map)
    {'name': u'Dupont'}
    """
    res = {}
    for src, dest, funcs in map:
        res[dest] = row[src]
        for func in funcs:
            res[dest] = func(res[dest])
    return res


# object stores

class ObjectStore(object):
    """Store objects in memory for faster testing. Will not
    enforce the constraints of the schema and hence will miss
    some problems.

    In this in-memory store an item's eid is its position in ``self.items``.

    >>> store = ObjectStore()
    >>> user = {'login': 'johndoe'}
    >>> store.add('CWUser', user)
    >>> group = {'name': 'unknown'}
    >>> store.add('CWGroup', group)
    >>> store.relate(user['eid'], 'in_group', group['eid'])
    """

    def __init__(self):
        self.items = []         # stored item dicts (in-memory store only)
        self.eids = {}          # eid -> item dict
        self.types = {}         # entity type name -> list of eids
        self.relations = set()  # set of (eid_from, rtype, eid_to)
        self.indexes = {}       # index name -> {key: [eid, ...]}
        self._rql = None        # optional callable(query, args) used by rql()
        self._checkpoint = None # optional callable used by checkpoint()

    def _put(self, type, item):
        """Store *item* and return its new eid (its index in ``items``)."""
        self.items.append(item)
        return len(self.items) - 1

    def add(self, type, item):
        """Store the dict *item* as an entity of *type*.

        The new eid is written back into ``item['eid']``.
        """
        # the ``type`` parameter shadows the builtin, so use ``__class__`` to
        # report the offending item's class
        assert isinstance(item, dict), 'item is not a dict but a %s' % item.__class__
        eid = item['eid'] = self._put(type, item)
        self.eids[eid] = item
        self.types.setdefault(type, []).append(eid)

    def relate(self, eid_from, rtype, eid_to):
        """Record relation *rtype* between two previously added items."""
        # valid in-memory eids are 0 .. len(self.items) - 1
        eids_valid = (eid_from < len(self.items) and eid_to < len(self.items))
        assert eids_valid, 'eid error %s %s' % (eid_from, eid_to)
        self.relations.add((eid_from, rtype, eid_to))

    def build_index(self, name, type, func):
        """Build index *name* over the items of *type*, keyed by ``func(item)``.

        A type without any stored item yields an empty index.
        """
        index = {}
        for eid in self.types.get(type, ()):
            index.setdefault(func(self.eids[eid]), []).append(eid)
        self.indexes[name] = index

    def get_many(self, name, key):
        """Return the list of eids stored under *key* in index *name*."""
        return self.indexes[name].get(key, [])

    def get_one(self, name, key):
        """Return the single eid stored under *key* in index *name*."""
        eids = self.indexes[name].get(key, [])
        assert len(eids) == 1, 'expected a single one got %i' % len(eids)
        return eids[0]

    def find(self, type, key, value):
        """Yield the items of *type* whose attribute *key* equals *value*."""
        # look items up through ``eids`` (not ``items``) so this also works
        # for stores whose eids are not positions in ``self.items``
        for eid in self.types.get(type, ()):
            item = self.eids[eid]
            if item[key] == value:
                yield item

    def rql(self, query, args):
        """Run *query* through the configured ``_rql`` callable, if any.

        Returns None when no callable is configured.
        """
        if self._rql:
            return self._rql(query, args)

    def checkpoint(self):
        """Call the configured ``_checkpoint`` callable, if any."""
        if self._checkpoint:
            self._checkpoint()


class RQLObjectStore(ObjectStore):
    """ObjectStore that works with an actual RQL repository."""

    def _put(self, type, item):
        """Insert *item* with an RQL INSERT query and return the new eid."""
        # attribute names are interpolated, values are passed as query args
        query = ('INSERT %s X: ' % type) + ', '.join(['X %s %%(%s)s' % (key, key)
                                                      for key in item])
        return self.rql(query, item)[0][0]

    def relate(self, eid_from, rtype, eid_to):
        """Set relation *rtype* between two eids in the repository."""
        query = 'SET X %s Y WHERE X eid %%(from)s, Y eid %%(to)s' % rtype
        self.rql(query, {'from': int(eid_from), 'to': int(eid_to)})
        self.relations.add((eid_from, rtype, eid_to))


# import controller #####

class CWImportController(object):
    """Controller of the data import process.

    >>> ctl = CWImportController(store)
    >>> ctl.generators = list_of_data_generators
    >>> ctl.data = dict_of_data_tables
    >>> ctl.run()
    """

    def __init__(self, store):
        self.store = store
        # list of (generator_function, checks) pairs; each check is a
        # (check_name, check_func, title, help) tuple
        self.generators = None
        self.data = {}          # data table name -> iterable of rows
        self.errors = None      # filled by run(): title -> (help, details)
        self.askerror = False   # ask whether to display errors after run()
        self._tell = tell
        # check_name -> {key: [values]}; reset for each generator by run()
        self._checks = {}

    def check(self, type, key, value):
        """Record *value* under *key* for the check named *type*.

        *type* is matched against the first element of the check tuples
        declared with each generator.
        """
        self._checks.setdefault(type, {}).setdefault(key, []).append(value)

    def check_map(self, entity, key, map, default):
        """Translate ``entity[key]`` through *map*.

        Unknown values are recorded with `check` (under check name *key*) and
        replaced by *default*.
        """
        try:
            entity[key] = map[entity[key]]
        except KeyError:
            self.check(key, entity[key], None)
            entity[key] = default

    def run(self):
        """Run every generator, then its checks, and report the results."""
        self.errors = {}
        for func, checks in self.generators or ():
            self._checks = {}
            func_name = func.__name__[4:]  # strip the expected 'gen_' prefix
            question = 'Importation de %s' % func_name
            self.tell(question)
            try:
                func(self)
            except Exception:
                # record the failure and go on with the next generator; only
                # Exception is caught so SystemExit (e.g. an abort from
                # confirm()) and KeyboardInterrupt still stop the import
                tb = traceback.format_exc()
                print(tb)
                self.errors[func_name] = ('Erreur lors de la transformation',
                                          tb.splitlines())
            for key, checkfunc, title, help in checks:
                buckets = self._checks.get(key)
                if buckets:
                    err = checkfunc(buckets)
                    if err:
                        self.errors[title] = (help, err)
            self.store.checkpoint()
        errors = sum(len(err[1]) for err in self.errors.values())
        self.tell('Importation terminée. (%i objets, %i types, %i relations et %i erreurs).'
                  % (len(self.store.eids), len(self.store.types),
                     len(self.store.relations), errors))
        if self.errors and self.askerror and confirm('Afficher les erreurs ?'):
            import pprint
            pprint.pprint(self.errors)

    def get_data(self, key):
        """Return the data table registered under *key*, or None."""
        return self.data.get(key)

    def index(self, name, key, value):
        """Append *value* under *key* in the store's index *name*."""
        self.store.indexes.setdefault(name, {}).setdefault(key, []).append(value)

    def tell(self, msg):
        """Output *msg* through the configured output function."""
        self._tell(msg)


def confirm(question):
    """A confirm function that asks for yes/no/abort and exits on abort.

    :return: True if the answer is 'Y' (the default), False for 'n'
    """
    answer = shellutils.ASK.ask(question, ('Y', 'n', 'abort'), 'Y')
    if answer == 'abort':
        sys.exit(1)
    return answer == 'Y'