25 from cubicweb.server.sources import AbstractSource, ConnectionWrapper, TimedCache |
25 from cubicweb.server.sources import AbstractSource, ConnectionWrapper, TimedCache |
26 |
26 |
class ReplaceByInOperator(Exception):
    """Control-flow exception raised during rql translation to ask the
    caller to replace a comparison on an unknown eid by an IN operator
    over the given local eids.
    """
    def __init__(self, eids):
        # eids to be injected in the rewritten IN(...) expression
        self.eids = eids
30 |
30 |
31 class PyroRQLSource(AbstractSource): |
31 class PyroRQLSource(AbstractSource): |
32 """External repository source, using Pyro connection""" |
32 """External repository source, using Pyro connection""" |
33 |
33 |
34 # boolean telling if modification hooks should be called when something is |
34 # boolean telling if modification hooks should be called when something is |
35 # modified in this source |
35 # modified in this source |
36 should_call_hooks = False |
36 should_call_hooks = False |
37 # boolean telling if the repository should connect to this source during |
37 # boolean telling if the repository should connect to this source during |
38 # migration |
38 # migration |
39 connect_for_migration = False |
39 connect_for_migration = False |
40 |
40 |
41 support_entities = None |
41 support_entities = None |
42 |
42 |
43 options = ( |
43 options = ( |
44 # XXX pyro-ns host/port |
44 # XXX pyro-ns host/port |
45 ('pyro-ns-id', |
45 ('pyro-ns-id', |
46 {'type' : 'string', |
46 {'type' : 'string', |
47 'default': REQUIRED, |
47 'default': REQUIRED, |
99 'default': 5*60, |
99 'default': 5*60, |
100 'help': 'interval between synchronization with the external \ |
100 'help': 'interval between synchronization with the external \ |
101 repository (default to 5 minutes).', |
101 repository (default to 5 minutes).', |
102 'group': 'pyro-source', 'inputlevel': 2, |
102 'group': 'pyro-source', 'inputlevel': 2, |
103 }), |
103 }), |
104 |
104 |
105 ) |
105 ) |
106 |
106 |
107 PUBLIC_KEYS = AbstractSource.PUBLIC_KEYS + ('base-url',) |
107 PUBLIC_KEYS = AbstractSource.PUBLIC_KEYS + ('base-url',) |
108 _conn = None |
108 _conn = None |
109 |
109 |
125 self.config = source_config |
125 self.config = source_config |
126 myoptions = (('%s.latest-update-time' % self.uri, |
126 myoptions = (('%s.latest-update-time' % self.uri, |
127 {'type' : 'int', 'sitewide': True, |
127 {'type' : 'int', 'sitewide': True, |
128 'default': 0, |
128 'default': 0, |
129 'help': _('timestamp of the latest source synchronization.'), |
129 'help': _('timestamp of the latest source synchronization.'), |
130 'group': 'sources', |
130 'group': 'sources', |
131 }),) |
131 }),) |
132 register_persistent_options(myoptions) |
132 register_persistent_options(myoptions) |
133 self._query_cache = TimedCache(30) |
133 self._query_cache = TimedCache(30) |
134 |
134 |
135 def last_update_time(self): |
135 def last_update_time(self): |
152 session.close() |
152 session.close() |
153 |
153 |
154 def init(self): |
154 def init(self): |
155 """method called by the repository once ready to handle request""" |
155 """method called by the repository once ready to handle request""" |
156 interval = int(self.config.get('synchronization-interval', 5*60)) |
156 interval = int(self.config.get('synchronization-interval', 5*60)) |
157 self.repo.looping_task(interval, self.synchronize) |
157 self.repo.looping_task(interval, self.synchronize) |
158 self.repo.looping_task(self._query_cache.ttl.seconds/10, self._query_cache.clear_expired) |
158 self.repo.looping_task(self._query_cache.ttl.seconds/10, self._query_cache.clear_expired) |
159 |
159 |
160 def synchronize(self, mtime=None): |
160 def synchronize(self, mtime=None): |
161 """synchronize content known by this repository with content in the |
161 """synchronize content known by this repository with content in the |
162 external repository |
162 external repository |
163 """ |
163 """ |
167 etypes = self.support_entities.keys() |
167 etypes = self.support_entities.keys() |
168 if mtime is None: |
168 if mtime is None: |
169 mtime = self.last_update_time() |
169 mtime = self.last_update_time() |
170 updatetime, modified, deleted = extrepo.entities_modified_since(etypes, |
170 updatetime, modified, deleted = extrepo.entities_modified_since(etypes, |
171 mtime) |
171 mtime) |
|
172 self._query_cache.clear() |
172 repo = self.repo |
173 repo = self.repo |
173 session = repo.internal_session() |
174 session = repo.internal_session() |
174 try: |
175 try: |
175 for etype, extid in modified: |
176 for etype, extid in modified: |
176 try: |
177 try: |
199 {'k': u'sources.%s.latest-update-time' % self.uri, |
200 {'k': u'sources.%s.latest-update-time' % self.uri, |
200 'v': unicode(int(mktime(updatetime.timetuple())))}) |
201 'v': unicode(int(mktime(updatetime.timetuple())))}) |
201 session.commit() |
202 session.commit() |
202 finally: |
203 finally: |
203 session.close() |
204 session.close() |
204 |
205 |
205 def _get_connection(self): |
206 def _get_connection(self): |
206 """open and return a connection to the source""" |
207 """open and return a connection to the source""" |
207 nshost = self.config.get('pyro-ns-host') or self.repo.config['pyro-ns-host'] |
208 nshost = self.config.get('pyro-ns-host') or self.repo.config['pyro-ns-host'] |
208 nsport = self.config.get('pyro-ns-port') or self.repo.config['pyro-ns-port'] |
209 nsport = self.config.get('pyro-ns-port') or self.repo.config['pyro-ns-port'] |
209 nsgroup = self.config.get('pyro-ns-group') or self.repo.config['pyro-ns-group'] |
210 nsgroup = self.config.get('pyro-ns-group') or self.repo.config['pyro-ns-group'] |
219 return self._get_connection() |
220 return self._get_connection() |
220 except (ConnectionError, PyroError): |
221 except (ConnectionError, PyroError): |
221 self.critical("can't get connection to source %s", self.uri, |
222 self.critical("can't get connection to source %s", self.uri, |
222 exc_info=1) |
223 exc_info=1) |
223 return ConnectionWrapper() |
224 return ConnectionWrapper() |
224 |
225 |
225 def check_connection(self, cnx): |
226 def check_connection(self, cnx): |
226 """check connection validity, return None if the connection is still valid |
227 """check connection validity, return None if the connection is still valid |
227 else a new connection |
228 else a new connection |
228 """ |
229 """ |
229 # we have to transfer manually thread ownership. This can be done safely |
230 # we have to transfer manually thread ownership. This can be done safely |
240 return # ok |
241 return # ok |
241 except (BadConnectionId, ConnectionClosedError): |
242 except (BadConnectionId, ConnectionClosedError): |
242 pass |
243 pass |
243 # try to reconnect |
244 # try to reconnect |
244 return self.get_connection() |
245 return self.get_connection() |
245 |
246 |
246 def syntax_tree_search(self, session, union, args=None, cachekey=None, |
247 def syntax_tree_search(self, session, union, args=None, cachekey=None, |
247 varmap=None): |
248 varmap=None): |
248 #assert not varmap, (varmap, union) |
249 #assert not varmap, (varmap, union) |
249 rqlkey = union.as_string(kwargs=args) |
250 rqlkey = union.as_string(kwargs=args) |
250 try: |
251 try: |
251 results = self._query_cache[rqlkey] |
252 results = self._query_cache[rqlkey] |
252 except KeyError: |
253 except KeyError: |
253 results = self._syntax_tree_search(session, union, args) |
254 results = self._syntax_tree_search(session, union, args) |
254 self._query_cache[rqlkey] = results |
255 self._query_cache[rqlkey] = results |
255 return results |
256 return results |
256 |
257 |
257 def _syntax_tree_search(self, session, union, args): |
258 def _syntax_tree_search(self, session, union, args): |
258 """return result from this source for a rql query (actually from a rql |
259 """return result from this source for a rql query (actually from a rql |
259 syntax tree and a solution dictionary mapping each used variable to a |
260 syntax tree and a solution dictionary mapping each used variable to a |
260 possible type). If cachekey is given, the query necessary to fetch the |
261 possible type). If cachekey is given, the query necessary to fetch the |
261 results (but not the results themselves) may be cached using this key. |
262 results (but not the results themselves) may be cached using this key. |
262 """ |
263 """ |
263 if not args is None: |
264 if not args is None: |
264 args = args.copy() |
265 args = args.copy() |
328 kwargs = {'x': self.eid2extid(entity.eid, session)} |
329 kwargs = {'x': self.eid2extid(entity.eid, session)} |
329 for key, val in entity.iteritems(): |
330 for key, val in entity.iteritems(): |
330 relations.append('X %s %%(%s)s' % (key, key)) |
331 relations.append('X %s %%(%s)s' % (key, key)) |
331 kwargs[key] = val |
332 kwargs[key] = val |
332 return relations, kwargs |
333 return relations, kwargs |
333 |
334 |
334 def add_entity(self, session, entity): |
335 def add_entity(self, session, entity): |
335 """add a new entity to the source""" |
336 """add a new entity to the source""" |
336 raise NotImplementedError() |
337 raise NotImplementedError() |
337 |
338 |
338 def update_entity(self, session, entity): |
339 def update_entity(self, session, entity): |
339 """update an entity in the source""" |
340 """update an entity in the source""" |
340 relations, kwargs = self._entity_relations_and_kwargs(session, entity) |
341 relations, kwargs = self._entity_relations_and_kwargs(session, entity) |
341 cu = session.pool[self.uri] |
342 cu = session.pool[self.uri] |
342 cu.execute('SET %s WHERE X eid %%(x)s' % ','.join(relations), |
343 cu.execute('SET %s WHERE X eid %%(x)s' % ','.join(relations), |
352 """add a relation to the source""" |
353 """add a relation to the source""" |
353 cu = session.pool[self.uri] |
354 cu = session.pool[self.uri] |
354 cu.execute('SET X %s Y WHERE X eid %%(x)s, Y eid %%(y)s' % rtype, |
355 cu.execute('SET X %s Y WHERE X eid %%(x)s, Y eid %%(y)s' % rtype, |
355 {'x': self.eid2extid(subject, session), |
356 {'x': self.eid2extid(subject, session), |
356 'y': self.eid2extid(object, session)}, ('x', 'y')) |
357 'y': self.eid2extid(object, session)}, ('x', 'y')) |
357 |
358 |
358 def delete_relation(self, session, subject, rtype, object): |
359 def delete_relation(self, session, subject, rtype, object): |
359 """delete a relation from the source""" |
360 """delete a relation from the source""" |
360 cu = session.pool[self.uri] |
361 cu = session.pool[self.uri] |
361 cu.execute('DELETE X %s Y WHERE X eid %%(x)s, Y eid %%(y)s' % rtype, |
362 cu.execute('DELETE X %s Y WHERE X eid %%(x)s, Y eid %%(y)s' % rtype, |
362 {'x': self.eid2extid(subject, session), |
363 {'x': self.eid2extid(subject, session), |
366 class RQL2RQL(object): |
367 class RQL2RQL(object): |
367 """translate a local rql query to be executed on a distant repository""" |
368 """translate a local rql query to be executed on a distant repository""" |
368 def __init__(self, source): |
369 def __init__(self, source): |
369 self.source = source |
370 self.source = source |
370 self.current_operator = None |
371 self.current_operator = None |
371 |
372 |
372 def _accept_children(self, node): |
373 def _accept_children(self, node): |
373 res = [] |
374 res = [] |
374 for child in node.children: |
375 for child in node.children: |
375 rql = child.accept(self) |
376 rql = child.accept(self) |
376 if rql is not None: |
377 if rql is not None: |
377 res.append(rql) |
378 res.append(rql) |
378 return res |
379 return res |
379 |
380 |
380 def generate(self, session, rqlst, args): |
381 def generate(self, session, rqlst, args): |
381 self._session = session |
382 self._session = session |
382 self.kwargs = args |
383 self.kwargs = args |
383 self.cachekey = [] |
384 self.cachekey = [] |
384 self.need_translation = False |
385 self.need_translation = False |
385 return self.visit_union(rqlst), self.cachekey |
386 return self.visit_union(rqlst), self.cachekey |
386 |
387 |
387 def visit_union(self, node): |
388 def visit_union(self, node): |
388 s = self._accept_children(node) |
389 s = self._accept_children(node) |
389 if len(s) > 1: |
390 if len(s) > 1: |
390 return ' UNION '.join('(%s)' % q for q in s) |
391 return ' UNION '.join('(%s)' % q for q in s) |
391 return s[0] |
392 return s[0] |
392 |
393 |
393 def visit_select(self, node): |
394 def visit_select(self, node): |
394 """return the tree as an encoded rql string""" |
395 """return the tree as an encoded rql string""" |
395 self._varmaker = rqlvar_maker(defined=node.defined_vars.copy()) |
396 self._varmaker = rqlvar_maker(defined=node.defined_vars.copy()) |
396 self._const_var = {} |
397 self._const_var = {} |
397 if node.distinct: |
398 if node.distinct: |
414 nr = node.where.accept(self) |
415 nr = node.where.accept(self) |
415 if nr is not None: |
416 if nr is not None: |
416 restrictions.append(nr) |
417 restrictions.append(nr) |
417 if restrictions: |
418 if restrictions: |
418 s.append('WHERE %s' % ','.join(restrictions)) |
419 s.append('WHERE %s' % ','.join(restrictions)) |
419 |
420 |
420 if node.having: |
421 if node.having: |
421 s.append('HAVING %s' % ', '.join(term.accept(self) |
422 s.append('HAVING %s' % ', '.join(term.accept(self) |
422 for term in node.having)) |
423 for term in node.having)) |
423 subqueries = [] |
424 subqueries = [] |
424 for subquery in node.with_: |
425 for subquery in node.with_: |
425 subqueries.append('%s BEING (%s)' % (','.join(ca.name for ca in subquery.aliases), |
426 subqueries.append('%s BEING (%s)' % (','.join(ca.name for ca in subquery.aliases), |
426 self.visit_union(subquery.query))) |
427 self.visit_union(subquery.query))) |
427 if subqueries: |
428 if subqueries: |
428 s.append('WITH %s' % (','.join(subqueries))) |
429 s.append('WITH %s' % (','.join(subqueries))) |
429 return ' '.join(s) |
430 return ' '.join(s) |
430 |
431 |
431 def visit_and(self, node): |
432 def visit_and(self, node): |
432 res = self._accept_children(node) |
433 res = self._accept_children(node) |
433 if res: |
434 if res: |
434 return ', '.join(res) |
435 return ', '.join(res) |
435 return |
436 return |
436 |
437 |
437 def visit_or(self, node): |
438 def visit_or(self, node): |
438 res = self._accept_children(node) |
439 res = self._accept_children(node) |
439 if len(res) > 1: |
440 if len(res) > 1: |
440 return ' OR '.join('(%s)' % rql for rql in res) |
441 return ' OR '.join('(%s)' % rql for rql in res) |
441 elif res: |
442 elif res: |
442 return res[0] |
443 return res[0] |
443 return |
444 return |
444 |
445 |
445 def visit_not(self, node): |
446 def visit_not(self, node): |
446 rql = node.children[0].accept(self) |
447 rql = node.children[0].accept(self) |
447 if rql: |
448 if rql: |
448 return 'NOT (%s)' % rql |
449 return 'NOT (%s)' % rql |
449 return |
450 return |
450 |
451 |
451 def visit_exists(self, node): |
452 def visit_exists(self, node): |
452 return 'EXISTS(%s)' % node.children[0].accept(self) |
453 return 'EXISTS(%s)' % node.children[0].accept(self) |
453 |
454 |
454 def visit_relation(self, node): |
455 def visit_relation(self, node): |
455 try: |
456 try: |
456 if isinstance(node.children[0], Constant): |
457 if isinstance(node.children[0], Constant): |
457 # simplified rqlst, reintroduce eid relation |
458 # simplified rqlst, reintroduce eid relation |
458 try: |
459 try: |
495 if node.optional in ('right', 'both'): |
496 if node.optional in ('right', 'both'): |
496 rhs += '?' |
497 rhs += '?' |
497 if restr is not None: |
498 if restr is not None: |
498 return '%s %s %s, %s' % (lhs, node.r_type, rhs, restr) |
499 return '%s %s %s, %s' % (lhs, node.r_type, rhs, restr) |
499 return '%s %s %s' % (lhs, node.r_type, rhs) |
500 return '%s %s %s' % (lhs, node.r_type, rhs) |
500 |
501 |
501 def visit_comparison(self, node): |
502 def visit_comparison(self, node): |
502 if node.operator in ('=', 'IS'): |
503 if node.operator in ('=', 'IS'): |
503 return node.children[0].accept(self) |
504 return node.children[0].accept(self) |
504 return '%s %s' % (node.operator.encode(), |
505 return '%s %s' % (node.operator.encode(), |
505 node.children[0].accept(self)) |
506 node.children[0].accept(self)) |
506 |
507 |
507 def visit_mathexpression(self, node): |
508 def visit_mathexpression(self, node): |
508 return '(%s %s %s)' % (node.children[0].accept(self), |
509 return '(%s %s %s)' % (node.children[0].accept(self), |
509 node.operator.encode(), |
510 node.operator.encode(), |
510 node.children[1].accept(self)) |
511 node.children[1].accept(self)) |
511 |
512 |
512 def visit_function(self, node): |
513 def visit_function(self, node): |
513 #if node.name == 'IN': |
514 #if node.name == 'IN': |
514 res = [] |
515 res = [] |
515 for child in node.children: |
516 for child in node.children: |
516 try: |
517 try: |
519 continue |
520 continue |
520 res.append(rql) |
521 res.append(rql) |
521 if not res: |
522 if not res: |
522 raise ex |
523 raise ex |
523 return '%s(%s)' % (node.name, ', '.join(res)) |
524 return '%s(%s)' % (node.name, ', '.join(res)) |
524 |
525 |
525 def visit_constant(self, node): |
526 def visit_constant(self, node): |
526 if self.need_translation or node.uidtype: |
527 if self.need_translation or node.uidtype: |
527 if node.type == 'Int': |
528 if node.type == 'Int': |
528 return str(self.eid2extid(node.value)) |
529 return str(self.eid2extid(node.value)) |
529 if node.type == 'Substitute': |
530 if node.type == 'Substitute': |
556 self._const_var[value] = var |
557 self._const_var[value] = var |
557 return restr, var |
558 return restr, var |
558 |
559 |
559 def eid2extid(self, eid): |
560 def eid2extid(self, eid): |
560 try: |
561 try: |
561 return self.source.eid2extid(eid, self._session) |
562 return self.source.eid2extid(eid, self._session) |
562 except UnknownEid: |
563 except UnknownEid: |
563 operator = self.current_operator |
564 operator = self.current_operator |
564 if operator is not None and operator != '=': |
565 if operator is not None and operator != '=': |
565 # deal with query like X eid > 12 |
566 # deal with query like X eid > 12 |
566 # |
567 # |