server/sources/remoterql.py
changeset 8354 a9984ceebc26
child 8356 e6688dd9fb52
equal deleted inserted replaced
8353:c1cc2f1cd177 8354:a9984ceebc26
       
     1 # copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
       
     2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
       
     3 #
       
     4 # This file is part of CubicWeb.
       
     5 #
       
     6 # CubicWeb is free software: you can redistribute it and/or modify it under the
       
     7 # terms of the GNU Lesser General Public License as published by the Free
       
     8 # Software Foundation, either version 2.1 of the License, or (at your option)
       
     9 # any later version.
       
    10 #
       
    11 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT
       
    12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
       
    13 # FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
       
    14 # details.
       
    15 #
       
    16 # You should have received a copy of the GNU Lesser General Public License along
       
    17 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
       
    18 """Source to query another RQL remote repository"""
       
    19 
       
    20 __docformat__ = "restructuredtext en"
       
    21 _ = unicode
       
    22 
       
    23 from os.path import join
       
    24 from base64 import b64decode
       
    25 
       
    26 from logilab.common.configuration import REQUIRED
       
    27 
       
    28 from yams.schema import role_name
       
    29 
       
    30 from rql.nodes import Constant
       
    31 from rql.utils import rqlvar_maker
       
    32 
       
    33 from cubicweb import dbapi, server
       
    34 from cubicweb import ValidationError, BadConnectionId, UnknownEid
       
    35 from cubicweb.schema import VIRTUAL_RTYPES
       
    36 from cubicweb.server.sources import (AbstractSource, ConnectionWrapper,
       
    37                                      TimedCache, dbg_st_search, dbg_results)
       
    38 from cubicweb.server.msplanner import neged_relation
       
    39 
       
    40 def uidtype(union, col, etype, args):
       
    41     select, col = union.locate_subquery(col, etype, args)
       
    42     return getattr(select.selection[col], 'uidtype', None)
       
    43 
       
    44 
       
    45 class ReplaceByInOperator(Exception):
       
    46     def __init__(self, eids):
       
    47         self.eids = eids
       
    48 
       
    49 class RemoteSource(AbstractSource):
       
    50     """Generic external repository source"""
       
    51 
       
    52     CNX_TYPE = None # Must be ovewritted !
       
    53 
       
    54     # boolean telling if modification hooks should be called when something is
       
    55     # modified in this source
       
    56     should_call_hooks = False
       
    57     # boolean telling if the repository should connect to this source during
       
    58     # migration
       
    59     connect_for_migration = False
       
    60 
       
    61     options = (
       
    62 
       
    63         ('cubicweb-user',
       
    64          {'type' : 'string',
       
    65           'default': REQUIRED,
       
    66           'help': 'user to use for connection on the distant repository',
       
    67           'group': 'remote-source', 'level': 0,
       
    68           }),
       
    69         ('cubicweb-password',
       
    70          {'type' : 'password',
       
    71           'default': '',
       
    72           'help': 'user to use for connection on the distant repository',
       
    73           'group': 'remote-source', 'level': 0,
       
    74           }),
       
    75         ('base-url',
       
    76          {'type' : 'string',
       
    77           'default': '',
       
    78           'help': 'url of the web site for the distant repository, if you want '
       
    79           'to generate external link to entities from this repository',
       
    80           'group': 'remote-source', 'level': 1,
       
    81           }),
       
    82         ('skip-external-entities',
       
    83          {'type' : 'yn',
       
    84           'default': False,
       
    85           'help': 'should entities not local to the source be considered or not',
       
    86           'group': 'remote-source', 'level': 0,
       
    87           }),
       
    88         ('synchronization-interval',
       
    89          {'type' : 'time',
       
    90           'default': '5min',
       
    91           'help': 'interval between synchronization with the external \
       
    92 repository (default to 5 minutes).',
       
    93           'group': 'remote-source', 'level': 2,
       
    94           }))
       
    95 
       
    96     PUBLIC_KEYS = AbstractSource.PUBLIC_KEYS + ('base-url',)
       
    97 
       
    98     _conn = None
       
    99 
       
   100     def __init__(self, repo, source_config, eid=None):
       
   101         super(AbstractSource, self).__init__(repo, source_config, eid)
       
   102         self.update_config(None, self.check_conf_dict(eid, source_config,
       
   103                                                       fail_if_unknown=False))
       
   104         self._query_cache = TimedCache(1800)
       
   105 
       
   106     def update_config(self, source_entity, processed_config):
       
   107         """update configuration from source entity"""
       
   108         baseurl = processed_config.get('base-url')
       
   109         if baseurl and not baseurl.endswith('/'):
       
   110             processed_config['base-url'] += '/'
       
   111         self.config = processed_config
       
   112         self._skip_externals = processed_config['skip-external-entities']
       
   113         if source_entity is not None:
       
   114             self.latest_retrieval = source_entity.latest_retrieval
       
   115 
       
   116     def _get_connection(self):
       
   117         """open and return a connection to the source"""
       
   118         self.info('connecting to source %(base-url)s with user %(cubicweb-user)s',
       
   119                   self.config)
       
   120         cnxprops = ConnectionProperties(cnxtype=self.CNX_TYPE)
       
   121         return dbapi.connect(login=self.config['cubicweb-user'],
       
   122                              password=self.config['cubicweb-password'],
       
   123                              cnxprops=cnxprops)
       
   124 
       
   125     def get_connection(self):
       
   126         try:
       
   127             return self._get_connection()
       
   128         except ConnectionError, ex:
       
   129             self.critical("can't get connection to source %s: %s", self.uri, ex)
       
   130             return ConnectionWrapper()
       
   131 
       
   132 
       
   133     def reset_caches(self):
       
   134         """method called during test to reset potential source caches"""
       
   135         self._query_cache = TimedCache(1800)
       
   136 
       
   137     def init(self, activated, source_entity):
       
   138         """method called by the repository once ready to handle request"""
       
   139         self.load_mapping(source_entity._cw)
       
   140         if activated:
       
   141             interval = self.config['synchronization-interval']
       
   142             self.repo.looping_task(interval, self.synchronize)
       
   143             self.repo.looping_task(self._query_cache.ttl.seconds/10,
       
   144                                    self._query_cache.clear_expired)
       
   145             self.latest_retrieval = source_entity.latest_retrieval
       
   146 
       
   147     def load_mapping(self, session=None):
       
   148         self.support_entities = {}
       
   149         self.support_relations = {}
       
   150         self.dont_cross_relations = set(('owned_by', 'created_by'))
       
   151         self.cross_relations = set()
       
   152         assert self.eid is not None
       
   153         self._schemacfg_idx = {}
       
   154         self._load_mapping(session)
       
   155 
       
   156     etype_options = set(('write',))
       
   157     rtype_options = set(('maycross', 'dontcross', 'write',))
       
   158 
       
   159     def _check_options(self, schemacfg, allowedoptions):
       
   160         if schemacfg.options:
       
   161             options = set(w.strip() for w in schemacfg.options.split(':'))
       
   162         else:
       
   163             options = set()
       
   164         if options - allowedoptions:
       
   165             options = ', '.join(sorted(options - allowedoptions))
       
   166             msg = _('unknown option(s): %s' % options)
       
   167             raise ValidationError(schemacfg.eid, {role_name('options', 'subject'): msg})
       
   168         return options
       
   169 
       
   170     def add_schema_config(self, schemacfg, checkonly=False):
       
   171         """added CWSourceSchemaConfig, modify mapping accordingly"""
       
   172         try:
       
   173             ertype = schemacfg.schema.name
       
   174         except AttributeError:
       
   175             msg = schemacfg._cw._("attribute/relation can't be mapped, only "
       
   176                                   "entity and relation types")
       
   177             raise ValidationError(schemacfg.eid, {role_name('cw_for_schema', 'subject'): msg})
       
   178         if schemacfg.schema.__regid__ == 'CWEType':
       
   179             options = self._check_options(schemacfg, self.etype_options)
       
   180             if not checkonly:
       
   181                 self.support_entities[ertype] = 'write' in options
       
   182         else: # CWRType
       
   183             if ertype in ('is', 'is_instance_of', 'cw_source') or ertype in VIRTUAL_RTYPES:
       
   184                 msg = schemacfg._cw._('%s relation should not be in mapped') % ertype
       
   185                 raise ValidationError(schemacfg.eid, {role_name('cw_for_schema', 'subject'): msg})
       
   186             options = self._check_options(schemacfg, self.rtype_options)
       
   187             if 'dontcross' in options:
       
   188                 if 'maycross' in options:
       
   189                     msg = schemacfg._("can't mix dontcross and maycross options")
       
   190                     raise ValidationError(schemacfg.eid, {role_name('options', 'subject'): msg})
       
   191                 if 'write' in options:
       
   192                     msg = schemacfg._("can't mix dontcross and write options")
       
   193                     raise ValidationError(schemacfg.eid, {role_name('options', 'subject'): msg})
       
   194                 if not checkonly:
       
   195                     self.dont_cross_relations.add(ertype)
       
   196             elif not checkonly:
       
   197                 self.support_relations[ertype] = 'write' in options
       
   198                 if 'maycross' in options:
       
   199                     self.cross_relations.add(ertype)
       
   200         if not checkonly:
       
   201             # add to an index to ease deletion handling
       
   202             self._schemacfg_idx[schemacfg.eid] = ertype
       
   203 
       
   204     def del_schema_config(self, schemacfg, checkonly=False):
       
   205         """deleted CWSourceSchemaConfig, modify mapping accordingly"""
       
   206         if checkonly:
       
   207             return
       
   208         try:
       
   209             ertype = self._schemacfg_idx[schemacfg.eid]
       
   210             if ertype[0].isupper():
       
   211                 del self.support_entities[ertype]
       
   212             else:
       
   213                 if ertype in self.support_relations:
       
   214                     del self.support_relations[ertype]
       
   215                     if ertype in self.cross_relations:
       
   216                         self.cross_relations.remove(ertype)
       
   217                 else:
       
   218                     self.dont_cross_relations.remove(ertype)
       
   219         except Exception:
       
   220             self.error('while updating mapping consequently to removal of %s',
       
   221                        schemacfg)
       
   222 
       
   223     def local_eid(self, cnx, extid, session):
       
   224         etype, dexturi, dextid = cnx.describe(extid)
       
   225         if dexturi == 'system' or not (
       
   226             dexturi in self.repo.sources_by_uri or self._skip_externals):
       
   227             assert etype in self.support_entities, etype
       
   228             eid = self.repo.extid2eid(self, str(extid), etype, session)
       
   229             if eid > 0:
       
   230                 return eid, True
       
   231         elif dexturi in self.repo.sources_by_uri:
       
   232             source = self.repo.sources_by_uri[dexturi]
       
   233             cnx = session.cnxset.connection(source.uri)
       
   234             eid = source.local_eid(cnx, dextid, session)[0]
       
   235             return eid, False
       
   236         return None, None
       
   237 
       
   238     def synchronize(self, mtime=None):
       
   239         """synchronize content known by this repository with content in the
       
   240         external repository
       
   241         """
       
   242         self.info('synchronizing remote %s source %s', (self.CNX_TYPE, self.uri))
       
   243         cnx = self.get_connection()
       
   244         try:
       
   245             extrepo = cnx._repo
       
   246         except AttributeError:
       
   247             # fake connection wrapper returned when we can't connect to the
       
   248             # external source (hence we've no chance to synchronize...)
       
   249             return
       
   250         etypes = self.support_entities.keys()
       
   251         if mtime is None:
       
   252             mtime = self.latest_retrieval
       
   253         updatetime, modified, deleted = extrepo.entities_modified_since(
       
   254             etypes, mtime)
       
   255         self._query_cache.clear()
       
   256         repo = self.repo
       
   257         session = repo.internal_session()
       
   258         source = repo.system_source
       
   259         try:
       
   260             for etype, extid in modified:
       
   261                 try:
       
   262                     eid = self.local_eid(cnx, extid, session)[0]
       
   263                     if eid is not None:
       
   264                         rset = session.eid_rset(eid, etype)
       
   265                         entity = rset.get_entity(0, 0)
       
   266                         entity.complete(entity.e_schema.indexable_attributes())
       
   267                         source.index_entity(session, entity)
       
   268                 except Exception:
       
   269                     self.exception('while updating %s with external id %s of source %s',
       
   270                                    etype, extid, self.uri)
       
   271                     continue
       
   272             for etype, extid in deleted:
       
   273                 try:
       
   274                     eid = self.repo.extid2eid(self, str(extid), etype, session,
       
   275                                               insert=False)
       
   276                     # entity has been deleted from external repository but is not known here
       
   277                     if eid is not None:
       
   278                         entity = session.entity_from_eid(eid, etype)
       
   279                         repo.delete_info(session, entity, self.uri,
       
   280                                          scleanup=self.eid)
       
   281                 except Exception:
       
   282                     if self.repo.config.mode == 'test':
       
   283                         raise
       
   284                     self.exception('while updating %s with external id %s of source %s',
       
   285                                    etype, extid, self.uri)
       
   286                     continue
       
   287             self.latest_retrieval = updatetime
       
   288             session.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
       
   289                             {'x': self.eid, 'date': self.latest_retrieval})
       
   290             session.commit()
       
   291         finally:
       
   292             session.close()
       
   293 
       
    def get_connection(self):
        """Return a connection to the external repository (not implemented here).

        NOTE(review): this is the second `get_connection` of the class body;
        being defined last, it replaces the earlier one, which wraps
        `_get_connection` -- confirm which of the two is intended.
        """
        raise NotImplementedError()
       
   296 
       
   297     def check_connection(self, cnx):
       
   298         """check connection validity, return None if the connection is still valid
       
   299         else a new connection
       
   300         """
       
   301         if not isinstance(cnx, ConnectionWrapper):
       
   302             try:
       
   303                 cnx.check()
       
   304                 return # ok
       
   305             except (BadConnectionId, ConnectionClosedError):
       
   306                 pass
       
   307         # try to reconnect
       
   308         return self.get_connection()
       
   309 
       
   310     def syntax_tree_search(self, session, union, args=None, cachekey=None,
       
   311                            varmap=None):
       
   312         assert dbg_st_search(self.uri, union, varmap, args, cachekey)
       
   313         rqlkey = union.as_string(kwargs=args)
       
   314         try:
       
   315             results = self._query_cache[rqlkey]
       
   316         except KeyError:
       
   317             results = self._syntax_tree_search(session, union, args)
       
   318             self._query_cache[rqlkey] = results
       
   319         assert dbg_results(results)
       
   320         return results
       
   321 
       
    def _syntax_tree_search(self, session, union, args):
        """return result from this source for a rql query (actually from a rql
        syntax tree and a solution dictionary mapping each used variable to a
        possible type).

        The query is translated by RQL2RQL, executed on the cursor registered
        for this source in the session's connections set, then external
        identifiers found in the rows are replaced by local eids.

        An empty list is returned when the source can't be reached, when the
        translation raises UnknownEid or when the execution fails; in the
        first and last cases a message is stored in the session's shared data
        under the 'sources_error' key.
        """
        if not args is None:
            # work on a copy of the arguments
            args = args.copy()
        # get cached cursor anyway
        cu = session.cnxset[self.uri]
        if cu is None:
            # this is a ConnectionWrapper instance
            msg = session._("can't connect to source %s, some data may be missing")
            session.set_shared_data('sources_error', msg % self.uri, txdata=True)
            return []
        translator = RQL2RQL(self)
        try:
            rql = translator.generate(session, union, args)
        except UnknownEid, ex:
            if server.DEBUG:
                print '  unknown eid', ex, 'no results'
            return []
        if server.DEBUG & server.DBG_RQL:
            print '  translated rql', rql
        try:
            rset = cu.execute(rql, args)
        except Exception, ex:
            self.exception(str(ex))
            msg = session._("error while querying source %s, some data may be missing")
            session.set_shared_data('sources_error', msg % self.uri, txdata=True)
            return []
        descr = rset.description
        if rset:
            # indexes of the columns holding values to turn into local eids:
            # untyped columns, non final types and columns with a uidtype
            needtranslation = []
            rows = rset.rows
            for i, etype in enumerate(descr[0]):
                if (etype is None or not self.schema.eschema(etype).final
                    or uidtype(union, i, etype, args)):
                    needtranslation.append(i)
            if needtranslation:
                cnx = session.cnxset.connection(self.uri)
                # go through rows backward so that deleting a row doesn't
                # shift the indexes of rows still to be processed
                for rowindex in xrange(rset.rowcount - 1, -1, -1):
                    row = rows[rowindex]
                    localrow = False
                    for colindex in needtranslation:
                        if row[colindex] is not None: # optional variable
                            eid, local = self.local_eid(cnx, row[colindex], session)
                            if local:
                                localrow = True
                            if eid is not None:
                                row[colindex] = eid
                            else:
                                # skip this row
                                del rows[rowindex]
                                del descr[rowindex]
                                break
                    else:
                        # (reached only when the row has not been dropped above)
                        # skip row if it only contains eids of entities which
                        # are actually from a source we also know locally,
                        # except if some args specified (XXX should actually
                        # check if there are some args local to the source)
                        if not (translator.has_local_eid or localrow):
                            del rows[rowindex]
                            del descr[rowindex]
            results = rows
        else:
            results = []
        return results
       
   390 
       
   391     def _entity_relations_and_kwargs(self, session, entity):
       
   392         relations = []
       
   393         kwargs = {'x': self.repo.eid2extid(self, entity.eid, session)}
       
   394         for key, val in entity.cw_attr_cache.iteritems():
       
   395             relations.append('X %s %%(%s)s' % (key, key))
       
   396             kwargs[key] = val
       
   397         return relations, kwargs
       
   398 
       
   399     def add_entity(self, session, entity):
       
   400         """add a new entity to the source"""
       
   401         raise NotImplementedError()
       
   402 
       
   403     def update_entity(self, session, entity):
       
   404         """update an entity in the source"""
       
   405         relations, kwargs = self._entity_relations_and_kwargs(session, entity)
       
   406         cu = session.cnxset[self.uri]
       
   407         cu.execute('SET %s WHERE X eid %%(x)s' % ','.join(relations), kwargs)
       
   408         self._query_cache.clear()
       
   409         entity.cw_clear_all_caches()
       
   410 
       
   411     def delete_entity(self, session, entity):
       
   412         """delete an entity from the source"""
       
   413         if session.deleted_in_transaction(self.eid):
       
   414             # source is being deleted, don't propagate
       
   415             self._query_cache.clear()
       
   416             return
       
   417         cu = session.cnxset[self.uri]
       
   418         cu.execute('DELETE %s X WHERE X eid %%(x)s' % entity.__regid__,
       
   419                    {'x': self.repo.eid2extid(self, entity.eid, session)})
       
   420         self._query_cache.clear()
       
   421 
       
   422     def add_relation(self, session, subject, rtype, object):
       
   423         """add a relation to the source"""
       
   424         cu = session.cnxset[self.uri]
       
   425         cu.execute('SET X %s Y WHERE X eid %%(x)s, Y eid %%(y)s' % rtype,
       
   426                    {'x': self.repo.eid2extid(self, subject, session),
       
   427                     'y': self.repo.eid2extid(self, object, session)})
       
   428         self._query_cache.clear()
       
   429         session.entity_from_eid(subject).cw_clear_all_caches()
       
   430         session.entity_from_eid(object).cw_clear_all_caches()
       
   431 
       
   432     def delete_relation(self, session, subject, rtype, object):
       
   433         """delete a relation from the source"""
       
   434         if session.deleted_in_transaction(self.eid):
       
   435             # source is being deleted, don't propagate
       
   436             self._query_cache.clear()
       
   437             return
       
   438         cu = session.cnxset[self.uri]
       
   439         cu.execute('DELETE X %s Y WHERE X eid %%(x)s, Y eid %%(y)s' % rtype,
       
   440                    {'x': self.repo.eid2extid(self, subject, session),
       
   441                     'y': self.repo.eid2extid(self, object, session)})
       
   442         self._query_cache.clear()
       
   443         session.entity_from_eid(subject).cw_clear_all_caches()
       
   444         session.entity_from_eid(object).cw_clear_all_caches()
       
   445 
       
   446 
       
   447 class RQL2RQL(object):
       
   448     """translate a local rql query to be executed on a distant repository"""
       
   449     def __init__(self, source):
       
   450         self.source = source
       
   451         self.repo = source.repo
       
   452         self.current_operator = None
       
   453 
       
   454     def _accept_children(self, node):
       
   455         res = []
       
   456         for child in node.children:
       
   457             rql = child.accept(self)
       
   458             if rql is not None:
       
   459                 res.append(rql)
       
   460         return res
       
   461 
       
   462     def generate(self, session, rqlst, args):
       
   463         self._session = session
       
   464         self.kwargs = args
       
   465         self.need_translation = False
       
   466         self.has_local_eid = False
       
   467         return self.visit_union(rqlst)
       
   468 
       
   469     def visit_union(self, node):
       
   470         s = self._accept_children(node)
       
   471         if len(s) > 1:
       
   472             return ' UNION '.join('(%s)' % q for q in s)
       
   473         return s[0]
       
   474 
       
   475     def visit_select(self, node):
       
   476         """return the tree as an encoded rql string"""
       
   477         self._varmaker = rqlvar_maker(defined=node.defined_vars.copy())
       
   478         self._const_var = {}
       
   479         if node.distinct:
       
   480             base = 'DISTINCT Any'
       
   481         else:
       
   482             base = 'Any'
       
   483         s = ['%s %s' % (base, ','.join(v.accept(self) for v in node.selection))]
       
   484         if node.groupby:
       
   485             s.append('GROUPBY %s' % ', '.join(group.accept(self)
       
   486                                               for group in node.groupby))
       
   487         if node.orderby:
       
   488             s.append('ORDERBY %s' % ', '.join(self.visit_sortterm(term)
       
   489                                               for term in node.orderby))
       
   490         if node.limit is not None:
       
   491             s.append('LIMIT %s' % node.limit)
       
   492         if node.offset:
       
   493             s.append('OFFSET %s' % node.offset)
       
   494         restrictions = []
       
   495         if node.where is not None:
       
   496             nr = node.where.accept(self)
       
   497             if nr is not None:
       
   498                 restrictions.append(nr)
       
   499         if restrictions:
       
   500             s.append('WHERE %s' % ','.join(restrictions))
       
   501 
       
   502         if node.having:
       
   503             s.append('HAVING %s' % ', '.join(term.accept(self)
       
   504                                              for term in node.having))
       
   505         subqueries = []
       
   506         for subquery in node.with_:
       
   507             subqueries.append('%s BEING (%s)' % (','.join(ca.name for ca in subquery.aliases),
       
   508                                                  self.visit_union(subquery.query)))
       
   509         if subqueries:
       
   510             s.append('WITH %s' % (','.join(subqueries)))
       
   511         return ' '.join(s)
       
   512 
       
   513     def visit_and(self, node):
       
   514         res = self._accept_children(node)
       
   515         if res:
       
   516             return ', '.join(res)
       
   517         return
       
   518 
       
   519     def visit_or(self, node):
       
   520         res = self._accept_children(node)
       
   521         if len(res) > 1:
       
   522             return ' OR '.join('(%s)' % rql for rql in res)
       
   523         elif res:
       
   524             return res[0]
       
   525         return
       
   526 
       
   527     def visit_not(self, node):
       
   528         rql = node.children[0].accept(self)
       
   529         if rql:
       
   530             return 'NOT (%s)' % rql
       
   531         return
       
   532 
       
   533     def visit_exists(self, node):
       
   534         rql = node.children[0].accept(self)
       
   535         if rql:
       
   536             return 'EXISTS(%s)' % rql
       
   537         return
       
   538 
       
    def visit_relation(self, node):
        """Return the RQL for a relation node.

        None is returned when one side of the relation raises UnknownEid
        and `neged_relation(node)` is true; else UnknownEid is propagated.

        While the right hand side of an 'eid' or non final relation is
        visited, `need_translation`, `current_operator` and `current_etypes`
        are set on the translator; the first two are reset afterwards.
        """
        try:
            if isinstance(node.children[0], Constant):
                # simplified rqlst, reintroduce eid relation
                try:
                    restr, lhs = self.process_eid_const(node.children[0])
                except UnknownEid:
                    # can safely skip not relation with an unsupported eid
                    if neged_relation(node):
                        return
                    raise
            else:
                lhs = node.children[0].accept(self)
                restr = None
        except UnknownEid:
            # can safely skip not relation with an unsupported eid
            if neged_relation(node):
                return
            # XXX what about optional relation or outer NOT EXISTS()
            raise
        if node.optional in ('left', 'both'):
            lhs += '?'
        if node.r_type == 'eid' or not self.source.schema.rschema(node.r_type).final
            self.need_translation = True
            self.current_operator = node.operator()
            # types of the left hand side of the relation
            if isinstance(node.children[0], Constant):
                self.current_etypes = (node.children[0].uidtype,)
            else:
                self.current_etypes = node.children[0].variable.stinfo['possibletypes']
        try:
            rhs = node.children[1].accept(self)
        except UnknownEid:
            # can safely skip not relation with an unsupported eid
            if neged_relation(node):
                return
            # XXX what about optional relation or outer NOT EXISTS()
            raise
        except ReplaceByInOperator, ex:
            # the right hand side is replaced by an IN on the eids carried
            # by the exception
            rhs = 'IN (%s)' % ','.join(eid for eid in ex.eids)
        self.need_translation = False
        self.current_operator = None
        if node.optional in ('right', 'both'):
            rhs += '?'
        if restr is not None:
            # append the restriction reintroduced for the constant
            return '%s %s %s, %s' % (lhs, node.r_type, rhs, restr)
        return '%s %s %s' % (lhs, node.r_type, rhs)
       
   585 
       
   586     def visit_comparison(self, node):
       
   587         if node.operator in ('=', 'IS'):
       
   588             return node.children[0].accept(self)
       
   589         return '%s %s' % (node.operator.encode(),
       
   590                           node.children[0].accept(self))
       
   591 
       
   592     def visit_mathexpression(self, node):
       
   593         return '(%s %s %s)' % (node.children[0].accept(self),
       
   594                                node.operator.encode(),
       
   595                                node.children[1].accept(self))
       
   596 
       
   597     def visit_function(self, node):
       
   598         #if node.name == 'IN':
       
   599         res = []
       
   600         for child in node.children:
       
   601             try:
       
   602                 rql = child.accept(self)
       
   603             except UnknownEid, ex:
       
   604                 continue
       
   605             res.append(rql)
       
   606         if not res:
       
   607             raise ex
       
   608         return '%s(%s)' % (node.name, ', '.join(res))
       
   609 
       
   610     def visit_constant(self, node):
       
   611         if self.need_translation or node.uidtype:
       
   612             if node.type == 'Int':
       
   613                 self.has_local_eid = True
       
   614                 return str(self.eid2extid(node.value))
       
   615             if node.type == 'Substitute':
       
   616                 key = node.value
       
   617                 # ensure we have not yet translated the value...
       
   618                 if not key in self._const_var:
       
   619                     self.kwargs[key] = self.eid2extid(self.kwargs[key])
       
   620                     self._const_var[key] = None
       
   621                     self.has_local_eid = True
       
   622         return node.as_string()
       
   623 
       
   624     def visit_variableref(self, node):
       
   625         """get the sql name for a variable reference"""
       
   626         return node.name
       
   627 
       
   628     def visit_sortterm(self, node):
       
   629         if node.asc:
       
   630             return node.term.accept(self)
       
   631         return '%s DESC' % node.term.accept(self)
       
   632 
       
   633     def process_eid_const(self, const):
       
   634         value = const.eval(self.kwargs)
       
   635         try:
       
   636             return None, self._const_var[value]
       
   637         except Exception:
       
   638             var = self._varmaker.next()
       
   639             self.need_translation = True
       
   640             restr = '%s eid %s' % (var, self.visit_constant(const))
       
   641             self.need_translation = False
       
   642             self._const_var[value] = var
       
   643             return restr, var
       
   644 
       
   645     def eid2extid(self, eid):
       
   646         try:
       
   647             return self.repo.eid2extid(self.source, eid, self._session)
       
   648         except UnknownEid:
       
   649             operator = self.current_operator
       
   650             if operator is not None and operator != '=':
       
   651                 # deal with query like "X eid > 12"
       
   652                 #
       
   653                 # The problem is that eid order in the external source may
       
   654                 # differ from the local source
       
   655                 #
       
   656                 # So search for all eids from this source matching the condition
       
   657                 # locally and then to replace the "> 12" branch by "IN (eids)"
       
   658                 #
       
   659                 # XXX we may have to insert a huge number of eids...)
       
   660                 sql = "SELECT extid FROM entities WHERE source='%s' AND type IN (%s) AND eid%s%s"
       
   661                 etypes = ','.join("'%s'" % etype for etype in self.current_etypes)
       
   662                 cu = self._session.system_sql(sql % (self.source.uri, etypes,
       
   663                                                       operator, eid))
       
   664                 # XXX buggy cu.rowcount which may be zero while there are some
       
   665                 # results
       
   666                 rows = cu.fetchall()
       
   667                 if rows:
       
   668                     raise ReplaceByInOperator((b64decode(r[0]) for r in rows))
       
   669             raise
       
   670