[notification] avoid leaking cnxsets (closes #3243810)
When sending notifications, we get each recipient as either an email
address or a CWUser. In the latter case, we create a temporary session
for that user and use it to send the mail. However, if we later decided
to not send the mail after all, we'd leak the session and its cnxset.
Add a try block inside the loop to make sure the temporary sessions are
closed properly.
# copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.
"""Source to query another RQL remote repository"""

__docformat__ = "restructuredtext en"
_ = unicode

from os.path import join
from base64 import b64decode

from logilab.common.configuration import REQUIRED

from yams.schema import role_name

from rql.nodes import Constant
from rql.utils import rqlvar_maker

from cubicweb import dbapi, server
# NOTE(review): ConnectionError is caught in RemoteSource.get_connection but
# was never imported; it is assumed to be exported by the cubicweb package
# alongside BadConnectionId -- confirm.
from cubicweb import (ValidationError, BadConnectionId, UnknownEid,
                      ConnectionError)
from cubicweb.schema import VIRTUAL_RTYPES
from cubicweb.server.sources import (AbstractSource, ConnectionWrapper,
                                     TimedCache, dbg_st_search, dbg_results)
from cubicweb.server.msplanner import neged_relation


def uidtype(union, col, etype, args):
    """Return the `uidtype` attribute of the selected term at column `col`.

    The sub-query (and the column index inside it) is located through
    `union.locate_subquery`; None is returned when the selected term has no
    `uidtype` attribute.
    """
    select, col = union.locate_subquery(col, etype, args)
    return getattr(select.selection[col], 'uidtype', None)


class ReplaceByInOperator(Exception):
    """Raised by `RQL2RQL.eid2extid` to ask the caller to replace the
    right-hand side of a comparison by an `IN (...)` list of external ids.
    """

    def __init__(self, eids):
        # iterable of external ids (consumed once by RQL2RQL.visit_relation)
        self.eids = eids


class RemoteSource(AbstractSource):
    """Generic external repository source"""

    # boolean telling if modification hooks should be called when something is
    # modified in this source
    should_call_hooks = False
    # boolean telling if the repository should connect to this source during
    # migration
    connect_for_migration = False

    options = (
        ('cubicweb-user',
         {'type': 'string',
          'default': REQUIRED,
          'help': 'user to use for connection on the distant repository',
          'group': 'remote-source', 'level': 0,
          }),
        ('cubicweb-password',
         {'type': 'password',
          'default': '',
          'help': 'password to use for connection on the distant repository',
          'group': 'remote-source', 'level': 0,
          }),
        ('base-url',
         {'type': 'string',
          'default': '',
          'help': 'url of the web site for the distant repository, if you want '
                  'to generate external link to entities from this repository',
          'group': 'remote-source', 'level': 1,
          }),
        ('skip-external-entities',
         {'type': 'yn',
          'default': False,
          'help': 'should entities not local to the source be considered or not',
          'group': 'remote-source', 'level': 0,
          }),
        ('synchronization-interval',
         {'type': 'time',
          'default': '5min',
          'help': 'interval between synchronization with the external '
                  'repository (default to 5 minutes).',
          'group': 'remote-source', 'level': 2,
          }),
    )

    PUBLIC_KEYS = AbstractSource.PUBLIC_KEYS + ('base-url',)

    _conn = None

    # options accepted in a schema config for an entity type / a relation type
    etype_options = set(('write',))
    rtype_options = set(('maycross', 'dontcross', 'write',))

    def __init__(self, repo, source_config, eid=None):
        super(RemoteSource, self).__init__(repo, source_config, eid)
        # cache of query results, keyed by the RQL string (see
        # syntax_tree_search); entries are kept for 1800 time units as
        # understood by TimedCache
        self._query_cache = TimedCache(1800)

    def update_config(self, source_entity, processed_config):
        """update configuration from source entity"""
        super(RemoteSource, self).update_config(source_entity, processed_config)
        baseurl = processed_config.get('base-url')
        # normalize the base url so that it always ends with a slash
        if baseurl and not baseurl.endswith('/'):
            processed_config['base-url'] += '/'
        self.config = processed_config
        self._skip_externals = processed_config['skip-external-entities']
        if source_entity is not None:
            self.latest_retrieval = source_entity.latest_retrieval

    def _entity_update(self, source_entity):
        """Refresh from the source entity, refusing more than one url.

        :raise ValidationError: if several urls are configured
        """
        super(RemoteSource, self)._entity_update(source_entity)
        if self.urls and len(self.urls) > 1:
            raise ValidationError(source_entity.eid,
                                  {'url': _('can only have one url')})

    def get_connection(self):
        """Return a connection to the source.

        When the connection can't be established, the failure is logged and
        an empty `ConnectionWrapper` is returned instead of raising.
        """
        try:
            return self._get_connection()
        except ConnectionError as ex:
            self.critical("can't get connection to source %s: %s", self.uri, ex)
            return ConnectionWrapper()

    def _get_connection(self):
        """open and return a connection to the source"""
        self.info('connecting to source %s as user %s',
                  self.urls[0], self.config['cubicweb-user'])
        # XXX check protocol according to source type (zmq / pyro)
        return dbapi.connect(self.urls[0], login=self.config['cubicweb-user'],
                             password=self.config['cubicweb-password'])

    def reset_caches(self):
        """method called during test to reset potential source caches"""
        self._query_cache = TimedCache(1800)

    def init(self, activated, source_entity):
        """method called by the repository once ready to handle request"""
        super(RemoteSource, self).init(activated, source_entity)
        self.load_mapping(source_entity._cw)
        if activated:
            interval = self.config['synchronization-interval']
            self.repo.looping_task(interval, self.synchronize)
            # purge expired cache entries ten times per cache ttl
            self.repo.looping_task(self._query_cache.ttl.seconds // 10,
                                   self._query_cache.clear_expired)
            self.latest_retrieval = source_entity.latest_retrieval

    def load_mapping(self, session=None):
        """Reset the supported entity/relation mapping, then reload it through
        `_load_mapping`.
        """
        self.support_entities = {}
        self.support_relations = {}
        self.dont_cross_relations = set(('owned_by', 'created_by'))
        self.cross_relations = set()
        assert self.eid is not None
        # schema config eid -> entity/relation type name, used on deletion
        self._schemacfg_idx = {}
        self._load_mapping(session)

    def _check_options(self, schemacfg, allowedoptions):
        """Return the set of options of `schemacfg` (colon separated string).

        :raise ValidationError: if an option is not in `allowedoptions`
        """
        if schemacfg.options:
            options = set(w.strip() for w in schemacfg.options.split(':'))
        else:
            options = set()
        unknown = options - allowedoptions
        if unknown:
            # mark the message template for i18n, then format it
            msg = _('unknown option(s): %s') % ', '.join(sorted(unknown))
            raise ValidationError(schemacfg.eid,
                                  {role_name('options', 'subject'): msg})
        return options

    def add_schema_config(self, schemacfg, checkonly=False):
        """added CWSourceSchemaConfig, modify mapping accordingly

        When `checkonly` is true, options are only validated and the mapping
        is left untouched.

        :raise ValidationError: if the configuration targets something else
          than an entity or relation type, a relation which can't be mapped,
          or holds unknown / incompatible options
        """
        try:
            ertype = schemacfg.schema.name
        except AttributeError:
            msg = schemacfg._cw._("attribute/relation can't be mapped, only "
                                  "entity and relation types")
            raise ValidationError(schemacfg.eid,
                                  {role_name('cw_for_schema', 'subject'): msg})
        if schemacfg.schema.__regid__ == 'CWEType':
            options = self._check_options(schemacfg, self.etype_options)
            if not checkonly:
                self.support_entities[ertype] = 'write' in options
        else:  # CWRType
            if (ertype in ('is', 'is_instance_of', 'cw_source')
                    or ertype in VIRTUAL_RTYPES):
                msg = schemacfg._cw._(
                    '%s relation should not be in mapped') % ertype
                raise ValidationError(
                    schemacfg.eid, {role_name('cw_for_schema', 'subject'): msg})
            options = self._check_options(schemacfg, self.rtype_options)
            if 'dontcross' in options:
                # translation goes through the entity's request/session
                # (`_cw`), as for the other messages of this method
                if 'maycross' in options:
                    msg = schemacfg._cw._(
                        "can't mix dontcross and maycross options")
                    raise ValidationError(
                        schemacfg.eid, {role_name('options', 'subject'): msg})
                if 'write' in options:
                    msg = schemacfg._cw._(
                        "can't mix dontcross and write options")
                    raise ValidationError(
                        schemacfg.eid, {role_name('options', 'subject'): msg})
                if not checkonly:
                    self.dont_cross_relations.add(ertype)
            elif not checkonly:
                self.support_relations[ertype] = 'write' in options
                if 'maycross' in options:
                    self.cross_relations.add(ertype)
        if not checkonly:
            # add to an index to ease deletion handling
            self._schemacfg_idx[schemacfg.eid] = ertype

    def del_schema_config(self, schemacfg, checkonly=False):
        """deleted CWSourceSchemaConfig, modify mapping accordingly

        Best effort: any error while updating the mapping is logged, not
        propagated.
        """
        if checkonly:
            return
        try:
            ertype = self._schemacfg_idx[schemacfg.eid]
            # a name starting with an upper case letter is handled as an
            # entity type, anything else as a relation type
            if ertype[0].isupper():
                del self.support_entities[ertype]
            elif ertype in self.support_relations:
                del self.support_relations[ertype]
                if ertype in self.cross_relations:
                    self.cross_relations.remove(ertype)
            else:
                self.dont_cross_relations.remove(ertype)
        except Exception:
            self.error('while updating mapping consequently to removal of %s',
                       schemacfg)

    def local_eid(self, cnx, extid, session):
        """Return a `(eid, local)` couple for the external id `extid`.

        * `(eid, True)` when the entity is handled by this source and
          `repo.extid2eid` gives a positive eid,
        * `(eid, False)` when the entity comes from another source known by
          this repository (the lookup is delegated to that source),
        * `(None, None)` otherwise.
        """
        etype, dexturi, dextid = cnx.describe(extid)
        known_source = dexturi in self.repo.sources_by_uri
        if dexturi == 'system' or not (known_source or self._skip_externals):
            assert etype in self.support_entities, etype
            eid = self.repo.extid2eid(self, str(extid), etype, session)
            if eid > 0:
                return eid, True
        elif known_source:
            source = self.repo.sources_by_uri[dexturi]
            cnx = session.cnxset.connection(source.uri)
            eid = source.local_eid(cnx, dextid, session)[0]
            return eid, False
        return None, None

    def synchronize(self, mtime=None):
        """synchronize content known by this repository with content in the
        external repository

        `mtime` defaults to the latest retrieval time recorded on the source.
        Errors on individual entities are logged and skipped (re-raised for
        deleted entities when the repository runs in 'test' mode).
        """
        self.info('synchronizing remote source %s', self.uri)
        cnx = self.get_connection()
        try:
            extrepo = cnx._repo
        except AttributeError:
            # fake connection wrapper returned when we can't connect to the
            # external source (hence we've no chance to synchronize...)
            return
        etypes = list(self.support_entities)
        if mtime is None:
            mtime = self.latest_retrieval
        updatetime, modified, deleted = extrepo.entities_modified_since(
            etypes, mtime)
        self._query_cache.clear()
        repo = self.repo
        session = repo.internal_session()
        source = repo.system_source
        try:
            for etype, extid in modified:
                try:
                    eid = self.local_eid(cnx, extid, session)[0]
                    if eid is not None:
                        rset = session.eid_rset(eid, etype)
                        entity = rset.get_entity(0, 0)
                        entity.complete(entity.e_schema.indexable_attributes())
                        source.index_entity(session, entity)
                except Exception:
                    self.exception(
                        'while updating %s with external id %s of source %s',
                        etype, extid, self.uri)
                    continue
            for etype, extid in deleted:
                try:
                    eid = self.repo.extid2eid(self, str(extid), etype, session,
                                              insert=False)
                    # entity has been deleted from external repository but is
                    # not known here
                    if eid is not None:
                        entity = session.entity_from_eid(eid, etype)
                        repo.delete_info(session, entity, self.uri,
                                         scleanup=self.eid)
                except Exception:
                    if self.repo.config.mode == 'test':
                        raise
                    self.exception(
                        'while updating %s with external id %s of source %s',
                        etype, extid, self.uri)
                    continue
            self.latest_retrieval = updatetime
            session.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
                            {'x': self.eid, 'date': self.latest_retrieval})
            session.commit()
        finally:
            session.close()

    # NOTE: a second `get_connection` stub raising NotImplementedError used to
    # be defined here; being defined later in the class body it silently
    # replaced the working implementation above, so it has been removed.

    def check_connection(self, cnx):
        """check connection validity, return None if the connection is still
        valid else a new connection
        """
        if not isinstance(cnx, ConnectionWrapper):
            try:
                cnx.check()
                return  # ok
            except BadConnectionId:
                pass
        # try to reconnect
        return self.get_connection()

    def syntax_tree_search(self, session, union, args=None, cachekey=None,
                           varmap=None):
        """Return results for the `union` syntax tree, using the query cache.

        Results are cached under the RQL string built from `union` and
        `args`; `_syntax_tree_search` is only called on a cache miss.
        """
        assert dbg_st_search(self.uri, union, varmap, args, cachekey)
        rqlkey = union.as_string(kwargs=args)
        try:
            results = self._query_cache[rqlkey]
        except KeyError:
            results = self._syntax_tree_search(session, union, args)
            self._query_cache[rqlkey] = results
        assert dbg_results(results)
        return results

    def _syntax_tree_search(self, session, union, args):
        """return result from this source for a rql query (actually from a rql
        syntax tree and a solution dictionary mapping each used variable to a
        possible type).

        The query is translated by `RQL2RQL`, executed on the source's cursor,
        then external ids found in the result are translated back to local
        eids. An empty list is returned when the source can't be reached, when
        the query fails or when it references an eid unknown to the source.
        """
        if args is not None:
            # the translator rewrites substitution values in place
            args = args.copy()
        # get cached cursor anyway
        cu = session.cnxset[self.uri]
        if cu is None:
            # this is a ConnectionWrapper instance
            msg = session._("can't connect to source %s, some data may be missing")
            session.set_shared_data('sources_error', msg % self.uri, txdata=True)
            return []
        translator = RQL2RQL(self)
        try:
            rql = translator.generate(session, union, args)
        except UnknownEid as ex:
            if server.DEBUG:
                # single formatted string: same output as the py2 print
                # statement with comma separated items
                print(' unknown eid %s no results' % (ex,))
            return []
        if server.DEBUG & server.DBG_RQL:
            print(' translated rql %s' % (rql,))
        try:
            rset = cu.execute(rql, args)
        except Exception as ex:
            self.exception(str(ex))
            msg = session._("error while querying source %s, some data may be missing")
            session.set_shared_data('sources_error', msg % self.uri, txdata=True)
            return []
        descr = rset.description
        if not rset:
            return []
        rows = rset.rows
        # indexes of columns holding (external) eids
        needtranslation = []
        for i, etype in enumerate(descr[0]):
            if (etype is None or not self.schema.eschema(etype).final
                    or uidtype(union, i, etype, args)):
                needtranslation.append(i)
        if needtranslation:
            cnx = session.cnxset.connection(self.uri)
            # iterate backward since rows may be deleted along the way
            for rowindex in xrange(rset.rowcount - 1, -1, -1):
                row = rows[rowindex]
                localrow = False
                for colindex in needtranslation:
                    if row[colindex] is not None:  # optional variable
                        eid, local = self.local_eid(cnx, row[colindex], session)
                        if local:
                            localrow = True
                        if eid is not None:
                            row[colindex] = eid
                        else:
                            # skip this row
                            del rows[rowindex]
                            del descr[rowindex]
                            break
                else:
                    # skip row if it only contains eids of entities which
                    # are actually from a source we also know locally,
                    # except if some args specified (XXX should actually
                    # check if there are some args local to the source)
                    if not (translator.has_local_eid or localrow):
                        del rows[rowindex]
                        del descr[rowindex]
        return rows

    def _entity_relations_and_kwargs(self, session, entity):
        """Return the `X attr %(attr)s` restrictions and the substitution
        dictionary (including `x`, the entity's external id) built from the
        entity's attribute cache.
        """
        relations = []
        kwargs = {'x': self.repo.eid2extid(self, entity.eid, session)}
        for key, val in entity.cw_attr_cache.iteritems():
            relations.append('X %s %%(%s)s' % (key, key))
            kwargs[key] = val
        return relations, kwargs

    def add_entity(self, session, entity):
        """add a new entity to the source"""
        raise NotImplementedError()

    def update_entity(self, session, entity):
        """update an entity in the source"""
        relations, kwargs = self._entity_relations_and_kwargs(session, entity)
        cu = session.cnxset[self.uri]
        cu.execute('SET %s WHERE X eid %%(x)s' % ','.join(relations), kwargs)
        self._query_cache.clear()
        entity.cw_clear_all_caches()

    def delete_entity(self, session, entity):
        """delete an entity from the source"""
        if session.deleted_in_transaction(self.eid):
            # source is being deleted, don't propagate
            self._query_cache.clear()
            return
        cu = session.cnxset[self.uri]
        cu.execute('DELETE %s X WHERE X eid %%(x)s' % entity.cw_etype,
                   {'x': self.repo.eid2extid(self, entity.eid, session)})
        self._query_cache.clear()

    def add_relation(self, session, subject, rtype, object):
        """add a relation to the source"""
        cu = session.cnxset[self.uri]
        cu.execute('SET X %s Y WHERE X eid %%(x)s, Y eid %%(y)s' % rtype,
                   {'x': self.repo.eid2extid(self, subject, session),
                    'y': self.repo.eid2extid(self, object, session)})
        self._query_cache.clear()
        session.entity_from_eid(subject).cw_clear_all_caches()
        session.entity_from_eid(object).cw_clear_all_caches()

    def delete_relation(self, session, subject, rtype, object):
        """delete a relation from the source"""
        if session.deleted_in_transaction(self.eid):
            # source is being deleted, don't propagate
            self._query_cache.clear()
            return
        cu = session.cnxset[self.uri]
        cu.execute('DELETE X %s Y WHERE X eid %%(x)s, Y eid %%(y)s' % rtype,
                   {'x': self.repo.eid2extid(self, subject, session),
                    'y': self.repo.eid2extid(self, object, session)})
        self._query_cache.clear()
        session.entity_from_eid(subject).cw_clear_all_caches()
        session.entity_from_eid(object).cw_clear_all_caches()


class RQL2RQL(object):
    """translate a local rql query to be executed on a distant repository"""

    def __init__(self, source):
        self.source = source
        self.repo = source.repo
        # operator of the relation being visited, None outside of a relation
        # holding eids
        self.current_operator = None

    def _accept_children(self, node):
        """Return the translation of each child of `node`, skipping children
        whose translation is None.
        """
        res = []
        for child in node.children:
            rql = child.accept(self)
            if rql is not None:
                res.append(rql)
        return res

    def generate(self, session, rqlst, args):
        """Return the RQL string to execute on the distant repository.

        `args` is modified in place: substitution values holding local eids
        are replaced by the matching external ids.
        """
        self._session = session
        self.kwargs = args
        self.need_translation = False
        self.has_local_eid = False
        return self.visit_union(rqlst)

    def visit_union(self, node):
        s = self._accept_children(node)
        if len(s) > 1:
            return ' UNION '.join('(%s)' % q for q in s)
        return s[0]

    def visit_select(self, node):
        """return the tree as an encoded rql string"""
        self._varmaker = rqlvar_maker(defined=node.defined_vars.copy())
        # value (or substitution key) -> variable introduced for it
        self._const_var = {}
        if node.distinct:
            base = 'DISTINCT Any'
        else:
            base = 'Any'
        s = ['%s %s' % (base, ','.join(v.accept(self) for v in node.selection))]
        if node.groupby:
            s.append('GROUPBY %s' % ', '.join(group.accept(self)
                                              for group in node.groupby))
        if node.orderby:
            s.append('ORDERBY %s' % ', '.join(self.visit_sortterm(term)
                                              for term in node.orderby))
        if node.limit is not None:
            s.append('LIMIT %s' % node.limit)
        if node.offset:
            s.append('OFFSET %s' % node.offset)
        restrictions = []
        if node.where is not None:
            nr = node.where.accept(self)
            if nr is not None:
                restrictions.append(nr)
        if restrictions:
            s.append('WHERE %s' % ','.join(restrictions))
        if node.having:
            s.append('HAVING %s' % ', '.join(term.accept(self)
                                             for term in node.having))
        subqueries = []
        for subquery in node.with_:
            subqueries.append('%s BEING (%s)' % (
                ','.join(ca.name for ca in subquery.aliases),
                self.visit_union(subquery.query)))
        if subqueries:
            s.append('WITH %s' % (','.join(subqueries)))
        return ' '.join(s)

    def visit_and(self, node):
        res = self._accept_children(node)
        if res:
            return ', '.join(res)
        return

    def visit_or(self, node):
        res = self._accept_children(node)
        if len(res) > 1:
            return ' OR '.join('(%s)' % rql for rql in res)
        elif res:
            return res[0]
        return

    def visit_not(self, node):
        rql = node.children[0].accept(self)
        if rql:
            return 'NOT (%s)' % rql
        return

    def visit_exists(self, node):
        rql = node.children[0].accept(self)
        if rql:
            return 'EXISTS(%s)' % rql
        return

    def visit_relation(self, node):
        """Translate a relation; return None when a negated relation
        references an eid unknown to the source (it can safely be dropped).
        """
        try:
            if isinstance(node.children[0], Constant):
                # simplified rqlst, reintroduce eid relation
                try:
                    restr, lhs = self.process_eid_const(node.children[0])
                except UnknownEid:
                    # can safely skip not relation with an unsupported eid
                    if neged_relation(node):
                        return
                    raise
            else:
                lhs = node.children[0].accept(self)
                restr = None
        except UnknownEid:
            # can safely skip not relation with an unsupported eid
            if neged_relation(node):
                return
            # XXX what about optional relation or outer NOT EXISTS()
            raise
        if node.optional in ('left', 'both'):
            lhs += '?'
        if (node.r_type == 'eid'
                or not self.source.schema.rschema(node.r_type).final):
            # right-hand side constants are eids which need translation
            self.need_translation = True
            self.current_operator = node.operator()
            if isinstance(node.children[0], Constant):
                self.current_etypes = (node.children[0].uidtype,)
            else:
                self.current_etypes = node.children[0].variable.stinfo['possibletypes']
        try:
            rhs = node.children[1].accept(self)
        except UnknownEid:
            # can safely skip not relation with an unsupported eid
            if neged_relation(node):
                return
            # XXX what about optional relation or outer NOT EXISTS()
            raise
        except ReplaceByInOperator as ex:
            rhs = 'IN (%s)' % ','.join(ex.eids)
        self.need_translation = False
        self.current_operator = None
        if node.optional in ('right', 'both'):
            rhs += '?'
        if restr is not None:
            return '%s %s %s, %s' % (lhs, node.r_type, rhs, restr)
        return '%s %s %s' % (lhs, node.r_type, rhs)

    def visit_comparison(self, node):
        if node.operator in ('=', 'IS'):
            return node.children[0].accept(self)
        return '%s %s' % (node.operator.encode(),
                          node.children[0].accept(self))

    def visit_mathexpression(self, node):
        return '(%s %s %s)' % (node.children[0].accept(self),
                               node.operator.encode(),
                               node.children[1].accept(self))

    def visit_function(self, node):
        """Translate a function call, dropping arguments which are eids
        unknown to the source.

        :raise UnknownEid: if every argument had to be dropped
        """
        res = []
        last_error = None
        for child in node.children:
            try:
                rql = child.accept(self)
            except UnknownEid as ex:
                # remember it explicitly rather than relying on the handler
                # variable outliving the except block
                last_error = ex
                continue
            res.append(rql)
        if not res and last_error is not None:
            raise last_error
        return '%s(%s)' % (node.name, ', '.join(res))

    def visit_constant(self, node):
        """Translate a constant, turning local eids into external ids when
        the constant is known to hold an eid.
        """
        if self.need_translation or node.uidtype:
            if node.type == 'Int':
                self.has_local_eid = True
                return str(self.eid2extid(node.value))
            if node.type == 'Substitute':
                key = node.value
                # ensure we have not yet translated the value...
                if key not in self._const_var:
                    self.kwargs[key] = self.eid2extid(self.kwargs[key])
                    self._const_var[key] = None
                    self.has_local_eid = True
        return node.as_string()

    def visit_variableref(self, node):
        """get the sql name for a variable reference"""
        return node.name

    def visit_sortterm(self, node):
        if node.asc:
            return node.term.accept(self)
        return '%s DESC' % node.term.accept(self)

    def process_eid_const(self, const):
        """Return a `(restriction, variable)` couple for an eid constant.

        A new variable and its `VAR eid <extid>` restriction are created the
        first time a value is seen; afterwards the restriction is None and
        the variable already bound to the value is returned.
        """
        value = const.eval(self.kwargs)
        try:
            return None, self._const_var[value]
        except KeyError:
            var = next(self._varmaker)
            # force eid translation of the constant
            self.need_translation = True
            restr = '%s eid %s' % (var, self.visit_constant(const))
            self.need_translation = False
            self._const_var[value] = var
            return restr, var

    def eid2extid(self, eid):
        """Return the external id matching the local `eid`.

        :raise ReplaceByInOperator: if the eid is unknown to the source but
          is used with an operator other than '=' and some entities of the
          source match the condition locally
        :raise UnknownEid: if the eid is unknown to the source otherwise
        """
        try:
            return self.repo.eid2extid(self.source, eid, self._session)
        except UnknownEid:
            operator = self.current_operator
            if operator is not None and operator != '=':
                # deal with query like "X eid > 12"
                #
                # The problem is that eid order in the external source may
                # differ from the local source
                #
                # So search for all eids from this source matching the
                # condition locally and then to replace the "> 12" branch by
                # "IN (eids)"
                #
                # XXX we may have to insert a huge number of eids...)
                # NOTE(review): SQL built by string interpolation; values
                # appear to be internal (source uri, entity types, operator,
                # eid) -- confirm none can carry user input.
                sql = "SELECT extid FROM entities WHERE source='%s' AND type IN (%s) AND eid%s%s"
                etypes = ','.join("'%s'" % etype
                                  for etype in self.current_etypes)
                cu = self._session.system_sql(sql % (self.source.uri, etypes,
                                                      operator, eid))
                # XXX buggy cu.rowcount which may be zero while there are some
                # results
                rows = cu.fetchall()
                if rows:
                    raise ReplaceByInOperator((b64decode(r[0]) for r in rows))
            raise