# copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.
18 """Source to query another RQL remote repository""" |
|
19 |
|
20 __docformat__ = "restructuredtext en" |
|
21 _ = unicode |
|
22 |
|
23 from os.path import join |
|
24 from base64 import b64decode |
|
25 |
|
26 from logilab.common.configuration import REQUIRED |
|
27 |
|
28 from yams.schema import role_name |
|
29 |
|
30 from rql.nodes import Constant |
|
31 from rql.utils import rqlvar_maker |
|
32 |
|
33 from cubicweb import dbapi, server |
from cubicweb import ValidationError, BadConnectionId, ConnectionError, UnknownEid
from cubicweb.schema import VIRTUAL_RTYPES
from cubicweb.server.sources import (AbstractSource, ConnectionWrapper,
                                     TimedCache, dbg_st_search, dbg_results)
from cubicweb.server.msplanner import neged_relation

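# Small helper: return the `uidtype` attribute of the term selected in the
# given column of the union's subquery (the entity type attached to an eid
# constant), or None for plain values. _syntax_tree_search() uses it to spot
# result columns whose eids must be translated back to local ones.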
def uidtype(union, col, etype, args):
    select, col = union.locate_subquery(col, etype, args)
    return getattr(select.selection[col], 'uidtype', None)


class ReplaceByInOperator(Exception):
    def __init__(self, eids):
        self.eids = eids

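# RemoteSource is abstract: CNX_TYPE must be overridden, and writing entities
# requires implementing add_entity(). A minimal sketch, with purely
# illustrative names and connection type:
#
#     class MyRQLSource(RemoteSource):
#         """source talking to another CubicWeb repository"""
#         CNX_TYPE = 'pyro'  # assumed connection type, adjust to your setup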
class RemoteSource(AbstractSource):
    """Generic external repository source"""

    CNX_TYPE = None # must be overridden by concrete source classes

    # boolean telling if modification hooks should be called when something is
    # modified in this source
    should_call_hooks = False
    # boolean telling if the repository should connect to this source during
    # migration
    connect_for_migration = False

    options = (
        ('cubicweb-user',
         {'type' : 'string',
          'default': REQUIRED,
          'help': 'user to use for connection on the distant repository',
          'group': 'remote-source', 'level': 0,
          }),
        ('cubicweb-password',
         {'type' : 'password',
          'default': '',
          'help': 'password to use for connection on the distant repository',
          'group': 'remote-source', 'level': 0,
          }),
        ('base-url',
         {'type' : 'string',
          'default': '',
          'help': 'url of the web site for the distant repository, if you want '
          'to generate external links to entities from this repository',
          'group': 'remote-source', 'level': 1,
          }),
        ('skip-external-entities',
         {'type' : 'yn',
          'default': False,
          'help': 'should entities not local to the source be considered or not',
          'group': 'remote-source', 'level': 0,
          }),
        ('synchronization-interval',
         {'type' : 'time',
          'default': '5min',
          'help': 'interval between synchronizations with the external '
          'repository (defaults to 5 minutes).',
          'group': 'remote-source', 'level': 2,
          }))
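    # The options above surface as the source's configuration. As a rough,
    # illustrative sketch (the exact storage depends on how the source was
    # declared), the configuration is plain "option=value" lines, e.g.:
    #
    #   cubicweb-user=admin
    #   cubicweb-password=secret
    #   base-url=http://other.instance.example.org/
    #   skip-external-entities=no
    #   synchronization-interval=10min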

    PUBLIC_KEYS = AbstractSource.PUBLIC_KEYS + ('base-url',)

    _conn = None

    def __init__(self, repo, source_config, eid=None):
        super(RemoteSource, self).__init__(repo, source_config, eid)
        self.update_config(None, self.check_conf_dict(eid, source_config,
                                                      fail_if_unknown=False))
        self._query_cache = TimedCache(1800)

    def update_config(self, source_entity, processed_config):
        """update configuration from source entity"""
        baseurl = processed_config.get('base-url')
        if baseurl and not baseurl.endswith('/'):
            processed_config['base-url'] += '/'
        self.config = processed_config
        self._skip_externals = processed_config['skip-external-entities']
        if source_entity is not None:
            self.latest_retrieval = source_entity.latest_retrieval

    def _get_connection(self):
        """open and return a connection to the source"""
        self.info('connecting to source %(base-url)s with user %(cubicweb-user)s',
                  self.config)
        cnxprops = dbapi.ConnectionProperties(cnxtype=self.CNX_TYPE)
        return dbapi.connect(login=self.config['cubicweb-user'],
                             password=self.config['cubicweb-password'],
                             cnxprops=cnxprops)

    def get_connection(self):
        try:
            return self._get_connection()
        except ConnectionError, ex:
            self.critical("can't get connection to source %s: %s", self.uri, ex)
            return ConnectionWrapper()

    def reset_caches(self):
        """method called during test to reset potential source caches"""
        self._query_cache = TimedCache(1800)

    def init(self, activated, source_entity):
        """method called by the repository once ready to handle requests"""
        self.load_mapping(source_entity._cw)
        if activated:
            interval = self.config['synchronization-interval']
            self.repo.looping_task(interval, self.synchronize)
            self.repo.looping_task(self._query_cache.ttl.seconds/10,
                                   self._query_cache.clear_expired)
        self.latest_retrieval = source_entity.latest_retrieval

    def load_mapping(self, session=None):
        self.support_entities = {}
        self.support_relations = {}
        self.dont_cross_relations = set(('owned_by', 'created_by'))
        self.cross_relations = set()
        assert self.eid is not None
        self._schemacfg_idx = {}
        self._load_mapping(session)

    etype_options = set(('write',))
    rtype_options = set(('maycross', 'dontcross', 'write',))

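    # Mapping options come from CWSourceSchemaConfig entities as a
    # colon-separated string, e.g. "write" for an entity type or
    # "maycross:write" for a relation type; _check_options() splits that
    # string and validates it against the sets above.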
    def _check_options(self, schemacfg, allowedoptions):
        if schemacfg.options:
            options = set(w.strip() for w in schemacfg.options.split(':'))
        else:
            options = set()
        if options - allowedoptions:
            options = ', '.join(sorted(options - allowedoptions))
            msg = _('unknown option(s): %s') % options
            raise ValidationError(schemacfg.eid, {role_name('options', 'subject'): msg})
        return options

    def add_schema_config(self, schemacfg, checkonly=False):
        """added CWSourceSchemaConfig, modify mapping accordingly"""
        try:
            ertype = schemacfg.schema.name
        except AttributeError:
            msg = schemacfg._cw._("attribute/relation can't be mapped, only "
                                  "entity and relation types")
            raise ValidationError(schemacfg.eid, {role_name('cw_for_schema', 'subject'): msg})
        if schemacfg.schema.__regid__ == 'CWEType':
            options = self._check_options(schemacfg, self.etype_options)
            if not checkonly:
                self.support_entities[ertype] = 'write' in options
        else: # CWRType
            if ertype in ('is', 'is_instance_of', 'cw_source') or ertype in VIRTUAL_RTYPES:
                msg = schemacfg._cw._('%s relation should not be mapped') % ertype
                raise ValidationError(schemacfg.eid, {role_name('cw_for_schema', 'subject'): msg})
            options = self._check_options(schemacfg, self.rtype_options)
            if 'dontcross' in options:
                if 'maycross' in options:
                    msg = schemacfg._cw._("can't mix dontcross and maycross options")
                    raise ValidationError(schemacfg.eid, {role_name('options', 'subject'): msg})
                if 'write' in options:
                    msg = schemacfg._cw._("can't mix dontcross and write options")
                    raise ValidationError(schemacfg.eid, {role_name('options', 'subject'): msg})
                if not checkonly:
                    self.dont_cross_relations.add(ertype)
            elif not checkonly:
                self.support_relations[ertype] = 'write' in options
                if 'maycross' in options:
                    self.cross_relations.add(ertype)
        if not checkonly:
            # add to an index to ease deletion handling
            self._schemacfg_idx[schemacfg.eid] = ertype

    def del_schema_config(self, schemacfg, checkonly=False):
        """deleted CWSourceSchemaConfig, modify mapping accordingly"""
        if checkonly:
            return
        try:
            ertype = self._schemacfg_idx[schemacfg.eid]
            if ertype[0].isupper():
                del self.support_entities[ertype]
            else:
                if ertype in self.support_relations:
                    del self.support_relations[ertype]
                    if ertype in self.cross_relations:
                        self.cross_relations.remove(ertype)
                else:
                    self.dont_cross_relations.remove(ertype)
        except Exception:
            self.error('while updating mapping following removal of %s',
                       schemacfg)

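    # Roughly, local_eid() returns a couple (eid, local):
    # * (eid, True) when the entity is owned by the distant repository and
    #   hence mapped through this source,
    # * (eid, False) when it actually originates from another source this
    #   repository also knows directly,
    # * (None, None) when the entity is skipped.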
    def local_eid(self, cnx, extid, session):
        etype, dexturi, dextid = cnx.describe(extid)
        if dexturi == 'system' or not (
            dexturi in self.repo.sources_by_uri or self._skip_externals):
            assert etype in self.support_entities, etype
            eid = self.repo.extid2eid(self, str(extid), etype, session)
            if eid > 0:
                return eid, True
        elif dexturi in self.repo.sources_by_uri:
            source = self.repo.sources_by_uri[dexturi]
            cnx = session.cnxset.connection(source.uri)
            eid = source.local_eid(cnx, dextid, session)[0]
            return eid, False
        return None, None

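    # Synchronization asks the distant repository which entities were modified
    # or deleted since the latest retrieval: modified ones are re-indexed
    # locally (full-text), deleted ones have their local information removed,
    # and the new retrieval time is stored on the CWSource entity.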
    def synchronize(self, mtime=None):
        """synchronize content known by this repository with content in the
        external repository
        """
        self.info('synchronizing remote %s source %s', self.CNX_TYPE, self.uri)
        cnx = self.get_connection()
        try:
            extrepo = cnx._repo
        except AttributeError:
            # fake connection wrapper returned when we can't connect to the
            # external source (hence we've no chance to synchronize...)
            return
        etypes = self.support_entities.keys()
        if mtime is None:
            mtime = self.latest_retrieval
        updatetime, modified, deleted = extrepo.entities_modified_since(
            etypes, mtime)
        self._query_cache.clear()
        repo = self.repo
        session = repo.internal_session()
        source = repo.system_source
        try:
            for etype, extid in modified:
                try:
                    eid = self.local_eid(cnx, extid, session)[0]
                    if eid is not None:
                        rset = session.eid_rset(eid, etype)
                        entity = rset.get_entity(0, 0)
                        entity.complete(entity.e_schema.indexable_attributes())
                        source.index_entity(session, entity)
                except Exception:
                    self.exception('while updating %s with external id %s of source %s',
                                   etype, extid, self.uri)
                    continue
            for etype, extid in deleted:
                try:
                    eid = self.repo.extid2eid(self, str(extid), etype, session,
                                              insert=False)
                    # entity has been deleted from external repository but is not known here
                    if eid is not None:
                        entity = session.entity_from_eid(eid, etype)
                        repo.delete_info(session, entity, self.uri,
                                         scleanup=self.eid)
                except Exception:
                    if self.repo.config.mode == 'test':
                        raise
                    self.exception('while updating %s with external id %s of source %s',
                                   etype, extid, self.uri)
                    continue
            self.latest_retrieval = updatetime
            session.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
                            {'x': self.eid, 'date': self.latest_retrieval})
            session.commit()
        finally:
            session.close()

    def check_connection(self, cnx):
        """check connection validity, return None if the connection is still valid
        else a new connection
        """
        if not isinstance(cnx, ConnectionWrapper):
            try:
                cnx.check()
                return # ok
            except (BadConnectionId, ConnectionError):
                pass
        # try to reconnect
        return self.get_connection()

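    # Query results are cached per rql string in self._query_cache; the cache
    # is emptied by synchronize() and by every write operation below, while
    # expired entries are cleared by the looping task registered in init().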
    def syntax_tree_search(self, session, union, args=None, cachekey=None,
                           varmap=None):
        assert dbg_st_search(self.uri, union, varmap, args, cachekey)
        rqlkey = union.as_string(kwargs=args)
        try:
            results = self._query_cache[rqlkey]
        except KeyError:
            results = self._syntax_tree_search(session, union, args)
            self._query_cache[rqlkey] = results
        assert dbg_results(results)
        return results

    def _syntax_tree_search(self, session, union, args):
        """return results from this source for a rql query (actually from a rql
        syntax tree and a solution dictionary mapping each used variable to a
        possible type). Results are cached by syntax_tree_search() above, not
        here.
        """
        if args is not None:
            args = args.copy()
        # get cached cursor anyway
        cu = session.cnxset[self.uri]
        if cu is None:
            # this is a ConnectionWrapper instance
            msg = session._("can't connect to source %s, some data may be missing")
            session.set_shared_data('sources_error', msg % self.uri, txdata=True)
            return []
        translator = RQL2RQL(self)
        try:
            rql = translator.generate(session, union, args)
        except UnknownEid, ex:
            if server.DEBUG:
                print ' unknown eid', ex, 'no results'
            return []
        if server.DEBUG & server.DBG_RQL:
            print ' translated rql', rql
        try:
            rset = cu.execute(rql, args)
        except Exception, ex:
            self.exception(str(ex))
            msg = session._("error while querying source %s, some data may be missing")
            session.set_shared_data('sources_error', msg % self.uri, txdata=True)
            return []
        descr = rset.description
        if rset:
            needtranslation = []
            rows = rset.rows
            for i, etype in enumerate(descr[0]):
                if (etype is None or not self.schema.eschema(etype).final
                    or uidtype(union, i, etype, args)):
                    needtranslation.append(i)
            if needtranslation:
                cnx = session.cnxset.connection(self.uri)
                for rowindex in xrange(rset.rowcount - 1, -1, -1):
                    row = rows[rowindex]
                    localrow = False
                    for colindex in needtranslation:
                        if row[colindex] is not None: # optional variable
                            eid, local = self.local_eid(cnx, row[colindex], session)
                            if local:
                                localrow = True
                            if eid is not None:
                                row[colindex] = eid
                            else:
                                # skip this row
                                del rows[rowindex]
                                del descr[rowindex]
                                break
                    else:
                        # skip row if it only contains eids of entities which
                        # are actually from a source we also know locally,
                        # except if some args specified (XXX should actually
                        # check if there are some args local to the source)
                        if not (translator.has_local_eid or localrow):
                            del rows[rowindex]
                            del descr[rowindex]
            results = rows
        else:
            results = []
        return results

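    # The helper below turns an entity's attribute cache into RQL fragments,
    # e.g. attributes name and title give ['X name %(name)s', 'X title %(title)s']
    # plus a kwargs dict holding the values and the entity's external id under
    # 'x'; update_entity() interpolates these in its SET query.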
    def _entity_relations_and_kwargs(self, session, entity):
        relations = []
        kwargs = {'x': self.repo.eid2extid(self, entity.eid, session)}
        for key, val in entity.cw_attr_cache.iteritems():
            relations.append('X %s %%(%s)s' % (key, key))
            kwargs[key] = val
        return relations, kwargs

    def add_entity(self, session, entity):
        """add a new entity to the source"""
        raise NotImplementedError()

    def update_entity(self, session, entity):
        """update an entity in the source"""
        relations, kwargs = self._entity_relations_and_kwargs(session, entity)
        cu = session.cnxset[self.uri]
        cu.execute('SET %s WHERE X eid %%(x)s' % ','.join(relations), kwargs)
        self._query_cache.clear()
        entity.cw_clear_all_caches()

    def delete_entity(self, session, entity):
        """delete an entity from the source"""
        if session.deleted_in_transaction(self.eid):
            # source is being deleted, don't propagate
            self._query_cache.clear()
            return
        cu = session.cnxset[self.uri]
        cu.execute('DELETE %s X WHERE X eid %%(x)s' % entity.__regid__,
                   {'x': self.repo.eid2extid(self, entity.eid, session)})
        self._query_cache.clear()

    def add_relation(self, session, subject, rtype, object):
        """add a relation to the source"""
        cu = session.cnxset[self.uri]
        cu.execute('SET X %s Y WHERE X eid %%(x)s, Y eid %%(y)s' % rtype,
                   {'x': self.repo.eid2extid(self, subject, session),
                    'y': self.repo.eid2extid(self, object, session)})
        self._query_cache.clear()
        session.entity_from_eid(subject).cw_clear_all_caches()
        session.entity_from_eid(object).cw_clear_all_caches()

    def delete_relation(self, session, subject, rtype, object):
        """delete a relation from the source"""
        if session.deleted_in_transaction(self.eid):
            # source is being deleted, don't propagate
            self._query_cache.clear()
            return
        cu = session.cnxset[self.uri]
        cu.execute('DELETE X %s Y WHERE X eid %%(x)s, Y eid %%(y)s' % rtype,
                   {'x': self.repo.eid2extid(self, subject, session),
                    'y': self.repo.eid2extid(self, object, session)})
        self._query_cache.clear()
        session.entity_from_eid(subject).cw_clear_all_caches()
        session.entity_from_eid(object).cw_clear_all_caches()

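# As a rough illustration of the translation performed by RQL2RQL: a local
# query such as
#
#     Any X WHERE X in_group G, G eid 1234
#
# is regenerated with the local eid replaced by the external repository's
# identifier (through eid2extid), so the distant repository can execute it
# against its own entities. The eid above is purely illustrative.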
class RQL2RQL(object):
    """translate a local rql query to be executed on a distant repository"""
    def __init__(self, source):
        self.source = source
        self.repo = source.repo
        self.current_operator = None

    def _accept_children(self, node):
        res = []
        for child in node.children:
            rql = child.accept(self)
            if rql is not None:
                res.append(rql)
        return res

    def generate(self, session, rqlst, args):
        self._session = session
        self.kwargs = args
        self.need_translation = False
        self.has_local_eid = False
        return self.visit_union(rqlst)

    def visit_union(self, node):
        s = self._accept_children(node)
        if len(s) > 1:
            return ' UNION '.join('(%s)' % q for q in s)
        return s[0]

    def visit_select(self, node):
        """return the tree as an encoded rql string"""
        self._varmaker = rqlvar_maker(defined=node.defined_vars.copy())
        self._const_var = {}
        if node.distinct:
            base = 'DISTINCT Any'
        else:
            base = 'Any'
        s = ['%s %s' % (base, ','.join(v.accept(self) for v in node.selection))]
        if node.groupby:
            s.append('GROUPBY %s' % ', '.join(group.accept(self)
                                              for group in node.groupby))
        if node.orderby:
            s.append('ORDERBY %s' % ', '.join(self.visit_sortterm(term)
                                              for term in node.orderby))
        if node.limit is not None:
            s.append('LIMIT %s' % node.limit)
        if node.offset:
            s.append('OFFSET %s' % node.offset)
        restrictions = []
        if node.where is not None:
            nr = node.where.accept(self)
            if nr is not None:
                restrictions.append(nr)
        if restrictions:
            s.append('WHERE %s' % ','.join(restrictions))

        if node.having:
            s.append('HAVING %s' % ', '.join(term.accept(self)
                                             for term in node.having))
        subqueries = []
        for subquery in node.with_:
            subqueries.append('%s BEING (%s)' % (','.join(ca.name for ca in subquery.aliases),
                                                 self.visit_union(subquery.query)))
        if subqueries:
            s.append('WITH %s' % (','.join(subqueries)))
        return ' '.join(s)

    def visit_and(self, node):
        res = self._accept_children(node)
        if res:
            return ', '.join(res)
        return

    def visit_or(self, node):
        res = self._accept_children(node)
        if len(res) > 1:
            return ' OR '.join('(%s)' % rql for rql in res)
        elif res:
            return res[0]
        return

    def visit_not(self, node):
        rql = node.children[0].accept(self)
        if rql:
            return 'NOT (%s)' % rql
        return

    def visit_exists(self, node):
        rql = node.children[0].accept(self)
        if rql:
            return 'EXISTS(%s)' % rql
        return

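    # visit_relation handles three delicate cases: a constant subject left by
    # rqlst simplification (an eid restriction is reintroduced through
    # process_eid_const), eids with no external counterpart in this source
    # (silently dropped when the relation is negated, propagated otherwise)
    # and non '=' comparisons on eids (rewritten as an IN() of matching
    # external ids via ReplaceByInOperator).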
    def visit_relation(self, node):
        try:
            if isinstance(node.children[0], Constant):
                # simplified rqlst, reintroduce eid relation
                try:
                    restr, lhs = self.process_eid_const(node.children[0])
                except UnknownEid:
                    # can safely skip not relation with an unsupported eid
                    if neged_relation(node):
                        return
                    raise
            else:
                lhs = node.children[0].accept(self)
                restr = None
        except UnknownEid:
            # can safely skip not relation with an unsupported eid
            if neged_relation(node):
                return
            # XXX what about optional relation or outer NOT EXISTS()
            raise
        if node.optional in ('left', 'both'):
            lhs += '?'
        if node.r_type == 'eid' or not self.source.schema.rschema(node.r_type).final:
            self.need_translation = True
            self.current_operator = node.operator()
            if isinstance(node.children[0], Constant):
                self.current_etypes = (node.children[0].uidtype,)
            else:
                self.current_etypes = node.children[0].variable.stinfo['possibletypes']
        try:
            rhs = node.children[1].accept(self)
        except UnknownEid:
            # can safely skip not relation with an unsupported eid
            if neged_relation(node):
                return
            # XXX what about optional relation or outer NOT EXISTS()
            raise
        except ReplaceByInOperator, ex:
            rhs = 'IN (%s)' % ','.join(eid for eid in ex.eids)
        self.need_translation = False
        self.current_operator = None
        if node.optional in ('right', 'both'):
            rhs += '?'
        if restr is not None:
            return '%s %s %s, %s' % (lhs, node.r_type, rhs, restr)
        return '%s %s %s' % (lhs, node.r_type, rhs)

    def visit_comparison(self, node):
        if node.operator in ('=', 'IS'):
            return node.children[0].accept(self)
        return '%s %s' % (node.operator.encode(),
                          node.children[0].accept(self))

    def visit_mathexpression(self, node):
        return '(%s %s %s)' % (node.children[0].accept(self),
                               node.operator.encode(),
                               node.children[1].accept(self))

    def visit_function(self, node):
        #if node.name == 'IN':
        res = []
        for child in node.children:
            try:
                rql = child.accept(self)
            except UnknownEid, ex:
                continue
            res.append(rql)
        if not res:
            raise ex
        return '%s(%s)' % (node.name, ', '.join(res))

    def visit_constant(self, node):
        if self.need_translation or node.uidtype:
            if node.type == 'Int':
                self.has_local_eid = True
                return str(self.eid2extid(node.value))
            if node.type == 'Substitute':
                key = node.value
                # ensure we have not yet translated the value...
                if key not in self._const_var:
                    self.kwargs[key] = self.eid2extid(self.kwargs[key])
                    self._const_var[key] = None
                    self.has_local_eid = True
        return node.as_string()

    def visit_variableref(self, node):
        """return the name used for a variable reference"""
        return node.name

    def visit_sortterm(self, node):
        if node.asc:
            return node.term.accept(self)
        return '%s DESC' % node.term.accept(self)

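    # After rqlst simplification a relation subject may be a bare eid
    # constant; process_eid_const() reintroduces a variable plus a
    # "<VAR> eid <extid>" restriction so that the regenerated query only
    # refers to identifiers the distant repository knows about.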
    def process_eid_const(self, const):
        value = const.eval(self.kwargs)
        try:
            return None, self._const_var[value]
        except Exception:
            var = self._varmaker.next()
            self.need_translation = True
            restr = '%s eid %s' % (var, self.visit_constant(const))
            self.need_translation = False
            self._const_var[value] = var
            return restr, var

    def eid2extid(self, eid):
        try:
            return self.repo.eid2extid(self.source, eid, self._session)
        except UnknownEid:
            operator = self.current_operator
            if operator is not None and operator != '=':
                # deal with a query like "X eid > 12"
                #
                # The problem is that eid order in the external source may
                # differ from the local source
                #
                # So search for all eids from this source matching the condition
                # locally and then replace the "> 12" branch by "IN (eids)"
                #
                # XXX we may have to insert a huge number of eids...
                sql = "SELECT extid FROM entities WHERE source='%s' AND type IN (%s) AND eid%s%s"
                etypes = ','.join("'%s'" % etype for etype in self.current_etypes)
                cu = self._session.system_sql(sql % (self.source.uri, etypes,
                                                     operator, eid))
                # XXX buggy cu.rowcount which may be zero while there are some
                # results
                rows = cu.fetchall()
                if rows:
                    raise ReplaceByInOperator((b64decode(r[0]) for r in rows))
            raise