author | Julien Cristau <julien.cristau@logilab.fr> |
Wed, 22 Apr 2015 18:28:58 +0200 | |
changeset 10345 | ef54ea75a642 |
parent 10236 | ef3059a692cb |
child 10662 | 10942ed172de |
permissions | -rw-r--r-- |
8211
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
1 |
# -*- coding: utf-8 -*- |
10236
ef3059a692cb
Remove the remote repository-access-through-zmq support
Aurelien Campeas <aurelien.campeas@logilab.fr>
parents:
10235
diff
changeset
|
2 |
# copyright 2012-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved. |
8211
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
3 |
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
4 |
# |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
5 |
# This file is part of CubicWeb. |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
6 |
# |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
7 |
# CubicWeb is free software: you can redistribute it and/or modify it under the |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
8 |
# terms of the GNU Lesser General Public License as published by the Free |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
9 |
# Software Foundation, either version 2.1 of the License, or (at your option) |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
10 |
# any later version. |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
11 |
# |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
12 |
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
13 |
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
14 |
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
15 |
# details. |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
16 |
# |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
17 |
# You should have received a copy of the GNU Lesser General Public License along |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
18 |
# with CubicWeb. If not, see <http://www.gnu.org/licenses/>. |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
19 |
|
9468
39b7a91a3f4c
[repo] pylint cleanup, mainly of imports, with a bit of style
Julien Cristau <julien.cristau@logilab.fr>
parents:
8982
diff
changeset
|
20 |
from threading import Thread |
39b7a91a3f4c
[repo] pylint cleanup, mainly of imports, with a bit of style
Julien Cristau <julien.cristau@logilab.fr>
parents:
8982
diff
changeset
|
21 |
from logging import getLogger |
8350
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
22 |
|
8211
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
23 |
import zmq |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
24 |
from zmq.eventloop import ioloop |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
25 |
import zmq.eventloop.zmqstream |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
26 |
|
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
27 |
from cubicweb import set_log_methods |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
28 |
|
10236
ef3059a692cb
Remove the remote repository-access-through-zmq support
Aurelien Campeas <aurelien.campeas@logilab.fr>
parents:
10235
diff
changeset
|
29 |
|
8211
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
30 |
ctx = zmq.Context() |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
31 |
|
10235
684215aca046
Remove remote repository-access-through-pyro support
Aurelien Campeas <aurelien.campeas@logilab.fr>
parents:
9468
diff
changeset
|
32 |
|
8211
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
33 |
class ZMQComm(object): |
8614
52f576a7394c
[zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents:
8388
diff
changeset
|
34 |
""" |
52f576a7394c
[zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents:
8388
diff
changeset
|
35 |
A simple ZMQ-based notification bus. |
52f576a7394c
[zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents:
8388
diff
changeset
|
36 |
|
52f576a7394c
[zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents:
8388
diff
changeset
|
37 |
There should at most one instance of this class attached to a |
52f576a7394c
[zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents:
8388
diff
changeset
|
38 |
Repository. A typical usage may be something like:: |
52f576a7394c
[zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents:
8388
diff
changeset
|
39 |
|
52f576a7394c
[zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents:
8388
diff
changeset
|
40 |
def callback(msg): |
52f576a7394c
[zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents:
8388
diff
changeset
|
41 |
self.info('received message: %s', ' '.join(msg)) |
52f576a7394c
[zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents:
8388
diff
changeset
|
42 |
repo.app_instances_bus.subscribe('hello', callback) |
52f576a7394c
[zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents:
8388
diff
changeset
|
43 |
|
52f576a7394c
[zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents:
8388
diff
changeset
|
44 |
to subsribe to the 'hello' kind of message. On the other side, to |
52f576a7394c
[zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents:
8388
diff
changeset
|
45 |
emit a notification, call:: |
52f576a7394c
[zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents:
8388
diff
changeset
|
46 |
|
52f576a7394c
[zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents:
8388
diff
changeset
|
47 |
repo.app_instances_bus.publish(['hello', 'world']) |
52f576a7394c
[zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents:
8388
diff
changeset
|
48 |
|
52f576a7394c
[zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents:
8388
diff
changeset
|
49 |
See http://docs.cubicweb.org for more details. |
52f576a7394c
[zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents:
8388
diff
changeset
|
50 |
""" |
8211
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
51 |
def __init__(self): |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
52 |
self.ioloop = ioloop.IOLoop() |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
53 |
self._topics = {} |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
54 |
self._subscribers = [] |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
55 |
self.publisher = None |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
56 |
|
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
57 |
def add_publisher(self, address): |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
58 |
assert self.publisher is None, "more than one publisher is not supported" |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
59 |
self.publisher = Publisher(self.ioloop, address) |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
60 |
|
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
61 |
def add_subscription(self, topic, callback): |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
62 |
for subscriber in self._subscribers: |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
63 |
subscriber.subscribe(topic, callback) |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
64 |
self._topics[topic] = callback |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
65 |
|
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
66 |
def add_subscriber(self, address): |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
67 |
subscriber = Subscriber(self.ioloop, address) |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
68 |
for topic, callback in self._topics.iteritems(): |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
69 |
subscriber.subscribe(topic, callback) |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
70 |
self._subscribers.append(subscriber) |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
71 |
|
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
72 |
def publish(self, msg): |
8982
6bc1c1b4473a
[zmq] make publish address optional
Julien Cristau <julien.cristau@logilab.fr>
parents:
8695
diff
changeset
|
73 |
if self.publisher is None: |
6bc1c1b4473a
[zmq] make publish address optional
Julien Cristau <julien.cristau@logilab.fr>
parents:
8695
diff
changeset
|
74 |
return |
8211
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
75 |
self.publisher.send(msg) |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
76 |
|
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
77 |
def start(self): |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
78 |
Thread(target=self.ioloop.start).start() |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
79 |
|
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
80 |
def stop(self): |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
81 |
self.ioloop.add_callback(self.ioloop.stop) |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
82 |
|
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
83 |
def __del__(self): |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
84 |
self.ioloop.close() |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
85 |
|
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
86 |
|
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
87 |
class Publisher(object): |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
88 |
def __init__(self, ioloop, address): |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
89 |
self.address = address |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
90 |
self._topics = {} |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
91 |
self._subscribers = [] |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
92 |
self.ioloop = ioloop |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
93 |
def callback(): |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
94 |
s = ctx.socket(zmq.PUB) |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
95 |
self.stream = zmq.eventloop.zmqstream.ZMQStream(s, io_loop=ioloop) |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
96 |
self.stream.bind(self.address) |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
97 |
self.debug('start publisher on %s', self.address) |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
98 |
ioloop.add_callback(callback) |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
99 |
|
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
100 |
def send(self, msg): |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
101 |
self.ioloop.add_callback(lambda:self.stream.send_multipart(msg)) |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
102 |
|
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
103 |
|
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
104 |
class Subscriber(object): |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
105 |
def __init__(self, ioloop, address): |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
106 |
self.address = address |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
107 |
self.dispatch_table = {} |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
108 |
self.ioloop = ioloop |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
109 |
def callback(): |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
110 |
s = ctx.socket(zmq.SUB) |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
111 |
self.stream = zmq.eventloop.zmqstream.ZMQStream(s, io_loop=ioloop) |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
112 |
self.stream.on_recv(self.dispatch) |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
113 |
self.stream.connect(self.address) |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
114 |
self.debug('start subscriber on %s', self.address) |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
115 |
ioloop.add_callback(callback) |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
116 |
|
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
117 |
def dispatch(self, msg): |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
118 |
try: |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
119 |
f = self.dispatch_table[msg[0]] |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
120 |
except KeyError: |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
121 |
return |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
122 |
f(msg) |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
123 |
|
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
124 |
def subscribe(self, topic, callback): |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
125 |
self.dispatch_table[topic] = callback |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
126 |
self.ioloop.add_callback(lambda: self.stream.setsockopt(zmq.SUBSCRIBE, topic)) |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
127 |
|
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
128 |
|
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
129 |
set_log_methods(Publisher, getLogger('cubicweb.zmq.pub')) |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
130 |
set_log_methods(Subscriber, getLogger('cubicweb.zmq.sub')) |