# copyright 2010-2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.
"""datafeed sources: copy data from an external data stream into the system
database
"""

import urllib2
import StringIO
from os.path import exists
from datetime import datetime, timedelta
from base64 import b64decode
from cookielib import CookieJar

from lxml import etree

from cubicweb import RegistryNotFound, ObjectNotFound, ValidationError, UnknownEid
from cubicweb.server.repository import preprocess_inlined_relations
from cubicweb.server.sources import AbstractSource
from cubicweb.appobject import AppObject


class DataFeedSource(AbstractSource):
    copy_based_source = True
    use_cwuri_as_url = True

    options = (
        ('synchronize',
         {'type': 'yn',
          'default': True,
          'help': ('Is the repository responsible for automatically importing '
                   'content from this source? '
                   'You should say yes unless you don\'t want this behaviour '
                   'or if you use a multiple repositories setup, in which '
                   'case you should say yes on one repository, no on others.'),
          'group': 'datafeed-source', 'level': 2,
          }),
        ('synchronization-interval',
         {'type': 'time',
          'default': '5min',
          'help': ('Interval in seconds between synchronizations with the '
                   'external source (defaults to 5 minutes, must be >= 1 min).'),
          'group': 'datafeed-source', 'level': 2,
          }),
        ('max-lock-lifetime',
         {'type': 'time',
          'default': '1h',
          'help': ('Maximum time allowed for a synchronization to run. '
                   'Beyond that time, the synchronization is considered to '
                   'have failed without properly releasing the lock, hence '
                   'the lock will be ignored.'),
          'group': 'datafeed-source', 'level': 2,
          }),
        ('delete-entities',
         {'type': 'yn',
          'default': False,
          'help': ('Should already imported entities not found anymore on the '
                   'external source be deleted?'),
          'group': 'datafeed-source', 'level': 2,
          }),
        ('logs-lifetime',
         {'type': 'time',
          'default': '10d',
          'help': ('Time before logs from datafeed imports are deleted.'),
          'group': 'datafeed-source', 'level': 2,
          }),
        ('http-timeout',
         {'type': 'time',
          'default': '1min',
          'help': ('Timeout of HTTP GET requests when synchronizing a source.'),
          'group': 'datafeed-source', 'level': 2,
          }),
        )

    def check_config(self, source_entity):
        """check configuration of source entity"""
        typed_config = super(DataFeedSource, self).check_config(source_entity)
        if typed_config['synchronization-interval'] < 60:
            _ = source_entity._cw._
            msg = _('synchronization-interval must be greater than 1 minute')
            raise ValidationError(source_entity.eid, {'config': msg})
        return typed_config

    def _entity_update(self, source_entity):
        super(DataFeedSource, self)._entity_update(source_entity)
        self.parser_id = source_entity.parser
        self.latest_retrieval = source_entity.latest_retrieval

    def update_config(self, source_entity, typed_config):
        """update configuration from source entity.

        `typed_config` is config properly typed with defaults set.
        """
        super(DataFeedSource, self).update_config(source_entity, typed_config)
        self.synchro_interval = timedelta(seconds=typed_config['synchronization-interval'])
        self.max_lock_lifetime = timedelta(seconds=typed_config['max-lock-lifetime'])
        self.http_timeout = typed_config['http-timeout']

    def init(self, activated, source_entity):
        super(DataFeedSource, self).init(activated, source_entity)
        self.parser_id = source_entity.parser
        self.load_mapping(source_entity._cw)

    def _get_parser(self, session, **kwargs):
        return self.repo.vreg['parsers'].select(
            self.parser_id, session, source=self, **kwargs)

    def load_mapping(self, session):
        self.mapping = {}
        self.mapping_idx = {}
        try:
            parser = self._get_parser(session)
        except (RegistryNotFound, ObjectNotFound):
            return  # no parser yet, don't go further
        self._load_mapping(session, parser=parser)

    def add_schema_config(self, schemacfg, checkonly=False, parser=None):
        """added CWSourceSchemaConfig, modify mapping accordingly"""
        if parser is None:
            parser = self._get_parser(schemacfg._cw)
        parser.add_schema_config(schemacfg, checkonly)

    def del_schema_config(self, schemacfg, checkonly=False, parser=None):
        """deleted CWSourceSchemaConfig, modify mapping accordingly"""
        if parser is None:
            parser = self._get_parser(schemacfg._cw)
        parser.del_schema_config(schemacfg, checkonly)

    def fresh(self):
        if self.latest_retrieval is None:
            return False
        return datetime.utcnow() < (self.latest_retrieval + self.synchro_interval)

    def update_latest_retrieval(self, session):
        self.latest_retrieval = datetime.utcnow()
        session.set_cnxset()
        session.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
                        {'x': self.eid, 'date': self.latest_retrieval})
        session.commit()

    def acquire_synchronization_lock(self, session):
        # XXX race condition until WHERE of SET queries is executed using
        # 'SELECT FOR UPDATE'
        now = datetime.utcnow()
        session.set_cnxset()
        if not session.execute(
                'SET X in_synchronization %(now)s WHERE X eid %(x)s, '
                'X in_synchronization NULL OR X in_synchronization < %(maxdt)s',
                {'x': self.eid, 'now': now,
                 'maxdt': now - self.max_lock_lifetime}):
            self.error('concurrent synchronization detected, skip pull')
            session.commit()
            return False
        session.commit()
        return True

    def release_synchronization_lock(self, session):
        session.set_cnxset()
        session.execute('SET X in_synchronization NULL WHERE X eid %(x)s',
                        {'x': self.eid})
        session.commit()

    def pull_data(self, session, force=False, raise_on_error=False):
        """Launch synchronization of the source if needed.

        This method is responsible for handling commit/rollback on the given
        session.
        """
        if not force and self.fresh():
            return {}
        if not self.acquire_synchronization_lock(session):
            return {}
        try:
            with session.transaction(free_cnxset=False):
                return self._pull_data(session, force, raise_on_error)
        finally:
            self.release_synchronization_lock(session)

    def _pull_data(self, session, force=False, raise_on_error=False):
        importlog = self.init_import_log(session)
        myuris = self.source_cwuris(session)
        parser = self._get_parser(session, sourceuris=myuris, import_log=importlog)
        if self.process_urls(parser, self.urls, raise_on_error):
            self.warning("some error occurred, don't attempt to delete entities")
        else:
            parser.handle_deletion(self.config, session, myuris)
        self.update_latest_retrieval(session)
        stats = parser.stats
        if stats.get('created'):
            importlog.record_info('added %s entities' % len(stats['created']))
        if stats.get('updated'):
            importlog.record_info('updated %s entities' % len(stats['updated']))
        session.set_cnxset()
        importlog.write_log(session, end_timestamp=self.latest_retrieval)
        session.commit()
        return stats

    def process_urls(self, parser, urls, raise_on_error=False):
        error = False
        for url in urls:
            self.info('pulling data from %s', url)
            try:
                if parser.process(url, raise_on_error):
                    error = True
            except IOError as exc:
                if raise_on_error:
                    raise
                parser.import_log.record_error(
                    'could not pull data while processing %s: %s' % (url, exc))
                error = True
            except Exception as exc:
                if raise_on_error:
                    raise
                self.exception('error while processing %s: %s', url, exc)
                error = True
        return error

    def before_entity_insertion(self, session, lid, etype, eid, sourceparams):
        """called by the repository when an eid has been attributed for an
        entity stored here but the entity has not been inserted in the system
        table yet.

        This method must return an Entity instance representing this entity.
        """
        entity = super(DataFeedSource, self).before_entity_insertion(
            session, lid, etype, eid, sourceparams)
        entity.cw_edited['cwuri'] = lid.decode('utf-8')
        entity.cw_edited.set_defaults()
        sourceparams['parser'].before_entity_copy(entity, sourceparams)
        return entity

    def after_entity_insertion(self, session, lid, entity, sourceparams):
        """called by the repository after an entity stored here has been
        inserted in the system table.
        """
        relations = preprocess_inlined_relations(session, entity)
        if session.is_hook_category_activated('integrity'):
            entity.cw_edited.check(creation=True)
        self.repo.system_source.add_entity(session, entity)
        entity.cw_edited.saved = entity._cw_is_saved = True
        sourceparams['parser'].after_entity_copy(entity, sourceparams)
        # call hooks for inlined relations
        call_hooks = self.repo.hm.call_hooks
        if self.should_call_hooks:
            for attr, value in relations:
                call_hooks('before_add_relation', session,
                           eidfrom=entity.eid, rtype=attr, eidto=value)
                call_hooks('after_add_relation', session,
                           eidfrom=entity.eid, rtype=attr, eidto=value)

    def source_cwuris(self, session):
        sql = ('SELECT extid, eid, type FROM entities, cw_source_relation '
               'WHERE entities.eid=cw_source_relation.eid_from '
               'AND cw_source_relation.eid_to=%s' % self.eid)
        return dict((b64decode(uri), (eid, type))
                    for uri, eid, type in session.system_sql(sql).fetchall())

    def init_import_log(self, session, **kwargs):
        dataimport = session.create_entity('CWDataImport', cw_import_of=self,
                                           start_timestamp=datetime.utcnow(),
                                           **kwargs)
        dataimport.init()
        return dataimport


class DataFeedParser(AppObject):
    __registry__ = 'parsers'

    def __init__(self, session, source, sourceuris=None, import_log=None, **kwargs):
        super(DataFeedParser, self).__init__(session, **kwargs)
        self.source = source
        self.sourceuris = sourceuris
        self.import_log = import_log
        self.stats = {'created': set(), 'updated': set(), 'checked': set()}

    def normalize_url(self, url):
        from cubicweb.sobjects import URL_MAPPING  # available after registration
        for mappedurl in URL_MAPPING:
            if url.startswith(mappedurl):
                return url.replace(mappedurl, URL_MAPPING[mappedurl], 1)
        return url

    def add_schema_config(self, schemacfg, checkonly=False):
        """added CWSourceSchemaConfig, modify mapping accordingly"""
        msg = schemacfg._cw._("this parser doesn't use a mapping")
        raise ValidationError(schemacfg.eid, {None: msg})

    def del_schema_config(self, schemacfg, checkonly=False):
        """deleted CWSourceSchemaConfig, modify mapping accordingly"""
        msg = schemacfg._cw._("this parser doesn't use a mapping")
        raise ValidationError(schemacfg.eid, {None: msg})

    def extid2entity(self, uri, etype, **sourceparams):
        """return an entity for the given uri. May return None if it should be
        skipped.
        """
        session = self._cw
        # if cwsource is specified and the repository has a source with the
        # same name, call extid2eid on that source so the entity will be
        # properly seen as coming from this source
        source_uri = sourceparams.pop('cwsource', None)
        if source_uri is not None and source_uri != 'system':
            source = session.repo.sources_by_uri.get(source_uri, self.source)
        else:
            source = self.source
        sourceparams['parser'] = self
        if isinstance(uri, unicode):
            uri = uri.encode('utf-8')
        try:
            eid = session.repo.extid2eid(source, str(uri), etype, session,
                                         complete=False, commit=False,
                                         sourceparams=sourceparams)
        except ValidationError as ex:
            # XXX use critical so they are seen during tests. Should consider
            # raise_on_error instead?
            self.source.critical('error while creating %s: %s', etype, ex)
            self.import_log.record_error('error while creating %s: %s'
                                         % (etype, ex))
            return None
        if eid < 0:
            # entity has been moved away from its original source
            #
            # Don't give etype to entity_from_eid so we get UnknownEid if the
            # entity has been removed
            try:
                entity = session.entity_from_eid(-eid)
            except UnknownEid:
                return None
            self.notify_updated(entity)  # avoid later update from the source's data
            return entity
        if self.sourceuris is not None:
            self.sourceuris.pop(str(uri), None)
        return session.entity_from_eid(eid, etype)

    def process(self, url, raise_on_error=False):
        """main callback: process the url"""
        raise NotImplementedError

    def before_entity_copy(self, entity, sourceparams):
        raise NotImplementedError

    def after_entity_copy(self, entity, sourceparams):
        self.stats['created'].add(entity.eid)

    def created_during_pull(self, entity):
        return entity.eid in self.stats['created']

    def updated_during_pull(self, entity):
        return entity.eid in self.stats['updated']

    def notify_updated(self, entity):
        return self.stats['updated'].add(entity.eid)

    def notify_checked(self, entity):
        return self.stats['checked'].add(entity.eid)

    def is_deleted(self, extid, etype, eid):
        """return True if the entity of given external id, entity type and eid
        is actually deleted. Always return True by default; put more sensible
        behaviour in sub-classes.
        """
        return True

    def handle_deletion(self, config, session, myuris):
        if config['delete-entities'] and myuris:
            byetype = {}
            for extid, (eid, etype) in myuris.iteritems():
                if self.is_deleted(extid, etype, eid):
                    byetype.setdefault(etype, []).append(str(eid))
            for etype, eids in byetype.iteritems():
                self.warning('delete %s %s entities', len(eids), etype)
                session.set_cnxset()
                session.execute('DELETE %s X WHERE X eid IN (%s)'
                                % (etype, ','.join(eids)))
                session.commit()

    def update_if_necessary(self, entity, attrs):
        entity.complete(tuple(attrs))
        # check modification date and compare attribute values to only update
        # what's actually needed
        self.notify_checked(entity)
        mdate = attrs.get('modification_date')
        if not mdate or mdate > entity.modification_date:
            attrs = dict((k, v) for k, v in attrs.iteritems()
                         if v != getattr(entity, k))
            if attrs:
                entity.cw_set(**attrs)
                self.notify_updated(entity)


class DataFeedXMLParser(DataFeedParser):

    def process(self, url, raise_on_error=False):
        """IDataFeedParser main entry point"""
        try:
            parsed = self.parse(url)
        except Exception as ex:
            if raise_on_error:
                raise
            self.import_log.record_error(str(ex))
            return True
        error = False
        # Check whether self._cw is a session or a connection
        if getattr(self._cw, 'commit', None) is not None:
            commit = self._cw.commit
            set_cnxset = self._cw.set_cnxset
            rollback = self._cw.rollback
        else:
            commit = self._cw.cnx.commit
            set_cnxset = lambda: None
            rollback = self._cw.cnx.rollback
        for args in parsed:
            try:
                self.process_item(*args)
                # commit + set_cnxset instead of commit(free_cnxset=False) to
                # give others a chance to get our connections set
                commit()
                set_cnxset()
            except ValidationError as exc:
                if raise_on_error:
                    raise
                self.source.error('Skipping %s because of validation error %s'
                                  % (args, exc))
                rollback()
                set_cnxset()
                error = True
        return error

    def parse(self, url):
        if url.startswith('http'):
            url = self.normalize_url(url)
            self.source.info('GET %s', url)
            stream = _OPENER.open(url, timeout=self.source.http_timeout)
        elif url.startswith('file://'):
            stream = open(url[7:])
        else:
            stream = StringIO.StringIO(url)
        return self.parse_etree(etree.parse(stream).getroot())

    def parse_etree(self, document):
        return [(document,)]

    def process_item(self, *args):
        raise NotImplementedError

    def is_deleted(self, extid, etype, eid):
        if extid.startswith('http'):
            try:
                _OPENER.open(self.normalize_url(extid),  # XXX HTTP HEAD request
                             timeout=self.source.http_timeout)
            except urllib2.HTTPError as ex:
                if ex.code == 404:
                    return True
        elif extid.startswith('file://'):
            # the entity is deleted if the backing file no longer exists
            return not exists(extid[7:])
        return False


# use a cookie enabled opener to use session cookie if any
_OPENER = urllib2.build_opener()
try:
    from logilab.common import urllib2ext
    _OPENER.add_handler(urllib2ext.HTTPGssapiAuthHandler())
except ImportError:  # python-kerberos not available
    pass
_OPENER.add_handler(urllib2.HTTPCookieProcessor(CookieJar()))
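
# Example: a minimal concrete parser (illustrative sketch only; the class name,
# the 'myapp.project-parser' regid and the `Project` entity type are
# hypothetical, not part of CubicWeb). It shows the intended call flow of the
# classes above: parse_etree() yields one argument tuple per feed item,
# process_item() maps an item to an entity through extid2entity(), and
# before_entity_copy() / update_if_necessary() set attributes on creation and
# update.
#
#   class MyProjectParser(DataFeedXMLParser):
#       __regid__ = 'myapp.project-parser'
#
#       def parse_etree(self, document):
#           # one argument tuple per <project> element of the feed
#           return [(node,) for node in document.findall('project')]
#
#       def process_item(self, node):
#           entity = self.extid2entity(node.get('uri'), 'Project', item=node)
#           if entity is not None and not self.created_during_pull(entity):
#               self.update_if_necessary(entity,
#                                        {'name': node.findtext('name')})
#
#       def before_entity_copy(self, entity, sourceparams):
#           # attributes required at creation time; sourceparams carries the
#           # keyword arguments given to extid2entity() (here 'item')
#           entity.cw_edited.update({'name': sourceparams['item'].findtext('name')})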