server/cwzmq.py
changeset 8211 543e1579ba0d
child 8350 e1c05bf6fdeb
equal deleted inserted replaced
8210:1d1cfc97f6b9 8211:543e1579ba0d
       
     1 # -*- coding: utf-8 -*-
       
     2 # copyright 2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
       
     3 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
       
     4 #
       
     5 # This file is part of CubicWeb.
       
     6 #
       
     7 # CubicWeb is free software: you can redistribute it and/or modify it under the
       
     8 # terms of the GNU Lesser General Public License as published by the Free
       
     9 # Software Foundation, either version 2.1 of the License, or (at your option)
       
    10 # any later version.
       
    11 #
       
    12 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT
       
    13 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
       
    14 # FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
       
    15 # details.
       
    16 #
       
    17 # You should have received a copy of the GNU Lesser General Public License along
       
    18 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
       
    19 
       
    20 from threading import Thread
       
    21 import zmq
       
    22 from zmq.eventloop import ioloop
       
    23 import zmq.eventloop.zmqstream
       
    24 
       
    25 from logging import getLogger
       
    26 from cubicweb import set_log_methods
       
    27 
       
    28 ctx = zmq.Context()  # module-level ZeroMQ context; Publisher and Subscriber create their sockets from it
       
    29 
       
    30 class ZMQComm(object):
       
    31     def __init__(self):
       
    32         self.ioloop = ioloop.IOLoop()
       
    33         self._topics = {}
       
    34         self._subscribers = []
       
    35         self.publisher = None
       
    36 
       
    37     def add_publisher(self, address):
       
    38         assert self.publisher is None, "more than one publisher is not supported"
       
    39         self.publisher = Publisher(self.ioloop, address)
       
    40 
       
    41     def add_subscription(self, topic, callback):
       
    42         for subscriber in self._subscribers:
       
    43             subscriber.subscribe(topic, callback)
       
    44         self._topics[topic] = callback
       
    45 
       
    46     def add_subscriber(self, address):
       
    47         subscriber = Subscriber(self.ioloop, address)
       
    48         for topic, callback in self._topics.iteritems():
       
    49             subscriber.subscribe(topic, callback)
       
    50         self._subscribers.append(subscriber)
       
    51 
       
    52     def publish(self, msg):
       
    53         assert self.publisher is not None, "can't publish without a publisher"
       
    54         self.publisher.send(msg)
       
    55 
       
    56     def start(self):
       
    57         Thread(target=self.ioloop.start).start()
       
    58 
       
    59     def stop(self):
       
    60         self.ioloop.add_callback(self.ioloop.stop)
       
    61 
       
    62     def __del__(self):
       
    63         self.ioloop.close()
       
    64 
       
    65 
       
    66 class Publisher(object):
       
    67     def __init__(self, ioloop, address):
       
    68         self.address = address
       
    69         self._topics = {}
       
    70         self._subscribers = []
       
    71         self.ioloop = ioloop
       
    72         def callback():
       
    73             s = ctx.socket(zmq.PUB)
       
    74             self.stream = zmq.eventloop.zmqstream.ZMQStream(s, io_loop=ioloop)
       
    75             self.stream.bind(self.address)
       
    76             self.debug('start publisher on %s', self.address)
       
    77         ioloop.add_callback(callback)
       
    78 
       
    79     def send(self, msg):
       
    80         self.ioloop.add_callback(lambda:self.stream.send_multipart(msg))
       
    81 
       
    82 
       
    83 class Subscriber(object):
       
    84     def __init__(self, ioloop, address):
       
    85         self.address = address
       
    86         self.dispatch_table = {}
       
    87         self.ioloop = ioloop
       
    88         def callback():
       
    89             s = ctx.socket(zmq.SUB)
       
    90             self.stream = zmq.eventloop.zmqstream.ZMQStream(s, io_loop=ioloop)
       
    91             self.stream.on_recv(self.dispatch)
       
    92             self.stream.connect(self.address)
       
    93             self.debug('start subscriber on %s', self.address)
       
    94         ioloop.add_callback(callback)
       
    95 
       
    96     def dispatch(self, msg):
       
    97         try:
       
    98             f = self.dispatch_table[msg[0]]
       
    99         except KeyError:
       
   100             return
       
   101         f(msg)
       
   102 
       
   103     def subscribe(self, topic, callback):
       
   104         self.dispatch_table[topic] = callback
       
   105         self.ioloop.add_callback(lambda: self.stream.setsockopt(zmq.SUBSCRIBE, topic))
       
   106 
       
   107 
       
   108 set_log_methods(Publisher, getLogger('cubicweb.zmq.pub'))  # presumably attaches logger methods (e.g. the `debug` used in Publisher.__init__) -- confirm in cubicweb
       
   109 set_log_methods(Subscriber, getLogger('cubicweb.zmq.sub'))  # same for Subscriber, with its own logger name