|
1 """Unittests for the various HTTPServer modules. |
|
2 |
|
3 Written by Cody A.W. Somerville <cody-somerville@ubuntu.com>, |
|
4 Josip Dzolonga, and Michael Otteneder for the 2007/08 GHOP contest. |
|
5 """ |
|
6 |
|
7 from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer |
|
8 from SimpleHTTPServer import SimpleHTTPRequestHandler |
|
9 from CGIHTTPServer import CGIHTTPRequestHandler |
|
10 |
|
11 import os |
|
12 import sys |
|
13 import base64 |
|
14 import shutil |
|
15 import urllib |
|
16 import httplib |
|
17 import tempfile |
|
18 import threading |
|
19 |
|
20 import unittest |
|
21 from test import test_support |
|
22 |
|
23 |
|
class NoLogRequestHandler:
    """Mixin for the test request handlers that silences request logging."""

    def log_message(self, *ignored):
        """Accept any arguments and drop them; nothing goes to stderr."""
        return None
|
28 |
|
29 |
|
class TestServerThread(threading.Thread):
    """Background thread that runs an HTTPServer for a single test.

    ``test_object`` must expose a ``lock`` attribute.  The lock is used as
    a "server ready" signal: it is acquired here in the constructor and
    released from ``run()`` once ``test_object.PORT`` has been set, so the
    test can block on a second ``acquire()`` until the port is known.
    """

    def __init__(self, test_object, request_handler):
        threading.Thread.__init__(self)
        self.request_handler = request_handler
        self.test_object = test_object
        self.test_object.lock.acquire()

    def run(self):
        """Bind to an ephemeral port, publish it, then serve until stopped."""
        try:
            self.server = HTTPServer(('', 0), self.request_handler)
            self.test_object.PORT = self.server.socket.getsockname()[1]
        finally:
            # Always wake the waiting test, even when the server could not
            # be created; otherwise setUp() would block forever on the lock.
            self.test_object.lock.release()
        try:
            self.server.serve_forever()
        finally:
            self.server.server_close()

    def stop(self):
        """Shut the server down and wait for the thread to finish.

        Must be called from a thread other than this one.  Joining makes
        sure the listening socket is closed before the caller continues.
        """
        self.server.shutdown()
        self.join()
|
48 |
|
49 |
|
class BaseTestCase(unittest.TestCase):
    """Shared fixture that serves ``self.request_handler`` from a thread.

    Subclasses provide a ``request_handler`` class attribute.  The server
    thread stores the port it bound to in ``self.PORT``.
    """

    def setUp(self):
        """Start the server thread and wait until PORT has been published."""
        # The lock acts as a readiness signal: TestServerThread acquires it
        # in its constructor and releases it from run(), so the acquire()
        # below blocks until the server thread has set self.PORT.
        self.lock = threading.Lock()
        self.thread = TestServerThread(self, self.request_handler)
        self.thread.start()
        self.lock.acquire()

    def tearDown(self):
        """Release the readiness lock and stop the server thread."""
        self.lock.release()
        self.thread.stop()

    def request(self, uri, method='GET', body=None, headers=None):
        """Send one request to the test server and return its response.

        A new connection to ``localhost:self.PORT`` is opened for every
        call and kept in ``self.connection``.  ``headers`` is an optional
        mapping of extra request headers; ``None`` means no extra headers.
        """
        if headers is None:
            # Avoid a shared mutable default argument.
            headers = {}
        self.connection = httplib.HTTPConnection('localhost', self.PORT)
        self.connection.request(method, uri, body, headers)
        return self.connection.getresponse()
|
65 |
|
66 |
|
class BaseHTTPServerTestCase(BaseTestCase):
    """Check request-line parsing and method dispatch of the base handler."""

    class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler):
        """Handler that only implements a few made-up HTTP methods.

        No do_GET is defined; the tests below expect status 501 for
        well-formed GET requests.
        """

        protocol_version = 'HTTP/1.1'
        default_request_version = 'HTTP/1.1'

        def do_TEST(self):
            self.send_response(204)
            self.send_header('Content-Type', 'text/html')
            self.send_header('Connection', 'close')
            self.end_headers()

        def do_KEEP(self):
            self.send_response(204)
            self.send_header('Content-Type', 'text/html')
            self.send_header('Connection', 'keep-alive')
            self.end_headers()

        def do_KEYERROR(self):
            self.send_error(999)

        def do_CUSTOM(self):
            self.send_response(999)
            self.send_header('Content-Type', 'text/html')
            self.send_header('Connection', 'close')
            self.end_headers()

    def setUp(self):
        """Start the server and open one client connection to it."""
        BaseTestCase.setUp(self)
        self.con = httplib.HTTPConnection('localhost', self.PORT)
        self.con.connect()

    def tearDown(self):
        """Close the client connection, then stop the server."""
        try:
            # Close the client side first so no socket is left open (for
            # example the unread response in test_return_header_keep_alive)
            # when the server is shut down.
            self.con.close()
        finally:
            BaseTestCase.tearDown(self)

    def _raw_status(self, method, uri, version=None, headers=()):
        """Send a hand-built request and return the response status.

        ``version`` overrides the HTTP version string written on the
        request line (``None`` keeps the connection's default);
        ``headers`` is a sequence of ``(name, value)`` pairs.
        """
        if version is not None:
            self.con._http_vsn_str = version
        self.con.putrequest(method, uri)
        for name, value in headers:
            self.con.putheader(name, value)
        self.con.endheaders()
        return self.con.getresponse().status

    def test_command(self):
        self.con.request('GET', '/')
        res = self.con.getresponse()
        self.assertEqual(res.status, 501)

    def test_request_line_trimming(self):
        self.assertEqual(self._raw_status('GET', '/', 'HTTP/1.1\n'), 501)

    def test_version_bogus(self):
        self.assertEqual(self._raw_status('GET', '/', 'FUBAR'), 400)

    def test_version_digits(self):
        self.assertEqual(self._raw_status('GET', '/', 'HTTP/9.9.9'), 400)

    def test_version_none_get(self):
        self.assertEqual(self._raw_status('GET', '/', ''), 501)

    def test_version_none(self):
        self.assertEqual(self._raw_status('PUT', '/', ''), 400)

    def test_version_invalid(self):
        self.con._http_vsn = 99
        self.assertEqual(self._raw_status('GET', '/', 'HTTP/9.9'), 505)

    def test_send_blank(self):
        self.assertEqual(self._raw_status('', '', ''), 400)

    def test_header_close(self):
        status = self._raw_status('GET', '/',
                                  headers=[('Connection', 'close')])
        self.assertEqual(status, 501)

    def test_head_keep_alive(self):
        status = self._raw_status('GET', '/', 'HTTP/1.1',
                                  headers=[('Connection', 'keep-alive')])
        self.assertEqual(status, 501)

    def test_handler(self):
        self.con.request('TEST', '/')
        res = self.con.getresponse()
        self.assertEqual(res.status, 204)

    def test_return_header_keep_alive(self):
        self.con.request('KEEP', '/')
        res = self.con.getresponse()
        self.assertEqual(res.getheader('Connection'), 'keep-alive')
        # NOTE(review): presumably this follow-up request (answered with
        # 'Connection: close' by do_TEST) is what lets the server finish
        # the kept-alive connection -- confirm before removing it.
        self.con.request('TEST', '/')

    def test_internal_key_error(self):
        self.con.request('KEYERROR', '/')
        res = self.con.getresponse()
        self.assertEqual(res.status, 999)

    def test_return_custom_status(self):
        self.con.request('CUSTOM', '/')
        res = self.con.getresponse()
        self.assertEqual(res.status, 999)
|
188 |
|
189 |
|
class SimpleHTTPServerTestCase(BaseTestCase):
    """Serve a temporary directory through SimpleHTTPRequestHandler."""

    class request_handler(NoLogRequestHandler, SimpleHTTPRequestHandler):
        """SimpleHTTPRequestHandler with request logging silenced."""

    def setUp(self):
        """Start the server, chdir to the temp root and create test data."""
        BaseTestCase.setUp(self)
        self.cwd = os.getcwd()
        basetempdir = tempfile.gettempdir()
        os.chdir(basetempdir)
        self.data = 'We are the knights who say Ni!'
        self.tempdir = tempfile.mkdtemp(dir=basetempdir)
        # Directory name relative to the current directory, used to build
        # request URIs.
        self.tempdir_name = os.path.basename(self.tempdir)
        with open(os.path.join(self.tempdir, 'test'), 'wb') as temp:
            temp.write(self.data)

    def tearDown(self):
        """Restore the working directory, remove the data, stop the server."""
        try:
            os.chdir(self.cwd)
            try:
                shutil.rmtree(self.tempdir)
            except OSError:
                # Best-effort cleanup: a leftover temp directory must not
                # turn a passing test into an error.
                pass
        finally:
            BaseTestCase.tearDown(self)

    def check_status_and_reason(self, response, status, data=None):
        """Assert the response status, a reason phrase and optional body.

        The body is always read in full; it is compared against ``data``
        only when ``data`` is non-empty.
        """
        body = response.read()
        self.assertTrue(response)
        self.assertEqual(response.status, status)
        self.assertTrue(response.reason is not None)
        if data:
            self.assertEqual(data, body)

    def test_get(self):
        # Constructs the path relative to the root directory of the
        # HTTPServer.
        response = self.request(self.tempdir_name + '/test')
        self.check_status_and_reason(response, 200, data=self.data)
        response = self.request(self.tempdir_name + '/')
        self.check_status_and_reason(response, 200)
        response = self.request(self.tempdir_name)
        self.check_status_and_reason(response, 301)
        response = self.request('/ThisDoesNotExist')
        self.check_status_and_reason(response, 404)
        response = self.request('/' + 'ThisDoesNotExist' + '/')
        self.check_status_and_reason(response, 404)
        # Create an empty index.html (and close it right away); the
        # directory URL must still answer 200 with it present.
        with open(os.path.join(self.tempdir, 'index.html'), 'w'):
            pass
        response = self.request('/' + self.tempdir_name + '/')
        self.check_status_and_reason(response, 200)
        # chmod() won't work as expected on Windows platforms, and
        # filesystem permissions are not enforced for root on Unix.
        if os.name == 'posix' and os.geteuid() != 0:
            os.chmod(self.tempdir, 0)
            try:
                response = self.request(self.tempdir_name + '/')
                self.check_status_and_reason(response, 404)
            finally:
                # Restore access even if the check fails, so tearDown can
                # still remove the directory.
                os.chmod(self.tempdir, 0o755)

    def test_head(self):
        response = self.request(
            self.tempdir_name + '/test', method='HEAD')
        self.check_status_and_reason(response, 200)
        self.assertEqual(response.getheader('content-length'),
                         str(len(self.data)))
        self.assertEqual(response.getheader('content-type'),
                         'application/octet-stream')

    def test_invalid_requests(self):
        response = self.request('/', method='FOO')
        self.check_status_and_reason(response, 501)
        # Request methods are case sensitive, so this should fail too.
        response = self.request('/', method='get')
        self.check_status_and_reason(response, 501)
        response = self.request('/', method='GETs')
        self.check_status_and_reason(response, 501)
|
263 |
|
264 |
|
# Source templates for the CGI scripts that CGIHTTPServerTestCase writes
# into its cgi-bin directory.  The single %s is filled with sys.executable
# to form the "#!" line; in cgi_file2 every other percent sign is doubled
# so that it survives that substitution.

# Script that prints a Content-type header followed by "Hello World".
cgi_file1 = """\
#!%s

print "Content-type: text/html"
print
print "Hello World"
"""

# Script that prints the "spam", "eggs" and "bacon" form fields,
# separated by ", ".
cgi_file2 = """\
#!%s
import cgi

print "Content-type: text/html"
print

form = cgi.FieldStorage()
print "%%s, %%s, %%s" %% (form.getfirst("spam"), form.getfirst("eggs"),\
form.getfirst("bacon"))
"""
|
284 |
|
class CGIHTTPServerTestCase(BaseTestCase):
    """Run two small CGI scripts through CGIHTTPRequestHandler."""

    class request_handler(NoLogRequestHandler, CGIHTTPRequestHandler):
        """CGIHTTPRequestHandler with request logging silenced."""

    def _install_cgi_script(self, name, template):
        """Write ``template`` (filled with sys.executable) into cgi-bin.

        The file is given mode 0777.  Returns the path of the new script.
        """
        path = os.path.join(self.cgi_dir, name)
        with open(path, 'w') as script:
            script.write(template % sys.executable)
        os.chmod(path, 0o777)
        return path

    def setUp(self):
        """Start the server and build a temp tree with a cgi-bin directory."""
        BaseTestCase.setUp(self)
        self.parent_dir = tempfile.mkdtemp()
        self.cgi_dir = os.path.join(self.parent_dir, 'cgi-bin')
        os.mkdir(self.cgi_dir)

        self.file1_path = self._install_cgi_script('file1.py', cgi_file1)
        self.file2_path = self._install_cgi_script('file2.py', cgi_file2)

        # Requests use paths under /cgi-bin/, so make the parent of
        # cgi-bin the current directory.
        self.cwd = os.getcwd()
        os.chdir(self.parent_dir)

    def tearDown(self):
        """Restore the working directory, delete the tree, stop the server."""
        try:
            os.chdir(self.cwd)
            # Remove the whole tree in one step so that one undeletable or
            # unexpected file cannot leave the rest of it behind.
            shutil.rmtree(self.parent_dir)
        finally:
            BaseTestCase.tearDown(self)

    def test_headers_and_content(self):
        res = self.request('/cgi-bin/file1.py')
        self.assertEqual(
            ('Hello World\n', 'text/html', 200),
            (res.read(), res.getheader('Content-type'), res.status))

    def test_post(self):
        params = urllib.urlencode(
            {'spam': 1, 'eggs': 'python', 'bacon': 123456})
        headers = {'Content-type': 'application/x-www-form-urlencoded'}
        res = self.request('/cgi-bin/file2.py', 'POST', params, headers)

        self.assertEqual(res.read(), '1, python, 123456\n')

    def test_invaliduri(self):
        res = self.request('/cgi-bin/invalid')
        res.read()
        self.assertEqual(res.status, 404)

    def test_authorization(self):
        credentials = base64.b64encode('username:pass')
        headers = {'Authorization': 'Basic %s' % credentials}
        res = self.request('/cgi-bin/file1.py', 'GET', headers=headers)
        self.assertEqual(
            ('Hello World\n', 'text/html', 200),
            (res.read(), res.getheader('Content-type'), res.status))
|
341 |
|
342 |
|
def test_main(verbose=None):
    """Run all test cases of this module, then restore the working directory.

    ``verbose`` is accepted for the caller's benefit and is not used here.
    """
    # Record the directory before entering the try block: if os.getcwd()
    # itself failed inside it, the finally clause would raise a NameError
    # and hide the real error.
    cwd = os.getcwd()
    try:
        test_support.run_unittest(BaseHTTPServerTestCase,
                                  SimpleHTTPServerTestCase,
                                  CGIHTTPServerTestCase
                                  )
    finally:
        # Some test cases change the current directory; always put it back.
        os.chdir(cwd)
|
352 |
|
if __name__ == "__main__":
    # Executed as a script rather than imported: run the whole suite.
    test_main()