server/cwzmq.py
author Julien Cristau <julien.cristau@logilab.fr>
Thu, 23 May 2013 12:35:02 +0200
branch stable
changeset 8982 6bc1c1b4473a
parent 8695 358d8bed9626
child 9468 39b7a91a3f4c
permissions -rw-r--r--
[zmq] make publish address optional. It should be possible to use the zmq communication bus in a read-only manner. Allow setting zmq-address-sub without zmq-address-pub. Closes #2897178.

# -*- coding: utf-8 -*-
# copyright 2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.

from threading import Thread
import cPickle
import traceback

import zmq
from zmq.eventloop import ioloop
import zmq.eventloop.zmqstream

from logging import getLogger
from cubicweb import set_log_methods
from cubicweb.server.server import QuitEvent

# ZMQ context from which every socket of this module is created
ctx = zmq.Context()

def cwproto_to_zmqaddr(address):
    """Translate a cw-zmq address into a native zmq address.

    A cw-zmq address looks like ``zmqpickle-tcp://<ip>:<port>``; whatever
    follows the first dash (``tcp://<ip>:<port>``) is returned.

    An ``AssertionError`` is raised when ``address`` does not start with
    ``zmqpickle-``.
    """
    assert address.startswith('zmqpickle-'), 'bad protocol string %s' % address
    # split once only: the zmq part may itself contain dashes
    pieces = address.split('-', 1)
    return pieces[1]

class ZMQComm(object):
    """
    A minimal notification bus built on top of ZMQ.

    At most one instance of this class should be attached to a
    Repository. To react to the 'hello' kind of message, register a
    callback for that topic::

        def callback(msg):
            self.info('received message: %s', ' '.join(msg))
        repo.app_instances_bus.add_subscription('hello', callback)

    On the other side, a notification is emitted with::

       repo.app_instances_bus.publish(['hello', 'world'])

    See http://docs.cubicweb.org for more details.
    """
    def __init__(self):
        self.publisher = None
        self._subscribers = []
        # topic -> callback, replayed on subscribers added later on
        self._topics = {}
        self.ioloop = ioloop.IOLoop()

    def add_publisher(self, address):
        """Create the (single) publisher of this bus for `address`."""
        assert self.publisher is None, "more than one publisher is not supported"
        self.publisher = Publisher(self.ioloop, address)

    def add_subscription(self, topic, callback):
        """Register `callback` for `topic` on every subscriber.

        The pair is also remembered so that subscribers created afterwards
        get the same subscription.
        """
        for existing in self._subscribers:
            existing.subscribe(topic, callback)
        self._topics[topic] = callback

    def add_subscriber(self, address):
        """Create a subscriber for `address` and give it all known topics."""
        new_subscriber = Subscriber(self.ioloop, address)
        for topic in self._topics:
            new_subscriber.subscribe(topic, self._topics[topic])
        self._subscribers.append(new_subscriber)

    def publish(self, msg):
        """Hand `msg` to the publisher; do nothing when there is none."""
        if self.publisher is not None:
            self.publisher.send(msg)

    def start(self):
        """Run the io loop in a new thread."""
        loop_thread = Thread(target=self.ioloop.start)
        loop_thread.start()

    def stop(self):
        """Register the loop's own `stop` as a loop callback."""
        loop = self.ioloop
        loop.add_callback(loop.stop)

    def __del__(self):
        self.ioloop.close()


class Publisher(object):
    """Publishing end of the bus, based on a zmq PUB socket.

    The constructor does not create the socket itself: creating it, wrapping
    it into a stream and binding it to `address` is registered as a callback
    on `ioloop`. Messages are handed to the loop the same way.
    """
    def __init__(self, ioloop, address):
        self.ioloop = ioloop
        self.address = address
        # not read anywhere in this class
        self._subscribers = []
        self._topics = {}

        def bind_socket():
            pub_socket = ctx.socket(zmq.PUB)
            self.stream = zmq.eventloop.zmqstream.ZMQStream(pub_socket,
                                                            io_loop=ioloop)
            self.stream.bind(self.address)
            self.debug('start publisher on %s', self.address)

        ioloop.add_callback(bind_socket)

    def send(self, msg):
        """Register the sending of the multipart message `msg` on the loop."""
        def deliver():
            # `self.stream` is looked up when the loop runs the callback
            return self.stream.send_multipart(msg)
        self.ioloop.add_callback(deliver)


class Subscriber(object):
    """Receiving end of the bus, based on a zmq SUB socket.

    As for the publisher, the socket is created and connected to `address`
    from a callback registered on `ioloop`. Incoming messages are routed to
    the callback registered for their first part.
    """
    def __init__(self, ioloop, address):
        self.ioloop = ioloop
        self.address = address
        # topic -> callback
        self.dispatch_table = {}

        def connect_socket():
            sub_socket = ctx.socket(zmq.SUB)
            self.stream = zmq.eventloop.zmqstream.ZMQStream(sub_socket,
                                                            io_loop=ioloop)
            self.stream.on_recv(self.dispatch)
            self.stream.connect(self.address)
            self.debug('start subscriber on %s', self.address)

        ioloop.add_callback(connect_socket)

    def dispatch(self, msg):
        """Call the callback registered for ``msg[0]`` with the whole `msg`.

        Messages whose first part has no registered callback are dropped.
        """
        try:
            handler = self.dispatch_table[msg[0]]
        except KeyError:
            pass
        else:
            handler(msg)

    def subscribe(self, topic, callback):
        """Route `topic` to `callback` and subscribe the socket to it."""
        self.dispatch_table[topic] = callback

        def enable_topic():
            return self.stream.setsockopt(zmq.SUBSCRIBE, topic)

        self.ioloop.add_callback(enable_topic)


class ZMQRepositoryServer(object):
    """Serve a repository over a ZMQ REP socket.

    Every message part received on the socket is unpickled into a command
    (see `process_cmd`), run against the repository, and the result -- or
    the exception raised while processing -- is sent back to the peer.
    """

    def __init__(self, repository):
        """make the repository available through a ZMQ socket"""
        self.address = None
        self.repo = repository
        self.socket = None
        self.stream = None
        self.loop = ioloop.IOLoop()

        # event queue
        self.events = []

    def connect(self, address):
        """Store the zmq form of the cw-zmq `address`.

        Nothing is bound here; the address is used by `run`.
        """
        self.address = cwproto_to_zmqaddr(address)

    def run(self):
        """enter the service loop"""
        # create the REP socket and bind it to the address given to `connect`
        self.socket = ctx.socket(zmq.REP)
        self.stream = zmq.eventloop.zmqstream.ZMQStream(self.socket, io_loop=self.loop)
        self.stream.bind(self.address)
        self.info('ZMQ server bound on: %s', self.address)

        self.stream.on_recv(self.process_cmds)

        try:
            self.loop.start()
        except zmq.ZMQError:
            self.warning('ZMQ event loop killed')
        self.quit()

    def trigger_events(self):
        """trigger ready events"""
        # `Finished` is not imported at module level (only `QuitEvent` is),
        # so it is brought into scope here.
        # NOTE(review): confirm `Finished` is exported by
        # cubicweb.server.server, next to `QuitEvent`.
        from cubicweb.server.server import Finished
        # iterate over a copy: finished events are removed along the way
        for event in self.events[:]:
            if event.is_ready():
                self.info('starting event %s', event)
                event.fire(self)
                try:
                    event.update()
                except Finished:
                    self.events.remove(event)

    def process_cmd(self, cmd):
        """Delegate the given command to the repository.

        ``cmd`` is the pickled form of a sequence
        ``(method_name, args, kwargs)`` where ``args`` is a sequence of
        positional arguments and ``kwargs`` is a dictionary of named
        arguments, e.g.::

            ["execute", [sessionid], {'rql': rql}]

        :note1: ``kwargs`` may be omitted::

            ["execute", [sessionid, rql]]

        :note2: both ``args`` and ``kwargs`` may be omitted, in which case
                the bare method name is accepted as well::

            ["get_schema"]
            "get_schema"

        Lists and tuples are both accepted. The value returned by the
        repository method is returned. ``AttributeError`` is raised when the
        command is empty.
        """
        # SECURITY: unpickling can run arbitrary code. The payload comes
        # straight from the socket, so the bound address must only be
        # reachable by trusted peers.
        cmd = cPickle.loads(cmd)
        if not cmd:
            raise AttributeError('function name required')
        if isinstance(cmd, basestring):
            cmd = [cmd]
        # work on a list so that tuple commands can be padded as well
        cmd = list(cmd)
        if len(cmd) < 2:
            cmd.append(())
        if len(cmd) < 3:
            cmd.append({})
        funcname, args, kwargs = cmd[:3]
        return getattr(self.repo, funcname)(*args, **kwargs)

    def process_cmds(self, cmds):
        """Callback intended to be used with ``on_recv``.

        Call ``process_cmd`` on each command and send back each result.

        Any exception is caught, its traceback printed, and the exception
        itself is sent instead of a result.
        """
        # NOTE(review): one reply is sent per message part; presumably
        # peers only ever send single-part requests -- confirm.
        try:
            for cmd in cmds:
                result = self.process_cmd(cmd)
                self.send_data(result)
        except Exception as exc:
            traceback.print_exc()
            self.send_data(exc)

    def send_data(self, data):
        """Send the Python object `data` on the socket."""
        self.socket.send_pyobj(data)

    def quit(self, shutdown_repo=False):
        """stop the server

        When `shutdown_repo` is true and the repository is not already
        shutting down, a `QuitEvent` is fired as well.
        """
        self.info('Quitting ZMQ server')
        try:
            self.loop.add_callback(self.loop.stop)
            self.stream.on_recv(None)
            self.stream.close()
        except Exception as exc:
            # best effort: report the problem and carry on with the shutdown
            self.warning('error while stopping ZMQ server: %s', exc)
        if shutdown_repo and not self.repo.shutting_down:
            event = QuitEvent()
            event.fire(self)

    # server utilities ########################################################

    def install_sig_handlers(self):
        """install signal handlers"""
        import signal
        self.info('installing signal handlers')

        def shutdown(signum, frame):
            self.quit(shutdown_repo=True)

        for signum in (signal.SIGINT, signal.SIGTERM):
            signal.signal(signum, shutdown)


    # these are overridden by set_log_methods below
    # only defining here to prevent pylint from complaining
    @classmethod
    def info(cls, msg, *a, **kw):
        pass


# install the logging methods used by the classes above (``self.debug``,
# ``self.info``, ``self.warning``), each class getting its own logger
set_log_methods(Publisher, getLogger('cubicweb.zmq.pub'))
set_log_methods(Subscriber, getLogger('cubicweb.zmq.sub'))
set_log_methods(ZMQRepositoryServer, getLogger('cubicweb.zmq.repo'))