[doc] mention datadir-url configuration option in release notes
Related to #5204550
# copyright 2010-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr## This file is part of CubicWeb.## CubicWeb is free software: you can redistribute it and/or modify it under the# terms of the GNU Lesser General Public License as published by the Free# Software Foundation, either version 2.1 of the License, or (at your option)# any later version.## CubicWeb is distributed in the hope that it will be useful, but WITHOUT# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more# details.## You should have received a copy of the GNU Lesser General Public License along# with CubicWeb. If not, see <http://www.gnu.org/licenses/>."""datafeed sources: copy data from an external data stream into the systemdatabase"""importurllib2importStringIOfromos.pathimportexistsfromdatetimeimportdatetime,timedeltafrombase64importb64decodefromcookielibimportCookieJarfromlxmlimportetreefromcubicwebimportRegistryNotFound,ObjectNotFound,ValidationError,UnknownEidfromcubicweb.server.repositoryimportpreprocess_inlined_relationsfromcubicweb.server.sourcesimportAbstractSourcefromcubicweb.appobjectimportAppObjectclassDataFeedSource(AbstractSource):use_cwuri_as_url=Trueoptions=(('synchronize',{'type':'yn','default':True,'help':('Is the repository responsible to automatically import ''content from this source? ''You should say yes unless you don\'t want this behaviour ''or if you use a multiple repositories setup, in which ''case you should say yes on one repository, no on others.'),'group':'datafeed-source','level':2,}),('synchronization-interval',{'type':'time','default':'5min','help':('Interval in seconds between synchronization with the ''external source (default to 5 minutes, must be >= 1 min).'),'group':'datafeed-source','level':2,}),('max-lock-lifetime',{'type':'time','default':'1h','help':('Maximum time allowed for a synchronization to be run. ''Exceeded that time, the synchronization will be considered ''as having failed and not properly released the lock, hence ''it won\'t be considered'),'group':'datafeed-source','level':2,}),('delete-entities',{'type':'yn','default':False,'help':('Should already imported entities not found anymore on the ''external source be deleted?'),'group':'datafeed-source','level':2,}),('logs-lifetime',{'type':'time','default':'10d','help':('Time before logs from datafeed imports are deleted.'),'group':'datafeed-source','level':2,}),('http-timeout',{'type':'time','default':'1min','help':('Timeout of HTTP GET requests, when synchronizing a source.'),'group':'datafeed-source','level':2,}),('use-cwuri-as-url',{'type':'yn','default':None,# explicitly unset'help':('Use cwuri (i.e. external URL) for link to the entity ''instead of its local URL.'),'group':'datafeed-source','level':1,}),)defcheck_config(self,source_entity):"""check configuration of source entity"""typed_config=super(DataFeedSource,self).check_config(source_entity)iftyped_config['synchronization-interval']<60:_=source_entity._cw._msg=_('synchronization-interval must be greater than 1 minute')raiseValidationError(source_entity.eid,{'config':msg})returntyped_configdef_entity_update(self,source_entity):super(DataFeedSource,self)._entity_update(source_entity)self.parser_id=source_entity.parserself.latest_retrieval=source_entity.latest_retrievaldefupdate_config(self,source_entity,typed_config):"""update configuration from source entity. `typed_config` is config properly typed with defaults set """super(DataFeedSource,self).update_config(source_entity,typed_config)self.synchro_interval=timedelta(seconds=typed_config['synchronization-interval'])self.max_lock_lifetime=timedelta(seconds=typed_config['max-lock-lifetime'])self.http_timeout=typed_config['http-timeout']# if typed_config['use-cwuri-as-url'] is set, we have to update# use_cwuri_as_url attribute and public configuration dictionary# accordinglyiftyped_config['use-cwuri-as-url']isnotNone:self.use_cwuri_as_url=typed_config['use-cwuri-as-url']self.public_config['use-cwuri-as-url']=self.use_cwuri_as_urldefinit(self,activated,source_entity):super(DataFeedSource,self).init(activated,source_entity)self.parser_id=source_entity.parserself.load_mapping(source_entity._cw)def_get_parser(self,cnx,**kwargs):returnself.repo.vreg['parsers'].select(self.parser_id,cnx,source=self,**kwargs)defload_mapping(self,cnx):self.mapping={}self.mapping_idx={}try:parser=self._get_parser(cnx)except(RegistryNotFound,ObjectNotFound):return# no parser yet, don't go furtherself._load_mapping(cnx,parser=parser)defadd_schema_config(self,schemacfg,checkonly=False,parser=None):"""added CWSourceSchemaConfig, modify mapping accordingly"""ifparserisNone:parser=self._get_parser(schemacfg._cw)parser.add_schema_config(schemacfg,checkonly)defdel_schema_config(self,schemacfg,checkonly=False,parser=None):"""deleted CWSourceSchemaConfig, modify mapping accordingly"""ifparserisNone:parser=self._get_parser(schemacfg._cw)parser.del_schema_config(schemacfg,checkonly)deffresh(self):ifself.latest_retrievalisNone:returnFalsereturndatetime.utcnow()<(self.latest_retrieval+self.synchro_interval)defupdate_latest_retrieval(self,cnx):self.latest_retrieval=datetime.utcnow()cnx.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',{'x':self.eid,'date':self.latest_retrieval})cnx.commit()defacquire_synchronization_lock(self,cnx):# XXX race condition until WHERE of SET queries is executed using# 'SELECT FOR UPDATE'now=datetime.utcnow()ifnotcnx.execute('SET X in_synchronization %(now)s WHERE X eid %(x)s, ''X in_synchronization NULL OR X in_synchronization < %(maxdt)s',{'x':self.eid,'now':now,'maxdt':now-self.max_lock_lifetime}):self.error('concurrent synchronization detected, skip pull')cnx.commit()returnFalsecnx.commit()returnTruedefrelease_synchronization_lock(self,cnx):cnx.execute('SET X in_synchronization NULL WHERE X eid %(x)s',{'x':self.eid})cnx.commit()defpull_data(self,cnx,force=False,raise_on_error=False):"""Launch synchronization of the source if needed. This method is responsible to handle commit/rollback on the given connection. """ifnotforceandself.fresh():return{}ifnotself.acquire_synchronization_lock(cnx):return{}try:returnself._pull_data(cnx,force,raise_on_error)finally:cnx.rollback()# rollback first in case there is some dirty# transaction remainingself.release_synchronization_lock(cnx)def_pull_data(self,cnx,force=False,raise_on_error=False):importlog=self.init_import_log(cnx)myuris=self.source_cwuris(cnx)parser=self._get_parser(cnx,sourceuris=myuris,import_log=importlog)ifself.process_urls(parser,self.urls,raise_on_error):self.warning("some error occurred, don't attempt to delete entities")else:parser.handle_deletion(self.config,cnx,myuris)self.update_latest_retrieval(cnx)stats=parser.statsifstats.get('created'):importlog.record_info('added %s entities'%len(stats['created']))ifstats.get('updated'):importlog.record_info('updated %s entities'%len(stats['updated']))importlog.write_log(cnx,end_timestamp=self.latest_retrieval)cnx.commit()returnstatsdefprocess_urls(self,parser,urls,raise_on_error=False):error=Falseforurlinurls:self.info('pulling data from %s',url)try:ifparser.process(url,raise_on_error):error=TrueexceptIOErrorasexc:ifraise_on_error:raiseparser.import_log.record_error('could not pull data while processing %s: %s'%(url,exc))error=TrueexceptExceptionasexc:ifraise_on_error:raiseself.exception('error while processing %s: %s',url,exc)error=Truereturnerrordefbefore_entity_insertion(self,cnx,lid,etype,eid,sourceparams):"""called by the repository when an eid has been attributed for an entity stored here but the entity has not been inserted in the system table yet. This method must return the an Entity instance representation of this entity. """entity=super(DataFeedSource,self).before_entity_insertion(cnx,lid,etype,eid,sourceparams)entity.cw_edited['cwuri']=lid.decode('utf-8')entity.cw_edited.set_defaults()sourceparams['parser'].before_entity_copy(entity,sourceparams)returnentitydefafter_entity_insertion(self,cnx,lid,entity,sourceparams):"""called by the repository after an entity stored here has been inserted in the system table. """relations=preprocess_inlined_relations(cnx,entity)ifcnx.is_hook_category_activated('integrity'):entity.cw_edited.check(creation=True)self.repo.system_source.add_entity(cnx,entity)entity.cw_edited.saved=entity._cw_is_saved=Truesourceparams['parser'].after_entity_copy(entity,sourceparams)# call hooks for inlined relationscall_hooks=self.repo.hm.call_hooksifself.should_call_hooks:forattr,valueinrelations:call_hooks('before_add_relation',cnx,eidfrom=entity.eid,rtype=attr,eidto=value)call_hooks('after_add_relation',cnx,eidfrom=entity.eid,rtype=attr,eidto=value)defsource_cwuris(self,cnx):sql=('SELECT extid, eid, type FROM entities, cw_source_relation ''WHERE entities.eid=cw_source_relation.eid_from ''AND cw_source_relation.eid_to=%s'%self.eid)returndict((b64decode(uri),(eid,type))foruri,eid,typeincnx.system_sql(sql).fetchall())definit_import_log(self,cnx,**kwargs):dataimport=cnx.create_entity('CWDataImport',cw_import_of=self,start_timestamp=datetime.utcnow(),**kwargs)dataimport.init()returndataimportclassDataFeedParser(AppObject):__registry__='parsers'def__init__(self,cnx,source,sourceuris=None,import_log=None,**kwargs):super(DataFeedParser,self).__init__(cnx,**kwargs)self.source=sourceself.sourceuris=sourceurisself.import_log=import_logself.stats={'created':set(),'updated':set(),'checked':set()}defnormalize_url(self,url):"""Normalize an url by looking if there is a replacement for it in `cubicweb.sobjects.URL_MAPPING`. This dictionary allow to redirect from one host to another, which may be useful for example in case of test instance using production data, while you don't want to load the external source nor to hack your `/etc/hosts` file. """# local import mandatory, it's available after registrationfromcubicweb.sobjectsimportURL_MAPPINGformappedurlinURL_MAPPING:ifurl.startswith(mappedurl):returnurl.replace(mappedurl,URL_MAPPING[mappedurl],1)returnurldefretrieve_url(self,url,data=None,headers=None):"""Return stream linked by the given url: * HTTP urls will be normalized (see :meth:`normalize_url`) * handle file:// URL * other will be considered as plain content, useful for testing purpose """ifheadersisNone:headers={}ifurl.startswith('http'):url=self.normalize_url(url)ifdata:self.source.info('POST %s%s',url,data)else:self.source.info('GET %s',url)req=urllib2.Request(url,data,headers)return_OPENER.open(req,timeout=self.source.http_timeout)ifurl.startswith('file://'):returnURLLibResponseAdapter(open(url[7:]),url)returnURLLibResponseAdapter(StringIO.StringIO(url),url)defadd_schema_config(self,schemacfg,checkonly=False):"""added CWSourceSchemaConfig, modify mapping accordingly"""msg=schemacfg._cw._("this parser doesn't use a mapping")raiseValidationError(schemacfg.eid,{None:msg})defdel_schema_config(self,schemacfg,checkonly=False):"""deleted CWSourceSchemaConfig, modify mapping accordingly"""msg=schemacfg._cw._("this parser doesn't use a mapping")raiseValidationError(schemacfg.eid,{None:msg})defextid2entity(self,uri,etype,**sourceparams):"""Return an entity for the given uri. May return None if it should be skipped. If a `raise_on_error` keyword parameter is passed, a ValidationError exception may be raised. """raise_on_error=sourceparams.pop('raise_on_error',False)cnx=self._cw# if cwsource is specified and repository has a source with the same# name, call extid2eid on that source so entity will be properly seen as# coming from this sourcesource_uri=sourceparams.pop('cwsource',None)ifsource_uriisnotNoneandsource_uri!='system':source=cnx.repo.sources_by_uri.get(source_uri,self.source)else:source=self.sourcesourceparams['parser']=selfifisinstance(uri,unicode):uri=uri.encode('utf-8')try:eid=cnx.repo.extid2eid(source,str(uri),etype,cnx,sourceparams=sourceparams)exceptValidationErrorasex:ifraise_on_error:raiseself.source.critical('error while creating %s: %s',etype,ex)self.import_log.record_error('error while creating %s: %s'%(etype,ex))returnNoneifeid<0:# entity has been moved away from its original source## Don't give etype to entity_from_eid so we get UnknownEid if the# entity has been removedtry:entity=cnx.entity_from_eid(-eid)exceptUnknownEid:returnNoneself.notify_updated(entity)# avoid later update from the source's datareturnentityifself.sourceurisisnotNone:self.sourceuris.pop(str(uri),None)returncnx.entity_from_eid(eid,etype)defprocess(self,url,raise_on_error=False):"""main callback: process the url"""raiseNotImplementedErrordefbefore_entity_copy(self,entity,sourceparams):raiseNotImplementedErrordefafter_entity_copy(self,entity,sourceparams):self.stats['created'].add(entity.eid)defcreated_during_pull(self,entity):returnentity.eidinself.stats['created']defupdated_during_pull(self,entity):returnentity.eidinself.stats['updated']defnotify_updated(self,entity):returnself.stats['updated'].add(entity.eid)defnotify_checked(self,entity):returnself.stats['checked'].add(entity.eid)defis_deleted(self,extid,etype,eid):"""return True if the entity of given external id, entity type and eid is actually deleted. Always return True by default, put more sensible stuff in sub-classes. """returnTruedefhandle_deletion(self,config,cnx,myuris):ifconfig['delete-entities']andmyuris:byetype={}forextid,(eid,etype)inmyuris.iteritems():ifself.is_deleted(extid,etype,eid):byetype.setdefault(etype,[]).append(str(eid))foretype,eidsinbyetype.iteritems():self.warning('delete %s%s entities',len(eids),etype)cnx.execute('DELETE %s X WHERE X eid IN (%s)'%(etype,','.join(eids)))cnx.commit()defupdate_if_necessary(self,entity,attrs):entity.complete(tuple(attrs))# check modification date and compare attribute values to only update# what's actually neededself.notify_checked(entity)mdate=attrs.get('modification_date')ifnotmdateormdate>entity.modification_date:attrs=dict((k,v)fork,vinattrs.iteritems()ifv!=getattr(entity,k))ifattrs:entity.cw_set(**attrs)self.notify_updated(entity)classDataFeedXMLParser(DataFeedParser):defprocess(self,url,raise_on_error=False):"""IDataFeedParser main entry point"""try:parsed=self.parse(url)exceptExceptionasex:ifraise_on_error:raiseself.import_log.record_error(str(ex))returnTrueerror=Falsecommit=self._cw.commitrollback=self._cw.rollbackforargsinparsed:try:self.process_item(*args,raise_on_error=raise_on_error)# commit+set_cnxset instead of commit(free_cnxset=False) to let# other a chance to get our connections setcommit()exceptValidationErrorasexc:ifraise_on_error:raiseself.source.error('Skipping %s because of validation error %s'%(args,exc))rollback()error=Truereturnerrordefparse(self,url):stream=self.retrieve_url(url)returnself.parse_etree(etree.parse(stream).getroot())defparse_etree(self,document):return[(document,)]defprocess_item(self,*args,**kwargs):raiseNotImplementedErrordefis_deleted(self,extid,etype,eid):ifextid.startswith('http'):try:_OPENER.open(self.normalize_url(extid),# XXX HTTP HEAD requesttimeout=self.source.http_timeout)excepturllib2.HTTPErrorasex:ifex.code==404:returnTrueelifextid.startswith('file://'):returnexists(extid[7:])returnFalseclassURLLibResponseAdapter(object):"""Thin wrapper to be used to fake a value returned by urllib2.urlopen"""def__init__(self,stream,url,code=200):self._stream=streamself._url=urlself.code=codedefread(self,*args):returnself._stream.read(*args)defgeturl(self):returnself._urldefgetcode(self):returnself.codedefinfo(self):frommimetoolsimportMessagereturnMessage(StringIO.StringIO())# use a cookie enabled opener to use session cookie if any_OPENER=urllib2.build_opener()try:fromlogilab.commonimporturllib2ext_OPENER.add_handler(urllib2ext.HTTPGssapiAuthHandler())exceptImportError:# python-kerberos not availablepass_OPENER.add_handler(urllib2.HTTPCookieProcessor(CookieJar()))