author | Sylvain Thénault <sylvain.thenault@logilab.fr> |
date | Fri, 10 Feb 2012 17:31:56 +0100 |
changeset 8219 | fb61698f93fc |
parent 8211 | 543e1579ba0d |
child 8350 | e1c05bf6fdeb |
permissions | -rw-r--r-- |
8211
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
# -*- coding: utf-8 -*-
# copyright 2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.

from threading import Thread
import zmq
from zmq.eventloop import ioloop
import zmq.eventloop.zmqstream

from logging import getLogger
from cubicweb import set_log_methods

ctx = zmq.Context()

class ZMQComm(object):
    def __init__(self):
        self.ioloop = ioloop.IOLoop()
        self._topics = {}
        self._subscribers = []
        self.publisher = None

    def add_publisher(self, address):
        assert self.publisher is None, "more than one publisher is not supported"
        self.publisher = Publisher(self.ioloop, address)

    def add_subscription(self, topic, callback):
        for subscriber in self._subscribers:
            subscriber.subscribe(topic, callback)
        self._topics[topic] = callback

    def add_subscriber(self, address):
        subscriber = Subscriber(self.ioloop, address)
        for topic, callback in self._topics.iteritems():
            subscriber.subscribe(topic, callback)
        self._subscribers.append(subscriber)

    def publish(self, msg):
        assert self.publisher is not None, "can't publish without a publisher"
        self.publisher.send(msg)

    def start(self):
        Thread(target=self.ioloop.start).start()

    def stop(self):
        self.ioloop.add_callback(self.ioloop.stop)

    def __del__(self):
        self.ioloop.close()


class Publisher(object):
    def __init__(self, ioloop, address):
        self.address = address
        self._topics = {}
        self._subscribers = []
        self.ioloop = ioloop
        def callback():
            s = ctx.socket(zmq.PUB)
            self.stream = zmq.eventloop.zmqstream.ZMQStream(s, io_loop=ioloop)
            self.stream.bind(self.address)
            self.debug('start publisher on %s', self.address)
        ioloop.add_callback(callback)

    def send(self, msg):
        self.ioloop.add_callback(lambda:self.stream.send_multipart(msg))


class Subscriber(object):
    def __init__(self, ioloop, address):
        self.address = address
        self.dispatch_table = {}
        self.ioloop = ioloop
        def callback():
            s = ctx.socket(zmq.SUB)
            self.stream = zmq.eventloop.zmqstream.ZMQStream(s, io_loop=ioloop)
            self.stream.on_recv(self.dispatch)
            self.stream.connect(self.address)
            self.debug('start subscriber on %s', self.address)
        ioloop.add_callback(callback)

    def dispatch(self, msg):
        try:
            f = self.dispatch_table[msg[0]]
        except KeyError:
            return
        f(msg)

    def subscribe(self, topic, callback):
        self.dispatch_table[topic] = callback
        self.ioloop.add_callback(lambda: self.stream.setsockopt(zmq.SUBSCRIBE, topic))


set_log_methods(Publisher, getLogger('cubicweb.zmq.pub'))
set_log_methods(Subscriber, getLogger('cubicweb.zmq.sub'))
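
The subscription bookkeeping in this module can be followed without any sockets. The sketch below is not part of the changeset: it is a zmq-free stand-in, assuming only what the listing shows. The names SketchSubscriber and SketchComm are hypothetical, the address is an example string that is never opened, and add_subscriber returns the subscriber purely so the demonstration can call dispatch on it directly. It illustrates two behaviours: topics registered before a subscriber exists are replayed onto it when it is added, and an incoming multipart message is routed by its first frame, with unknown topics dropped.

```python
# Hypothetical, zmq-free stand-ins for Subscriber and ZMQComm.
# No socket or IOLoop is involved; only the bookkeeping is reproduced.

class SketchSubscriber(object):
    def __init__(self, address):
        self.address = address
        self.dispatch_table = {}

    def subscribe(self, topic, callback):
        # The real class also schedules setsockopt(zmq.SUBSCRIBE, topic) here.
        self.dispatch_table[topic] = callback

    def dispatch(self, msg):
        # msg is a list of frames; the first frame is used as the topic key.
        try:
            f = self.dispatch_table[msg[0]]
        except KeyError:
            return  # no callback registered for this topic
        f(msg)


class SketchComm(object):
    def __init__(self):
        self._topics = {}
        self._subscribers = []

    def add_subscription(self, topic, callback):
        # Register on every subscriber known so far, then record the topic.
        for subscriber in self._subscribers:
            subscriber.subscribe(topic, callback)
        self._topics[topic] = callback

    def add_subscriber(self, address):
        # A subscriber added later receives every topic recorded so far.
        subscriber = SketchSubscriber(address)
        for topic, callback in self._topics.items():
            subscriber.subscribe(topic, callback)
        self._subscribers.append(subscriber)
        return subscriber  # returned only for this demonstration


received = []
comm = SketchComm()
comm.add_subscription('delete', received.append)      # registered first
late = comm.add_subscriber('tcp://127.0.0.1:5555')    # example address, never opened
comm.add_subscription('update', received.append)      # registered afterwards

late.dispatch(['delete', 'Person', '42'])   # known topic: callback runs
late.dispatch(['unknown', 'ignored'])       # unknown topic: dropped
```

In the real module the same calls are funnelled through ioloop.add_callback, so the socket is only touched from the thread running the IOLoop; the sketch leaves that part out.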