server/sources/pyrorql.py
changeset 8354 a9984ceebc26
parent 8244 c7d89541e3c5
child 8545 eb7a171cec72
equal deleted inserted replaced
8353:c1cc2f1cd177 8354:a9984ceebc26
    19 
    19 
    20 __docformat__ = "restructuredtext en"
    20 __docformat__ = "restructuredtext en"
    21 _ = unicode
    21 _ = unicode
    22 
    22 
    23 import threading
    23 import threading
    24 from os.path import join
       
    25 from time import mktime
       
    26 from datetime import datetime
       
    27 from base64 import b64decode
       
    28 
       
    29 from Pyro.errors import PyroError, ConnectionClosedError
    24 from Pyro.errors import PyroError, ConnectionClosedError
    30 
    25 
    31 from logilab.common.configuration import REQUIRED
    26 from logilab.common.configuration import REQUIRED
    32 from logilab.common.optik_ext import check_yn
       
    33 
    27 
    34 from yams.schema import role_name
    28 from cubicweb import dbapi
       
    29 from cubicweb import ConnectionError
       
    30 from cubicweb.server.sources import ConnectionWrapper
    35 
    31 
    36 from rql.nodes import Constant
    32 from cubicweb.server.sources.remoterql import RemoteSource
    37 from rql.utils import rqlvar_maker
       
    38 
    33 
    39 from cubicweb import dbapi, server
    34 class PyroRQLSource(RemoteSource):
    40 from cubicweb import ValidationError, BadConnectionId, UnknownEid, ConnectionError
       
    41 from cubicweb.schema import VIRTUAL_RTYPES
       
    42 from cubicweb.cwconfig import register_persistent_options
       
    43 from cubicweb.server.sources import (AbstractSource, ConnectionWrapper,
       
    44                                      TimedCache, dbg_st_search, dbg_results)
       
    45 from cubicweb.server.msplanner import neged_relation
       
    46 
       
    47 def uidtype(union, col, etype, args):
       
    48     select, col = union.locate_subquery(col, etype, args)
       
    49     return getattr(select.selection[col], 'uidtype', None)
       
    50 
       
    51 
       
    52 class ReplaceByInOperator(Exception):
       
    53     def __init__(self, eids):
       
    54         self.eids = eids
       
    55 
       
    56 class PyroRQLSource(AbstractSource):
       
    57     """External repository source, using Pyro connection"""
    35     """External repository source, using Pyro connection"""
    58 
    36 
    59     # boolean telling if modification hooks should be called when something is
    37     CNX_TYPE = 'pyro'
    60     # modified in this source
       
    61     should_call_hooks = False
       
    62     # boolean telling if the repository should connect to this source during
       
    63     # migration
       
    64     connect_for_migration = False
       
    65 
    38 
    66     options = (
    39     options = RemoteSource.options + (
    67         # XXX pyro-ns host/port
    40         # XXX pyro-ns host/port
    68         ('pyro-ns-id',
    41         ('pyro-ns-id',
    69          {'type' : 'string',
    42          {'type' : 'string',
    70           'default': REQUIRED,
    43           'default': REQUIRED,
    71           'help': 'identifier of the repository in the pyro name server',
    44           'help': 'identifier of the repository in the pyro name server',
    72           'group': 'pyro-source', 'level': 0,
    45           'group': 'remote-source', 'level': 0,
    73           }),
       
    74         ('cubicweb-user',
       
    75          {'type' : 'string',
       
    76           'default': REQUIRED,
       
    77           'help': 'user to use for connection on the distant repository',
       
    78           'group': 'pyro-source', 'level': 0,
       
    79           }),
       
    80         ('cubicweb-password',
       
    81          {'type' : 'password',
       
    82           'default': '',
       
    83           'help': 'user to use for connection on the distant repository',
       
    84           'group': 'pyro-source', 'level': 0,
       
    85           }),
       
    86         ('base-url',
       
    87          {'type' : 'string',
       
    88           'default': '',
       
    89           'help': 'url of the web site for the distant repository, if you want '
       
    90           'to generate external link to entities from this repository',
       
    91           'group': 'pyro-source', 'level': 1,
       
    92           }),
       
    93         ('skip-external-entities',
       
    94          {'type' : 'yn',
       
    95           'default': False,
       
    96           'help': 'should entities not local to the source be considered or not',
       
    97           'group': 'pyro-source', 'level': 0,
       
    98           }),
    46           }),
    99         ('pyro-ns-host',
    47         ('pyro-ns-host',
   100          {'type' : 'string',
    48          {'type' : 'string',
   101           'default': None,
    49           'default': None,
   102           'help': 'Pyro name server\'s host. If not set, default to the value \
    50           'help': 'Pyro name server\'s host. If not set, default to the value \
   103 from all_in_one.conf. It may contains port information using <host>:<port> notation.',
    51 from all_in_one.conf. It may contains port information using <host>:<port> notation.',
   104           'group': 'pyro-source', 'level': 1,
    52           'group': 'remote-source', 'level': 1,
   105           }),
    53           }),
   106         ('pyro-ns-group',
    54         ('pyro-ns-group',
   107          {'type' : 'string',
    55          {'type' : 'string',
   108           'default': None,
    56           'default': None,
   109           'help': 'Pyro name server\'s group where the repository will be \
    57           'help': 'Pyro name server\'s group where the repository will be \
   110 registered. If not set, default to the value from all_in_one.conf.',
    58 registered. If not set, default to the value from all_in_one.conf.',
   111           'group': 'pyro-source', 'level': 2,
    59           'group': 'remote-source', 'level': 2,
   112           }),
    60           }),
   113         ('synchronization-interval',
       
   114          {'type' : 'time',
       
   115           'default': '5min',
       
   116           'help': 'interval between synchronization with the external \
       
   117 repository (default to 5 minutes).',
       
   118           'group': 'pyro-source', 'level': 2,
       
   119           }),
       
   120 
       
   121     )
    61     )
   122 
       
   123     PUBLIC_KEYS = AbstractSource.PUBLIC_KEYS + ('base-url',)
       
   124     _conn = None
       
   125 
       
   126     def __init__(self, repo, source_config, eid=None):
       
   127         AbstractSource.__init__(self, repo, source_config, eid)
       
   128         self.update_config(None, self.check_conf_dict(eid, source_config,
       
   129                                                       fail_if_unknown=False))
       
   130         self._query_cache = TimedCache(1800)
       
   131 
       
   132     def update_config(self, source_entity, processed_config):
       
   133         """update configuration from source entity"""
       
   134         # XXX get it through pyro if unset
       
   135         baseurl = processed_config.get('base-url')
       
   136         if baseurl and not baseurl.endswith('/'):
       
   137             processed_config['base-url'] += '/'
       
   138         self.config = processed_config
       
   139         self._skip_externals = processed_config['skip-external-entities']
       
   140         if source_entity is not None:
       
   141             self.latest_retrieval = source_entity.latest_retrieval
       
   142 
       
   143     def reset_caches(self):
       
   144         """method called during test to reset potential source caches"""
       
   145         self._query_cache = TimedCache(1800)
       
   146 
       
   147     def init(self, activated, source_entity):
       
   148         """method called by the repository once ready to handle request"""
       
   149         self.load_mapping(source_entity._cw)
       
   150         if activated:
       
   151             interval = self.config['synchronization-interval']
       
   152             self.repo.looping_task(interval, self.synchronize)
       
   153             self.repo.looping_task(self._query_cache.ttl.seconds/10,
       
   154                                    self._query_cache.clear_expired)
       
   155             self.latest_retrieval = source_entity.latest_retrieval
       
   156 
       
   157     def load_mapping(self, session=None):
       
   158         self.support_entities = {}
       
   159         self.support_relations = {}
       
   160         self.dont_cross_relations = set(('owned_by', 'created_by'))
       
   161         self.cross_relations = set()
       
   162         assert self.eid is not None
       
   163         self._schemacfg_idx = {}
       
   164         self._load_mapping(session)
       
   165 
       
   166     etype_options = set(('write',))
       
   167     rtype_options = set(('maycross', 'dontcross', 'write',))
       
   168 
       
   169     def _check_options(self, schemacfg, allowedoptions):
       
   170         if schemacfg.options:
       
   171             options = set(w.strip() for w in schemacfg.options.split(':'))
       
   172         else:
       
   173             options = set()
       
   174         if options - allowedoptions:
       
   175             options = ', '.join(sorted(options - allowedoptions))
       
   176             msg = _('unknown option(s): %s' % options)
       
   177             raise ValidationError(schemacfg.eid, {role_name('options', 'subject'): msg})
       
   178         return options
       
   179 
       
   180     def add_schema_config(self, schemacfg, checkonly=False):
       
   181         """added CWSourceSchemaConfig, modify mapping accordingly"""
       
   182         try:
       
   183             ertype = schemacfg.schema.name
       
   184         except AttributeError:
       
   185             msg = schemacfg._cw._("attribute/relation can't be mapped, only "
       
   186                                   "entity and relation types")
       
   187             raise ValidationError(schemacfg.eid, {role_name('cw_for_schema', 'subject'): msg})
       
   188         if schemacfg.schema.__regid__ == 'CWEType':
       
   189             options = self._check_options(schemacfg, self.etype_options)
       
   190             if not checkonly:
       
   191                 self.support_entities[ertype] = 'write' in options
       
   192         else: # CWRType
       
   193             if ertype in ('is', 'is_instance_of', 'cw_source') or ertype in VIRTUAL_RTYPES:
       
   194                 msg = schemacfg._cw._('%s relation should not be in mapped') % ertype
       
   195                 raise ValidationError(schemacfg.eid, {role_name('cw_for_schema', 'subject'): msg})
       
   196             options = self._check_options(schemacfg, self.rtype_options)
       
   197             if 'dontcross' in options:
       
   198                 if 'maycross' in options:
       
   199                     msg = schemacfg._("can't mix dontcross and maycross options")
       
   200                     raise ValidationError(schemacfg.eid, {role_name('options', 'subject'): msg})
       
   201                 if 'write' in options:
       
   202                     msg = schemacfg._("can't mix dontcross and write options")
       
   203                     raise ValidationError(schemacfg.eid, {role_name('options', 'subject'): msg})
       
   204                 if not checkonly:
       
   205                     self.dont_cross_relations.add(ertype)
       
   206             elif not checkonly:
       
   207                 self.support_relations[ertype] = 'write' in options
       
   208                 if 'maycross' in options:
       
   209                     self.cross_relations.add(ertype)
       
   210         if not checkonly:
       
   211             # add to an index to ease deletion handling
       
   212             self._schemacfg_idx[schemacfg.eid] = ertype
       
   213 
       
   214     def del_schema_config(self, schemacfg, checkonly=False):
       
   215         """deleted CWSourceSchemaConfig, modify mapping accordingly"""
       
   216         if checkonly:
       
   217             return
       
   218         try:
       
   219             ertype = self._schemacfg_idx[schemacfg.eid]
       
   220             if ertype[0].isupper():
       
   221                 del self.support_entities[ertype]
       
   222             else:
       
   223                 if ertype in self.support_relations:
       
   224                     del self.support_relations[ertype]
       
   225                     if ertype in self.cross_relations:
       
   226                         self.cross_relations.remove(ertype)
       
   227                 else:
       
   228                     self.dont_cross_relations.remove(ertype)
       
   229         except Exception:
       
   230             self.error('while updating mapping consequently to removal of %s',
       
   231                        schemacfg)
       
   232 
       
   233     def local_eid(self, cnx, extid, session):
       
   234         etype, dexturi, dextid = cnx.describe(extid)
       
   235         if dexturi == 'system' or not (
       
   236             dexturi in self.repo.sources_by_uri or self._skip_externals):
       
   237             assert etype in self.support_entities, etype
       
   238             eid = self.repo.extid2eid(self, str(extid), etype, session)
       
   239             if eid > 0:
       
   240                 return eid, True
       
   241         elif dexturi in self.repo.sources_by_uri:
       
   242             source = self.repo.sources_by_uri[dexturi]
       
   243             cnx = session.cnxset.connection(source.uri)
       
   244             eid = source.local_eid(cnx, dextid, session)[0]
       
   245             return eid, False
       
   246         return None, None
       
   247 
       
   248     def synchronize(self, mtime=None):
       
   249         """synchronize content known by this repository with content in the
       
   250         external repository
       
   251         """
       
   252         self.info('synchronizing pyro source %s', self.uri)
       
   253         cnx = self.get_connection()
       
   254         try:
       
   255             extrepo = cnx._repo
       
   256         except AttributeError:
       
   257             # fake connection wrapper returned when we can't connect to the
       
   258             # external source (hence we've no chance to synchronize...)
       
   259             return
       
   260         etypes = self.support_entities.keys()
       
   261         if mtime is None:
       
   262             mtime = self.latest_retrieval
       
   263         updatetime, modified, deleted = extrepo.entities_modified_since(
       
   264             etypes, mtime)
       
   265         self._query_cache.clear()
       
   266         repo = self.repo
       
   267         session = repo.internal_session()
       
   268         source = repo.system_source
       
   269         try:
       
   270             for etype, extid in modified:
       
   271                 try:
       
   272                     eid = self.local_eid(cnx, extid, session)[0]
       
   273                     if eid is not None:
       
   274                         rset = session.eid_rset(eid, etype)
       
   275                         entity = rset.get_entity(0, 0)
       
   276                         entity.complete(entity.e_schema.indexable_attributes())
       
   277                         source.index_entity(session, entity)
       
   278                 except Exception:
       
   279                     self.exception('while updating %s with external id %s of source %s',
       
   280                                    etype, extid, self.uri)
       
   281                     continue
       
   282             for etype, extid in deleted:
       
   283                 try:
       
   284                     eid = self.repo.extid2eid(self, str(extid), etype, session,
       
   285                                               insert=False)
       
   286                     # entity has been deleted from external repository but is not known here
       
   287                     if eid is not None:
       
   288                         entity = session.entity_from_eid(eid, etype)
       
   289                         repo.delete_info(session, entity, self.uri,
       
   290                                          scleanup=self.eid)
       
   291                 except Exception:
       
   292                     if self.repo.config.mode == 'test':
       
   293                         raise
       
   294                     self.exception('while updating %s with external id %s of source %s',
       
   295                                    etype, extid, self.uri)
       
   296                     continue
       
   297             self.latest_retrieval = updatetime
       
   298             session.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
       
   299                             {'x': self.eid, 'date': self.latest_retrieval})
       
   300             session.commit()
       
   301         finally:
       
   302             session.close()
       
   303 
    62 
   304     def _get_connection(self):
    63     def _get_connection(self):
   305         """open and return a connection to the source"""
    64         """open and return a connection to the source"""
   306         nshost = self.config.get('pyro-ns-host') or self.repo.config['pyro-ns-host']
    65         nshost = self.config.get('pyro-ns-host') or self.repo.config['pyro-ns-host']
   307         nsgroup = self.config.get('pyro-ns-group') or self.repo.config['pyro-ns-group']
    66         nsgroup = self.config.get('pyro-ns-group') or self.repo.config['pyro-ns-group']
   308         self.info('connecting to instance :%s.%s for user %s',
    67         self.info('connecting to instance :%s.%s for user %s',
   309                   nsgroup, self.config['pyro-ns-id'], self.config['cubicweb-user'])
    68                   nsgroup, self.config['pyro-ns-id'], self.config['cubicweb-user'])
   310         #cnxprops = ConnectionProperties(cnxtype=self.config['cnx-type'])
       
   311         return dbapi.connect(database=self.config['pyro-ns-id'],
    69         return dbapi.connect(database=self.config['pyro-ns-id'],
   312                              login=self.config['cubicweb-user'],
    70                              login=self.config['cubicweb-user'],
   313                              password=self.config['cubicweb-password'],
    71                              password=self.config['cubicweb-password'],
   314                              host=nshost, group=nsgroup,
    72                              host=nshost, group=nsgroup,
   315                              setvreg=False) #cnxprops=cnxprops)
    73                              setvreg=False)
   316 
    74 
   317     def get_connection(self):
    75     def get_connection(self):
   318         try:
    76         try:
   319             return self._get_connection()
    77             return self._get_connection()
   320         except (ConnectionError, PyroError), ex:
    78         except (ConnectionError, PyroError), ex:
   331         try:
    89         try:
   332             cnx._repo._transferThread(threading.currentThread())
    90             cnx._repo._transferThread(threading.currentThread())
   333         except AttributeError:
    91         except AttributeError:
   334             # inmemory connection
    92             # inmemory connection
   335             pass
    93             pass
   336         if not isinstance(cnx, ConnectionWrapper):
    94         return super(PyroRQLSource, self).check_connection(cnx)
   337             try:
       
   338                 cnx.check()
       
   339                 return # ok
       
   340             except (BadConnectionId, ConnectionClosedError):
       
   341                 pass
       
   342         # try to reconnect
       
   343         return self.get_connection()
       
   344 
    95 
   345     def syntax_tree_search(self, session, union, args=None, cachekey=None,
       
   346                            varmap=None):
       
   347         assert dbg_st_search(self.uri, union, varmap, args, cachekey)
       
   348         rqlkey = union.as_string(kwargs=args)
       
   349         try:
       
   350             results = self._query_cache[rqlkey]
       
   351         except KeyError:
       
   352             results = self._syntax_tree_search(session, union, args)
       
   353             self._query_cache[rqlkey] = results
       
   354         assert dbg_results(results)
       
   355         return results
       
   356 
       
   357     def _syntax_tree_search(self, session, union, args):
       
   358         """return result from this source for a rql query (actually from a rql
       
   359         syntax tree and a solution dictionary mapping each used variable to a
       
   360         possible type). If cachekey is given, the query necessary to fetch the
       
   361         results (but not the results themselves) may be cached using this key.
       
   362         """
       
   363         if not args is None:
       
   364             args = args.copy()
       
   365         # get cached cursor anyway
       
   366         cu = session.cnxset[self.uri]
       
   367         if cu is None:
       
   368             # this is a ConnectionWrapper instance
       
   369             msg = session._("can't connect to source %s, some data may be missing")
       
   370             session.set_shared_data('sources_error', msg % self.uri, txdata=True)
       
   371             return []
       
   372         translator = RQL2RQL(self)
       
   373         try:
       
   374             rql = translator.generate(session, union, args)
       
   375         except UnknownEid, ex:
       
   376             if server.DEBUG:
       
   377                 print '  unknown eid', ex, 'no results'
       
   378             return []
       
   379         if server.DEBUG & server.DBG_RQL:
       
   380             print '  translated rql', rql
       
   381         try:
       
   382             rset = cu.execute(rql, args)
       
   383         except Exception, ex:
       
   384             self.exception(str(ex))
       
   385             msg = session._("error while querying source %s, some data may be missing")
       
   386             session.set_shared_data('sources_error', msg % self.uri, txdata=True)
       
   387             return []
       
   388         descr = rset.description
       
   389         if rset:
       
   390             needtranslation = []
       
   391             rows = rset.rows
       
   392             for i, etype in enumerate(descr[0]):
       
   393                 if (etype is None or not self.schema.eschema(etype).final
       
   394                     or uidtype(union, i, etype, args)):
       
   395                     needtranslation.append(i)
       
   396             if needtranslation:
       
   397                 cnx = session.cnxset.connection(self.uri)
       
   398                 for rowindex in xrange(rset.rowcount - 1, -1, -1):
       
   399                     row = rows[rowindex]
       
   400                     localrow = False
       
   401                     for colindex in needtranslation:
       
   402                         if row[colindex] is not None: # optional variable
       
   403                             eid, local = self.local_eid(cnx, row[colindex], session)
       
   404                             if local:
       
   405                                 localrow = True
       
   406                             if eid is not None:
       
   407                                 row[colindex] = eid
       
   408                             else:
       
   409                                 # skip this row
       
   410                                 del rows[rowindex]
       
   411                                 del descr[rowindex]
       
   412                                 break
       
   413                     else:
       
   414                         # skip row if it only contains eids of entities which
       
   415                         # are actually from a source we also know locally,
       
   416                         # except if some args specified (XXX should actually
       
   417                         # check if there are some args local to the source)
       
   418                         if not (translator.has_local_eid or localrow):
       
   419                             del rows[rowindex]
       
   420                             del descr[rowindex]
       
   421             results = rows
       
   422         else:
       
   423             results = []
       
   424         return results
       
   425 
       
   426     def _entity_relations_and_kwargs(self, session, entity):
       
   427         relations = []
       
   428         kwargs = {'x': self.repo.eid2extid(self, entity.eid, session)}
       
   429         for key, val in entity.cw_attr_cache.iteritems():
       
   430             relations.append('X %s %%(%s)s' % (key, key))
       
   431             kwargs[key] = val
       
   432         return relations, kwargs
       
   433 
       
   434     def add_entity(self, session, entity):
       
   435         """add a new entity to the source"""
       
   436         raise NotImplementedError()
       
   437 
       
   438     def update_entity(self, session, entity):
       
   439         """update an entity in the source"""
       
   440         relations, kwargs = self._entity_relations_and_kwargs(session, entity)
       
   441         cu = session.cnxset[self.uri]
       
   442         cu.execute('SET %s WHERE X eid %%(x)s' % ','.join(relations), kwargs)
       
   443         self._query_cache.clear()
       
   444         entity.cw_clear_all_caches()
       
   445 
       
   446     def delete_entity(self, session, entity):
       
   447         """delete an entity from the source"""
       
   448         if session.deleted_in_transaction(self.eid):
       
   449             # source is being deleted, don't propagate
       
   450             self._query_cache.clear()
       
   451             return
       
   452         cu = session.cnxset[self.uri]
       
   453         cu.execute('DELETE %s X WHERE X eid %%(x)s' % entity.__regid__,
       
   454                    {'x': self.repo.eid2extid(self, entity.eid, session)})
       
   455         self._query_cache.clear()
       
   456 
       
   457     def add_relation(self, session, subject, rtype, object):
       
   458         """add a relation to the source"""
       
   459         cu = session.cnxset[self.uri]
       
   460         cu.execute('SET X %s Y WHERE X eid %%(x)s, Y eid %%(y)s' % rtype,
       
   461                    {'x': self.repo.eid2extid(self, subject, session),
       
   462                     'y': self.repo.eid2extid(self, object, session)})
       
   463         self._query_cache.clear()
       
   464         session.entity_from_eid(subject).cw_clear_all_caches()
       
   465         session.entity_from_eid(object).cw_clear_all_caches()
       
   466 
       
   467     def delete_relation(self, session, subject, rtype, object):
       
   468         """delete a relation from the source"""
       
   469         if session.deleted_in_transaction(self.eid):
       
   470             # source is being deleted, don't propagate
       
   471             self._query_cache.clear()
       
   472             return
       
   473         cu = session.cnxset[self.uri]
       
   474         cu.execute('DELETE X %s Y WHERE X eid %%(x)s, Y eid %%(y)s' % rtype,
       
   475                    {'x': self.repo.eid2extid(self, subject, session),
       
   476                     'y': self.repo.eid2extid(self, object, session)})
       
   477         self._query_cache.clear()
       
   478         session.entity_from_eid(subject).cw_clear_all_caches()
       
   479         session.entity_from_eid(object).cw_clear_all_caches()
       
   480 
       
   481 
       
   482 class RQL2RQL(object):
       
   483     """translate a local rql query to be executed on a distant repository"""
       
   484     def __init__(self, source):
       
   485         self.source = source
       
   486         self.repo = source.repo
       
   487         self.current_operator = None
       
   488 
       
   489     def _accept_children(self, node):
       
   490         res = []
       
   491         for child in node.children:
       
   492             rql = child.accept(self)
       
   493             if rql is not None:
       
   494                 res.append(rql)
       
   495         return res
       
   496 
       
   497     def generate(self, session, rqlst, args):
       
   498         self._session = session
       
   499         self.kwargs = args
       
   500         self.need_translation = False
       
   501         self.has_local_eid = False
       
   502         return self.visit_union(rqlst)
       
   503 
       
   504     def visit_union(self, node):
       
   505         s = self._accept_children(node)
       
   506         if len(s) > 1:
       
   507             return ' UNION '.join('(%s)' % q for q in s)
       
   508         return s[0]
       
   509 
       
   def visit_select(self, node):
       """Return the select node serialised as an RQL string.

       The variable maker and the constant/variable cache are reset first,
       then the clauses are emitted in this order: selection, GROUPBY,
       ORDERBY, LIMIT, OFFSET, WHERE, HAVING, WITH.
       """
       self._varmaker = rqlvar_maker(defined=node.defined_vars.copy())
       self._const_var = {}
       head = 'DISTINCT Any' if node.distinct else 'Any'
       selected = ','.join(term.accept(self) for term in node.selection)
       clauses = ['%s %s' % (head, selected)]
       if node.groupby:
           groups = [group.accept(self) for group in node.groupby]
           clauses.append('GROUPBY %s' % ', '.join(groups))
       if node.orderby:
           sorts = [self.visit_sortterm(term) for term in node.orderby]
           clauses.append('ORDERBY %s' % ', '.join(sorts))
       if node.limit is not None:
           clauses.append('LIMIT %s' % node.limit)
       if node.offset:
           clauses.append('OFFSET %s' % node.offset)
       if node.where is not None:
           # the restriction visitor returns None when nothing is left to
           # express (see visit_and / visit_or / visit_relation)
           restriction = node.where.accept(self)
           if restriction is not None:
               clauses.append('WHERE %s' % restriction)
       if node.having:
           havings = [term.accept(self) for term in node.having]
           clauses.append('HAVING %s' % ', '.join(havings))
       # NOTE(review): visiting a sub-query presumably re-enters
       # visit_select, which rebinds self._varmaker / self._const_var --
       # confirm this is intended.
       subqueries = [
           '%s BEING (%s)' % (','.join(alias.name for alias in subquery.aliases),
                              self.visit_union(subquery.query))
           for subquery in node.with_]
       if subqueries:
           clauses.append('WITH %s' % (','.join(subqueries)))
       return ' '.join(clauses)
       
   547 
       
   def visit_and(self, node):
       """Return the children's RQL joined with ``', '``.

       Returns None when ``self._accept_children`` yields nothing.
       """
       parts = self._accept_children(node)
       if not parts:
           return None
       return ', '.join(parts)
       
   553 
       
   def visit_or(self, node):
       """Return the children's RQL combined with ``OR``.

       Several branches are each parenthesised and joined with ``' OR '``;
       a single branch is returned unchanged; no branch gives None.
       """
       branches = self._accept_children(node)
       if not branches:
           return None
       if len(branches) == 1:
           return branches[0]
       return ' OR '.join(['(%s)' % branch for branch in branches])
       
   561 
       
   def visit_not(self, node):
       """Return ``NOT (<child rql>)``.

       Returns None when the first child produces no RQL.
       """
       inner = node.children[0].accept(self)
       if not inner:
           return None
       return 'NOT (%s)' % inner
       
   567 
       
   def visit_exists(self, node):
       """Return ``EXISTS(<child rql>)``.

       Returns None when the first child produces no RQL.
       """
       inner = node.children[0].accept(self)
       if not inner:
           return None
       return 'EXISTS(%s)' % inner
       
   573 
       
   def visit_relation(self, node):
       """Return the RQL for a relation node, or None if it can be dropped.

       The left-hand side is either a constant (simplified syntax tree), in
       which case an ``eid`` restriction is reintroduced through
       ``process_eid_const``, or any other node serialised with ``accept``.

       For the ``eid`` relation and non final relations, constants found on
       the right-hand side are translated (``need_translation``) and the
       operator / candidate entity types are recorded so that ``eid2extid``
       can use them.

       :return: ``'<lhs> <rtype> <rhs>'``, followed by ``', <restriction>'``
         when an eid restriction was reintroduced; None when an
         ``UnknownEid`` is raised and ``neged_relation(node)`` is true
       :raise UnknownEid: when an eid can't be translated and the relation
         is not negated
       """
       lhs_node = node.children[0]
       lhs_is_const = isinstance(lhs_node, Constant)
       try:
           if lhs_is_const:
               # simplified rqlst, reintroduce eid relation
               restr, lhs = self.process_eid_const(lhs_node)
           else:
               lhs = lhs_node.accept(self)
               restr = None
       except UnknownEid:
           # can safely skip not relation with an unsupported eid
           if neged_relation(node):
               return None
           # XXX what about optional relation or outer NOT EXISTS()
           raise
       if node.optional in ('left', 'both'):
           lhs += '?'
       if node.r_type == 'eid' or not self.source.schema.rschema(node.r_type).final:
           self.need_translation = True
           self.current_operator = node.operator()
           if lhs_is_const:
               self.current_etypes = (lhs_node.uidtype,)
           else:
               self.current_etypes = lhs_node.variable.stinfo['possibletypes']
       try:
           rhs = node.children[1].accept(self)
       except UnknownEid:
           # can safely skip not relation with an unsupported eid
           if neged_relation(node):
               return None
           # XXX what about optional relation or outer NOT EXISTS()
           raise
       except ReplaceByInOperator as ex:
           rhs = 'IN (%s)' % ','.join(ex.eids)
       finally:
           # always leave translation mode, even when the relation is
           # skipped or the error propagates: a stale flag would make the
           # next Int constant be translated as if it were an eid
           self.need_translation = False
           self.current_operator = None
       if node.optional in ('right', 'both'):
           rhs += '?'
       if restr is not None:
           return '%s %s %s, %s' % (lhs, node.r_type, rhs, restr)
       return '%s %s %s' % (lhs, node.r_type, rhs)
       
   620 
       
   def visit_comparison(self, node):
       """Return the RQL for a comparison node.

       The ``=`` and ``IS`` operators are implicit, so only the operand is
       returned; any other operator is emitted before the operand.
       """
       operator = node.operator
       if operator not in ('=', 'IS'):
           encoded = operator.encode()
           return '%s %s' % (encoded, node.children[0].accept(self))
       return node.children[0].accept(self)
       
   626 
       
   def visit_mathexpression(self, node):
       """Return ``(<left> <operator> <right>)`` for a binary math node."""
       left = node.children[0].accept(self)
       operator = node.operator.encode()
       right = node.children[1].accept(self)
       return '(%s %s %s)' % (left, operator, right)
       
   631 
       
   def visit_function(self, node):
       """Return ``<name>(<arg>, ...)`` for a function node.

       Arguments whose serialisation raises ``UnknownEid`` are dropped. If
       every argument is dropped that way, the last ``UnknownEid`` is
       re-raised. A function node without any argument yields ``<name>()``
       (this used to fail on an unbound local variable).

       :raise UnknownEid: when the node has arguments and none of them
         could be serialised
       """
       args = []
       last_error = None
       for child in node.children:
           try:
               args.append(child.accept(self))
           except UnknownEid as ex:
               # remember the failure, only reported if nothing is left
               last_error = ex
       if not args and last_error is not None:
           raise last_error
       return '%s(%s)' % (node.name, ', '.join(args))
       
   644 
       
   def visit_constant(self, node):
       """Return the RQL for a constant, translating eids when required.

       Translation happens when ``self.need_translation`` is set or the
       node has a ``uidtype``:

       * an ``Int`` constant is replaced by ``str(self.eid2extid(value))``;
       * a ``Substitute`` constant has its value in ``self.kwargs``
         translated in place, once per key (tracked in ``self._const_var``).

       In both cases ``self.has_local_eid`` is set. Anything else, including
       the substitution itself, is rendered with ``node.as_string()``.
       """
       if self.need_translation or node.uidtype:
           kind = node.type
           if kind == 'Int':
               self.has_local_eid = True
               return str(self.eid2extid(node.value))
           if kind == 'Substitute':
               key = node.value
               # ensure we have not yet translated the value...
               if key not in self._const_var:
                   self.kwargs[key] = self.eid2extid(self.kwargs[key])
                   self._const_var[key] = None
                   self.has_local_eid = True
       return node.as_string()
       
   658 
       
   def visit_variableref(self, node):
       """Return the name of the referenced variable, used as is in the
       generated RQL.
       """
       name = node.name
       return name
       
   662 
       
   def visit_sortterm(self, node):
       """Return the RQL for a sort term, suffixed with ``DESC`` when the
       term is not ascending.
       """
       if not node.asc:
           return '%s DESC' % node.term.accept(self)
       return node.term.accept(self)
       
   667 
       
   def process_eid_const(self, const):
       """Return ``(restriction, variable)`` standing for an eid constant.

       The constant is evaluated against ``self.kwargs``. If a variable was
       already allocated for that value, ``(None, variable)`` is returned.
       Otherwise a new variable is taken from ``self._varmaker``, cached in
       ``self._const_var`` and returned with the ``'<var> eid <value>'``
       restriction that has to be added to the query.

       Whatever ``self.visit_constant`` raises is propagated; in that case
       nothing is cached and ``self.need_translation`` is reset.
       """
       value = const.eval(self.kwargs)
       try:
           return None, self._const_var[value]
       except KeyError:
           # first time this value is seen, allocate a variable below
           pass
       var = self._varmaker.next()
       self.need_translation = True
       try:
           restr = '%s eid %s' % (var, self.visit_constant(const))
       finally:
           # don't leave translation mode on if the translation failed
           self.need_translation = False
       self._const_var[value] = var
       return restr, var
       
   679 
       
   def eid2extid(self, eid):
       """Return the external id matching a local eid for this source.

       The lookup is delegated to ``self.repo.eid2extid``. When it raises
       ``UnknownEid`` while a comparison operator other than ``=`` is being
       processed (``self.current_operator``), the system ``entities`` table
       is searched for entities of this source matching the comparison.

       :raise ReplaceByInOperator: with the decoded extids of the matching
         rows, when there are some
       :raise UnknownEid: in every other failing case
       """
       try:
           return self.repo.eid2extid(self.source, eid, self._session)
       except UnknownEid:
           comparison = self.current_operator
           if comparison is None or comparison == '=':
               raise
           # deal with query like "X eid > 12"
           #
           # The eid order in the external source may differ from the local
           # one, so search locally for all eids from this source matching
           # the condition, then let the caller replace the "> 12" branch
           # by "IN (eids)"
           #
           # XXX we may have to insert a huge number of eids...
           # NOTE(review): the statement is built by string interpolation;
           # confirm none of the interpolated values can be user-controlled
           etypes = ','.join(["'%s'" % etype for etype in self.current_etypes])
           query = ("SELECT extid FROM entities WHERE source='%s' AND type IN (%s) AND eid%s%s"
                    % (self.source.uri, etypes, comparison, eid))
           cursor = self._session.system_sql(query)
           # XXX buggy cu.rowcount which may be zero while there are some
           # results, hence fetch the rows to know
           rows = cursor.fetchall()
           if not rows:
               raise
           raise ReplaceByInOperator((b64decode(row[0]) for row in rows))
       
   705