# -*- coding: utf-8 -*-
# copyright 2012-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.
"""ZMQ-based publish/subscribe notification bus.

All socket work is scheduled on a single event loop through
``add_callback``; `ZMQComm.start` runs that loop in its own thread.
"""

from threading import Thread
from logging import getLogger

import zmq
from zmq.eventloop import ioloop
import zmq.eventloop.zmqstream

from cubicweb import set_log_methods

# single ZMQ context shared by every socket created in this module
ctx = zmq.Context()


class ZMQComm(object):
    """
    A simple ZMQ-based notification bus.

    There should be at most one instance of this class attached to a
    Repository. A typical usage may be something like::

        def callback(msg):
            self.info('received message: %s', ' '.join(msg))
        repo.app_instances_bus.add_subscription('hello', callback)

    to subscribe to the 'hello' kind of message. On the other side, to
    emit a notification, call::

        repo.app_instances_bus.publish(['hello', 'world'])

    See http://docs.cubicweb.org for more details.
    """

    def __init__(self):
        self.ioloop = ioloop.IOLoop()
        # topic -> callback; replayed on every subscriber added later
        self._topics = {}
        self._subscribers = []
        self.publisher = None

    def add_publisher(self, address):
        """Create the (single) publisher, bound to `address`."""
        assert self.publisher is None, "more than one publisher is not supported"
        self.publisher = Publisher(self.ioloop, address)

    def add_subscription(self, topic, callback):
        """Register `callback` for `topic` on all current and future
        subscribers.

        A later registration for the same topic replaces the earlier one.
        """
        for subscriber in self._subscribers:
            subscriber.subscribe(topic, callback)
        self._topics[topic] = callback

    def add_subscriber(self, address):
        """Create a subscriber connected to `address` and register every
        known topic on it.
        """
        subscriber = Subscriber(self.ioloop, address)
        for topic, callback in self._topics.items():
            subscriber.subscribe(topic, callback)
        self._subscribers.append(subscriber)

    def publish(self, msg):
        """Send `msg` through the publisher.

        The message is silently dropped when no publisher was added.
        """
        if self.publisher is None:
            return
        self.publisher.send(msg)

    def start(self):
        """Run the event loop in a new thread."""
        Thread(target=self.ioloop.start).start()

    def stop(self):
        """Ask the event loop to stop."""
        # scheduled as a callback so the stop is executed by the loop itself
        self.ioloop.add_callback(self.ioloop.stop)

    def __del__(self):
        # `ioloop` is missing when __init__ failed before assigning it;
        # finalizers still run on such half-built instances
        loop = getattr(self, 'ioloop', None)
        if loop is not None:
            loop.close()


class Publisher(object):
    """PUB side of the bus: binds to `address` and sends multipart
    messages.
    """

    def __init__(self, ioloop, address):
        self.address = address
        # not read anywhere in this module; kept in case external code
        # relies on these attributes
        self._topics = {}
        self._subscribers = []
        self.ioloop = ioloop

        def callback():
            # socket and stream creation is deferred to the loop;
            # `self.stream` does not exist until this has run
            s = ctx.socket(zmq.PUB)
            self.stream = zmq.eventloop.zmqstream.ZMQStream(s, io_loop=ioloop)
            self.stream.bind(self.address)
            self.debug('start publisher on %s', self.address)

        ioloop.add_callback(callback)

    def send(self, msg):
        """Schedule `msg` to be sent as a multipart message."""
        # the lambda looks `self.stream` up when the loop runs it, i.e.
        # after the setup callback scheduled by __init__
        self.ioloop.add_callback(lambda: self.stream.send_multipart(msg))


class Subscriber(object):
    """SUB side of the bus: connects to `address` and dispatches received
    messages to per-topic callbacks.
    """

    def __init__(self, ioloop, address):
        self.address = address
        # topic -> callback, looked up with the first element of each message
        self.dispatch_table = {}
        self.ioloop = ioloop

        def callback():
            # socket and stream creation is deferred to the loop;
            # `self.stream` does not exist until this has run
            s = ctx.socket(zmq.SUB)
            self.stream = zmq.eventloop.zmqstream.ZMQStream(s, io_loop=ioloop)
            self.stream.on_recv(self.dispatch)
            self.stream.connect(self.address)
            self.debug('start subscriber on %s', self.address)

        ioloop.add_callback(callback)

    def dispatch(self, msg):
        """Call the callback registered for `msg[0]` with the whole
        message; messages with no registered callback are ignored.
        """
        try:
            f = self.dispatch_table[msg[0]]
        except KeyError:
            return
        f(msg)

    def subscribe(self, topic, callback):
        """Register `callback` for `topic` and schedule the matching
        socket subscription.
        """
        self.dispatch_table[topic] = callback
        # the lambda looks `self.stream` up when the loop runs it, i.e.
        # after the setup callback scheduled by __init__
        self.ioloop.add_callback(
            lambda: self.stream.setsockopt(zmq.SUBSCRIBE, topic))


set_log_methods(Publisher, getLogger('cubicweb.zmq.pub'))
set_log_methods(Subscriber, getLogger('cubicweb.zmq.sub'))