server/cwzmq.py
changeset 10236 ef3059a692cb
parent 10235 684215aca046
child 10662 10942ed172de
--- a/server/cwzmq.py	Tue Mar 03 14:57:34 2015 +0100
+++ b/server/cwzmq.py	Fri Mar 13 14:10:40 2015 +0100
@@ -1,5 +1,5 @@
 # -*- coding: utf-8 -*-
-# copyright 2012-2013 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# copyright 2012-2014 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
 #
 # This file is part of CubicWeb.
@@ -17,9 +17,6 @@
 # You should have received a copy of the GNU Lesser General Public License along
 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
 
-import cPickle
-import traceback
-from time import localtime, mktime
 from threading import Thread
 from logging import getLogger
 
@@ -29,59 +26,9 @@
 
 from cubicweb import set_log_methods
 
+
 ctx = zmq.Context()
 
-def cwproto_to_zmqaddr(address):
-    """ converts a cw-zmq address (like zmqpickle-tcp://<ip>:<port>)
-    into a proper zmq address (tcp://<ip>:<port>)
-    """
-    assert address.startswith('zmqpickle-'), 'bad protocol string %s' % address
-    return address.split('-', 1)[1] # chop the `zmqpickle-` prefix
-
-
-class Finished(Exception):
-    """raise to remove an event from the event loop"""
-
-class TimeEvent:
-    """base event"""
-    # timefunc = staticmethod(localtime)
-    timefunc = localtime
-
-    def __init__(self, absolute=None, period=None):
-        # local time tuple
-        if absolute is None:
-            absolute = self.timefunc()
-        self.absolute = absolute
-        # optional period in seconds
-        self.period = period
-
-    def is_ready(self):
-        """return  true if the event is ready to be fired"""
-        now = self.timefunc()
-        if self.absolute <= now:
-            return True
-        return False
-
-    def fire(self, server):
-        """fire the event
-        must be overridden by concrete events
-        """
-        raise NotImplementedError()
-
-    def update(self):
-        """update the absolute date for the event or raise a finished exception
-        """
-        if self.period is None:
-            raise Finished
-        self.absolute = localtime(mktime(self.absolute) + self.period)
-
-
-class QuitEvent(TimeEvent):
-    """stop the server"""
-    def fire(self, server):
-        server.repo.shutdown()
-        server.quiting = True
-
 
 class ZMQComm(object):
     """
@@ -179,132 +126,5 @@
         self.ioloop.add_callback(lambda: self.stream.setsockopt(zmq.SUBSCRIBE, topic))
 
 
-class ZMQRepositoryServer(object):
-
-    def __init__(self, repository):
-        """make the repository available as a PyRO object"""
-        self.address = None
-        self.repo = repository
-        self.socket = None
-        self.stream = None
-        self.loop = ioloop.IOLoop()
-
-        # event queue
-        self.events = []
-
-    def connect(self, address):
-        self.address = cwproto_to_zmqaddr(address)
-
-    def run(self):
-        """enter the service loop"""
-        # start repository looping tasks
-        self.socket = ctx.socket(zmq.REP)
-        self.stream = zmq.eventloop.zmqstream.ZMQStream(self.socket, io_loop=self.loop)
-        self.stream.bind(self.address)
-        self.info('ZMQ server bound on: %s', self.address)
-
-        self.stream.on_recv(self.process_cmds)
-
-        try:
-            self.loop.start()
-        except zmq.ZMQError:
-            self.warning('ZMQ event loop killed')
-        self.quit()
-
-    def trigger_events(self):
-        """trigger ready events"""
-        for event in self.events[:]:
-            if event.is_ready():
-                self.info('starting event %s', event)
-                event.fire(self)
-                try:
-                    event.update()
-                except Finished:
-                    self.events.remove(event)
-
-    def process_cmd(self, cmd):
-        """Delegate the given command to the repository.
-
-        ``cmd`` is a list of (method_name, args, kwargs)
-        where ``args`` is a list of positional arguments
-        and ``kwargs`` is a dictionnary of named arguments.
-
-        >>> rset = delegate_to_repo(["execute", [sessionid], {'rql': rql}])
-
-        :note1: ``kwargs`` may be ommited
-
-            >>> rset = delegate_to_repo(["execute", [sessionid, rql]])
-
-        :note2: both ``args`` and ``kwargs`` may be omitted
-
-            >>> schema = delegate_to_repo(["get_schema"])
-            >>> schema = delegate_to_repo("get_schema") # also allowed
-
-        """
-        cmd = cPickle.loads(cmd)
-        if not cmd:
-            raise AttributeError('function name required')
-        if isinstance(cmd, basestring):
-            cmd = [cmd]
-        if len(cmd) < 2:
-            cmd.append(())
-        if len(cmd) < 3:
-            cmd.append({})
-        cmd  = list(cmd) + [(), {}]
-        funcname, args, kwargs = cmd[:3]
-        result = getattr(self.repo, funcname)(*args, **kwargs)
-        return result
-
-    def process_cmds(self, cmds):
-        """Callback intended to be used with ``on_recv``.
-
-        Call ``delegate_to_repo`` on each command and send a pickled of
-        each result recursively.
-
-        Any exception are catched, pickled and sent.
-        """
-        try:
-            for cmd in cmds:
-                result = self.process_cmd(cmd)
-                self.send_data(result)
-        except Exception as exc:
-            traceback.print_exc()
-            self.send_data(exc)
-
-    def send_data(self, data):
-        self.socket.send_pyobj(data)
-
-    def quit(self, shutdown_repo=False):
-        """stop the server"""
-        self.info('Quitting ZMQ server')
-        try:
-            self.loop.add_callback(self.loop.stop)
-            self.stream.on_recv(None)
-            self.stream.close()
-        except Exception as e:
-            print e
-            pass
-        if shutdown_repo and not self.repo.shutting_down:
-            event = QuitEvent()
-            event.fire(self)
-
-    # server utilitities ######################################################
-
-    def install_sig_handlers(self):
-        """install signal handlers"""
-        import signal
-        self.info('installing signal handlers')
-        signal.signal(signal.SIGINT, lambda x, y, s=self: s.quit(shutdown_repo=True))
-        signal.signal(signal.SIGTERM, lambda x, y, s=self: s.quit(shutdown_repo=True))
-
-
-    # these are overridden by set_log_methods below
-    # only defining here to prevent pylint from complaining
-    @classmethod
-    def info(cls, msg, *a, **kw):
-        pass
-
-
 set_log_methods(Publisher, getLogger('cubicweb.zmq.pub'))
 set_log_methods(Subscriber, getLogger('cubicweb.zmq.sub'))
-set_log_methods(ZMQRepositoryServer, getLogger('cubicweb.zmq.repo'))