turn the default logging threshold to warning (we usually want those messages), and log the 'no schema for eid' problem using warning instead of error, so we still see it in the logs but not during migration
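
A minimal sketch of the kind of change described above (illustrative only: the logger name and helper function are assumptions, not the actual diff):

import logging

logger = logging.getLogger('cubicweb.sources')   # hypothetical logger name
logger.setLevel(logging.WARNING)                 # default threshold raised to WARNING

def log_missing_schema(eid):
    # previously reported with error(); warning() keeps the message in the
    # logs without flagging it as an error during migration
    logger.warning('no schema for eid %s', eid)
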
"""Source to query another RQL repository using pyro:organization: Logilab:copyright: 2007-2010 LOGILAB S.A. (Paris, FRANCE), license is LGPL v2.:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr:license: GNU Lesser General Public License, v2.1 - http://www.gnu.org/licenses"""__docformat__="restructuredtext en"importthreadingfromos.pathimportjoinfromtimeimportmktimefromdatetimeimportdatetimefrombase64importb64decodefromPyro.errorsimportPyroError,ConnectionClosedErrorfromlogilab.common.configurationimportREQUIREDfromrql.nodesimportConstantfromrql.utilsimportrqlvar_makerfromcubicwebimportdbapi,serverfromcubicwebimportBadConnectionId,UnknownEid,ConnectionErrorfromcubicweb.cwconfigimportregister_persistent_optionsfromcubicweb.server.sourcesimport(AbstractSource,ConnectionWrapper,TimedCache,dbg_st_search,dbg_results)defuidtype(union,col,etype,args):select,col=union.locate_subquery(col,etype,args)returngetattr(select.selection[col],'uidtype',None)classReplaceByInOperator(Exception):def__init__(self,eids):self.eids=eidsclassPyroRQLSource(AbstractSource):"""External repository source, using Pyro connection"""# boolean telling if modification hooks should be called when something is# modified in this sourceshould_call_hooks=False# boolean telling if the repository should connect to this source during# migrationconnect_for_migration=Falsesupport_entities=Noneoptions=(# XXX pyro-ns host/port('pyro-ns-id',{'type':'string','default':REQUIRED,'help':'identifier of the repository in the pyro name server','group':'pyro-source','inputlevel':0,}),('mapping-file',{'type':'string','default':REQUIRED,'help':'path to a python file with the schema mapping definition','group':'pyro-source','inputlevel':1,}),('cubicweb-user',{'type':'string','default':REQUIRED,'help':'user to use for connection on the distant repository','group':'pyro-source','inputlevel':0,}),('cubicweb-password',{'type':'password','default':'','help':'user to use for connection on the distant repository','group':'pyro-source','inputlevel':0,}),('base-url',{'type':'string','default':'','help':'url of the web site for the distant repository, if you want ''to generate external link to entities from this repository','group':'pyro-source','inputlevel':1,}),('pyro-ns-host',{'type':'string','default':None,'help':'Pyro name server\'s host. If not set, default to the value \from all_in_one.conf. It may contains port information using <host>:<port> notation.','group':'pyro-source','inputlevel':1,}),('pyro-ns-group',{'type':'string','default':None,'help':'Pyro name server\'s group where the repository will be \registered. 

    def reset_caches(self):
        """method called during test to reset potential source caches"""
        self._query_cache = TimedCache(30)

    def last_update_time(self):
        pkey = u'sources.%s.latest-update-time' % self.uri
        rql = 'Any V WHERE X is CWProperty, X value V, X pkey %(k)s'
        session = self.repo.internal_session()
        try:
            rset = session.execute(rql, {'k': pkey})
            if not rset:
                # insert it
                session.execute('INSERT CWProperty X: X pkey %(k)s, X value %(v)s',
                                {'k': pkey, 'v': u'0'})
                session.commit()
                timestamp = 0
            else:
                assert len(rset) == 1
                timestamp = int(rset[0][0])
            return datetime.fromtimestamp(timestamp)
        finally:
            session.close()

    def init(self):
        """method called by the repository once ready to handle requests"""
        interval = int(self.config.get('synchronization-interval', 5*60))
        self.repo.looping_task(interval, self.synchronize)
        self.repo.looping_task(self._query_cache.ttl.seconds/10,
                               self._query_cache.clear_expired)
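
    # Bookkeeping note (the pkey below is illustrative, assuming a source whose
    # uri is 'extern'): the last synchronization time is persisted as a
    # CWProperty, e.g.
    #   pkey  = u'sources.extern.latest-update-time'
    #   value = u'1270000000'   # unix timestamp stored as a string
    # It is read back by last_update_time() and updated by synchronize() below.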

    def synchronize(self, mtime=None):
        """synchronize content known by this repository with content in the
        external repository
        """
        self.info('synchronizing pyro source %s', self.uri)
        cnx = self.get_connection()
        try:
            extrepo = cnx._repo
        except AttributeError:
            # fake connection wrapper returned when we can't connect to the
            # external source (hence we've no chance to synchronize...)
            return
        etypes = self.support_entities.keys()
        if mtime is None:
            mtime = self.last_update_time()
        updatetime, modified, deleted = extrepo.entities_modified_since(etypes,
                                                                        mtime)
        self._query_cache.clear()
        repo = self.repo
        session = repo.internal_session()
        try:
            for etype, extid in modified:
                try:
                    exturi = cnx.describe(extid)[1]
                    if exturi == 'system' or not exturi in repo.sources_by_uri:
                        eid = self.extid2eid(str(extid), etype, session)
                        rset = session.eid_rset(eid, etype)
                        entity = rset.get_entity(0, 0)
                        entity.complete(entity.e_schema.indexable_attributes())
                        repo.index_entity(session, entity)
                except:
                    self.exception('while updating %s with external id %s of source %s',
                                   etype, extid, self.uri)
                    continue
            for etype, extid in deleted:
                try:
                    eid = self.extid2eid(str(extid), etype, session,
                                         insert=False)
                    # entity has been deleted from the external repository; if
                    # eid is None it is not known here and there is nothing to do
                    if eid is not None:
                        repo.delete_info(session, eid)
                except:
                    self.exception('while updating %s with external id %s of source %s',
                                   etype, extid, self.uri)
                    continue
            session.execute('SET X value %(v)s WHERE X pkey %(k)s',
                            {'k': u'sources.%s.latest-update-time' % self.uri,
                             'v': unicode(int(mktime(updatetime.timetuple())))})
            session.commit()
        finally:
            session.close()

    def _get_connection(self):
        """open and return a connection to the source"""
        nshost = self.config.get('pyro-ns-host') or self.repo.config['pyro-ns-host']
        nsgroup = self.config.get('pyro-ns-group') or self.repo.config['pyro-ns-group']
        self.info('connecting to instance :%s.%s for user %s',
                  nsgroup, self.config['pyro-ns-id'], self.config['cubicweb-user'])
        #cnxprops = ConnectionProperties(cnxtype=self.config['cnx-type'])
        return dbapi.connect(database=self.config['pyro-ns-id'],
                             login=self.config['cubicweb-user'],
                             password=self.config['cubicweb-password'],
                             host=nshost, group=nsgroup,
                             setvreg=False) #cnxprops=cnxprops)

    def get_connection(self):
        try:
            return self._get_connection()
        except (ConnectionError, PyroError):
            self.critical("can't get connection to source %s", self.uri,
                          exc_info=1)
            return ConnectionWrapper()

    def check_connection(self, cnx):
        """check connection validity, return None if the connection is still
        valid else a new connection
        """
        # we have to manually transfer thread ownership. This can be done
        # safely since the pool the connection belongs to is bound to a single
        # session/thread and can't be used concurrently
        try:
            cnx._repo._transferThread(threading.currentThread())
        except AttributeError:
            # inmemory connection
            pass
        if not isinstance(cnx, ConnectionWrapper):
            try:
                cnx.check()
                return # ok
            except (BadConnectionId, ConnectionClosedError):
                pass
        # try to reconnect
        return self.get_connection()

    def syntax_tree_search(self, session, union, args=None, cachekey=None,
                           varmap=None):
        assert dbg_st_search(self.uri, union, varmap, args, cachekey)
        rqlkey = union.as_string(kwargs=args)
        try:
            results = self._query_cache[rqlkey]
        except KeyError:
            results = self._syntax_tree_search(session, union, args)
            self._query_cache[rqlkey] = results
        assert dbg_results(results)
        return results
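
    # Caching note: results are memoized per serialized union
    # (union.as_string(kwargs=args)) in the TimedCache created in __init__;
    # the cache is flushed by synchronize() and by every write method below
    # (update_entity, delete_entity, add_relation, delete_relation). For
    # instance (illustrative):
    #   source.syntax_tree_search(session, union, args)  # queries the distant repo
    #   source.syntax_tree_search(session, union, args)  # served from the cache
    # until the cached entry expires or the cache is cleared.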
"""ifnotargsisNone:args=args.copy()# get cached cursor anywaycu=session.pool[self.uri]ifcuisNone:# this is a ConnectionWrapper instancemsg=session._("can't connect to source %s, some data may be missing")session.set_shared_data('sources_error',msg%self.uri)return[]try:rql,cachekey=RQL2RQL(self).generate(session,union,args)exceptUnknownEid,ex:ifserver.DEBUG:print' unknown eid',ex,'no results'return[]ifserver.DEBUG&server.DBG_RQL:print' translated rql',rqltry:rset=cu.execute(rql,args,cachekey)exceptException,ex:self.exception(str(ex))msg=session._("error while querying source %s, some data may be missing")session.set_shared_data('sources_error',msg%self.uri)return[]descr=rset.descriptionifrset:needtranslation=[]rows=rset.rowsfori,etypeinenumerate(descr[0]):if(etypeisNoneornotself.schema.eschema(etype).finaloruidtype(union,i,etype,args)):needtranslation.append(i)ifneedtranslation:cnx=session.pool.connection(self.uri)forrowindexinxrange(rset.rowcount-1,-1,-1):row=rows[rowindex]forcolindexinneedtranslation:ifrow[colindex]isnotNone:# optional variableetype=descr[rowindex][colindex]exttype,exturi,extid=cnx.describe(row[colindex])ifexturi=='system'ornotexturiinself.repo.sources_by_uri:eid=self.extid2eid(str(row[colindex]),etype,session)row[colindex]=eidelse:# skip this rowdelrows[rowindex]deldescr[rowindex]breakresults=rowselse:results=[]returnresultsdef_entity_relations_and_kwargs(self,session,entity):relations=[]kwargs={'x':self.eid2extid(entity.eid,session)}forkey,valinentity.iteritems():relations.append('X %s%%(%s)s'%(key,key))kwargs[key]=valreturnrelations,kwargsdefadd_entity(self,session,entity):"""add a new entity to the source"""raiseNotImplementedError()defupdate_entity(self,session,entity):"""update an entity in the source"""relations,kwargs=self._entity_relations_and_kwargs(session,entity)cu=session.pool[self.uri]cu.execute('SET %s WHERE X eid %%(x)s'%','.join(relations),kwargs,'x')self._query_cache.clear()entity.clear_all_caches()defdelete_entity(self,session,etype,eid):"""delete an entity from the source"""cu=session.pool[self.uri]cu.execute('DELETE %s X WHERE X eid %%(x)s'%etype,{'x':self.eid2extid(eid,session)},'x')self._query_cache.clear()defadd_relation(self,session,subject,rtype,object):"""add a relation to the source"""cu=session.pool[self.uri]cu.execute('SET X %s Y WHERE X eid %%(x)s, Y eid %%(y)s'%rtype,{'x':self.eid2extid(subject,session),'y':self.eid2extid(object,session)},('x','y'))self._query_cache.clear()session.entity_from_eid(subject).clear_all_caches()session.entity_from_eid(object).clear_all_caches()defdelete_relation(self,session,subject,rtype,object):"""delete a relation from the source"""cu=session.pool[self.uri]cu.execute('DELETE X %s Y WHERE X eid %%(x)s, Y eid %%(y)s'%rtype,{'x':self.eid2extid(subject,session),'y':self.eid2extid(object,session)},('x','y'))self._query_cache.clear()session.entity_from_eid(subject).clear_all_caches()session.entity_from_eid(object).clear_all_caches()classRQL2RQL(object):"""translate a local rql query to be executed on a distant repository"""def__init__(self,source):self.source=sourceself.current_operator=Nonedef_accept_children(self,node):res=[]forchildinnode.children:rql=child.accept(self)ifrqlisnotNone:res.append(rql)returnresdefgenerate(self,session,rqlst,args):self._session=sessionself.kwargs=argsself.cachekey=[]self.need_translation=Falsereturnself.visit_union(rqlst),self.cachekeydefvisit_union(self,node):s=self._accept_children(node)iflen(s)>1:return' UNION '.join('(%s)'%qforqins)returns[0]defvisit_select(self,node):"""return 


class RQL2RQL(object):
    """translate a local rql query to be executed on a distant repository"""
    def __init__(self, source):
        self.source = source
        self.current_operator = None

    def _accept_children(self, node):
        res = []
        for child in node.children:
            rql = child.accept(self)
            if rql is not None:
                res.append(rql)
        return res

    def generate(self, session, rqlst, args):
        self._session = session
        self.kwargs = args
        self.cachekey = []
        self.need_translation = False
        return self.visit_union(rqlst), self.cachekey

    def visit_union(self, node):
        s = self._accept_children(node)
        if len(s) > 1:
            return ' UNION '.join('(%s)' % q for q in s)
        return s[0]

    def visit_select(self, node):
        """return the tree as an encoded rql string"""
        self._varmaker = rqlvar_maker(defined=node.defined_vars.copy())
        self._const_var = {}
        if node.distinct:
            base = 'DISTINCT Any'
        else:
            base = 'Any'
        s = ['%s %s' % (base, ','.join(v.accept(self) for v in node.selection))]
        if node.groupby:
            s.append('GROUPBY %s' % ', '.join(group.accept(self)
                                              for group in node.groupby))
        if node.orderby:
            s.append('ORDERBY %s' % ', '.join(self.visit_sortterm(term)
                                              for term in node.orderby))
        if node.limit is not None:
            s.append('LIMIT %s' % node.limit)
        if node.offset:
            s.append('OFFSET %s' % node.offset)
        restrictions = []
        if node.where is not None:
            nr = node.where.accept(self)
            if nr is not None:
                restrictions.append(nr)
        if restrictions:
            s.append('WHERE %s' % ','.join(restrictions))
        if node.having:
            s.append('HAVING %s' % ', '.join(term.accept(self)
                                             for term in node.having))
        subqueries = []
        for subquery in node.with_:
            subqueries.append('%s BEING (%s)' % (','.join(ca.name for ca in subquery.aliases),
                                                 self.visit_union(subquery.query)))
        if subqueries:
            s.append('WITH %s' % (','.join(subqueries)))
        return ' '.join(s)

    def visit_and(self, node):
        res = self._accept_children(node)
        if res:
            return ', '.join(res)
        return

    def visit_or(self, node):
        res = self._accept_children(node)
        if len(res) > 1:
            return ' OR '.join('(%s)' % rql for rql in res)
        elif res:
            return res[0]
        return

    def visit_not(self, node):
        rql = node.children[0].accept(self)
        if rql:
            return 'NOT (%s)' % rql
        return

    def visit_exists(self, node):
        return 'EXISTS(%s)' % node.children[0].accept(self)

    def visit_relation(self, node):
        try:
            if isinstance(node.children[0], Constant):
                # simplified rqlst, reintroduce eid relation
                try:
                    restr, lhs = self.process_eid_const(node.children[0])
                except UnknownEid:
                    # can safely skip not relation with an unsupported eid
                    if node.neged(strict=True):
                        return
                    raise
            else:
                lhs = node.children[0].accept(self)
                restr = None
        except UnknownEid:
            # can safely skip not relation with an unsupported eid
            if node.neged(strict=True):
                return
            # XXX what about optional relation or outer NOT EXISTS()
            raise
        if node.optional in ('left', 'both'):
            lhs += '?'
        if node.r_type == 'eid' or not self.source.schema.rschema(node.r_type).final:
            self.need_translation = True
            self.current_operator = node.operator()
            if isinstance(node.children[0], Constant):
                self.current_etypes = (node.children[0].uidtype,)
            else:
                self.current_etypes = node.children[0].variable.stinfo['possibletypes']
        try:
            rhs = node.children[1].accept(self)
        except UnknownEid:
            # can safely skip not relation with an unsupported eid
            if node.neged(strict=True):
                return
            # XXX what about optional relation or outer NOT EXISTS()
            raise
        except ReplaceByInOperator, ex:
            rhs = 'IN (%s)' % ','.join(eid for eid in ex.eids)
        self.need_translation = False
        self.current_operator = None
        if node.optional in ('right', 'both'):
            rhs += '?'
        if restr is not None:
            return '%s %s %s, %s' % (lhs, node.r_type, rhs, restr)
        return '%s %s %s' % (lhs, node.r_type, rhs)

    def visit_comparison(self, node):
        if node.operator in ('=', 'IS'):
            return node.children[0].accept(self)
        return '%s %s' % (node.operator.encode(),
                          node.children[0].accept(self))

    def visit_mathexpression(self, node):
        return '(%s %s %s)' % (node.children[0].accept(self),
                               node.operator.encode(),
                               node.children[1].accept(self))

    def visit_function(self, node):
        #if node.name == 'IN':
        res = []
        for child in node.children:
            try:
                rql = child.accept(self)
            except UnknownEid, ex:
                continue
            res.append(rql)
        if not res:
            raise ex
        return '%s(%s)' % (node.name, ', '.join(res))

    def visit_constant(self, node):
        if self.need_translation or node.uidtype:
            if node.type == 'Int':
                return str(self.eid2extid(node.value))
            if node.type == 'Substitute':
                key = node.value
                # ensure we have not yet translated the value...
                if not key in self._const_var:
                    self.kwargs[key] = self.eid2extid(self.kwargs[key])
                    self.cachekey.append(key)
                    self._const_var[key] = None
        return node.as_string()
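
    # Illustration for visit_constant() (hypothetical local eid 1234 whose
    # external id in the distant repository is '4321'): for a restriction
    # 'X eid %(x)s' with kwargs {'x': 1234}, the substitution is rewritten in
    # place to the external id and 'x' is appended to self.cachekey, so the
    # distant repository receives an identifier it actually knows about.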

    def visit_variableref(self, node):
        """get the rql name for a variable reference"""
        return node.name

    def visit_sortterm(self, node):
        if node.asc:
            return node.term.accept(self)
        return '%s DESC' % node.term.accept(self)

    def process_eid_const(self, const):
        value = const.eval(self.kwargs)
        try:
            return None, self._const_var[value]
        except:
            var = self._varmaker.next()
            self.need_translation = True
            restr = '%s eid %s' % (var, self.visit_constant(const))
            self.need_translation = False
            self._const_var[value] = var
            return restr, var

    def eid2extid(self, eid):
        try:
            return self.source.eid2extid(eid, self._session)
        except UnknownEid:
            operator = self.current_operator
            if operator is not None and operator != '=':
                # deal with queries like "X eid > 12"
                #
                # The problem is that eid order in the external source may
                # differ from the local source
                #
                # So search for all eids from this source matching the condition
                # locally, then replace the "> 12" branch by "IN (eids)"
                #
                # XXX we may have to insert a huge number of eids...
                sql = "SELECT extid FROM entities WHERE source='%s' AND type IN (%s) AND eid%s%s"
                etypes = ','.join("'%s'" % etype for etype in self.current_etypes)
                cu = self._session.system_sql(sql % (self.source.uri, etypes,
                                                     operator, eid))
                # XXX buggy cu.rowcount which may be zero while there are some
                # results
                rows = cu.fetchall()
                if rows:
                    raise ReplaceByInOperator((b64decode(r[0]) for r in rows))
            raise
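

# Typical use, as done by PyroRQLSource._syntax_tree_search() above:
#   rql, cachekey = RQL2RQL(source).generate(session, union, args)
#   rset = cu.execute(rql, args, cachekey)
# where cu is the distant repository cursor taken from session.pool and
# cachekey lists the argument keys whose eids were translated to external ids.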