--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/python-2.5.2/win32/Lib/test/test_threading.py Fri Apr 03 17:19:34 2009 +0100
@@ -0,0 +1,227 @@
+# Very rudimentary test of threading module
+
+import test.test_support
+from test.test_support import verbose
+import random
+import sys
+import threading
+import thread
+import time
+import unittest
+
+# A trivial mutable counter.
+class Counter(object):
+ def __init__(self):
+ self.value = 0
+ def inc(self):
+ self.value += 1
+ def dec(self):
+ self.value -= 1
+ def get(self):
+ return self.value
+
+class TestThread(threading.Thread):
+ def __init__(self, name, testcase, sema, mutex, nrunning):
+ threading.Thread.__init__(self, name=name)
+ self.testcase = testcase
+ self.sema = sema
+ self.mutex = mutex
+ self.nrunning = nrunning
+
+ def run(self):
+ delay = random.random() * 2
+ if verbose:
+ print 'task', self.getName(), 'will run for', delay, 'sec'
+
+ self.sema.acquire()
+
+ self.mutex.acquire()
+ self.nrunning.inc()
+ if verbose:
+ print self.nrunning.get(), 'tasks are running'
+ self.testcase.assert_(self.nrunning.get() <= 3)
+ self.mutex.release()
+
+ time.sleep(delay)
+ if verbose:
+ print 'task', self.getName(), 'done'
+
+ self.mutex.acquire()
+ self.nrunning.dec()
+ self.testcase.assert_(self.nrunning.get() >= 0)
+ if verbose:
+ print self.getName(), 'is finished.', self.nrunning.get(), \
+ 'tasks are running'
+ self.mutex.release()
+
+ self.sema.release()
+
+class ThreadTests(unittest.TestCase):
+
+ # Create a bunch of threads, let each do some work, wait until all are
+ # done.
+ def test_various_ops(self):
+ # This takes about n/3 seconds to run (about n/3 clumps of tasks,
+ # times about 1 second per clump).
+ NUMTASKS = 10
+
+ # no more than 3 of the 10 can run at once
+ sema = threading.BoundedSemaphore(value=3)
+ mutex = threading.RLock()
+ numrunning = Counter()
+
+ threads = []
+
+ for i in range(NUMTASKS):
+ t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning)
+ threads.append(t)
+ t.start()
+
+ if verbose:
+ print 'waiting for all tasks to complete'
+ for t in threads:
+ t.join(NUMTASKS)
+ self.assert_(not t.isAlive())
+ if verbose:
+ print 'all tasks done'
+ self.assertEqual(numrunning.get(), 0)
+
+ # run with a small(ish) thread stack size (256kB)
+ def test_various_ops_small_stack(self):
+ if verbose:
+ print 'with 256kB thread stack size...'
+ try:
+ threading.stack_size(262144)
+ except thread.error:
+ if verbose:
+ print 'platform does not support changing thread stack size'
+ return
+ self.test_various_ops()
+ threading.stack_size(0)
+
+ # run with a large thread stack size (1MB)
+ def test_various_ops_large_stack(self):
+ if verbose:
+ print 'with 1MB thread stack size...'
+ try:
+ threading.stack_size(0x100000)
+ except thread.error:
+ if verbose:
+ print 'platform does not support changing thread stack size'
+ return
+ self.test_various_ops()
+ threading.stack_size(0)
+
+ def test_foreign_thread(self):
+ # Check that a "foreign" thread can use the threading module.
+ def f(mutex):
+ # Acquiring an RLock forces an entry for the foreign
+ # thread to get made in the threading._active map.
+ r = threading.RLock()
+ r.acquire()
+ r.release()
+ mutex.release()
+
+ mutex = threading.Lock()
+ mutex.acquire()
+ tid = thread.start_new_thread(f, (mutex,))
+ # Wait for the thread to finish.
+ mutex.acquire()
+ self.assert_(tid in threading._active)
+ self.assert_(isinstance(threading._active[tid],
+ threading._DummyThread))
+ del threading._active[tid]
+
+ # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
+ # exposed at the Python level. This test relies on ctypes to get at it.
+ def test_PyThreadState_SetAsyncExc(self):
+ try:
+ import ctypes
+ except ImportError:
+ if verbose:
+ print "test_PyThreadState_SetAsyncExc can't import ctypes"
+ return # can't do anything
+
+ set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc
+
+ class AsyncExc(Exception):
+ pass
+
+ exception = ctypes.py_object(AsyncExc)
+
+ # `worker_started` is set by the thread when it's inside a try/except
+ # block waiting to catch the asynchronously set AsyncExc exception.
+ # `worker_saw_exception` is set by the thread upon catching that
+ # exception.
+ worker_started = threading.Event()
+ worker_saw_exception = threading.Event()
+
+ class Worker(threading.Thread):
+ def run(self):
+ self.id = thread.get_ident()
+ self.finished = False
+
+ try:
+ while True:
+ worker_started.set()
+ time.sleep(0.1)
+ except AsyncExc:
+ self.finished = True
+ worker_saw_exception.set()
+
+ t = Worker()
+ t.setDaemon(True) # so if this fails, we don't hang Python at shutdown
+ t.start()
+ if verbose:
+ print " started worker thread"
+
+ # Try a thread id that doesn't make sense.
+ if verbose:
+ print " trying nonsensical thread id"
+ result = set_async_exc(ctypes.c_long(-1), exception)
+ self.assertEqual(result, 0) # no thread states modified
+
+ # Now raise an exception in the worker thread.
+ if verbose:
+ print " waiting for worker thread to get started"
+ worker_started.wait()
+ if verbose:
+ print " verifying worker hasn't exited"
+ self.assert_(not t.finished)
+ if verbose:
+ print " attempting to raise asynch exception in worker"
+ result = set_async_exc(ctypes.c_long(t.id), exception)
+ self.assertEqual(result, 1) # one thread state modified
+ if verbose:
+ print " waiting for worker to say it caught the exception"
+ worker_saw_exception.wait(timeout=10)
+ self.assert_(t.finished)
+ if verbose:
+ print " all OK -- joining worker"
+ if t.finished:
+ t.join()
+ # else the thread is still running, and we have no way to kill it
+
+ def test_enumerate_after_join(self):
+ # Try hard to trigger #1703448: a thread is still returned in
+ # threading.enumerate() after it has been join()ed.
+ enum = threading.enumerate
+ old_interval = sys.getcheckinterval()
+ sys.setcheckinterval(1)
+ try:
+ for i in xrange(1, 1000):
+ t = threading.Thread(target=lambda: None)
+ t.start()
+ t.join()
+ l = enum()
+ self.assertFalse(t in l,
+ "#1703448 triggered after %d trials: %s" % (i, l))
+ finally:
+ sys.setcheckinterval(old_interval)
+
+
+def test_main():
+ test.test_support.run_unittest(ThreadTests)
+
+if __name__ == "__main__":
+ test_main()