Remove the remote repository-access-through-zmq support
authorAurelien Campeas <aurelien.campeas@logilab.fr>
Fri, 13 Mar 2015 14:10:40 +0100
changeset 10236 ef3059a692cb
parent 10235 684215aca046
child 10237 1e030b1a5622
Remove the remote repository-access-through-zmq support Modern methods such as the rqlcontroller cube + the cwclientlib library are the way forward. Closes #2919297.
dbapi.py
doc/3.21.rst
hooks/zmq.py
repoapi.py
server/cwzmq.py
server/serverconfig.py
server/serverctl.py
server/test/unittest_repository.py
test/unittest_utils.py
utils.py
zmqclient.py
--- a/dbapi.py	Tue Mar 03 14:57:34 2015 +0100
+++ b/dbapi.py	Fri Mar 13 14:10:40 2015 +0100
@@ -118,11 +118,8 @@
 
     * a simple instance id for in-memory connection
 
-    * a uri like scheme://host:port/instanceid where scheme may be one of
-      'inmemory' or 'zmqpickle'
-
-      * if scheme is handled by ZMQ (eg 'tcp'), you should not specify an
-        instance id
+    * a uri like scheme://host:port/instanceid where scheme must be
+      'inmemory'
 
     Other arguments:
 
@@ -131,7 +128,7 @@
 
     :cnxprops:
       a :class:`ConnectionProperties` instance, allowing to specify
-      the connection method (eg in memory or zmq).
+      the connection method (eg in memory).
 
     :setvreg:
       flag telling if a registry should be initialized for the connection.
@@ -150,36 +147,18 @@
     :kwargs:
       there goes authentication tokens. You usually have to specify a password
       for the given user, using a named 'password' argument.
+
     """
     if not urlparse(database).scheme:
         warn('[3.16] give an qualified URI as database instead of using '
              'host/cnxprops to specify the connection method',
              DeprecationWarning, stacklevel=2)
-        if cnxprops and cnxprops.cnxtype == 'zmq':
-            database = kwargs.pop('host')
-        elif cnxprops and cnxprops.cnxtype == 'inmemory':
-            database = 'inmemory://' + database
     puri = urlparse(database)
     method = puri.scheme.lower()
-    if method == 'inmemory':
-        config = cwconfig.instance_configuration(puri.netloc)
-    else:
-        config = cwconfig.CubicWebNoAppConfiguration()
+    assert method == 'inmemory'
+    config = cwconfig.instance_configuration(puri.netloc)
     repo = get_repository(database, config=config)
-    if method == 'inmemory':
-        vreg = repo.vreg
-    elif setvreg:
-        if mulcnx:
-            multiple_connections_fix()
-        vreg = cwvreg.CWRegistryStore(config, initlog=initlog)
-        schema = repo.get_schema()
-        for oldetype, newetype in ETYPE_NAME_MAP.items():
-            if oldetype in schema:
-                print 'aliasing', newetype, 'to', oldetype
-                schema._entities[newetype] = schema._entities[oldetype]
-        vreg.set_schema(schema)
-    else:
-        vreg = None
+    vreg = repo.vreg
     cnx = _repo_connect(repo, login, cnxprops=cnxprops, **kwargs)
     cnx.vreg = vreg
     return cnx
--- a/doc/3.21.rst	Tue Mar 03 14:57:34 2015 +0100
+++ b/doc/3.21.rst	Fri Mar 13 14:10:40 2015 +0100
@@ -15,6 +15,6 @@
 * the user_callback api has been removed; people should use plain
   ajax functions instead
 
-* the Pyro remote repository access method has been entirely removed
-  (emerging alternatives such as rqlcontroller and cwclientlib should
-  be used instead)
+* the `Pyro` and `Zmq-pickle` remote repository access methods have
+  been entirely removed (emerging alternatives such as rqlcontroller
+  and cwclientlib should be used instead)
--- a/hooks/zmq.py	Tue Mar 03 14:57:34 2015 +0100
+++ b/hooks/zmq.py	Fri Mar 13 14:10:40 2015 +0100
@@ -1,5 +1,5 @@
 # -*- coding: utf-8 -*-
-# copyright 2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# copyright 2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
 #
 # This file is part of CubicWeb.
@@ -50,30 +50,3 @@
         self.repo.app_instances_bus.start()
 
 
-class ZMQRepositoryServerStopHook(hook.Hook):
-    __regid__ = 'zmqrepositoryserverstop'
-    events = ('server_shutdown',)
-
-    def __call__(self):
-        server = getattr(self.repo, 'zmq_repo_server', None)
-        if server:
-            self.repo.zmq_repo_server.quit()
-
-class ZMQRepositoryServerStartHook(hook.Hook):
-    __regid__ = 'zmqrepositoryserverstart'
-    events = ('server_startup',)
-
-    def __call__(self):
-        config = self.repo.config
-        if config.name == 'repository':
-            # start-repository command already starts a zmq repo
-            return
-        address = config.get('zmq-repository-address')
-        if not address:
-            return
-        self.repo.warning('remote access to the repository via zmq/pickle is deprecated')
-        from cubicweb.server import cwzmq
-        self.repo.zmq_repo_server = server = cwzmq.ZMQRepositoryServer(self.repo)
-        server.connect(address)
-        self.repo.threaded_task(server.run)
-
--- a/repoapi.py	Tue Mar 03 14:57:34 2015 +0100
+++ b/repoapi.py	Fri Mar 13 14:10:40 2015 +0100
@@ -52,11 +52,7 @@
         # me may have been called with a dummy 'inmemory://' uri ...
         return _get_inmemory_repo(config, vreg)
 
-    if protocol.startswith('zmqpickle-'):
-        from cubicweb.zmqclient import ZMQRepositoryClient
-        return ZMQRepositoryClient(uri)
-    else:
-        raise ConnectionError('unknown protocol: `%s`' % protocol)
+    raise ConnectionError('unknown protocol: `%s`' % protocol)
 
 def connect(repo, login, **kwargs):
     """Take credential and return associated ClientConnection.
--- a/server/cwzmq.py	Tue Mar 03 14:57:34 2015 +0100
+++ b/server/cwzmq.py	Fri Mar 13 14:10:40 2015 +0100
@@ -1,5 +1,5 @@
 # -*- coding: utf-8 -*-
-# copyright 2012-2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# copyright 2012-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
 #
 # This file is part of CubicWeb.
@@ -17,9 +17,6 @@
 # You should have received a copy of the GNU Lesser General Public License along
 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
 
-import cPickle
-import traceback
-from time import localtime, mktime
 from threading import Thread
 from logging import getLogger
 
@@ -29,59 +26,9 @@
 
 from cubicweb import set_log_methods
 
+
 ctx = zmq.Context()
 
-def cwproto_to_zmqaddr(address):
-    """ converts a cw-zmq address (like zmqpickle-tcp://<ip>:<port>)
-    into a proper zmq address (tcp://<ip>:<port>)
-    """
-    assert address.startswith('zmqpickle-'), 'bad protocol string %s' % address
-    return address.split('-', 1)[1] # chop the `zmqpickle-` prefix
-
-
-class Finished(Exception):
-    """raise to remove an event from the event loop"""
-
-class TimeEvent:
-    """base event"""
-    # timefunc = staticmethod(localtime)
-    timefunc = localtime
-
-    def __init__(self, absolute=None, period=None):
-        # local time tuple
-        if absolute is None:
-            absolute = self.timefunc()
-        self.absolute = absolute
-        # optional period in seconds
-        self.period = period
-
-    def is_ready(self):
-        """return  true if the event is ready to be fired"""
-        now = self.timefunc()
-        if self.absolute <= now:
-            return True
-        return False
-
-    def fire(self, server):
-        """fire the event
-        must be overridden by concrete events
-        """
-        raise NotImplementedError()
-
-    def update(self):
-        """update the absolute date for the event or raise a finished exception
-        """
-        if self.period is None:
-            raise Finished
-        self.absolute = localtime(mktime(self.absolute) + self.period)
-
-
-class QuitEvent(TimeEvent):
-    """stop the server"""
-    def fire(self, server):
-        server.repo.shutdown()
-        server.quiting = True
-
 
 class ZMQComm(object):
     """
@@ -179,132 +126,5 @@
         self.ioloop.add_callback(lambda: self.stream.setsockopt(zmq.SUBSCRIBE, topic))
 
 
-class ZMQRepositoryServer(object):
-
-    def __init__(self, repository):
-        """make the repository available as a PyRO object"""
-        self.address = None
-        self.repo = repository
-        self.socket = None
-        self.stream = None
-        self.loop = ioloop.IOLoop()
-
-        # event queue
-        self.events = []
-
-    def connect(self, address):
-        self.address = cwproto_to_zmqaddr(address)
-
-    def run(self):
-        """enter the service loop"""
-        # start repository looping tasks
-        self.socket = ctx.socket(zmq.REP)
-        self.stream = zmq.eventloop.zmqstream.ZMQStream(self.socket, io_loop=self.loop)
-        self.stream.bind(self.address)
-        self.info('ZMQ server bound on: %s', self.address)
-
-        self.stream.on_recv(self.process_cmds)
-
-        try:
-            self.loop.start()
-        except zmq.ZMQError:
-            self.warning('ZMQ event loop killed')
-        self.quit()
-
-    def trigger_events(self):
-        """trigger ready events"""
-        for event in self.events[:]:
-            if event.is_ready():
-                self.info('starting event %s', event)
-                event.fire(self)
-                try:
-                    event.update()
-                except Finished:
-                    self.events.remove(event)
-
-    def process_cmd(self, cmd):
-        """Delegate the given command to the repository.
-
-        ``cmd`` is a list of (method_name, args, kwargs)
-        where ``args`` is a list of positional arguments
-        and ``kwargs`` is a dictionnary of named arguments.
-
-        >>> rset = delegate_to_repo(["execute", [sessionid], {'rql': rql}])
-
-        :note1: ``kwargs`` may be ommited
-
-            >>> rset = delegate_to_repo(["execute", [sessionid, rql]])
-
-        :note2: both ``args`` and ``kwargs`` may be omitted
-
-            >>> schema = delegate_to_repo(["get_schema"])
-            >>> schema = delegate_to_repo("get_schema") # also allowed
-
-        """
-        cmd = cPickle.loads(cmd)
-        if not cmd:
-            raise AttributeError('function name required')
-        if isinstance(cmd, basestring):
-            cmd = [cmd]
-        if len(cmd) < 2:
-            cmd.append(())
-        if len(cmd) < 3:
-            cmd.append({})
-        cmd  = list(cmd) + [(), {}]
-        funcname, args, kwargs = cmd[:3]
-        result = getattr(self.repo, funcname)(*args, **kwargs)
-        return result
-
-    def process_cmds(self, cmds):
-        """Callback intended to be used with ``on_recv``.
-
-        Call ``delegate_to_repo`` on each command and send a pickled of
-        each result recursively.
-
-        Any exception are catched, pickled and sent.
-        """
-        try:
-            for cmd in cmds:
-                result = self.process_cmd(cmd)
-                self.send_data(result)
-        except Exception as exc:
-            traceback.print_exc()
-            self.send_data(exc)
-
-    def send_data(self, data):
-        self.socket.send_pyobj(data)
-
-    def quit(self, shutdown_repo=False):
-        """stop the server"""
-        self.info('Quitting ZMQ server')
-        try:
-            self.loop.add_callback(self.loop.stop)
-            self.stream.on_recv(None)
-            self.stream.close()
-        except Exception as e:
-            print e
-            pass
-        if shutdown_repo and not self.repo.shutting_down:
-            event = QuitEvent()
-            event.fire(self)
-
-    # server utilitities ######################################################
-
-    def install_sig_handlers(self):
-        """install signal handlers"""
-        import signal
-        self.info('installing signal handlers')
-        signal.signal(signal.SIGINT, lambda x, y, s=self: s.quit(shutdown_repo=True))
-        signal.signal(signal.SIGTERM, lambda x, y, s=self: s.quit(shutdown_repo=True))
-
-
-    # these are overridden by set_log_methods below
-    # only defining here to prevent pylint from complaining
-    @classmethod
-    def info(cls, msg, *a, **kw):
-        pass
-
-
 set_log_methods(Publisher, getLogger('cubicweb.zmq.pub'))
 set_log_methods(Subscriber, getLogger('cubicweb.zmq.sub'))
-set_log_methods(ZMQRepositoryServer, getLogger('cubicweb.zmq.repo'))
--- a/server/serverconfig.py	Tue Mar 03 14:57:34 2015 +0100
+++ b/server/serverconfig.py	Fri Mar 13 14:10:40 2015 +0100
@@ -197,14 +197,6 @@
 notified of every changes.',
           'group': 'email', 'level': 2,
           }),
-        # zmq services config
-        ('zmq-repository-address',
-         {'type' : 'string',
-          'default': None,
-          'help': ('ZMQ URI on which the repository will be bound '
-                   'to (of the form `zmqpickle-tcp://<ipaddr>:<port>`).'),
-          'group': 'zmq', 'level': 3,
-          }),
          ('zmq-address-sub',
           {'type' : 'csv',
            'default' : None,
--- a/server/serverctl.py	Tue Mar 03 14:57:34 2015 +0100
+++ b/server/serverctl.py	Fri Mar 13 14:10:40 2015 +0100
@@ -38,7 +38,6 @@
 from cubicweb.toolsutils import Command, CommandHandler, underline_title
 from cubicweb.cwctl import CWCTL, check_options_consistency, ConfigureInstanceCommand
 from cubicweb.server import SOURCE_TYPES
-from cubicweb.server.repository import Repository
 from cubicweb.server.serverconfig import (
     USER_OPTIONS, ServerConfiguration, SourceConfiguration,
     ask_source_config, generate_source_config)
@@ -676,72 +675,6 @@
         cnx.close()
 
 
-class StartRepositoryCommand(Command):
-    """Start a CubicWeb RQL server for a given instance.
-
-    The server will be remotely accessible through ZMQ
-
-    <instance>
-      the identifier of the instance to initialize.
-    """
-    name = 'start-repository'
-    arguments = '<instance>'
-    min_args = max_args = 1
-    options = (
-        ('debug',
-         {'short': 'D', 'action' : 'store_true',
-          'help': 'start server in debug mode.'}),
-        ('loglevel',
-         {'short': 'l', 'type' : 'choice', 'metavar': '<log level>',
-          'default': None, 'choices': ('debug', 'info', 'warning', 'error'),
-          'help': 'debug if -D is set, error otherwise',
-          }),
-        ('address',
-         {'short': 'a', 'type': 'string', 'metavar': '<protocol>://<host>:<port>',
-          'default': '',
-          'help': ('specify a ZMQ URI on which to bind'),
-          }),
-        )
-
-    def create_repo(self, config):
-        address = self['address']
-        if not address:
-            address = config.get('zmq-repository-address')
-        from cubicweb.server.utils import TasksManager
-        from cubicweb.server.cwzmq import ZMQRepositoryServer
-        repo = Repository(config, TasksManager())
-        return ZMQRepositoryServer(repo), address
-
-    def run(self, args):
-        from logilab.common.daemon import daemonize, setugid
-        from cubicweb.cwctl import init_cmdline_log_threshold
-        print 'WARNING: Standalone repository with pyro or zmq access is deprecated'
-        appid = args[0]
-        debug = self['debug']
-        if sys.platform == 'win32' and not debug:
-            logger = logging.getLogger('cubicweb.ctl')
-            logger.info('Forcing debug mode on win32 platform')
-            debug = True
-        config = ServerConfiguration.config_for(appid, debugmode=debug)
-        init_cmdline_log_threshold(config, self['loglevel'])
-        # create the server
-        server, address = self.create_repo(config)
-        # ensure the directory where the pid-file should be set exists (for
-        # instance /var/run/cubicweb may be deleted on computer restart)
-        pidfile = config['pid-file']
-        piddir = os.path.dirname(pidfile)
-        # go ! (don't daemonize in debug mode)
-        if not os.path.exists(piddir):
-            os.makedirs(piddir)
-        if not debug and daemonize(pidfile, umask=config['umask']):
-            return
-        uid = config['uid']
-        if uid is not None:
-            setugid(uid)
-        server.install_sig_handlers()
-        server.connect(address)
-        server.run()
-
 
 def _remote_dump(host, appid, output, sudo=False):
     # XXX generate unique/portable file name
@@ -1125,7 +1058,6 @@
 
 for cmdclass in (CreateInstanceDBCommand, InitInstanceCommand,
                  GrantUserOnInstanceCommand, ResetAdminPasswordCommand,
-                 StartRepositoryCommand,
                  DBDumpCommand, DBRestoreCommand, DBCopyCommand,
                  AddSourceCommand, CheckRepositoryCommand, RebuildFTICommand,
                  SynchronizeSourceCommand, SchemaDiffCommand,
--- a/server/test/unittest_repository.py	Tue Mar 03 14:57:34 2015 +0100
+++ b/server/test/unittest_repository.py	Fri Mar 13 14:10:40 2015 +0100
@@ -31,10 +31,9 @@
                       UnknownEid, AuthenticationError, Unauthorized, QueryError)
 from cubicweb.predicates import is_instance
 from cubicweb.schema import RQLConstraint
-from cubicweb.dbapi import connect, multiple_connections_unfix
 from cubicweb.devtools.testlib import CubicWebTC
 from cubicweb.devtools.repotest import tuplify
-from cubicweb.server import repository, hook
+from cubicweb.server import hook
 from cubicweb.server.sqlutils import SQL_PREFIX
 from cubicweb.server.hook import Hook
 from cubicweb.server.sources import native
@@ -312,64 +311,6 @@
         ownedby = schema.rschema('owned_by')
         self.assertEqual(ownedby.objects('CWEType'), ('CWUser',))
 
-    def test_zmq(self):
-        try:
-            import zmq
-        except ImportError:
-            self.skipTest("zmq in not available")
-        done = []
-        from cubicweb.devtools import TestServerConfiguration as ServerConfiguration
-        from cubicweb.server.cwzmq import ZMQRepositoryServer
-        # the client part has to be in a thread due to sqlite limitations
-        t = threading.Thread(target=self._zmq_client, args=(done,))
-        t.start()
-
-        zmq_server = ZMQRepositoryServer(self.repo)
-        zmq_server.connect('zmqpickle-tcp://127.0.0.1:41415')
-
-        t2 = threading.Thread(target=self._zmq_quit, args=(done, zmq_server,))
-        t2.start()
-
-        zmq_server.run()
-
-        t2.join(1)
-        t.join(1)
-
-        self.assertTrue(done[0])
-
-        if t.isAlive():
-            self.fail('something went wrong, thread still alive')
-
-    def _zmq_quit(self, done, srv):
-        while not done:
-            time.sleep(0.1)
-        srv.quit()
-
-    def _zmq_client(self, done):
-        try:
-            cnx = connect('zmqpickle-tcp://127.0.0.1:41415', u'admin', password=u'gingkow',
-                          initlog=False) # don't reset logging configuration
-            try:
-                cnx.load_appobjects(subpath=('entities',))
-                # check we can get the schema
-                schema = cnx.get_schema()
-                self.assertTrue(cnx.vreg)
-                self.assertTrue('etypes'in cnx.vreg)
-                cu = cnx.cursor()
-                rset = cu.execute('Any U,G WHERE U in_group G')
-                user = iter(rset.entities()).next()
-                self.assertTrue(user._cw)
-                self.assertTrue(user._cw.vreg)
-                from cubicweb.entities import authobjs
-                self.assertIsInstance(user._cw.user, authobjs.CWUser)
-                cnx.close()
-                done.append(True)
-            finally:
-                # connect monkey patch some method by default, remove them
-                multiple_connections_unfix()
-        finally:
-            done.append(False)
-
     def test_internal_api(self):
         repo = self.repo
         cnxid = repo.connect(self.admlogin, password=self.admpassword)
--- a/test/unittest_utils.py	Tue Mar 03 14:57:34 2015 +0100
+++ b/test/unittest_utils.py	Fri Mar 13 14:10:40 2015 +0100
@@ -58,8 +58,6 @@
                          parse_repo_uri('myapp'))
         self.assertEqual(('inmemory', None, 'myapp'),
                          parse_repo_uri('inmemory://myapp'))
-        self.assertEqual(('zmqpickle-tcp', '127.0.0.1:666', ''),
-                         parse_repo_uri('zmqpickle-tcp://127.0.0.1:666'))
         with self.assertRaises(NotImplementedError):
             parse_repo_uri('foo://bar')
 
--- a/utils.py	Tue Mar 03 14:57:34 2015 +0100
+++ b/utils.py	Fri Mar 13 14:10:40 2015 +0100
@@ -607,7 +607,6 @@
     """ transform a command line uri into a (protocol, hostport, appid), e.g:
     <myapp>                      -> 'inmemory', None, '<myapp>'
     inmemory://<myapp>           -> 'inmemory', None, '<myapp>'
-    zmqpickle://[host][:port]    -> 'zmqpickle', 'host:port', None
     """
     parseduri = urlparse(uri)
     scheme = parseduri.scheme
@@ -615,8 +614,6 @@
         return ('inmemory', None, parseduri.path)
     if scheme == 'inmemory':
         return (scheme, None, parseduri.netloc)
-    if scheme.startswith('zmqpickle-'):
-        return (scheme, parseduri.netloc, parseduri.path)
     raise NotImplementedError('URI protocol not implemented for `%s`' % uri)
 
 
--- a/zmqclient.py	Tue Mar 03 14:57:34 2015 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,64 +0,0 @@
-# copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
-# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
-#
-# This file is part of CubicWeb.
-#
-# CubicWeb is free software: you can redistribute it and/or modify it under the
-# terms of the GNU Lesser General Public License as published by the Free
-# Software Foundation, either version 2.1 of the License, or (at your option)
-# any later version.
-#
-# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
-# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
-# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
-# details.
-#
-# You should have received a copy of the GNU Lesser General Public License along
-# with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
-"""Source to query another RQL repository using ZMQ"""
-
-__docformat__ = "restructuredtext en"
-_ = unicode
-
-from functools import partial
-import zmq
-
-from cubicweb.server.cwzmq import cwproto_to_zmqaddr
-
-# XXX hack to overpass old zmq limitation that force to have
-# only one context per python process
-try:
-    from cubicweb.server.cwzmq import ctx
-except ImportError:
-    ctx = zmq.Context()
-
-class ZMQRepositoryClient(object):
-    """
-    This class delegates the overall repository stuff to a remote source.
-
-    So calling a method of this repository will result on calling the
-    corresponding method of the remote source repository.
-
-    Any raised exception on the remote source is propagated locally.
-
-    ZMQ is used as the transport layer and cPickle is used to serialize data.
-    """
-
-    def __init__(self, zmq_address):
-        """A zmq address provided here will be like
-        `zmqpickle-tcp://127.0.0.1:42000`.  W
-
-        We chop the prefix to get a real zmq address.
-        """
-        self.socket = ctx.socket(zmq.REQ)
-        self.socket.connect(cwproto_to_zmqaddr(zmq_address))
-
-    def __zmqcall__(self, name, *args, **kwargs):
-         self.socket.send_pyobj([name, args, kwargs])
-         result = self.socket.recv_pyobj()
-         if isinstance(result, BaseException):
-             raise result
-         return result
-
-    def __getattr__(self, name):
-        return partial(self.__zmqcall__, name)