[zmq] Implement a ZMQ-based Repository (closes #2290125)
authorDavid Douard <david.douard@logilab.fr>
Wed, 04 Apr 2012 16:51:09 +0200
changeset 8350 e1c05bf6fdeb
parent 8349 fdb796435d7b
child 8351 02f4f01375e8
[zmq] Implement a ZMQ-based Repository (closes #2290125) This Repository server behave mainly like the Pyro-based repository.
hooks/zmq.py
server/cwzmq.py
server/serverconfig.py
server/serverctl.py
--- a/hooks/zmq.py	Tue Apr 10 17:03:19 2012 +0200
+++ b/hooks/zmq.py	Wed Apr 04 16:51:09 2012 +0200
@@ -46,3 +46,30 @@
             self.repo.app_instances_bus.add_subscriber(address)
         self.repo.app_instances_bus.start()
 
+
+class ZMQRepositoryServerStopHook(hook.Hook):
+    __regid__ = 'zmqrepositoryserverstop'
+    events = ('server_shutdown',)
+
+    def __call__(self):
+        server = getattr(self.repo, 'zmq_repo_server', None)
+        if server:
+            self.repo.zmq_repo_server.quit()
+
+class ZMQRepositoryServerStartHook(hook.Hook):
+    __regid__ = 'zmqrepositoryserverstart'
+    events = ('server_startup',)
+
+    def __call__(self):
+        config = self.repo.config
+        if config.name == 'repository':
+            # start-repository command already starts a zmq repo
+            return
+        address = config.get('zmq-repository-address')
+        if not address:
+            return
+        from cubicweb.server import cwzmq
+        self.repo.zmq_repo_server = server = cwzmq.ZMQRepositoryServer(self.repo)
+        server.connect(address)
+        self.repo.threaded_task(server.run)
+
--- a/server/cwzmq.py	Tue Apr 10 17:03:19 2012 +0200
+++ b/server/cwzmq.py	Wed Apr 04 16:51:09 2012 +0200
@@ -18,12 +18,16 @@
 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
 
 from threading import Thread
+import cPickle
+import traceback
+
 import zmq
 from zmq.eventloop import ioloop
 import zmq.eventloop.zmqstream
 
 from logging import getLogger
 from cubicweb import set_log_methods
+from cubicweb.server.server import QuitEvent
 
 ctx = zmq.Context()
 
@@ -105,5 +109,133 @@
         self.ioloop.add_callback(lambda: self.stream.setsockopt(zmq.SUBSCRIBE, topic))
 
 
+class ZMQRepositoryServer(object):
+
+    def __init__(self, repository):
+        """make the repository available as a PyRO object"""
+        self.address = None
+        self.repo = repository
+        self.socket = None
+        self.stream = None
+        self.loop = None
+
+        # event queue
+        self.events = []
+
+    def connect(self, address):
+        self.address = address
+
+    def run(self):
+        """enter the service loop"""
+        # start repository looping tasks
+        self.socket = ctx.socket(zmq.REP)
+        self.loop = ioloop.IOLoop()
+        self.stream = zmq.eventloop.zmqstream.ZMQStream(self.socket, io_loop=self.loop)
+        self.stream.bind(self.address)
+        self.info('ZMQ server bound on: %s', self.address)
+
+        self.stream.on_recv(self.process_cmds)
+
+        try:
+            self.loop.start()
+        except zmq.ZMQError:
+            self.warning('ZMQ event loop killed')
+        self.quit()
+
+    def trigger_events(self):
+        """trigger ready events"""
+        for event in self.events[:]:
+            if event.is_ready():
+                self.info('starting event %s', event)
+                event.fire(self)
+                try:
+                    event.update()
+                except Finished:
+                    self.events.remove(event)
+
+    def process_cmd(self, cmd):
+        """Delegate the given command to the repository.
+
+        ``cmd`` is a list of (method_name, args, kwargs)
+        where ``args`` is a list of positional arguments
+        and ``kwargs`` is a dictionnary of named arguments.
+
+        >>> rset = delegate_to_repo(["execute", [sessionid], {'rql': rql}])
+
+        :note1: ``kwargs`` may be ommited
+
+            >>> rset = delegate_to_repo(["execute", [sessionid, rql]])
+
+        :note2: both ``args`` and ``kwargs`` may be omitted
+
+            >>> schema = delegate_to_repo(["get_schema"])
+            >>> schema = delegate_to_repo("get_schema") # also allowed
+
+        """
+        cmd = cPickle.loads(cmd)
+        if not cmd:
+            raise AttributeError('function name required')
+        if isinstance(cmd, basestring):
+            cmd = [cmd]
+        if len(cmd) < 2:
+            cmd.append(())
+        if len(cmd) < 3:
+            cmd.append({})
+        cmd  = list(cmd) + [(), {}]
+        funcname, args, kwargs = cmd[:3]
+        result = getattr(self.repo, funcname)(*args, **kwargs)
+        return result
+
+    def process_cmds(self, cmds):
+        """Callback intended to be used with ``on_recv``.
+
+        Call ``delegate_to_repo`` on each command and send a pickled of
+        each result recursively.
+
+        Any exception are catched, pickled and sent.
+        """
+        try:
+            for cmd in cmds:
+                result = self.process_cmd(cmd)
+                self.send_data(result)
+        except Exception, exc:
+            traceback.print_exc()
+            self.send_data(exc)
+
+    def send_data(self, data):
+        self.socket.send_pyobj(data)
+
+    def quit(self, shutdown_repo=False):
+        """stop the server"""
+        self.info('Quitting ZMQ server')
+        try:
+            self.loop.stop()
+            self.stream.on_recv(None)
+            self.stream.close()
+        except Exception, e:
+            print e
+            pass
+        if shutdown_repo and not self.repo.shutting_down:
+            event = QuitEvent()
+            event.fire(self)
+
+    # server utilitities ######################################################
+
+    def install_sig_handlers(self):
+        """install signal handlers"""
+        import signal
+        self.info('installing signal handlers')
+        signal.signal(signal.SIGINT, lambda x, y, s=self: s.quit(shutdown_repo=True))
+        signal.signal(signal.SIGTERM, lambda x, y, s=self: s.quit(shutdown_repo=True))
+
+
+    # these are overridden by set_log_methods below
+    # only defining here to prevent pylint from complaining
+    @classmethod
+    def info(cls, msg, *a, **kw):
+        pass
+
+
 set_log_methods(Publisher, getLogger('cubicweb.zmq.pub'))
 set_log_methods(Subscriber, getLogger('cubicweb.zmq.sub'))
+set_log_methods(ZMQRepositoryServer, getLogger('cubicweb.zmq.repo'))
--- a/server/serverconfig.py	Tue Apr 10 17:03:19 2012 +0200
+++ b/server/serverconfig.py	Wed Apr 04 16:51:09 2012 +0200
@@ -204,7 +204,13 @@
 and if not set, it will be choosen randomly',
           'group': 'pyro', 'level': 3,
           }),
-
+        # zmq services config
+        ('zmq-repository-address',
+         {'type' : 'string',
+          'default': None,
+          'help': 'ZMQ URI on which the repository will be bound to.',
+          'group': 'zmq', 'level': 3,
+          }),
          ('zmq-address-sub',
           {'type' : 'csv',
            'default' : None,
--- a/server/serverctl.py	Tue Apr 10 17:03:19 2012 +0200
+++ b/server/serverctl.py	Wed Apr 04 16:51:09 2012 +0200
@@ -35,6 +35,7 @@
 from cubicweb.toolsutils import Command, CommandHandler, underline_title
 from cubicweb.cwctl import CWCTL, check_options_consistency
 from cubicweb.server import SOURCE_TYPES
+from cubicweb.server.repository import Repository
 from cubicweb.server.serverconfig import (
     USER_OPTIONS, ServerConfiguration, SourceConfiguration,
     ask_source_config, generate_source_config)
@@ -633,7 +634,7 @@
 class StartRepositoryCommand(Command):
     """Start a CubicWeb RQL server for a given instance.
 
-    The server will be accessible through pyro
+    The server will be remotely accessible through pyro or ZMQ
 
     <instance>
       the identifier of the instance to initialize.
@@ -650,12 +651,30 @@
           'default': None, 'choices': ('debug', 'info', 'warning', 'error'),
           'help': 'debug if -D is set, error otherwise',
           }),
+        ('address',
+         {'short': 'a', 'type': 'string', 'metavar': '<protocol>://<host>:<port>',
+          'default': '',
+          'help': ('specify a ZMQ URI on which to bind, or use "pyro://"'
+                   'to create a pyro-based repository'),
+          }),
         )
 
+    def create_repo(self, config):
+        address = self['address']
+        if not address:
+            address = config.get('zmq-repository-address', 'pyro://')
+        if address.startswith('pyro://'):
+            from cubicweb.server.server import RepositoryServer
+            return RepositoryServer(config), config['host']
+        else:
+            from cubicweb.server.utils import TasksManager
+            from cubicweb.server.cwzmq import ZMQRepositoryServer
+            repo = Repository(config, TasksManager())
+            return ZMQRepositoryServer(repo), address
+
     def run(self, args):
         from logilab.common.daemon import daemonize, setugid
         from cubicweb.cwctl import init_cmdline_log_threshold
-        from cubicweb.server.server import RepositoryServer
         appid = args[0]
         debug = self['debug']
         if sys.platform == 'win32' and not debug:
@@ -665,7 +684,7 @@
         config = ServerConfiguration.config_for(appid, debugmode=debug)
         init_cmdline_log_threshold(config, self['loglevel'])
         # create the server
-        server = RepositoryServer(config)
+        server, address = self.create_repo(config)
         # ensure the directory where the pid-file should be set exists (for
         # instance /var/run/cubicweb may be deleted on computer restart)
         pidfile = config['pid-file']
@@ -679,7 +698,7 @@
         if uid is not None:
             setugid(uid)
         server.install_sig_handlers()
-        server.connect(config['host'], 0)
+        server.connect(address)
         server.run()