16 # |
16 # |
17 # You should have received a copy of the GNU Lesser General Public License along |
17 # You should have received a copy of the GNU Lesser General Public License along |
18 # with CubicWeb. If not, see <http://www.gnu.org/licenses/>. |
18 # with CubicWeb. If not, see <http://www.gnu.org/licenses/>. |
19 |
19 |
20 from threading import Thread |
20 from threading import Thread |
|
21 import cPickle |
|
22 import traceback |
|
23 |
21 import zmq |
24 import zmq |
22 from zmq.eventloop import ioloop |
25 from zmq.eventloop import ioloop |
23 import zmq.eventloop.zmqstream |
26 import zmq.eventloop.zmqstream |
24 |
27 |
25 from logging import getLogger |
28 from logging import getLogger |
26 from cubicweb import set_log_methods |
29 from cubicweb import set_log_methods |
|
30 from cubicweb.server.server import QuitEvent |
27 |
31 |
28 ctx = zmq.Context() |
32 ctx = zmq.Context() |
29 |
33 |
30 class ZMQComm(object): |
34 class ZMQComm(object): |
31 def __init__(self): |
35 def __init__(self): |
103 def subscribe(self, topic, callback): |
107 def subscribe(self, topic, callback): |
104 self.dispatch_table[topic] = callback |
108 self.dispatch_table[topic] = callback |
105 self.ioloop.add_callback(lambda: self.stream.setsockopt(zmq.SUBSCRIBE, topic)) |
109 self.ioloop.add_callback(lambda: self.stream.setsockopt(zmq.SUBSCRIBE, topic)) |
106 |
110 |
107 |
111 |
|
112 class ZMQRepositoryServer(object): |
|
113 |
|
114 def __init__(self, repository): |
|
115 """make the repository available as a PyRO object""" |
|
116 self.address = None |
|
117 self.repo = repository |
|
118 self.socket = None |
|
119 self.stream = None |
|
120 self.loop = None |
|
121 |
|
122 # event queue |
|
123 self.events = [] |
|
124 |
|
125 def connect(self, address): |
|
126 self.address = address |
|
127 |
|
128 def run(self): |
|
129 """enter the service loop""" |
|
130 # start repository looping tasks |
|
131 self.socket = ctx.socket(zmq.REP) |
|
132 self.loop = ioloop.IOLoop() |
|
133 self.stream = zmq.eventloop.zmqstream.ZMQStream(self.socket, io_loop=self.loop) |
|
134 self.stream.bind(self.address) |
|
135 self.info('ZMQ server bound on: %s', self.address) |
|
136 |
|
137 self.stream.on_recv(self.process_cmds) |
|
138 |
|
139 try: |
|
140 self.loop.start() |
|
141 except zmq.ZMQError: |
|
142 self.warning('ZMQ event loop killed') |
|
143 self.quit() |
|
144 |
|
145 def trigger_events(self): |
|
146 """trigger ready events""" |
|
147 for event in self.events[:]: |
|
148 if event.is_ready(): |
|
149 self.info('starting event %s', event) |
|
150 event.fire(self) |
|
151 try: |
|
152 event.update() |
|
153 except Finished: |
|
154 self.events.remove(event) |
|
155 |
|
156 def process_cmd(self, cmd): |
|
157 """Delegate the given command to the repository. |
|
158 |
|
159 ``cmd`` is a list of (method_name, args, kwargs) |
|
160 where ``args`` is a list of positional arguments |
|
161 and ``kwargs`` is a dictionnary of named arguments. |
|
162 |
|
163 >>> rset = delegate_to_repo(["execute", [sessionid], {'rql': rql}]) |
|
164 |
|
165 :note1: ``kwargs`` may be ommited |
|
166 |
|
167 >>> rset = delegate_to_repo(["execute", [sessionid, rql]]) |
|
168 |
|
169 :note2: both ``args`` and ``kwargs`` may be omitted |
|
170 |
|
171 >>> schema = delegate_to_repo(["get_schema"]) |
|
172 >>> schema = delegate_to_repo("get_schema") # also allowed |
|
173 |
|
174 """ |
|
175 cmd = cPickle.loads(cmd) |
|
176 if not cmd: |
|
177 raise AttributeError('function name required') |
|
178 if isinstance(cmd, basestring): |
|
179 cmd = [cmd] |
|
180 if len(cmd) < 2: |
|
181 cmd.append(()) |
|
182 if len(cmd) < 3: |
|
183 cmd.append({}) |
|
184 cmd = list(cmd) + [(), {}] |
|
185 funcname, args, kwargs = cmd[:3] |
|
186 result = getattr(self.repo, funcname)(*args, **kwargs) |
|
187 return result |
|
188 |
|
189 def process_cmds(self, cmds): |
|
190 """Callback intended to be used with ``on_recv``. |
|
191 |
|
192 Call ``delegate_to_repo`` on each command and send a pickled of |
|
193 each result recursively. |
|
194 |
|
195 Any exception are catched, pickled and sent. |
|
196 """ |
|
197 try: |
|
198 for cmd in cmds: |
|
199 result = self.process_cmd(cmd) |
|
200 self.send_data(result) |
|
201 except Exception, exc: |
|
202 traceback.print_exc() |
|
203 self.send_data(exc) |
|
204 |
|
205 def send_data(self, data): |
|
206 self.socket.send_pyobj(data) |
|
207 |
|
208 def quit(self, shutdown_repo=False): |
|
209 """stop the server""" |
|
210 self.info('Quitting ZMQ server') |
|
211 try: |
|
212 self.loop.stop() |
|
213 self.stream.on_recv(None) |
|
214 self.stream.close() |
|
215 except Exception, e: |
|
216 print e |
|
217 pass |
|
218 if shutdown_repo and not self.repo.shutting_down: |
|
219 event = QuitEvent() |
|
220 event.fire(self) |
|
221 |
|
222 # server utilitities ###################################################### |
|
223 |
|
224 def install_sig_handlers(self): |
|
225 """install signal handlers""" |
|
226 import signal |
|
227 self.info('installing signal handlers') |
|
228 signal.signal(signal.SIGINT, lambda x, y, s=self: s.quit(shutdown_repo=True)) |
|
229 signal.signal(signal.SIGTERM, lambda x, y, s=self: s.quit(shutdown_repo=True)) |
|
230 |
|
231 |
|
232 # these are overridden by set_log_methods below |
|
233 # only defining here to prevent pylint from complaining |
|
234 @classmethod |
|
235 def info(cls, msg, *a, **kw): |
|
236 pass |
|
237 |
|
238 |
108 set_log_methods(Publisher, getLogger('cubicweb.zmq.pub')) |
239 set_log_methods(Publisher, getLogger('cubicweb.zmq.pub')) |
109 set_log_methods(Subscriber, getLogger('cubicweb.zmq.sub')) |
240 set_log_methods(Subscriber, getLogger('cubicweb.zmq.sub')) |
|
241 set_log_methods(ZMQRepositoryServer, getLogger('cubicweb.zmq.repo')) |