# HG changeset patch # User David Douard # Date 1333551069 -7200 # Node ID e1c05bf6fdeb6aa0f615fd5a719697eb4d0d2047 # Parent fdb796435d7b6afb4e5b68fdfb97468cdb1edb19 [zmq] Implement a ZMQ-based Repository (closes #2290125) This Repository server behave mainly like the Pyro-based repository. diff -r fdb796435d7b -r e1c05bf6fdeb hooks/zmq.py --- a/hooks/zmq.py Tue Apr 10 17:03:19 2012 +0200 +++ b/hooks/zmq.py Wed Apr 04 16:51:09 2012 +0200 @@ -46,3 +46,30 @@ self.repo.app_instances_bus.add_subscriber(address) self.repo.app_instances_bus.start() + +class ZMQRepositoryServerStopHook(hook.Hook): + __regid__ = 'zmqrepositoryserverstop' + events = ('server_shutdown',) + + def __call__(self): + server = getattr(self.repo, 'zmq_repo_server', None) + if server: + self.repo.zmq_repo_server.quit() + +class ZMQRepositoryServerStartHook(hook.Hook): + __regid__ = 'zmqrepositoryserverstart' + events = ('server_startup',) + + def __call__(self): + config = self.repo.config + if config.name == 'repository': + # start-repository command already starts a zmq repo + return + address = config.get('zmq-repository-address') + if not address: + return + from cubicweb.server import cwzmq + self.repo.zmq_repo_server = server = cwzmq.ZMQRepositoryServer(self.repo) + server.connect(address) + self.repo.threaded_task(server.run) + diff -r fdb796435d7b -r e1c05bf6fdeb server/cwzmq.py --- a/server/cwzmq.py Tue Apr 10 17:03:19 2012 +0200 +++ b/server/cwzmq.py Wed Apr 04 16:51:09 2012 +0200 @@ -18,12 +18,16 @@ # with CubicWeb. If not, see . from threading import Thread +import cPickle +import traceback + import zmq from zmq.eventloop import ioloop import zmq.eventloop.zmqstream from logging import getLogger from cubicweb import set_log_methods +from cubicweb.server.server import QuitEvent ctx = zmq.Context() @@ -105,5 +109,133 @@ self.ioloop.add_callback(lambda: self.stream.setsockopt(zmq.SUBSCRIBE, topic)) +class ZMQRepositoryServer(object): + + def __init__(self, repository): + """make the repository available as a PyRO object""" + self.address = None + self.repo = repository + self.socket = None + self.stream = None + self.loop = None + + # event queue + self.events = [] + + def connect(self, address): + self.address = address + + def run(self): + """enter the service loop""" + # start repository looping tasks + self.socket = ctx.socket(zmq.REP) + self.loop = ioloop.IOLoop() + self.stream = zmq.eventloop.zmqstream.ZMQStream(self.socket, io_loop=self.loop) + self.stream.bind(self.address) + self.info('ZMQ server bound on: %s', self.address) + + self.stream.on_recv(self.process_cmds) + + try: + self.loop.start() + except zmq.ZMQError: + self.warning('ZMQ event loop killed') + self.quit() + + def trigger_events(self): + """trigger ready events""" + for event in self.events[:]: + if event.is_ready(): + self.info('starting event %s', event) + event.fire(self) + try: + event.update() + except Finished: + self.events.remove(event) + + def process_cmd(self, cmd): + """Delegate the given command to the repository. + + ``cmd`` is a list of (method_name, args, kwargs) + where ``args`` is a list of positional arguments + and ``kwargs`` is a dictionnary of named arguments. + + >>> rset = delegate_to_repo(["execute", [sessionid], {'rql': rql}]) + + :note1: ``kwargs`` may be ommited + + >>> rset = delegate_to_repo(["execute", [sessionid, rql]]) + + :note2: both ``args`` and ``kwargs`` may be omitted + + >>> schema = delegate_to_repo(["get_schema"]) + >>> schema = delegate_to_repo("get_schema") # also allowed + + """ + cmd = cPickle.loads(cmd) + if not cmd: + raise AttributeError('function name required') + if isinstance(cmd, basestring): + cmd = [cmd] + if len(cmd) < 2: + cmd.append(()) + if len(cmd) < 3: + cmd.append({}) + cmd = list(cmd) + [(), {}] + funcname, args, kwargs = cmd[:3] + result = getattr(self.repo, funcname)(*args, **kwargs) + return result + + def process_cmds(self, cmds): + """Callback intended to be used with ``on_recv``. + + Call ``delegate_to_repo`` on each command and send a pickled of + each result recursively. + + Any exception are catched, pickled and sent. + """ + try: + for cmd in cmds: + result = self.process_cmd(cmd) + self.send_data(result) + except Exception, exc: + traceback.print_exc() + self.send_data(exc) + + def send_data(self, data): + self.socket.send_pyobj(data) + + def quit(self, shutdown_repo=False): + """stop the server""" + self.info('Quitting ZMQ server') + try: + self.loop.stop() + self.stream.on_recv(None) + self.stream.close() + except Exception, e: + print e + pass + if shutdown_repo and not self.repo.shutting_down: + event = QuitEvent() + event.fire(self) + + # server utilitities ###################################################### + + def install_sig_handlers(self): + """install signal handlers""" + import signal + self.info('installing signal handlers') + signal.signal(signal.SIGINT, lambda x, y, s=self: s.quit(shutdown_repo=True)) + signal.signal(signal.SIGTERM, lambda x, y, s=self: s.quit(shutdown_repo=True)) + + + # these are overridden by set_log_methods below + # only defining here to prevent pylint from complaining + @classmethod + def info(cls, msg, *a, **kw): + pass + + set_log_methods(Publisher, getLogger('cubicweb.zmq.pub')) set_log_methods(Subscriber, getLogger('cubicweb.zmq.sub')) +set_log_methods(ZMQRepositoryServer, getLogger('cubicweb.zmq.repo')) diff -r fdb796435d7b -r e1c05bf6fdeb server/serverconfig.py --- a/server/serverconfig.py Tue Apr 10 17:03:19 2012 +0200 +++ b/server/serverconfig.py Wed Apr 04 16:51:09 2012 +0200 @@ -204,7 +204,13 @@ and if not set, it will be choosen randomly', 'group': 'pyro', 'level': 3, }), - + # zmq services config + ('zmq-repository-address', + {'type' : 'string', + 'default': None, + 'help': 'ZMQ URI on which the repository will be bound to.', + 'group': 'zmq', 'level': 3, + }), ('zmq-address-sub', {'type' : 'csv', 'default' : None, diff -r fdb796435d7b -r e1c05bf6fdeb server/serverctl.py --- a/server/serverctl.py Tue Apr 10 17:03:19 2012 +0200 +++ b/server/serverctl.py Wed Apr 04 16:51:09 2012 +0200 @@ -35,6 +35,7 @@ from cubicweb.toolsutils import Command, CommandHandler, underline_title from cubicweb.cwctl import CWCTL, check_options_consistency from cubicweb.server import SOURCE_TYPES +from cubicweb.server.repository import Repository from cubicweb.server.serverconfig import ( USER_OPTIONS, ServerConfiguration, SourceConfiguration, ask_source_config, generate_source_config) @@ -633,7 +634,7 @@ class StartRepositoryCommand(Command): """Start a CubicWeb RQL server for a given instance. - The server will be accessible through pyro + The server will be remotely accessible through pyro or ZMQ the identifier of the instance to initialize. @@ -650,12 +651,30 @@ 'default': None, 'choices': ('debug', 'info', 'warning', 'error'), 'help': 'debug if -D is set, error otherwise', }), + ('address', + {'short': 'a', 'type': 'string', 'metavar': '://:', + 'default': '', + 'help': ('specify a ZMQ URI on which to bind, or use "pyro://"' + 'to create a pyro-based repository'), + }), ) + def create_repo(self, config): + address = self['address'] + if not address: + address = config.get('zmq-repository-address', 'pyro://') + if address.startswith('pyro://'): + from cubicweb.server.server import RepositoryServer + return RepositoryServer(config), config['host'] + else: + from cubicweb.server.utils import TasksManager + from cubicweb.server.cwzmq import ZMQRepositoryServer + repo = Repository(config, TasksManager()) + return ZMQRepositoryServer(repo), address + def run(self, args): from logilab.common.daemon import daemonize, setugid from cubicweb.cwctl import init_cmdline_log_threshold - from cubicweb.server.server import RepositoryServer appid = args[0] debug = self['debug'] if sys.platform == 'win32' and not debug: @@ -665,7 +684,7 @@ config = ServerConfiguration.config_for(appid, debugmode=debug) init_cmdline_log_threshold(config, self['loglevel']) # create the server - server = RepositoryServer(config) + server, address = self.create_repo(config) # ensure the directory where the pid-file should be set exists (for # instance /var/run/cubicweb may be deleted on computer restart) pidfile = config['pid-file'] @@ -679,7 +698,7 @@ if uid is not None: setugid(uid) server.install_sig_handlers() - server.connect(config['host'], 0) + server.connect(address) server.run()