symbian-qemu-0.9.1-12/python-2.6.1/Lib/test/test_httpservers.py
changeset 1 2fb8b9db1c86
equal deleted inserted replaced
0:ffa851df0825 1:2fb8b9db1c86
       
     1 """Unittests for the various HTTPServer modules.
       
     2 
       
     3 Written by Cody A.W. Somerville <cody-somerville@ubuntu.com>,
       
     4 Josip Dzolonga, and Michael Otteneder for the 2007/08 GHOP contest.
       
     5 """
       
     6 
       
     7 from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
       
     8 from SimpleHTTPServer import SimpleHTTPRequestHandler
       
     9 from CGIHTTPServer import CGIHTTPRequestHandler
       
    10 
       
    11 import os
       
    12 import sys
       
    13 import base64
       
    14 import shutil
       
    15 import urllib
       
    16 import httplib
       
    17 import tempfile
       
    18 import threading
       
    19 
       
    20 import unittest
       
    21 from test import test_support
       
    22 
       
    23 
       
    24 class NoLogRequestHandler:
       
    25     def log_message(self, *args):
       
    26         # don't write log messages to stderr
       
    27         pass
       
    28 
       
    29 
       
    30 class TestServerThread(threading.Thread):
       
    31     def __init__(self, test_object, request_handler):
       
    32         threading.Thread.__init__(self)
       
    33         self.request_handler = request_handler
       
    34         self.test_object = test_object
       
    35         self.test_object.lock.acquire()
       
    36 
       
    37     def run(self):
       
    38         self.server = HTTPServer(('', 0), self.request_handler)
       
    39         self.test_object.PORT = self.server.socket.getsockname()[1]
       
    40         self.test_object.lock.release()
       
    41         try:
       
    42             self.server.serve_forever()
       
    43         finally:
       
    44             self.server.server_close()
       
    45 
       
    46     def stop(self):
       
    47         self.server.shutdown()
       
    48 
       
    49 
       
    50 class BaseTestCase(unittest.TestCase):
       
    51     def setUp(self):
       
    52         self.lock = threading.Lock()
       
    53         self.thread = TestServerThread(self, self.request_handler)
       
    54         self.thread.start()
       
    55         self.lock.acquire()
       
    56 
       
    57     def tearDown(self):
       
    58         self.lock.release()
       
    59         self.thread.stop()
       
    60 
       
    61     def request(self, uri, method='GET', body=None, headers={}):
       
    62         self.connection = httplib.HTTPConnection('localhost', self.PORT)
       
    63         self.connection.request(method, uri, body, headers)
       
    64         return self.connection.getresponse()
       
    65 
       
    66 
       
    67 class BaseHTTPServerTestCase(BaseTestCase):
       
    68     class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler):
       
    69         protocol_version = 'HTTP/1.1'
       
    70         default_request_version = 'HTTP/1.1'
       
    71 
       
    72         def do_TEST(self):
       
    73             self.send_response(204)
       
    74             self.send_header('Content-Type', 'text/html')
       
    75             self.send_header('Connection', 'close')
       
    76             self.end_headers()
       
    77 
       
    78         def do_KEEP(self):
       
    79             self.send_response(204)
       
    80             self.send_header('Content-Type', 'text/html')
       
    81             self.send_header('Connection', 'keep-alive')
       
    82             self.end_headers()
       
    83 
       
    84         def do_KEYERROR(self):
       
    85             self.send_error(999)
       
    86 
       
    87         def do_CUSTOM(self):
       
    88             self.send_response(999)
       
    89             self.send_header('Content-Type', 'text/html')
       
    90             self.send_header('Connection', 'close')
       
    91             self.end_headers()
       
    92 
       
    93     def setUp(self):
       
    94         BaseTestCase.setUp(self)
       
    95         self.con = httplib.HTTPConnection('localhost', self.PORT)
       
    96         self.con.connect()
       
    97 
       
    98     def test_command(self):
       
    99         self.con.request('GET', '/')
       
   100         res = self.con.getresponse()
       
   101         self.assertEquals(res.status, 501)
       
   102 
       
   103     def test_request_line_trimming(self):
       
   104         self.con._http_vsn_str = 'HTTP/1.1\n'
       
   105         self.con.putrequest('GET', '/')
       
   106         self.con.endheaders()
       
   107         res = self.con.getresponse()
       
   108         self.assertEquals(res.status, 501)
       
   109 
       
   110     def test_version_bogus(self):
       
   111         self.con._http_vsn_str = 'FUBAR'
       
   112         self.con.putrequest('GET', '/')
       
   113         self.con.endheaders()
       
   114         res = self.con.getresponse()
       
   115         self.assertEquals(res.status, 400)
       
   116 
       
   117     def test_version_digits(self):
       
   118         self.con._http_vsn_str = 'HTTP/9.9.9'
       
   119         self.con.putrequest('GET', '/')
       
   120         self.con.endheaders()
       
   121         res = self.con.getresponse()
       
   122         self.assertEquals(res.status, 400)
       
   123 
       
   124     def test_version_none_get(self):
       
   125         self.con._http_vsn_str = ''
       
   126         self.con.putrequest('GET', '/')
       
   127         self.con.endheaders()
       
   128         res = self.con.getresponse()
       
   129         self.assertEquals(res.status, 501)
       
   130 
       
   131     def test_version_none(self):
       
   132         self.con._http_vsn_str = ''
       
   133         self.con.putrequest('PUT', '/')
       
   134         self.con.endheaders()
       
   135         res = self.con.getresponse()
       
   136         self.assertEquals(res.status, 400)
       
   137 
       
   138     def test_version_invalid(self):
       
   139         self.con._http_vsn = 99
       
   140         self.con._http_vsn_str = 'HTTP/9.9'
       
   141         self.con.putrequest('GET', '/')
       
   142         self.con.endheaders()
       
   143         res = self.con.getresponse()
       
   144         self.assertEquals(res.status, 505)
       
   145 
       
   146     def test_send_blank(self):
       
   147         self.con._http_vsn_str = ''
       
   148         self.con.putrequest('', '')
       
   149         self.con.endheaders()
       
   150         res = self.con.getresponse()
       
   151         self.assertEquals(res.status, 400)
       
   152 
       
   153     def test_header_close(self):
       
   154         self.con.putrequest('GET', '/')
       
   155         self.con.putheader('Connection', 'close')
       
   156         self.con.endheaders()
       
   157         res = self.con.getresponse()
       
   158         self.assertEquals(res.status, 501)
       
   159 
       
   160     def test_head_keep_alive(self):
       
   161         self.con._http_vsn_str = 'HTTP/1.1'
       
   162         self.con.putrequest('GET', '/')
       
   163         self.con.putheader('Connection', 'keep-alive')
       
   164         self.con.endheaders()
       
   165         res = self.con.getresponse()
       
   166         self.assertEquals(res.status, 501)
       
   167 
       
   168     def test_handler(self):
       
   169         self.con.request('TEST', '/')
       
   170         res = self.con.getresponse()
       
   171         self.assertEquals(res.status, 204)
       
   172 
       
   173     def test_return_header_keep_alive(self):
       
   174         self.con.request('KEEP', '/')
       
   175         res = self.con.getresponse()
       
   176         self.assertEquals(res.getheader('Connection'), 'keep-alive')
       
   177         self.con.request('TEST', '/')
       
   178 
       
   179     def test_internal_key_error(self):
       
   180         self.con.request('KEYERROR', '/')
       
   181         res = self.con.getresponse()
       
   182         self.assertEquals(res.status, 999)
       
   183 
       
   184     def test_return_custom_status(self):
       
   185         self.con.request('CUSTOM', '/')
       
   186         res = self.con.getresponse()
       
   187         self.assertEquals(res.status, 999)
       
   188 
       
   189 
       
   190 class SimpleHTTPServerTestCase(BaseTestCase):
       
   191     class request_handler(NoLogRequestHandler, SimpleHTTPRequestHandler):
       
   192         pass
       
   193 
       
   194     def setUp(self):
       
   195         BaseTestCase.setUp(self)
       
   196         self.cwd = os.getcwd()
       
   197         basetempdir = tempfile.gettempdir()
       
   198         os.chdir(basetempdir)
       
   199         self.data = 'We are the knights who say Ni!'
       
   200         self.tempdir = tempfile.mkdtemp(dir=basetempdir)
       
   201         self.tempdir_name = os.path.basename(self.tempdir)
       
   202         temp = open(os.path.join(self.tempdir, 'test'), 'wb')
       
   203         temp.write(self.data)
       
   204         temp.close()
       
   205 
       
   206     def tearDown(self):
       
   207         try:
       
   208             os.chdir(self.cwd)
       
   209             try:
       
   210                 shutil.rmtree(self.tempdir)
       
   211             except:
       
   212                 pass
       
   213         finally:
       
   214             BaseTestCase.tearDown(self)
       
   215 
       
   216     def check_status_and_reason(self, response, status, data=None):
       
   217         body = response.read()
       
   218         self.assert_(response)
       
   219         self.assertEquals(response.status, status)
       
   220         self.assert_(response.reason != None)
       
   221         if data:
       
   222             self.assertEqual(data, body)
       
   223 
       
   224     def test_get(self):
       
   225         #constructs the path relative to the root directory of the HTTPServer
       
   226         response = self.request(self.tempdir_name + '/test')
       
   227         self.check_status_and_reason(response, 200, data=self.data)
       
   228         response = self.request(self.tempdir_name + '/')
       
   229         self.check_status_and_reason(response, 200)
       
   230         response = self.request(self.tempdir_name)
       
   231         self.check_status_and_reason(response, 301)
       
   232         response = self.request('/ThisDoesNotExist')
       
   233         self.check_status_and_reason(response, 404)
       
   234         response = self.request('/' + 'ThisDoesNotExist' + '/')
       
   235         self.check_status_and_reason(response, 404)
       
   236         f = open(os.path.join(self.tempdir_name, 'index.html'), 'w')
       
   237         response = self.request('/' + self.tempdir_name + '/')
       
   238         self.check_status_and_reason(response, 200)
       
   239         if os.name == 'posix':
       
   240             # chmod won't work as expected on Windows platforms
       
   241             os.chmod(self.tempdir, 0)
       
   242             response = self.request(self.tempdir_name + '/')
       
   243             self.check_status_and_reason(response, 404)
       
   244             os.chmod(self.tempdir, 0755)
       
   245 
       
   246     def test_head(self):
       
   247         response = self.request(
       
   248             self.tempdir_name + '/test', method='HEAD')
       
   249         self.check_status_and_reason(response, 200)
       
   250         self.assertEqual(response.getheader('content-length'),
       
   251                          str(len(self.data)))
       
   252         self.assertEqual(response.getheader('content-type'),
       
   253                          'application/octet-stream')
       
   254 
       
   255     def test_invalid_requests(self):
       
   256         response = self.request('/', method='FOO')
       
   257         self.check_status_and_reason(response, 501)
       
   258         # requests must be case sensitive,so this should fail too
       
   259         response = self.request('/', method='get')
       
   260         self.check_status_and_reason(response, 501)
       
   261         response = self.request('/', method='GETs')
       
   262         self.check_status_and_reason(response, 501)
       
   263 
       
   264 
       
# Template for a minimal CGI script.  The single ``%s`` is replaced with
# sys.executable by CGIHTTPServerTestCase.setUp to form the "#!" line.
cgi_file1 = """\
#!%s

print "Content-type: text/html"
print
print "Hello World"
"""

# Template for a CGI script that prints the "spam", "eggs" and "bacon"
# form fields.  ``%s`` again receives sys.executable; each doubled ``%%``
# becomes a single ``%`` after that substitution, so the generated script
# does its own string formatting.
cgi_file2 = """\
#!%s
import cgi

print "Content-type: text/html"
print

form = cgi.FieldStorage()
print "%%s, %%s, %%s" %% (form.getfirst("spam"), form.getfirst("eggs"),\
              form.getfirst("bacon"))
"""
       
   284 
       
   285 class CGIHTTPServerTestCase(BaseTestCase):
       
   286     class request_handler(NoLogRequestHandler, CGIHTTPRequestHandler):
       
   287         pass
       
   288 
       
   289     def setUp(self):
       
   290         BaseTestCase.setUp(self)
       
   291         self.parent_dir = tempfile.mkdtemp()
       
   292         self.cgi_dir = os.path.join(self.parent_dir, 'cgi-bin')
       
   293         os.mkdir(self.cgi_dir)
       
   294 
       
   295         self.file1_path = os.path.join(self.cgi_dir, 'file1.py')
       
   296         with open(self.file1_path, 'w') as file1:
       
   297             file1.write(cgi_file1 % sys.executable)
       
   298         os.chmod(self.file1_path, 0777)
       
   299 
       
   300         self.file2_path = os.path.join(self.cgi_dir, 'file2.py')
       
   301         with open(self.file2_path, 'w') as file2:
       
   302             file2.write(cgi_file2 % sys.executable)
       
   303         os.chmod(self.file2_path, 0777)
       
   304 
       
   305         self.cwd = os.getcwd()
       
   306         os.chdir(self.parent_dir)
       
   307 
       
   308     def tearDown(self):
       
   309         try:
       
   310             os.chdir(self.cwd)
       
   311             os.remove(self.file1_path)
       
   312             os.remove(self.file2_path)
       
   313             os.rmdir(self.cgi_dir)
       
   314             os.rmdir(self.parent_dir)
       
   315         finally:
       
   316             BaseTestCase.tearDown(self)
       
   317 
       
   318     def test_headers_and_content(self):
       
   319         res = self.request('/cgi-bin/file1.py')
       
   320         self.assertEquals(('Hello World\n', 'text/html', 200), \
       
   321              (res.read(), res.getheader('Content-type'), res.status))
       
   322 
       
   323     def test_post(self):
       
   324         params = urllib.urlencode({'spam' : 1, 'eggs' : 'python', 'bacon' : 123456})
       
   325         headers = {'Content-type' : 'application/x-www-form-urlencoded'}
       
   326         res = self.request('/cgi-bin/file2.py', 'POST', params, headers)
       
   327 
       
   328         self.assertEquals(res.read(), '1, python, 123456\n')
       
   329 
       
   330     def test_invaliduri(self):
       
   331         res = self.request('/cgi-bin/invalid')
       
   332         res.read()
       
   333         self.assertEquals(res.status, 404)
       
   334 
       
   335     def test_authorization(self):
       
   336         headers = {'Authorization' : 'Basic %s' % \
       
   337                 base64.b64encode('username:pass')}
       
   338         res = self.request('/cgi-bin/file1.py', 'GET', headers=headers)
       
   339         self.assertEquals(('Hello World\n', 'text/html', 200), \
       
   340              (res.read(), res.getheader('Content-type'), res.status))
       
   341 
       
   342 
       
   343 def test_main(verbose=None):
       
   344     try:
       
   345         cwd = os.getcwd()
       
   346         test_support.run_unittest(BaseHTTPServerTestCase,
       
   347                                   SimpleHTTPServerTestCase,
       
   348                                   CGIHTTPServerTestCase
       
   349                                   )
       
   350     finally:
       
   351         os.chdir(cwd)
       
   352 
       
   353 if __name__ == '__main__':
       
   354     test_main()