process_posted yields the field instead of the field's name
# -*- coding: utf-8 -*-"""This module provides tools to import tabular data.:organization: Logilab:copyright: 2001-2009 LOGILAB S.A. (Paris, FRANCE), license is LGPL v2.:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr:license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licensesExample of use (run this with `cubicweb-ctl shell instance import-script.py`):.. sourcecode:: python from cubicweb.devtools.dataimport import * # define data generators GENERATORS = [] USERS = [('Prenom', 'firstname', ()), ('Nom', 'surname', ()), ('Identifiant', 'login', ()), ] def gen_users(ctl): for row in ctl.get_data('utilisateurs'): entity = mk_entity(row, USERS) entity['upassword'] = u'motdepasse' ctl.check('login', entity['login'], None) ctl.store.add('CWUser', entity) email = {'address': row['email']} ctl.store.add('EmailAddress', email) ctl.store.relate(entity['eid'], 'use_email', email['eid']) ctl.store.rql('SET U in_group G WHERE G name "users", U eid %(x)s', {'x':entity['eid']}) CHK = [('login', check_doubles, 'Utilisateurs Login', 'Deux utilisateurs ne devraient pas avoir le même login.'), ] GENERATORS.append( (gen_users, CHK) ) # create controller ctl = CWImportController(RQLObjectStore()) ctl.askerror = True ctl.generators = GENERATORS ctl.store._checkpoint = checkpoint ctl.store._rql = rql ctl.data['utilisateurs'] = lazytable(utf8csvreader(open('users.csv'))) # run ctl.run() sys.exit(0)"""__docformat__="restructuredtext en"importsys,csv,tracebackfromlogilab.commonimportshellutilsfromlogilab.common.deprecationimportdeprecateddefucsvreader_pb(filepath,encoding='utf-8',separator=',',quote='"',skipfirst=False,withpb=True):"""same as ucsvreader but a progress bar is displayed as we iter on rows"""rowcount=int(shellutils.Execute('wc -l "%s"'%filepath).out.strip().split()[0])ifskipfirst:rowcount-=1ifwithpb:pb=shellutils.ProgressBar(rowcount,50)forurowinucsvreader(file(filepath),encoding,separator,quote,skipfirst):yieldurowifwithpb:pb.update()print' %s 
rows imported'%rowcountdefucsvreader(stream,encoding='utf-8',separator=',',quote='"',skipfirst=False):"""A csv reader that accepts files with any encoding and outputs unicode strings """it=iter(csv.reader(stream,delimiter=separator,quotechar=quote))ifskipfirst:it.next()forrowinit:yield[item.decode(encoding)foriteminrow]utf8csvreader=deprecated('use ucsvreader instead')(ucsvreader)defcommit_every(nbit,store,it):fori,xinenumerate(it):yieldxifnbitisnotNoneandi%nbit:store.checkpoint()ifnbitisnotNone:store.checkpoint()deflazytable(reader):"""The first row is taken to be the header of the table and used to output a dict for each row of data. >>> data = lazytable(utf8csvreader(open(filename))) """header=reader.next()forrowinreader:yielddict(zip(header,row))defmk_entity(row,map):"""Return a dict made from sanitized mapped values. >>> row = {'myname': u'dupont'} >>> map = [('myname', u'name', (capitalize_if_unicase,))] >>> mk_entity(row, map) {'name': u'Dupont'} """res={}forsrc,dest,funcsinmap:res[dest]=row[src]forfuncinfuncs:res[dest]=func(res[dest])returnres# user interactions ############################################################deftell(msg):printmsgdefconfirm(question):"""A confirm function that asks for yes/no/abort and exits on abort."""answer=shellutils.ASK.ask(question,('Y','n','abort'),'Y')ifanswer=='abort':sys.exit(1)returnanswer=='Y'classcatch_error(object):"""Helper for @contextmanager decorator."""def__init__(self,ctl,key='unexpected error',msg=None):self.ctl=ctlself.key=keyself.msg=msgdef__enter__(self):returnselfdef__exit__(self,type,value,traceback):iftypeisnotNone:ifissubclass(type,(KeyboardInterrupt,SystemExit)):return# re-raiseifself.ctl.catcherrors:self.ctl.record_error(self.key,msg)returnTrue# silent# base sanitizing functions ####################################################defcapitalize_if_unicase(txt):iftxt.isupper()ortxt.islower():returntxt.capitalize()returntxtdefno_space(txt):returntxt.replace(' 
','')defno_uspace(txt):returntxt.replace(u'\xa0','')defno_dash(txt):returntxt.replace('-','')defalldigits(txt):iftxt.isdigit():returntxtelse:returnu''defstrip(txt):returntxt.strip()# base integrity checking functions ############################################defcheck_doubles(buckets):"""Extract the keys that have more than one item in their bucket."""return[(key,len(value))forkey,valueinbuckets.items()iflen(value)>1]defcheck_doubles_not_none(buckets):"""Extract the keys that have more than one item in their bucket."""return[(key,len(value))forkey,valueinbuckets.items()ifkeyisnotNoneandlen(value)>1]# object stores #################################################################classObjectStore(object):"""Store objects in memory for faster testing. Will not enforce the constraints of the schema and hence will miss some problems. >>> store = ObjectStore() >>> user = {'login': 'johndoe'} >>> store.add('CWUser', user) >>> group = {'name': 'unknown'} >>> store.add('CWUser', group) >>> store.relate(user['eid'], 'in_group', group['eid']) """def__init__(self):self.items=[]self.eids={}self.types={}self.relations=set()self.indexes={}self._rql=Noneself._checkpoint=Nonedef_put(self,type,item):self.items.append(item)returnlen(self.items)-1defadd(self,type,item):assertisinstance(item,dict),'item is not a dict but a %s'%type(item)eid=item['eid']=self._put(type,item)self.eids[eid]=itemself.types.setdefault(type,[]).append(eid)defrelate(self,eid_from,rtype,eid_to):eids_valid=(eid_from<len(self.items)andeid_to<=len(self.items))asserteids_valid,'eid error %s%s'%(eid_from,eid_to)self.relations.add((eid_from,rtype,eid_to))defbuild_index(self,name,type,func):index={}foreidinself.types[type]:index.setdefault(func(self.eids[eid]),[]).append(eid)self.indexes[name]=indexdefget_many(self,name,key):returnself.indexes[name].get(key,[])defget_one(self,name,key):eids=self.indexes[name].get(key,[])assertlen(eids)==1,'expected a single one got 
%i'%len(eids)returneids[0]deffind(self,type,key,value):foridxinself.types[type]:item=self.items[idx]ifitem[key]==value:yielditemdefcheckpoint(self):passclassRQLObjectStore(ObjectStore):"""ObjectStore that works with an actual RQL repository."""_rql=None# bw compatdef__init__(self,session=None,checkpoint=None):ObjectStore.__init__(self)ifsessionisnotNone:ifnothasattr(session,'set_pool'):# connectioncnx=sessionsession=session.request()session.set_pool=lambda:Nonecheckpoint=checkpointorcnx.commitself.session=sessionself.checkpoint=checkpointorsession.commitelifcheckpointisnotNone:self.checkpoint=checkpointdefrql(self,*args):ifself._rqlisnotNone:returnself._rql(*args)self.session.set_pool()returnself.session.execute(*args)defcreate_entity(self,*args,**kwargs):self.session.set_pool()entity=self.session.create_entity(*args,**kwargs)self.eids[entity.eid]=entityself.types.setdefault(args[0],[]).append(entity.eid)returnentitydef_put(self,type,item):query=('INSERT %s X: '%type)+', '.join(['X %s%%(%s)s'%(key,key)forkeyinitem])returnself.rql(query,item)[0][0]defrelate(self,eid_from,rtype,eid_to):self.rql('SET X %s Y WHERE X eid %%(x)s, Y eid %%(y)s'%rtype,{'x':int(eid_from),'y':int(eid_to)},('x','y'))self.relations.add((eid_from,rtype,eid_to))# the import controller ########################################################classCWImportController(object):"""Controller of the data import process. 
>>> ctl = CWImportController(store) >>> ctl.generators = list_of_data_generators >>> ctl.data = dict_of_data_tables >>> ctl.run() """def__init__(self,store,askerror=False,catcherrors=None,tell=tell,commitevery=50):self.store=storeself.generators=Noneself.data={}self.errors=Noneself.askerror=askerrorifcatcherrorsisNone:catcherrors=askerrorself.catcherrors=catcherrorsself.commitevery=commitevery# set to None to do a single commitself._tell=telldefcheck(self,type,key,value):self._checks.setdefault(type,{}).setdefault(key,[]).append(value)defcheck_map(self,entity,key,map,default):try:entity[key]=map[entity[key]]exceptKeyError:self.check(key,entity[key],None)entity[key]=defaultdefrecord_error(self,key,msg=None,type=None,value=None,tb=None):importStringIOtmp=StringIO.StringIO()iftypeisNone:traceback.print_exc(file=tmp)else:traceback.print_exception(type,value,tb,file=tmp)printtmp.getvalue()# use a list to avoid counting a <nb lines> errors instead of oneerrorlog=self.errors.setdefault(key,[])ifmsgisNone:errorlog.append(tmp.getvalue().splitlines())else:errorlog.append((msg,tmp.getvalue().splitlines()))defrun(self):self.errors={}forfunc,checksinself.generators:self._checks={}func_name=func.__name__[4:]# XXXself.tell('Importing %s'%func_name)try:func(self)except:ifself.catcherrors:self.record_error(func_name,'While calling %s'%func.__name__)else:raiseforkey,func,title,helpinchecks:buckets=self._checks.get(key)ifbuckets:err=func(buckets)iferr:self.errors[title]=(help,err)self.store.checkpoint()self.tell('\nImport completed: %i entities (%i types), %i relations'%(len(self.store.eids),len(self.store.types),len(self.store.relations)))nberrors=sum(len(err[1])forerrinself.errors.values())ifnberrors:print'%s errors'%nberrorsifself.errorsandself.askerrorandconfirm('Display 
errors?'):importpprintpprint.pprint(self.errors)defget_data(self,key):returnself.data.get(key)defindex(self,name,key,value):self.store.indexes.setdefault(name,{}).setdefault(key,[]).append(value)deftell(self,msg):self._tell(msg)defiter_and_commit(self,datakey):"""iter rows, triggering commit every self.commitevery iterations"""returncommit_every(self.commitevery,self.store,self.get_data(datakey))