1 # copyright 2003-2016 LOGILAB S.A. (Paris, FRANCE), all rights reserved. |
|
2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr |
|
3 # |
|
4 # This file is part of CubicWeb. |
|
5 # |
|
6 # CubicWeb is free software: you can redistribute it and/or modify it under the |
|
7 # terms of the GNU Lesser General Public License as published by the Free |
|
8 # Software Foundation, either version 2.1 of the License, or (at your option) |
|
9 # any later version. |
|
10 # |
|
11 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT |
|
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
|
13 # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more |
|
14 # details. |
|
15 # |
|
16 # You should have received a copy of the GNU Lesser General Public License along |
|
17 # with CubicWeb. If not, see <http://www.gnu.org/licenses/>. |
|
18 """twisted server for CubicWeb web instances""" |
|
19 |
|
20 import sys |
|
21 import traceback |
|
22 import threading |
|
23 from cgi import FieldStorage, parse_header |
|
24 from functools import partial |
|
25 import warnings |
|
26 |
|
27 from cubicweb.statsd_logger import statsd_timeit |
|
28 |
|
29 from twisted.internet import reactor, task, threads |
|
30 from twisted.web import http, server |
|
31 from twisted.web import resource |
|
32 from twisted.web.server import NOT_DONE_YET |
|
33 |
|
34 |
|
35 from logilab.mtconverter import xml_escape |
|
36 from logilab.common.decorators import monkeypatch |
|
37 |
|
38 from cubicweb import ConfigurationError, CW_EVENT_MANAGER |
|
39 from cubicweb.utils import json_dumps |
|
40 from cubicweb.web import DirectResponse |
|
41 from cubicweb.web.application import CubicWebPublisher |
|
42 from cubicweb.etwist.request import CubicWebTwistedRequestAdapter |
|
43 from cubicweb.etwist.http import HTTPResponse |
|
44 |
|
45 |
|
def start_task(interval, func):
    """Schedule ``func`` to run every ``interval`` seconds via twisted.

    The first call is deliberately delayed until ``interval`` has elapsed
    (``now=False``): running every task immediately would make server
    startup wait for all of them to complete.
    """
    looping_call = task.LoopingCall(func)
    looping_call.start(interval, now=False)
|
51 |
|
52 |
|
class CubicWebRootResource(resource.Resource):
    """Root resource of the CubicWeb twisted site.

    Every URL path resolves back to this resource (see :meth:`getChild`),
    which hands requests over to the :class:`CubicWebPublisher` application
    object, normally from a worker thread.
    """

    def __init__(self, config, repo):
        resource.Resource.__init__(self)
        self.config = config
        # instantiate publisher here and not in init_publisher to get some
        # checks done before daemonization (eg versions consistency)
        self.appli = CubicWebPublisher(repo, config)
        self.base_url = config['base-url']
        # publish the configured POST size limit as a module-level global:
        # the http.Request.gotLength monkeypatch below reads it
        global MAX_POST_LENGTH
        MAX_POST_LENGTH = config['max-post-length']

    def init_publisher(self):
        """Register shutdown and registry-reload hooks.

        Called after daemonization (see :func:`run`), before the uid is
        changed.
        """
        config = self.config
        # when we have an in-memory repository, properly shutdown the
        # repository when the reactor stops
        if config['repository-uri'] == 'inmemory://':
            if config.mode != 'test':
                reactor.addSystemEventTrigger('before', 'shutdown',
                                              self.shutdown_event)
            warnings.warn(
                'twisted server does not start repository looping tasks anymore; '
                'use the standalone "scheduler" command if needed'
            )
        self.set_url_rewriter()
        CW_EVENT_MANAGER.bind('after-registry-reload', self.set_url_rewriter)

    def start_service(self):
        """Start the periodic session-cleaning task."""
        start_task(self.appli.session_handler.clean_sessions_interval,
                   self.appli.session_handler.clean_sessions)

    def set_url_rewriter(self):
        """(Re)select the 'urlrewriter' component, or None if unavailable.

        Re-bound on every registry reload so the rewriter stays up to date.
        """
        self.url_rewriter = self.appli.vreg['components'].select_or_none('urlrewriter')

    def shutdown_event(self):
        """callback fired when the server is shutting down to properly
        clean opened sessions
        """
        self.appli.repo.shutdown()

    def getChild(self, path, request):
        """Indicate which resource to use to process down the URL's path"""
        return self

    def on_request_finished_ko(self, request, reason):
        """Errback fired when a request terminates abnormally (e.g. the
        client disconnected before the response was written)."""
        # annotate the twisted request so that we're able later to check for
        # failure without having to dig into request's internal attributes such
        # as _disconnected
        request.cw_failed = True
        self.warning('request finished abnormally: %s', reason)

    def render(self, request):
        """Render a page from the root resource"""
        finish_deferred = request.notifyFinish()
        finish_deferred.addErrback(partial(self.on_request_finished_ko, request))
        # reload modified files in debug mode
        if self.config.debugmode:
            self.config.uiprops.reload_if_needed()
            self.appli.vreg.reload_if_needed()
        if self.config['profile']:  # default profiler don't trace threads
            return self.render_request(request)
        else:
            # the deferred result is intentionally discarded: render_request
            # writes the response itself through the twisted request object
            threads.deferToThread(self.render_request, request)
            return NOT_DONE_YET

    @statsd_timeit
    def render_request(self, request):
        """Process one request (in a worker thread unless profiling) and
        return an HTTPResponse; unexpected errors become a 500 traceback page.
        """
        try:
            # processing HUGE files (hundred of megabytes) in http.processReceived
            # blocks other HTTP requests processing
            # due to the clumsy & slow parsing algorithm of cgi.FieldStorage
            # so we deferred that part to the cubicweb thread
            request.process_multipart()
            return self._render_request(request)
        except Exception:
            trace = traceback.format_exc()
            return HTTPResponse(stream='<pre>%s</pre>' % xml_escape(trace),
                                code=500, twisted_request=request)

    def _render_request(self, request):
        """Run the URL rewriter, then let the publisher build the response."""
        origpath = request.path
        host = request.host
        if self.url_rewriter is not None:
            # XXX should occur before authentication?
            path = self.url_rewriter.rewrite(host, origpath, request)
            # str.replace returns a new string: the result must be assigned
            # back, else the rewritten path is silently discarded
            request.uri = request.uri.replace(origpath, path, 1)
        req = CubicWebTwistedRequestAdapter(request, self.appli.vreg)
        try:
            # Try to generate the actual request content
            content = self.appli.handle_request(req)
        except DirectResponse as ex:
            return ex.response
        # at last: create twisted object
        return HTTPResponse(code=req.status_out,
                            headers=req.headers_out,
                            stream=content,
                            twisted_request=req._twreq)

    # these are overridden by set_log_methods below
    # only defining here to prevent pylint from complaining
    @classmethod
    def debug(cls, msg, *a, **kw):
        pass
    info = warning = error = critical = exception = debug
|
156 |
|
157 |
|
# URL path suffixes whose oversized-POST error must be serialized as JSON
JSON_PATHS = {'json'}
# URL path suffixes posted from an iframe, expecting an HTML/JS answer
FRAME_POST_PATHS = {'validateform'}
|
160 |
|
# keep a reference to the stock implementation so the monkeypatched version
# below can delegate to it before enforcing the size limit
orig_gotLength = http.Request.gotLength
@monkeypatch(http.Request)
def gotLength(self, length):
    """Reject POST bodies larger than MAX_POST_LENGTH before reading them.

    MAX_POST_LENGTH is a module-level global set from the 'max-post-length'
    option in CubicWebRootResource.__init__. The 413 error body format
    depends on the last path segment: JSON for ajax paths, a script calling
    handleFormValidationResponse for iframe form posts, plain HTML otherwise.
    """
    orig_gotLength(self, length)
    if length > MAX_POST_LENGTH: # length is 0 on GET
        # last path segment, without query string nor trailing slash
        path = self.channel._path.split('?', 1)[0].rstrip('/').rsplit('/', 1)[-1]
        self.clientproto = 'HTTP/1.1' # not yet initialized
        self.channel.persistent = 0   # force connection close on cleanup
        self.setResponseCode(http.REQUEST_ENTITY_TOO_LARGE)
        if path in JSON_PATHS: # XXX better json path detection
            self.setHeader('content-type',"application/json")
            body = json_dumps({'reason': 'request max size exceeded'})
        elif path in FRAME_POST_PATHS: # XXX better frame post path detection
            self.setHeader('content-type',"text/html")
            body = ('<script type="text/javascript">'
                    'window.parent.handleFormValidationResponse(null, null, null, %s, null);'
                    '</script>' % json_dumps( (False, 'request max size exceeded', None) ))
        else:
            self.setHeader('content-type',"text/html")
            body = ("<html><head><title>Processing Failed</title></head><body>"
                    "<b>request max size exceeded</b></body></html>")
        self.setHeader('content-length', str(len(body)))
        self.write(body)
        # see request.finish(). Done by hand here since finish() errors out
        # on a request that is not fully initialized
        self.finished = 1
        if not self.queued:
            self._cleanup()
        for d in self.notifications:
            d.callback(None)
        self.notifications = []
|
192 |
|
@monkeypatch(http.Request)
def requestReceived(self, command, path, version):
    """Called by channel when all data has been received.

    This method is not intended for users.

    Monkeypatched version of twisted's http.Request.requestReceived: instead
    of parsing multipart/form-data bodies inline (which blocks the reactor on
    huge uploads), it only flags the request via _do_process_multipart so
    that process_multipart() can do the parsing later, in a worker thread.
    """
    self.content.seek(0, 0)
    self.args = {}
    self.files = {}
    self.stack = []
    self.method, self.uri = command, path
    self.clientproto = version
    # split the query string off the URI, if any
    x = self.uri.split('?', 1)
    if len(x) == 1:
        self.path = self.uri
    else:
        self.path, argstring = x
        self.args = http.parse_qs(argstring, 1)
    # cache the client and server information, we'll need this later to be
    # serialized and sent with the request so CGIs will work remotely
    self.client = self.channel.transport.getPeer()
    self.host = self.channel.transport.getHost()
    # Argument processing
    ctype = self.getHeader('content-type')
    self._do_process_multipart = False
    if self.method == "POST" and ctype:
        key, pdict = parse_header(ctype)
        if key == 'application/x-www-form-urlencoded':
            self.args.update(http.parse_qs(self.content.read(), 1))
            self.content.seek(0)
        elif key == 'multipart/form-data':
            # defer this as it can be extremely time consuming
            # with big files
            self._do_process_multipart = True
    self.process()
|
228 |
|
@monkeypatch(http.Request)
def process_multipart(self):
    """Parse a deferred multipart/form-data body into self.args / self.files.

    No-op unless requestReceived flagged the request (see
    _do_process_multipart). Uploaded files land in self.files as
    (filename, fileobj) pairs — or (None, None) for interrupted transfers —
    while plain fields are appended to self.args.
    """
    if not self._do_process_multipart:
        return
    form = FieldStorage(self.content, self.received_headers,
                        environ={'REQUEST_METHOD': 'POST'},
                        keep_blank_values=1,
                        strict_parsing=1)
    for key in form:
        fields = form[key]
        if not isinstance(fields, list):
            fields = [fields]
        for field in fields:
            if not field.filename:
                # regular form field
                self.args.setdefault(key, []).append(field.value)
            elif field.done == -1:
                # done == -1 means the transfer was interrupted
                self.files.setdefault(key, []).append((None, None))
            else:
                self.files.setdefault(key, []).append((field.filename, field.file))
|
249 |
|
# wire the module logger into the resource class: set_log_methods replaces
# the placeholder debug/info/warning/... stubs defined on
# CubicWebRootResource above with real logging methods bound to LOGGER
from logging import getLogger
from cubicweb import set_log_methods
LOGGER = getLogger('cubicweb.twisted')
set_log_methods(CubicWebRootResource, LOGGER)
|
254 |
|
def run(config, debug=None, repo=None):
    """Configure and start the twisted web server for a CubicWeb instance.

    :param config: the instance's web configuration object
    :param debug: when not None, overrides ``config.debugmode``
    :param repo: optional pre-created repository object
    :return: in daemon mode, the parent-process indicator returned by
             ``daemonize()``; otherwise this call blocks in the reactor loop.
    """
    # repo may be passed during test.
    #
    # Test has already created a repo object so we should not create a new one.
    # Explicitly passing the repo object avoids relying on the fragile
    # config.repository() cache. We could imagine making repo a mandatory
    # argument and receiving it from the starting command directly.
    if debug is not None:
        config.debugmode = debug
    config.check_writeable_uid_directory(config.appdatahome)
    # create the site
    if repo is None:
        repo = config.repository()
    root_resource = CubicWebRootResource(config, repo)
    website = server.Site(root_resource)
    # serve it via standard HTTP on port set in the configuration
    port = config['port'] or 8080
    interface = config['interface']
    reactor.suggestThreadPoolSize(config['webserver-threadpool-size'])
    reactor.listenTCP(port, website, interface=interface)
    if not config.debugmode:
        if sys.platform == 'win32':
            raise ConfigurationError("Under windows, you must use the service management "
                                     "commands (e.g : 'net start my_instance)'")
        from logilab.common.daemon import daemonize
        LOGGER.info('instance started in the background on %s', root_resource.base_url)
        whichproc = daemonize(config['pid-file'], umask=config['umask'])
        if whichproc: # 1 = orig process, 2 = first fork, None = second fork (eg daemon process)
            return whichproc # parent process
    # publisher hooks must be registered after daemonization but before
    # dropping privileges
    root_resource.init_publisher() # before changing uid
    if config['uid'] is not None:
        from logilab.common.daemon import setugid
        setugid(config['uid'])
    root_resource.start_service()
    LOGGER.info('instance started on %s', root_resource.base_url)
    # avoid annoying warning if not in Main Thread: only install reactor
    # signal handlers when actually running in the main thread
    signals = threading.currentThread().getName() == 'MainThread'
    if config['profile']:
        import cProfile
        cProfile.runctx('reactor.run(installSignalHandlers=%s)' % signals,
                        globals(), locals(), config['profile'])
    else:
        reactor.run(installSignalHandlers=signals)
|