# HG changeset patch # User Aurelien Campeas # Date 1426252240 -3600 # Node ID ef3059a692cb92ae405daf6ad9243e645b64a972 # Parent 684215aca046b3b458e4cadc6b337f4a79a2dc12 Remove the remote repository-access-through-zmq support Modern methods such as the rqlcontroller cube + the cwclientlib library are the way forward. Closes #2919297. diff -r 684215aca046 -r ef3059a692cb dbapi.py --- a/dbapi.py Tue Mar 03 14:57:34 2015 +0100 +++ b/dbapi.py Fri Mar 13 14:10:40 2015 +0100 @@ -118,11 +118,8 @@ * a simple instance id for in-memory connection - * a uri like scheme://host:port/instanceid where scheme may be one of - 'inmemory' or 'zmqpickle' - - * if scheme is handled by ZMQ (eg 'tcp'), you should not specify an - instance id + * a uri like scheme://host:port/instanceid where scheme must be + 'inmemory' Other arguments: @@ -131,7 +128,7 @@ :cnxprops: a :class:`ConnectionProperties` instance, allowing to specify - the connection method (eg in memory or zmq). + the connection method (eg in memory). :setvreg: flag telling if a registry should be initialized for the connection. @@ -150,36 +147,18 @@ :kwargs: there goes authentication tokens. You usually have to specify a password for the given user, using a named 'password' argument. + """ if not urlparse(database).scheme: warn('[3.16] give an qualified URI as database instead of using ' 'host/cnxprops to specify the connection method', DeprecationWarning, stacklevel=2) - if cnxprops and cnxprops.cnxtype == 'zmq': - database = kwargs.pop('host') - elif cnxprops and cnxprops.cnxtype == 'inmemory': - database = 'inmemory://' + database puri = urlparse(database) method = puri.scheme.lower() - if method == 'inmemory': - config = cwconfig.instance_configuration(puri.netloc) - else: - config = cwconfig.CubicWebNoAppConfiguration() + assert method == 'inmemory' + config = cwconfig.instance_configuration(puri.netloc) repo = get_repository(database, config=config) - if method == 'inmemory': - vreg = repo.vreg - elif setvreg: - if mulcnx: - multiple_connections_fix() - vreg = cwvreg.CWRegistryStore(config, initlog=initlog) - schema = repo.get_schema() - for oldetype, newetype in ETYPE_NAME_MAP.items(): - if oldetype in schema: - print 'aliasing', newetype, 'to', oldetype - schema._entities[newetype] = schema._entities[oldetype] - vreg.set_schema(schema) - else: - vreg = None + vreg = repo.vreg cnx = _repo_connect(repo, login, cnxprops=cnxprops, **kwargs) cnx.vreg = vreg return cnx diff -r 684215aca046 -r ef3059a692cb doc/3.21.rst --- a/doc/3.21.rst Tue Mar 03 14:57:34 2015 +0100 +++ b/doc/3.21.rst Fri Mar 13 14:10:40 2015 +0100 @@ -15,6 +15,6 @@ * the user_callback api has been removed; people should use plain ajax functions instead -* the Pyro remote repository access method has been entirely removed - (emerging alternatives such as rqlcontroller and cwclientlib should - be used instead) +* the `Pyro` and `Zmq-pickle` remote repository access methods have + been entirely removed (emerging alternatives such as rqlcontroller + and cwclientlib should be used instead) diff -r 684215aca046 -r ef3059a692cb hooks/zmq.py --- a/hooks/zmq.py Tue Mar 03 14:57:34 2015 +0100 +++ b/hooks/zmq.py Fri Mar 13 14:10:40 2015 +0100 @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# copyright 2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved. +# copyright 2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved. # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr # # This file is part of CubicWeb. @@ -50,30 +50,3 @@ self.repo.app_instances_bus.start() -class ZMQRepositoryServerStopHook(hook.Hook): - __regid__ = 'zmqrepositoryserverstop' - events = ('server_shutdown',) - - def __call__(self): - server = getattr(self.repo, 'zmq_repo_server', None) - if server: - self.repo.zmq_repo_server.quit() - -class ZMQRepositoryServerStartHook(hook.Hook): - __regid__ = 'zmqrepositoryserverstart' - events = ('server_startup',) - - def __call__(self): - config = self.repo.config - if config.name == 'repository': - # start-repository command already starts a zmq repo - return - address = config.get('zmq-repository-address') - if not address: - return - self.repo.warning('remote access to the repository via zmq/pickle is deprecated') - from cubicweb.server import cwzmq - self.repo.zmq_repo_server = server = cwzmq.ZMQRepositoryServer(self.repo) - server.connect(address) - self.repo.threaded_task(server.run) - diff -r 684215aca046 -r ef3059a692cb repoapi.py --- a/repoapi.py Tue Mar 03 14:57:34 2015 +0100 +++ b/repoapi.py Fri Mar 13 14:10:40 2015 +0100 @@ -52,11 +52,7 @@ # me may have been called with a dummy 'inmemory://' uri ... return _get_inmemory_repo(config, vreg) - if protocol.startswith('zmqpickle-'): - from cubicweb.zmqclient import ZMQRepositoryClient - return ZMQRepositoryClient(uri) - else: - raise ConnectionError('unknown protocol: `%s`' % protocol) + raise ConnectionError('unknown protocol: `%s`' % protocol) def connect(repo, login, **kwargs): """Take credential and return associated ClientConnection. diff -r 684215aca046 -r ef3059a692cb server/cwzmq.py --- a/server/cwzmq.py Tue Mar 03 14:57:34 2015 +0100 +++ b/server/cwzmq.py Fri Mar 13 14:10:40 2015 +0100 @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# copyright 2012-2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved. +# copyright 2012-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved. # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr # # This file is part of CubicWeb. @@ -17,9 +17,6 @@ # You should have received a copy of the GNU Lesser General Public License along # with CubicWeb. If not, see . -import cPickle -import traceback -from time import localtime, mktime from threading import Thread from logging import getLogger @@ -29,59 +26,9 @@ from cubicweb import set_log_methods + ctx = zmq.Context() -def cwproto_to_zmqaddr(address): - """ converts a cw-zmq address (like zmqpickle-tcp://:) - into a proper zmq address (tcp://:) - """ - assert address.startswith('zmqpickle-'), 'bad protocol string %s' % address - return address.split('-', 1)[1] # chop the `zmqpickle-` prefix - - -class Finished(Exception): - """raise to remove an event from the event loop""" - -class TimeEvent: - """base event""" - # timefunc = staticmethod(localtime) - timefunc = localtime - - def __init__(self, absolute=None, period=None): - # local time tuple - if absolute is None: - absolute = self.timefunc() - self.absolute = absolute - # optional period in seconds - self.period = period - - def is_ready(self): - """return true if the event is ready to be fired""" - now = self.timefunc() - if self.absolute <= now: - return True - return False - - def fire(self, server): - """fire the event - must be overridden by concrete events - """ - raise NotImplementedError() - - def update(self): - """update the absolute date for the event or raise a finished exception - """ - if self.period is None: - raise Finished - self.absolute = localtime(mktime(self.absolute) + self.period) - - -class QuitEvent(TimeEvent): - """stop the server""" - def fire(self, server): - server.repo.shutdown() - server.quiting = True - class ZMQComm(object): """ @@ -179,132 +126,5 @@ self.ioloop.add_callback(lambda: self.stream.setsockopt(zmq.SUBSCRIBE, topic)) -class ZMQRepositoryServer(object): - - def __init__(self, repository): - """make the repository available as a PyRO object""" - self.address = None - self.repo = repository - self.socket = None - self.stream = None - self.loop = ioloop.IOLoop() - - # event queue - self.events = [] - - def connect(self, address): - self.address = cwproto_to_zmqaddr(address) - - def run(self): - """enter the service loop""" - # start repository looping tasks - self.socket = ctx.socket(zmq.REP) - self.stream = zmq.eventloop.zmqstream.ZMQStream(self.socket, io_loop=self.loop) - self.stream.bind(self.address) - self.info('ZMQ server bound on: %s', self.address) - - self.stream.on_recv(self.process_cmds) - - try: - self.loop.start() - except zmq.ZMQError: - self.warning('ZMQ event loop killed') - self.quit() - - def trigger_events(self): - """trigger ready events""" - for event in self.events[:]: - if event.is_ready(): - self.info('starting event %s', event) - event.fire(self) - try: - event.update() - except Finished: - self.events.remove(event) - - def process_cmd(self, cmd): - """Delegate the given command to the repository. - - ``cmd`` is a list of (method_name, args, kwargs) - where ``args`` is a list of positional arguments - and ``kwargs`` is a dictionnary of named arguments. - - >>> rset = delegate_to_repo(["execute", [sessionid], {'rql': rql}]) - - :note1: ``kwargs`` may be ommited - - >>> rset = delegate_to_repo(["execute", [sessionid, rql]]) - - :note2: both ``args`` and ``kwargs`` may be omitted - - >>> schema = delegate_to_repo(["get_schema"]) - >>> schema = delegate_to_repo("get_schema") # also allowed - - """ - cmd = cPickle.loads(cmd) - if not cmd: - raise AttributeError('function name required') - if isinstance(cmd, basestring): - cmd = [cmd] - if len(cmd) < 2: - cmd.append(()) - if len(cmd) < 3: - cmd.append({}) - cmd = list(cmd) + [(), {}] - funcname, args, kwargs = cmd[:3] - result = getattr(self.repo, funcname)(*args, **kwargs) - return result - - def process_cmds(self, cmds): - """Callback intended to be used with ``on_recv``. - - Call ``delegate_to_repo`` on each command and send a pickled of - each result recursively. - - Any exception are catched, pickled and sent. - """ - try: - for cmd in cmds: - result = self.process_cmd(cmd) - self.send_data(result) - except Exception as exc: - traceback.print_exc() - self.send_data(exc) - - def send_data(self, data): - self.socket.send_pyobj(data) - - def quit(self, shutdown_repo=False): - """stop the server""" - self.info('Quitting ZMQ server') - try: - self.loop.add_callback(self.loop.stop) - self.stream.on_recv(None) - self.stream.close() - except Exception as e: - print e - pass - if shutdown_repo and not self.repo.shutting_down: - event = QuitEvent() - event.fire(self) - - # server utilitities ###################################################### - - def install_sig_handlers(self): - """install signal handlers""" - import signal - self.info('installing signal handlers') - signal.signal(signal.SIGINT, lambda x, y, s=self: s.quit(shutdown_repo=True)) - signal.signal(signal.SIGTERM, lambda x, y, s=self: s.quit(shutdown_repo=True)) - - - # these are overridden by set_log_methods below - # only defining here to prevent pylint from complaining - @classmethod - def info(cls, msg, *a, **kw): - pass - - set_log_methods(Publisher, getLogger('cubicweb.zmq.pub')) set_log_methods(Subscriber, getLogger('cubicweb.zmq.sub')) -set_log_methods(ZMQRepositoryServer, getLogger('cubicweb.zmq.repo')) diff -r 684215aca046 -r ef3059a692cb server/serverconfig.py --- a/server/serverconfig.py Tue Mar 03 14:57:34 2015 +0100 +++ b/server/serverconfig.py Fri Mar 13 14:10:40 2015 +0100 @@ -197,14 +197,6 @@ notified of every changes.', 'group': 'email', 'level': 2, }), - # zmq services config - ('zmq-repository-address', - {'type' : 'string', - 'default': None, - 'help': ('ZMQ URI on which the repository will be bound ' - 'to (of the form `zmqpickle-tcp://:`).'), - 'group': 'zmq', 'level': 3, - }), ('zmq-address-sub', {'type' : 'csv', 'default' : None, diff -r 684215aca046 -r ef3059a692cb server/serverctl.py --- a/server/serverctl.py Tue Mar 03 14:57:34 2015 +0100 +++ b/server/serverctl.py Fri Mar 13 14:10:40 2015 +0100 @@ -38,7 +38,6 @@ from cubicweb.toolsutils import Command, CommandHandler, underline_title from cubicweb.cwctl import CWCTL, check_options_consistency, ConfigureInstanceCommand from cubicweb.server import SOURCE_TYPES -from cubicweb.server.repository import Repository from cubicweb.server.serverconfig import ( USER_OPTIONS, ServerConfiguration, SourceConfiguration, ask_source_config, generate_source_config) @@ -676,72 +675,6 @@ cnx.close() -class StartRepositoryCommand(Command): - """Start a CubicWeb RQL server for a given instance. - - The server will be remotely accessible through ZMQ - - - the identifier of the instance to initialize. - """ - name = 'start-repository' - arguments = '' - min_args = max_args = 1 - options = ( - ('debug', - {'short': 'D', 'action' : 'store_true', - 'help': 'start server in debug mode.'}), - ('loglevel', - {'short': 'l', 'type' : 'choice', 'metavar': '', - 'default': None, 'choices': ('debug', 'info', 'warning', 'error'), - 'help': 'debug if -D is set, error otherwise', - }), - ('address', - {'short': 'a', 'type': 'string', 'metavar': '://:', - 'default': '', - 'help': ('specify a ZMQ URI on which to bind'), - }), - ) - - def create_repo(self, config): - address = self['address'] - if not address: - address = config.get('zmq-repository-address') - from cubicweb.server.utils import TasksManager - from cubicweb.server.cwzmq import ZMQRepositoryServer - repo = Repository(config, TasksManager()) - return ZMQRepositoryServer(repo), address - - def run(self, args): - from logilab.common.daemon import daemonize, setugid - from cubicweb.cwctl import init_cmdline_log_threshold - print 'WARNING: Standalone repository with pyro or zmq access is deprecated' - appid = args[0] - debug = self['debug'] - if sys.platform == 'win32' and not debug: - logger = logging.getLogger('cubicweb.ctl') - logger.info('Forcing debug mode on win32 platform') - debug = True - config = ServerConfiguration.config_for(appid, debugmode=debug) - init_cmdline_log_threshold(config, self['loglevel']) - # create the server - server, address = self.create_repo(config) - # ensure the directory where the pid-file should be set exists (for - # instance /var/run/cubicweb may be deleted on computer restart) - pidfile = config['pid-file'] - piddir = os.path.dirname(pidfile) - # go ! (don't daemonize in debug mode) - if not os.path.exists(piddir): - os.makedirs(piddir) - if not debug and daemonize(pidfile, umask=config['umask']): - return - uid = config['uid'] - if uid is not None: - setugid(uid) - server.install_sig_handlers() - server.connect(address) - server.run() - def _remote_dump(host, appid, output, sudo=False): # XXX generate unique/portable file name @@ -1125,7 +1058,6 @@ for cmdclass in (CreateInstanceDBCommand, InitInstanceCommand, GrantUserOnInstanceCommand, ResetAdminPasswordCommand, - StartRepositoryCommand, DBDumpCommand, DBRestoreCommand, DBCopyCommand, AddSourceCommand, CheckRepositoryCommand, RebuildFTICommand, SynchronizeSourceCommand, SchemaDiffCommand, diff -r 684215aca046 -r ef3059a692cb server/test/unittest_repository.py --- a/server/test/unittest_repository.py Tue Mar 03 14:57:34 2015 +0100 +++ b/server/test/unittest_repository.py Fri Mar 13 14:10:40 2015 +0100 @@ -31,10 +31,9 @@ UnknownEid, AuthenticationError, Unauthorized, QueryError) from cubicweb.predicates import is_instance from cubicweb.schema import RQLConstraint -from cubicweb.dbapi import connect, multiple_connections_unfix from cubicweb.devtools.testlib import CubicWebTC from cubicweb.devtools.repotest import tuplify -from cubicweb.server import repository, hook +from cubicweb.server import hook from cubicweb.server.sqlutils import SQL_PREFIX from cubicweb.server.hook import Hook from cubicweb.server.sources import native @@ -312,64 +311,6 @@ ownedby = schema.rschema('owned_by') self.assertEqual(ownedby.objects('CWEType'), ('CWUser',)) - def test_zmq(self): - try: - import zmq - except ImportError: - self.skipTest("zmq in not available") - done = [] - from cubicweb.devtools import TestServerConfiguration as ServerConfiguration - from cubicweb.server.cwzmq import ZMQRepositoryServer - # the client part has to be in a thread due to sqlite limitations - t = threading.Thread(target=self._zmq_client, args=(done,)) - t.start() - - zmq_server = ZMQRepositoryServer(self.repo) - zmq_server.connect('zmqpickle-tcp://127.0.0.1:41415') - - t2 = threading.Thread(target=self._zmq_quit, args=(done, zmq_server,)) - t2.start() - - zmq_server.run() - - t2.join(1) - t.join(1) - - self.assertTrue(done[0]) - - if t.isAlive(): - self.fail('something went wrong, thread still alive') - - def _zmq_quit(self, done, srv): - while not done: - time.sleep(0.1) - srv.quit() - - def _zmq_client(self, done): - try: - cnx = connect('zmqpickle-tcp://127.0.0.1:41415', u'admin', password=u'gingkow', - initlog=False) # don't reset logging configuration - try: - cnx.load_appobjects(subpath=('entities',)) - # check we can get the schema - schema = cnx.get_schema() - self.assertTrue(cnx.vreg) - self.assertTrue('etypes'in cnx.vreg) - cu = cnx.cursor() - rset = cu.execute('Any U,G WHERE U in_group G') - user = iter(rset.entities()).next() - self.assertTrue(user._cw) - self.assertTrue(user._cw.vreg) - from cubicweb.entities import authobjs - self.assertIsInstance(user._cw.user, authobjs.CWUser) - cnx.close() - done.append(True) - finally: - # connect monkey patch some method by default, remove them - multiple_connections_unfix() - finally: - done.append(False) - def test_internal_api(self): repo = self.repo cnxid = repo.connect(self.admlogin, password=self.admpassword) diff -r 684215aca046 -r ef3059a692cb test/unittest_utils.py --- a/test/unittest_utils.py Tue Mar 03 14:57:34 2015 +0100 +++ b/test/unittest_utils.py Fri Mar 13 14:10:40 2015 +0100 @@ -58,8 +58,6 @@ parse_repo_uri('myapp')) self.assertEqual(('inmemory', None, 'myapp'), parse_repo_uri('inmemory://myapp')) - self.assertEqual(('zmqpickle-tcp', '127.0.0.1:666', ''), - parse_repo_uri('zmqpickle-tcp://127.0.0.1:666')) with self.assertRaises(NotImplementedError): parse_repo_uri('foo://bar') diff -r 684215aca046 -r ef3059a692cb utils.py --- a/utils.py Tue Mar 03 14:57:34 2015 +0100 +++ b/utils.py Fri Mar 13 14:10:40 2015 +0100 @@ -607,7 +607,6 @@ """ transform a command line uri into a (protocol, hostport, appid), e.g: -> 'inmemory', None, '' inmemory:// -> 'inmemory', None, '' - zmqpickle://[host][:port] -> 'zmqpickle', 'host:port', None """ parseduri = urlparse(uri) scheme = parseduri.scheme @@ -615,8 +614,6 @@ return ('inmemory', None, parseduri.path) if scheme == 'inmemory': return (scheme, None, parseduri.netloc) - if scheme.startswith('zmqpickle-'): - return (scheme, parseduri.netloc, parseduri.path) raise NotImplementedError('URI protocol not implemented for `%s`' % uri) diff -r 684215aca046 -r ef3059a692cb zmqclient.py --- a/zmqclient.py Tue Mar 03 14:57:34 2015 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,64 +0,0 @@ -# copyright 2003-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved. -# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr -# -# This file is part of CubicWeb. -# -# CubicWeb is free software: you can redistribute it and/or modify it under the -# terms of the GNU Lesser General Public License as published by the Free -# Software Foundation, either version 2.1 of the License, or (at your option) -# any later version. -# -# CubicWeb is distributed in the hope that it will be useful, but WITHOUT -# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS -# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more -# details. -# -# You should have received a copy of the GNU Lesser General Public License along -# with CubicWeb. If not, see . -"""Source to query another RQL repository using ZMQ""" - -__docformat__ = "restructuredtext en" -_ = unicode - -from functools import partial -import zmq - -from cubicweb.server.cwzmq import cwproto_to_zmqaddr - -# XXX hack to overpass old zmq limitation that force to have -# only one context per python process -try: - from cubicweb.server.cwzmq import ctx -except ImportError: - ctx = zmq.Context() - -class ZMQRepositoryClient(object): - """ - This class delegates the overall repository stuff to a remote source. - - So calling a method of this repository will result on calling the - corresponding method of the remote source repository. - - Any raised exception on the remote source is propagated locally. - - ZMQ is used as the transport layer and cPickle is used to serialize data. - """ - - def __init__(self, zmq_address): - """A zmq address provided here will be like - `zmqpickle-tcp://127.0.0.1:42000`. W - - We chop the prefix to get a real zmq address. - """ - self.socket = ctx.socket(zmq.REQ) - self.socket.connect(cwproto_to_zmqaddr(zmq_address)) - - def __zmqcall__(self, name, *args, **kwargs): - self.socket.send_pyobj([name, args, kwargs]) - result = self.socket.recv_pyobj() - if isinstance(result, BaseException): - raise result - return result - - def __getattr__(self, name): - return partial(self.__zmqcall__, name)