1 # copyright 2003-2012 LOGILAB S.A. (Paris, FRANCE), all rights reserved. |
|
2 # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr |
|
3 # |
|
4 # This file is part of CubicWeb. |
|
5 # |
|
6 # CubicWeb is free software: you can redistribute it and/or modify it under the |
|
7 # terms of the GNU Lesser General Public License as published by the Free |
|
8 # Software Foundation, either version 2.1 of the License, or (at your option) |
|
9 # any later version. |
|
10 # |
|
11 # CubicWeb is distributed in the hope that it will be useful, but WITHOUT |
|
12 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
|
13 # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more |
|
14 # details. |
|
15 # |
|
16 # You should have received a copy of the GNU Lesser General Public License along |
|
17 # with CubicWeb. If not, see <http://www.gnu.org/licenses/>. |
|
18 """twisted server for CubicWeb web instances""" |
|
19 __docformat__ = "restructuredtext en" |
|
20 |
|
21 import sys |
|
22 import select |
|
23 import traceback |
|
24 import threading |
|
25 from cgi import FieldStorage, parse_header |
|
26 |
|
27 from six.moves.urllib.parse import urlsplit, urlunsplit |
|
28 |
|
29 from cubicweb.statsd_logger import statsd_timeit |
|
30 |
|
31 from twisted.internet import reactor, task, threads |
|
32 from twisted.web import http, server |
|
33 from twisted.web import resource |
|
34 from twisted.web.server import NOT_DONE_YET |
|
35 |
|
36 |
|
37 from logilab.mtconverter import xml_escape |
|
38 from logilab.common.decorators import monkeypatch |
|
39 |
|
40 from cubicweb import ConfigurationError, CW_EVENT_MANAGER |
|
41 from cubicweb.utils import json_dumps |
|
42 from cubicweb.web import DirectResponse |
|
43 from cubicweb.web.application import CubicWebPublisher |
|
44 from cubicweb.etwist.request import CubicWebTwistedRequestAdapter |
|
45 from cubicweb.etwist.http import HTTPResponse |
|
46 |
|
def start_task(interval, func):
    """Run *func* periodically, every *interval* seconds, via a LoopingCall.

    The first call only happens once *interval* has elapsed: starting right
    away would force the server to wait for every task to complete before
    being actually started.
    """
    looping_call = task.LoopingCall(func)
    looping_call.start(interval, now=False)
|
52 |
|
53 |
|
class CubicWebRootResource(resource.Resource):
    """Twisted root resource serving a CubicWeb web instance.

    Every URL path is handled by this single resource (see ``getChild``).
    Incoming twisted requests are adapted to CubicWeb requests and handed to
    the ``CubicWebPublisher`` created at instantiation time.
    """

    def __init__(self, config, repo):
        resource.Resource.__init__(self)
        self.config = config
        # instantiate publisher here and not in init_publisher to get some
        # checks done before daemonization (eg versions consistency)
        self.appli = CubicWebPublisher(repo, config)
        self.base_url = config['base-url']
        self.https_url = config['https-url']
        # module-level limit read by the monkeypatched http.Request.gotLength
        global MAX_POST_LENGTH
        MAX_POST_LENGTH = config['max-post-length']

    def init_publisher(self):
        """Finish setting up the publisher.

        For an in-memory repository outside test mode, this registers a
        repository shutdown hook on the reactor and starts the repository
        looping tasks. It then selects the URL rewriter and re-selects it
        after every registry reload.
        """
        config = self.config
        # when we have an in-memory repository, clean unused sessions every XX
        # seconds and properly shutdown the server
        if config['repository-uri'] == 'inmemory://':
            if config.mode != 'test':
                reactor.addSystemEventTrigger('before', 'shutdown',
                                              self.shutdown_event)
                self.appli.repo.start_looping_tasks()
        self.set_url_rewriter()
        CW_EVENT_MANAGER.bind('after-registry-reload', self.set_url_rewriter)

    def start_service(self):
        """Schedule the periodic cleaning of sessions."""
        start_task(self.appli.session_handler.clean_sessions_interval,
                   self.appli.session_handler.clean_sessions)

    def set_url_rewriter(self):
        """(Re)select the 'urlrewriter' component, or None if none applies."""
        self.url_rewriter = self.appli.vreg['components'].select_or_none('urlrewriter')

    def shutdown_event(self):
        """Callback fired when the server is shutting down, to properly
        clean opened sessions.
        """
        self.appli.repo.shutdown()

    def getChild(self, path, request):
        """Return this very resource for any path segment, so that it
        handles every URL.
        """
        return self

    def render(self, request):
        """Render a page from the root resource.

        The response is produced by ``render_request``, synchronously when
        profiling (the default profiler does not trace threads) and in a
        worker thread otherwise. In both cases the body is not returned to
        twisted, so ``NOT_DONE_YET`` is returned.
        """
        # reload modified files in debug mode
        if self.config.debugmode:
            self.config.uiprops.reload_if_needed()
            if self.https_url:
                self.config.https_uiprops.reload_if_needed()
            self.appli.vreg.reload_if_needed()
        if self.config['profile']:
            # render_request returns an HTTPResponse object, not a body
            # string twisted could write: never hand it back to twisted
            self.render_request(request)
        else:
            threads.deferToThread(self.render_request, request)
        return NOT_DONE_YET

    @statsd_timeit
    def render_request(self, request):
        """Process *request* and return the resulting ``HTTPResponse``.

        The threaded branch of ``render`` ignores this return value, so the
        ``HTTPResponse`` built here is expected to write to the twisted
        request itself. Any exception is logged and answered with a 500
        response showing the traceback.
        """
        try:
            # processing HUGE files (hundred of megabytes) in http.processReceived
            # blocks other HTTP requests processing
            # due to the clumsy & slow parsing algorithm of cgi.FieldStorage
            # so we deferred that part to the cubicweb thread
            request.process_multipart()
            return self._render_request(request)
        except Exception:
            trace = traceback.format_exc()
            # keep a server-side trace, not only the one sent to the client
            self.exception('error while rendering request')
            return HTTPResponse(stream='<pre>%s</pre>' % xml_escape(trace),
                                code=500, twisted_request=request)

    def _render_request(self, request):
        """Adapt the twisted *request* to CubicWeb and let the publisher
        handle it.

        A leading ``/https`` path segment is stripped from both the path and
        the URI, and marks the request as an https one. The resulting path is
        then passed through the URL rewriter, if any.
        """
        origpath = request.path
        host = request.host
        # dual http/https access handling: expect a rewrite rule to prepend
        # 'https' to the path to detect https access
        https = False
        segments = origpath.split('/', 2)
        # a path without any '/' (e.g. '*') has no second segment
        if len(segments) > 1 and segments[1] == 'https':
            origpath = origpath[6:]
            request.uri = request.uri[6:]
            https = True
        if self.url_rewriter is not None:
            # XXX should occur before authentication?
            path = self.url_rewriter.rewrite(host, origpath, request)
            # str.replace returns a new string: assign it back, otherwise the
            # request URI never reflects the rewritten path
            request.uri = request.uri.replace(origpath, path, 1)
        else:
            path = origpath
        req = CubicWebTwistedRequestAdapter(request, self.appli.vreg, https)
        try:
            # try to generate the actual request content
            content = self.appli.handle_request(req, path)
        except DirectResponse as ex:
            return ex.response
        # at last: create twisted object
        return HTTPResponse(code=req.status_out,
                            headers=req.headers_out,
                            stream=content,
                            twisted_request=req._twreq)

    # these are overridden by set_log_methods at module level
    # only defining here to prevent pylint from complaining
    @classmethod
    def debug(cls, msg, *a, **kw):
        pass
    info = warning = error = critical = exception = debug
|
157 |
|
158 |
|
# Last URL path segments whose "request max size exceeded" answer (written by
# the monkeypatched http.Request.gotLength) gets a dedicated format: a JSON
# object for JSON_PATHS, a form-validation callback script for
# FRAME_POST_PATHS.
JSON_PATHS = {'json'}
FRAME_POST_PATHS = {'validateform'}
|
161 |
|
orig_gotLength = http.Request.gotLength


@monkeypatch(http.Request)
def gotLength(self, length):
    """Answer 413 right away when the announced body length is too large.

    Twisted's original ``gotLength`` runs first. Then, if *length* exceeds
    ``MAX_POST_LENGTH``, a "request max size exceeded" response is written.
    Its format depends on the last URL path segment:

    - JSON for ``JSON_PATHS``;
    - a form-validation callback script for ``FRAME_POST_PATHS``;
    - HTML otherwise.

    The request is then finished by hand.
    """
    orig_gotLength(self, length)
    # MAX_POST_LENGTH is only defined once a CubicWebRootResource has been
    # instantiated; without it there is no limit to enforce
    max_length = globals().get('MAX_POST_LENGTH')
    # length is 0 on GET; twisted may also pass None when no body length is
    # announced (presumably chunked bodies -- confirm for the twisted version
    # in use), and None cannot be compared to an int on Python 3
    if length is None or max_length is None or length <= max_length:
        return
    path = self.channel._path.split('?', 1)[0].rstrip('/').rsplit('/', 1)[-1]
    self.clientproto = 'HTTP/1.1'  # not yet initialized
    self.channel.persistent = 0  # force connection close on cleanup
    self.setResponseCode(http.REQUEST_ENTITY_TOO_LARGE)
    if path in JSON_PATHS:  # XXX better json path detection
        self.setHeader('content-type', "application/json")
        body = json_dumps({'reason': 'request max size exceeded'})
    elif path in FRAME_POST_PATHS:  # XXX better frame post path detection
        self.setHeader('content-type', "text/html")
        body = ('<script type="text/javascript">'
                'window.parent.handleFormValidationResponse(null, null, null, %s, null);'
                '</script>' % json_dumps((False, 'request max size exceeded', None)))
    else:
        self.setHeader('content-type', "text/html")
        body = ("<html><head><title>Processing Failed</title></head><body>"
                "<b>request max size exceeded</b></body></html>")
    self.setHeader('content-length', str(len(body)))
    self.write(body)
    # see request.finish(). Done here since we get error due to not full
    # initialized request
    self.finished = 1
    if not self.queued:
        self._cleanup()
    for d in self.notifications:
        d.callback(None)
    self.notifications = []
|
193 |
|
@monkeypatch(http.Request)
def requestReceived(self, command, path, version):
    """Called by the channel once all request data has been received.

    This method is not intended for users. Compared to twisted's own version,
    it does not parse ``multipart/form-data`` bodies. It only sets
    ``self._do_process_multipart`` so that ``process_multipart`` can parse
    them later. URL-encoded POST bodies are still merged into ``self.args``
    here.
    """
    self.content.seek(0, 0)
    self.args = {}
    self.files = {}
    self.stack = []
    self.method, self.uri = command, path
    self.clientproto = version
    path_part, separator, query = self.uri.partition('?')
    self.path = path_part
    if separator:
        self.args = http.parse_qs(query, 1)
    # cache the client and server information, we'll need this later to be
    # serialized and sent with the request so CGIs will work remotely
    transport = self.channel.transport
    self.client = transport.getPeer()
    self.host = transport.getHost()
    # argument processing
    content_type = self.getHeader('content-type')
    self._do_process_multipart = False
    if self.method == "POST" and content_type:
        mimetype, _params = parse_header(content_type)
        if mimetype == 'application/x-www-form-urlencoded':
            self.args.update(http.parse_qs(self.content.read(), 1))
            self.content.seek(0)
        elif mimetype == 'multipart/form-data':
            # parsing may be extremely time consuming with big files: only
            # flag it here, the actual work is done by process_multipart
            self._do_process_multipart = True
    self.process()
|
229 |
|
@monkeypatch(http.Request)
def process_multipart(self):
    """Parse the multipart body flagged by ``requestReceived``, if any.

    Nothing happens unless ``self._do_process_multipart`` is set. Each
    uploaded file is appended to ``self.files[name]`` as a
    ``(filename, file)`` pair, or as ``(None, None)`` when its transfer was
    interrupted. Other fields' values are appended to ``self.args[name]``.
    """
    if not self._do_process_multipart:
        return
    form = FieldStorage(self.content, self.received_headers,
                        environ={'REQUEST_METHOD': 'POST'},
                        keep_blank_values=1,
                        strict_parsing=1)
    for name in form:
        fields = form[name]
        if not isinstance(fields, list):
            fields = [fields]
        for field in fields:
            if not field.filename:
                self.args.setdefault(name, []).append(field.value)
                continue
            # done == -1 means the transfer has been interrupted
            if field.done == -1:
                entry = (None, None)
            else:
                entry = (field.filename, field.file)
            self.files.setdefault(name, []).append(entry)
|
250 |
|
from cubicweb import set_log_methods
from logging import getLogger

# Logger of this module. set_log_methods overrides the placeholder
# debug/info/warning/error/critical/exception stubs defined on
# CubicWebRootResource so that they log through it.
LOGGER = getLogger('cubicweb.twisted')
set_log_methods(CubicWebRootResource, LOGGER)
|
255 |
|
def run(config, debug=None, repo=None):
    """Start the twisted web server of a CubicWeb instance.

    :param config: the instance configuration.
    :param debug: when not None, overrides ``config.debugmode``. Outside of
        debug mode the process daemonizes, which is refused on win32 with a
        ``ConfigurationError``.
    :param repo: an already created repository. When None,
        ``config.repository()`` is used.
    :return: the value returned by ``daemonize`` in the processes that
        should exit after daemonizing; otherwise None once the reactor stops.
    """
    # repo may be passed during tests: they have already created a repo
    # object, so we should not create a new one. Explicitly passing the repo
    # object avoids relying on the fragile config.repository() cache. We could
    # imagine making repo a mandatory argument received from the starting
    # command directly.
    if debug is not None:
        config.debugmode = debug
    config.check_writeable_uid_directory(config.appdatahome)
    # create the site
    if repo is None:
        repo = config.repository()
    root_resource = CubicWebRootResource(config, repo)
    website = server.Site(root_resource)
    # serve it via standard HTTP on port set in the configuration
    port = config['port'] or 8080
    interface = config['interface']
    reactor.suggestThreadPoolSize(config['webserver-threadpool-size'])
    reactor.listenTCP(port, website, interface=interface)
    if not config.debugmode:
        if sys.platform == 'win32':
            raise ConfigurationError("Under windows, you must use the service management "
                                     "commands (e.g : 'net start my_instance)'")
        from logilab.common.daemon import daemonize
        LOGGER.info('instance started in the background on %s', root_resource.base_url)
        whichproc = daemonize(config['pid-file'], umask=config['umask'])
        if whichproc:  # 1 = orig process, 2 = first fork, None = second fork (eg daemon process)
            return whichproc  # parent process
    root_resource.init_publisher()  # before changing uid
    if config['uid'] is not None:
        from logilab.common.daemon import setugid
        setugid(config['uid'])
    root_resource.start_service()
    LOGGER.info('instance started on %s', root_resource.base_url)
    # only install signal handlers from the main thread, to avoid an annoying
    # warning otherwise
    signals = threading.current_thread().name == 'MainThread'
    if config['profile']:
        import cProfile
        cProfile.runctx('reactor.run(installSignalHandlers=%s)' % signals,
                        globals(), locals(), config['profile'])
    else:
        reactor.run(installSignalHandlers=signals)
|