author | Rémi Cardona <remi.cardona@logilab.fr> |
Wed, 19 Feb 2014 11:01:19 +0100 | |
branch | stable |
changeset 9558 | 1a719ca9c585 |
parent 8982 | 6bc1c1b4473a |
child 9468 | 39b7a91a3f4c |
permissions | -rw-r--r-- |
8211
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
1 |
# -*- coding: utf-8 -*- |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
2 |
# copyright 2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved. |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
3 |
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
4 |
# |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
5 |
# This file is part of CubicWeb. |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
6 |
# |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
7 |
# CubicWeb is free software: you can redistribute it and/or modify it under the |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
8 |
# terms of the GNU Lesser General Public License as published by the Free |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
9 |
# Software Foundation, either version 2.1 of the License, or (at your option) |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
10 |
# any later version. |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
11 |
# |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
12 |
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
13 |
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
14 |
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
15 |
# details. |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
16 |
# |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
17 |
# You should have received a copy of the GNU Lesser General Public License along |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
18 |
# with CubicWeb. If not, see <http://www.gnu.org/licenses/>. |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
19 |
|
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
20 |
from threading import Thread |
8350
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
21 |
import cPickle |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
22 |
import traceback |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
23 |
|
8211
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
24 |
import zmq |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
25 |
from zmq.eventloop import ioloop |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
26 |
import zmq.eventloop.zmqstream |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
27 |
|
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
28 |
from logging import getLogger |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
29 |
from cubicweb import set_log_methods |
8350
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
30 |
from cubicweb.server.server import QuitEvent |
8211
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
31 |
|
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
32 |
ctx = zmq.Context() |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
33 |
|
8670
f02139297beb
prefix "tcp://" zmq uris with "zmqpickle" (closes #2574114)
Aurelien Campeas <aurelien.campeas@logilab.fr>
parents:
8614
diff
changeset
|
34 |
def cwproto_to_zmqaddr(address): |
f02139297beb
prefix "tcp://" zmq uris with "zmqpickle" (closes #2574114)
Aurelien Campeas <aurelien.campeas@logilab.fr>
parents:
8614
diff
changeset
|
35 |
""" converts a cw-zmq address (like zmqpickle-tcp://<ip>:<port>) |
f02139297beb
prefix "tcp://" zmq uris with "zmqpickle" (closes #2574114)
Aurelien Campeas <aurelien.campeas@logilab.fr>
parents:
8614
diff
changeset
|
36 |
into a proper zmq address (tcp://<ip>:<port>) |
f02139297beb
prefix "tcp://" zmq uris with "zmqpickle" (closes #2574114)
Aurelien Campeas <aurelien.campeas@logilab.fr>
parents:
8614
diff
changeset
|
37 |
""" |
f02139297beb
prefix "tcp://" zmq uris with "zmqpickle" (closes #2574114)
Aurelien Campeas <aurelien.campeas@logilab.fr>
parents:
8614
diff
changeset
|
38 |
assert address.startswith('zmqpickle-'), 'bad protocol string %s' % address |
f02139297beb
prefix "tcp://" zmq uris with "zmqpickle" (closes #2574114)
Aurelien Campeas <aurelien.campeas@logilab.fr>
parents:
8614
diff
changeset
|
39 |
return address.split('-', 1)[1] # chop the `zmqpickle-` prefix |
f02139297beb
prefix "tcp://" zmq uris with "zmqpickle" (closes #2574114)
Aurelien Campeas <aurelien.campeas@logilab.fr>
parents:
8614
diff
changeset
|
40 |
|
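The prefix handling in `cwproto_to_zmqaddr` above can be sketched as a standalone function. This is only an illustration, not the CubicWeb implementation: the name `strip_zmqpickle_prefix` is hypothetical, and it raises `ValueError` instead of using `assert`, on the assumption that the check should still run under `python -O`.

```python
def strip_zmqpickle_prefix(address):
    """Return the plain zmq address for a `zmqpickle-` prefixed one.

    Hypothetical helper, written for illustration only.
    """
    prefix = 'zmqpickle-'
    if not address.startswith(prefix):
        raise ValueError('bad protocol string %s' % address)
    # Drop only the leading prefix; the rest of the URI is left untouched.
    return address[len(prefix):]


zmq_address = strip_zmqpickle_prefix('zmqpickle-tcp://127.0.0.1:8181')
```

Slicing by the prefix length behaves like `split('-', 1)[1]` for well-formed input, and it makes explicit that any later `-` in the address is preserved.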
8211
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
41 |
class ZMQComm(object): |
8614
52f576a7394c
[zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents:
8388
diff
changeset
|
42 |
""" |
52f576a7394c
[zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents:
8388
diff
changeset
|
43 |
A simple ZMQ-based notification bus. |
52f576a7394c
[zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents:
8388
diff
changeset
|
44 |
|
52f576a7394c
[zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents:
8388
diff
changeset
|
45 |
There should be at most one instance of this class attached to a |
52f576a7394c
[zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents:
8388
diff
changeset
|
46 |
Repository. A typical usage may be something like:: |
52f576a7394c
[zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents:
8388
diff
changeset
|
47 |
|
52f576a7394c
[zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents:
8388
diff
changeset
|
48 |
def callback(msg): |
52f576a7394c
[zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents:
8388
diff
changeset
|
49 |
self.info('received message: %s', ' '.join(msg)) |
52f576a7394c
[zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents:
8388
diff
changeset
|
50 |
repo.app_instances_bus.add_subscription('hello', callback) |
52f576a7394c
[zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents:
8388
diff
changeset
|
51 |
|
52f576a7394c
[zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents:
8388
diff
changeset
|
52 |
to subscribe to the 'hello' kind of message. On the other side, to |
52f576a7394c
[zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents:
8388
diff
changeset
|
53 |
emit a notification, call:: |
52f576a7394c
[zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents:
8388
diff
changeset
|
54 |
|
52f576a7394c
[zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents:
8388
diff
changeset
|
55 |
repo.app_instances_bus.publish(['hello', 'world']) |
52f576a7394c
[zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents:
8388
diff
changeset
|
56 |
|
52f576a7394c
[zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents:
8388
diff
changeset
|
57 |
See http://docs.cubicweb.org for more details. |
52f576a7394c
[zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents:
8388
diff
changeset
|
58 |
""" |
8211
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
59 |
def __init__(self): |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
60 |
self.ioloop = ioloop.IOLoop() |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
61 |
self._topics = {} |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
62 |
self._subscribers = [] |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
63 |
self.publisher = None |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
64 |
|
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
65 |
def add_publisher(self, address): |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
66 |
assert self.publisher is None, "more than one publisher is not supported" |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
67 |
self.publisher = Publisher(self.ioloop, address) |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
68 |
|
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
69 |
def add_subscription(self, topic, callback): |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
70 |
for subscriber in self._subscribers: |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
71 |
subscriber.subscribe(topic, callback) |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
72 |
self._topics[topic] = callback |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
73 |
|
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
74 |
def add_subscriber(self, address): |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
75 |
subscriber = Subscriber(self.ioloop, address) |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
76 |
for topic, callback in self._topics.iteritems(): |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
77 |
subscriber.subscribe(topic, callback) |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
78 |
self._subscribers.append(subscriber) |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
79 |
|
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
80 |
def publish(self, msg): |
8982
6bc1c1b4473a
[zmq] make publish address optional
Julien Cristau <julien.cristau@logilab.fr>
parents:
8695
diff
changeset
|
81 |
if self.publisher is None: |
6bc1c1b4473a
[zmq] make publish address optional
Julien Cristau <julien.cristau@logilab.fr>
parents:
8695
diff
changeset
|
82 |
return |
8211
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
83 |
self.publisher.send(msg) |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
84 |
|
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
85 |
def start(self): |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
86 |
Thread(target=self.ioloop.start).start() |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
87 |
|
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
88 |
def stop(self): |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
89 |
self.ioloop.add_callback(self.ioloop.stop) |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
90 |
|
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
91 |
def __del__(self): |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
92 |
self.ioloop.close() |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
93 |
|
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
94 |
|
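The bookkeeping in `ZMQComm` means the order of `add_subscription` and `add_subscriber` calls should not matter: topics are pushed to existing subscribers and replayed onto new ones. The sketch below isolates that logic without any socket. `RecordingSubscriber` and `Bus` are hypothetical names, and `add_subscriber` returns the subscriber here only so the result can be inspected.

```python
class RecordingSubscriber(object):
    """Hypothetical stand-in for Subscriber: records topics, opens no socket."""

    def __init__(self, address):
        self.address = address
        self.dispatch_table = {}

    def subscribe(self, topic, callback):
        self.dispatch_table[topic] = callback


class Bus(object):
    """Keeps only the topic/subscriber bookkeeping of the class above."""

    def __init__(self):
        self._topics = {}
        self._subscribers = []

    def add_subscription(self, topic, callback):
        # Register the topic on every subscriber that already exists...
        for subscriber in self._subscribers:
            subscriber.subscribe(topic, callback)
        # ...and remember it for subscribers added later.
        self._topics[topic] = callback

    def add_subscriber(self, address):
        subscriber = RecordingSubscriber(address)
        # Replay the topics registered so far onto the new subscriber.
        for topic, callback in self._topics.items():
            subscriber.subscribe(topic, callback)
        self._subscribers.append(subscriber)
        return subscriber  # returned for inspection only (assumption)


bus = Bus()
bus.add_subscription('hello', print)          # registered before any subscriber
sub = bus.add_subscriber('tcp://127.0.0.1:8181')
replayed = 'hello' in sub.dispatch_table      # topic was replayed
bus.add_subscription('bye', print)            # registered after the subscriber
topics = sorted(sub.dispatch_table)
```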
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
95 |
class Publisher(object): |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
96 |
def __init__(self, ioloop, address): |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
97 |
self.address = address |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
98 |
self._topics = {} |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
99 |
self._subscribers = [] |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
100 |
self.ioloop = ioloop |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
101 |
def callback(): |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
102 |
s = ctx.socket(zmq.PUB) |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
103 |
self.stream = zmq.eventloop.zmqstream.ZMQStream(s, io_loop=ioloop) |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
104 |
self.stream.bind(self.address) |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
105 |
self.debug('start publisher on %s', self.address) |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
106 |
ioloop.add_callback(callback) |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
107 |
|
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
108 |
def send(self, msg): |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
109 |
self.ioloop.add_callback(lambda: self.stream.send_multipart(msg)) |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
110 |
|
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
111 |
|
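`Publisher` creates its socket and sends messages inside callbacks handed to the loop, presumably because ZMQ sockets should only be used from one thread at a time. The pattern can be sketched with the standard library alone. `MiniLoop` is a hypothetical stand-in for the IOLoop, not part of pyzmq or CubicWeb.

```python
import queue
import threading


class MiniLoop(object):
    """Hypothetical stand-in for an IOLoop: runs callbacks on one thread."""

    def __init__(self):
        self._callbacks = queue.Queue()
        self._thread = threading.Thread(target=self._run)

    def add_callback(self, callback):
        # Safe to call from any thread: the queue does the locking.
        self._callbacks.put(callback)

    def start(self):
        self._thread.start()

    def stop(self):
        self._callbacks.put(None)  # sentinel, processed after earlier callbacks
        self._thread.join()

    def _run(self):
        while True:
            callback = self._callbacks.get()
            if callback is None:
                break
            callback()


sent = []    # frames "sent" by the loop thread
owner = []   # thread that actually ran the callbacks
loop = MiniLoop()
loop.add_callback(lambda: owner.append(threading.current_thread()))
for frames in ([b'hello', b'world'], [b'hello', b'again']):
    # Bind frames as a default argument to avoid late binding in the lambda.
    loop.add_callback(lambda frames=frames: sent.append(frames))
loop.start()
loop.stop()
```

Callbacks queued before `start()` are kept and run in order once the loop thread begins, which is the property the constructors above appear to rely on.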
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
112 |
class Subscriber(object): |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
113 |
def __init__(self, ioloop, address): |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
114 |
self.address = address |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
115 |
self.dispatch_table = {} |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
116 |
self.ioloop = ioloop |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
117 |
def callback(): |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
118 |
s = ctx.socket(zmq.SUB) |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
119 |
self.stream = zmq.eventloop.zmqstream.ZMQStream(s, io_loop=ioloop) |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
120 |
self.stream.on_recv(self.dispatch) |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
121 |
self.stream.connect(self.address) |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
122 |
self.debug('start subscriber on %s', self.address) |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
123 |
ioloop.add_callback(callback) |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
124 |
|
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
125 |
def dispatch(self, msg): |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
126 |
try: |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
127 |
f = self.dispatch_table[msg[0]] |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
128 |
except KeyError: |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
129 |
return |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
130 |
f(msg) |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
131 |
|
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
132 |
def subscribe(self, topic, callback): |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
133 |
self.dispatch_table[topic] = callback |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
134 |
self.ioloop.add_callback(lambda: self.stream.setsockopt(zmq.SUBSCRIBE, topic)) |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
135 |
|
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
136 |
|
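`Subscriber.dispatch` looks up the first frame of a multipart message in `dispatch_table` and silently drops anything without an exact match. That matters because the `zmq.SUBSCRIBE` filter matches by prefix, so a socket subscribed to `hello` can also receive `hello2` messages. A socket-free sketch of the lookup, with the hypothetical class name `Dispatcher`:

```python
class Dispatcher(object):
    """Hypothetical, socket-free version of the dispatch logic above."""

    def __init__(self):
        self.dispatch_table = {}

    def subscribe(self, topic, callback):
        self.dispatch_table[topic] = callback

    def dispatch(self, msg):
        # msg is a list of frames; the first frame names the topic.
        callback = self.dispatch_table.get(msg[0])
        if callback is None:
            return  # no exact match: drop the message silently
        callback(msg)


received = []
dispatcher = Dispatcher()
dispatcher.subscribe(b'hello', received.append)
dispatcher.dispatch([b'hello', b'world'])     # exact topic match: delivered
dispatcher.dispatch([b'hello2', b'ignored'])  # prefix-only match: dropped
```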
8350
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
137 |
class ZMQRepositoryServer(object): |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
138 |
|
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
139 |
def __init__(self, repository): |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
140 |
"""make the repository available over ZMQ""" |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
141 |
self.address = None |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
142 |
self.repo = repository |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
143 |
self.socket = None |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
144 |
self.stream = None |
8388
c6c624cea870
[repo test] Avoid hangs in zmq repo unit test
Julien Cristau <julien.cristau@logilab.fr>
parents:
8350
diff
changeset
|
145 |
self.loop = ioloop.IOLoop() |
8350
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
146 |
|
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
147 |
# event queue |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
148 |
self.events = [] |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
149 |
|
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
150 |
def connect(self, address): |
8670
f02139297beb
prefix "tcp://" zmq uris with "zmqpickle" (closes #2574114)
Aurelien Campeas <aurelien.campeas@logilab.fr>
parents:
8614
diff
changeset
|
151 |
self.address = cwproto_to_zmqaddr(address) |
8350
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
152 |
|
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
153 |
def run(self): |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
154 |
"""enter the service loop""" |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
155 |
# start repository looping tasks |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
156 |
self.socket = ctx.socket(zmq.REP) |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
157 |
self.stream = zmq.eventloop.zmqstream.ZMQStream(self.socket, io_loop=self.loop) |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
158 |
self.stream.bind(self.address) |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
159 |
self.info('ZMQ server bound on: %s', self.address) |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
160 |
|
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
161 |
self.stream.on_recv(self.process_cmds) |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
162 |
|
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
163 |
try: |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
164 |
self.loop.start() |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
165 |
except zmq.ZMQError: |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
166 |
self.warning('ZMQ event loop killed') |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
167 |
self.quit() |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
168 |
|
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
169 |
def trigger_events(self): |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
170 |
"""trigger ready events""" |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
171 |
for event in self.events[:]: |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
172 |
if event.is_ready(): |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
173 |
self.info('starting event %s', event) |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
174 |
event.fire(self) |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
175 |
try: |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
176 |
event.update() |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
177 |
except Finished: |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
178 |
self.events.remove(event) |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
179 |
|
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
180 |
def process_cmd(self, cmd): |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
181 |
"""Delegate the given command to the repository. |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
182 |
|
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
183 |
``cmd`` is a list of (method_name, args, kwargs) |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
184 |
where ``args`` is a list of positional arguments |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
185 |
and ``kwargs`` is a dictionary of named arguments. |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
186 |
|
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
187 |
>>> rset = delegate_to_repo(["execute", [sessionid], {'rql': rql}]) |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
188 |
|
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
189 |
:note1: ``kwargs`` may be omitted |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
190 |
|
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
191 |
>>> rset = delegate_to_repo(["execute", [sessionid, rql]]) |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
192 |
|
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
193 |
:note2: both ``args`` and ``kwargs`` may be omitted |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
194 |
|
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
195 |
>>> schema = delegate_to_repo(["get_schema"]) |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
196 |
>>> schema = delegate_to_repo("get_schema") # also allowed |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
197 |
|
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
198 |
""" |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
199 |
cmd = cPickle.loads(cmd) |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
200 |
if not cmd: |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
201 |
raise AttributeError('function name required') |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
202 |
if isinstance(cmd, basestring): |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
203 |
cmd = [cmd] |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
204 |
if len(cmd) < 2: |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
205 |
cmd.append(()) |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
206 |
if len(cmd) < 3: |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
207 |
cmd.append({}) |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
208 |
cmd = list(cmd) + [(), {}] |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
209 |
funcname, args, kwargs = cmd[:3] |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
210 |
result = getattr(self.repo, funcname)(*args, **kwargs) |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
211 |
return result |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
212 |
|
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
213 |
def process_cmds(self, cmds): |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
214 |
"""Callback intended to be used with ``on_recv``. |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
215 |
|
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
216 |
Call ``process_cmd`` on each command and send a pickled copy of |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
217 |
each result recursively. |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
218 |
|
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
219 |
Any exception is caught, pickled and sent. |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
220 |
""" |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
221 |
try: |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
222 |
for cmd in cmds: |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
223 |
result = self.process_cmd(cmd) |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
224 |
self.send_data(result) |
8695
358d8bed9626
[toward-py3k] rewrite to "except AnException as exc:" (part of #2711624)
Nicolas Chauvat <nicolas.chauvat@logilab.fr>
parents:
8670
diff
changeset
|
225 |
except Exception as exc: |
8350
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
226 |
traceback.print_exc() |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
227 |
self.send_data(exc) |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
228 |
|
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
229 |
def send_data(self, data): |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
230 |
self.socket.send_pyobj(data) |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
231 |
|
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
232 |
def quit(self, shutdown_repo=False): |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
233 |
"""stop the server""" |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
234 |
self.info('Quitting ZMQ server') |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
235 |
try: |
8388
c6c624cea870
[repo test] Avoid hangs in zmq repo unit test
Julien Cristau <julien.cristau@logilab.fr>
parents:
8350
diff
changeset
|
236 |
self.loop.add_callback(self.loop.stop) |
8350
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
237 |
self.stream.on_recv(None) |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
238 |
self.stream.close() |
8695
358d8bed9626
[toward-py3k] rewrite to "except AnException as exc:" (part of #2711624)
Nicolas Chauvat <nicolas.chauvat@logilab.fr>
parents:
8670
diff
changeset
|
239 |
except Exception as e: |
8350
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
240 |
print e |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
241 |
pass |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
242 |
if shutdown_repo and not self.repo.shutting_down: |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
243 |
event = QuitEvent() |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
244 |
event.fire(self) |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
245 |
|
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
246 |
# server utilities ######################################################## |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
247 |
|
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
248 |
def install_sig_handlers(self): |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
249 |
"""install signal handlers""" |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
250 |
import signal |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
251 |
self.info('installing signal handlers') |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
252 |
signal.signal(signal.SIGINT, lambda x, y, s=self: s.quit(shutdown_repo=True)) |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
253 |
signal.signal(signal.SIGTERM, lambda x, y, s=self: s.quit(shutdown_repo=True)) |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
254 |
|
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
255 |
|
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
256 |
# these are overridden by set_log_methods below |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
257 |
# only defining here to prevent pylint from complaining |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
258 |
@classmethod |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
259 |
def info(cls, msg, *a, **kw): |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
260 |
pass |
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
261 |
|
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
262 |
|
8211
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
263 |
set_log_methods(Publisher, getLogger('cubicweb.zmq.pub')) |
543e1579ba0d
[repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff
changeset
|
264 |
set_log_methods(Subscriber, getLogger('cubicweb.zmq.sub')) |
8350
e1c05bf6fdeb
[zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents:
8211
diff
changeset
|
265 |
set_log_methods(ZMQRepositoryServer, getLogger('cubicweb.zmq.repo')) |
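
The command handling in ``process_cmd`` and ``process_cmds`` above can be tried without a ZMQ socket. The following is a minimal sketch, not the module's own code: ``normalize_cmd``, ``dispatch`` and ``FakeRepo`` are hypothetical names introduced for illustration, and the sketch assumes Python 3 (``pickle`` and ``str`` in place of ``cPickle`` and ``basestring``). It pads a shortened command to ``(method_name, args, kwargs)``, calls the named method on a target object, and returns the pickled result, or the pickled exception if the call fails.

```python
import pickle


def normalize_cmd(cmd):
    """Return (funcname, args, kwargs) from a possibly shortened command."""
    if not cmd:
        raise AttributeError('function name required')
    if isinstance(cmd, str):
        # a bare method name is accepted as shorthand for [name]
        cmd = [cmd]
    cmd = list(cmd)
    if len(cmd) < 2:
        cmd.append(())   # no positional arguments given
    if len(cmd) < 3:
        cmd.append({})   # no named arguments given
    funcname, args, kwargs = cmd[:3]
    return funcname, args, kwargs


def dispatch(target, payload):
    """Unpickle a command, run it on ``target`` and pickle the outcome.

    An exception raised along the way is pickled and returned in place
    of a result, mirroring what ``process_cmds`` sends to the client.
    """
    try:
        funcname, args, kwargs = normalize_cmd(pickle.loads(payload))
        result = getattr(target, funcname)(*args, **kwargs)
    except Exception as exc:
        result = exc
    return pickle.dumps(result)


class FakeRepo(object):
    """Hypothetical stand-in for the repository object."""

    def get_schema(self):
        return 'schema'

    def execute(self, sessionid, rql):
        return (sessionid, rql)


if __name__ == '__main__':
    repo = FakeRepo()
    reply = dispatch(repo, pickle.dumps(['execute', ['sid', 'Any X']]))
    print(pickle.loads(reply))
```

Because the reply carries either a result or an exception instance, a client following this scheme has to check the type of the unpickled value before using it. Unpickling can execute arbitrary code, so a scheme like this is only suitable between trusted peers.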