[js table sort] use lower-case tag names: HTML is case-insensitive, but XHTML is case-sensitive and requires lower-case tag names
# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr## This file is part of CubicWeb.## CubicWeb is free software: you can redistribute it and/or modify it under the# terms of the GNU Lesser General Public License as published by the Free# Software Foundation, either version 2.1 of the License, or (at your option)# any later version.## CubicWeb is distributed in the hope that it will be useful, but WITHOUT# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more# details.## You should have received a copy of the GNU Lesser General Public License along# with CubicWeb. If not, see <http://www.gnu.org/licenses/>."""Source to query another RQL repository using pyro"""__docformat__="restructuredtext en"_=unicodeimportthreadingfromos.pathimportjoinfromtimeimportmktimefromdatetimeimportdatetimefrombase64importb64decodefromPyro.errorsimportPyroError,ConnectionClosedErrorfromlogilab.common.configurationimportREQUIREDfromlogilab.common.optik_extimportcheck_ynfromyams.schemaimportrole_namefromrql.nodesimportConstantfromrql.utilsimportrqlvar_makerfromcubicwebimportdbapi,serverfromcubicwebimportValidationError,BadConnectionId,UnknownEid,ConnectionErrorfromcubicweb.schemaimportVIRTUAL_RTYPESfromcubicweb.cwconfigimportregister_persistent_optionsfromcubicweb.server.sourcesimport(AbstractSource,ConnectionWrapper,TimedCache,dbg_st_search,dbg_results)fromcubicweb.server.msplannerimportneged_relationdefuidtype(union,col,etype,args):select,col=union.locate_subquery(col,etype,args)returngetattr(select.selection[col],'uidtype',None)classReplaceByInOperator(Exception):def__init__(self,eids):self.eids=eidsclassPyroRQLSource(AbstractSource):"""External repository source, using Pyro connection"""# boolean telling if modification hooks should be called when something is# modified in this 
sourceshould_call_hooks=False# boolean telling if the repository should connect to this source during# migrationconnect_for_migration=Falseoptions=(# XXX pyro-ns host/port('pyro-ns-id',{'type':'string','default':REQUIRED,'help':'identifier of the repository in the pyro name server','group':'pyro-source','level':0,}),('cubicweb-user',{'type':'string','default':REQUIRED,'help':'user to use for connection on the distant repository','group':'pyro-source','level':0,}),('cubicweb-password',{'type':'password','default':'','help':'user to use for connection on the distant repository','group':'pyro-source','level':0,}),('base-url',{'type':'string','default':'','help':'url of the web site for the distant repository, if you want ''to generate external link to entities from this repository','group':'pyro-source','level':1,}),('skip-external-entities',{'type':'yn','default':False,'help':'should entities not local to the source be considered or not','group':'pyro-source','level':0,}),('pyro-ns-host',{'type':'string','default':None,'help':'Pyro name server\'s host. If not set, default to the value \from all_in_one.conf. It may contains port information using <host>:<port> notation.','group':'pyro-source','level':1,}),('pyro-ns-group',{'type':'string','default':None,'help':'Pyro name server\'s group where the repository will be \registered. 
If not set, default to the value from all_in_one.conf.','group':'pyro-source','level':2,}),('synchronization-interval',{'type':'time','default':'5min','help':'interval between synchronization with the external \repository (default to 5 minutes).','group':'pyro-source','level':2,}),)PUBLIC_KEYS=AbstractSource.PUBLIC_KEYS+('base-url',)_conn=Nonedef__init__(self,repo,source_config,eid=None):AbstractSource.__init__(self,repo,source_config,eid)self.update_config(None,self.check_conf_dict(eid,source_config,fail_if_unknown=False))self._query_cache=TimedCache(1800)defupdate_config(self,source_entity,processed_config):"""update configuration from source entity"""# XXX get it through pyro if unsetbaseurl=processed_config.get('base-url')ifbaseurlandnotbaseurl.endswith('/'):processed_config['base-url']+='/'self.config=processed_configself._skip_externals=processed_config['skip-external-entities']ifsource_entityisnotNone:self.latest_retrieval=source_entity.latest_retrievaldefreset_caches(self):"""method called during test to reset potential source caches"""self._query_cache=TimedCache(1800)definit(self,activated,source_entity):"""method called by the repository once ready to handle request"""self.load_mapping(source_entity._cw)ifactivated:interval=self.config['synchronization-interval']self.repo.looping_task(interval,self.synchronize)self.repo.looping_task(self._query_cache.ttl.seconds/10,self._query_cache.clear_expired)self.latest_retrieval=source_entity.latest_retrievaldefload_mapping(self,session=None):self.support_entities={}self.support_relations={}self.dont_cross_relations=set(('owned_by','created_by'))self.cross_relations=set()assertself.eidisnotNoneself._schemacfg_idx={}self._load_mapping(session)etype_options=set(('write',))rtype_options=set(('maycross','dontcross','write',))def_check_options(self,schemacfg,allowedoptions):ifschemacfg.options:options=set(w.strip()forwinschemacfg.options.split(':'))else:options=set()ifoptions-allowedoptions:options=', 
'.join(sorted(options-allowedoptions))msg=_('unknown option(s): %s'%options)raiseValidationError(schemacfg.eid,{role_name('options','subject'):msg})returnoptionsdefadd_schema_config(self,schemacfg,checkonly=False):"""added CWSourceSchemaConfig, modify mapping accordingly"""try:ertype=schemacfg.schema.nameexceptAttributeError:msg=schemacfg._cw._("attribute/relation can't be mapped, only ""entity and relation types")raiseValidationError(schemacfg.eid,{role_name('cw_for_schema','subject'):msg})ifschemacfg.schema.__regid__=='CWEType':options=self._check_options(schemacfg,self.etype_options)ifnotcheckonly:self.support_entities[ertype]='write'inoptionselse:# CWRTypeifertypein('is','is_instance_of','cw_source')orertypeinVIRTUAL_RTYPES:msg=schemacfg._cw._('%s relation should not be in mapped')%ertyperaiseValidationError(schemacfg.eid,{role_name('cw_for_schema','subject'):msg})options=self._check_options(schemacfg,self.rtype_options)if'dontcross'inoptions:if'maycross'inoptions:msg=schemacfg._("can't mix dontcross and maycross options")raiseValidationError(schemacfg.eid,{role_name('options','subject'):msg})if'write'inoptions:msg=schemacfg._("can't mix dontcross and write options")raiseValidationError(schemacfg.eid,{role_name('options','subject'):msg})ifnotcheckonly:self.dont_cross_relations.add(ertype)elifnotcheckonly:self.support_relations[ertype]='write'inoptionsif'maycross'inoptions:self.cross_relations.add(ertype)ifnotcheckonly:# add to an index to ease deletion handlingself._schemacfg_idx[schemacfg.eid]=ertypedefdel_schema_config(self,schemacfg,checkonly=False):"""deleted CWSourceSchemaConfig, modify mapping accordingly"""ifcheckonly:returntry:ertype=self._schemacfg_idx[schemacfg.eid]ifertype[0].isupper():delself.support_entities[ertype]else:ifertypeinself.support_relations:delself.support_relations[ertype]ifertypeinself.cross_relations:self.cross_relations.remove(ertype)else:self.dont_cross_relations.remove(ertype)exceptException:self.error('while updating mapping 
consequently to removal of %s',schemacfg)deflocal_eid(self,cnx,extid,session):etype,dexturi,dextid=cnx.describe(extid)ifdexturi=='system'ornot(dexturiinself.repo.sources_by_uriorself._skip_externals):assertetypeinself.support_entities,etypeeid=self.repo.extid2eid(self,str(extid),etype,session)ifeid>0:returneid,Trueelifdexturiinself.repo.sources_by_uri:source=self.repo.sources_by_uri[dexturi]cnx=session.cnxset.connection(source.uri)eid=source.local_eid(cnx,dextid,session)[0]returneid,FalsereturnNone,Nonedefsynchronize(self,mtime=None):"""synchronize content known by this repository with content in the external repository """self.info('synchronizing pyro source %s',self.uri)cnx=self.get_connection()try:extrepo=cnx._repoexceptAttributeError:# fake connection wrapper returned when we can't connect to the# external source (hence we've no chance to synchronize...)returnetypes=self.support_entities.keys()ifmtimeisNone:mtime=self.latest_retrievalupdatetime,modified,deleted=extrepo.entities_modified_since(etypes,mtime)self._query_cache.clear()repo=self.reposession=repo.internal_session()source=repo.system_sourcetry:foretype,extidinmodified:try:eid=self.local_eid(cnx,extid,session)[0]ifeidisnotNone:rset=session.eid_rset(eid,etype)entity=rset.get_entity(0,0)entity.complete(entity.e_schema.indexable_attributes())source.index_entity(session,entity)exceptException:self.exception('while updating %s with external id %s of source %s',etype,extid,self.uri)continueforetype,extidindeleted:try:eid=self.repo.extid2eid(self,str(extid),etype,session,insert=False)# entity has been deleted from external repository but is not known hereifeidisnotNone:entity=session.entity_from_eid(eid,etype)repo.delete_info(session,entity,self.uri,scleanup=self.eid)exceptException:ifself.repo.config.mode=='test':raiseself.exception('while updating %s with external id %s of source %s',etype,extid,self.uri)continueself.latest_retrieval=updatetimesession.execute('SET X latest_retrieval %(date)s WHERE X eid 
%(x)s',{'x':self.eid,'date':self.latest_retrieval})session.commit()finally:session.close()def_get_connection(self):"""open and return a connection to the source"""nshost=self.config.get('pyro-ns-host')orself.repo.config['pyro-ns-host']nsgroup=self.config.get('pyro-ns-group')orself.repo.config['pyro-ns-group']self.info('connecting to instance :%s.%s for user %s',nsgroup,self.config['pyro-ns-id'],self.config['cubicweb-user'])#cnxprops = ConnectionProperties(cnxtype=self.config['cnx-type'])returndbapi.connect(database=self.config['pyro-ns-id'],login=self.config['cubicweb-user'],password=self.config['cubicweb-password'],host=nshost,group=nsgroup,setvreg=False)#cnxprops=cnxprops)defget_connection(self):try:returnself._get_connection()except(ConnectionError,PyroError),ex:self.critical("can't get connection to source %s: %s",self.uri,ex)returnConnectionWrapper()defcheck_connection(self,cnx):"""check connection validity, return None if the connection is still valid else a new connection """# we have to transfer manually thread ownership. This can be done safely# since the connections set holding the connection is affected to one# session/thread and can't be called simultaneouslytry:cnx._repo._transferThread(threading.currentThread())exceptAttributeError:# inmemory connectionpassifnotisinstance(cnx,ConnectionWrapper):try:cnx.check()return# okexcept(BadConnectionId,ConnectionClosedError):pass# try to reconnectreturnself.get_connection()defsyntax_tree_search(self,session,union,args=None,cachekey=None,varmap=None):assertdbg_st_search(self.uri,union,varmap,args,cachekey)rqlkey=union.as_string(kwargs=args)try:results=self._query_cache[rqlkey]exceptKeyError:results=self._syntax_tree_search(session,union,args)self._query_cache[rqlkey]=resultsassertdbg_results(results)returnresultsdef_syntax_tree_search(self,session,union,args):"""return result from this source for a rql query (actually from a rql syntax tree and a solution dictionary mapping each used variable to a possible type). 
If cachekey is given, the query necessary to fetch the results (but not the results themselves) may be cached using this key. """ifnotargsisNone:args=args.copy()# get cached cursor anywaycu=session.cnxset[self.uri]ifcuisNone:# this is a ConnectionWrapper instancemsg=session._("can't connect to source %s, some data may be missing")session.set_shared_data('sources_error',msg%self.uri)return[]translator=RQL2RQL(self)try:rql=translator.generate(session,union,args)exceptUnknownEid,ex:ifserver.DEBUG:print' unknown eid',ex,'no results'return[]ifserver.DEBUG&server.DBG_RQL:print' translated rql',rqltry:rset=cu.execute(rql,args)exceptException,ex:self.exception(str(ex))msg=session._("error while querying source %s, some data may be missing")session.set_shared_data('sources_error',msg%self.uri)return[]descr=rset.descriptionifrset:needtranslation=[]rows=rset.rowsfori,etypeinenumerate(descr[0]):if(etypeisNoneornotself.schema.eschema(etype).finaloruidtype(union,i,etype,args)):needtranslation.append(i)ifneedtranslation:cnx=session.cnxset.connection(self.uri)forrowindexinxrange(rset.rowcount-1,-1,-1):row=rows[rowindex]localrow=Falseforcolindexinneedtranslation:ifrow[colindex]isnotNone:# optional variableeid,local=self.local_eid(cnx,row[colindex],session)iflocal:localrow=TrueifeidisnotNone:row[colindex]=eidelse:# skip this rowdelrows[rowindex]deldescr[rowindex]breakelse:# skip row if it only contains eids of entities which# are actually from a source we also know locally,# except if some args specified (XXX should actually# check if there are some args local to the source)ifnot(translator.has_local_eidorlocalrow):delrows[rowindex]deldescr[rowindex]results=rowselse:results=[]returnresultsdef_entity_relations_and_kwargs(self,session,entity):relations=[]kwargs={'x':self.repo.eid2extid(self,entity.eid,session)}forkey,valinentity.cw_attr_cache.iteritems():relations.append('X %s%%(%s)s'%(key,key))kwargs[key]=valreturnrelations,kwargsdefadd_entity(self,session,entity):"""add a new entity 
to the source"""raiseNotImplementedError()defupdate_entity(self,session,entity):"""update an entity in the source"""relations,kwargs=self._entity_relations_and_kwargs(session,entity)cu=session.cnxset[self.uri]cu.execute('SET %s WHERE X eid %%(x)s'%','.join(relations),kwargs)self._query_cache.clear()entity.cw_clear_all_caches()defdelete_entity(self,session,entity):"""delete an entity from the source"""ifsession.deleted_in_transaction(self.eid):# source is being deleted, don't propagateself._query_cache.clear()returncu=session.cnxset[self.uri]cu.execute('DELETE %s X WHERE X eid %%(x)s'%entity.__regid__,{'x':self.repo.eid2extid(self,entity.eid,session)})self._query_cache.clear()defadd_relation(self,session,subject,rtype,object):"""add a relation to the source"""cu=session.cnxset[self.uri]cu.execute('SET X %s Y WHERE X eid %%(x)s, Y eid %%(y)s'%rtype,{'x':self.repo.eid2extid(self,subject,session),'y':self.repo.eid2extid(self,object,session)})self._query_cache.clear()session.entity_from_eid(subject).cw_clear_all_caches()session.entity_from_eid(object).cw_clear_all_caches()defdelete_relation(self,session,subject,rtype,object):"""delete a relation from the source"""ifsession.deleted_in_transaction(self.eid):# source is being deleted, don't propagateself._query_cache.clear()returncu=session.cnxset[self.uri]cu.execute('DELETE X %s Y WHERE X eid %%(x)s, Y eid %%(y)s'%rtype,{'x':self.repo.eid2extid(self,subject,session),'y':self.repo.eid2extid(self,object,session)})self._query_cache.clear()session.entity_from_eid(subject).cw_clear_all_caches()session.entity_from_eid(object).cw_clear_all_caches()classRQL2RQL(object):"""translate a local rql query to be executed on a distant 
repository"""def__init__(self,source):self.source=sourceself.repo=source.repoself.current_operator=Nonedef_accept_children(self,node):res=[]forchildinnode.children:rql=child.accept(self)ifrqlisnotNone:res.append(rql)returnresdefgenerate(self,session,rqlst,args):self._session=sessionself.kwargs=argsself.need_translation=Falseself.has_local_eid=Falsereturnself.visit_union(rqlst)defvisit_union(self,node):s=self._accept_children(node)iflen(s)>1:return' UNION '.join('(%s)'%qforqins)returns[0]defvisit_select(self,node):"""return the tree as an encoded rql string"""self._varmaker=rqlvar_maker(defined=node.defined_vars.copy())self._const_var={}ifnode.distinct:base='DISTINCT Any'else:base='Any's=['%s%s'%(base,','.join(v.accept(self)forvinnode.selection))]ifnode.groupby:s.append('GROUPBY %s'%', '.join(group.accept(self)forgroupinnode.groupby))ifnode.orderby:s.append('ORDERBY %s'%', '.join(self.visit_sortterm(term)forterminnode.orderby))ifnode.limitisnotNone:s.append('LIMIT %s'%node.limit)ifnode.offset:s.append('OFFSET %s'%node.offset)restrictions=[]ifnode.whereisnotNone:nr=node.where.accept(self)ifnrisnotNone:restrictions.append(nr)ifrestrictions:s.append('WHERE %s'%','.join(restrictions))ifnode.having:s.append('HAVING %s'%', '.join(term.accept(self)forterminnode.having))subqueries=[]forsubqueryinnode.with_:subqueries.append('%s BEING (%s)'%(','.join(ca.nameforcainsubquery.aliases),self.visit_union(subquery.query)))ifsubqueries:s.append('WITH %s'%(','.join(subqueries)))return' '.join(s)defvisit_and(self,node):res=self._accept_children(node)ifres:return', '.join(res)returndefvisit_or(self,node):res=self._accept_children(node)iflen(res)>1:return' OR '.join('(%s)'%rqlforrqlinres)elifres:returnres[0]returndefvisit_not(self,node):rql=node.children[0].accept(self)ifrql:return'NOT (%s)'%rqlreturndefvisit_exists(self,node):rql=node.children[0].accept(self)ifrql:return'EXISTS(%s)'%rqlreturndefvisit_relation(self,node):try:ifisinstance(node.children[0],Constant):# simplified rqlst, 
reintroduce eid relationtry:restr,lhs=self.process_eid_const(node.children[0])exceptUnknownEid:# can safely skip not relation with an unsupported eidifneged_relation(node):returnraiseelse:lhs=node.children[0].accept(self)restr=NoneexceptUnknownEid:# can safely skip not relation with an unsupported eidifneged_relation(node):return# XXX what about optional relation or outer NOT EXISTS()raiseifnode.optionalin('left','both'):lhs+='?'ifnode.r_type=='eid'ornotself.source.schema.rschema(node.r_type).final:self.need_translation=Trueself.current_operator=node.operator()ifisinstance(node.children[0],Constant):self.current_etypes=(node.children[0].uidtype,)else:self.current_etypes=node.children[0].variable.stinfo['possibletypes']try:rhs=node.children[1].accept(self)exceptUnknownEid:# can safely skip not relation with an unsupported eidifneged_relation(node):return# XXX what about optional relation or outer NOT EXISTS()raiseexceptReplaceByInOperator,ex:rhs='IN (%s)'%','.join(eidforeidinex.eids)self.need_translation=Falseself.current_operator=Noneifnode.optionalin('right','both'):rhs+='?'ifrestrisnotNone:return'%s%s%s, %s'%(lhs,node.r_type,rhs,restr)return'%s%s%s'%(lhs,node.r_type,rhs)defvisit_comparison(self,node):ifnode.operatorin('=','IS'):returnnode.children[0].accept(self)return'%s%s'%(node.operator.encode(),node.children[0].accept(self))defvisit_mathexpression(self,node):return'(%s%s%s)'%(node.children[0].accept(self),node.operator.encode(),node.children[1].accept(self))defvisit_function(self,node):#if node.name == 'IN':res=[]forchildinnode.children:try:rql=child.accept(self)exceptUnknownEid,ex:continueres.append(rql)ifnotres:raiseexreturn'%s(%s)'%(node.name,', '.join(res))defvisit_constant(self,node):ifself.need_translationornode.uidtype:ifnode.type=='Int':self.has_local_eid=Truereturnstr(self.eid2extid(node.value))ifnode.type=='Substitute':key=node.value# ensure we have not yet translated the 
value...ifnotkeyinself._const_var:self.kwargs[key]=self.eid2extid(self.kwargs[key])self._const_var[key]=Noneself.has_local_eid=Truereturnnode.as_string()defvisit_variableref(self,node):"""get the sql name for a variable reference"""returnnode.namedefvisit_sortterm(self,node):ifnode.asc:returnnode.term.accept(self)return'%s DESC'%node.term.accept(self)defprocess_eid_const(self,const):value=const.eval(self.kwargs)try:returnNone,self._const_var[value]exceptException:var=self._varmaker.next()self.need_translation=Truerestr='%s eid %s'%(var,self.visit_constant(const))self.need_translation=Falseself._const_var[value]=varreturnrestr,vardefeid2extid(self,eid):try:returnself.repo.eid2extid(self.source,eid,self._session)exceptUnknownEid:operator=self.current_operatorifoperatorisnotNoneandoperator!='=':# deal with query like "X eid > 12"## The problem is that eid order in the external source may# differ from the local source## So search for all eids from this source matching the condition# locally and then to replace the "> 12" branch by "IN (eids)"## XXX we may have to insert a huge number of eids...)sql="SELECT extid FROM entities WHERE source='%s' AND type IN (%s) AND eid%s%s"etypes=','.join("'%s'"%etypeforetypeinself.current_etypes)cu=self._session.system_sql(sql%(self.source.uri,etypes,operator,eid))# XXX buggy cu.rowcount which may be zero while there are some# resultsrows=cu.fetchall()ifrows:raiseReplaceByInOperator((b64decode(r[0])forrinrows))raise