20 from rql.utils import rqlvar_maker |
20 from rql.utils import rqlvar_maker |
21 |
21 |
22 from cubicweb import dbapi, server |
22 from cubicweb import dbapi, server |
23 from cubicweb import BadConnectionId, UnknownEid, ConnectionError |
23 from cubicweb import BadConnectionId, UnknownEid, ConnectionError |
24 from cubicweb.cwconfig import register_persistent_options |
24 from cubicweb.cwconfig import register_persistent_options |
25 from cubicweb.server.sources import AbstractSource, ConnectionWrapper |
25 from cubicweb.server.sources import AbstractSource, ConnectionWrapper, TimedCache |
26 |
26 |
27 class ReplaceByInOperator(Exception): |
27 class ReplaceByInOperator(Exception): |
28 def __init__(self, eids): |
28 def __init__(self, eids): |
29 self.eids = eids |
29 self.eids = eids |
30 |
30 |
128 'default': 0, |
128 'default': 0, |
129 'help': _('timestamp of the latest source synchronization.'), |
129 'help': _('timestamp of the latest source synchronization.'), |
130 'group': 'sources', |
130 'group': 'sources', |
131 }),) |
131 }),) |
132 register_persistent_options(myoptions) |
132 register_persistent_options(myoptions) |
|
133 self._query_cache = TimedCache(30) |
133 |
134 |
134 def last_update_time(self): |
135 def last_update_time(self): |
135 pkey = u'sources.%s.latest-update-time' % self.uri |
136 pkey = u'sources.%s.latest-update-time' % self.uri |
136 rql = 'Any V WHERE X is EProperty, X value V, X pkey %(k)s' |
137 rql = 'Any V WHERE X is EProperty, X value V, X pkey %(k)s' |
137 session = self.repo.internal_session() |
138 session = self.repo.internal_session() |
152 |
153 |
153 def init(self): |
154 def init(self): |
154 """method called by the repository once ready to handle request""" |
155 """method called by the repository once ready to handle request""" |
155 interval = int(self.config.get('synchronization-interval', 5*60)) |
156 interval = int(self.config.get('synchronization-interval', 5*60)) |
156 self.repo.looping_task(interval, self.synchronize) |
157 self.repo.looping_task(interval, self.synchronize) |
|
158 self.repo.looping_task(self._query_cache.ttl.seconds/10, self._query_cache.clear_expired) |
157 |
159 |
158 def synchronize(self, mtime=None): |
160 def synchronize(self, mtime=None): |
159 """synchronize content known by this repository with content in the |
161 """synchronize content known by this repository with content in the |
160 external repository |
162 external repository |
161 """ |
163 """ |
239 except (BadConnectionId, ConnectionClosedError): |
241 except (BadConnectionId, ConnectionClosedError): |
240 pass |
242 pass |
241 # try to reconnect |
243 # try to reconnect |
242 return self.get_connection() |
244 return self.get_connection() |
243 |
245 |
244 |
|
245 def syntax_tree_search(self, session, union, args=None, cachekey=None, |
246 def syntax_tree_search(self, session, union, args=None, cachekey=None, |
246 varmap=None): |
247 varmap=None): |
|
248 #assert not varmap, (varmap, union) |
|
249 rqlkey = union.as_string(kwargs=args) |
|
250 try: |
|
251 results = self._query_cache[rqlkey] |
|
252 except KeyError: |
|
253 results = self._syntax_tree_search(session, union, args) |
|
254 self._query_cache[rqlkey] = results |
|
255 return results |
|
256 |
|
257 def _syntax_tree_search(self, session, union, args): |
247 """return result from this source for a rql query (actually from a rql |
258 """return result from this source for a rql query (actually from a rql |
248 syntax tree and a solution dictionary mapping each used variable to a |
259 syntax tree and a solution dictionary mapping each used variable to a |
249 possible type). If cachekey is given, the query necessary to fetch the |
260 possible type). If cachekey is given, the query necessary to fetch the |
250 results (but not the results themselves) may be cached using this key. |
261 results (but not the results themselves) may be cached using this key. |
251 """ |
262 """ |