|
1 """Source to query another RQL repository using pyro |
|
2 |
|
3 :organization: Logilab |
|
4 :copyright: 2007-2008 LOGILAB S.A. (Paris, FRANCE), all rights reserved. |
|
5 :contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr |
|
6 """ |
|
7 __docformat__ = "restructuredtext en" |
|
8 |
|
9 import threading |
|
10 from os.path import join |
|
11 |
|
12 from mx.DateTime import DateTimeFromTicks |
|
13 |
|
14 from Pyro.errors import PyroError, ConnectionClosedError |
|
15 |
|
16 from logilab.common.configuration import REQUIRED |
|
17 |
|
18 from rql.nodes import Constant |
|
19 from rql.utils import rqlvar_maker |
|
20 |
|
21 from cubicweb import dbapi, server |
|
22 from cubicweb import BadConnectionId, UnknownEid, ConnectionError |
|
23 from cubicweb.cwconfig import register_persistent_options |
|
24 from cubicweb.server.sources import AbstractSource, ConnectionWrapper |
|
25 |
|
class ReplaceByInOperator(Exception):
    """control-flow exception raised by RQL2RQL.eid2extid and caught by
    RQL2RQL.visit_relation: it tells the caller that a non-equality eid
    comparison (e.g. ``X eid > 12``) must be rewritten as an ``IN(...)``
    over the explicitly translated eids.

    Subclassing Exception (instead of a plain old-style class) is required
    for ``raise``/``except`` to work reliably on modern interpreters.
    """
    def __init__(self, eids):
        Exception.__init__(self)
        # iterable of eids matching the comparison on the local source
        self.eids = eids
|
29 |
|
class PyroRQLSource(AbstractSource):
    """External repository source, using Pyro connection"""

    # boolean telling if modification hooks should be called when something is
    # modified in this source
    should_call_hooks = False
    # boolean telling if the repository should connect to this source during
    # migration
    connect_for_migration = False

    # mapping of supported entity types, filled from the mapping file in
    # __init__ (None until then)
    support_entities = None

    options = (
        # XXX pyro-ns host/port
        ('pyro-ns-id',
         {'type' : 'string',
          'default': REQUIRED,
          'help': 'identifier of the repository in the pyro name server',
          'group': 'pyro-source', 'inputlevel': 0,
          }),
        ('mapping-file',
         {'type' : 'string',
          'default': REQUIRED,
          'help': 'path to a python file with the schema mapping definition',
          'group': 'pyro-source', 'inputlevel': 1,
          }),
        ('cubicweb-user',
         {'type' : 'string',
          'default': REQUIRED,
          'help': 'user to use for connection on the distant repository',
          'group': 'pyro-source', 'inputlevel': 0,
          }),
        ('cubicweb-password',
         {'type' : 'password',
          'default': '',
          # fixed copy-paste error: this help text used to read "user to
          # use..." like the option above
          'help': 'password to use for connection on the distant repository',
          'group': 'pyro-source', 'inputlevel': 0,
          }),
        ('base-url',
         {'type' : 'string',
          'default': '',
          'help': 'url of the web site for the distant repository, if you want '
          'to generate external link to entities from this repository',
          'group': 'pyro-source', 'inputlevel': 1,
          }),
        ('pyro-ns-host',
         {'type' : 'string',
          'default': None,
          'help': 'Pyro name server\'s host. If not set, default to the value \
from all_in_one.conf.',
          'group': 'pyro-source', 'inputlevel': 1,
          }),
        ('pyro-ns-port',
         {'type' : 'int',
          'default': None,
          'help': 'Pyro name server\'s listening port. If not set, default to \
the value from all_in_one.conf.',
          'group': 'pyro-source', 'inputlevel': 1,
          }),
        ('pyro-ns-group',
         {'type' : 'string',
          'default': None,
          'help': 'Pyro name server\'s group where the repository will be \
registered. If not set, default to the value from all_in_one.conf.',
          'group': 'pyro-source', 'inputlevel': 1,
          }),
        ('synchronization-interval',
         {'type' : 'int',
          'default': 5*60,
          'help': 'interval between synchronization with the external \
repository (default to 5 minutes).',
          'group': 'pyro-source', 'inputlevel': 2,
          }),

    )

    # expose base-url to clients in addition to the inherited public keys
    PUBLIC_KEYS = AbstractSource.PUBLIC_KEYS + ('base-url',)
    # cached connection, lazily created
    _conn = None
|
108 |
|
109 def __init__(self, repo, appschema, source_config, *args, **kwargs): |
|
110 AbstractSource.__init__(self, repo, appschema, source_config, |
|
111 *args, **kwargs) |
|
112 mappingfile = source_config['mapping-file'] |
|
113 if not mappingfile[0] == '/': |
|
114 mappingfile = join(repo.config.apphome, mappingfile) |
|
115 mapping = {} |
|
116 execfile(mappingfile, mapping) |
|
117 self.support_entities = mapping['support_entities'] |
|
118 self.support_relations = mapping.get('support_relations', {}) |
|
119 self.dont_cross_relations = mapping.get('dont_cross_relations', ()) |
|
120 baseurl = source_config.get('base-url') |
|
121 if baseurl and not baseurl.endswith('/'): |
|
122 source_config['base-url'] += '/' |
|
123 self.config = source_config |
|
124 myoptions = (('%s.latest-update-time' % self.uri, |
|
125 {'type' : 'int', 'sitewide': True, |
|
126 'default': 0, |
|
127 'help': _('timestamp of the latest source synchronization.'), |
|
128 'group': 'sources', |
|
129 }),) |
|
130 register_persistent_options(myoptions) |
|
131 |
|
    def last_update_time(self):
        """return the date of the latest synchronization with the external
        repository, as stored in the persistent EProperty

        The property is created (with value 0, i.e. the epoch) on first
        call.  An internal session is used since this runs outside any user
        transaction.
        """
        pkey = u'sources.%s.latest-update-time' % self.uri
        rql = 'Any V WHERE X is EProperty, X value V, X pkey %(k)s'
        session = self.repo.internal_session()
        try:
            rset = session.execute(rql, {'k': pkey})
            if not rset:
                # insert it
                session.execute('INSERT EProperty X: X pkey %(k)s, X value %(v)s',
                                {'k': pkey, 'v': u'0'})
                session.commit()
                timestamp = 0
            else:
                # the pkey is unique, there can be at most one property
                assert len(rset) == 1
                timestamp = int(rset[0][0])
            return DateTimeFromTicks(timestamp)
        finally:
            # always release the internal session
            session.close()
|
150 |
|
151 def init(self): |
|
152 """method called by the repository once ready to handle request""" |
|
153 interval = int(self.config.get('synchronization-interval', 5*60)) |
|
154 self.repo.looping_task(interval, self.synchronize) |
|
155 |
|
156 def synchronize(self, mtime=None): |
|
157 """synchronize content known by this repository with content in the |
|
158 external repository |
|
159 """ |
|
160 self.info('synchronizing pyro source %s', self.uri) |
|
161 extrepo = self.get_connection()._repo |
|
162 etypes = self.support_entities.keys() |
|
163 if mtime is None: |
|
164 mtime = self.last_update_time() |
|
165 updatetime, modified, deleted = extrepo.entities_modified_since(etypes, |
|
166 mtime) |
|
167 repo = self.repo |
|
168 session = repo.internal_session() |
|
169 try: |
|
170 for etype, extid in modified: |
|
171 try: |
|
172 eid = self.extid2eid(extid, etype, session) |
|
173 rset = session.eid_rset(eid, etype) |
|
174 entity = rset.get_entity(0, 0) |
|
175 entity.complete(entity.e_schema.indexable_attributes()) |
|
176 repo.index_entity(session, entity) |
|
177 except: |
|
178 self.exception('while updating %s with external id %s of source %s', |
|
179 etype, extid, self.uri) |
|
180 continue |
|
181 for etype, extid in deleted: |
|
182 try: |
|
183 eid = self.extid2eid(extid, etype, session, insert=False) |
|
184 # entity has been deleted from external repository but is not known here |
|
185 if eid is not None: |
|
186 repo.delete_info(session, eid) |
|
187 except: |
|
188 self.exception('while updating %s with external id %s of source %s', |
|
189 etype, extid, self.uri) |
|
190 continue |
|
191 session.execute('SET X value %(v)s WHERE X pkey %(k)s', |
|
192 {'k': u'sources.%s.latest-update-time' % self.uri, |
|
193 'v': unicode(int(updatetime.ticks()))}) |
|
194 session.commit() |
|
195 finally: |
|
196 session.close() |
|
197 |
|
198 def _get_connection(self): |
|
199 """open and return a connection to the source""" |
|
200 nshost = self.config.get('pyro-ns-host') or self.repo.config['pyro-ns-host'] |
|
201 nsport = self.config.get('pyro-ns-port') or self.repo.config['pyro-ns-port'] |
|
202 nsgroup = self.config.get('pyro-ns-group') or self.repo.config['pyro-ns-group'] |
|
203 #cnxprops = ConnectionProperties(cnxtype=self.config['cnx-type']) |
|
204 return dbapi.connect(database=self.config['pyro-ns-id'], |
|
205 user=self.config['cubicweb-user'], |
|
206 password=self.config['cubicweb-password'], |
|
207 host=nshost, port=nsport, group=nsgroup, |
|
208 setvreg=False) #cnxprops=cnxprops) |
|
209 |
|
210 def get_connection(self): |
|
211 try: |
|
212 return self._get_connection() |
|
213 except (ConnectionError, PyroError): |
|
214 self.critical("can't get connection to source %s", self.uri, |
|
215 exc_info=1) |
|
216 return ConnectionWrapper() |
|
217 |
|
    def check_connection(self, cnx):
        """check connection validity, return None if the connection is still valid
        else a new connection
        """
        # we have to transfer manually thread ownership. This can be done safely
        # since the pool to which belong the connection is affected to one
        # session/thread and can't be called simultaneously
        try:
            cnx._repo._transferThread(threading.currentThread())
        except AttributeError:
            # inmemory connection
            pass
        # a ConnectionWrapper means the last connection attempt failed:
        # skip the check and go straight to reconnection below
        if not isinstance(cnx, ConnectionWrapper):
            try:
                cnx.check()
                return # ok
            except (BadConnectionId, ConnectionClosedError):
                # stale/closed connection, fall through to reconnect
                pass
        # try to reconnect
        return self.get_connection()
|
238 |
|
239 |
|
    def syntax_tree_search(self, session, union, args=None, cachekey=None,
                           varmap=None):
        """return result from this source for a rql query (actually from a rql
        syntax tree and a solution dictionary mapping each used variable to a
        possible type). If cachekey is given, the query necessary to fetch the
        results (but not the results themselves) may be cached using this key.

        On connection or translation failure, an error message is stored in
        the session's shared data under 'sources_error' and an empty result
        is returned rather than raising.
        """
        # copy args: RQL2RQL.generate mutates the dict to translate eids
        if not args is None:
            args = args.copy()
        if server.DEBUG:
            print 'RQL FOR PYRO SOURCE', self.uri
            print union.as_string()
            if args: print 'ARGS', args
            print 'SOLUTIONS', ','.join(str(s.solutions) for s in union.children)
        # get cached cursor anyway
        cu = session.pool[self.uri]
        if cu is None:
            # this is a ConnectionWrapper instance
            msg = session._("can't connect to source %s, some data may be missing")
            session.set_shared_data('sources_error', msg % self.uri)
            return []
        try:
            # translate the local syntax tree into rql executable remotely
            # (local eids are replaced by the distant repository's eids)
            rql, cachekey = RQL2RQL(self).generate(session, union, args)
        except UnknownEid, ex:
            # an eid restriction references an entity unknown to the distant
            # repository: the query can't match anything there
            if server.DEBUG:
                print 'unknown eid', ex, 'no results'
            return []
        if server.DEBUG:
            print 'TRANSLATED RQL', rql
        try:
            rset = cu.execute(rql, args, cachekey)
        except Exception, ex:
            self.exception(str(ex))
            msg = session._("error while querying source %s, some data may be missing")
            session.set_shared_data('sources_error', msg % self.uri)
            return []
        descr = rset.description
        if rset:
            # columns holding eids (non-final types or uid constants) must be
            # translated back from external ids to local eids
            needtranslation = []
            for i, etype in enumerate(descr[0]):
                if (etype is None or not self.schema.eschema(etype).is_final() or
                    getattr(union.locate_subquery(i, etype, args).selection[i], 'uidtype', None)):
                    needtranslation.append(i)
            if needtranslation:
                for rowindex, row in enumerate(rset):
                    for colindex in needtranslation:
                        if row[colindex] is not None: # optional variable
                            etype = descr[rowindex][colindex]
                            eid = self.extid2eid(row[colindex], etype, session)
                            row[colindex] = eid
            results = rset.rows
        else:
            results = []
        if server.DEBUG:
            if len(results)>10:
                print '--------------->', results[:10], '...', len(results)
            else:
                print '--------------->', results
        return results
|
299 |
|
300 def _entity_relations_and_kwargs(self, session, entity): |
|
301 relations = [] |
|
302 kwargs = {'x': self.eid2extid(entity.eid, session)} |
|
303 for key, val in entity.iteritems(): |
|
304 relations.append('X %s %%(%s)s' % (key, key)) |
|
305 kwargs[key] = val |
|
306 return relations, kwargs |
|
307 |
|
    def add_entity(self, session, entity):
        """add a new entity to the source

        Unconditionally raises NotImplementedError: creating entities on the
        distant repository through this source is not supported.
        """
        raise NotImplementedError()
|
311 |
|
312 def update_entity(self, session, entity): |
|
313 """update an entity in the source""" |
|
314 relations, kwargs = self._entity_relations_and_kwargs(session, entity) |
|
315 cu = session.pool[self.uri] |
|
316 cu.execute('SET %s WHERE X eid %%(x)s' % ','.join(relations), |
|
317 kwargs, 'x') |
|
318 |
|
319 def delete_entity(self, session, etype, eid): |
|
320 """delete an entity from the source""" |
|
321 cu = session.pool[self.uri] |
|
322 cu.execute('DELETE %s X WHERE X eid %%(x)s' % etype, |
|
323 {'x': self.eid2extid(eid, session)}, 'x') |
|
324 |
|
325 def add_relation(self, session, subject, rtype, object): |
|
326 """add a relation to the source""" |
|
327 cu = session.pool[self.uri] |
|
328 cu.execute('SET X %s Y WHERE X eid %%(x)s, Y eid %%(y)s' % rtype, |
|
329 {'x': self.eid2extid(subject, session), |
|
330 'y': self.eid2extid(object, session)}, ('x', 'y')) |
|
331 |
|
332 def delete_relation(self, session, subject, rtype, object): |
|
333 """delete a relation from the source""" |
|
334 cu = session.pool[self.uri] |
|
335 cu.execute('DELETE X %s Y WHERE X eid %%(x)s, Y eid %%(y)s' % rtype, |
|
336 {'x': self.eid2extid(subject, session), |
|
337 'y': self.eid2extid(object, session)}, ('x', 'y')) |
|
338 |
|
339 |
|
class RQL2RQL(object):
    """translate a local rql query to be executed on a distant repository"""
    def __init__(self, source):
        # the PyroRQLSource we translate for: provides eid2extid and the
        # schema used to decide which constants need translation
        self.source = source
|
344 |
|
345 def _accept_children(self, node): |
|
346 res = [] |
|
347 for child in node.children: |
|
348 rql = child.accept(self) |
|
349 if rql is not None: |
|
350 res.append(rql) |
|
351 return res |
|
352 |
|
353 def generate(self, session, rqlst, args): |
|
354 self._session = session |
|
355 self.kwargs = args |
|
356 self.cachekey = [] |
|
357 self.need_translation = False |
|
358 return self.visit_union(rqlst), self.cachekey |
|
359 |
|
360 def visit_union(self, node): |
|
361 s = self._accept_children(node) |
|
362 if len(s) > 1: |
|
363 return ' UNION '.join('(%s)' % q for q in s) |
|
364 return s[0] |
|
365 |
|
    def visit_select(self, node):
        """return the tree as an encoded rql string"""
        # fresh variable maker avoiding names already used in the tree, and a
        # per-select registry of constants already mapped to a variable
        self._varmaker = rqlvar_maker(defined=node.defined_vars.copy())
        self._const_var = {}
        if node.distinct:
            base = 'DISTINCT Any'
        else:
            base = 'Any'
        # selection, then optional clauses in rql's expected order:
        # GROUPBY / ORDERBY / LIMIT / OFFSET / WHERE / HAVING / WITH
        s = ['%s %s' % (base, ','.join(v.accept(self) for v in node.selection))]
        if node.groupby:
            s.append('GROUPBY %s' % ', '.join(group.accept(self)
                                              for group in node.groupby))
        if node.orderby:
            s.append('ORDERBY %s' % ', '.join(self.visit_sortterm(term)
                                              for term in node.orderby))
        if node.limit is not None:
            s.append('LIMIT %s' % node.limit)
        if node.offset:
            s.append('OFFSET %s' % node.offset)
        restrictions = []
        if node.where is not None:
            # the restriction may translate to nothing (e.g. skipped negated
            # relations on unknown eids)
            nr = node.where.accept(self)
            if nr is not None:
                restrictions.append(nr)
        if restrictions:
            s.append('WHERE %s' % ','.join(restrictions))

        if node.having:
            s.append('HAVING %s' % ', '.join(term.accept(self)
                                             for term in node.having))
        # subqueries: "V1,V2 BEING (subquery)" clauses
        subqueries = []
        for subquery in node.with_:
            subqueries.append('%s BEING (%s)' % (','.join(ca.name for ca in subquery.aliases),
                                                 self.visit_union(subquery.query)))
        if subqueries:
            s.append('WITH %s' % (','.join(subqueries)))
        return ' '.join(s)
|
403 |
|
404 def visit_and(self, node): |
|
405 res = self._accept_children(node) |
|
406 if res: |
|
407 return ', '.join(res) |
|
408 return |
|
409 |
|
410 def visit_or(self, node): |
|
411 res = self._accept_children(node) |
|
412 if len(res) > 1: |
|
413 return ' OR '.join('(%s)' % rql for rql in res) |
|
414 elif res: |
|
415 return res[0] |
|
416 return |
|
417 |
|
418 def visit_not(self, node): |
|
419 rql = node.children[0].accept(self) |
|
420 if rql: |
|
421 return 'NOT (%s)' % rql |
|
422 return |
|
423 |
|
424 def visit_exists(self, node): |
|
425 return 'EXISTS(%s)' % node.children[0].accept(self) |
|
426 |
|
    def visit_relation(self, node):
        """translate a relation node, translating eids of entity-type ends

        Raises UnknownEid when a referenced entity does not exist on the
        distant repository, unless the relation is strictly negated (in
        which case it can simply be dropped from the translated query).
        """
        try:
            if isinstance(node.children[0], Constant):
                # simplified rqlst, reintroduce eid relation
                restr, lhs = self.process_eid_const(node.children[0])
            else:
                lhs = node.children[0].accept(self)
                restr = None
        except UnknownEid:
            # can safely skip not relation with an unsupported eid
            if node.neged(strict=True):
                return
            # XXX what about optional relation or outer NOT EXISTS()
            raise
        if node.optional in ('left', 'both'):
            lhs += '?'
        # eid values must be translated for the 'eid' pseudo relation and for
        # any non-final relation (whose rhs is an entity); enable translation
        # while visiting the rhs and record context used by eid2extid
        if node.r_type == 'eid' or not self.source.schema.rschema(node.r_type).is_final():
            self.need_translation = True
            self.current_operator = node.operator()
            if isinstance(node.children[0], Constant):
                self.current_etypes = (node.children[0].uidtype,)
            else:
                self.current_etypes = node.children[0].variable.stinfo['possibletypes']
        try:
            rhs = node.children[1].accept(self)
        except UnknownEid:
            # can safely skip not relation with an unsupported eid
            if node.neged(strict=True):
                return
            # XXX what about optional relation or outer NOT EXISTS()
            raise
        except ReplaceByInOperator, ex:
            # eid comparison rewritten as an explicit IN() of translated eids
            # (see eid2extid)
            rhs = 'IN (%s)' % ','.join(str(eid) for eid in ex.eids)
        self.need_translation = False
        self.current_operator = None
        if node.optional in ('right', 'both'):
            rhs += '?'
        if restr is not None:
            # append the reintroduced eid restriction for the lhs constant
            return '%s %s %s, %s' % (lhs, node.r_type, rhs, restr)
        return '%s %s %s' % (lhs, node.r_type, rhs)
|
467 |
|
468 def visit_comparison(self, node): |
|
469 if node.operator in ('=', 'IS'): |
|
470 return node.children[0].accept(self) |
|
471 return '%s %s' % (node.operator.encode(), |
|
472 node.children[0].accept(self)) |
|
473 |
|
474 def visit_mathexpression(self, node): |
|
475 return '(%s %s %s)' % (node.children[0].accept(self), |
|
476 node.operator.encode(), |
|
477 node.children[1].accept(self)) |
|
478 |
|
    def visit_function(self, node):
        """translate a function node, skipping arguments referencing eids
        unknown to the distant repository (useful for IN(...))

        If *every* argument is unknown, re-raise the last UnknownEid so the
        enclosing relation can decide what to do.
        """
        #if node.name == 'IN':
        res = []
        for child in node.children:
            try:
                rql = child.accept(self)
            except UnknownEid, ex:
                # drop this argument, keep the others
                continue
            res.append(rql)
        if not res:
            # NOTE: assumes at least one child raised (otherwise `ex` is
            # unbound) — a function node with no children would break here
            raise ex
        return '%s(%s)' % (node.name, ', '.join(res))
|
491 |
|
    def visit_constant(self, node):
        """translate a constant node; when eid translation is active, eid
        values (inline ints or %(key)s substitutes) are mapped to the
        distant repository's eids
        """
        if self.need_translation or node.uidtype:
            if node.type == 'Int':
                # inline eid: translate the value directly
                return str(self.eid2extid(node.value))
            if node.type == 'Substitute':
                key = node.value
                # ensure we have not yet translated the value...
                if not key in self._const_var:
                    # translate in place in the arguments dict and remember
                    # the key both as cachekey and as already-translated
                    self.kwargs[key] = self.eid2extid(self.kwargs[key])
                    self.cachekey.append(key)
                    self._const_var[key] = None
        return node.as_string()
|
504 |
|
505 def visit_variableref(self, node): |
|
506 """get the sql name for a variable reference""" |
|
507 return node.name |
|
508 |
|
509 def visit_sortterm(self, node): |
|
510 if node.asc: |
|
511 return node.term.accept(self) |
|
512 return '%s DESC' % node.term.accept(self) |
|
513 |
|
514 def process_eid_const(self, const): |
|
515 value = const.eval(self.kwargs) |
|
516 try: |
|
517 return None, self._const_var[value] |
|
518 except: |
|
519 var = self._varmaker.next() |
|
520 self.need_translation = True |
|
521 restr = '%s eid %s' % (var, self.visit_constant(const)) |
|
522 self.need_translation = False |
|
523 self._const_var[value] = var |
|
524 return restr, var |
|
525 |
|
    def eid2extid(self, eid):
        """translate a local eid into the distant repository's eid

        Raises UnknownEid when the entity is not known to the distant
        repository, or ReplaceByInOperator when a non-equality comparison
        can be rewritten as an IN() over locally-matching eids.
        """
        try:
            return self.source.eid2extid(eid, self._session)
        except UnknownEid:
            operator = self.current_operator
            if operator is not None and operator != '=':
                # deal with query like X eid > 12
                #
                # The problem is
                # that eid order in the external source may differ from the
                # local source
                #
                # So search for all eids from this
                # source matching the condition locally and then to replace the
                # > 12 branch by IN (eids) (XXX we may have to insert a huge
                # number of eids...)
                # planner so that
                # NOTE(review): SQL built by string interpolation; uri/etypes
                # come from configuration/schema and eid is an int, but this
                # would not be safe with attacker-controlled values
                sql = "SELECT extid FROM entities WHERE source='%s' AND type IN (%s) AND eid%s%s"
                etypes = ','.join("'%s'" % etype for etype in self.current_etypes)
                cu = self._session.system_sql(sql % (self.source.uri, etypes,
                                                     operator, eid))
                # XXX buggy cu.rowcount which may be zero while there are some
                # results
                rows = cu.fetchall()
                if rows:
                    raise ReplaceByInOperator((r[0] for r in rows))
            # no rewrite possible: propagate the original UnknownEid
            raise
|
553 |