server/sources/pyrorql.py
changeset 1238 fa29b5b60107
parent 520 29342c0cf55f
child 1239 7eda09aa7553
equal deleted inserted replaced
1237:c836bdb3b17b 1238:fa29b5b60107
     1 """Source to query another RQL repository using pyro
     1 """Source to query another RQL repository using pyro
     2 
     2 
     3 :organization: Logilab
     3 :organization: Logilab
     4 :copyright: 2007-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
     4 :copyright: 2007-2009 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
     5 :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
     5 :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
     6 """
     6 """
     7 __docformat__ = "restructuredtext en"
     7 __docformat__ = "restructuredtext en"
     8 
     8 
     9 import threading
     9 import threading
    19 from rql.utils import rqlvar_maker
    19 from rql.utils import rqlvar_maker
    20 
    20 
    21 from cubicweb import dbapi, server
    21 from cubicweb import dbapi, server
    22 from cubicweb import BadConnectionId, UnknownEid, ConnectionError
    22 from cubicweb import BadConnectionId, UnknownEid, ConnectionError
    23 from cubicweb.cwconfig import register_persistent_options
    23 from cubicweb.cwconfig import register_persistent_options
    24 from cubicweb.server.sources import AbstractSource, ConnectionWrapper
    24 from cubicweb.server.sources import AbstractSource, ConnectionWrapper, TimedCache
    25 
    25 
    26 class ReplaceByInOperator:
    26 class ReplaceByInOperator:
    27     def __init__(self, eids):
    27     def __init__(self, eids):
    28         self.eids = eids
    28         self.eids = eids
    29         
    29         
   127                        'default': 0,
   127                        'default': 0,
   128                        'help': _('timestamp of the latest source synchronization.'),
   128                        'help': _('timestamp of the latest source synchronization.'),
   129                        'group': 'sources', 
   129                        'group': 'sources', 
   130                        }),)
   130                        }),)
   131         register_persistent_options(myoptions)
   131         register_persistent_options(myoptions)
       
   132         self._query_cache = TimedCache(30)
   132 
   133 
   133     def last_update_time(self):
   134     def last_update_time(self):
   134         pkey = u'sources.%s.latest-update-time' % self.uri
   135         pkey = u'sources.%s.latest-update-time' % self.uri
   135         rql = 'Any V WHERE X is EProperty, X value V, X pkey %(k)s'
   136         rql = 'Any V WHERE X is EProperty, X value V, X pkey %(k)s'
   136         session = self.repo.internal_session()
   137         session = self.repo.internal_session()
   151 
   152 
   152     def init(self):
   153     def init(self):
   153         """method called by the repository once ready to handle request"""
   154         """method called by the repository once ready to handle request"""
   154         interval = int(self.config.get('synchronization-interval', 5*60))
   155         interval = int(self.config.get('synchronization-interval', 5*60))
   155         self.repo.looping_task(interval, self.synchronize) 
   156         self.repo.looping_task(interval, self.synchronize) 
       
   157         self.repo.looping_task(self._query_cache.ttl.seconds/10, self._query_cache.clear_expired) 
   156 
   158 
   157     def synchronize(self, mtime=None):
   159     def synchronize(self, mtime=None):
   158         """synchronize content known by this repository with content in the
   160         """synchronize content known by this repository with content in the
   159         external repository
   161         external repository
   160         """
   162         """
   238             except (BadConnectionId, ConnectionClosedError):
   240             except (BadConnectionId, ConnectionClosedError):
   239                 pass
   241                 pass
   240         # try to reconnect
   242         # try to reconnect
   241         return self.get_connection()
   243         return self.get_connection()
   242         
   244         
   243     
       
   244     def syntax_tree_search(self, session, union, args=None, cachekey=None,
   245     def syntax_tree_search(self, session, union, args=None, cachekey=None,
   245                            varmap=None):
   246                            varmap=None):
       
   247         assert not varmap, (varmap, union)
       
   248         rqlkey = union.as_string(kwargs=args)
       
   249         try:
       
   250             results = self._query_cache[rqlkey]
       
   251             print 'cache hit', rqlkey
       
   252         except KeyError:
       
   253             results = self._syntax_tree_search(session, union, args)
       
   254             print 'cache miss', rqlkey
       
   255             self._query_cache[rqlkey] = results
       
   256         return results
       
   257     
       
   258     def _syntax_tree_search(self, session, union, args):
   246         """return result from this source for a rql query (actually from a rql 
   259         """return result from this source for a rql query (actually from a rql 
   247         syntax tree and a solution dictionary mapping each used variable to a 
   260         syntax tree and a solution dictionary mapping each used variable to a 
   248         possible type). If cachekey is given, the query necessary to fetch the
   261         possible type). If cachekey is given, the query necessary to fetch the
   249         results (but not the results themselves) may be cached using this key.
   262         results (but not the results themselves) may be cached using this key.
   250         """
   263         """