server/cwzmq.py
author Sylvain Thénault <sylvain.thenault@logilab.fr>
Tue, 04 Sep 2012 06:09:17 +0200
branchstable
changeset 8529 1daea1f433c9
parent 8388 c6c624cea870
child 8614 52f576a7394c
permissions -rw-r--r--
[datafeed] make cnxset handling of datafeed source more robust currently we may run in some cases where the session has no more cnxset depending on errors and parser's handling of the cnxset. Also, free the cnxset and reacquire it later, letting a chance to other threads to run.
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
8211
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
     1
# -*- coding: utf-8 -*-
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
     2
# copyright 2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
     3
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
     4
#
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
     5
# This file is part of CubicWeb.
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
     6
#
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
     7
# CubicWeb is free software: you can redistribute it and/or modify it under the
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
     8
# terms of the GNU Lesser General Public License as published by the Free
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
     9
# Software Foundation, either version 2.1 of the License, or (at your option)
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    10
# any later version.
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    11
#
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    12
# CubicWeb is distributed in the hope that it will be useful, but WITHOUT
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    13
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    14
# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    15
# details.
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    16
#
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    17
# You should have received a copy of the GNU Lesser General Public License along
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    18
# with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    19
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    20
from threading import Thread
8350
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
    21
import cPickle
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
    22
import traceback
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
    23
8211
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    24
import zmq
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    25
from zmq.eventloop import ioloop
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    26
import zmq.eventloop.zmqstream
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    27
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    28
from logging import getLogger
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    29
from cubicweb import set_log_methods
8350
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
    30
from cubicweb.server.server import QuitEvent
8211
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    31
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    32
ctx = zmq.Context()
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    33
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    34
class ZMQComm(object):
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    35
    def __init__(self):
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    36
        self.ioloop = ioloop.IOLoop()
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    37
        self._topics = {}
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    38
        self._subscribers = []
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    39
        self.publisher = None
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    40
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    41
    def add_publisher(self, address):
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    42
        assert self.publisher is None, "more than one publisher is not supported"
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    43
        self.publisher = Publisher(self.ioloop, address)
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    44
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    45
    def add_subscription(self, topic, callback):
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    46
        for subscriber in self._subscribers:
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    47
            subscriber.subscribe(topic, callback)
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    48
        self._topics[topic] = callback
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    49
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    50
    def add_subscriber(self, address):
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    51
        subscriber = Subscriber(self.ioloop, address)
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    52
        for topic, callback in self._topics.iteritems():
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    53
            subscriber.subscribe(topic, callback)
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    54
        self._subscribers.append(subscriber)
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    55
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    56
    def publish(self, msg):
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    57
        assert self.publisher is not None, "can't publish without a publisher"
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    58
        self.publisher.send(msg)
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    59
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    60
    def start(self):
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    61
        Thread(target=self.ioloop.start).start()
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    62
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    63
    def stop(self):
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    64
        self.ioloop.add_callback(self.ioloop.stop)
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    65
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    66
    def __del__(self):
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    67
        self.ioloop.close()
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    68
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    69
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    70
class Publisher(object):
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    71
    def __init__(self, ioloop, address):
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    72
        self.address = address
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    73
        self._topics = {}
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    74
        self._subscribers = []
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    75
        self.ioloop = ioloop
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    76
        def callback():
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    77
            s = ctx.socket(zmq.PUB)
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    78
            self.stream = zmq.eventloop.zmqstream.ZMQStream(s, io_loop=ioloop)
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    79
            self.stream.bind(self.address)
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    80
            self.debug('start publisher on %s', self.address)
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    81
        ioloop.add_callback(callback)
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    82
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    83
    def send(self, msg):
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    84
        self.ioloop.add_callback(lambda:self.stream.send_multipart(msg))
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    85
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    86
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    87
class Subscriber(object):
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    88
    def __init__(self, ioloop, address):
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    89
        self.address = address
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    90
        self.dispatch_table = {}
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    91
        self.ioloop = ioloop
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    92
        def callback():
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    93
            s = ctx.socket(zmq.SUB)
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    94
            self.stream = zmq.eventloop.zmqstream.ZMQStream(s, io_loop=ioloop)
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    95
            self.stream.on_recv(self.dispatch)
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    96
            self.stream.connect(self.address)
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    97
            self.debug('start subscriber on %s', self.address)
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    98
        ioloop.add_callback(callback)
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
    99
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
   100
    def dispatch(self, msg):
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
   101
        try:
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
   102
            f = self.dispatch_table[msg[0]]
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
   103
        except KeyError:
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
   104
            return
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
   105
        f(msg)
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
   106
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
   107
    def subscribe(self, topic, callback):
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
   108
        self.dispatch_table[topic] = callback
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
   109
        self.ioloop.add_callback(lambda: self.stream.setsockopt(zmq.SUBSCRIBE, topic))
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
   110
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
   111
8350
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   112
class ZMQRepositoryServer(object):
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   113
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   114
    def __init__(self, repository):
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   115
        """make the repository available as a PyRO object"""
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   116
        self.address = None
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   117
        self.repo = repository
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   118
        self.socket = None
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   119
        self.stream = None
8388
c6c624cea870 [repo test] Avoid hangs in zmq repo unit test
Julien Cristau <julien.cristau@logilab.fr>
parents: 8350
diff changeset
   120
        self.loop = ioloop.IOLoop()
8350
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   121
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   122
        # event queue
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   123
        self.events = []
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   124
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   125
    def connect(self, address):
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   126
        self.address = address
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   127
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   128
    def run(self):
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   129
        """enter the service loop"""
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   130
        # start repository looping tasks
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   131
        self.socket = ctx.socket(zmq.REP)
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   132
        self.stream = zmq.eventloop.zmqstream.ZMQStream(self.socket, io_loop=self.loop)
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   133
        self.stream.bind(self.address)
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   134
        self.info('ZMQ server bound on: %s', self.address)
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   135
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   136
        self.stream.on_recv(self.process_cmds)
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   137
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   138
        try:
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   139
            self.loop.start()
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   140
        except zmq.ZMQError:
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   141
            self.warning('ZMQ event loop killed')
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   142
        self.quit()
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   143
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   144
    def trigger_events(self):
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   145
        """trigger ready events"""
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   146
        for event in self.events[:]:
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   147
            if event.is_ready():
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   148
                self.info('starting event %s', event)
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   149
                event.fire(self)
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   150
                try:
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   151
                    event.update()
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   152
                except Finished:
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   153
                    self.events.remove(event)
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   154
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   155
    def process_cmd(self, cmd):
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   156
        """Delegate the given command to the repository.
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   157
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   158
        ``cmd`` is a list of (method_name, args, kwargs)
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   159
        where ``args`` is a list of positional arguments
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   160
        and ``kwargs`` is a dictionnary of named arguments.
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   161
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   162
        >>> rset = delegate_to_repo(["execute", [sessionid], {'rql': rql}])
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   163
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   164
        :note1: ``kwargs`` may be ommited
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   165
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   166
            >>> rset = delegate_to_repo(["execute", [sessionid, rql]])
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   167
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   168
        :note2: both ``args`` and ``kwargs`` may be omitted
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   169
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   170
            >>> schema = delegate_to_repo(["get_schema"])
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   171
            >>> schema = delegate_to_repo("get_schema") # also allowed
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   172
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   173
        """
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   174
        cmd = cPickle.loads(cmd)
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   175
        if not cmd:
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   176
            raise AttributeError('function name required')
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   177
        if isinstance(cmd, basestring):
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   178
            cmd = [cmd]
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   179
        if len(cmd) < 2:
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   180
            cmd.append(())
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   181
        if len(cmd) < 3:
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   182
            cmd.append({})
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   183
        cmd  = list(cmd) + [(), {}]
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   184
        funcname, args, kwargs = cmd[:3]
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   185
        result = getattr(self.repo, funcname)(*args, **kwargs)
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   186
        return result
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   187
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   188
    def process_cmds(self, cmds):
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   189
        """Callback intended to be used with ``on_recv``.
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   190
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   191
        Call ``delegate_to_repo`` on each command and send a pickled of
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   192
        each result recursively.
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   193
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   194
        Any exception are catched, pickled and sent.
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   195
        """
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   196
        try:
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   197
            for cmd in cmds:
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   198
                result = self.process_cmd(cmd)
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   199
                self.send_data(result)
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   200
        except Exception, exc:
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   201
            traceback.print_exc()
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   202
            self.send_data(exc)
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   203
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   204
    def send_data(self, data):
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   205
        self.socket.send_pyobj(data)
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   206
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   207
    def quit(self, shutdown_repo=False):
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   208
        """stop the server"""
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   209
        self.info('Quitting ZMQ server')
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   210
        try:
8388
c6c624cea870 [repo test] Avoid hangs in zmq repo unit test
Julien Cristau <julien.cristau@logilab.fr>
parents: 8350
diff changeset
   211
            self.loop.add_callback(self.loop.stop)
8350
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   212
            self.stream.on_recv(None)
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   213
            self.stream.close()
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   214
        except Exception, e:
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   215
            print e
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   216
            pass
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   217
        if shutdown_repo and not self.repo.shutting_down:
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   218
            event = QuitEvent()
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   219
            event.fire(self)
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   220
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   221
    # server utilitities ######################################################
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   222
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   223
    def install_sig_handlers(self):
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   224
        """install signal handlers"""
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   225
        import signal
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   226
        self.info('installing signal handlers')
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   227
        signal.signal(signal.SIGINT, lambda x, y, s=self: s.quit(shutdown_repo=True))
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   228
        signal.signal(signal.SIGTERM, lambda x, y, s=self: s.quit(shutdown_repo=True))
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   229
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   230
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   231
    # these are overridden by set_log_methods below
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   232
    # only defining here to prevent pylint from complaining
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   233
    @classmethod
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   234
    def info(cls, msg, *a, **kw):
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   235
        pass
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   236
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   237
8211
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
   238
set_log_methods(Publisher, getLogger('cubicweb.zmq.pub'))
543e1579ba0d [repo] Add a publish/subscribe mechanism for inter-instance communication using zmq
Julien Cristau <julien.cristau@logilab.fr>, Quentin Roquefort <quentin@kpsule.me>
parents:
diff changeset
   239
set_log_methods(Subscriber, getLogger('cubicweb.zmq.sub'))
8350
e1c05bf6fdeb [zmq] Implement a ZMQ-based Repository (closes #2290125)
David Douard <david.douard@logilab.fr>
parents: 8211
diff changeset
   240
set_log_methods(ZMQRepositoryServer, getLogger('cubicweb.zmq.repo'))