server/cwzmq.py
author David Douard <david.douard@logilab.fr>
Fri, 25 Jan 2013 17:56:15 +0100
changeset 8687 06b2951ee81d
parent 8670 f02139297beb
child 8695 358d8bed9626
permissions -rw-r--r--
Added tag cubicweb-debian-version-3.16.0-1 for changeset 853237d1daf6
8211
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
# -*- coding: utf-8 -*-
# copyright 2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
#
# CubicWeb is free software: you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 2.1 of the License, or (at your option)
# any later version.
#
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.

from threading import Thread
8350
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
import cPickle
import traceback

8211
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
import zmq
from zmq.eventloop import ioloop
import zmq.eventloop.zmqstream

from logging import getLogger
from cubicweb import set_log_methods
8350
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
from cubicweb.server.server import QuitEvent
8211
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>

ctx = zmq.Context()

8670
f02139297beb prefix "tcp://" zmq uris with "zmqpickle" (closes #2574114)
Aurelien Campeas <aurelien.campeas@logilab.fr>
parents: 8614
def cwproto_to_zmqaddr(address):
    """Convert a cw-zmq address (like zmqpickle-tcp://<ip>:<port>)
    into a proper zmq address (tcp://<ip>:<port>).
    """
    assert address.startswith('zmqpickle-'), 'bad protocol string %s' % address
    return address.split('-', 1)[1]  # chop the `zmqpickle-` prefix

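As a quick illustration of the conversion above, the following standalone sketch mirrors `cwproto_to_zmqaddr` but raises `ValueError` rather than relying on `assert`, which is stripped when Python runs with `-O`. The name `strip_cw_prefix`, the constant `CW_ZMQ_PREFIX` and the sample addresses are hypothetical and only serve this example; they are not part of CubicWeb.

```python
CW_ZMQ_PREFIX = 'zmqpickle-'  # prefix carried by cw-zmq addresses


def strip_cw_prefix(address):
    """Return the plain zmq address for a `zmqpickle-` prefixed one.

    Hypothetical variant of cwproto_to_zmqaddr that raises ValueError
    instead of asserting.
    """
    if not address.startswith(CW_ZMQ_PREFIX):
        raise ValueError('bad protocol string %s' % address)
    # drop only the leading prefix; later hyphens (e.g. in a host name) stay
    return address[len(CW_ZMQ_PREFIX):]


print(strip_cw_prefix('zmqpickle-tcp://127.0.0.1:8181'))
print(strip_cw_prefix('zmqpickle-tcp://my-host:8181'))
```

Like the original `split('-', 1)`, slicing off the prefix leaves any hyphen that appears later in the address untouched.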
8211
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
class ZMQComm(object):
8614
52f576a7394c [zmq] add an introductive docstring on ZMQComm
David Douard <david.douard@logilab.fr>
parents: 8388
    """
    A simple ZMQ-based notification bus.

    There should be at most one instance of this class attached to a
    Repository. A typical usage may be something like::

        def callback(msg):
            self.info('received message: %s', ' '.join(msg))
        repo.app_instances_bus.add_subscription('hello', callback)

    to subscribe to the 'hello' kind of message. On the other side, to
    emit a notification, call::

       repo.app_instances_bus.publish(['hello', 'world'])

    See http://docs.cubicweb.org for more details.
    """
8211
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
    def __init__(self):
        self.ioloop = ioloop.IOLoop()
        self._topics = {}
        self._subscribers = []
        self.publisher = None

    def add_publisher(self, address):
        assert self.publisher is None, "more than one publisher is not supported"
        self.publisher = Publisher(self.ioloop, address)

    def add_subscription(self, topic, callback):
        # register the topic on existing subscribers and remember it
        # for subscribers added later
        for subscriber in self._subscribers:
            subscriber.subscribe(topic, callback)
        self._topics[topic] = callback

    def add_subscriber(self, address):
        subscriber = Subscriber(self.ioloop, address)
        # replay the topics registered so far on the new subscriber
        for topic, callback in self._topics.iteritems():
            subscriber.subscribe(topic, callback)
        self._subscribers.append(subscriber)

    def publish(self, msg):
        assert self.publisher is not None, "can't publish without a publisher"
        self.publisher.send(msg)

    def start(self):
        # run the ioloop in its own thread
        Thread(target=self.ioloop.start).start()

    def stop(self):
        # ask the ioloop to stop itself from within its own thread
        self.ioloop.add_callback(self.ioloop.stop)

    def __del__(self):
        self.ioloop.close()


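The bookkeeping in `add_subscription` and `add_subscriber` makes the registration order irrelevant: a topic registered first is replayed onto subscribers added later, and a subscriber added first receives topics registered later. The sketch below reproduces only that logic with a recording stand-in. `RecordingSubscriber`, `Bus` and `on_hello` are hypothetical names, no ZMQ socket or ioloop is involved, and `items()` replaces `iteritems()` so the sketch also runs on Python 3.

```python
class RecordingSubscriber(object):
    """Stand-in for Subscriber that only records its subscriptions."""

    def __init__(self, address):
        self.address = address
        self.dispatch_table = {}

    def subscribe(self, topic, callback):
        self.dispatch_table[topic] = callback


class Bus(object):
    """Same bookkeeping as ZMQComm, without ioloop or sockets."""

    def __init__(self):
        self._topics = {}
        self._subscribers = []

    def add_subscription(self, topic, callback):
        # propagate to the subscribers that already exist ...
        for subscriber in self._subscribers:
            subscriber.subscribe(topic, callback)
        # ... and remember the topic for subscribers added later
        self._topics[topic] = callback

    def add_subscriber(self, address):
        subscriber = RecordingSubscriber(address)
        # replay every topic registered so far
        for topic, callback in self._topics.items():
            subscriber.subscribe(topic, callback)
        self._subscribers.append(subscriber)
        return subscriber


def on_hello(msg):
    return msg


bus = Bus()
bus.add_subscription('hello', on_hello)              # topic first
late = bus.add_subscriber('tcp://127.0.0.1:8181')    # subscriber afterwards
bus.add_subscription('bye', on_hello)                # topic after the subscriber
print(sorted(late.dispatch_table))
```

Both topics end up in the subscriber's dispatch table, whichever side was registered first.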
class Publisher(object):
    def __init__(self, ioloop, address):
        self.address = address
        self._topics = {}
        self._subscribers = []
        self.ioloop = ioloop
        def callback():
            # executed by the ioloop: create and bind the PUB socket there
            s = ctx.socket(zmq.PUB)
            self.stream = zmq.eventloop.zmqstream.ZMQStream(s, io_loop=ioloop)
            self.stream.bind(self.address)
            self.debug('start publisher on %s', self.address)
        ioloop.add_callback(callback)

    def send(self, msg):
        # hand the multipart message over to the ioloop
        self.ioloop.add_callback(lambda: self.stream.send_multipart(msg))


class Subscriber(object):
    def __init__(self, ioloop, address):
        self.address = address
        self.dispatch_table = {}
        self.ioloop = ioloop
        def callback():
            # executed by the ioloop: create and connect the SUB socket there
            s = ctx.socket(zmq.SUB)
            self.stream = zmq.eventloop.zmqstream.ZMQStream(s, io_loop=ioloop)
            self.stream.on_recv(self.dispatch)
            self.stream.connect(self.address)
            self.debug('start subscriber on %s', self.address)
        ioloop.add_callback(callback)

    def dispatch(self, msg):
        # the first frame of the multipart message is its topic
        try:
            f = self.dispatch_table[msg[0]]
        except KeyError:
            # no callback registered for this topic: ignore the message
            return
        f(msg)

    def subscribe(self, topic, callback):
        self.dispatch_table[topic] = callback
        self.ioloop.add_callback(lambda: self.stream.setsockopt(zmq.SUBSCRIBE, topic))


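`Subscriber.dispatch` looks up the first frame of the multipart message in `dispatch_table` by exact match, whereas a ZMQ `SUBSCRIBE` option is a prefix filter. A message whose topic merely starts with a subscribed topic can therefore reach `dispatch` and be dropped silently there. The sketch below isolates that lookup under simplified assumptions: `make_dispatcher` is a hypothetical helper and the frames are plain byte strings rather than data read from a socket.

```python
def make_dispatcher(dispatch_table):
    """Return a function routing a multipart message on its first frame."""
    def dispatch(msg):
        try:
            callback = dispatch_table[msg[0]]
        except KeyError:
            # unknown topic: ignore, as Subscriber.dispatch does
            return
        callback(msg)
    return dispatch


received = []
dispatch = make_dispatcher({b'hello': received.append})

dispatch([b'hello', b'world'])    # exact topic match: the callback runs
dispatch([b'hello2', b'world'])   # would pass a b'hello' prefix filter, dropped here
dispatch([b'other'])              # never subscribed, dropped as well
print(received)
```

Only the first message reaches the callback; the other two are discarded without any error or log entry.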
8350
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
class ZMQRepositoryServer(object):

    def __init__(self, repository):
        """make the repository available over ZMQ"""
        self.address = None
        self.repo = repository
        self.socket = None
        self.stream = None
8388
c6c624cea870 [repo test] Avoid hangs in zmq repo unit test
Julien Cristau <julien.cristau@logilab.fr>
parents: 8350
diff changeset
   144
        self.loop = ioloop.IOLoop()
8350
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   145
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   146
        # event queue
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   147
        self.events = []
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   148
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   149
    def connect(self, address):
8670
f02139297beb prefix "tcp://" zmq uris with "zmqpickle" (closes #2574114)
Aurelien Campeas <aurelien.campeas@logilab.fr>
parents: 8614
diff changeset
   150
        self.address = cwproto_to_zmqaddr(address)
8350
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   151
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   152
    def run(self):
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   153
        """enter the service loop"""
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   154
        # create the REP socket and bind it to the configured address
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   155
        self.socket = ctx.socket(zmq.REP)
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   156
        self.stream = zmq.eventloop.zmqstream.ZMQStream(self.socket, io_loop=self.loop)
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   157
        self.stream.bind(self.address)
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   158
        self.info('ZMQ server bound on: %s', self.address)
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   159
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   160
        self.stream.on_recv(self.process_cmds)
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   161
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   162
        try:
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   163
            self.loop.start()
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   164
        except zmq.ZMQError:
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   165
            self.warning('ZMQ event loop killed')
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   166
        self.quit()
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   167
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   168
    def trigger_events(self):
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   169
        """trigger ready events"""
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   170
        for event in self.events[:]:
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   171
            if event.is_ready():
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   172
                self.info('starting event %s', event)
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   173
                event.fire(self)
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   174
                try:
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   175
                    event.update()
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   176
                except Finished:
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   177
                    self.events.remove(event)
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   178
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   179
    def process_cmd(self, cmd):
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   180
        """Delegate the given command to the repository.
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   181
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   182
        ``cmd`` is a list of (method_name, args, kwargs)
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   183
        where ``args`` is a list of positional arguments
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   184
        and ``kwargs`` is a dictionary of named arguments.
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   185
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   186
        >>> rset = delegate_to_repo(["execute", [sessionid], {'rql': rql}])
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   187
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   188
        :note1: ``kwargs`` may be omitted
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   189
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   190
            >>> rset = delegate_to_repo(["execute", [sessionid, rql]])
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   191
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   192
        :note2: both ``args`` and ``kwargs`` may be omitted
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   193
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   194
            >>> schema = delegate_to_repo(["get_schema"])
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   195
            >>> schema = delegate_to_repo("get_schema") # also allowed
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   196
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   197
        """
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   198
        cmd = cPickle.loads(cmd)
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   199
        if not cmd:
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   200
            raise AttributeError('function name required')
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   201
        if isinstance(cmd, basestring):
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   202
            cmd = [cmd]
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   203
        if len(cmd) < 2:
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   204
            cmd.append(())
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   205
        if len(cmd) < 3:
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   206
            cmd.append({})
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   207
        cmd = list(cmd) + [(), {}]
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   208
        funcname, args, kwargs = cmd[:3]
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   209
        result = getattr(self.repo, funcname)(*args, **kwargs)
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   210
        return result
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   211
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   212
    def process_cmds(self, cmds):
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   213
        """Callback intended to be used with ``on_recv``.
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   214
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   215
        Call ``process_cmd`` on each command and send a pickled copy of
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   216
        each result in turn.
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   217
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   218
        Any exception is caught, pickled and sent.
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   219
        """
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   220
        try:
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   221
            for cmd in cmds:
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   222
                result = self.process_cmd(cmd)
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   223
                self.send_data(result)
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   224
        except Exception as exc:
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   225
            traceback.print_exc()
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   226
            self.send_data(exc)
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   227
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   228
    def send_data(self, data):
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   229
        self.socket.send_pyobj(data)
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   230
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   231
    def quit(self, shutdown_repo=False):
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   232
        """stop the server"""
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   233
        self.info('Quitting ZMQ server')
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   234
        try:
8388
c6c624cea870 [repo test] Avoid hangs in zmq repo unit test
Julien Cristau <julien.cristau@logilab.fr>
parents: 8350
diff changeset
   235
            self.loop.add_callback(self.loop.stop)
8350
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   236
            self.stream.on_recv(None)
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   237
            self.stream.close()
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   238
        except Exception as e:
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   239
            self.warning('error while stopping ZMQ server: %s', e)
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   240
            pass
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   241
        if shutdown_repo and not self.repo.shutting_down:
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   242
            event = QuitEvent()
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   243
            event.fire(self)
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   244
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   245
    # server utilities ######################################################
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   246
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   247
    def install_sig_handlers(self):
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   248
        """install signal handlers"""
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   249
        import signal
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   250
        self.info('installing signal handlers')
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   251
        signal.signal(signal.SIGINT, lambda x, y, s=self: s.quit(shutdown_repo=True))
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   252
        signal.signal(signal.SIGTERM, lambda x, y, s=self: s.quit(shutdown_repo=True))
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   253
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   254
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   255
    # these are overridden by set_log_methods below
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   256
    # only defining here to prevent pylint from complaining
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   257
    @classmethod
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   258
    def info(cls, msg, *a, **kw):
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   259
        pass
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   260
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   261
8211
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
   262
set_log_methods(Publisher, getLogger('cubicweb.zmq.pub'))
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
   263
set_log_methods(Subscriber, getLogger('cubicweb.zmq.sub'))
8350
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   264
set_log_methods(ZMQRepositoryServer, getLogger('cubicweb.zmq.repo'))
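
The command format documented in ``process_cmd`` (a pickled ``(method_name, args, kwargs)`` sequence whose trailing items may be left out) can be tried without any ZMQ socket. The following is a minimal sketch, not the module's own code: it assumes Python 3 (``pickle`` and ``str`` in place of ``cPickle`` and ``basestring``), and ``FakeRepo``, ``normalize_cmd`` and the free function ``process_cmd`` are hypothetical names introduced only for illustration. Unlike the method above, it copies the command into a list first, so a tuple is accepted too.

```python
import pickle


class FakeRepo(object):
    """Hypothetical stand-in for the real repository object."""

    def get_schema(self):
        return 'schema'

    def execute(self, sessionid, rql, args=None):
        return (sessionid, rql, args)


def normalize_cmd(cmd):
    """Return ``(funcname, args, kwargs)`` for any accepted command shape."""
    if not cmd:
        raise AttributeError('function name required')
    if isinstance(cmd, str):
        # a bare method name is shorthand for [name]
        cmd = [cmd]
    # copy into a list so that tuples, which have no append(), also work
    cmd = list(cmd)
    if len(cmd) < 2:
        cmd.append(())   # no positional arguments given
    if len(cmd) < 3:
        cmd.append({})   # no keyword arguments given
    funcname, args, kwargs = cmd[:3]
    return funcname, args, kwargs


def process_cmd(repo, payload):
    """Unpickle ``payload`` and call the named method on ``repo``.

    ``pickle.loads`` can run arbitrary code, so ``payload`` must come
    from a trusted peer.
    """
    funcname, args, kwargs = normalize_cmd(pickle.loads(payload))
    return getattr(repo, funcname)(*args, **kwargs)


repo = FakeRepo()
# the three shapes described in the docstring: full, without kwargs, name only
print(process_cmd(repo, pickle.dumps(['execute', ['sid'], {'rql': 'Any X'}])))
print(process_cmd(repo, pickle.dumps(['execute', ['sid', 'Any X']])))
print(process_cmd(repo, pickle.dumps('get_schema')))
```

Padding the command one slot at a time keeps ``kwargs`` a dictionary even when only ``args`` is supplied, which is what ``**kwargs`` requires at call time.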