# copyright 2003-2015 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
"""Old and deprecated dataimport API that provides tools to import tabular data.


Example of use (run this with `cubicweb-ctl shell instance import-script.py`):

.. sourcecode:: python

  from cubicweb.dataimport import *
  # define data generators
  GENERATORS = []

  USERS = [('Prenom', 'firstname', ()),
           ('Nom', 'surname', ()),
           ('Identifiant', 'login', ()),
           ]

  def gen_users(ctl):
      for row in ctl.iter_and_commit('utilisateurs'):
          entity = mk_entity(row, USERS)
          entity['upassword'] = 'motdepasse'
          ctl.check('login', entity['login'], None)
          entity = ctl.store.prepare_insert_entity('CWUser', **entity)
          email = ctl.store.prepare_insert_entity('EmailAddress', address=row['email'])
          ctl.store.prepare_insert_relation(entity, 'use_email', email)
          ctl.store.rql('SET U in_group G WHERE G name "users", U eid %(x)s', {'x': entity})

  CHK = [('login', check_doubles, 'Utilisateurs Login',
          'Deux utilisateurs ne devraient pas avoir le meme login.'),
         ]

  GENERATORS.append( (gen_users, CHK) )

  # create controller
  ctl = CWImportController(RQLObjectStore(cnx))
  ctl.askerror = 1
  ctl.generators = GENERATORS
  ctl.data['utilisateurs'] = lazytable(ucsvreader(open('users.csv')))
  # run
  ctl.run()

.. BUG: files with a single column are not parsable
.. TODO: rollback() invocation is not possible yet
"""
from __future__ import print_function

import sys
import traceback
try:
    from StringIO import StringIO  # Python 2: accepts the native `str` tracebacks
except ImportError:
    from io import StringIO  # Python 3

from six import add_metaclass

from logilab.common import attrdict, shellutils
from logilab.common.date import strptime
from logilab.common.deprecation import deprecated, class_deprecated

from cubicweb import QueryError
from cubicweb.dataimport import callfunc_every


@deprecated('[3.21] deprecated')
def lazytable(reader):
    """Yield one dict per data row, keyed by the values of the first row.

    The first row produced by `reader` is taken to be the header of the
    table; every following row is zipped with it. An empty `reader` yields
    nothing.

    >>> data = lazytable(ucsvreader(open(filename)))
    """
    rows = iter(reader)
    try:
        header = next(rows)
    except StopIteration:
        # Empty input: end the generator explicitly. Letting StopIteration
        # escape a generator body is turned into RuntimeError by PEP 479.
        return
    for row in rows:
        yield dict(zip(header, row))


@deprecated('[3.21] deprecated')
def lazydbtable(cu, table, headers, orderby=None):
    """Return an iterator on the rows of a SQL table.

    For each row, the columns listed in `headers` are fetched and returned
    as a dictionary keyed by column name. `orderby`, when given, is an
    iterable of column names used to build the ORDER BY clause.

    >>> data = lazydbtable(cu, 'experimentation', ('id', 'nickname', 'gps'))
    """
    # NOTE: table and column names are interpolated into the statement as-is
    # (identifiers cannot be bound as query parameters), so they must come
    # from trusted code, never from user input.
    sql = 'SELECT %s FROM %s' % (','.join(headers), table,)
    if orderby:
        sql += ' ORDER BY %s' % ','.join(orderby)
    cu.execute(sql)
    while True:
        row = cu.fetchone()
        if row is None:
            break
        yield dict(zip(headers, row))


@deprecated('[3.21] deprecated')
def tell(msg):
    """Default reporting function: print `msg` on standard output."""
    print(msg)


@deprecated('[3.21] deprecated')
def confirm(question):
    """A confirm function that asks for yes/no/abort and exits on abort.

    Return True when the answer is 'Y', False otherwise.
    """
    answer = shellutils.ASK.ask(question, ('Y', 'n', 'abort'), 'Y')
    if answer == 'abort':
        sys.exit(1)
    return answer == 'Y'


@add_metaclass(class_deprecated)
class catch_error(object):
    """Context manager recording errors on an import controller.

    When the controller's `catcherrors` flag is set, any exception raised in
    the managed block (except KeyboardInterrupt and SystemExit) is recorded
    through `ctl.record_error` under `key` and then silenced. Otherwise the
    exception propagates.
    """
    __deprecation_warning__ = '[3.21] deprecated'

    def __init__(self, ctl, key='unexpected error', msg=None):
        self.ctl = ctl
        self.key = key
        self.msg = msg

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        if type is not None:
            if issubclass(type, (KeyboardInterrupt, SystemExit)):
                return  # re-raise
            if self.ctl.catcherrors:
                # forward the message given at construction time instead of
                # silently dropping it
                self.ctl.record_error(self.key, self.msg, type, value, traceback)
                return True  # silent


@deprecated('[3.21] deprecated')
def mk_entity(row, map):
    """Return a dict made from sanitized mapped values.

    `map` is a list of ``(source key, destination key, functions)`` triples.
    For each triple whose source key is found in `row`, the value is passed
    through the functions in order; the chain stops as soon as a function
    returns None. Source keys missing from `row` are skipped.

    ValueError can be raised on unexpected values found in checkers; it is
    re-raised with the name of the offending source field.

    >>> row = {'myname': u'dupont'}
    >>> map = [('myname', u'name', (call_transform_method('title'),))]
    >>> mk_entity(row, map)
    {'name': u'Dupont'}
    >>> row = {'myname': u'dupont', 'optname': u''}
    >>> map = [('myname', u'name', (call_transform_method('title'),)),
    ...        ('optname', u'MARKER', (optional,))]
    >>> mk_entity(row, map)
    {'name': u'Dupont', 'MARKER': None}
    """
    res = {}
    assert isinstance(row, dict)
    assert isinstance(map, list)
    for src, dest, funcs in map:
        try:
            res[dest] = row[src]
        except KeyError:
            continue
        try:
            for func in funcs:
                res[dest] = func(res[dest])
                if res[dest] is None:
                    break
        except ValueError as err:
            exc = ValueError('error with %r field: %s' % (src, err))
            # keep the traceback of the original failure on the new exception
            exc.__traceback__ = sys.exc_info()[-1]
            raise exc
    return res


# base sanitizing/coercing functions ###########################################

@deprecated('[3.21] deprecated')
def optional(value):
    """Checker to filter optional fields.

    If value is undefined (ex: empty string), return None, which breaks the
    checkers validation chain.

    General use is to add the 'optional' check in first position to avoid
    ValueError raised by further checkers on empty values.

    >>> MAPPER = [(u'value', 'value', (optional, int))]
    >>> row = {'value': u''}
    >>> mk_entity(row, MAPPER)
    {'value': None}
    >>> row = {'value': u'100'}
    >>> mk_entity(row, MAPPER)
    {'value': 100}
    """
    if value:
        return value
    return None


@deprecated('[3.21] deprecated')
def required(value):
    """Raise ValueError if value is empty, else return it unchanged.

    This check should often be found in last position in the chain.
    """
    if value:
        return value
    raise ValueError("required")


@deprecated('[3.21] deprecated')
def todatetime(format='%d/%m/%Y'):
    """Return a transformation function to turn a string input value into a
    `datetime.datetime` instance, using the given format.

    Follow it by the `todate` or `totime` functions from `logilab.common.date`
    if you want a `date`/`time` instance instead of `datetime`.
    """
    def coerce(value):
        return strptime(value, format)
    return coerce


@deprecated('[3.21] deprecated')
def call_transform_method(methodname, *args, **kwargs):
    """Return a transformation function which returns the result of calling
    the given method (with the given arguments) on its input.
    """
    def coerce(value):
        return getattr(value, methodname)(*args, **kwargs)
    return coerce


@deprecated('[3.21] deprecated')
def call_check_method(methodname, *args, **kwargs):
    """Return a checker which returns its input unchanged if calling the
    given method on it gives a true value, else raises ValueError.
    """
    def check(value):
        if getattr(value, methodname)(*args, **kwargs):
            return value
        raise ValueError('%s not verified on %r' % (methodname, value))
    return check


# base integrity checking functions ############################################

@deprecated('[3.21] deprecated')
def check_doubles(buckets):
    """Extract the keys that have more than one item in their bucket.

    Return a list of ``(key, number of items)`` pairs.
    """
    return [(k, len(v)) for k, v in buckets.items() if len(v) > 1]


@deprecated('[3.21] deprecated')
def check_doubles_not_none(buckets):
    """Extract the keys that have more than one item in their bucket,
    ignoring the None key.

    Return a list of ``(key, number of items)`` pairs.
    """
    return [(k, len(v)) for k, v in buckets.items()
            if k is not None and len(v) > 1]


@add_metaclass(class_deprecated)
class ObjectStore(object):
    """Store objects in memory for *faster* validation (development mode).

    But it will not enforce the constraints of the schema and hence will miss
    some problems.

    >>> store = ObjectStore()
    >>> user = store.prepare_insert_entity('CWUser', login=u'johndoe')
    >>> group = store.prepare_insert_entity('CWGroup', name=u'unknown')
    >>> store.prepare_insert_relation(user, 'in_group', group)
    """
    __deprecation_warning__ = '[3.21] use the new importer API'

    def __init__(self):
        self.items = []         # fake entities, in creation order
        self.eids = {}          # eid -> fake entity
        self.types = {}         # entity type -> list of eids
        self.relations = set()  # (eid_from, rtype, eid_to) triples
        self.indexes = {}       # filled by CWImportController.index

    def prepare_insert_entity(self, etype, **data):
        """Given an entity type, attributes and inlined relations, return an
        eid for the entity that would be inserted with a real store.
        """
        data = attrdict(data)
        # eids are simply positions in the `items` list
        data['eid'] = eid = len(self.items)
        self.items.append(data)
        self.eids[eid] = data
        self.types.setdefault(etype, []).append(eid)
        return eid

    def prepare_update_entity(self, etype, eid, **kwargs):
        """Given an entity type and eid, update the corresponding fake entity
        with the specified attributes and inlined relations.
        """
        assert eid in self.types[etype], \
            'Trying to update with wrong type {}'.format(etype)
        data = self.eids[eid]
        data.update(kwargs)

    def prepare_insert_relation(self, eid_from, rtype, eid_to, **kwargs):
        """Store into the `relations` attribute that a relation ``rtype``
        exists between entities with eids ``eid_from`` and ``eid_to``.

        Return the stored ``(eid_from, rtype, eid_to)`` triple.
        """
        relation = eid_from, rtype, eid_to
        self.relations.add(relation)
        return relation

    def flush(self):
        """Nothing to flush for this store."""
        pass

    def commit(self):
        """Nothing to commit for this store."""
        return

    def finish(self):
        """Nothing to do once import is terminated for this store."""
        pass

    @property
    def nb_inserted_entities(self):
        return len(self.eids)

    @property
    def nb_inserted_types(self):
        return len(self.types)

    @property
    def nb_inserted_relations(self):
        return len(self.relations)

    @deprecated('[3.21] use prepare_insert_entity instead')
    def create_entity(self, etype, **data):
        """Create a fake entity and return it (including its 'eid')."""
        eid = self.prepare_insert_entity(etype, **data)
        # return the stored entity rather than a detached copy lacking the
        # eid, so that callers can relate or update it afterwards
        return self.eids[eid]

    @deprecated('[3.21] use prepare_insert_relation instead')
    def relate(self, eid_from, rtype, eid_to, **kwargs):
        self.prepare_insert_relation(eid_from, rtype, eid_to, **kwargs)


@add_metaclass(class_deprecated)
class CWImportController(object):
    """Controller of the data import process.

    >>> ctl = CWImportController(store)
    >>> ctl.generators = list_of_data_generators
    >>> ctl.data = dict_of_data_tables
    >>> ctl.run()

    `generators` is a list of ``(import function, checks)`` pairs, where
    `checks` is a list of ``(check key, check function, title, help)`` tuples
    run against the values collected through `check` during the import
    function.
    """
    __deprecation_warning__ = '[3.21] use the new importer API'

    def __init__(self, store, askerror=0, catcherrors=None, tell=tell,
                 commitevery=50):
        self.store = store
        self.generators = None
        self.data = {}
        self.errors = None
        # values collected by `check`; reset by `run` for each generator but
        # initialized here so `check` is usable outside of `run`
        self._checks = {}
        self.askerror = askerror
        if catcherrors is None:
            catcherrors = askerror
        self.catcherrors = catcherrors
        self.commitevery = commitevery  # set to None to do a single commit
        self._tell = tell

    def check(self, type, key, value):
        """Collect `value` in the bucket `key` of check `type`, for the
        integrity checks run once the current import function is done.
        """
        self._checks.setdefault(type, {}).setdefault(key, []).append(value)

    def check_map(self, entity, key, map, default):
        """Replace ``entity[key]`` by its image in `map`; when it has no
        image, register the value for the `key` check and use `default`.
        """
        try:
            entity[key] = map[entity[key]]
        except KeyError:
            self.check(key, entity[key], None)
            entity[key] = default

    def record_error(self, key, msg=None, type=None, value=None, tb=None):
        """Record a traceback under `key` in the `errors` dictionary.

        When `type` is None the exception currently being handled is used,
        else the given ``(type, value, tb)`` triple. The recorded entry is the
        list of traceback lines, or a ``(msg, lines)`` pair when `msg` is
        given.
        """
        tmp = StringIO()
        if type is None:
            traceback.print_exc(file=tmp)
        else:
            traceback.print_exception(type, value, tb, file=tmp)
        if self.errors is None:
            # error recorded before `run` was called
            self.errors = {}
        # use a list to avoid counting a <nb lines> errors instead of one
        errorlog = self.errors.setdefault(key, [])
        if msg is None:
            errorlog.append(tmp.getvalue().splitlines())
        else:
            errorlog.append((msg, tmp.getvalue().splitlines()))

    def run(self):
        """Call each import function, run its integrity checks, commit, then
        report statistics and (depending on `askerror`) the errors found.
        """
        self.errors = {}
        if self.commitevery is None:
            self.tell('Will commit all or nothing.')
        else:
            self.tell('Will commit every %s iterations' % self.commitevery)
        for func, checks in self.generators:
            self._checks = {}
            func_name = func.__name__
            self.tell("Run import function '%s'..." % func_name)
            try:
                func(self)
            except Exception:
                if self.catcherrors:
                    self.record_error(func_name, 'While calling %s' % func.__name__)
                else:
                    self._print_stats()
                    raise
            for key, check_func, title, help_text in checks:
                buckets = self._checks.get(key)
                if buckets:
                    err = check_func(buckets)
                    if err:
                        self.errors[title] = (help_text, err)
        try:
            txuuid = self.store.commit()
            if txuuid is not None:
                self.tell('Transaction commited (txuuid: %s)' % txuuid)
        except QueryError as ex:
            self.tell('Transaction aborted: %s' % ex)
        self._print_stats()
        if self.errors:
            if self.askerror == 2 or (self.askerror and confirm('Display errors ?')):
                self._display_errors()

    @staticmethod
    def _split_error(error):
        """Return a ``(title, details)`` pair for a value of `errors`."""
        if isinstance(error, tuple):
            # integrity check failure, stored by `run` as (help, offenders)
            return error[0], sorted(error[1])
        # list of tracebacks stored by `record_error`; kept in recording order
        return 'Recorded errors', error

    def _display_errors(self):
        """Report every entry of the `errors` dictionary through `tell`."""
        from pprint import pformat
        for errkey, error in self.errors.items():
            title, details = self._split_error(error)
            self.tell("\n%s (%s): %d\n" % (title, errkey, len(details)))
            self.tell(pformat(details))

    def _print_stats(self):
        """Report the number of entities, types, relations and errors."""
        nberrors = sum(len(self._split_error(err)[1])
                       for err in self.errors.values())
        self.tell('\nImport statistics: %i entities, %i types, %i relations and %i errors'
                  % (self.store.nb_inserted_entities,
                     self.store.nb_inserted_types,
                     self.store.nb_inserted_relations,
                     nberrors))

    def get_data(self, key):
        """Return the data table registered under `key`, or None."""
        return self.data.get(key)

    def index(self, name, key, value, unique=False):
        """Create a new index entry in the store's `indexes`.

        If unique is set to True, only the first occurence of a value will be
        kept, not the following ones.
        """
        if unique:
            try:
                if value in self.store.indexes[name][key]:
                    return
            except KeyError:
                # we're sure that one is the first occurence; so continue...
                pass
        self.store.indexes.setdefault(name, {}).setdefault(key, []).append(value)

    def tell(self, msg):
        """Report `msg` using the function given at construction time."""
        self._tell(msg)

    def iter_and_commit(self, datakey):
        """Iterate on rows, triggering commit every self.commitevery
        iterations (never when it is None).
        """
        if self.commitevery is None:
            return self.get_data(datakey)
        else:
            return callfunc_every(self.store.commit,
                                  self.commitevery,
                                  self.get_data(datakey))