Remove the remote repository-access-through-zmq support
Modern methods such as the rqlcontroller cube + the cwclientlib library are the way forward.
Closes #2919297.
--- a/dbapi.py Tue Mar 03 14:57:34 2015 +0100
+++ b/dbapi.py Fri Mar 13 14:10:40 2015 +0100
@@ -118,11 +118,8 @@
* a simple instance id for in-memory connection
- * a uri like scheme://host:port/instanceid where scheme may be one of
- 'inmemory' or 'zmqpickle'
-
- * if scheme is handled by ZMQ (eg 'tcp'), you should not specify an
- instance id
+ * a uri like scheme://host:port/instanceid where scheme must be
+ 'inmemory'
Other arguments:
@@ -131,7 +128,7 @@
:cnxprops:
a :class:`ConnectionProperties` instance, allowing to specify
- the connection method (eg in memory or zmq).
+ the connection method (eg in memory).
:setvreg:
flag telling if a registry should be initialized for the connection.
@@ -150,36 +147,18 @@
:kwargs:
there goes authentication tokens. You usually have to specify a password
for the given user, using a named 'password' argument.
+
"""
if not urlparse(database).scheme:
warn('[3.16] give an qualified URI as database instead of using '
'host/cnxprops to specify the connection method',
DeprecationWarning, stacklevel=2)
- if cnxprops and cnxprops.cnxtype == 'zmq':
- database = kwargs.pop('host')
- elif cnxprops and cnxprops.cnxtype == 'inmemory':
- database = 'inmemory://' + database
puri = urlparse(database)
method = puri.scheme.lower()
- if method == 'inmemory':
- config = cwconfig.instance_configuration(puri.netloc)
- else:
- config = cwconfig.CubicWebNoAppConfiguration()
+ assert method == 'inmemory'
+ config = cwconfig.instance_configuration(puri.netloc)
repo = get_repository(database, config=config)
- if method == 'inmemory':
- vreg = repo.vreg
- elif setvreg:
- if mulcnx:
- multiple_connections_fix()
- vreg = cwvreg.CWRegistryStore(config, initlog=initlog)
- schema = repo.get_schema()
- for oldetype, newetype in ETYPE_NAME_MAP.items():
- if oldetype in schema:
- print 'aliasing', newetype, 'to', oldetype
- schema._entities[newetype] = schema._entities[oldetype]
- vreg.set_schema(schema)
- else:
- vreg = None
+ vreg = repo.vreg
cnx = _repo_connect(repo, login, cnxprops=cnxprops, **kwargs)
cnx.vreg = vreg
return cnx
--- a/doc/3.21.rst Tue Mar 03 14:57:34 2015 +0100
+++ b/doc/3.21.rst Fri Mar 13 14:10:40 2015 +0100
@@ -15,6 +15,6 @@
* the user_callback api has been removed; people should use plain
ajax functions instead
-* the Pyro remote repository access method has been entirely removed
- (emerging alternatives such as rqlcontroller and cwclientlib should
- be used instead)
+* the `Pyro` and `Zmq-pickle` remote repository access methods have
+ been entirely removed (emerging alternatives such as rqlcontroller
+ and cwclientlib should be used instead)
--- a/hooks/zmq.py Tue Mar 03 14:57:34 2015 +0100
+++ b/hooks/zmq.py Fri Mar 13 14:10:40 2015 +0100
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
-# copyright 2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# copyright 2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
@@ -50,30 +50,3 @@
self.repo.app_instances_bus.start()
-class ZMQRepositoryServerStopHook(hook.Hook):
- __regid__ = 'zmqrepositoryserverstop'
- events = ('server_shutdown',)
-
- def __call__(self):
- server = getattr(self.repo, 'zmq_repo_server', None)
- if server:
- self.repo.zmq_repo_server.quit()
-
-class ZMQRepositoryServerStartHook(hook.Hook):
- __regid__ = 'zmqrepositoryserverstart'
- events = ('server_startup',)
-
- def __call__(self):
- config = self.repo.config
- if config.name == 'repository':
- # start-repository command already starts a zmq repo
- return
- address = config.get('zmq-repository-address')
- if not address:
- return
- self.repo.warning('remote access to the repository via zmq/pickle is deprecated')
- from cubicweb.server import cwzmq
- self.repo.zmq_repo_server = server = cwzmq.ZMQRepositoryServer(self.repo)
- server.connect(address)
- self.repo.threaded_task(server.run)
-
--- a/repoapi.py Tue Mar 03 14:57:34 2015 +0100
+++ b/repoapi.py Fri Mar 13 14:10:40 2015 +0100
@@ -52,11 +52,7 @@
# me may have been called with a dummy 'inmemory://' uri ...
return _get_inmemory_repo(config, vreg)
- if protocol.startswith('zmqpickle-'):
- from cubicweb.zmqclient import ZMQRepositoryClient
- return ZMQRepositoryClient(uri)
- else:
- raise ConnectionError('unknown protocol: `%s`' % protocol)
+ raise ConnectionError('unknown protocol: `%s`' % protocol)
def connect(repo, login, **kwargs):
"""Take credential and return associated ClientConnection.
--- a/server/cwzmq.py Tue Mar 03 14:57:34 2015 +0100
+++ b/server/cwzmq.py Fri Mar 13 14:10:40 2015 +0100
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
-# copyright 2012-2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# copyright 2012-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
@@ -17,9 +17,6 @@
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.
-import cPickle
-import traceback
-from time import localtime, mktime
from threading import Thread
from logging import getLogger
@@ -29,59 +26,9 @@
from cubicweb import set_log_methods
+
ctx = zmq.Context()
-def cwproto_to_zmqaddr(address):
- """ converts a cw-zmq address (like zmqpickle-tcp://<ip>:<port>)
- into a proper zmq address (tcp://<ip>:<port>)
- """
- assert address.startswith('zmqpickle-'), 'bad protocol string %s' % address
- return address.split('-', 1)[1] # chop the `zmqpickle-` prefix
-
-
-class Finished(Exception):
- """raise to remove an event from the event loop"""
-
-class TimeEvent:
- """base event"""
- # timefunc = staticmethod(localtime)
- timefunc = localtime
-
- def __init__(self, absolute=None, period=None):
- # local time tuple
- if absolute is None:
- absolute = self.timefunc()
- self.absolute = absolute
- # optional period in seconds
- self.period = period
-
- def is_ready(self):
- """return true if the event is ready to be fired"""
- now = self.timefunc()
- if self.absolute <= now:
- return True
- return False
-
- def fire(self, server):
- """fire the event
- must be overridden by concrete events
- """
- raise NotImplementedError()
-
- def update(self):
- """update the absolute date for the event or raise a finished exception
- """
- if self.period is None:
- raise Finished
- self.absolute = localtime(mktime(self.absolute) + self.period)
-
-
-class QuitEvent(TimeEvent):
- """stop the server"""
- def fire(self, server):
- server.repo.shutdown()
- server.quiting = True
-
class ZMQComm(object):
"""
@@ -179,132 +126,5 @@
self.ioloop.add_callback(lambda: self.stream.setsockopt(zmq.SUBSCRIBE, topic))
-class ZMQRepositoryServer(object):
-
- def __init__(self, repository):
- """make the repository available as a PyRO object"""
- self.address = None
- self.repo = repository
- self.socket = None
- self.stream = None
- self.loop = ioloop.IOLoop()
-
- # event queue
- self.events = []
-
- def connect(self, address):
- self.address = cwproto_to_zmqaddr(address)
-
- def run(self):
- """enter the service loop"""
- # start repository looping tasks
- self.socket = ctx.socket(zmq.REP)
- self.stream = zmq.eventloop.zmqstream.ZMQStream(self.socket, io_loop=self.loop)
- self.stream.bind(self.address)
- self.info('ZMQ server bound on: %s', self.address)
-
- self.stream.on_recv(self.process_cmds)
-
- try:
- self.loop.start()
- except zmq.ZMQError:
- self.warning('ZMQ event loop killed')
- self.quit()
-
- def trigger_events(self):
- """trigger ready events"""
- for event in self.events[:]:
- if event.is_ready():
- self.info('starting event %s', event)
- event.fire(self)
- try:
- event.update()
- except Finished:
- self.events.remove(event)
-
- def process_cmd(self, cmd):
- """Delegate the given command to the repository.
-
- ``cmd`` is a list of (method_name, args, kwargs)
- where ``args`` is a list of positional arguments
- and ``kwargs`` is a dictionnary of named arguments.
-
- >>> rset = delegate_to_repo(["execute", [sessionid], {'rql': rql}])
-
- :note1: ``kwargs`` may be ommited
-
- >>> rset = delegate_to_repo(["execute", [sessionid, rql]])
-
- :note2: both ``args`` and ``kwargs`` may be omitted
-
- >>> schema = delegate_to_repo(["get_schema"])
- >>> schema = delegate_to_repo("get_schema") # also allowed
-
- """
- cmd = cPickle.loads(cmd)
- if not cmd:
- raise AttributeError('function name required')
- if isinstance(cmd, basestring):
- cmd = [cmd]
- if len(cmd) < 2:
- cmd.append(())
- if len(cmd) < 3:
- cmd.append({})
- cmd = list(cmd) + [(), {}]
- funcname, args, kwargs = cmd[:3]
- result = getattr(self.repo, funcname)(*args, **kwargs)
- return result
-
- def process_cmds(self, cmds):
- """Callback intended to be used with ``on_recv``.
-
- Call ``delegate_to_repo`` on each command and send a pickled of
- each result recursively.
-
- Any exception are catched, pickled and sent.
- """
- try:
- for cmd in cmds:
- result = self.process_cmd(cmd)
- self.send_data(result)
- except Exception as exc:
- traceback.print_exc()
- self.send_data(exc)
-
- def send_data(self, data):
- self.socket.send_pyobj(data)
-
- def quit(self, shutdown_repo=False):
- """stop the server"""
- self.info('Quitting ZMQ server')
- try:
- self.loop.add_callback(self.loop.stop)
- self.stream.on_recv(None)
- self.stream.close()
- except Exception as e:
- print e
- pass
- if shutdown_repo and not self.repo.shutting_down:
- event = QuitEvent()
- event.fire(self)
-
- # server utilitities ######################################################
-
- def install_sig_handlers(self):
- """install signal handlers"""
- import signal
- self.info('installing signal handlers')
- signal.signal(signal.SIGINT, lambda x, y, s=self: s.quit(shutdown_repo=True))
- signal.signal(signal.SIGTERM, lambda x, y, s=self: s.quit(shutdown_repo=True))
-
-
- # these are overridden by set_log_methods below
- # only defining here to prevent pylint from complaining
- @classmethod
- def info(cls, msg, *a, **kw):
- pass
-
-
set_log_methods(Publisher, getLogger('cubicweb.zmq.pub'))
set_log_methods(Subscriber, getLogger('cubicweb.zmq.sub'))
-set_log_methods(ZMQRepositoryServer, getLogger('cubicweb.zmq.repo'))
--- a/server/serverconfig.py Tue Mar 03 14:57:34 2015 +0100
+++ b/server/serverconfig.py Fri Mar 13 14:10:40 2015 +0100
@@ -197,14 +197,6 @@
notified of every changes.',
'group': 'email', 'level': 2,
}),
- # zmq services config
- ('zmq-repository-address',
- {'type' : 'string',
- 'default': None,
- 'help': ('ZMQ URI on which the repository will be bound '
- 'to (of the form `zmqpickle-tcp://<ipaddr>:<port>`).'),
- 'group': 'zmq', 'level': 3,
- }),
('zmq-address-sub',
{'type' : 'csv',
'default' : None,
--- a/server/serverctl.py Tue Mar 03 14:57:34 2015 +0100
+++ b/server/serverctl.py Fri Mar 13 14:10:40 2015 +0100
@@ -38,7 +38,6 @@
from cubicweb.toolsutils import Command, CommandHandler, underline_title
from cubicweb.cwctl import CWCTL, check_options_consistency, ConfigureInstanceCommand
from cubicweb.server import SOURCE_TYPES
-from cubicweb.server.repository import Repository
from cubicweb.server.serverconfig import (
USER_OPTIONS, ServerConfiguration, SourceConfiguration,
ask_source_config, generate_source_config)
@@ -676,72 +675,6 @@
cnx.close()
-class StartRepositoryCommand(Command):
- """Start a CubicWeb RQL server for a given instance.
-
- The server will be remotely accessible through ZMQ
-
- <instance>
- the identifier of the instance to initialize.
- """
- name = 'start-repository'
- arguments = '<instance>'
- min_args = max_args = 1
- options = (
- ('debug',
- {'short': 'D', 'action' : 'store_true',
- 'help': 'start server in debug mode.'}),
- ('loglevel',
- {'short': 'l', 'type' : 'choice', 'metavar': '<log level>',
- 'default': None, 'choices': ('debug', 'info', 'warning', 'error'),
- 'help': 'debug if -D is set, error otherwise',
- }),
- ('address',
- {'short': 'a', 'type': 'string', 'metavar': '<protocol>://<host>:<port>',
- 'default': '',
- 'help': ('specify a ZMQ URI on which to bind'),
- }),
- )
-
- def create_repo(self, config):
- address = self['address']
- if not address:
- address = config.get('zmq-repository-address')
- from cubicweb.server.utils import TasksManager
- from cubicweb.server.cwzmq import ZMQRepositoryServer
- repo = Repository(config, TasksManager())
- return ZMQRepositoryServer(repo), address
-
- def run(self, args):
- from logilab.common.daemon import daemonize, setugid
- from cubicweb.cwctl import init_cmdline_log_threshold
- print 'WARNING: Standalone repository with pyro or zmq access is deprecated'
- appid = args[0]
- debug = self['debug']
- if sys.platform == 'win32' and not debug:
- logger = logging.getLogger('cubicweb.ctl')
- logger.info('Forcing debug mode on win32 platform')
- debug = True
- config = ServerConfiguration.config_for(appid, debugmode=debug)
- init_cmdline_log_threshold(config, self['loglevel'])
- # create the server
- server, address = self.create_repo(config)
- # ensure the directory where the pid-file should be set exists (for
- # instance /var/run/cubicweb may be deleted on computer restart)
- pidfile = config['pid-file']
- piddir = os.path.dirname(pidfile)
- # go ! (don't daemonize in debug mode)
- if not os.path.exists(piddir):
- os.makedirs(piddir)
- if not debug and daemonize(pidfile, umask=config['umask']):
- return
- uid = config['uid']
- if uid is not None:
- setugid(uid)
- server.install_sig_handlers()
- server.connect(address)
- server.run()
-
def _remote_dump(host, appid, output, sudo=False):
# XXX generate unique/portable file name
@@ -1125,7 +1058,6 @@
for cmdclass in (CreateInstanceDBCommand, InitInstanceCommand,
GrantUserOnInstanceCommand, ResetAdminPasswordCommand,
- StartRepositoryCommand,
DBDumpCommand, DBRestoreCommand, DBCopyCommand,
AddSourceCommand, CheckRepositoryCommand, RebuildFTICommand,
SynchronizeSourceCommand, SchemaDiffCommand,
--- a/server/test/unittest_repository.py Tue Mar 03 14:57:34 2015 +0100
+++ b/server/test/unittest_repository.py Fri Mar 13 14:10:40 2015 +0100
@@ -31,10 +31,9 @@
UnknownEid, AuthenticationError, Unauthorized, QueryError)
from cubicweb.predicates import is_instance
from cubicweb.schema import RQLConstraint
-from cubicweb.dbapi import connect, multiple_connections_unfix
from cubicweb.devtools.testlib import CubicWebTC
from cubicweb.devtools.repotest import tuplify
-from cubicweb.server import repository, hook
+from cubicweb.server import hook
from cubicweb.server.sqlutils import SQL_PREFIX
from cubicweb.server.hook import Hook
from cubicweb.server.sources import native
@@ -312,64 +311,6 @@
ownedby = schema.rschema('owned_by')
self.assertEqual(ownedby.objects('CWEType'), ('CWUser',))
- def test_zmq(self):
- try:
- import zmq
- except ImportError:
- self.skipTest("zmq in not available")
- done = []
- from cubicweb.devtools import TestServerConfiguration as ServerConfiguration
- from cubicweb.server.cwzmq import ZMQRepositoryServer
- # the client part has to be in a thread due to sqlite limitations
- t = threading.Thread(target=self._zmq_client, args=(done,))
- t.start()
-
- zmq_server = ZMQRepositoryServer(self.repo)
- zmq_server.connect('zmqpickle-tcp://127.0.0.1:41415')
-
- t2 = threading.Thread(target=self._zmq_quit, args=(done, zmq_server,))
- t2.start()
-
- zmq_server.run()
-
- t2.join(1)
- t.join(1)
-
- self.assertTrue(done[0])
-
- if t.isAlive():
- self.fail('something went wrong, thread still alive')
-
- def _zmq_quit(self, done, srv):
- while not done:
- time.sleep(0.1)
- srv.quit()
-
- def _zmq_client(self, done):
- try:
- cnx = connect('zmqpickle-tcp://127.0.0.1:41415', u'admin', password=u'gingkow',
- initlog=False) # don't reset logging configuration
- try:
- cnx.load_appobjects(subpath=('entities',))
- # check we can get the schema
- schema = cnx.get_schema()
- self.assertTrue(cnx.vreg)
- self.assertTrue('etypes'in cnx.vreg)
- cu = cnx.cursor()
- rset = cu.execute('Any U,G WHERE U in_group G')
- user = iter(rset.entities()).next()
- self.assertTrue(user._cw)
- self.assertTrue(user._cw.vreg)
- from cubicweb.entities import authobjs
- self.assertIsInstance(user._cw.user, authobjs.CWUser)
- cnx.close()
- done.append(True)
- finally:
- # connect monkey patch some method by default, remove them
- multiple_connections_unfix()
- finally:
- done.append(False)
-
def test_internal_api(self):
repo = self.repo
cnxid = repo.connect(self.admlogin, password=self.admpassword)
--- a/test/unittest_utils.py Tue Mar 03 14:57:34 2015 +0100
+++ b/test/unittest_utils.py Fri Mar 13 14:10:40 2015 +0100
@@ -58,8 +58,6 @@
parse_repo_uri('myapp'))
self.assertEqual(('inmemory', None, 'myapp'),
parse_repo_uri('inmemory://myapp'))
- self.assertEqual(('zmqpickle-tcp', '127.0.0.1:666', ''),
- parse_repo_uri('zmqpickle-tcp://127.0.0.1:666'))
with self.assertRaises(NotImplementedError):
parse_repo_uri('foo://bar')
--- a/utils.py Tue Mar 03 14:57:34 2015 +0100
+++ b/utils.py Fri Mar 13 14:10:40 2015 +0100
@@ -607,7 +607,6 @@
""" transform a command line uri into a (protocol, hostport, appid), e.g:
<myapp> -> 'inmemory', None, '<myapp>'
inmemory://<myapp> -> 'inmemory', None, '<myapp>'
- zmqpickle://[host][:port] -> 'zmqpickle', 'host:port', None
"""
parseduri = urlparse(uri)
scheme = parseduri.scheme
@@ -615,8 +614,6 @@
return ('inmemory', None, parseduri.path)
if scheme == 'inmemory':
return (scheme, None, parseduri.netloc)
- if scheme.startswith('zmqpickle-'):
- return (scheme, parseduri.netloc, parseduri.path)
raise NotImplementedError('URI protocol not implemented for `%s`' % uri)
--- a/zmqclient.py Tue Mar 03 14:57:34 2015 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,64 +0,0 @@
-# copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
-# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
-#
-# This file is part of CubicWeb.
-#
-# CubicWeb is free software: you can redistribute it and/or modify it under the
-# terms of the GNU Lesser General Public License as published by the Free
-# Software Foundation, either version 2.1 of the License, or (at your option)
-# any later version.
-#
-# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
-# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
-# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
-# details.
-#
-# You should have received a copy of the GNU Lesser General Public License along
-# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.
-"""Source to query another RQL repository using ZMQ"""
-
-__docformat__ = "restructuredtext en"
-_ = unicode
-
-from functools import partial
-import zmq
-
-from cubicweb.server.cwzmq import cwproto_to_zmqaddr
-
-# XXX hack to overpass old zmq limitation that force to have
-# only one context per python process
-try:
- from cubicweb.server.cwzmq import ctx
-except ImportError:
- ctx = zmq.Context()
-
-class ZMQRepositoryClient(object):
- """
- This class delegates the overall repository stuff to a remote source.
-
- So calling a method of this repository will result on calling the
- corresponding method of the remote source repository.
-
- Any raised exception on the remote source is propagated locally.
-
- ZMQ is used as the transport layer and cPickle is used to serialize data.
- """
-
- def __init__(self, zmq_address):
- """A zmq address provided here will be like
- `zmqpickle-tcp://127.0.0.1:42000`. W
-
- We chop the prefix to get a real zmq address.
- """
- self.socket = ctx.socket(zmq.REQ)
- self.socket.connect(cwproto_to_zmqaddr(zmq_address))
-
- def __zmqcall__(self, name, *args, **kwargs):
- self.socket.send_pyobj([name, args, kwargs])
- result = self.socket.recv_pyobj()
- if isinstance(result, BaseException):
- raise result
- return result
-
- def __getattr__(self, name):
- return partial(self.__zmqcall__, name)