[zmq] Implement a ZMQ-based Repository (closes #2290125)
This Repository server behave mainly like the Pyro-based repository.
--- a/hooks/zmq.py Tue Apr 10 17:03:19 2012 +0200
+++ b/hooks/zmq.py Wed Apr 04 16:51:09 2012 +0200
@@ -46,3 +46,30 @@
self.repo.app_instances_bus.add_subscriber(address)
self.repo.app_instances_bus.start()
+
+class ZMQRepositoryServerStopHook(hook.Hook):
+ __regid__ = 'zmqrepositoryserverstop'
+ events = ('server_shutdown',)
+
+ def __call__(self):
+ server = getattr(self.repo, 'zmq_repo_server', None)
+ if server:
+ self.repo.zmq_repo_server.quit()
+
+class ZMQRepositoryServerStartHook(hook.Hook):
+ __regid__ = 'zmqrepositoryserverstart'
+ events = ('server_startup',)
+
+ def __call__(self):
+ config = self.repo.config
+ if config.name == 'repository':
+ # start-repository command already starts a zmq repo
+ return
+ address = config.get('zmq-repository-address')
+ if not address:
+ return
+ from cubicweb.server import cwzmq
+ self.repo.zmq_repo_server = server = cwzmq.ZMQRepositoryServer(self.repo)
+ server.connect(address)
+ self.repo.threaded_task(server.run)
+
--- a/server/cwzmq.py Tue Apr 10 17:03:19 2012 +0200
+++ b/server/cwzmq.py Wed Apr 04 16:51:09 2012 +0200
@@ -18,12 +18,16 @@
# with CubicWeb. If not, see <http://www.gnu.org/licenses/>.
from threading import Thread
+import cPickle
+import traceback
+
import zmq
from zmq.eventloop import ioloop
import zmq.eventloop.zmqstream
from logging import getLogger
from cubicweb import set_log_methods
+from cubicweb.server.server import QuitEvent
ctx = zmq.Context()
@@ -105,5 +109,133 @@
self.ioloop.add_callback(lambda: self.stream.setsockopt(zmq.SUBSCRIBE, topic))
+class ZMQRepositoryServer(object):
+
+ def __init__(self, repository):
+ """make the repository available as a PyRO object"""
+ self.address = None
+ self.repo = repository
+ self.socket = None
+ self.stream = None
+ self.loop = None
+
+ # event queue
+ self.events = []
+
+ def connect(self, address):
+ self.address = address
+
+ def run(self):
+ """enter the service loop"""
+ # start repository looping tasks
+ self.socket = ctx.socket(zmq.REP)
+ self.loop = ioloop.IOLoop()
+ self.stream = zmq.eventloop.zmqstream.ZMQStream(self.socket, io_loop=self.loop)
+ self.stream.bind(self.address)
+ self.info('ZMQ server bound on: %s', self.address)
+
+ self.stream.on_recv(self.process_cmds)
+
+ try:
+ self.loop.start()
+ except zmq.ZMQError:
+ self.warning('ZMQ event loop killed')
+ self.quit()
+
+ def trigger_events(self):
+ """trigger ready events"""
+ for event in self.events[:]:
+ if event.is_ready():
+ self.info('starting event %s', event)
+ event.fire(self)
+ try:
+ event.update()
+ except Finished:
+ self.events.remove(event)
+
+ def process_cmd(self, cmd):
+ """Delegate the given command to the repository.
+
+ ``cmd`` is a list of (method_name, args, kwargs)
+ where ``args`` is a list of positional arguments
+ and ``kwargs`` is a dictionnary of named arguments.
+
+ >>> rset = delegate_to_repo(["execute", [sessionid], {'rql': rql}])
+
+ :note1: ``kwargs`` may be ommited
+
+ >>> rset = delegate_to_repo(["execute", [sessionid, rql]])
+
+ :note2: both ``args`` and ``kwargs`` may be omitted
+
+ >>> schema = delegate_to_repo(["get_schema"])
+ >>> schema = delegate_to_repo("get_schema") # also allowed
+
+ """
+ cmd = cPickle.loads(cmd)
+ if not cmd:
+ raise AttributeError('function name required')
+ if isinstance(cmd, basestring):
+ cmd = [cmd]
+ if len(cmd) < 2:
+ cmd.append(())
+ if len(cmd) < 3:
+ cmd.append({})
+ cmd = list(cmd) + [(), {}]
+ funcname, args, kwargs = cmd[:3]
+ result = getattr(self.repo, funcname)(*args, **kwargs)
+ return result
+
+ def process_cmds(self, cmds):
+ """Callback intended to be used with ``on_recv``.
+
+ Call ``delegate_to_repo`` on each command and send a pickled of
+ each result recursively.
+
+ Any exception are catched, pickled and sent.
+ """
+ try:
+ for cmd in cmds:
+ result = self.process_cmd(cmd)
+ self.send_data(result)
+ except Exception, exc:
+ traceback.print_exc()
+ self.send_data(exc)
+
+ def send_data(self, data):
+ self.socket.send_pyobj(data)
+
+ def quit(self, shutdown_repo=False):
+ """stop the server"""
+ self.info('Quitting ZMQ server')
+ try:
+ self.loop.stop()
+ self.stream.on_recv(None)
+ self.stream.close()
+ except Exception, e:
+ print e
+ pass
+ if shutdown_repo and not self.repo.shutting_down:
+ event = QuitEvent()
+ event.fire(self)
+
+ # server utilitities ######################################################
+
+ def install_sig_handlers(self):
+ """install signal handlers"""
+ import signal
+ self.info('installing signal handlers')
+ signal.signal(signal.SIGINT, lambda x, y, s=self: s.quit(shutdown_repo=True))
+ signal.signal(signal.SIGTERM, lambda x, y, s=self: s.quit(shutdown_repo=True))
+
+
+ # these are overridden by set_log_methods below
+ # only defining here to prevent pylint from complaining
+ @classmethod
+ def info(cls, msg, *a, **kw):
+ pass
+
+
set_log_methods(Publisher, getLogger('cubicweb.zmq.pub'))
set_log_methods(Subscriber, getLogger('cubicweb.zmq.sub'))
+set_log_methods(ZMQRepositoryServer, getLogger('cubicweb.zmq.repo'))
--- a/server/serverconfig.py Tue Apr 10 17:03:19 2012 +0200
+++ b/server/serverconfig.py Wed Apr 04 16:51:09 2012 +0200
@@ -204,7 +204,13 @@
and if not set, it will be choosen randomly',
'group': 'pyro', 'level': 3,
}),
-
+ # zmq services config
+ ('zmq-repository-address',
+ {'type' : 'string',
+ 'default': None,
+ 'help': 'ZMQ URI on which the repository will be bound to.',
+ 'group': 'zmq', 'level': 3,
+ }),
('zmq-address-sub',
{'type' : 'csv',
'default' : None,
--- a/server/serverctl.py Tue Apr 10 17:03:19 2012 +0200
+++ b/server/serverctl.py Wed Apr 04 16:51:09 2012 +0200
@@ -35,6 +35,7 @@
from cubicweb.toolsutils import Command, CommandHandler, underline_title
from cubicweb.cwctl import CWCTL, check_options_consistency
from cubicweb.server import SOURCE_TYPES
+from cubicweb.server.repository import Repository
from cubicweb.server.serverconfig import (
USER_OPTIONS, ServerConfiguration, SourceConfiguration,
ask_source_config, generate_source_config)
@@ -633,7 +634,7 @@
class StartRepositoryCommand(Command):
"""Start a CubicWeb RQL server for a given instance.
- The server will be accessible through pyro
+ The server will be remotely accessible through pyro or ZMQ
<instance>
the identifier of the instance to initialize.
@@ -650,12 +651,30 @@
'default': None, 'choices': ('debug', 'info', 'warning', 'error'),
'help': 'debug if -D is set, error otherwise',
}),
+ ('address',
+ {'short': 'a', 'type': 'string', 'metavar': '<protocol>://<host>:<port>',
+ 'default': '',
+ 'help': ('specify a ZMQ URI on which to bind, or use "pyro://"'
+ 'to create a pyro-based repository'),
+ }),
)
+ def create_repo(self, config):
+ address = self['address']
+ if not address:
+ address = config.get('zmq-repository-address', 'pyro://')
+ if address.startswith('pyro://'):
+ from cubicweb.server.server import RepositoryServer
+ return RepositoryServer(config), config['host']
+ else:
+ from cubicweb.server.utils import TasksManager
+ from cubicweb.server.cwzmq import ZMQRepositoryServer
+ repo = Repository(config, TasksManager())
+ return ZMQRepositoryServer(repo), address
+
def run(self, args):
from logilab.common.daemon import daemonize, setugid
from cubicweb.cwctl import init_cmdline_log_threshold
- from cubicweb.server.server import RepositoryServer
appid = args[0]
debug = self['debug']
if sys.platform == 'win32' and not debug:
@@ -665,7 +684,7 @@
config = ServerConfiguration.config_for(appid, debugmode=debug)
init_cmdline_log_threshold(config, self['loglevel'])
# create the server
- server = RepositoryServer(config)
+ server, address = self.create_repo(config)
# ensure the directory where the pid-file should be set exists (for
# instance /var/run/cubicweb may be deleted on computer restart)
pidfile = config['pid-file']
@@ -679,7 +698,7 @@
if uid is not None:
setugid(uid)
server.install_sig_handlers()
- server.connect(config['host'], 0)
+ server.connect(address)
server.run()