repair cctl db-init -d on sqlserver (closes #1979670)
requires an updated yams
# copyright 2010-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr## This file is part of CubicWeb.## CubicWeb is free software: you can redistribute it and/or modify it under the# terms of the GNU Lesser General Public License as published by the Free# Software Foundation, either version 2.1 of the License, or (at your option)# any later version.## CubicWeb is distributed in the hope that it will be useful, but WITHOUT# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more# details.## You should have received a copy of the GNU Lesser General Public License along# with CubicWeb. If not, see <http://www.gnu.org/licenses/>."""datafeed sources: copy data from an external data stream into the systemdatabase"""from__future__importwith_statementimporturllib2importStringIOfromdatetimeimportdatetime,timedeltafrombase64importb64decodefromcookielibimportCookieJarfromlxmlimportetreefromcubicwebimportRegistryNotFound,ObjectNotFound,ValidationError,UnknownEidfromcubicweb.server.sourcesimportAbstractSourcefromcubicweb.appobjectimportAppObjectclassDataFeedSource(AbstractSource):copy_based_source=Trueuse_cwuri_as_url=Trueoptions=(('synchronize',{'type':'yn','default':True,'help':('Is the repository responsible to automatically import ''content from this source? 
''You should say yes unless you don\'t want this behaviour ''or if you use a multiple repositories setup, in which ''case you should say yes on one repository, no on others.'),'group':'datafeed-source','level':2,}),('synchronization-interval',{'type':'time','default':'5min','help':('Interval in seconds between synchronization with the ''external source (default to 5 minutes, must be >= 1 min).'),'group':'datafeed-source','level':2,}),('delete-entities',{'type':'yn','default':True,'help':('Should already imported entities not found anymore on the ''external source be deleted?'),'group':'datafeed-source','level':2,}),)def__init__(self,repo,source_config,eid=None):AbstractSource.__init__(self,repo,source_config,eid)self.update_config(None,self.check_conf_dict(eid,source_config,fail_if_unknown=False))defcheck_config(self,source_entity):"""check configuration of source entity"""typedconfig=super(DataFeedSource,self).check_config(source_entity)iftypedconfig['synchronization-interval']<60:_=source_entity._cw._msg=_('synchronization-interval must be greater than 1 minute')raiseValidationError(source_entity.eid,{'config':msg})returntypedconfigdef_entity_update(self,source_entity):source_entity.complete()self.parser_id=source_entity.parserself.latest_retrieval=source_entity.latest_retrievalself.urls=[url.strip()forurlinsource_entity.url.splitlines()ifurl.strip()]defupdate_config(self,source_entity,typedconfig):"""update configuration from source entity. 
`typedconfig` is config properly typed with defaults set """self.synchro_interval=timedelta(seconds=typedconfig['synchronization-interval'])ifsource_entityisnotNone:self._entity_update(source_entity)self.config=typedconfigdefinit(self,activated,source_entity):ifactivated:self._entity_update(source_entity)self.parser_id=source_entity.parserself.load_mapping(source_entity._cw)def_get_parser(self,session,**kwargs):returnself.repo.vreg['parsers'].select(self.parser_id,session,source=self,**kwargs)defload_mapping(self,session):self.mapping={}self.mapping_idx={}try:parser=self._get_parser(session)except(RegistryNotFound,ObjectNotFound):return# no parser yet, don't go furtherself._load_mapping(session,parser=parser)defadd_schema_config(self,schemacfg,checkonly=False,parser=None):"""added CWSourceSchemaConfig, modify mapping accordingly"""ifparserisNone:parser=self._get_parser(schemacfg._cw)parser.add_schema_config(schemacfg,checkonly)defdel_schema_config(self,schemacfg,checkonly=False,parser=None):"""deleted CWSourceSchemaConfig, modify mapping accordingly"""ifparserisNone:parser=self._get_parser(schemacfg._cw)parser.del_schema_config(schemacfg,checkonly)deffresh(self):ifself.latest_retrievalisNone:returnFalsereturndatetime.utcnow()<(self.latest_retrieval+self.synchro_interval)defupdate_latest_retrieval(self,session):self.latest_retrieval=datetime.utcnow()session.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',{'x':self.eid,'date':self.latest_retrieval})defacquire_synchronization_lock(self,session):# XXX race condition until WHERE of SET queries is executed using# 'SELECT FOR UPDATE'ifnotsession.execute('SET X synchronizing TRUE WHERE X eid %(x)s, X synchronizing FALSE',{'x':self.eid}):self.error('concurrent synchronization detected, skip pull')session.commit(free_cnxset=False)returnFalsesession.commit(free_cnxset=False)returnTruedefrelease_synchronization_lock(self,session):session.set_cnxset()session.execute('SET X synchronizing FALSE WHERE X eid 
%(x)s',{'x':self.eid})session.commit()defpull_data(self,session,force=False,raise_on_error=False):"""Launch synchronization of the source if needed. This method is responsible to handle commit/rollback on the given session. """ifnotforceandself.fresh():return{}ifnotself.acquire_synchronization_lock(session):return{}try:withsession.transaction(free_cnxset=False):returnself._pull_data(session,force,raise_on_error)finally:self.release_synchronization_lock(session)def_pull_data(self,session,force=False,raise_on_error=False):ifself.config['delete-entities']:myuris=self.source_cwuris(session)else:myuris=Noneparser=self._get_parser(session,sourceuris=myuris)ifself.process_urls(parser,self.urls,raise_on_error):self.warning("some error occured, don't attempt to delete entities")elifself.config['delete-entities']andmyuris:byetype={}foreid,etypeinmyuris.values():byetype.setdefault(etype,[]).append(str(eid))self.error('delete %s entities %s',self.uri,byetype)foretype,eidsinbyetype.iteritems():session.execute('DELETE %s X WHERE X eid IN (%s)'%(etype,','.join(eids)))self.update_latest_retrieval(session)returnparser.statsdefprocess_urls(self,parser,urls,raise_on_error=False):error=Falseforurlinurls:self.info('pulling data from %s',url)try:ifparser.process(url,raise_on_error):error=TrueexceptIOError,exc:ifraise_on_error:raiseself.error('could not pull data while processing %s: %s',url,exc)error=TrueexceptException,exc:ifraise_on_error:raiseself.exception('error while processing %s: %s',url,exc)error=Truereturnerrordefbefore_entity_insertion(self,session,lid,etype,eid,sourceparams):"""called by the repository when an eid has been attributed for an entity stored here but the entity has not been inserted in the system table yet. This method must return the an Entity instance representation of this entity. 
"""entity=super(DataFeedSource,self).before_entity_insertion(session,lid,etype,eid,sourceparams)entity.cw_edited['cwuri']=lid.decode('utf-8')entity.cw_edited.set_defaults()sourceparams['parser'].before_entity_copy(entity,sourceparams)returnentitydefafter_entity_insertion(self,session,lid,entity,sourceparams):"""called by the repository after an entity stored here has been inserted in the system table. """ifsession.is_hook_category_activated('integrity'):entity.cw_edited.check(creation=True)self.repo.system_source.add_entity(session,entity)entity.cw_edited.saved=entity._cw_is_saved=Truesourceparams['parser'].after_entity_copy(entity,sourceparams)defsource_cwuris(self,session):sql=('SELECT extid, eid, type FROM entities, cw_source_relation ''WHERE entities.eid=cw_source_relation.eid_from ''AND cw_source_relation.eid_to=%s'%self.eid)returndict((b64decode(uri),(eid,type))foruri,eid,typeinsession.system_sql(sql))classDataFeedParser(AppObject):__registry__='parsers'def__init__(self,session,source,sourceuris=None,**kwargs):super(DataFeedParser,self).__init__(session,**kwargs)self.source=sourceself.sourceuris=sourceurisself.stats={'created':set(),'updated':set()}defadd_schema_config(self,schemacfg,checkonly=False):"""added CWSourceSchemaConfig, modify mapping accordingly"""msg=schemacfg._cw._("this parser doesn't use a mapping")raiseValidationError(schemacfg.eid,{None:msg})defdel_schema_config(self,schemacfg,checkonly=False):"""deleted CWSourceSchemaConfig, modify mapping accordingly"""msg=schemacfg._cw._("this parser doesn't use a mapping")raiseValidationError(schemacfg.eid,{None:msg})defextid2entity(self,uri,etype,**sourceparams):"""return an entity for the given uri. 
May return None if it should be skipped """session=self._cw# if cwsource is specified and repository has a source with the same# name, call extid2eid on that source so entity will be properly seen as# coming from this sourcesource_uri=sourceparams.pop('cwsource',None)ifsource_uriisnotNoneandsource_uri!='system':source=session.repo.sources_by_uri.get(source_uri,self.source)else:source=self.sourcesourceparams['parser']=selfifisinstance(uri,unicode):uri=uri.encode('utf-8')try:eid=session.repo.extid2eid(source,str(uri),etype,session,complete=False,commit=False,sourceparams=sourceparams)exceptValidationError,ex:self.source.error('error while creating %s: %s',etype,ex)returnNoneifeid<0:# entity has been moved away from its original source## Don't give etype to entity_from_eid so we get UnknownEid if the# entity has been removedtry:entity=session.entity_from_eid(-eid)exceptUnknownEid:returnNoneself.notify_updated(entity)# avoid later update from the source's datareturnentityifself.sourceurisisnotNone:self.sourceuris.pop(str(uri),None)returnsession.entity_from_eid(eid,etype)defprocess(self,url,partialcommit=True):"""main callback: process the url"""raiseNotImplementedErrordefbefore_entity_copy(self,entity,sourceparams):raiseNotImplementedErrordefafter_entity_copy(self,entity,sourceparams):self.stats['created'].add(entity.eid)defcreated_during_pull(self,entity):returnentity.eidinself.stats['created']defupdated_during_pull(self,entity):returnentity.eidinself.stats['updated']defnotify_updated(self,entity):returnself.stats['updated'].add(entity.eid)classDataFeedXMLParser(DataFeedParser):defprocess(self,url,raise_on_error=False,partialcommit=True):"""IDataFeedParser main entry point"""try:parsed=self.parse(url)exceptException,ex:ifraise_on_error:raiseself.source.error(str(ex))returnTrueerror=Falseforargsinparsed:try:self.process_item(*args)ifpartialcommit:# commit+set_cnxset instead of commit(free_cnxset=False) to let# other a chance to get our connections 
setself._cw.commit()self._cw.set_cnxset()exceptValidationError,exc:ifraise_on_error:raiseifpartialcommit:self.source.error('Skipping %s because of validation error %s'%(args,exc))self._cw.rollback()self._cw.set_cnxset()error=Trueelse:raisereturnerrordefparse(self,url):ifurl.startswith('http'):fromcubicweb.sobjects.parsersimportURL_MAPPINGformappedurlinURL_MAPPING:ifurl.startswith(mappedurl):url=url.replace(mappedurl,URL_MAPPING[mappedurl],1)breakself.source.info('GET %s',url)stream=_OPENER.open(url)elifurl.startswith('file://'):stream=open(url[7:])else:stream=StringIO.StringIO(url)returnself.parse_etree(etree.parse(stream).getroot())defparse_etree(self,document):return[(document,)]defprocess_item(self,*args):raiseNotImplementedError# use a cookie enabled opener to use session cookie if any_OPENER=urllib2.build_opener()try:fromlogilab.commonimporturllib2ext_OPENER.add_handler(urllib2ext.HTTPGssapiAuthHandler())exceptImportError:# python-kerberos not availablepass_OPENER.add_handler(urllib2.HTTPCookieProcessor(CookieJar()))