[pyro] Refactor the pyrorql source
authorVincent Michel and Alain Leufroy
Wed, 04 Apr 2012 17:47:58 +0200
changeset 8354 a9984ceebc26
parent 8353 c1cc2f1cd177
child 8355 557295b5b68a
[pyro] Refactor the pyrorql source Extract most of the code that is not related to Pyro from PyroRQLSource in a new RemoteSource abstract class.
--- a/server/sources/pyrorql.py	Wed Apr 04 10:57:03 2012 +0200
+++ b/server/sources/pyrorql.py	Wed Apr 04 17:47:58 2012 +0200
@@ -21,298 +21,56 @@
 _ = unicode
 import threading
-from os.path import join
-from time import mktime
-from datetime import datetime
-from base64 import b64decode
 from Pyro.errors import PyroError, ConnectionClosedError
 from logilab.common.configuration import REQUIRED
-from logilab.common.optik_ext import check_yn
-from yams.schema import role_name
-from rql.nodes import Constant
-from rql.utils import rqlvar_maker
-from cubicweb import dbapi, server
-from cubicweb import ValidationError, BadConnectionId, UnknownEid, ConnectionError
-from cubicweb.schema import VIRTUAL_RTYPES
-from cubicweb.cwconfig import register_persistent_options
-from cubicweb.server.sources import (AbstractSource, ConnectionWrapper,
-                                     TimedCache, dbg_st_search, dbg_results)
-from cubicweb.server.msplanner import neged_relation
+from cubicweb import dbapi
+from cubicweb import ConnectionError
+from cubicweb.server.sources import ConnectionWrapper
-def uidtype(union, col, etype, args):
-    select, col = union.locate_subquery(col, etype, args)
-    return getattr(select.selection[col], 'uidtype', None)
+from cubicweb.server.sources.remoterql import RemoteSource
-class ReplaceByInOperator(Exception):
-    def __init__(self, eids):
-        self.eids = eids
-class PyroRQLSource(AbstractSource):
+class PyroRQLSource(RemoteSource):
     """External repository source, using Pyro connection"""
-    # boolean telling if modification hooks should be called when something is
-    # modified in this source
-    should_call_hooks = False
-    # boolean telling if the repository should connect to this source during
-    # migration
-    connect_for_migration = False
+    CNX_TYPE = 'pyro'
-    options = (
+    options = RemoteSource.options + (
         # XXX pyro-ns host/port
          {'type' : 'string',
           'default': REQUIRED,
           'help': 'identifier of the repository in the pyro name server',
-          'group': 'pyro-source', 'level': 0,
-          }),
-        ('cubicweb-user',
-         {'type' : 'string',
-          'default': REQUIRED,
-          'help': 'user to use for connection on the distant repository',
-          'group': 'pyro-source', 'level': 0,
-          }),
-        ('cubicweb-password',
-         {'type' : 'password',
-          'default': '',
-          'help': 'user to use for connection on the distant repository',
-          'group': 'pyro-source', 'level': 0,
-          }),
-        ('base-url',
-         {'type' : 'string',
-          'default': '',
-          'help': 'url of the web site for the distant repository, if you want '
-          'to generate external link to entities from this repository',
-          'group': 'pyro-source', 'level': 1,
-          }),
-        ('skip-external-entities',
-         {'type' : 'yn',
-          'default': False,
-          'help': 'should entities not local to the source be considered or not',
-          'group': 'pyro-source', 'level': 0,
+          'group': 'remote-source', 'level': 0,
          {'type' : 'string',
           'default': None,
           'help': 'Pyro name server\'s host. If not set, default to the value \
 from all_in_one.conf. It may contains port information using <host>:<port> notation.',
-          'group': 'pyro-source', 'level': 1,
+          'group': 'remote-source', 'level': 1,
          {'type' : 'string',
           'default': None,
           'help': 'Pyro name server\'s group where the repository will be \
 registered. If not set, default to the value from all_in_one.conf.',
-          'group': 'pyro-source', 'level': 2,
+          'group': 'remote-source', 'level': 2,
-        ('synchronization-interval',
-         {'type' : 'time',
-          'default': '5min',
-          'help': 'interval between synchronization with the external \
-repository (default to 5 minutes).',
-          'group': 'pyro-source', 'level': 2,
-          }),
-    PUBLIC_KEYS = AbstractSource.PUBLIC_KEYS + ('base-url',)
-    _conn = None
-    def __init__(self, repo, source_config, eid=None):
-        AbstractSource.__init__(self, repo, source_config, eid)
-        self.update_config(None, self.check_conf_dict(eid, source_config,
-                                                      fail_if_unknown=False))
-        self._query_cache = TimedCache(1800)
-    def update_config(self, source_entity, processed_config):
-        """update configuration from source entity"""
-        # XXX get it through pyro if unset
-        baseurl = processed_config.get('base-url')
-        if baseurl and not baseurl.endswith('/'):
-            processed_config['base-url'] += '/'
-        self.config = processed_config
-        self._skip_externals = processed_config['skip-external-entities']
-        if source_entity is not None:
-            self.latest_retrieval = source_entity.latest_retrieval
-    def reset_caches(self):
-        """method called during test to reset potential source caches"""
-        self._query_cache = TimedCache(1800)
-    def init(self, activated, source_entity):
-        """method called by the repository once ready to handle request"""
-        self.load_mapping(source_entity._cw)
-        if activated:
-            interval = self.config['synchronization-interval']
-            self.repo.looping_task(interval, self.synchronize)
-            self.repo.looping_task(self._query_cache.ttl.seconds/10,
-                                   self._query_cache.clear_expired)
-            self.latest_retrieval = source_entity.latest_retrieval
-    def load_mapping(self, session=None):
-        self.support_entities = {}
-        self.support_relations = {}
-        self.dont_cross_relations = set(('owned_by', 'created_by'))
-        self.cross_relations = set()
-        assert self.eid is not None
-        self._schemacfg_idx = {}
-        self._load_mapping(session)
-    etype_options = set(('write',))
-    rtype_options = set(('maycross', 'dontcross', 'write',))
-    def _check_options(self, schemacfg, allowedoptions):
-        if schemacfg.options:
-            options = set(w.strip() for w in schemacfg.options.split(':'))
-        else:
-            options = set()
-        if options - allowedoptions:
-            options = ', '.join(sorted(options - allowedoptions))
-            msg = _('unknown option(s): %s' % options)
-            raise ValidationError(schemacfg.eid, {role_name('options', 'subject'): msg})
-        return options
-    def add_schema_config(self, schemacfg, checkonly=False):
-        """added CWSourceSchemaConfig, modify mapping accordingly"""
-        try:
-            ertype = schemacfg.schema.name
-        except AttributeError:
-            msg = schemacfg._cw._("attribute/relation can't be mapped, only "
-                                  "entity and relation types")
-            raise ValidationError(schemacfg.eid, {role_name('cw_for_schema', 'subject'): msg})
-        if schemacfg.schema.__regid__ == 'CWEType':
-            options = self._check_options(schemacfg, self.etype_options)
-            if not checkonly:
-                self.support_entities[ertype] = 'write' in options
-        else: # CWRType
-            if ertype in ('is', 'is_instance_of', 'cw_source') or ertype in VIRTUAL_RTYPES:
-                msg = schemacfg._cw._('%s relation should not be in mapped') % ertype
-                raise ValidationError(schemacfg.eid, {role_name('cw_for_schema', 'subject'): msg})
-            options = self._check_options(schemacfg, self.rtype_options)
-            if 'dontcross' in options:
-                if 'maycross' in options:
-                    msg = schemacfg._("can't mix dontcross and maycross options")
-                    raise ValidationError(schemacfg.eid, {role_name('options', 'subject'): msg})
-                if 'write' in options:
-                    msg = schemacfg._("can't mix dontcross and write options")
-                    raise ValidationError(schemacfg.eid, {role_name('options', 'subject'): msg})
-                if not checkonly:
-                    self.dont_cross_relations.add(ertype)
-            elif not checkonly:
-                self.support_relations[ertype] = 'write' in options
-                if 'maycross' in options:
-                    self.cross_relations.add(ertype)
-        if not checkonly:
-            # add to an index to ease deletion handling
-            self._schemacfg_idx[schemacfg.eid] = ertype
-    def del_schema_config(self, schemacfg, checkonly=False):
-        """deleted CWSourceSchemaConfig, modify mapping accordingly"""
-        if checkonly:
-            return
-        try:
-            ertype = self._schemacfg_idx[schemacfg.eid]
-            if ertype[0].isupper():
-                del self.support_entities[ertype]
-            else:
-                if ertype in self.support_relations:
-                    del self.support_relations[ertype]
-                    if ertype in self.cross_relations:
-                        self.cross_relations.remove(ertype)
-                else:
-                    self.dont_cross_relations.remove(ertype)
-        except Exception:
-            self.error('while updating mapping consequently to removal of %s',
-                       schemacfg)
-    def local_eid(self, cnx, extid, session):
-        etype, dexturi, dextid = cnx.describe(extid)
-        if dexturi == 'system' or not (
-            dexturi in self.repo.sources_by_uri or self._skip_externals):
-            assert etype in self.support_entities, etype
-            eid = self.repo.extid2eid(self, str(extid), etype, session)
-            if eid > 0:
-                return eid, True
-        elif dexturi in self.repo.sources_by_uri:
-            source = self.repo.sources_by_uri[dexturi]
-            cnx = session.cnxset.connection(source.uri)
-            eid = source.local_eid(cnx, dextid, session)[0]
-            return eid, False
-        return None, None
-    def synchronize(self, mtime=None):
-        """synchronize content known by this repository with content in the
-        external repository
-        """
-        self.info('synchronizing pyro source %s', self.uri)
-        cnx = self.get_connection()
-        try:
-            extrepo = cnx._repo
-        except AttributeError:
-            # fake connection wrapper returned when we can't connect to the
-            # external source (hence we've no chance to synchronize...)
-            return
-        etypes = self.support_entities.keys()
-        if mtime is None:
-            mtime = self.latest_retrieval
-        updatetime, modified, deleted = extrepo.entities_modified_since(
-            etypes, mtime)
-        self._query_cache.clear()
-        repo = self.repo
-        session = repo.internal_session()
-        source = repo.system_source
-        try:
-            for etype, extid in modified:
-                try:
-                    eid = self.local_eid(cnx, extid, session)[0]
-                    if eid is not None:
-                        rset = session.eid_rset(eid, etype)
-                        entity = rset.get_entity(0, 0)
-                        entity.complete(entity.e_schema.indexable_attributes())
-                        source.index_entity(session, entity)
-                except Exception:
-                    self.exception('while updating %s with external id %s of source %s',
-                                   etype, extid, self.uri)
-                    continue
-            for etype, extid in deleted:
-                try:
-                    eid = self.repo.extid2eid(self, str(extid), etype, session,
-                                              insert=False)
-                    # entity has been deleted from external repository but is not known here
-                    if eid is not None:
-                        entity = session.entity_from_eid(eid, etype)
-                        repo.delete_info(session, entity, self.uri,
-                                         scleanup=self.eid)
-                except Exception:
-                    if self.repo.config.mode == 'test':
-                        raise
-                    self.exception('while updating %s with external id %s of source %s',
-                                   etype, extid, self.uri)
-                    continue
-            self.latest_retrieval = updatetime
-            session.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
-                            {'x': self.eid, 'date': self.latest_retrieval})
-            session.commit()
-        finally:
-            session.close()
     def _get_connection(self):
         """open and return a connection to the source"""
         nshost = self.config.get('pyro-ns-host') or self.repo.config['pyro-ns-host']
         nsgroup = self.config.get('pyro-ns-group') or self.repo.config['pyro-ns-group']
         self.info('connecting to instance :%s.%s for user %s',
                   nsgroup, self.config['pyro-ns-id'], self.config['cubicweb-user'])
-        #cnxprops = ConnectionProperties(cnxtype=self.config['cnx-type'])
         return dbapi.connect(database=self.config['pyro-ns-id'],
                              host=nshost, group=nsgroup,
-                             setvreg=False) #cnxprops=cnxprops)
+                             setvreg=False)
     def get_connection(self):
@@ -333,373 +91,5 @@
         except AttributeError:
             # inmemory connection
-        if not isinstance(cnx, ConnectionWrapper):
-            try:
-                cnx.check()
-                return # ok
-            except (BadConnectionId, ConnectionClosedError):
-                pass
-        # try to reconnect
-        return self.get_connection()
-    def syntax_tree_search(self, session, union, args=None, cachekey=None,
-                           varmap=None):
-        assert dbg_st_search(self.uri, union, varmap, args, cachekey)
-        rqlkey = union.as_string(kwargs=args)
-        try:
-            results = self._query_cache[rqlkey]
-        except KeyError:
-            results = self._syntax_tree_search(session, union, args)
-            self._query_cache[rqlkey] = results
-        assert dbg_results(results)
-        return results
-    def _syntax_tree_search(self, session, union, args):
-        """return result from this source for a rql query (actually from a rql
-        syntax tree and a solution dictionary mapping each used variable to a
-        possible type). If cachekey is given, the query necessary to fetch the
-        results (but not the results themselves) may be cached using this key.
-        """
-        if not args is None:
-            args = args.copy()
-        # get cached cursor anyway
-        cu = session.cnxset[self.uri]
-        if cu is None:
-            # this is a ConnectionWrapper instance
-            msg = session._("can't connect to source %s, some data may be missing")
-            session.set_shared_data('sources_error', msg % self.uri, txdata=True)
-            return []
-        translator = RQL2RQL(self)
-        try:
-            rql = translator.generate(session, union, args)
-        except UnknownEid, ex:
-            if server.DEBUG:
-                print '  unknown eid', ex, 'no results'
-            return []
-        if server.DEBUG & server.DBG_RQL:
-            print '  translated rql', rql
-        try:
-            rset = cu.execute(rql, args)
-        except Exception, ex:
-            self.exception(str(ex))
-            msg = session._("error while querying source %s, some data may be missing")
-            session.set_shared_data('sources_error', msg % self.uri, txdata=True)
-            return []
-        descr = rset.description
-        if rset:
-            needtranslation = []
-            rows = rset.rows
-            for i, etype in enumerate(descr[0]):
-                if (etype is None or not self.schema.eschema(etype).final
-                    or uidtype(union, i, etype, args)):
-                    needtranslation.append(i)
-            if needtranslation:
-                cnx = session.cnxset.connection(self.uri)
-                for rowindex in xrange(rset.rowcount - 1, -1, -1):
-                    row = rows[rowindex]
-                    localrow = False
-                    for colindex in needtranslation:
-                        if row[colindex] is not None: # optional variable
-                            eid, local = self.local_eid(cnx, row[colindex], session)
-                            if local:
-                                localrow = True
-                            if eid is not None:
-                                row[colindex] = eid
-                            else:
-                                # skip this row
-                                del rows[rowindex]
-                                del descr[rowindex]
-                                break
-                    else:
-                        # skip row if it only contains eids of entities which
-                        # are actually from a source we also know locally,
-                        # except if some args specified (XXX should actually
-                        # check if there are some args local to the source)
-                        if not (translator.has_local_eid or localrow):
-                            del rows[rowindex]
-                            del descr[rowindex]
-            results = rows
-        else:
-            results = []
-        return results
-    def _entity_relations_and_kwargs(self, session, entity):
-        relations = []
-        kwargs = {'x': self.repo.eid2extid(self, entity.eid, session)}
-        for key, val in entity.cw_attr_cache.iteritems():
-            relations.append('X %s %%(%s)s' % (key, key))
-            kwargs[key] = val
-        return relations, kwargs
-    def add_entity(self, session, entity):
-        """add a new entity to the source"""
-        raise NotImplementedError()
-    def update_entity(self, session, entity):
-        """update an entity in the source"""
-        relations, kwargs = self._entity_relations_and_kwargs(session, entity)
-        cu = session.cnxset[self.uri]
-        cu.execute('SET %s WHERE X eid %%(x)s' % ','.join(relations), kwargs)
-        self._query_cache.clear()
-        entity.cw_clear_all_caches()
-    def delete_entity(self, session, entity):
-        """delete an entity from the source"""
-        if session.deleted_in_transaction(self.eid):
-            # source is being deleted, don't propagate
-            self._query_cache.clear()
-            return
-        cu = session.cnxset[self.uri]
-        cu.execute('DELETE %s X WHERE X eid %%(x)s' % entity.__regid__,
-                   {'x': self.repo.eid2extid(self, entity.eid, session)})
-        self._query_cache.clear()
-    def add_relation(self, session, subject, rtype, object):
-        """add a relation to the source"""
-        cu = session.cnxset[self.uri]
-        cu.execute('SET X %s Y WHERE X eid %%(x)s, Y eid %%(y)s' % rtype,
-                   {'x': self.repo.eid2extid(self, subject, session),
-                    'y': self.repo.eid2extid(self, object, session)})
-        self._query_cache.clear()
-        session.entity_from_eid(subject).cw_clear_all_caches()
-        session.entity_from_eid(object).cw_clear_all_caches()
-    def delete_relation(self, session, subject, rtype, object):
-        """delete a relation from the source"""
-        if session.deleted_in_transaction(self.eid):
-            # source is being deleted, don't propagate
-            self._query_cache.clear()
-            return
-        cu = session.cnxset[self.uri]
-        cu.execute('DELETE X %s Y WHERE X eid %%(x)s, Y eid %%(y)s' % rtype,
-                   {'x': self.repo.eid2extid(self, subject, session),
-                    'y': self.repo.eid2extid(self, object, session)})
-        self._query_cache.clear()
-        session.entity_from_eid(subject).cw_clear_all_caches()
-        session.entity_from_eid(object).cw_clear_all_caches()
-class RQL2RQL(object):
-    """translate a local rql query to be executed on a distant repository"""
-    def __init__(self, source):
-        self.source = source
-        self.repo = source.repo
-        self.current_operator = None
-    def _accept_children(self, node):
-        res = []
-        for child in node.children:
-            rql = child.accept(self)
-            if rql is not None:
-                res.append(rql)
-        return res
-    def generate(self, session, rqlst, args):
-        self._session = session
-        self.kwargs = args
-        self.need_translation = False
-        self.has_local_eid = False
-        return self.visit_union(rqlst)
-    def visit_union(self, node):
-        s = self._accept_children(node)
-        if len(s) > 1:
-            return ' UNION '.join('(%s)' % q for q in s)
-        return s[0]
+        return super(PyroRQLSource, self).check_connection(cnx)
-    def visit_select(self, node):
-        """return the tree as an encoded rql string"""
-        self._varmaker = rqlvar_maker(defined=node.defined_vars.copy())
-        self._const_var = {}
-        if node.distinct:
-            base = 'DISTINCT Any'
-        else:
-            base = 'Any'
-        s = ['%s %s' % (base, ','.join(v.accept(self) for v in node.selection))]
-        if node.groupby:
-            s.append('GROUPBY %s' % ', '.join(group.accept(self)
-                                              for group in node.groupby))
-        if node.orderby:
-            s.append('ORDERBY %s' % ', '.join(self.visit_sortterm(term)
-                                              for term in node.orderby))
-        if node.limit is not None:
-            s.append('LIMIT %s' % node.limit)
-        if node.offset:
-            s.append('OFFSET %s' % node.offset)
-        restrictions = []
-        if node.where is not None:
-            nr = node.where.accept(self)
-            if nr is not None:
-                restrictions.append(nr)
-        if restrictions:
-            s.append('WHERE %s' % ','.join(restrictions))
-        if node.having:
-            s.append('HAVING %s' % ', '.join(term.accept(self)
-                                             for term in node.having))
-        subqueries = []
-        for subquery in node.with_:
-            subqueries.append('%s BEING (%s)' % (','.join(ca.name for ca in subquery.aliases),
-                                                 self.visit_union(subquery.query)))
-        if subqueries:
-            s.append('WITH %s' % (','.join(subqueries)))
-        return ' '.join(s)
-    def visit_and(self, node):
-        res = self._accept_children(node)
-        if res:
-            return ', '.join(res)
-        return
-    def visit_or(self, node):
-        res = self._accept_children(node)
-        if len(res) > 1:
-            return ' OR '.join('(%s)' % rql for rql in res)
-        elif res:
-            return res[0]
-        return
-    def visit_not(self, node):
-        rql = node.children[0].accept(self)
-        if rql:
-            return 'NOT (%s)' % rql
-        return
-    def visit_exists(self, node):
-        rql = node.children[0].accept(self)
-        if rql:
-            return 'EXISTS(%s)' % rql
-        return
-    def visit_relation(self, node):
-        try:
-            if isinstance(node.children[0], Constant):
-                # simplified rqlst, reintroduce eid relation
-                try:
-                    restr, lhs = self.process_eid_const(node.children[0])
-                except UnknownEid:
-                    # can safely skip not relation with an unsupported eid
-                    if neged_relation(node):
-                        return
-                    raise
-            else:
-                lhs = node.children[0].accept(self)
-                restr = None
-        except UnknownEid:
-            # can safely skip not relation with an unsupported eid
-            if neged_relation(node):
-                return
-            # XXX what about optional relation or outer NOT EXISTS()
-            raise
-        if node.optional in ('left', 'both'):
-            lhs += '?'
-        if node.r_type == 'eid' or not self.source.schema.rschema(node.r_type).final:
-            self.need_translation = True
-            self.current_operator = node.operator()
-            if isinstance(node.children[0], Constant):
-                self.current_etypes = (node.children[0].uidtype,)
-            else:
-                self.current_etypes = node.children[0].variable.stinfo['possibletypes']
-        try:
-            rhs = node.children[1].accept(self)
-        except UnknownEid:
-            # can safely skip not relation with an unsupported eid
-            if neged_relation(node):
-                return
-            # XXX what about optional relation or outer NOT EXISTS()
-            raise
-        except ReplaceByInOperator, ex:
-            rhs = 'IN (%s)' % ','.join(eid for eid in ex.eids)
-        self.need_translation = False
-        self.current_operator = None
-        if node.optional in ('right', 'both'):
-            rhs += '?'
-        if restr is not None:
-            return '%s %s %s, %s' % (lhs, node.r_type, rhs, restr)
-        return '%s %s %s' % (lhs, node.r_type, rhs)
-    def visit_comparison(self, node):
-        if node.operator in ('=', 'IS'):
-            return node.children[0].accept(self)
-        return '%s %s' % (node.operator.encode(),
-                          node.children[0].accept(self))
-    def visit_mathexpression(self, node):
-        return '(%s %s %s)' % (node.children[0].accept(self),
-                               node.operator.encode(),
-                               node.children[1].accept(self))
-    def visit_function(self, node):
-        #if node.name == 'IN':
-        res = []
-        for child in node.children:
-            try:
-                rql = child.accept(self)
-            except UnknownEid, ex:
-                continue
-            res.append(rql)
-        if not res:
-            raise ex
-        return '%s(%s)' % (node.name, ', '.join(res))
-    def visit_constant(self, node):
-        if self.need_translation or node.uidtype:
-            if node.type == 'Int':
-                self.has_local_eid = True
-                return str(self.eid2extid(node.value))
-            if node.type == 'Substitute':
-                key = node.value
-                # ensure we have not yet translated the value...
-                if not key in self._const_var:
-                    self.kwargs[key] = self.eid2extid(self.kwargs[key])
-                    self._const_var[key] = None
-                    self.has_local_eid = True
-        return node.as_string()
-    def visit_variableref(self, node):
-        """get the sql name for a variable reference"""
-        return node.name
-    def visit_sortterm(self, node):
-        if node.asc:
-            return node.term.accept(self)
-        return '%s DESC' % node.term.accept(self)
-    def process_eid_const(self, const):
-        value = const.eval(self.kwargs)
-        try:
-            return None, self._const_var[value]
-        except Exception:
-            var = self._varmaker.next()
-            self.need_translation = True
-            restr = '%s eid %s' % (var, self.visit_constant(const))
-            self.need_translation = False
-            self._const_var[value] = var
-            return restr, var
-    def eid2extid(self, eid):
-        try:
-            return self.repo.eid2extid(self.source, eid, self._session)
-        except UnknownEid:
-            operator = self.current_operator
-            if operator is not None and operator != '=':
-                # deal with query like "X eid > 12"
-                #
-                # The problem is that eid order in the external source may
-                # differ from the local source
-                #
-                # So search for all eids from this source matching the condition
-                # locally and then to replace the "> 12" branch by "IN (eids)"
-                #
-                # XXX we may have to insert a huge number of eids...)
-                sql = "SELECT extid FROM entities WHERE source='%s' AND type IN (%s) AND eid%s%s"
-                etypes = ','.join("'%s'" % etype for etype in self.current_etypes)
-                cu = self._session.system_sql(sql % (self.source.uri, etypes,
-                                                      operator, eid))
-                # XXX buggy cu.rowcount which may be zero while there are some
-                # results
-                rows = cu.fetchall()
-                if rows:
-                    raise ReplaceByInOperator((b64decode(r[0]) for r in rows))
-            raise
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/server/sources/remoterql.py	Wed Apr 04 17:47:58 2012 +0200
@@ -0,0 +1,670 @@
+# copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
+# This file is part of CubicWeb.
+# CubicWeb is free software: you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation, either version 2.1 of the License, or (at your option)
+# any later version.
+# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
+# details.
+# You should have received a copy of the GNU Lesser General Public License along
+# with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
+"""Source to query another RQL remote repository"""
+__docformat__ = "restructuredtext en"
+_ = unicode
+from os.path import join
+from base64 import b64decode
+from logilab.common.configuration import REQUIRED
+from yams.schema import role_name
+from rql.nodes import Constant
+from rql.utils import rqlvar_maker
+from cubicweb import dbapi, server
+from cubicweb import ValidationError, BadConnectionId, UnknownEid
+from cubicweb.schema import VIRTUAL_RTYPES
+from cubicweb.server.sources import (AbstractSource, ConnectionWrapper,
+                                     TimedCache, dbg_st_search, dbg_results)
+from cubicweb.server.msplanner import neged_relation
+def uidtype(union, col, etype, args):
+    select, col = union.locate_subquery(col, etype, args)
+    return getattr(select.selection[col], 'uidtype', None)
+class ReplaceByInOperator(Exception):
+    def __init__(self, eids):
+        self.eids = eids
+class RemoteSource(AbstractSource):
+    """Generic external repository source"""
+    CNX_TYPE = None # Must be ovewritted !
+    # boolean telling if modification hooks should be called when something is
+    # modified in this source
+    should_call_hooks = False
+    # boolean telling if the repository should connect to this source during
+    # migration
+    connect_for_migration = False
+    options = (
+        ('cubicweb-user',
+         {'type' : 'string',
+          'default': REQUIRED,
+          'help': 'user to use for connection on the distant repository',
+          'group': 'remote-source', 'level': 0,
+          }),
+        ('cubicweb-password',
+         {'type' : 'password',
+          'default': '',
+          'help': 'user to use for connection on the distant repository',
+          'group': 'remote-source', 'level': 0,
+          }),
+        ('base-url',
+         {'type' : 'string',
+          'default': '',
+          'help': 'url of the web site for the distant repository, if you want '
+          'to generate external link to entities from this repository',
+          'group': 'remote-source', 'level': 1,
+          }),
+        ('skip-external-entities',
+         {'type' : 'yn',
+          'default': False,
+          'help': 'should entities not local to the source be considered or not',
+          'group': 'remote-source', 'level': 0,
+          }),
+        ('synchronization-interval',
+         {'type' : 'time',
+          'default': '5min',
+          'help': 'interval between synchronization with the external \
+repository (default to 5 minutes).',
+          'group': 'remote-source', 'level': 2,
+          }))
+    PUBLIC_KEYS = AbstractSource.PUBLIC_KEYS + ('base-url',)
+    _conn = None
+    def __init__(self, repo, source_config, eid=None):
+        super(AbstractSource, self).__init__(repo, source_config, eid)
+        self.update_config(None, self.check_conf_dict(eid, source_config,
+                                                      fail_if_unknown=False))
+        self._query_cache = TimedCache(1800)
+    def update_config(self, source_entity, processed_config):
+        """update configuration from source entity"""
+        baseurl = processed_config.get('base-url')
+        if baseurl and not baseurl.endswith('/'):
+            processed_config['base-url'] += '/'
+        self.config = processed_config
+        self._skip_externals = processed_config['skip-external-entities']
+        if source_entity is not None:
+            self.latest_retrieval = source_entity.latest_retrieval
+    def _get_connection(self):
+        """open and return a connection to the source"""
+        self.info('connecting to source %(base-url)s with user %(cubicweb-user)s',
+                  self.config)
+        cnxprops = ConnectionProperties(cnxtype=self.CNX_TYPE)
+        return dbapi.connect(login=self.config['cubicweb-user'],
+                             password=self.config['cubicweb-password'],
+                             cnxprops=cnxprops)
+    def get_connection(self):
+        try:
+            return self._get_connection()
+        except ConnectionError, ex:
+            self.critical("can't get connection to source %s: %s", self.uri, ex)
+            return ConnectionWrapper()
+    def reset_caches(self):
+        """method called during test to reset potential source caches"""
+        self._query_cache = TimedCache(1800)
+    def init(self, activated, source_entity):
+        """method called by the repository once ready to handle request"""
+        self.load_mapping(source_entity._cw)
+        if activated:
+            interval = self.config['synchronization-interval']
+            self.repo.looping_task(interval, self.synchronize)
+            self.repo.looping_task(self._query_cache.ttl.seconds/10,
+                                   self._query_cache.clear_expired)
+            self.latest_retrieval = source_entity.latest_retrieval
+    def load_mapping(self, session=None):
+        self.support_entities = {}
+        self.support_relations = {}
+        self.dont_cross_relations = set(('owned_by', 'created_by'))
+        self.cross_relations = set()
+        assert self.eid is not None
+        self._schemacfg_idx = {}
+        self._load_mapping(session)
+    etype_options = set(('write',))
+    rtype_options = set(('maycross', 'dontcross', 'write',))
+    def _check_options(self, schemacfg, allowedoptions):
+        if schemacfg.options:
+            options = set(w.strip() for w in schemacfg.options.split(':'))
+        else:
+            options = set()
+        if options - allowedoptions:
+            options = ', '.join(sorted(options - allowedoptions))
+            msg = _('unknown option(s): %s' % options)
+            raise ValidationError(schemacfg.eid, {role_name('options', 'subject'): msg})
+        return options
+    def add_schema_config(self, schemacfg, checkonly=False):
+        """added CWSourceSchemaConfig, modify mapping accordingly"""
+        try:
+            ertype = schemacfg.schema.name
+        except AttributeError:
+            msg = schemacfg._cw._("attribute/relation can't be mapped, only "
+                                  "entity and relation types")
+            raise ValidationError(schemacfg.eid, {role_name('cw_for_schema', 'subject'): msg})
+        if schemacfg.schema.__regid__ == 'CWEType':
+            options = self._check_options(schemacfg, self.etype_options)
+            if not checkonly:
+                self.support_entities[ertype] = 'write' in options
+        else: # CWRType
+            if ertype in ('is', 'is_instance_of', 'cw_source') or ertype in VIRTUAL_RTYPES:
+                msg = schemacfg._cw._('%s relation should not be in mapped') % ertype
+                raise ValidationError(schemacfg.eid, {role_name('cw_for_schema', 'subject'): msg})
+            options = self._check_options(schemacfg, self.rtype_options)
+            if 'dontcross' in options:
+                if 'maycross' in options:
+                    msg = schemacfg._("can't mix dontcross and maycross options")
+                    raise ValidationError(schemacfg.eid, {role_name('options', 'subject'): msg})
+                if 'write' in options:
+                    msg = schemacfg._("can't mix dontcross and write options")
+                    raise ValidationError(schemacfg.eid, {role_name('options', 'subject'): msg})
+                if not checkonly:
+                    self.dont_cross_relations.add(ertype)
+            elif not checkonly:
+                self.support_relations[ertype] = 'write' in options
+                if 'maycross' in options:
+                    self.cross_relations.add(ertype)
+        if not checkonly:
+            # add to an index to ease deletion handling
+            self._schemacfg_idx[schemacfg.eid] = ertype
+    def del_schema_config(self, schemacfg, checkonly=False):
+        """deleted CWSourceSchemaConfig, modify mapping accordingly"""
+        if checkonly:
+            return
+        try:
+            ertype = self._schemacfg_idx[schemacfg.eid]
+            if ertype[0].isupper():
+                del self.support_entities[ertype]
+            else:
+                if ertype in self.support_relations:
+                    del self.support_relations[ertype]
+                    if ertype in self.cross_relations:
+                        self.cross_relations.remove(ertype)
+                else:
+                    self.dont_cross_relations.remove(ertype)
+        except Exception:
+            self.error('while updating mapping consequently to removal of %s',
+                       schemacfg)
+    def local_eid(self, cnx, extid, session):
+        etype, dexturi, dextid = cnx.describe(extid)
+        if dexturi == 'system' or not (
+            dexturi in self.repo.sources_by_uri or self._skip_externals):
+            assert etype in self.support_entities, etype
+            eid = self.repo.extid2eid(self, str(extid), etype, session)
+            if eid > 0:
+                return eid, True
+        elif dexturi in self.repo.sources_by_uri:
+            source = self.repo.sources_by_uri[dexturi]
+            cnx = session.cnxset.connection(source.uri)
+            eid = source.local_eid(cnx, dextid, session)[0]
+            return eid, False
+        return None, None
+    def synchronize(self, mtime=None):
+        """synchronize content known by this repository with content in the
+        external repository
+        """
+        self.info('synchronizing remote %s source %s', (self.CNX_TYPE, self.uri))
+        cnx = self.get_connection()
+        try:
+            extrepo = cnx._repo
+        except AttributeError:
+            # fake connection wrapper returned when we can't connect to the
+            # external source (hence we've no chance to synchronize...)
+            return
+        etypes = self.support_entities.keys()
+        if mtime is None:
+            mtime = self.latest_retrieval
+        updatetime, modified, deleted = extrepo.entities_modified_since(
+            etypes, mtime)
+        self._query_cache.clear()
+        repo = self.repo
+        session = repo.internal_session()
+        source = repo.system_source
+        try:
+            for etype, extid in modified:
+                try:
+                    eid = self.local_eid(cnx, extid, session)[0]
+                    if eid is not None:
+                        rset = session.eid_rset(eid, etype)
+                        entity = rset.get_entity(0, 0)
+                        entity.complete(entity.e_schema.indexable_attributes())
+                        source.index_entity(session, entity)
+                except Exception:
+                    self.exception('while updating %s with external id %s of source %s',
+                                   etype, extid, self.uri)
+                    continue
+            for etype, extid in deleted:
+                try:
+                    eid = self.repo.extid2eid(self, str(extid), etype, session,
+                                              insert=False)
+                    # entity has been deleted from external repository but is not known here
+                    if eid is not None:
+                        entity = session.entity_from_eid(eid, etype)
+                        repo.delete_info(session, entity, self.uri,
+                                         scleanup=self.eid)
+                except Exception:
+                    if self.repo.config.mode == 'test':
+                        raise
+                    self.exception('while updating %s with external id %s of source %s',
+                                   etype, extid, self.uri)
+                    continue
+            self.latest_retrieval = updatetime
+            session.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
+                            {'x': self.eid, 'date': self.latest_retrieval})
+            session.commit()
+        finally:
+            session.close()
+    def get_connection(self):
+        raise NotImplementedError()
+    def check_connection(self, cnx):
+        """check connection validity, return None if the connection is still valid
+        else a new connection
+        """
+        if not isinstance(cnx, ConnectionWrapper):
+            try:
+                cnx.check()
+                return # ok
+            except (BadConnectionId, ConnectionClosedError):
+                pass
+        # try to reconnect
+        return self.get_connection()
+    def syntax_tree_search(self, session, union, args=None, cachekey=None,
+                           varmap=None):
+        assert dbg_st_search(self.uri, union, varmap, args, cachekey)
+        rqlkey = union.as_string(kwargs=args)
+        try:
+            results = self._query_cache[rqlkey]
+        except KeyError:
+            results = self._syntax_tree_search(session, union, args)
+            self._query_cache[rqlkey] = results
+        assert dbg_results(results)
+        return results
+    def _syntax_tree_search(self, session, union, args):
+        """return result from this source for a rql query (actually from a rql
+        syntax tree and a solution dictionary mapping each used variable to a
+        possible type). If cachekey is given, the query necessary to fetch the
+        results (but not the results themselves) may be cached using this key.
+        """
+        if not args is None:
+            args = args.copy()
+        # get cached cursor anyway
+        cu = session.cnxset[self.uri]
+        if cu is None:
+            # this is a ConnectionWrapper instance
+            msg = session._("can't connect to source %s, some data may be missing")
+            session.set_shared_data('sources_error', msg % self.uri, txdata=True)
+            return []
+        translator = RQL2RQL(self)
+        try:
+            rql = translator.generate(session, union, args)
+        except UnknownEid, ex:
+            if server.DEBUG:
+                print '  unknown eid', ex, 'no results'
+            return []
+        if server.DEBUG & server.DBG_RQL:
+            print '  translated rql', rql
+        try:
+            rset = cu.execute(rql, args)
+        except Exception, ex:
+            self.exception(str(ex))
+            msg = session._("error while querying source %s, some data may be missing")
+            session.set_shared_data('sources_error', msg % self.uri, txdata=True)
+            return []
+        descr = rset.description
+        if rset:
+            needtranslation = []
+            rows = rset.rows
+            for i, etype in enumerate(descr[0]):
+                if (etype is None or not self.schema.eschema(etype).final
+                    or uidtype(union, i, etype, args)):
+                    needtranslation.append(i)
+            if needtranslation:
+                cnx = session.cnxset.connection(self.uri)
+                for rowindex in xrange(rset.rowcount - 1, -1, -1):
+                    row = rows[rowindex]
+                    localrow = False
+                    for colindex in needtranslation:
+                        if row[colindex] is not None: # optional variable
+                            eid, local = self.local_eid(cnx, row[colindex], session)
+                            if local:
+                                localrow = True
+                            if eid is not None:
+                                row[colindex] = eid
+                            else:
+                                # skip this row
+                                del rows[rowindex]
+                                del descr[rowindex]
+                                break
+                    else:
+                        # skip row if it only contains eids of entities which
+                        # are actually from a source we also know locally,
+                        # except if some args specified (XXX should actually
+                        # check if there are some args local to the source)
+                        if not (translator.has_local_eid or localrow):
+                            del rows[rowindex]
+                            del descr[rowindex]
+            results = rows
+        else:
+            results = []
+        return results
+    def _entity_relations_and_kwargs(self, session, entity):
+        relations = []
+        kwargs = {'x': self.repo.eid2extid(self, entity.eid, session)}
+        for key, val in entity.cw_attr_cache.iteritems():
+            relations.append('X %s %%(%s)s' % (key, key))
+            kwargs[key] = val
+        return relations, kwargs
+    def add_entity(self, session, entity):
+        """add a new entity to the source"""
+        raise NotImplementedError()
+    def update_entity(self, session, entity):
+        """update an entity in the source"""
+        relations, kwargs = self._entity_relations_and_kwargs(session, entity)
+        cu = session.cnxset[self.uri]
+        cu.execute('SET %s WHERE X eid %%(x)s' % ','.join(relations), kwargs)
+        self._query_cache.clear()
+        entity.cw_clear_all_caches()
+    def delete_entity(self, session, entity):
+        """delete an entity from the source"""
+        if session.deleted_in_transaction(self.eid):
+            # source is being deleted, don't propagate
+            self._query_cache.clear()
+            return
+        cu = session.cnxset[self.uri]
+        cu.execute('DELETE %s X WHERE X eid %%(x)s' % entity.__regid__,
+                   {'x': self.repo.eid2extid(self, entity.eid, session)})
+        self._query_cache.clear()
+    def add_relation(self, session, subject, rtype, object):
+        """add a relation to the source"""
+        cu = session.cnxset[self.uri]
+        cu.execute('SET X %s Y WHERE X eid %%(x)s, Y eid %%(y)s' % rtype,
+                   {'x': self.repo.eid2extid(self, subject, session),
+                    'y': self.repo.eid2extid(self, object, session)})
+        self._query_cache.clear()
+        session.entity_from_eid(subject).cw_clear_all_caches()
+        session.entity_from_eid(object).cw_clear_all_caches()
+    def delete_relation(self, session, subject, rtype, object):
+        """delete a relation from the source"""
+        if session.deleted_in_transaction(self.eid):
+            # source is being deleted, don't propagate
+            self._query_cache.clear()
+            return
+        cu = session.cnxset[self.uri]
+        cu.execute('DELETE X %s Y WHERE X eid %%(x)s, Y eid %%(y)s' % rtype,
+                   {'x': self.repo.eid2extid(self, subject, session),
+                    'y': self.repo.eid2extid(self, object, session)})
+        self._query_cache.clear()
+        session.entity_from_eid(subject).cw_clear_all_caches()
+        session.entity_from_eid(object).cw_clear_all_caches()
+class RQL2RQL(object):
+    """translate a local rql query to be executed on a distant repository"""
+    def __init__(self, source):
+        self.source = source
+        self.repo = source.repo
+        self.current_operator = None
+    def _accept_children(self, node):
+        res = []
+        for child in node.children:
+            rql = child.accept(self)
+            if rql is not None:
+                res.append(rql)
+        return res
+    def generate(self, session, rqlst, args):
+        self._session = session
+        self.kwargs = args
+        self.need_translation = False
+        self.has_local_eid = False
+        return self.visit_union(rqlst)
+    def visit_union(self, node):
+        s = self._accept_children(node)
+        if len(s) > 1:
+            return ' UNION '.join('(%s)' % q for q in s)
+        return s[0]
+    def visit_select(self, node):
+        """return the tree as an encoded rql string"""
+        self._varmaker = rqlvar_maker(defined=node.defined_vars.copy())
+        self._const_var = {}
+        if node.distinct:
+            base = 'DISTINCT Any'
+        else:
+            base = 'Any'
+        s = ['%s %s' % (base, ','.join(v.accept(self) for v in node.selection))]
+        if node.groupby:
+            s.append('GROUPBY %s' % ', '.join(group.accept(self)
+                                              for group in node.groupby))
+        if node.orderby:
+            s.append('ORDERBY %s' % ', '.join(self.visit_sortterm(term)
+                                              for term in node.orderby))
+        if node.limit is not None:
+            s.append('LIMIT %s' % node.limit)
+        if node.offset:
+            s.append('OFFSET %s' % node.offset)
+        restrictions = []
+        if node.where is not None:
+            nr = node.where.accept(self)
+            if nr is not None:
+                restrictions.append(nr)
+        if restrictions:
+            s.append('WHERE %s' % ','.join(restrictions))
+        if node.having:
+            s.append('HAVING %s' % ', '.join(term.accept(self)
+                                             for term in node.having))
+        subqueries = []
+        for subquery in node.with_:
+            subqueries.append('%s BEING (%s)' % (','.join(ca.name for ca in subquery.aliases),
+                                                 self.visit_union(subquery.query)))
+        if subqueries:
+            s.append('WITH %s' % (','.join(subqueries)))
+        return ' '.join(s)
+    def visit_and(self, node):
+        res = self._accept_children(node)
+        if res:
+            return ', '.join(res)
+        return
+    def visit_or(self, node):
+        res = self._accept_children(node)
+        if len(res) > 1:
+            return ' OR '.join('(%s)' % rql for rql in res)
+        elif res:
+            return res[0]
+        return
+    def visit_not(self, node):
+        rql = node.children[0].accept(self)
+        if rql:
+            return 'NOT (%s)' % rql
+        return
+    def visit_exists(self, node):
+        rql = node.children[0].accept(self)
+        if rql:
+            return 'EXISTS(%s)' % rql
+        return
+    def visit_relation(self, node):
+        try:
+            if isinstance(node.children[0], Constant):
+                # simplified rqlst, reintroduce eid relation
+                try:
+                    restr, lhs = self.process_eid_const(node.children[0])
+                except UnknownEid:
+                    # can safely skip not relation with an unsupported eid
+                    if neged_relation(node):
+                        return
+                    raise
+            else:
+                lhs = node.children[0].accept(self)
+                restr = None
+        except UnknownEid:
+            # can safely skip not relation with an unsupported eid
+            if neged_relation(node):
+                return
+            # XXX what about optional relation or outer NOT EXISTS()
+            raise
+        if node.optional in ('left', 'both'):
+            lhs += '?'
+        if node.r_type == 'eid' or not self.source.schema.rschema(node.r_type).final:
+            self.need_translation = True
+            self.current_operator = node.operator()
+            if isinstance(node.children[0], Constant):
+                self.current_etypes = (node.children[0].uidtype,)
+            else:
+                self.current_etypes = node.children[0].variable.stinfo['possibletypes']
+        try:
+            rhs = node.children[1].accept(self)
+        except UnknownEid:
+            # can safely skip not relation with an unsupported eid
+            if neged_relation(node):
+                return
+            # XXX what about optional relation or outer NOT EXISTS()
+            raise
+        except ReplaceByInOperator, ex:
+            rhs = 'IN (%s)' % ','.join(eid for eid in ex.eids)
+        self.need_translation = False
+        self.current_operator = None
+        if node.optional in ('right', 'both'):
+            rhs += '?'
+        if restr is not None:
+            return '%s %s %s, %s' % (lhs, node.r_type, rhs, restr)
+        return '%s %s %s' % (lhs, node.r_type, rhs)
+    def visit_comparison(self, node):
+        if node.operator in ('=', 'IS'):
+            return node.children[0].accept(self)
+        return '%s %s' % (node.operator.encode(),
+                          node.children[0].accept(self))
+    def visit_mathexpression(self, node):
+        return '(%s %s %s)' % (node.children[0].accept(self),
+                               node.operator.encode(),
+                               node.children[1].accept(self))
+    def visit_function(self, node):
+        #if node.name == 'IN':
+        res = []
+        for child in node.children:
+            try:
+                rql = child.accept(self)
+            except UnknownEid, ex:
+                continue
+            res.append(rql)
+        if not res:
+            raise ex
+        return '%s(%s)' % (node.name, ', '.join(res))
+    def visit_constant(self, node):
+        if self.need_translation or node.uidtype:
+            if node.type == 'Int':
+                self.has_local_eid = True
+                return str(self.eid2extid(node.value))
+            if node.type == 'Substitute':
+                key = node.value
+                # ensure we have not yet translated the value...
+                if not key in self._const_var:
+                    self.kwargs[key] = self.eid2extid(self.kwargs[key])
+                    self._const_var[key] = None
+                    self.has_local_eid = True
+        return node.as_string()
+    def visit_variableref(self, node):
+        """get the sql name for a variable reference"""
+        return node.name
+    def visit_sortterm(self, node):
+        if node.asc:
+            return node.term.accept(self)
+        return '%s DESC' % node.term.accept(self)
+    def process_eid_const(self, const):
+        value = const.eval(self.kwargs)
+        try:
+            return None, self._const_var[value]
+        except Exception:
+            var = self._varmaker.next()
+            self.need_translation = True
+            restr = '%s eid %s' % (var, self.visit_constant(const))
+            self.need_translation = False
+            self._const_var[value] = var
+            return restr, var
+    def eid2extid(self, eid):
+        try:
+            return self.repo.eid2extid(self.source, eid, self._session)
+        except UnknownEid:
+            operator = self.current_operator
+            if operator is not None and operator != '=':
+                # deal with query like "X eid > 12"
+                #
+                # The problem is that eid order in the external source may
+                # differ from the local source
+                #
+                # So search for all eids from this source matching the condition
+                # locally and then to replace the "> 12" branch by "IN (eids)"
+                #
+                # XXX we may have to insert a huge number of eids...)
+                sql = "SELECT extid FROM entities WHERE source='%s' AND type IN (%s) AND eid%s%s"
+                etypes = ','.join("'%s'" % etype for etype in self.current_etypes)
+                cu = self._session.system_sql(sql % (self.source.uri, etypes,
+                                                      operator, eid))
+                # XXX buggy cu.rowcount which may be zero while there are some
+                # results
+                rows = cu.fetchall()
+                if rows:
+                    raise ReplaceByInOperator((b64decode(r[0]) for r in rows))
+            raise