hooks/zmq.py
author Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
Fri, 10 Feb 2012 16:20:35 +0100
changeset 8211 543e1579ba0d
child 8350 e1c05bf6fdeb
permissions -rw-r--r--
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq Each repo can have a publishing and any number of subscribing sockets whose addresses are specified in the instance's configuration. An application or cube can subscribe to some 'topics', and give a callback that gets called when a message matching that topic is received. As a proof of concept, this introduces a hook to clean up the caches associated with the repository when an entity is deleted. A subscription is added using Repository::zmq::add_subscription; the callback receives a list representing the received multi-part message as argument (the first element of the message is its topic).

# -*- coding: utf-8 -*-
# copyright 2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.

from cubicweb.server import hook

class ZMQStopHook(hook.Hook):
    __regid__ = 'zmqstop'
    events = ('server_shutdown',)

    def __call__(self):
        self.repo.app_instances_bus.stop()

class ZMQStartHook(hook.Hook):
    __regid__ = 'zmqstart'
    events = ('server_startup',)

    def __call__(self):
        config = self.repo.config
        address_pub = config.get('zmq-address-pub')
        if not address_pub:
            return
        from cubicweb.server import cwzmq
        self.repo.app_instances_bus = cwzmq.ZMQComm()
        self.repo.app_instances_bus.add_publisher(address_pub)
        def clear_cache_callback(msg):
            self.debug('clear_caches: %s', ' '.join(msg))
            self.repo.clear_caches(msg[1:])
        self.repo.app_instances_bus.add_subscription('delete', clear_cache_callback)
        for address in config.get('zmq-address-sub'):
            self.repo.app_instances_bus.add_subscriber(address)
        self.repo.app_instances_bus.start()