server/cwzmq.py
changeset 8350 e1c05bf6fdeb
parent 8211 543e1579ba0d
child 8388 c6c624cea870
equal deleted inserted replaced
8349:fdb796435d7b 8350:e1c05bf6fdeb
    16 #
    16 #
    17 # You should have received a copy of the GNU Lesser General Public License along
    17 # You should have received a copy of the GNU Lesser General Public License along
    18 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
    18 # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
    19 
    19 
    20 from threading import Thread
    20 from threading import Thread
       
    21 import cPickle
       
    22 import traceback
       
    23 
    21 import zmq
    24 import zmq
    22 from zmq.eventloop import ioloop
    25 from zmq.eventloop import ioloop
    23 import zmq.eventloop.zmqstream
    26 import zmq.eventloop.zmqstream
    24 
    27 
    25 from logging import getLogger
    28 from logging import getLogger
    26 from cubicweb import set_log_methods
    29 from cubicweb import set_log_methods
       
    30 from cubicweb.server.server import QuitEvent
    27 
    31 
    28 ctx = zmq.Context()
    32 ctx = zmq.Context()
    29 
    33 
    30 class ZMQComm(object):
    34 class ZMQComm(object):
    31     def __init__(self):
    35     def __init__(self):
   103     def subscribe(self, topic, callback):
   107     def subscribe(self, topic, callback):
   104         self.dispatch_table[topic] = callback
   108         self.dispatch_table[topic] = callback
   105         self.ioloop.add_callback(lambda: self.stream.setsockopt(zmq.SUBSCRIBE, topic))
   109         self.ioloop.add_callback(lambda: self.stream.setsockopt(zmq.SUBSCRIBE, topic))
   106 
   110 
   107 
   111 
       
   112 class ZMQRepositoryServer(object):
       
   113 
       
   114     def __init__(self, repository):
       
   115         """make the repository available as a PyRO object"""
       
   116         self.address = None
       
   117         self.repo = repository
       
   118         self.socket = None
       
   119         self.stream = None
       
   120         self.loop = None
       
   121 
       
   122         # event queue
       
   123         self.events = []
       
   124 
       
   125     def connect(self, address):
       
   126         self.address = address
       
   127 
       
   128     def run(self):
       
   129         """enter the service loop"""
       
   130         # start repository looping tasks
       
   131         self.socket = ctx.socket(zmq.REP)
       
   132         self.loop = ioloop.IOLoop()
       
   133         self.stream = zmq.eventloop.zmqstream.ZMQStream(self.socket, io_loop=self.loop)
       
   134         self.stream.bind(self.address)
       
   135         self.info('ZMQ server bound on: %s', self.address)
       
   136 
       
   137         self.stream.on_recv(self.process_cmds)
       
   138 
       
   139         try:
       
   140             self.loop.start()
       
   141         except zmq.ZMQError:
       
   142             self.warning('ZMQ event loop killed')
       
   143         self.quit()
       
   144 
       
   145     def trigger_events(self):
       
   146         """trigger ready events"""
       
   147         for event in self.events[:]:
       
   148             if event.is_ready():
       
   149                 self.info('starting event %s', event)
       
   150                 event.fire(self)
       
   151                 try:
       
   152                     event.update()
       
   153                 except Finished:
       
   154                     self.events.remove(event)
       
   155 
       
   156     def process_cmd(self, cmd):
       
   157         """Delegate the given command to the repository.
       
   158 
       
   159         ``cmd`` is a list of (method_name, args, kwargs)
       
   160         where ``args`` is a list of positional arguments
       
   161         and ``kwargs`` is a dictionnary of named arguments.
       
   162 
       
   163         >>> rset = delegate_to_repo(["execute", [sessionid], {'rql': rql}])
       
   164 
       
   165         :note1: ``kwargs`` may be ommited
       
   166 
       
   167             >>> rset = delegate_to_repo(["execute", [sessionid, rql]])
       
   168 
       
   169         :note2: both ``args`` and ``kwargs`` may be omitted
       
   170 
       
   171             >>> schema = delegate_to_repo(["get_schema"])
       
   172             >>> schema = delegate_to_repo("get_schema") # also allowed
       
   173 
       
   174         """
       
   175         cmd = cPickle.loads(cmd)
       
   176         if not cmd:
       
   177             raise AttributeError('function name required')
       
   178         if isinstance(cmd, basestring):
       
   179             cmd = [cmd]
       
   180         if len(cmd) < 2:
       
   181             cmd.append(())
       
   182         if len(cmd) < 3:
       
   183             cmd.append({})
       
   184         cmd  = list(cmd) + [(), {}]
       
   185         funcname, args, kwargs = cmd[:3]
       
   186         result = getattr(self.repo, funcname)(*args, **kwargs)
       
   187         return result
       
   188 
       
   189     def process_cmds(self, cmds):
       
   190         """Callback intended to be used with ``on_recv``.
       
   191 
       
   192         Call ``delegate_to_repo`` on each command and send a pickled of
       
   193         each result recursively.
       
   194 
       
   195         Any exception are catched, pickled and sent.
       
   196         """
       
   197         try:
       
   198             for cmd in cmds:
       
   199                 result = self.process_cmd(cmd)
       
   200                 self.send_data(result)
       
   201         except Exception, exc:
       
   202             traceback.print_exc()
       
   203             self.send_data(exc)
       
   204 
       
   205     def send_data(self, data):
       
   206         self.socket.send_pyobj(data)
       
   207 
       
   208     def quit(self, shutdown_repo=False):
       
   209         """stop the server"""
       
   210         self.info('Quitting ZMQ server')
       
   211         try:
       
   212             self.loop.stop()
       
   213             self.stream.on_recv(None)
       
   214             self.stream.close()
       
   215         except Exception, e:
       
   216             print e
       
   217             pass
       
   218         if shutdown_repo and not self.repo.shutting_down:
       
   219             event = QuitEvent()
       
   220             event.fire(self)
       
   221 
       
   222     # server utilitities ######################################################
       
   223 
       
   224     def install_sig_handlers(self):
       
   225         """install signal handlers"""
       
   226         import signal
       
   227         self.info('installing signal handlers')
       
   228         signal.signal(signal.SIGINT, lambda x, y, s=self: s.quit(shutdown_repo=True))
       
   229         signal.signal(signal.SIGTERM, lambda x, y, s=self: s.quit(shutdown_repo=True))
       
   230 
       
   231 
       
   232     # these are overridden by set_log_methods below
       
   233     # only defining here to prevent pylint from complaining
       
   234     @classmethod
       
   235     def info(cls, msg, *a, **kw):
       
   236         pass
       
   237 
       
   238 
   108 set_log_methods(Publisher, getLogger('cubicweb.zmq.pub'))
   239 set_log_methods(Publisher, getLogger('cubicweb.zmq.pub'))
   109 set_log_methods(Subscriber, getLogger('cubicweb.zmq.sub'))
   240 set_log_methods(Subscriber, getLogger('cubicweb.zmq.sub'))
       
   241 set_log_methods(ZMQRepositoryServer, getLogger('cubicweb.zmq.repo'))