--- a/server/cwzmq.py Tue Mar 03 14:57:34 2015 +0100
+++ b/server/cwzmq.py Fri Mar 13 14:10:40 2015 +0100
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
-# copyright 2012-2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# copyright 2012-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
# contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
#
# This file is part of CubicWeb.
@@ -17,9 +17,6 @@
# You should have received a copy of the GNU Lesser General Public License along
# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.
-import cPickle
-import traceback
-from time import localtime, mktime
from threading import Thread
from logging import getLogger
@@ -29,59 +26,9 @@
from cubicweb import set_log_methods
+
ctx = zmq.Context()
-def cwproto_to_zmqaddr(address):
- """ converts a cw-zmq address (like zmqpickle-tcp://<ip>:<port>)
- into a proper zmq address (tcp://<ip>:<port>)
- """
- assert address.startswith('zmqpickle-'), 'bad protocol string %s' % address
- return address.split('-', 1)[1] # chop the `zmqpickle-` prefix
-
-
-class Finished(Exception):
- """raise to remove an event from the event loop"""
-
-class TimeEvent:
- """base event"""
- # timefunc = staticmethod(localtime)
- timefunc = localtime
-
- def __init__(self, absolute=None, period=None):
- # local time tuple
- if absolute is None:
- absolute = self.timefunc()
- self.absolute = absolute
- # optional period in seconds
- self.period = period
-
- def is_ready(self):
- """return true if the event is ready to be fired"""
- now = self.timefunc()
- if self.absolute <= now:
- return True
- return False
-
- def fire(self, server):
- """fire the event
- must be overridden by concrete events
- """
- raise NotImplementedError()
-
- def update(self):
- """update the absolute date for the event or raise a finished exception
- """
- if self.period is None:
- raise Finished
- self.absolute = localtime(mktime(self.absolute) + self.period)
-
-
-class QuitEvent(TimeEvent):
- """stop the server"""
- def fire(self, server):
- server.repo.shutdown()
- server.quiting = True
-
class ZMQComm(object):
"""
@@ -179,132 +126,5 @@
self.ioloop.add_callback(lambda: self.stream.setsockopt(zmq.SUBSCRIBE, topic))
-class ZMQRepositoryServer(object):
-
- def __init__(self, repository):
- """make the repository available as a PyRO object"""
- self.address = None
- self.repo = repository
- self.socket = None
- self.stream = None
- self.loop = ioloop.IOLoop()
-
- # event queue
- self.events = []
-
- def connect(self, address):
- self.address = cwproto_to_zmqaddr(address)
-
- def run(self):
- """enter the service loop"""
- # start repository looping tasks
- self.socket = ctx.socket(zmq.REP)
- self.stream = zmq.eventloop.zmqstream.ZMQStream(self.socket, io_loop=self.loop)
- self.stream.bind(self.address)
- self.info('ZMQ server bound on: %s', self.address)
-
- self.stream.on_recv(self.process_cmds)
-
- try:
- self.loop.start()
- except zmq.ZMQError:
- self.warning('ZMQ event loop killed')
- self.quit()
-
- def trigger_events(self):
- """trigger ready events"""
- for event in self.events[:]:
- if event.is_ready():
- self.info('starting event %s', event)
- event.fire(self)
- try:
- event.update()
- except Finished:
- self.events.remove(event)
-
- def process_cmd(self, cmd):
- """Delegate the given command to the repository.
-
- ``cmd`` is a list of (method_name, args, kwargs)
- where ``args`` is a list of positional arguments
- and ``kwargs`` is a dictionnary of named arguments.
-
- >>> rset = delegate_to_repo(["execute", [sessionid], {'rql': rql}])
-
- :note1: ``kwargs`` may be ommited
-
- >>> rset = delegate_to_repo(["execute", [sessionid, rql]])
-
- :note2: both ``args`` and ``kwargs`` may be omitted
-
- >>> schema = delegate_to_repo(["get_schema"])
- >>> schema = delegate_to_repo("get_schema") # also allowed
-
- """
- cmd = cPickle.loads(cmd)
- if not cmd:
- raise AttributeError('function name required')
- if isinstance(cmd, basestring):
- cmd = [cmd]
- if len(cmd) < 2:
- cmd.append(())
- if len(cmd) < 3:
- cmd.append({})
- cmd = list(cmd) + [(), {}]
- funcname, args, kwargs = cmd[:3]
- result = getattr(self.repo, funcname)(*args, **kwargs)
- return result
-
- def process_cmds(self, cmds):
- """Callback intended to be used with ``on_recv``.
-
- Call ``delegate_to_repo`` on each command and send a pickled of
- each result recursively.
-
- Any exception are catched, pickled and sent.
- """
- try:
- for cmd in cmds:
- result = self.process_cmd(cmd)
- self.send_data(result)
- except Exception as exc:
- traceback.print_exc()
- self.send_data(exc)
-
- def send_data(self, data):
- self.socket.send_pyobj(data)
-
- def quit(self, shutdown_repo=False):
- """stop the server"""
- self.info('Quitting ZMQ server')
- try:
- self.loop.add_callback(self.loop.stop)
- self.stream.on_recv(None)
- self.stream.close()
- except Exception as e:
- print e
- pass
- if shutdown_repo and not self.repo.shutting_down:
- event = QuitEvent()
- event.fire(self)
-
- # server utilitities ######################################################
-
- def install_sig_handlers(self):
- """install signal handlers"""
- import signal
- self.info('installing signal handlers')
- signal.signal(signal.SIGINT, lambda x, y, s=self: s.quit(shutdown_repo=True))
- signal.signal(signal.SIGTERM, lambda x, y, s=self: s.quit(shutdown_repo=True))
-
-
- # these are overridden by set_log_methods below
- # only defining here to prevent pylint from complaining
- @classmethod
- def info(cls, msg, *a, **kw):
- pass
-
-
set_log_methods(Publisher, getLogger('cubicweb.zmq.pub'))
set_log_methods(Subscriber, getLogger('cubicweb.zmq.sub'))
-set_log_methods(ZMQRepositoryServer, getLogger('cubicweb.zmq.repo'))