pyrorql source now ignores external eids which themselves come from another external source already in use by the repository (that source should have the same uri)
"""Source to query another RQL repository using pyro:organization: Logilab:copyright: 2007-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved.:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr"""__docformat__="restructuredtext en"importthreadingfromos.pathimportjoinfrommx.DateTimeimportDateTimeFromTicksfromPyro.errorsimportPyroError,ConnectionClosedErrorfromlogilab.common.configurationimportREQUIREDfromrql.nodesimportConstantfromrql.utilsimportrqlvar_makerfromcubicwebimportdbapi,serverfromcubicwebimportBadConnectionId,UnknownEid,ConnectionErrorfromcubicweb.cwconfigimportregister_persistent_optionsfromcubicweb.server.sourcesimportAbstractSource,ConnectionWrapperclassReplaceByInOperator:def__init__(self,eids):self.eids=eidsclassPyroRQLSource(AbstractSource):"""External repository source, using Pyro connection"""# boolean telling if modification hooks should be called when something is# modified in this sourceshould_call_hooks=False# boolean telling if the repository should connect to this source during# migrationconnect_for_migration=Falsesupport_entities=Noneoptions=(# XXX pyro-ns host/port('pyro-ns-id',{'type':'string','default':REQUIRED,'help':'identifier of the repository in the pyro name server','group':'pyro-source','inputlevel':0,}),('mapping-file',{'type':'string','default':REQUIRED,'help':'path to a python file with the schema mapping definition','group':'pyro-source','inputlevel':1,}),('cubicweb-user',{'type':'string','default':REQUIRED,'help':'user to use for connection on the distant repository','group':'pyro-source','inputlevel':0,}),('cubicweb-password',{'type':'password','default':'','help':'user to use for connection on the distant repository','group':'pyro-source','inputlevel':0,}),('base-url',{'type':'string','default':'','help':'url of the web site for the distant repository, if you want ''to generate external link to entities from this 
repository','group':'pyro-source','inputlevel':1,}),('pyro-ns-host',{'type':'string','default':None,'help':'Pyro name server\'s host. If not set, default to the value \from all_in_one.conf.','group':'pyro-source','inputlevel':1,}),('pyro-ns-port',{'type':'int','default':None,'help':'Pyro name server\'s listening port. If not set, default to \the value from all_in_one.conf.','group':'pyro-source','inputlevel':1,}),('pyro-ns-group',{'type':'string','default':None,'help':'Pyro name server\'s group where the repository will be \registered. If not set, default to the value from all_in_one.conf.','group':'pyro-source','inputlevel':1,}),('synchronization-interval',{'type':'int','default':5*60,'help':'interval between synchronization with the external \repository (default to 5 minutes).','group':'pyro-source','inputlevel':2,}),)PUBLIC_KEYS=AbstractSource.PUBLIC_KEYS+('base-url',)_conn=Nonedef__init__(self,repo,appschema,source_config,*args,**kwargs):AbstractSource.__init__(self,repo,appschema,source_config,*args,**kwargs)mappingfile=source_config['mapping-file']ifnotmappingfile[0]=='/':mappingfile=join(repo.config.apphome,mappingfile)mapping={}execfile(mappingfile,mapping)self.support_entities=mapping['support_entities']self.support_relations=mapping.get('support_relations',{})self.dont_cross_relations=mapping.get('dont_cross_relations',())self.cross_relations=mapping.get('cross_relations',())baseurl=source_config.get('base-url')ifbaseurlandnotbaseurl.endswith('/'):source_config['base-url']+='/'self.config=source_configmyoptions=(('%s.latest-update-time'%self.uri,{'type':'int','sitewide':True,'default':0,'help':_('timestamp of the latest source synchronization.'),'group':'sources',}),)register_persistent_options(myoptions)deflast_update_time(self):pkey=u'sources.%s.latest-update-time'%self.urirql='Any V WHERE X is EProperty, X value V, X pkey %(k)s'session=self.repo.internal_session()try:rset=session.execute(rql,{'k':pkey})ifnotrset:# insert itsession.execute('INSERT 
EProperty X: X pkey %(k)s, X value %(v)s',{'k':pkey,'v':u'0'})session.commit()timestamp=0else:assertlen(rset)==1timestamp=int(rset[0][0])returnDateTimeFromTicks(timestamp)finally:session.close()definit(self):"""method called by the repository once ready to handle request"""interval=int(self.config.get('synchronization-interval',5*60))self.repo.looping_task(interval,self.synchronize)defsynchronize(self,mtime=None):"""synchronize content known by this repository with content in the external repository """self.info('synchronizing pyro source %s',self.uri)cnx=self.get_connection()extrepo=cnx._repoetypes=self.support_entities.keys()ifmtimeisNone:mtime=self.last_update_time()updatetime,modified,deleted=extrepo.entities_modified_since(etypes,mtime)repo=self.reposession=repo.internal_session()try:foretype,extidinmodified:try:exturi=cnx.describe(extid)[1]ifexturi=='system'ornotexturiinrepo.sources_by_uri:eid=self.extid2eid(extid,etype,session)rset=session.eid_rset(eid,etype)entity=rset.get_entity(0,0)entity.complete(entity.e_schema.indexable_attributes())repo.index_entity(session,entity)except:self.exception('while updating %s with external id %s of source %s',etype,extid,self.uri)continueforetype,extidindeleted:try:eid=self.extid2eid(extid,etype,session,insert=False)# entity has been deleted from external repository but is not known hereifeidisnotNone:repo.delete_info(session,eid)except:self.exception('while updating %s with external id %s of source %s',etype,extid,self.uri)continuesession.execute('SET X value %(v)s WHERE X pkey %(k)s',{'k':u'sources.%s.latest-update-time'%self.uri,'v':unicode(int(updatetime.ticks()))})session.commit()finally:session.close()def_get_connection(self):"""open and return a connection to the source"""nshost=self.config.get('pyro-ns-host')orself.repo.config['pyro-ns-host']nsport=self.config.get('pyro-ns-port')orself.repo.config['pyro-ns-port']nsgroup=self.config.get('pyro-ns-group')orself.repo.config['pyro-ns-group']#cnxprops = 
ConnectionProperties(cnxtype=self.config['cnx-type'])returndbapi.connect(database=self.config['pyro-ns-id'],user=self.config['cubicweb-user'],password=self.config['cubicweb-password'],host=nshost,port=nsport,group=nsgroup,setvreg=False)#cnxprops=cnxprops)defget_connection(self):try:returnself._get_connection()except(ConnectionError,PyroError):self.critical("can't get connection to source %s",self.uri,exc_info=1)returnConnectionWrapper()defcheck_connection(self,cnx):"""check connection validity, return None if the connection is still valid else a new connection """# we have to transfer manually thread ownership. This can be done safely# since the pool to which belong the connection is affected to one# session/thread and can't be called simultaneouslytry:cnx._repo._transferThread(threading.currentThread())exceptAttributeError:# inmemory connectionpassifnotisinstance(cnx,ConnectionWrapper):try:cnx.check()return# okexcept(BadConnectionId,ConnectionClosedError):pass# try to reconnectreturnself.get_connection()defsyntax_tree_search(self,session,union,args=None,cachekey=None,varmap=None):"""return result from this source for a rql query (actually from a rql syntax tree and a solution dictionary mapping each used variable to a possible type). If cachekey is given, the query necessary to fetch the results (but not the results themselves) may be cached using this key. 
"""ifnotargsisNone:args=args.copy()ifserver.DEBUG:print'RQL FOR PYRO SOURCE',self.uriprintunion.as_string()ifargs:print'ARGS',argsprint'SOLUTIONS',','.join(str(s.solutions)forsinunion.children)# get cached cursor anywaycu=session.pool[self.uri]ifcuisNone:# this is a ConnectionWrapper instancemsg=session._("can't connect to source %s, some data may be missing")session.set_shared_data('sources_error',msg%self.uri)return[]try:rql,cachekey=RQL2RQL(self).generate(session,union,args)exceptUnknownEid,ex:ifserver.DEBUG:print'unknown eid',ex,'no results'return[]ifserver.DEBUG:print'TRANSLATED RQL',rqltry:rset=cu.execute(rql,args,cachekey)exceptException,ex:self.exception(str(ex))msg=session._("error while querying source %s, some data may be missing")session.set_shared_data('sources_error',msg%self.uri)return[]descr=rset.descriptionifrset:needtranslation=[]rows=rset.rowsfori,etypeinenumerate(descr[0]):if(etypeisNoneornotself.schema.eschema(etype).is_final()orgetattr(union.locate_subquery(i,etype,args).selection[i],'uidtype',None)):needtranslation.append(i)ifneedtranslation:cnx=session.pool.connection(self.uri)forrowindexinxrange(rset.rowcount-1,-1,-1):row=rows[rowindex]forcolindexinneedtranslation:ifrow[colindex]isnotNone:# optional variableetype=descr[rowindex][colindex]exttype,exturi,extid=cnx.describe(row[colindex])ifexturi=='system'ornotexturiinself.repo.sources_by_uri:eid=self.extid2eid(row[colindex],etype,session)row[colindex]=eidelse:# skip this rowprint'skip external',etype,exturi,extid,self.uridelrows[rowindex]deldescr[rowindex]breakresults=rowselse:results=[]ifserver.DEBUG:iflen(results)>10:print'--------------->',results[:10],'...',len(results)else:print'--------------->',resultsreturnresultsdef_entity_relations_and_kwargs(self,session,entity):relations=[]kwargs={'x':self.eid2extid(entity.eid,session)}forkey,valinentity.iteritems():relations.append('X %s%%(%s)s'%(key,key))kwargs[key]=valreturnrelations,kwargsdefadd_entity(self,session,entity):"""add a new entity 
to the source"""raiseNotImplementedError()defupdate_entity(self,session,entity):"""update an entity in the source"""relations,kwargs=self._entity_relations_and_kwargs(session,entity)cu=session.pool[self.uri]cu.execute('SET %s WHERE X eid %%(x)s'%','.join(relations),kwargs,'x')defdelete_entity(self,session,etype,eid):"""delete an entity from the source"""cu=session.pool[self.uri]cu.execute('DELETE %s X WHERE X eid %%(x)s'%etype,{'x':self.eid2extid(eid,session)},'x')defadd_relation(self,session,subject,rtype,object):"""add a relation to the source"""cu=session.pool[self.uri]cu.execute('SET X %s Y WHERE X eid %%(x)s, Y eid %%(y)s'%rtype,{'x':self.eid2extid(subject,session),'y':self.eid2extid(object,session)},('x','y'))defdelete_relation(self,session,subject,rtype,object):"""delete a relation from the source"""cu=session.pool[self.uri]cu.execute('DELETE X %s Y WHERE X eid %%(x)s, Y eid %%(y)s'%rtype,{'x':self.eid2extid(subject,session),'y':self.eid2extid(object,session)},('x','y'))classRQL2RQL(object):"""translate a local rql query to be executed on a distant repository"""def__init__(self,source):self.source=sourceself.current_operator=Nonedef_accept_children(self,node):res=[]forchildinnode.children:rql=child.accept(self)ifrqlisnotNone:res.append(rql)returnresdefgenerate(self,session,rqlst,args):self._session=sessionself.kwargs=argsself.cachekey=[]self.need_translation=Falsereturnself.visit_union(rqlst),self.cachekeydefvisit_union(self,node):s=self._accept_children(node)iflen(s)>1:return' UNION '.join('(%s)'%qforqins)returns[0]defvisit_select(self,node):"""return the tree as an encoded rql string"""self._varmaker=rqlvar_maker(defined=node.defined_vars.copy())self._const_var={}ifnode.distinct:base='DISTINCT Any'else:base='Any's=['%s%s'%(base,','.join(v.accept(self)forvinnode.selection))]ifnode.groupby:s.append('GROUPBY %s'%', '.join(group.accept(self)forgroupinnode.groupby))ifnode.orderby:s.append('ORDERBY %s'%', 
'.join(self.visit_sortterm(term)forterminnode.orderby))ifnode.limitisnotNone:s.append('LIMIT %s'%node.limit)ifnode.offset:s.append('OFFSET %s'%node.offset)restrictions=[]ifnode.whereisnotNone:nr=node.where.accept(self)ifnrisnotNone:restrictions.append(nr)ifrestrictions:s.append('WHERE %s'%','.join(restrictions))ifnode.having:s.append('HAVING %s'%', '.join(term.accept(self)forterminnode.having))subqueries=[]forsubqueryinnode.with_:subqueries.append('%s BEING (%s)'%(','.join(ca.nameforcainsubquery.aliases),self.visit_union(subquery.query)))ifsubqueries:s.append('WITH %s'%(','.join(subqueries)))return' '.join(s)defvisit_and(self,node):res=self._accept_children(node)ifres:return', '.join(res)returndefvisit_or(self,node):res=self._accept_children(node)iflen(res)>1:return' OR '.join('(%s)'%rqlforrqlinres)elifres:returnres[0]returndefvisit_not(self,node):rql=node.children[0].accept(self)ifrql:return'NOT (%s)'%rqlreturndefvisit_exists(self,node):return'EXISTS(%s)'%node.children[0].accept(self)defvisit_relation(self,node):try:ifisinstance(node.children[0],Constant):# simplified rqlst, reintroduce eid relationtry:restr,lhs=self.process_eid_const(node.children[0])exceptUnknownEid:# can safely skip not relation with an unsupported eidifnode.neged(strict=True):returnraiseelse:lhs=node.children[0].accept(self)restr=NoneexceptUnknownEid:# can safely skip not relation with an unsupported eidifnode.neged(strict=True):return# XXX what about optional relation or outer NOT EXISTS()raiseifnode.optionalin('left','both'):lhs+='?'ifnode.r_type=='eid'ornotself.source.schema.rschema(node.r_type).is_final():self.need_translation=Trueself.current_operator=node.operator()ifisinstance(node.children[0],Constant):self.current_etypes=(node.children[0].uidtype,)else:self.current_etypes=node.children[0].variable.stinfo['possibletypes']try:rhs=node.children[1].accept(self)exceptUnknownEid:# can safely skip not relation with an unsupported eidifnode.neged(strict=True):return# XXX what about optional 
relation or outer NOT EXISTS()raiseexceptReplaceByInOperator,ex:rhs='IN (%s)'%','.join(str(eid)foreidinex.eids)self.need_translation=Falseself.current_operator=Noneifnode.optionalin('right','both'):rhs+='?'ifrestrisnotNone:return'%s%s%s, %s'%(lhs,node.r_type,rhs,restr)return'%s%s%s'%(lhs,node.r_type,rhs)defvisit_comparison(self,node):ifnode.operatorin('=','IS'):returnnode.children[0].accept(self)return'%s%s'%(node.operator.encode(),node.children[0].accept(self))defvisit_mathexpression(self,node):return'(%s%s%s)'%(node.children[0].accept(self),node.operator.encode(),node.children[1].accept(self))defvisit_function(self,node):#if node.name == 'IN':res=[]forchildinnode.children:try:rql=child.accept(self)exceptUnknownEid,ex:continueres.append(rql)ifnotres:raiseexreturn'%s(%s)'%(node.name,', '.join(res))defvisit_constant(self,node):ifself.need_translationornode.uidtype:ifnode.type=='Int':returnstr(self.eid2extid(node.value))ifnode.type=='Substitute':key=node.value# ensure we have not yet translated the value...ifnotkeyinself._const_var:self.kwargs[key]=self.eid2extid(self.kwargs[key])self.cachekey.append(key)self._const_var[key]=Nonereturnnode.as_string()defvisit_variableref(self,node):"""get the sql name for a variable reference"""returnnode.namedefvisit_sortterm(self,node):ifnode.asc:returnnode.term.accept(self)return'%s DESC'%node.term.accept(self)defprocess_eid_const(self,const):value=const.eval(self.kwargs)try:returnNone,self._const_var[value]except:var=self._varmaker.next()self.need_translation=Truerestr='%s eid %s'%(var,self.visit_constant(const))self.need_translation=Falseself._const_var[value]=varreturnrestr,vardefeid2extid(self,eid):try:returnself.source.eid2extid(eid,self._session)exceptUnknownEid:operator=self.current_operatorifoperatorisnotNoneandoperator!='=':# deal with query like X eid > 12## The problem is# that eid order in the external source may differ from the# local source## So search for all eids from this# source matching the condition locally and 
then to replace the# > 12 branch by IN (eids) (XXX we may have to insert a huge# number of eids...)# planner so thatsql="SELECT extid FROM entities WHERE source='%s' AND type IN (%s) AND eid%s%s"etypes=','.join("'%s'"%etypeforetypeinself.current_etypes)cu=self._session.system_sql(sql%(self.source.uri,etypes,operator,eid))# XXX buggy cu.rowcount which may be zero while there are some# resultsrows=cu.fetchall()ifrows:raiseReplaceByInOperator((r[0]forrinrows))raise