diff -r 000000000000 -r ae805ac0140d python-2.5.2/win32/Lib/test/test_socketserver.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/python-2.5.2/win32/Lib/test/test_socketserver.py Fri Apr 03 17:19:34 2009 +0100 @@ -0,0 +1,218 @@ +# Test suite for SocketServer.py + +from test import test_support +from test.test_support import (verbose, verify, TESTFN, TestSkipped, + reap_children) +test_support.requires('network') + +from SocketServer import * +import socket +import errno +import select +import time +import threading +import os + +NREQ = 3 +DELAY = 0.5 + +class MyMixinHandler: + def handle(self): + time.sleep(DELAY) + line = self.rfile.readline() + time.sleep(DELAY) + self.wfile.write(line) + +class MyStreamHandler(MyMixinHandler, StreamRequestHandler): + pass + +class MyDatagramHandler(MyMixinHandler, DatagramRequestHandler): + pass + +class MyMixinServer: + def serve_a_few(self): + for i in range(NREQ): + self.handle_request() + def handle_error(self, request, client_address): + self.close_request(request) + self.server_close() + raise + +teststring = "hello world\n" + +def receive(sock, n, timeout=20): + r, w, x = select.select([sock], [], [], timeout) + if sock in r: + return sock.recv(n) + else: + raise RuntimeError, "timed out on %r" % (sock,) + +def testdgram(proto, addr): + s = socket.socket(proto, socket.SOCK_DGRAM) + s.sendto(teststring, addr) + buf = data = receive(s, 100) + while data and '\n' not in buf: + data = receive(s, 100) + buf += data + verify(buf == teststring) + s.close() + +def teststream(proto, addr): + s = socket.socket(proto, socket.SOCK_STREAM) + s.connect(addr) + s.sendall(teststring) + buf = data = receive(s, 100) + while data and '\n' not in buf: + data = receive(s, 100) + buf += data + verify(buf == teststring) + s.close() + +class ServerThread(threading.Thread): + def __init__(self, addr, svrcls, hdlrcls): + threading.Thread.__init__(self) + self.__addr = addr + self.__svrcls = svrcls + self.__hdlrcls = hdlrcls + def run(self): + class svrcls(MyMixinServer, self.__svrcls): + pass + if verbose: print "thread: creating server" + svr = svrcls(self.__addr, self.__hdlrcls) + # pull the address out of the server in case it changed + # this can happen if another process is using the port + addr = svr.server_address + if addr: + self.__addr = addr + if self.__addr != svr.socket.getsockname(): + raise RuntimeError('server_address was %s, expected %s' % + (self.__addr, svr.socket.getsockname())) + if verbose: print "thread: serving three times" + svr.serve_a_few() + if verbose: print "thread: done" + +seed = 0 +def pickport(): + global seed + seed += 1 + return 10000 + (os.getpid() % 1000)*10 + seed + +host = "localhost" +testfiles = [] +def pickaddr(proto): + if proto == socket.AF_INET: + return (host, pickport()) + else: + fn = TESTFN + str(pickport()) + if os.name == 'os2': + # AF_UNIX socket names on OS/2 require a specific prefix + # which can't include a drive letter and must also use + # backslashes as directory separators + if fn[1] == ':': + fn = fn[2:] + if fn[0] in (os.sep, os.altsep): + fn = fn[1:] + fn = os.path.join('\socket', fn) + if os.sep == '/': + fn = fn.replace(os.sep, os.altsep) + else: + fn = fn.replace(os.altsep, os.sep) + testfiles.append(fn) + return fn + +def cleanup(): + for fn in testfiles: + try: + os.remove(fn) + except os.error: + pass + testfiles[:] = [] + +def testloop(proto, servers, hdlrcls, testfunc): + for svrcls in servers: + addr = pickaddr(proto) + if verbose: + print "ADDR =", addr + print "CLASS =", svrcls + t = ServerThread(addr, svrcls, hdlrcls) + if verbose: print "server created" + t.start() + if verbose: print "server running" + for i in range(NREQ): + time.sleep(DELAY) + if verbose: print "test client", i + testfunc(proto, addr) + if verbose: print "waiting for server" + t.join() + if verbose: print "done" + +class ForgivingTCPServer(TCPServer): + # prevent errors if another process is using the port we want + def server_bind(self): + host, default_port = self.server_address + # this code shamelessly stolen from test.test_support + # the ports were changed to protect the innocent + import sys + for port in [default_port, 3434, 8798, 23833]: + try: + self.server_address = host, port + TCPServer.server_bind(self) + break + except socket.error, (err, msg): + if err != errno.EADDRINUSE: + raise + print >>sys.__stderr__, \ + ' WARNING: failed to listen on port %d, trying another' % port + +tcpservers = [ForgivingTCPServer, ThreadingTCPServer] +if hasattr(os, 'fork') and os.name not in ('os2',): + tcpservers.append(ForkingTCPServer) +udpservers = [UDPServer, ThreadingUDPServer] +if hasattr(os, 'fork') and os.name not in ('os2',): + udpservers.append(ForkingUDPServer) + +if not hasattr(socket, 'AF_UNIX'): + streamservers = [] + dgramservers = [] +else: + class ForkingUnixStreamServer(ForkingMixIn, UnixStreamServer): pass + streamservers = [UnixStreamServer, ThreadingUnixStreamServer] + if hasattr(os, 'fork') and os.name not in ('os2',): + streamservers.append(ForkingUnixStreamServer) + class ForkingUnixDatagramServer(ForkingMixIn, UnixDatagramServer): pass + dgramservers = [UnixDatagramServer, ThreadingUnixDatagramServer] + if hasattr(os, 'fork') and os.name not in ('os2',): + dgramservers.append(ForkingUnixDatagramServer) + +def sloppy_cleanup(): + # See http://python.org/sf/1540386 + # We need to reap children here otherwise a child from one server + # can be left running for the next server and cause a test failure. + time.sleep(DELAY) + reap_children() + +def testall(): + testloop(socket.AF_INET, tcpservers, MyStreamHandler, teststream) + sloppy_cleanup() + testloop(socket.AF_INET, udpservers, MyDatagramHandler, testdgram) + if hasattr(socket, 'AF_UNIX'): + sloppy_cleanup() + testloop(socket.AF_UNIX, streamservers, MyStreamHandler, teststream) + # Alas, on Linux (at least) recvfrom() doesn't return a meaningful + # client address so this cannot work: + ##testloop(socket.AF_UNIX, dgramservers, MyDatagramHandler, testdgram) + +def test_main(): + import imp + if imp.lock_held(): + # If the import lock is held, the threads will hang. + raise TestSkipped("can't run when import lock is held") + + try: + testall() + finally: + cleanup() + reap_children() + +if __name__ == "__main__": + test_main()