19 |
19 |
20 __docformat__ = "restructuredtext en" |
20 __docformat__ = "restructuredtext en" |
21 _ = unicode |
21 _ = unicode |
22 |
22 |
23 import threading |
23 import threading |
24 from os.path import join |
|
25 from time import mktime |
|
26 from datetime import datetime |
|
27 from base64 import b64decode |
|
28 |
|
29 from Pyro.errors import PyroError, ConnectionClosedError |
24 from Pyro.errors import PyroError, ConnectionClosedError |
30 |
25 |
31 from logilab.common.configuration import REQUIRED |
26 from logilab.common.configuration import REQUIRED |
32 from logilab.common.optik_ext import check_yn |
|
33 |
27 |
34 from yams.schema import role_name |
28 from cubicweb import dbapi |
|
29 from cubicweb import ConnectionError |
|
30 from cubicweb.server.sources import ConnectionWrapper |
35 |
31 |
36 from rql.nodes import Constant |
32 from cubicweb.server.sources.remoterql import RemoteSource |
37 from rql.utils import rqlvar_maker |
|
38 |
33 |
39 from cubicweb import dbapi, server |
34 class PyroRQLSource(RemoteSource): |
40 from cubicweb import ValidationError, BadConnectionId, UnknownEid, ConnectionError |
|
41 from cubicweb.schema import VIRTUAL_RTYPES |
|
42 from cubicweb.cwconfig import register_persistent_options |
|
43 from cubicweb.server.sources import (AbstractSource, ConnectionWrapper, |
|
44 TimedCache, dbg_st_search, dbg_results) |
|
45 from cubicweb.server.msplanner import neged_relation |
|
46 |
|
def uidtype(union, col, etype, args):
    """Return the `uidtype` marker of the selected column, if any.

    `uidtype` is set on a selected term when it holds an entity identifier;
    its absence (None) means the column holds a plain final value.
    """
    select, col = union.locate_subquery(col, etype, args)
    return getattr(select.selection[col], 'uidtype', None)


class ReplaceByInOperator(Exception):
    """Internal control-flow signal raised while translating a comparison:
    the current operator branch should be replaced by an ``IN (eids)`` test
    over the given external eids.
    """
    def __init__(self, eids):
        self.eids = eids
|
55 |
|
class PyroRQLSource(AbstractSource):
    """External repository source, using Pyro connection"""

    # boolean telling if modification hooks should be called when something is
    # modified in this source
    should_call_hooks = False
    # boolean telling if the repository should connect to this source during
    # migration
    connect_for_migration = False

    options = (
        # XXX pyro-ns host/port
        ('pyro-ns-id',
         {'type' : 'string',
          'default': REQUIRED,
          'help': 'identifier of the repository in the pyro name server',
          'group': 'pyro-source', 'level': 0,
          }),
        ('cubicweb-user',
         {'type' : 'string',
          'default': REQUIRED,
          'help': 'user to use for connection on the distant repository',
          'group': 'pyro-source', 'level': 0,
          }),
        ('cubicweb-password',
         {'type' : 'password',
          'default': '',
          'help': 'user to use for connection on the distant repository',
          'group': 'pyro-source', 'level': 0,
          }),
        ('base-url',
         {'type' : 'string',
          'default': '',
          'help': 'url of the web site for the distant repository, if you want '
          'to generate external link to entities from this repository',
          'group': 'pyro-source', 'level': 1,
          }),
        ('skip-external-entities',
         {'type' : 'yn',
          'default': False,
          'help': 'should entities not local to the source be considered or not',
          'group': 'pyro-source', 'level': 0,
          }),
        ('pyro-ns-host',
         {'type' : 'string',
          'default': None,
          'help': 'Pyro name server\'s host. If not set, default to the value \
from all_in_one.conf. It may contains port information using <host>:<port> notation.',
          'group': 'pyro-source', 'level': 1,
          }),
        ('pyro-ns-group',
         {'type' : 'string',
          'default': None,
          'help': 'Pyro name server\'s group where the repository will be \
registered. If not set, default to the value from all_in_one.conf.',
          'group': 'pyro-source', 'level': 2,
          }),
        ('synchronization-interval',
         {'type' : 'time',
          'default': '5min',
          'help': 'interval between synchronization with the external \
repository (default to 5 minutes).',
          'group': 'pyro-source', 'level': 2,
          }),

        )

    # keys from the configuration exposed to clients
    PUBLIC_KEYS = AbstractSource.PUBLIC_KEYS + ('base-url',)
    # lazily opened connection to the external repository
    _conn = None
|
125 |
|
126 def __init__(self, repo, source_config, eid=None): |
|
127 AbstractSource.__init__(self, repo, source_config, eid) |
|
128 self.update_config(None, self.check_conf_dict(eid, source_config, |
|
129 fail_if_unknown=False)) |
|
130 self._query_cache = TimedCache(1800) |
|
131 |
|
132 def update_config(self, source_entity, processed_config): |
|
133 """update configuration from source entity""" |
|
134 # XXX get it through pyro if unset |
|
135 baseurl = processed_config.get('base-url') |
|
136 if baseurl and not baseurl.endswith('/'): |
|
137 processed_config['base-url'] += '/' |
|
138 self.config = processed_config |
|
139 self._skip_externals = processed_config['skip-external-entities'] |
|
140 if source_entity is not None: |
|
141 self.latest_retrieval = source_entity.latest_retrieval |
|
142 |
|
143 def reset_caches(self): |
|
144 """method called during test to reset potential source caches""" |
|
145 self._query_cache = TimedCache(1800) |
|
146 |
|
147 def init(self, activated, source_entity): |
|
148 """method called by the repository once ready to handle request""" |
|
149 self.load_mapping(source_entity._cw) |
|
150 if activated: |
|
151 interval = self.config['synchronization-interval'] |
|
152 self.repo.looping_task(interval, self.synchronize) |
|
153 self.repo.looping_task(self._query_cache.ttl.seconds/10, |
|
154 self._query_cache.clear_expired) |
|
155 self.latest_retrieval = source_entity.latest_retrieval |
|
156 |
|
157 def load_mapping(self, session=None): |
|
158 self.support_entities = {} |
|
159 self.support_relations = {} |
|
160 self.dont_cross_relations = set(('owned_by', 'created_by')) |
|
161 self.cross_relations = set() |
|
162 assert self.eid is not None |
|
163 self._schemacfg_idx = {} |
|
164 self._load_mapping(session) |
|
165 |
|
166 etype_options = set(('write',)) |
|
167 rtype_options = set(('maycross', 'dontcross', 'write',)) |
|
168 |
|
169 def _check_options(self, schemacfg, allowedoptions): |
|
170 if schemacfg.options: |
|
171 options = set(w.strip() for w in schemacfg.options.split(':')) |
|
172 else: |
|
173 options = set() |
|
174 if options - allowedoptions: |
|
175 options = ', '.join(sorted(options - allowedoptions)) |
|
176 msg = _('unknown option(s): %s' % options) |
|
177 raise ValidationError(schemacfg.eid, {role_name('options', 'subject'): msg}) |
|
178 return options |
|
179 |
|
180 def add_schema_config(self, schemacfg, checkonly=False): |
|
181 """added CWSourceSchemaConfig, modify mapping accordingly""" |
|
182 try: |
|
183 ertype = schemacfg.schema.name |
|
184 except AttributeError: |
|
185 msg = schemacfg._cw._("attribute/relation can't be mapped, only " |
|
186 "entity and relation types") |
|
187 raise ValidationError(schemacfg.eid, {role_name('cw_for_schema', 'subject'): msg}) |
|
188 if schemacfg.schema.__regid__ == 'CWEType': |
|
189 options = self._check_options(schemacfg, self.etype_options) |
|
190 if not checkonly: |
|
191 self.support_entities[ertype] = 'write' in options |
|
192 else: # CWRType |
|
193 if ertype in ('is', 'is_instance_of', 'cw_source') or ertype in VIRTUAL_RTYPES: |
|
194 msg = schemacfg._cw._('%s relation should not be in mapped') % ertype |
|
195 raise ValidationError(schemacfg.eid, {role_name('cw_for_schema', 'subject'): msg}) |
|
196 options = self._check_options(schemacfg, self.rtype_options) |
|
197 if 'dontcross' in options: |
|
198 if 'maycross' in options: |
|
199 msg = schemacfg._("can't mix dontcross and maycross options") |
|
200 raise ValidationError(schemacfg.eid, {role_name('options', 'subject'): msg}) |
|
201 if 'write' in options: |
|
202 msg = schemacfg._("can't mix dontcross and write options") |
|
203 raise ValidationError(schemacfg.eid, {role_name('options', 'subject'): msg}) |
|
204 if not checkonly: |
|
205 self.dont_cross_relations.add(ertype) |
|
206 elif not checkonly: |
|
207 self.support_relations[ertype] = 'write' in options |
|
208 if 'maycross' in options: |
|
209 self.cross_relations.add(ertype) |
|
210 if not checkonly: |
|
211 # add to an index to ease deletion handling |
|
212 self._schemacfg_idx[schemacfg.eid] = ertype |
|
213 |
|
214 def del_schema_config(self, schemacfg, checkonly=False): |
|
215 """deleted CWSourceSchemaConfig, modify mapping accordingly""" |
|
216 if checkonly: |
|
217 return |
|
218 try: |
|
219 ertype = self._schemacfg_idx[schemacfg.eid] |
|
220 if ertype[0].isupper(): |
|
221 del self.support_entities[ertype] |
|
222 else: |
|
223 if ertype in self.support_relations: |
|
224 del self.support_relations[ertype] |
|
225 if ertype in self.cross_relations: |
|
226 self.cross_relations.remove(ertype) |
|
227 else: |
|
228 self.dont_cross_relations.remove(ertype) |
|
229 except Exception: |
|
230 self.error('while updating mapping consequently to removal of %s', |
|
231 schemacfg) |
|
232 |
|
233 def local_eid(self, cnx, extid, session): |
|
234 etype, dexturi, dextid = cnx.describe(extid) |
|
235 if dexturi == 'system' or not ( |
|
236 dexturi in self.repo.sources_by_uri or self._skip_externals): |
|
237 assert etype in self.support_entities, etype |
|
238 eid = self.repo.extid2eid(self, str(extid), etype, session) |
|
239 if eid > 0: |
|
240 return eid, True |
|
241 elif dexturi in self.repo.sources_by_uri: |
|
242 source = self.repo.sources_by_uri[dexturi] |
|
243 cnx = session.cnxset.connection(source.uri) |
|
244 eid = source.local_eid(cnx, dextid, session)[0] |
|
245 return eid, False |
|
246 return None, None |
|
247 |
|
248 def synchronize(self, mtime=None): |
|
249 """synchronize content known by this repository with content in the |
|
250 external repository |
|
251 """ |
|
252 self.info('synchronizing pyro source %s', self.uri) |
|
253 cnx = self.get_connection() |
|
254 try: |
|
255 extrepo = cnx._repo |
|
256 except AttributeError: |
|
257 # fake connection wrapper returned when we can't connect to the |
|
258 # external source (hence we've no chance to synchronize...) |
|
259 return |
|
260 etypes = self.support_entities.keys() |
|
261 if mtime is None: |
|
262 mtime = self.latest_retrieval |
|
263 updatetime, modified, deleted = extrepo.entities_modified_since( |
|
264 etypes, mtime) |
|
265 self._query_cache.clear() |
|
266 repo = self.repo |
|
267 session = repo.internal_session() |
|
268 source = repo.system_source |
|
269 try: |
|
270 for etype, extid in modified: |
|
271 try: |
|
272 eid = self.local_eid(cnx, extid, session)[0] |
|
273 if eid is not None: |
|
274 rset = session.eid_rset(eid, etype) |
|
275 entity = rset.get_entity(0, 0) |
|
276 entity.complete(entity.e_schema.indexable_attributes()) |
|
277 source.index_entity(session, entity) |
|
278 except Exception: |
|
279 self.exception('while updating %s with external id %s of source %s', |
|
280 etype, extid, self.uri) |
|
281 continue |
|
282 for etype, extid in deleted: |
|
283 try: |
|
284 eid = self.repo.extid2eid(self, str(extid), etype, session, |
|
285 insert=False) |
|
286 # entity has been deleted from external repository but is not known here |
|
287 if eid is not None: |
|
288 entity = session.entity_from_eid(eid, etype) |
|
289 repo.delete_info(session, entity, self.uri, |
|
290 scleanup=self.eid) |
|
291 except Exception: |
|
292 if self.repo.config.mode == 'test': |
|
293 raise |
|
294 self.exception('while updating %s with external id %s of source %s', |
|
295 etype, extid, self.uri) |
|
296 continue |
|
297 self.latest_retrieval = updatetime |
|
298 session.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s', |
|
299 {'x': self.eid, 'date': self.latest_retrieval}) |
|
300 session.commit() |
|
301 finally: |
|
302 session.close() |
|
303 |
62 |
304 def _get_connection(self): |
63 def _get_connection(self): |
305 """open and return a connection to the source""" |
64 """open and return a connection to the source""" |
306 nshost = self.config.get('pyro-ns-host') or self.repo.config['pyro-ns-host'] |
65 nshost = self.config.get('pyro-ns-host') or self.repo.config['pyro-ns-host'] |
307 nsgroup = self.config.get('pyro-ns-group') or self.repo.config['pyro-ns-group'] |
66 nsgroup = self.config.get('pyro-ns-group') or self.repo.config['pyro-ns-group'] |
308 self.info('connecting to instance :%s.%s for user %s', |
67 self.info('connecting to instance :%s.%s for user %s', |
309 nsgroup, self.config['pyro-ns-id'], self.config['cubicweb-user']) |
68 nsgroup, self.config['pyro-ns-id'], self.config['cubicweb-user']) |
310 #cnxprops = ConnectionProperties(cnxtype=self.config['cnx-type']) |
|
311 return dbapi.connect(database=self.config['pyro-ns-id'], |
69 return dbapi.connect(database=self.config['pyro-ns-id'], |
312 login=self.config['cubicweb-user'], |
70 login=self.config['cubicweb-user'], |
313 password=self.config['cubicweb-password'], |
71 password=self.config['cubicweb-password'], |
314 host=nshost, group=nsgroup, |
72 host=nshost, group=nsgroup, |
315 setvreg=False) #cnxprops=cnxprops) |
73 setvreg=False) |
316 |
74 |
317 def get_connection(self): |
75 def get_connection(self): |
318 try: |
76 try: |
319 return self._get_connection() |
77 return self._get_connection() |
320 except (ConnectionError, PyroError), ex: |
78 except (ConnectionError, PyroError), ex: |
331 try: |
89 try: |
332 cnx._repo._transferThread(threading.currentThread()) |
90 cnx._repo._transferThread(threading.currentThread()) |
333 except AttributeError: |
91 except AttributeError: |
334 # inmemory connection |
92 # inmemory connection |
335 pass |
93 pass |
336 if not isinstance(cnx, ConnectionWrapper): |
94 return super(PyroRQLSource, self).check_connection(cnx) |
337 try: |
|
338 cnx.check() |
|
339 return # ok |
|
340 except (BadConnectionId, ConnectionClosedError): |
|
341 pass |
|
342 # try to reconnect |
|
343 return self.get_connection() |
|
344 |
95 |
345 def syntax_tree_search(self, session, union, args=None, cachekey=None, |
|
346 varmap=None): |
|
347 assert dbg_st_search(self.uri, union, varmap, args, cachekey) |
|
348 rqlkey = union.as_string(kwargs=args) |
|
349 try: |
|
350 results = self._query_cache[rqlkey] |
|
351 except KeyError: |
|
352 results = self._syntax_tree_search(session, union, args) |
|
353 self._query_cache[rqlkey] = results |
|
354 assert dbg_results(results) |
|
355 return results |
|
356 |
|
357 def _syntax_tree_search(self, session, union, args): |
|
358 """return result from this source for a rql query (actually from a rql |
|
359 syntax tree and a solution dictionary mapping each used variable to a |
|
360 possible type). If cachekey is given, the query necessary to fetch the |
|
361 results (but not the results themselves) may be cached using this key. |
|
362 """ |
|
363 if not args is None: |
|
364 args = args.copy() |
|
365 # get cached cursor anyway |
|
366 cu = session.cnxset[self.uri] |
|
367 if cu is None: |
|
368 # this is a ConnectionWrapper instance |
|
369 msg = session._("can't connect to source %s, some data may be missing") |
|
370 session.set_shared_data('sources_error', msg % self.uri, txdata=True) |
|
371 return [] |
|
372 translator = RQL2RQL(self) |
|
373 try: |
|
374 rql = translator.generate(session, union, args) |
|
375 except UnknownEid, ex: |
|
376 if server.DEBUG: |
|
377 print ' unknown eid', ex, 'no results' |
|
378 return [] |
|
379 if server.DEBUG & server.DBG_RQL: |
|
380 print ' translated rql', rql |
|
381 try: |
|
382 rset = cu.execute(rql, args) |
|
383 except Exception, ex: |
|
384 self.exception(str(ex)) |
|
385 msg = session._("error while querying source %s, some data may be missing") |
|
386 session.set_shared_data('sources_error', msg % self.uri, txdata=True) |
|
387 return [] |
|
388 descr = rset.description |
|
389 if rset: |
|
390 needtranslation = [] |
|
391 rows = rset.rows |
|
392 for i, etype in enumerate(descr[0]): |
|
393 if (etype is None or not self.schema.eschema(etype).final |
|
394 or uidtype(union, i, etype, args)): |
|
395 needtranslation.append(i) |
|
396 if needtranslation: |
|
397 cnx = session.cnxset.connection(self.uri) |
|
398 for rowindex in xrange(rset.rowcount - 1, -1, -1): |
|
399 row = rows[rowindex] |
|
400 localrow = False |
|
401 for colindex in needtranslation: |
|
402 if row[colindex] is not None: # optional variable |
|
403 eid, local = self.local_eid(cnx, row[colindex], session) |
|
404 if local: |
|
405 localrow = True |
|
406 if eid is not None: |
|
407 row[colindex] = eid |
|
408 else: |
|
409 # skip this row |
|
410 del rows[rowindex] |
|
411 del descr[rowindex] |
|
412 break |
|
413 else: |
|
414 # skip row if it only contains eids of entities which |
|
415 # are actually from a source we also know locally, |
|
416 # except if some args specified (XXX should actually |
|
417 # check if there are some args local to the source) |
|
418 if not (translator.has_local_eid or localrow): |
|
419 del rows[rowindex] |
|
420 del descr[rowindex] |
|
421 results = rows |
|
422 else: |
|
423 results = [] |
|
424 return results |
|
425 |
|
426 def _entity_relations_and_kwargs(self, session, entity): |
|
427 relations = [] |
|
428 kwargs = {'x': self.repo.eid2extid(self, entity.eid, session)} |
|
429 for key, val in entity.cw_attr_cache.iteritems(): |
|
430 relations.append('X %s %%(%s)s' % (key, key)) |
|
431 kwargs[key] = val |
|
432 return relations, kwargs |
|
433 |
|
434 def add_entity(self, session, entity): |
|
435 """add a new entity to the source""" |
|
436 raise NotImplementedError() |
|
437 |
|
438 def update_entity(self, session, entity): |
|
439 """update an entity in the source""" |
|
440 relations, kwargs = self._entity_relations_and_kwargs(session, entity) |
|
441 cu = session.cnxset[self.uri] |
|
442 cu.execute('SET %s WHERE X eid %%(x)s' % ','.join(relations), kwargs) |
|
443 self._query_cache.clear() |
|
444 entity.cw_clear_all_caches() |
|
445 |
|
446 def delete_entity(self, session, entity): |
|
447 """delete an entity from the source""" |
|
448 if session.deleted_in_transaction(self.eid): |
|
449 # source is being deleted, don't propagate |
|
450 self._query_cache.clear() |
|
451 return |
|
452 cu = session.cnxset[self.uri] |
|
453 cu.execute('DELETE %s X WHERE X eid %%(x)s' % entity.__regid__, |
|
454 {'x': self.repo.eid2extid(self, entity.eid, session)}) |
|
455 self._query_cache.clear() |
|
456 |
|
457 def add_relation(self, session, subject, rtype, object): |
|
458 """add a relation to the source""" |
|
459 cu = session.cnxset[self.uri] |
|
460 cu.execute('SET X %s Y WHERE X eid %%(x)s, Y eid %%(y)s' % rtype, |
|
461 {'x': self.repo.eid2extid(self, subject, session), |
|
462 'y': self.repo.eid2extid(self, object, session)}) |
|
463 self._query_cache.clear() |
|
464 session.entity_from_eid(subject).cw_clear_all_caches() |
|
465 session.entity_from_eid(object).cw_clear_all_caches() |
|
466 |
|
467 def delete_relation(self, session, subject, rtype, object): |
|
468 """delete a relation from the source""" |
|
469 if session.deleted_in_transaction(self.eid): |
|
470 # source is being deleted, don't propagate |
|
471 self._query_cache.clear() |
|
472 return |
|
473 cu = session.cnxset[self.uri] |
|
474 cu.execute('DELETE X %s Y WHERE X eid %%(x)s, Y eid %%(y)s' % rtype, |
|
475 {'x': self.repo.eid2extid(self, subject, session), |
|
476 'y': self.repo.eid2extid(self, object, session)}) |
|
477 self._query_cache.clear() |
|
478 session.entity_from_eid(subject).cw_clear_all_caches() |
|
479 session.entity_from_eid(object).cw_clear_all_caches() |
|
480 |
|
481 |
|
class RQL2RQL(object):
    """translate a local rql query to be executed on a distant repository"""
    def __init__(self, source):
        self.source = source
        self.repo = source.repo
        # operator of the relation currently being translated, if any
        self.current_operator = None

    def _accept_children(self, node):
        """Visit children of `node`, keeping only non-None translations."""
        res = []
        for child in node.children:
            rql = child.accept(self)
            if rql is not None:
                res.append(rql)
        return res

    def generate(self, session, rqlst, args):
        """Return the translated rql string for syntax tree `rqlst`."""
        self._session = session
        self.kwargs = args
        self.need_translation = False
        self.has_local_eid = False
        return self.visit_union(rqlst)

    def visit_union(self, node):
        s = self._accept_children(node)
        if len(s) > 1:
            return ' UNION '.join('(%s)' % q for q in s)
        return s[0]
|
509 |
|
510 def visit_select(self, node): |
|
511 """return the tree as an encoded rql string""" |
|
512 self._varmaker = rqlvar_maker(defined=node.defined_vars.copy()) |
|
513 self._const_var = {} |
|
514 if node.distinct: |
|
515 base = 'DISTINCT Any' |
|
516 else: |
|
517 base = 'Any' |
|
518 s = ['%s %s' % (base, ','.join(v.accept(self) for v in node.selection))] |
|
519 if node.groupby: |
|
520 s.append('GROUPBY %s' % ', '.join(group.accept(self) |
|
521 for group in node.groupby)) |
|
522 if node.orderby: |
|
523 s.append('ORDERBY %s' % ', '.join(self.visit_sortterm(term) |
|
524 for term in node.orderby)) |
|
525 if node.limit is not None: |
|
526 s.append('LIMIT %s' % node.limit) |
|
527 if node.offset: |
|
528 s.append('OFFSET %s' % node.offset) |
|
529 restrictions = [] |
|
530 if node.where is not None: |
|
531 nr = node.where.accept(self) |
|
532 if nr is not None: |
|
533 restrictions.append(nr) |
|
534 if restrictions: |
|
535 s.append('WHERE %s' % ','.join(restrictions)) |
|
536 |
|
537 if node.having: |
|
538 s.append('HAVING %s' % ', '.join(term.accept(self) |
|
539 for term in node.having)) |
|
540 subqueries = [] |
|
541 for subquery in node.with_: |
|
542 subqueries.append('%s BEING (%s)' % (','.join(ca.name for ca in subquery.aliases), |
|
543 self.visit_union(subquery.query))) |
|
544 if subqueries: |
|
545 s.append('WITH %s' % (','.join(subqueries))) |
|
546 return ' '.join(s) |
|
547 |
|
548 def visit_and(self, node): |
|
549 res = self._accept_children(node) |
|
550 if res: |
|
551 return ', '.join(res) |
|
552 return |
|
553 |
|
554 def visit_or(self, node): |
|
555 res = self._accept_children(node) |
|
556 if len(res) > 1: |
|
557 return ' OR '.join('(%s)' % rql for rql in res) |
|
558 elif res: |
|
559 return res[0] |
|
560 return |
|
561 |
|
562 def visit_not(self, node): |
|
563 rql = node.children[0].accept(self) |
|
564 if rql: |
|
565 return 'NOT (%s)' % rql |
|
566 return |
|
567 |
|
568 def visit_exists(self, node): |
|
569 rql = node.children[0].accept(self) |
|
570 if rql: |
|
571 return 'EXISTS(%s)' % rql |
|
572 return |
|
573 |
|
574 def visit_relation(self, node): |
|
575 try: |
|
576 if isinstance(node.children[0], Constant): |
|
577 # simplified rqlst, reintroduce eid relation |
|
578 try: |
|
579 restr, lhs = self.process_eid_const(node.children[0]) |
|
580 except UnknownEid: |
|
581 # can safely skip not relation with an unsupported eid |
|
582 if neged_relation(node): |
|
583 return |
|
584 raise |
|
585 else: |
|
586 lhs = node.children[0].accept(self) |
|
587 restr = None |
|
588 except UnknownEid: |
|
589 # can safely skip not relation with an unsupported eid |
|
590 if neged_relation(node): |
|
591 return |
|
592 # XXX what about optional relation or outer NOT EXISTS() |
|
593 raise |
|
594 if node.optional in ('left', 'both'): |
|
595 lhs += '?' |
|
596 if node.r_type == 'eid' or not self.source.schema.rschema(node.r_type).final: |
|
597 self.need_translation = True |
|
598 self.current_operator = node.operator() |
|
599 if isinstance(node.children[0], Constant): |
|
600 self.current_etypes = (node.children[0].uidtype,) |
|
601 else: |
|
602 self.current_etypes = node.children[0].variable.stinfo['possibletypes'] |
|
603 try: |
|
604 rhs = node.children[1].accept(self) |
|
605 except UnknownEid: |
|
606 # can safely skip not relation with an unsupported eid |
|
607 if neged_relation(node): |
|
608 return |
|
609 # XXX what about optional relation or outer NOT EXISTS() |
|
610 raise |
|
611 except ReplaceByInOperator, ex: |
|
612 rhs = 'IN (%s)' % ','.join(eid for eid in ex.eids) |
|
613 self.need_translation = False |
|
614 self.current_operator = None |
|
615 if node.optional in ('right', 'both'): |
|
616 rhs += '?' |
|
617 if restr is not None: |
|
618 return '%s %s %s, %s' % (lhs, node.r_type, rhs, restr) |
|
619 return '%s %s %s' % (lhs, node.r_type, rhs) |
|
620 |
|
621 def visit_comparison(self, node): |
|
622 if node.operator in ('=', 'IS'): |
|
623 return node.children[0].accept(self) |
|
624 return '%s %s' % (node.operator.encode(), |
|
625 node.children[0].accept(self)) |
|
626 |
|
627 def visit_mathexpression(self, node): |
|
628 return '(%s %s %s)' % (node.children[0].accept(self), |
|
629 node.operator.encode(), |
|
630 node.children[1].accept(self)) |
|
631 |
|
632 def visit_function(self, node): |
|
633 #if node.name == 'IN': |
|
634 res = [] |
|
635 for child in node.children: |
|
636 try: |
|
637 rql = child.accept(self) |
|
638 except UnknownEid, ex: |
|
639 continue |
|
640 res.append(rql) |
|
641 if not res: |
|
642 raise ex |
|
643 return '%s(%s)' % (node.name, ', '.join(res)) |
|
644 |
|
645 def visit_constant(self, node): |
|
646 if self.need_translation or node.uidtype: |
|
647 if node.type == 'Int': |
|
648 self.has_local_eid = True |
|
649 return str(self.eid2extid(node.value)) |
|
650 if node.type == 'Substitute': |
|
651 key = node.value |
|
652 # ensure we have not yet translated the value... |
|
653 if not key in self._const_var: |
|
654 self.kwargs[key] = self.eid2extid(self.kwargs[key]) |
|
655 self._const_var[key] = None |
|
656 self.has_local_eid = True |
|
657 return node.as_string() |
|
658 |
|
659 def visit_variableref(self, node): |
|
660 """get the sql name for a variable reference""" |
|
661 return node.name |
|
662 |
|
663 def visit_sortterm(self, node): |
|
664 if node.asc: |
|
665 return node.term.accept(self) |
|
666 return '%s DESC' % node.term.accept(self) |
|
667 |
|
668 def process_eid_const(self, const): |
|
669 value = const.eval(self.kwargs) |
|
670 try: |
|
671 return None, self._const_var[value] |
|
672 except Exception: |
|
673 var = self._varmaker.next() |
|
674 self.need_translation = True |
|
675 restr = '%s eid %s' % (var, self.visit_constant(const)) |
|
676 self.need_translation = False |
|
677 self._const_var[value] = var |
|
678 return restr, var |
|
679 |
|
680 def eid2extid(self, eid): |
|
681 try: |
|
682 return self.repo.eid2extid(self.source, eid, self._session) |
|
683 except UnknownEid: |
|
684 operator = self.current_operator |
|
685 if operator is not None and operator != '=': |
|
686 # deal with query like "X eid > 12" |
|
687 # |
|
688 # The problem is that eid order in the external source may |
|
689 # differ from the local source |
|
690 # |
|
691 # So search for all eids from this source matching the condition |
|
692 # locally and then to replace the "> 12" branch by "IN (eids)" |
|
693 # |
|
694 # XXX we may have to insert a huge number of eids...) |
|
695 sql = "SELECT extid FROM entities WHERE source='%s' AND type IN (%s) AND eid%s%s" |
|
696 etypes = ','.join("'%s'" % etype for etype in self.current_etypes) |
|
697 cu = self._session.system_sql(sql % (self.source.uri, etypes, |
|
698 operator, eid)) |
|
699 # XXX buggy cu.rowcount which may be zero while there are some |
|
700 # results |
|
701 rows = cu.fetchall() |
|
702 if rows: |
|
703 raise ReplaceByInOperator((b64decode(r[0]) for r in rows)) |
|
704 raise |
|
705 |
|