cubicweb/server/cwzmq.py
changeset 11057 0b59724cb3f2
parent 10662 10942ed172de
equal deleted inserted replaced
11052:058bb3dc685f 11057:0b59724cb3f2
       
     1 # -*- coding: utf-8 -*-
       
     2 # copyright 2012-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
       
     3 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
       
     4 #
       
     5 # This file is part of CubicWeb.
       
     6 #
       
     7 # CubicWeb is free software: you can redistribute it and/or modify it under the
       
     8 # terms of the GNU Lesser General Public License as published by the Free
       
     9 # Software Foundation, either version 2.1 of the License, or (at your option)
       
    10 # any later version.
       
    11 #
       
    12 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT
       
    13 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
       
    14 # FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
       
    15 # details.
       
    16 #
       
    17 # You should have received a copy of the GNU Lesser General Public License along
       
    18 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
       
    19 
       
    20 from threading import Thread
       
    21 from logging import getLogger
       
    22 
       
    23 import zmq
       
    24 from zmq.eventloop import ioloop
       
    25 import zmq.eventloop.zmqstream
       
    26 
       
    27 from cubicweb import set_log_methods
       
    28 
       
    29 
       
    30 ctx = zmq.Context()
       
    31 
       
    32 
       
    33 class ZMQComm(object):
       
    34     """
       
    35     A simple ZMQ-based notification bus.
       
    36 
       
    37     There should at most one instance of this class attached to a
       
    38     Repository. A typical usage may be something like::
       
    39 
       
    40         def callback(msg):
       
    41             self.info('received message: %s', ' '.join(msg))
       
    42         repo.app_instances_bus.subscribe('hello', callback)
       
    43 
       
    44     to subsribe to the 'hello' kind of message. On the other side, to
       
    45     emit a notification, call::
       
    46 
       
    47        repo.app_instances_bus.publish(['hello', 'world'])
       
    48 
       
    49     See http://docs.cubicweb.org for more details.
       
    50     """
       
    51     def __init__(self):
       
    52         self.ioloop = ioloop.IOLoop()
       
    53         self._topics = {}
       
    54         self._subscribers = []
       
    55         self.publisher = None
       
    56 
       
    57     def add_publisher(self, address):
       
    58         assert self.publisher is None, "more than one publisher is not supported"
       
    59         self.publisher = Publisher(self.ioloop, address)
       
    60 
       
    61     def add_subscription(self, topic, callback):
       
    62         for subscriber in self._subscribers:
       
    63             subscriber.subscribe(topic, callback)
       
    64         self._topics[topic] = callback
       
    65 
       
    66     def add_subscriber(self, address):
       
    67         subscriber = Subscriber(self.ioloop, address)
       
    68         for topic, callback in self._topics.items():
       
    69             subscriber.subscribe(topic, callback)
       
    70         self._subscribers.append(subscriber)
       
    71 
       
    72     def publish(self, msg):
       
    73         if self.publisher is None:
       
    74             return
       
    75         self.publisher.send(msg)
       
    76 
       
    77     def start(self):
       
    78         Thread(target=self.ioloop.start).start()
       
    79 
       
    80     def stop(self):
       
    81         self.ioloop.add_callback(self.ioloop.stop)
       
    82 
       
    83     def __del__(self):
       
    84         self.ioloop.close()
       
    85 
       
    86 
       
    87 class Publisher(object):
       
    88     def __init__(self, ioloop, address):
       
    89         self.address = address
       
    90         self._topics = {}
       
    91         self._subscribers = []
       
    92         self.ioloop = ioloop
       
    93         def callback():
       
    94             s = ctx.socket(zmq.PUB)
       
    95             self.stream = zmq.eventloop.zmqstream.ZMQStream(s, io_loop=ioloop)
       
    96             self.stream.bind(self.address)
       
    97             self.debug('start publisher on %s', self.address)
       
    98         ioloop.add_callback(callback)
       
    99 
       
   100     def send(self, msg):
       
   101         self.ioloop.add_callback(lambda:self.stream.send_multipart(msg))
       
   102 
       
   103 
       
   104 class Subscriber(object):
       
   105     def __init__(self, ioloop, address):
       
   106         self.address = address
       
   107         self.dispatch_table = {}
       
   108         self.ioloop = ioloop
       
   109         def callback():
       
   110             s = ctx.socket(zmq.SUB)
       
   111             self.stream = zmq.eventloop.zmqstream.ZMQStream(s, io_loop=ioloop)
       
   112             self.stream.on_recv(self.dispatch)
       
   113             self.stream.connect(self.address)
       
   114             self.debug('start subscriber on %s', self.address)
       
   115         ioloop.add_callback(callback)
       
   116 
       
   117     def dispatch(self, msg):
       
   118         try:
       
   119             f = self.dispatch_table[msg[0]]
       
   120         except KeyError:
       
   121             return
       
   122         f(msg)
       
   123 
       
   124     def subscribe(self, topic, callback):
       
   125         self.dispatch_table[topic] = callback
       
   126         self.ioloop.add_callback(lambda: self.stream.setsockopt(zmq.SUBSCRIBE, topic))
       
   127 
       
   128 
       
   129 set_log_methods(Publisher, getLogger('cubicweb.zmq.pub'))
       
   130 set_log_methods(Subscriber, getLogger('cubicweb.zmq.sub'))