[datafeed] propagate raise_on_error to parser's process method
# copyright 2010-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.
"""datafeed sources: copy data from an external data stream into the system
database
"""

import urllib2
import StringIO
from datetime import datetime, timedelta
from base64 import b64decode
from cookielib import CookieJar

from lxml import etree

from cubicweb import RegistryNotFound, ObjectNotFound, ValidationError
from cubicweb.server.sources import AbstractSource
from cubicweb.appobject import AppObject


class DataFeedSource(AbstractSource):
    copy_based_source = True

    options = (
        ('synchronize',
         {'type' : 'yn',
          'default': True,
          'help': ('Is the repository responsible for automatically importing '
                   'content from this source? '
                   'You should say yes unless you don\'t want this behaviour, '
                   'or you use a multiple-repositories setup, in which case '
                   'you should say yes on one repository and no on the others.'),
          'group': 'datafeed-source', 'level': 2,
          }),
        ('synchronization-interval',
         {'type' : 'time',
          'default': '5min',
          'help': ('Interval in seconds between synchronizations with the '
                   'external source (defaults to 5 minutes, must be >= 1 min).'),
          'group': 'datafeed-source', 'level': 2,
          }),
        ('delete-entities',
         {'type' : 'yn',
          'default': True,
          'help': ('Should already imported entities not found anymore on the '
                   'external source be deleted?'),
          'group': 'datafeed-source', 'level': 2,
          }),
        )

    def __init__(self, repo, source_config, eid=None):
        AbstractSource.__init__(self, repo, source_config, eid)
        self.update_config(None, self.check_conf_dict(eid, source_config))

    def check_config(self, source_entity):
        """check configuration of source entity"""
        typedconfig = super(DataFeedSource, self).check_config(source_entity)
        if typedconfig['synchronization-interval'] < 60:
            _ = source_entity._cw._
            msg = _('synchronization-interval must be at least 1 minute')
            raise ValidationError(source_entity.eid, {'config': msg})
        return typedconfig

    def _entity_update(self, source_entity):
        source_entity.complete()
        self.parser = source_entity.parser
        self.latest_retrieval = source_entity.latest_retrieval
        self.urls = [url.strip() for url in source_entity.url.splitlines()
                     if url.strip()]

    def update_config(self, source_entity, typedconfig):
        """update configuration from source entity. `typedconfig` is config
        properly typed with defaults set
        """
        self.synchro_interval = timedelta(seconds=typedconfig['synchronization-interval'])
        if source_entity is not None:
            self._entity_update(source_entity)
        self.config = typedconfig
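    # lifecycle note: `init` is called with the associated CWSource entity
    # once the source is known to the repository; the parser identifier and
    # the schema mapping (if the parser uses one) are (re)loaded from it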
    def init(self, activated, source_entity):
        if activated:
            self._entity_update(source_entity)
        self.parser = source_entity.parser
        self.load_mapping(source_entity._cw)

    def _get_parser(self, session, **kwargs):
        return self.repo.vreg['parsers'].select(
            self.parser, session, source=self, **kwargs)

    def load_mapping(self, session):
        self.mapping = {}
        self.mapping_idx = {}
        try:
            parser = self._get_parser(session)
        except (RegistryNotFound, ObjectNotFound):
            return # no parser yet, don't go further
        self._load_mapping(session, parser=parser)

    def add_schema_config(self, schemacfg, checkonly=False, parser=None):
        """added CWSourceSchemaConfig, modify mapping accordingly"""
        if parser is None:
            parser = self._get_parser(schemacfg._cw)
        parser.add_schema_config(schemacfg, checkonly)

    def del_schema_config(self, schemacfg, checkonly=False, parser=None):
        """deleted CWSourceSchemaConfig, modify mapping accordingly"""
        if parser is None:
            parser = self._get_parser(schemacfg._cw)
        parser.del_schema_config(schemacfg, checkonly)

    def fresh(self):
        if self.latest_retrieval is None:
            return False
        return datetime.now() < (self.latest_retrieval + self.synchro_interval)

    def pull_data(self, session, force=False, raise_on_error=False):
        if not force and self.fresh():
            return {}
        if self.config['delete-entities']:
            myuris = self.source_cwuris(session)
        else:
            myuris = None
        parser = self._get_parser(session, sourceuris=myuris)
        error = False
        self.info('pulling data for source %s', self.uri)
        for url in self.urls:
            try:
                if parser.process(url, raise_on_error):
                    error = True
            except IOError, exc:
                if raise_on_error:
                    raise
                self.error('could not pull data while processing %s: %s',
                           url, exc)
                error = True
        if error:
            self.warning("some error occurred, don't attempt to delete entities")
        elif self.config['delete-entities'] and myuris:
            byetype = {}
            for eid, etype in myuris.values():
                byetype.setdefault(etype, []).append(str(eid))
            self.error('delete %s entities %s', self.uri, byetype)
            for etype, eids in byetype.iteritems():
                session.execute('DELETE %s X WHERE X eid IN (%s)'
                                % (etype, ','.join(eids)))
        self.latest_retrieval = datetime.now()
        session.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
                        {'x': self.eid, 'date': self.latest_retrieval})
        return parser.stats

    def before_entity_insertion(self, session, lid, etype, eid, sourceparams):
        """called by the repository when an eid has been attributed for an
        entity stored here but the entity has not been inserted in the system
        table yet.

        This method must return an Entity instance representation of this
        entity.
        """
        entity = super(DataFeedSource, self).before_entity_insertion(
            session, lid, etype, eid, sourceparams)
        entity.cw_edited['cwuri'] = unicode(lid)
        entity.cw_edited.set_defaults()
        sourceparams['parser'].before_entity_copy(entity, sourceparams)
        # avoid query to search full-text indexed attributes
        for attr in entity.e_schema.indexable_attributes():
            entity.cw_edited.setdefault(attr, u'')
        return entity

    def after_entity_insertion(self, session, lid, entity, sourceparams):
        """called by the repository after an entity stored here has been
        inserted in the system table.
        """
        if session.is_hook_category_activated('integrity'):
            entity.cw_edited.check(creation=True)
        self.repo.system_source.add_entity(session, entity)
        entity.cw_edited.saved = entity._cw_is_saved = True
        sourceparams['parser'].after_entity_copy(entity, sourceparams)
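    # extids are stored base64-encoded in the `entities` system table, hence
    # the b64decode below to recover the original uris used as keys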
"""ifsession.is_hook_category_activated('integrity'):entity.cw_edited.check(creation=True)self.repo.system_source.add_entity(session,entity)entity.cw_edited.saved=entity._cw_is_saved=Truesourceparams['parser'].after_entity_copy(entity,sourceparams)defsource_cwuris(self,session):sql=('SELECT extid, eid, type FROM entities, cw_source_relation ''WHERE entities.eid=cw_source_relation.eid_from ''AND cw_source_relation.eid_to=%s'%self.eid)returndict((b64decode(uri),(eid,type))foruri,eid,typeinsession.system_sql(sql))classDataFeedParser(AppObject):__registry__='parsers'def__init__(self,session,source,sourceuris=None):self._cw=sessionself.source=sourceself.sourceuris=sourceurisself.stats={'created':set(),'updated':set()}defadd_schema_config(self,schemacfg,checkonly=False):"""added CWSourceSchemaConfig, modify mapping accordingly"""msg=schemacfg._cw._("this parser doesn't use a mapping")raiseValidationError(schemacfg.eid,{None:msg})defdel_schema_config(self,schemacfg,checkonly=False):"""deleted CWSourceSchemaConfig, modify mapping accordingly"""msg=schemacfg._cw._("this parser doesn't use a mapping")raiseValidationError(schemacfg.eid,{None:msg})defextid2entity(self,uri,etype,**sourceparams):sourceparams['parser']=selfeid=self.source.extid2eid(str(uri),etype,self._cw,sourceparams=sourceparams)ifself.sourceurisisnotNone:self.sourceuris.pop(str(uri),None)returnself._cw.entity_from_eid(eid,etype)defprocess(self,url,partialcommit=True):"""main callback: process the url"""raiseNotImplementedErrordefbefore_entity_copy(self,entity,sourceparams):raiseNotImplementedErrordefafter_entity_copy(self,entity,sourceparams):self.stats['created'].add(entity.eid)defcreated_during_pull(self,entity):returnentity.eidinself.stats['created']defupdated_during_pull(self,entity):returnentity.eidinself.stats['updated']defnotify_updated(self,entity):returnself.stats['updated'].add(entity.eid)classDataFeedXMLParser(DataFeedParser):defprocess(self,url,raise_on_error=False,partialcommit=True):"""IDataFeedParser main entry point"""error=Falseforargsinself.parse(url):try:self.process_item(*args)ifpartialcommit:# commit+set_pool instead of commit(reset_pool=False) to let# other a chance to get our poolself._cw.commit()self._cw.set_pool()exceptValidationError,exc:ifraise_on_error:raiseifpartialcommit:self.source.error('Skipping %s because of validation error %s'%(args,exc))self._cw.rollback()self._cw.set_pool()error=Trueelse:raisereturnerrordefparse(self,url):ifurl.startswith('http'):fromcubicweb.sobjects.parsersimportHOST_MAPPINGformappedurlinHOST_MAPPING:ifurl.startswith(mappedurl):url=url.replace(mappedurl,HOST_MAPPING[mappedurl],1)breakself.source.info('GET %s',url)stream=_OPENER.open(url)elifurl.startswith('file://'):stream=open(url[7:])else:stream=StringIO.StringIO(url)returnself.parse_etree(etree.parse(stream).getroot())defparse_etree(self,document):return[(document,)]defprocess_item(self,*args):raiseNotImplementedError# use a cookie enabled opener to use session cookie if any_OPENER=urllib2.build_opener()try:fromlogilab.commonimporturllib2ext_OPENER.add_handler(urllib2ext.HTTPGssapiAuthHandler())exceptImportError:# python-kerberos not availablepass_OPENER.add_handler(urllib2.HTTPCookieProcessor(CookieJar()))