# refactoring owl view (now corresponding to tbox) and owlabox view
"""Source to query another RQL repository using pyro:organization: Logilab:copyright: 2007-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved.:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr"""__docformat__="restructuredtext en"importthreadingfromos.pathimportjoinfrommx.DateTimeimportDateTimeFromTicksfromPyro.errorsimportPyroError,ConnectionClosedErrorfromlogilab.common.configurationimportREQUIREDfromrql.nodesimportConstantfromrql.utilsimportrqlvar_makerfromcubicwebimportdbapi,serverfromcubicwebimportBadConnectionId,UnknownEid,ConnectionErrorfromcubicweb.cwconfigimportregister_persistent_optionsfromcubicweb.server.sourcesimportAbstractSource,ConnectionWrapperclassReplaceByInOperator:def__init__(self,eids):self.eids=eidsclassPyroRQLSource(AbstractSource):"""External repository source, using Pyro connection"""# boolean telling if modification hooks should be called when something is# modified in this sourceshould_call_hooks=False# boolean telling if the repository should connect to this source during# migrationconnect_for_migration=Falsesupport_entities=Noneoptions=(# XXX pyro-ns host/port('pyro-ns-id',{'type':'string','default':REQUIRED,'help':'identifier of the repository in the pyro name server','group':'pyro-source','inputlevel':0,}),('mapping-file',{'type':'string','default':REQUIRED,'help':'path to a python file with the schema mapping definition','group':'pyro-source','inputlevel':1,}),('cubicweb-user',{'type':'string','default':REQUIRED,'help':'user to use for connection on the distant repository','group':'pyro-source','inputlevel':0,}),('cubicweb-password',{'type':'password','default':'','help':'user to use for connection on the distant repository','group':'pyro-source','inputlevel':0,}),('base-url',{'type':'string','default':'','help':'url of the web site for the distant repository, if you want ''to generate external link to entities from this 
repository','group':'pyro-source','inputlevel':1,}),('pyro-ns-host',{'type':'string','default':None,'help':'Pyro name server\'s host. If not set, default to the value \from all_in_one.conf.','group':'pyro-source','inputlevel':1,}),('pyro-ns-port',{'type':'int','default':None,'help':'Pyro name server\'s listening port. If not set, default to \the value from all_in_one.conf.','group':'pyro-source','inputlevel':1,}),('pyro-ns-group',{'type':'string','default':None,'help':'Pyro name server\'s group where the repository will be \registered. If not set, default to the value from all_in_one.conf.','group':'pyro-source','inputlevel':1,}),('synchronization-interval',{'type':'int','default':5*60,'help':'interval between synchronization with the external \repository (default to 5 minutes).','group':'pyro-source','inputlevel':2,}),)PUBLIC_KEYS=AbstractSource.PUBLIC_KEYS+('base-url',)_conn=Nonedef__init__(self,repo,appschema,source_config,*args,**kwargs):AbstractSource.__init__(self,repo,appschema,source_config,*args,**kwargs)mappingfile=source_config['mapping-file']ifnotmappingfile[0]=='/':mappingfile=join(repo.config.apphome,mappingfile)mapping={}execfile(mappingfile,mapping)self.support_entities=mapping['support_entities']self.support_relations=mapping.get('support_relations',{})self.dont_cross_relations=mapping.get('dont_cross_relations',())self.cross_relations=mapping.get('cross_relations',())baseurl=source_config.get('base-url')ifbaseurlandnotbaseurl.endswith('/'):source_config['base-url']+='/'self.config=source_configmyoptions=(('%s.latest-update-time'%self.uri,{'type':'int','sitewide':True,'default':0,'help':_('timestamp of the latest source synchronization.'),'group':'sources',}),)register_persistent_options(myoptions)deflast_update_time(self):pkey=u'sources.%s.latest-update-time'%self.urirql='Any V WHERE X is EProperty, X value V, X pkey %(k)s'session=self.repo.internal_session()try:rset=session.execute(rql,{'k':pkey})ifnotrset:# insert itsession.execute('INSERT 
EProperty X: X pkey %(k)s, X value %(v)s',{'k':pkey,'v':u'0'})session.commit()timestamp=0else:assertlen(rset)==1timestamp=int(rset[0][0])returnDateTimeFromTicks(timestamp)finally:session.close()definit(self):"""method called by the repository once ready to handle request"""interval=int(self.config.get('synchronization-interval',5*60))self.repo.looping_task(interval,self.synchronize)defsynchronize(self,mtime=None):"""synchronize content known by this repository with content in the external repository """self.info('synchronizing pyro source %s',self.uri)cnx=self.get_connection()extrepo=cnx._repoetypes=self.support_entities.keys()ifmtimeisNone:mtime=self.last_update_time()updatetime,modified,deleted=extrepo.entities_modified_since(etypes,mtime)repo=self.reposession=repo.internal_session()try:foretype,extidinmodified:try:exturi=cnx.describe(extid)[1]ifexturi=='system'ornotexturiinrepo.sources_by_uri:eid=self.extid2eid(extid,etype,session)rset=session.eid_rset(eid,etype)entity=rset.get_entity(0,0)entity.complete(entity.e_schema.indexable_attributes())repo.index_entity(session,entity)except:self.exception('while updating %s with external id %s of source %s',etype,extid,self.uri)continueforetype,extidindeleted:try:eid=self.extid2eid(extid,etype,session,insert=False)# entity has been deleted from external repository but is not known hereifeidisnotNone:repo.delete_info(session,eid)except:self.exception('while updating %s with external id %s of source %s',etype,extid,self.uri)continuesession.execute('SET X value %(v)s WHERE X pkey %(k)s',{'k':u'sources.%s.latest-update-time'%self.uri,'v':unicode(int(updatetime.ticks()))})session.commit()finally:session.close()def_get_connection(self):"""open and return a connection to the source"""nshost=self.config.get('pyro-ns-host')orself.repo.config['pyro-ns-host']nsport=self.config.get('pyro-ns-port')orself.repo.config['pyro-ns-port']nsgroup=self.config.get('pyro-ns-group')orself.repo.config['pyro-ns-group']#cnxprops = 
ConnectionProperties(cnxtype=self.config['cnx-type'])returndbapi.connect(database=self.config['pyro-ns-id'],user=self.config['cubicweb-user'],password=self.config['cubicweb-password'],host=nshost,port=nsport,group=nsgroup,setvreg=False)#cnxprops=cnxprops)defget_connection(self):try:returnself._get_connection()except(ConnectionError,PyroError):self.critical("can't get connection to source %s",self.uri,exc_info=1)returnConnectionWrapper()defcheck_connection(self,cnx):"""check connection validity, return None if the connection is still valid else a new connection """# we have to transfer manually thread ownership. This can be done safely# since the pool to which belong the connection is affected to one# session/thread and can't be called simultaneouslytry:cnx._repo._transferThread(threading.currentThread())exceptAttributeError:# inmemory connectionpassifnotisinstance(cnx,ConnectionWrapper):try:cnx.check()return# okexcept(BadConnectionId,ConnectionClosedError):pass# try to reconnectreturnself.get_connection()defsyntax_tree_search(self,session,union,args=None,cachekey=None,varmap=None):"""return result from this source for a rql query (actually from a rql syntax tree and a solution dictionary mapping each used variable to a possible type). If cachekey is given, the query necessary to fetch the results (but not the results themselves) may be cached using this key. 
"""ifnotargsisNone:args=args.copy()ifserver.DEBUG:print'RQL FOR PYRO SOURCE',self.uriprintunion.as_string()ifargs:print'ARGS',argsprint'SOLUTIONS',','.join(str(s.solutions)forsinunion.children)# get cached cursor anywaycu=session.pool[self.uri]ifcuisNone:# this is a ConnectionWrapper instancemsg=session._("can't connect to source %s, some data may be missing")session.set_shared_data('sources_error',msg%self.uri)return[]try:rql,cachekey=RQL2RQL(self).generate(session,union,args)exceptUnknownEid,ex:ifserver.DEBUG:print'unknown eid',ex,'no results'return[]ifserver.DEBUG:print'TRANSLATED RQL',rqltry:rset=cu.execute(rql,args,cachekey)exceptException,ex:self.exception(str(ex))msg=session._("error while querying source %s, some data may be missing")session.set_shared_data('sources_error',msg%self.uri)return[]descr=rset.descriptionifrset:needtranslation=[]rows=rset.rowsfori,etypeinenumerate(descr[0]):if(etypeisNoneornotself.schema.eschema(etype).is_final()orgetattr(union.locate_subquery(i,etype,args).selection[i],'uidtype',None)):needtranslation.append(i)ifneedtranslation:cnx=session.pool.connection(self.uri)forrowindexinxrange(rset.rowcount-1,-1,-1):row=rows[rowindex]forcolindexinneedtranslation:ifrow[colindex]isnotNone:# optional variableetype=descr[rowindex][colindex]exttype,exturi,extid=cnx.describe(row[colindex])ifexturi=='system'ornotexturiinself.repo.sources_by_uri:eid=self.extid2eid(row[colindex],etype,session)row[colindex]=eidelse:# skip this rowdelrows[rowindex]deldescr[rowindex]breakresults=rowselse:results=[]ifserver.DEBUG:iflen(results)>10:print'--------------->',results[:10],'...',len(results)else:print'--------------->',resultsreturnresultsdef_entity_relations_and_kwargs(self,session,entity):relations=[]kwargs={'x':self.eid2extid(entity.eid,session)}forkey,valinentity.iteritems():relations.append('X %s%%(%s)s'%(key,key))kwargs[key]=valreturnrelations,kwargsdefadd_entity(self,session,entity):"""add a new entity to the 
source"""raiseNotImplementedError()defupdate_entity(self,session,entity):"""update an entity in the source"""relations,kwargs=self._entity_relations_and_kwargs(session,entity)cu=session.pool[self.uri]cu.execute('SET %s WHERE X eid %%(x)s'%','.join(relations),kwargs,'x')defdelete_entity(self,session,etype,eid):"""delete an entity from the source"""cu=session.pool[self.uri]cu.execute('DELETE %s X WHERE X eid %%(x)s'%etype,{'x':self.eid2extid(eid,session)},'x')defadd_relation(self,session,subject,rtype,object):"""add a relation to the source"""cu=session.pool[self.uri]cu.execute('SET X %s Y WHERE X eid %%(x)s, Y eid %%(y)s'%rtype,{'x':self.eid2extid(subject,session),'y':self.eid2extid(object,session)},('x','y'))defdelete_relation(self,session,subject,rtype,object):"""delete a relation from the source"""cu=session.pool[self.uri]cu.execute('DELETE X %s Y WHERE X eid %%(x)s, Y eid %%(y)s'%rtype,{'x':self.eid2extid(subject,session),'y':self.eid2extid(object,session)},('x','y'))classRQL2RQL(object):"""translate a local rql query to be executed on a distant repository"""def__init__(self,source):self.source=sourceself.current_operator=Nonedef_accept_children(self,node):res=[]forchildinnode.children:rql=child.accept(self)ifrqlisnotNone:res.append(rql)returnresdefgenerate(self,session,rqlst,args):self._session=sessionself.kwargs=argsself.cachekey=[]self.need_translation=Falsereturnself.visit_union(rqlst),self.cachekeydefvisit_union(self,node):s=self._accept_children(node)iflen(s)>1:return' UNION '.join('(%s)'%qforqins)returns[0]defvisit_select(self,node):"""return the tree as an encoded rql string"""self._varmaker=rqlvar_maker(defined=node.defined_vars.copy())self._const_var={}ifnode.distinct:base='DISTINCT Any'else:base='Any's=['%s%s'%(base,','.join(v.accept(self)forvinnode.selection))]ifnode.groupby:s.append('GROUPBY %s'%', '.join(group.accept(self)forgroupinnode.groupby))ifnode.orderby:s.append('ORDERBY %s'%', 
'.join(self.visit_sortterm(term)forterminnode.orderby))ifnode.limitisnotNone:s.append('LIMIT %s'%node.limit)ifnode.offset:s.append('OFFSET %s'%node.offset)restrictions=[]ifnode.whereisnotNone:nr=node.where.accept(self)ifnrisnotNone:restrictions.append(nr)ifrestrictions:s.append('WHERE %s'%','.join(restrictions))ifnode.having:s.append('HAVING %s'%', '.join(term.accept(self)forterminnode.having))subqueries=[]forsubqueryinnode.with_:subqueries.append('%s BEING (%s)'%(','.join(ca.nameforcainsubquery.aliases),self.visit_union(subquery.query)))ifsubqueries:s.append('WITH %s'%(','.join(subqueries)))return' '.join(s)defvisit_and(self,node):res=self._accept_children(node)ifres:return', '.join(res)returndefvisit_or(self,node):res=self._accept_children(node)iflen(res)>1:return' OR '.join('(%s)'%rqlforrqlinres)elifres:returnres[0]returndefvisit_not(self,node):rql=node.children[0].accept(self)ifrql:return'NOT (%s)'%rqlreturndefvisit_exists(self,node):return'EXISTS(%s)'%node.children[0].accept(self)defvisit_relation(self,node):try:ifisinstance(node.children[0],Constant):# simplified rqlst, reintroduce eid relationtry:restr,lhs=self.process_eid_const(node.children[0])exceptUnknownEid:# can safely skip not relation with an unsupported eidifnode.neged(strict=True):returnraiseelse:lhs=node.children[0].accept(self)restr=NoneexceptUnknownEid:# can safely skip not relation with an unsupported eidifnode.neged(strict=True):return# XXX what about optional relation or outer NOT EXISTS()raiseifnode.optionalin('left','both'):lhs+='?'ifnode.r_type=='eid'ornotself.source.schema.rschema(node.r_type).is_final():self.need_translation=Trueself.current_operator=node.operator()ifisinstance(node.children[0],Constant):self.current_etypes=(node.children[0].uidtype,)else:self.current_etypes=node.children[0].variable.stinfo['possibletypes']try:rhs=node.children[1].accept(self)exceptUnknownEid:# can safely skip not relation with an unsupported eidifnode.neged(strict=True):return# XXX what about optional 
relation or outer NOT EXISTS()raiseexceptReplaceByInOperator,ex:rhs='IN (%s)'%','.join(str(eid)foreidinex.eids)self.need_translation=Falseself.current_operator=Noneifnode.optionalin('right','both'):rhs+='?'ifrestrisnotNone:return'%s%s%s, %s'%(lhs,node.r_type,rhs,restr)return'%s%s%s'%(lhs,node.r_type,rhs)defvisit_comparison(self,node):ifnode.operatorin('=','IS'):returnnode.children[0].accept(self)return'%s%s'%(node.operator.encode(),node.children[0].accept(self))defvisit_mathexpression(self,node):return'(%s%s%s)'%(node.children[0].accept(self),node.operator.encode(),node.children[1].accept(self))defvisit_function(self,node):#if node.name == 'IN':res=[]forchildinnode.children:try:rql=child.accept(self)exceptUnknownEid,ex:continueres.append(rql)ifnotres:raiseexreturn'%s(%s)'%(node.name,', '.join(res))defvisit_constant(self,node):ifself.need_translationornode.uidtype:ifnode.type=='Int':returnstr(self.eid2extid(node.value))ifnode.type=='Substitute':key=node.value# ensure we have not yet translated the value...ifnotkeyinself._const_var:self.kwargs[key]=self.eid2extid(self.kwargs[key])self.cachekey.append(key)self._const_var[key]=Nonereturnnode.as_string()defvisit_variableref(self,node):"""get the sql name for a variable reference"""returnnode.namedefvisit_sortterm(self,node):ifnode.asc:returnnode.term.accept(self)return'%s DESC'%node.term.accept(self)defprocess_eid_const(self,const):value=const.eval(self.kwargs)try:returnNone,self._const_var[value]except:var=self._varmaker.next()self.need_translation=Truerestr='%s eid %s'%(var,self.visit_constant(const))self.need_translation=Falseself._const_var[value]=varreturnrestr,vardefeid2extid(self,eid):try:returnself.source.eid2extid(eid,self._session)exceptUnknownEid:operator=self.current_operatorifoperatorisnotNoneandoperator!='=':# deal with query like X eid > 12## The problem is# that eid order in the external source may differ from the# local source## So search for all eids from this# source matching the condition locally and 
then to replace the# > 12 branch by IN (eids) (XXX we may have to insert a huge# number of eids...)# planner so thatsql="SELECT extid FROM entities WHERE source='%s' AND type IN (%s) AND eid%s%s"etypes=','.join("'%s'"%etypeforetypeinself.current_etypes)cu=self._session.system_sql(sql%(self.source.uri,etypes,operator,eid))# XXX buggy cu.rowcount which may be zero while there are some# resultsrows=cu.fetchall()ifrows:raiseReplaceByInOperator((r[0]forrinrows))raise