equal
deleted
inserted
replaced
28 from logging import getLogger |
28 from logging import getLogger |
29 from cubicweb import set_log_methods |
29 from cubicweb import set_log_methods |
30 from cubicweb.server.server import QuitEvent |
30 from cubicweb.server.server import QuitEvent |
31 |
31 |
32 ctx = zmq.Context() |
32 ctx = zmq.Context() |
|
33 |
|
34 def cwproto_to_zmqaddr(address): |
|
35 """ converts a cw-zmq address (like zmqpickle-tcp://<ip>:<port>) |
|
36 into a proper zmq address (tcp://<ip>:<port>) |
|
37 """ |
|
38 assert address.startswith('zmqpickle-'), 'bad protocol string %s' % address |
|
39 return address.split('-', 1)[1] # chop the `zmqpickle-` prefix |
33 |
40 |
34 class ZMQComm(object): |
41 class ZMQComm(object): |
35 """ |
42 """ |
36 A simple ZMQ-based notification bus. |
43 A simple ZMQ-based notification bus. |
37 |
44 |
138 |
145 |
139 # event queue |
146 # event queue |
140 self.events = [] |
147 self.events = [] |
141 |
148 |
142 def connect(self, address): |
149 def connect(self, address): |
143 self.address = address |
150 self.address = cwproto_to_zmqaddr(address) |
144 |
151 |
145 def run(self): |
152 def run(self): |
146 """enter the service loop""" |
153 """enter the service loop""" |
147 # start repository looping tasks |
154 # start repository looping tasks |
148 self.socket = ctx.socket(zmq.REP) |
155 self.socket = ctx.socket(zmq.REP) |