|
1 from test import test_support |
|
2 import unittest |
|
3 import dummy_threading as _threading |
|
4 import time |
|
5 |
|
6 class DummyThreadingTestCase(unittest.TestCase): |
|
7 |
|
8 class TestThread(_threading.Thread): |
|
9 |
|
10 def run(self): |
|
11 global running |
|
12 global sema |
|
13 global mutex |
|
14 # Uncomment if testing another module, such as the real 'threading' |
|
15 # module. |
|
16 #delay = random.random() * 2 |
|
17 delay = 0 |
|
18 if test_support.verbose: |
|
19 print 'task', self.name, 'will run for', delay, 'sec' |
|
20 sema.acquire() |
|
21 mutex.acquire() |
|
22 running += 1 |
|
23 if test_support.verbose: |
|
24 print running, 'tasks are running' |
|
25 mutex.release() |
|
26 time.sleep(delay) |
|
27 if test_support.verbose: |
|
28 print 'task', self.name, 'done' |
|
29 mutex.acquire() |
|
30 running -= 1 |
|
31 if test_support.verbose: |
|
32 print self.name, 'is finished.', running, 'tasks are running' |
|
33 mutex.release() |
|
34 sema.release() |
|
35 |
|
36 def setUp(self): |
|
37 self.numtasks = 10 |
|
38 global sema |
|
39 sema = _threading.BoundedSemaphore(value=3) |
|
40 global mutex |
|
41 mutex = _threading.RLock() |
|
42 global running |
|
43 running = 0 |
|
44 self.threads = [] |
|
45 |
|
46 def test_tasks(self): |
|
47 for i in range(self.numtasks): |
|
48 t = self.TestThread(name="<thread %d>"%i) |
|
49 self.threads.append(t) |
|
50 t.start() |
|
51 |
|
52 if test_support.verbose: |
|
53 print 'waiting for all tasks to complete' |
|
54 for t in self.threads: |
|
55 t.join() |
|
56 if test_support.verbose: |
|
57 print 'all tasks done' |
|
58 |
|
def test_main():
    """Run DummyThreadingTestCase through test_support.run_unittest()."""
    test_support.run_unittest(DummyThreadingTestCase)
|
61 |
|
# Run the test case when this file is executed directly as a script.
if __name__ == '__main__':
    test_main()