cubicweb/server/cwzmq.py
author Sylvain Thénault <sylvain.thenault@logilab.fr>
Fri, 20 Jan 2017 18:17:04 +0100
changeset 11919 3a6746dfc57f
parent 11057 0b59724cb3f2
permissions -rw-r--r--
Change hooks control (deny_all_hooks_but / allow_all_hooks_but) to be more predictable Prior to this, if one execute code like: with cnx.hooks.deny_all_hooks_but('metadata'): with cnx.hooks.deny_all_hooks_but(): # mycode 'metadata' hooks will be activated anyway in the inner block, which is rather unexpected (of course in real life you only see the latest hooks control statement, the former being higher in the call stack). This is due to the underlying usage of old `enable_hook_categories` / `disable_hook_categories` methods, which were introduced much before the now official context manager based API (with `cnx.[deny|all]_all_hooks_but(...)`). To move on, this patch drop the two legacy methods, rename and privatize related internal state on the connection (`hooks_mode` becomes `_hooks_mode`, `disabled_hook_cats` and `enabled_hook_cats` become `_hooks_categories`) and reimplement the `_hooks_control` context manager to simply update them. See the added unit test for details.
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
8211
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
     1
# -*- coding: utf-8 -*-
10236
ef3059a692cb Remove the remote repository-access-through-zmq support
Aurelien Campeas <aurelien.campeas@logilab.fr>
parents: 10235
diff changeset
     2
# copyright 2012-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
8211
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
     3
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
     4
#
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
     5
# This file is part of CubicWeb.
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
     6
#
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
     7
# CubicWeb is free software: you can redistribute it and/or modify it under the
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
     8
# terms of the GNU Lesser General Public License as published by the Free
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
     9
# Software Foundation, either version 2.1 of the License, or (at your option)
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    10
# any later version.
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    11
#
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    12
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    13
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    14
# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    15
# details.
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    16
#
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    17
# You should have received a copy of the GNU Lesser General Public License along
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    18
# with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    19
9468
39b7a91a3f4c [repo] pylint cleanup, mainly of imports, with a bit of style
Julien Cristau <julien.cristau@logilab.fr>
parents: 8982
diff changeset
    20
from threading import Thread
39b7a91a3f4c [repo] pylint cleanup, mainly of imports, with a bit of style
Julien Cristau <julien.cristau@logilab.fr>
parents: 8982
diff changeset
    21
from logging import getLogger
8350
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
    22
8211
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    23
import zmq
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    24
from zmq.eventloop import ioloop
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    25
import zmq.eventloop.zmqstream
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    26
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    27
from cubicweb import set_log_methods
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    28
10236
ef3059a692cb Remove the remote repository-access-through-zmq support
Aurelien Campeas <aurelien.campeas@logilab.fr>
parents: 10235
diff changeset
    29
8211
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    30
ctx = zmq.Context()
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    31
10235
684215aca046 Remove remote repository-access-through-pyro support
Aurelien Campeas <aurelien.campeas@logilab.fr>
parents: 9468
diff changeset
    32
8211
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    33
class ZMQComm(object):
8614
52f576a7394c [zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents: 8388
diff changeset
    34
    """
52f576a7394c [zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents: 8388
diff changeset
    35
    A simple ZMQ-based notification bus.
52f576a7394c [zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents: 8388
diff changeset
    36
52f576a7394c [zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents: 8388
diff changeset
    37
    There should at most one instance of this class attached to a
52f576a7394c [zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents: 8388
diff changeset
    38
    Repository. A typical usage may be something like::
52f576a7394c [zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents: 8388
diff changeset
    39
52f576a7394c [zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents: 8388
diff changeset
    40
        def callback(msg):
52f576a7394c [zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents: 8388
diff changeset
    41
            self.info('received message: %s', ' '.join(msg))
52f576a7394c [zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents: 8388
diff changeset
    42
        repo.app_instances_bus.subscribe('hello', callback)
52f576a7394c [zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents: 8388
diff changeset
    43
52f576a7394c [zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents: 8388
diff changeset
    44
    to subsribe to the 'hello' kind of message. On the other side, to
52f576a7394c [zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents: 8388
diff changeset
    45
    emit a notification, call::
52f576a7394c [zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents: 8388
diff changeset
    46
52f576a7394c [zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents: 8388
diff changeset
    47
       repo.app_instances_bus.publish(['hello', 'world'])
52f576a7394c [zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents: 8388
diff changeset
    48
52f576a7394c [zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents: 8388
diff changeset
    49
    See http://docs.cubicweb.org for more details.
52f576a7394c [zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents: 8388
diff changeset
    50
    """
8211
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    51
    def __init__(self):
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    52
        self.ioloop = ioloop.IOLoop()
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    53
        self._topics = {}
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    54
        self._subscribers = []
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    55
        self.publisher = None
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    56
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    57
    def add_publisher(self, address):
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    58
        assert self.publisher is None, "more than one publisher is not supported"
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    59
        self.publisher = Publisher(self.ioloop, address)
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    60
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    61
    def add_subscription(self, topic, callback):
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    62
        for subscriber in self._subscribers:
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    63
            subscriber.subscribe(topic, callback)
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    64
        self._topics[topic] = callback
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    65
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    66
    def add_subscriber(self, address):
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    67
        subscriber = Subscriber(self.ioloop, address)
10662
10942ed172de [py3k] dict.iteritems → dict.items
Rémi Cardona <remi.cardona@logilab.fr>
parents: 10236
diff changeset
    68
        for topic, callback in self._topics.items():
8211
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    69
            subscriber.subscribe(topic, callback)
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    70
        self._subscribers.append(subscriber)
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    71
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    72
    def publish(self, msg):
8982
6bc1c1b4473a [zmq] make publish address optional
Julien Cristau <julien.cristau@logilab.fr>
parents: 8695
diff changeset
    73
        if self.publisher is None:
6bc1c1b4473a [zmq] make publish address optional
Julien Cristau <julien.cristau@logilab.fr>
parents: 8695
diff changeset
    74
            return
8211
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    75
        self.publisher.send(msg)
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    76
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    77
    def start(self):
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    78
        Thread(target=self.ioloop.start).start()
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    79
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    80
    def stop(self):
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    81
        self.ioloop.add_callback(self.ioloop.stop)
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    82
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    83
    def __del__(self):
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    84
        self.ioloop.close()
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    85
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    86
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    87
class Publisher(object):
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    88
    def __init__(self, ioloop, address):
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    89
        self.address = address
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    90
        self._topics = {}
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    91
        self._subscribers = []
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    92
        self.ioloop = ioloop
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    93
        def callback():
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    94
            s = ctx.socket(zmq.PUB)
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    95
            self.stream = zmq.eventloop.zmqstream.ZMQStream(s, io_loop=ioloop)
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    96
            self.stream.bind(self.address)
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    97
            self.debug('start publisher on %s', self.address)
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    98
        ioloop.add_callback(callback)
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    99
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
   100
    def send(self, msg):
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
   101
        self.ioloop.add_callback(lambda:self.stream.send_multipart(msg))
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
   102
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
   103
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
   104
class Subscriber(object):
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
   105
    def __init__(self, ioloop, address):
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
   106
        self.address = address
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
   107
        self.dispatch_table = {}
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
   108
        self.ioloop = ioloop
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
   109
        def callback():
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
   110
            s = ctx.socket(zmq.SUB)
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
   111
            self.stream = zmq.eventloop.zmqstream.ZMQStream(s, io_loop=ioloop)
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
   112
            self.stream.on_recv(self.dispatch)
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
   113
            self.stream.connect(self.address)
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
   114
            self.debug('start subscriber on %s', self.address)
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
   115
        ioloop.add_callback(callback)
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
   116
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
   117
    def dispatch(self, msg):
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
   118
        try:
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
   119
            f = self.dispatch_table[msg[0]]
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
   120
        except KeyError:
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
   121
            return
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
   122
        f(msg)
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
   123
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
   124
    def subscribe(self, topic, callback):
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
   125
        self.dispatch_table[topic] = callback
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
   126
        self.ioloop.add_callback(lambda: self.stream.setsockopt(zmq.SUBSCRIBE, topic))
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
   127
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
   128
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
   129
set_log_methods(Publisher, getLogger('cubicweb.zmq.pub'))
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
   130
set_log_methods(Subscriber, getLogger('cubicweb.zmq.sub'))