|
1 # -*- coding: utf-8 -*- |
|
2 # copyright 2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved. |
|
3 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr |
|
4 # |
|
5 # This file is part of CubicWeb. |
|
6 # |
|
7 # CubicWeb is free software: you can redistribute it and/or modify it under the |
|
8 # terms of the GNU Lesser General Public License as published by the Free |
|
9 # Software Foundation, either version 2.1 of the License, or (at your option) |
|
10 # any later version. |
|
11 # |
|
12 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT |
|
13 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
|
14 # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more |
|
15 # details. |
|
16 # |
|
17 # You should have received a copy of the GNU Lesser General Public License along |
|
18 # with CubicWeb. If not, see <http://www.gnu.org/licenses/>. |
|
19 |
|
20 from threading import Thread |
|
21 import zmq |
|
22 from zmq.eventloop import ioloop |
|
23 import zmq.eventloop.zmqstream |
|
24 |
|
25 from logging import getLogger |
|
26 from cubicweb import set_log_methods |
|
27 |
|
28 ctx = zmq.Context() |
|
29 |
|
30 class ZMQComm(object): |
|
31 def __init__(self): |
|
32 self.ioloop = ioloop.IOLoop() |
|
33 self._topics = {} |
|
34 self._subscribers = [] |
|
35 self.publisher = None |
|
36 |
|
37 def add_publisher(self, address): |
|
38 assert self.publisher is None, "more than one publisher is not supported" |
|
39 self.publisher = Publisher(self.ioloop, address) |
|
40 |
|
41 def add_subscription(self, topic, callback): |
|
42 for subscriber in self._subscribers: |
|
43 subscriber.subscribe(topic, callback) |
|
44 self._topics[topic] = callback |
|
45 |
|
46 def add_subscriber(self, address): |
|
47 subscriber = Subscriber(self.ioloop, address) |
|
48 for topic, callback in self._topics.iteritems(): |
|
49 subscriber.subscribe(topic, callback) |
|
50 self._subscribers.append(subscriber) |
|
51 |
|
52 def publish(self, msg): |
|
53 assert self.publisher is not None, "can't publish without a publisher" |
|
54 self.publisher.send(msg) |
|
55 |
|
56 def start(self): |
|
57 Thread(target=self.ioloop.start).start() |
|
58 |
|
59 def stop(self): |
|
60 self.ioloop.add_callback(self.ioloop.stop) |
|
61 |
|
62 def __del__(self): |
|
63 self.ioloop.close() |
|
64 |
|
65 |
|
66 class Publisher(object): |
|
67 def __init__(self, ioloop, address): |
|
68 self.address = address |
|
69 self._topics = {} |
|
70 self._subscribers = [] |
|
71 self.ioloop = ioloop |
|
72 def callback(): |
|
73 s = ctx.socket(zmq.PUB) |
|
74 self.stream = zmq.eventloop.zmqstream.ZMQStream(s, io_loop=ioloop) |
|
75 self.stream.bind(self.address) |
|
76 self.debug('start publisher on %s', self.address) |
|
77 ioloop.add_callback(callback) |
|
78 |
|
79 def send(self, msg): |
|
80 self.ioloop.add_callback(lambda:self.stream.send_multipart(msg)) |
|
81 |
|
82 |
|
83 class Subscriber(object): |
|
84 def __init__(self, ioloop, address): |
|
85 self.address = address |
|
86 self.dispatch_table = {} |
|
87 self.ioloop = ioloop |
|
88 def callback(): |
|
89 s = ctx.socket(zmq.SUB) |
|
90 self.stream = zmq.eventloop.zmqstream.ZMQStream(s, io_loop=ioloop) |
|
91 self.stream.on_recv(self.dispatch) |
|
92 self.stream.connect(self.address) |
|
93 self.debug('start subscriber on %s', self.address) |
|
94 ioloop.add_callback(callback) |
|
95 |
|
96 def dispatch(self, msg): |
|
97 try: |
|
98 f = self.dispatch_table[msg[0]] |
|
99 except KeyError: |
|
100 return |
|
101 f(msg) |
|
102 |
|
103 def subscribe(self, topic, callback): |
|
104 self.dispatch_table[topic] = callback |
|
105 self.ioloop.add_callback(lambda: self.stream.setsockopt(zmq.SUBSCRIBE, topic)) |
|
106 |
|
107 |
|
108 set_log_methods(Publisher, getLogger('cubicweb.zmq.pub')) |
|
109 set_log_methods(Subscriber, getLogger('cubicweb.zmq.sub')) |