|
1 # -*- coding: utf-8 -*- |
|
2 # copyright 2012-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved. |
|
3 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr |
|
4 # |
|
5 # This file is part of CubicWeb. |
|
6 # |
|
7 # CubicWeb is free software: you can redistribute it and/or modify it under the |
|
8 # terms of the GNU Lesser General Public License as published by the Free |
|
9 # Software Foundation, either version 2.1 of the License, or (at your option) |
|
10 # any later version. |
|
11 # |
|
12 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT |
|
13 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
|
14 # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more |
|
15 # details. |
|
16 # |
|
17 # You should have received a copy of the GNU Lesser General Public License along |
|
18 # with CubicWeb. If not, see <http://www.gnu.org/licenses/>. |
|
19 |
|
20 from threading import Thread |
|
21 from logging import getLogger |
|
22 |
|
23 import zmq |
|
24 from zmq.eventloop import ioloop |
|
25 import zmq.eventloop.zmqstream |
|
26 |
|
27 from cubicweb import set_log_methods |
|
28 |
|
29 |
|
30 ctx = zmq.Context() |
|
31 |
|
32 |
|
33 class ZMQComm(object): |
|
34 """ |
|
35 A simple ZMQ-based notification bus. |
|
36 |
|
37 There should at most one instance of this class attached to a |
|
38 Repository. A typical usage may be something like:: |
|
39 |
|
40 def callback(msg): |
|
41 self.info('received message: %s', ' '.join(msg)) |
|
42 repo.app_instances_bus.subscribe('hello', callback) |
|
43 |
|
44 to subsribe to the 'hello' kind of message. On the other side, to |
|
45 emit a notification, call:: |
|
46 |
|
47 repo.app_instances_bus.publish(['hello', 'world']) |
|
48 |
|
49 See http://docs.cubicweb.org for more details. |
|
50 """ |
|
51 def __init__(self): |
|
52 self.ioloop = ioloop.IOLoop() |
|
53 self._topics = {} |
|
54 self._subscribers = [] |
|
55 self.publisher = None |
|
56 |
|
57 def add_publisher(self, address): |
|
58 assert self.publisher is None, "more than one publisher is not supported" |
|
59 self.publisher = Publisher(self.ioloop, address) |
|
60 |
|
61 def add_subscription(self, topic, callback): |
|
62 for subscriber in self._subscribers: |
|
63 subscriber.subscribe(topic, callback) |
|
64 self._topics[topic] = callback |
|
65 |
|
66 def add_subscriber(self, address): |
|
67 subscriber = Subscriber(self.ioloop, address) |
|
68 for topic, callback in self._topics.items(): |
|
69 subscriber.subscribe(topic, callback) |
|
70 self._subscribers.append(subscriber) |
|
71 |
|
72 def publish(self, msg): |
|
73 if self.publisher is None: |
|
74 return |
|
75 self.publisher.send(msg) |
|
76 |
|
77 def start(self): |
|
78 Thread(target=self.ioloop.start).start() |
|
79 |
|
80 def stop(self): |
|
81 self.ioloop.add_callback(self.ioloop.stop) |
|
82 |
|
83 def __del__(self): |
|
84 self.ioloop.close() |
|
85 |
|
86 |
|
87 class Publisher(object): |
|
88 def __init__(self, ioloop, address): |
|
89 self.address = address |
|
90 self._topics = {} |
|
91 self._subscribers = [] |
|
92 self.ioloop = ioloop |
|
93 def callback(): |
|
94 s = ctx.socket(zmq.PUB) |
|
95 self.stream = zmq.eventloop.zmqstream.ZMQStream(s, io_loop=ioloop) |
|
96 self.stream.bind(self.address) |
|
97 self.debug('start publisher on %s', self.address) |
|
98 ioloop.add_callback(callback) |
|
99 |
|
100 def send(self, msg): |
|
101 self.ioloop.add_callback(lambda:self.stream.send_multipart(msg)) |
|
102 |
|
103 |
|
104 class Subscriber(object): |
|
105 def __init__(self, ioloop, address): |
|
106 self.address = address |
|
107 self.dispatch_table = {} |
|
108 self.ioloop = ioloop |
|
109 def callback(): |
|
110 s = ctx.socket(zmq.SUB) |
|
111 self.stream = zmq.eventloop.zmqstream.ZMQStream(s, io_loop=ioloop) |
|
112 self.stream.on_recv(self.dispatch) |
|
113 self.stream.connect(self.address) |
|
114 self.debug('start subscriber on %s', self.address) |
|
115 ioloop.add_callback(callback) |
|
116 |
|
117 def dispatch(self, msg): |
|
118 try: |
|
119 f = self.dispatch_table[msg[0]] |
|
120 except KeyError: |
|
121 return |
|
122 f(msg) |
|
123 |
|
124 def subscribe(self, topic, callback): |
|
125 self.dispatch_table[topic] = callback |
|
126 self.ioloop.add_callback(lambda: self.stream.setsockopt(zmq.SUBSCRIBE, topic)) |
|
127 |
|
128 |
|
129 set_log_methods(Publisher, getLogger('cubicweb.zmq.pub')) |
|
130 set_log_methods(Subscriber, getLogger('cubicweb.zmq.sub')) |