# HG changeset patch # User Julien Cristau , Quentin Roquefort # Date 1328887235 -3600 # Node ID 543e1579ba0df8a02bffa72b47bfa7a97606155f # Parent 1d1cfc97f6b92e55b389a113910ad0be8f98df29 [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq Each repo can have a publishing and any number of subscribing sockets whose addresses are specified in the instance's configuration. An application or cube can subscribe to some 'topics', and give a callback that gets called when a message matching that topic is received. As a proof of concept, this introduces a hook to clean up the caches associated with the repository when an entity is deleted. A subscription is added using Repository::zmq::add_subscription; the callback receives a list representing the received multi-part message as argument (the first element of the message is its topic). diff -r 1d1cfc97f6b9 -r 543e1579ba0d __pkginfo__.py --- a/__pkginfo__.py Fri Feb 10 10:45:17 2012 +0100 +++ b/__pkginfo__.py Fri Feb 10 16:20:35 2012 +0100 @@ -63,6 +63,7 @@ 'fyzz': '>= 0.1.0', # for sparql 'vobject': '>= 0.6.0', # for ical view 'rdflib': None, # + 'pyzmq': None, #'Products.FCKeditor':'', #'SimpleTAL':'>= 4.1.6', } diff -r 1d1cfc97f6b9 -r 543e1579ba0d debian/control --- a/debian/control Fri Feb 10 10:45:17 2012 +0100 +++ b/debian/control Fri Feb 10 16:20:35 2012 +0100 @@ -37,6 +37,7 @@ Provides: cubicweb-multisources Depends: ${misc:Depends}, ${python:Depends}, cubicweb-common (= ${source:Version}), cubicweb-ctl (= ${source:Version}), python-logilab-database (>= 1.8.2), cubicweb-postgresql-support | cubicweb-mysql-support | python-pysqlite2 Recommends: pyro (<< 4.0.0), cubicweb-documentation (= ${source:Version}) +Suggests: python-zmq Description: server part of the CubicWeb framework CubicWeb is a semantic web application framework. . diff -r 1d1cfc97f6b9 -r 543e1579ba0d doc/book/en/devrepo/repo/hooks.rst --- a/doc/book/en/devrepo/repo/hooks.rst Fri Feb 10 10:45:17 2012 +0100 +++ b/doc/book/en/devrepo/repo/hooks.rst Fri Feb 10 16:20:35 2012 +0100 @@ -162,6 +162,44 @@ :ref:`adv_tuto_security_propagation`. +Inter-instance communication +---------------------------- + +If your application consists of several instances, you may need some means to +communicate between them. Cubicweb provides a publish/subscribe mechanism +using ØMQ_. In order to use it, use +:meth:`~cubicweb.server.cwzmq.ZMQComm.add_subscription` on the +`repo.app_instances_bus` object. The `callback` will get the message (as a +list). A message can be sent by calling +:meth:`~cubicweb.server.cwzmq.ZMQComm.publish` on `repo.app_instances_bus`. +The first element of the message is the topic which is used for filtering and +dispatching messages. + +.. _ØMQ: http://www.zeromq.org/ + +.. sourcecode:: python + + class FooHook(hook.Hook): + events = ('server_startup',) + __regid__ = 'foo_startup' + + def __call__(self): + def callback(msg): + self.info('received message: %s', ' '.join(msg)) + self.repo.app_instances_bus.subscribe('hello', callback) + +.. sourcecode:: python + + def do_foo(self): + actually_do_foo() + self._cw.repo.app_instances_bus.publish(['hello', 'world']) + +The `zmq-address-pub` configuration variable contains the address used +by the instance for sending messages, e.g. `tcp://*:1234`. The +`zmq-address-sub` variable contains a comma-separated list of addresses +to listen on, e.g. `tcp://localhost:1234, tcp://192.168.1.1:2345`. + + Hooks writing tips ------------------ diff -r 1d1cfc97f6b9 -r 543e1579ba0d hooks/zmq.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/hooks/zmq.py Fri Feb 10 16:20:35 2012 +0100 @@ -0,0 +1,48 @@ +# -*- coding: utf-8 -*- +# copyright 2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved. +# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr +# +# This file is part of CubicWeb. +# +# CubicWeb is free software: you can redistribute it and/or modify it under the +# terms of the GNU Lesser General Public License as published by the Free +# Software Foundation, either version 2.1 of the License, or (at your option) +# any later version. +# +# CubicWeb is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License along +# with CubicWeb. If not, see . + +from cubicweb.server import hook + +class ZMQStopHook(hook.Hook): + __regid__ = 'zmqstop' + events = ('server_shutdown',) + + def __call__(self): + self.repo.app_instances_bus.stop() + +class ZMQStartHook(hook.Hook): + __regid__ = 'zmqstart' + events = ('server_startup',) + + def __call__(self): + config = self.repo.config + address_pub = config.get('zmq-address-pub') + if not address_pub: + return + from cubicweb.server import cwzmq + self.repo.app_instances_bus = cwzmq.ZMQComm() + self.repo.app_instances_bus.add_publisher(address_pub) + def clear_cache_callback(msg): + self.debug('clear_caches: %s', ' '.join(msg)) + self.repo.clear_caches(msg[1:]) + self.repo.app_instances_bus.add_subscription('delete', clear_cache_callback) + for address in config.get('zmq-address-sub'): + self.repo.app_instances_bus.add_subscriber(address) + self.repo.app_instances_bus.start() + diff -r 1d1cfc97f6b9 -r 543e1579ba0d server/cwzmq.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/server/cwzmq.py Fri Feb 10 16:20:35 2012 +0100 @@ -0,0 +1,109 @@ +# -*- coding: utf-8 -*- +# copyright 2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved. +# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr +# +# This file is part of CubicWeb. +# +# CubicWeb is free software: you can redistribute it and/or modify it under the +# terms of the GNU Lesser General Public License as published by the Free +# Software Foundation, either version 2.1 of the License, or (at your option) +# any later version. +# +# CubicWeb is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License along +# with CubicWeb. If not, see . + +from threading import Thread +import zmq +from zmq.eventloop import ioloop +import zmq.eventloop.zmqstream + +from logging import getLogger +from cubicweb import set_log_methods + +ctx = zmq.Context() + +class ZMQComm(object): + def __init__(self): + self.ioloop = ioloop.IOLoop() + self._topics = {} + self._subscribers = [] + self.publisher = None + + def add_publisher(self, address): + assert self.publisher is None, "more than one publisher is not supported" + self.publisher = Publisher(self.ioloop, address) + + def add_subscription(self, topic, callback): + for subscriber in self._subscribers: + subscriber.subscribe(topic, callback) + self._topics[topic] = callback + + def add_subscriber(self, address): + subscriber = Subscriber(self.ioloop, address) + for topic, callback in self._topics.iteritems(): + subscriber.subscribe(topic, callback) + self._subscribers.append(subscriber) + + def publish(self, msg): + assert self.publisher is not None, "can't publish without a publisher" + self.publisher.send(msg) + + def start(self): + Thread(target=self.ioloop.start).start() + + def stop(self): + self.ioloop.add_callback(self.ioloop.stop) + + def __del__(self): + self.ioloop.close() + + +class Publisher(object): + def __init__(self, ioloop, address): + self.address = address + self._topics = {} + self._subscribers = [] + self.ioloop = ioloop + def callback(): + s = ctx.socket(zmq.PUB) + self.stream = zmq.eventloop.zmqstream.ZMQStream(s, io_loop=ioloop) + self.stream.bind(self.address) + self.debug('start publisher on %s', self.address) + ioloop.add_callback(callback) + + def send(self, msg): + self.ioloop.add_callback(lambda:self.stream.send_multipart(msg)) + + +class Subscriber(object): + def __init__(self, ioloop, address): + self.address = address + self.dispatch_table = {} + self.ioloop = ioloop + def callback(): + s = ctx.socket(zmq.SUB) + self.stream = zmq.eventloop.zmqstream.ZMQStream(s, io_loop=ioloop) + self.stream.on_recv(self.dispatch) + self.stream.connect(self.address) + self.debug('start subscriber on %s', self.address) + ioloop.add_callback(callback) + + def dispatch(self, msg): + try: + f = self.dispatch_table[msg[0]] + except KeyError: + return + f(msg) + + def subscribe(self, topic, callback): + self.dispatch_table[topic] = callback + self.ioloop.add_callback(lambda: self.stream.setsockopt(zmq.SUBSCRIBE, topic)) + + +set_log_methods(Publisher, getLogger('cubicweb.zmq.pub')) +set_log_methods(Subscriber, getLogger('cubicweb.zmq.sub')) diff -r 1d1cfc97f6b9 -r 543e1579ba0d server/hook.py --- a/server/hook.py Fri Feb 10 10:45:17 2012 +0100 +++ b/server/hook.py Fri Feb 10 16:20:35 2012 +0100 @@ -1064,6 +1064,8 @@ remove inserted eid from repository type/source cache """ try: - self.session.repo.clear_caches(self.get_data()) + eids = self.get_data() + self.session.repo.clear_caches(eids) + self.session.repo.app_instances_bus.publish(['delete'] + list(str(eid) for eid in eids)) except KeyError: pass diff -r 1d1cfc97f6b9 -r 543e1579ba0d server/repository.py --- a/server/repository.py Fri Feb 10 10:45:17 2012 +0100 +++ b/server/repository.py Fri Feb 10 16:20:35 2012 +0100 @@ -120,6 +120,20 @@ {'x': eidfrom, 'y': eidto}) +class NullEventBus(object): + def send(self, msg): + pass + + def add_subscription(self, topic, callback): + pass + + def start(self): + pass + + def stop(self): + pass + + class Repository(object): """a repository provides access to a set of persistent storages for entities and relations @@ -134,6 +148,7 @@ self.vreg = vreg self.pyro_registered = False self.pyro_uri = None + self.app_instances_bus = NullEventBus() self.info('starting repository from %s', self.config.apphome) # dictionary of opened sessions self._sessions = {} diff -r 1d1cfc97f6b9 -r 543e1579ba0d server/serverconfig.py --- a/server/serverconfig.py Fri Feb 10 10:45:17 2012 +0100 +++ b/server/serverconfig.py Fri Feb 10 16:20:35 2012 +0100 @@ -207,6 +207,19 @@ and if not set, it will be choosen randomly', 'group': 'pyro', 'level': 3, }), + + ('zmq-address-sub', + {'type' : 'csv', + 'default' : None, + 'help': ('List of ZMQ addresses to subscribe to (requires pyzmq)'), + 'group': 'zmq', 'level': 1, + }), + ('zmq-address-pub', + {'type' : 'string', + 'default' : None, + 'help': ('ZMQ address to use for publishing (requires pyzmq)'), + 'group': 'zmq', 'level': 1, + }), ) + CubicWebConfiguration.options) # should we init the connections pool (eg connect to sources). This is