|
1 """Thread module emulating a subset of Java's threading model.""" |
|
2 |
|
3 import sys as _sys |
|
4 |
|
5 try: |
|
6 import thread |
|
7 except ImportError: |
|
8 del _sys.modules[__name__] |
|
9 raise |
|
10 |
|
11 from time import time as _time, sleep as _sleep |
|
12 from traceback import format_exc as _format_exc |
|
13 from collections import deque |
|
14 |
|
# Names exported by "from threading import *".  Everything else in this
# module is imported or defined under an underscore-prefixed name.
__all__ = [
    'activeCount', 'Condition', 'currentThread', 'enumerate', 'Event',
    'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
    'Timer', 'setprofile', 'settrace', 'local', 'stack_size',
]

# Capture the low-level primitives under module-level names, then drop the
# 'thread' module name itself from this namespace.
ThreadError = thread.error
_get_ident = thread.get_ident
_allocate_lock = thread.allocate_lock
_start_new_thread = thread.start_new_thread
del thread
|
25 |
|
26 |
|
# Debug support (adapted from ihooks.py).
# Every major class in this module derives from _Verbose.  It is
# deliberately a new-style class so that all of those classes are new-style
# too, which helps debugging (type(instance) is more revealing for
# instances of new-style classes).

_VERBOSE = False

if __debug__:

    class _Verbose(object):
        """Base class that can write debug notes to sys.stderr."""

        def __init__(self, verbose=None):
            """Store the verbosity flag; None selects the module default."""
            self.__verbose = _VERBOSE if verbose is None else verbose

        def _note(self, format, *args):
            """Write 'format % args' to stderr, prefixed by the thread name.

            Nothing is written unless this instance is verbose.
            """
            if not self.__verbose:
                return
            message = format % args
            line = "%s: %s\n" % (currentThread().getName(), message)
            _sys.stderr.write(line)

else:
    # Disable this when using "python -O"
    class _Verbose(object):
        """No-op variant of the debug base class."""

        def __init__(self, verbose=None):
            pass

        def _note(self, *args):
            pass
|
58 |
|
# Support for profile and trace hooks.
# Thread.__bootstrap_inner() installs these hooks, when set, before it
# calls run().

_trace_hook = None
_profile_hook = None

def setprofile(func):
    """Remember func as the profile hook for threads started later."""
    global _profile_hook
    _profile_hook = func

def settrace(func):
    """Remember func as the trace hook for threads started later."""
    global _trace_hook
    _trace_hook = func
|
71 |
|
# Synchronization classes

# Lock is the low-level lock allocator itself.
Lock = _allocate_lock

def RLock(*args, **kwargs):
    """Factory function: build and return a new _RLock instance."""
    rlock = _RLock(*args, **kwargs)
    return rlock
|
78 |
|
class _RLock(_Verbose):
    """Reentrant lock built on top of a plain lock.

    The owning Thread object and a recursion count are tracked next to the
    underlying lock: the owner may call acquire() repeatedly, and the
    underlying lock is released only when release() has been called as
    many times.
    """

    def __init__(self, verbose=None):
        _Verbose.__init__(self, verbose)
        self.__block = _allocate_lock()
        self.__owner = None
        self.__count = 0

    def __repr__(self):
        owner = self.__owner
        return "<%s(%s, %d)>" % (
                self.__class__.__name__,
                owner and owner.getName(),
                self.__count)

    def acquire(self, blocking=1):
        """Acquire the lock; return a true value on success.

        If the calling thread already owns the lock, only the recursion
        count is incremented.  Otherwise the underlying lock is acquired,
        with 'blocking' passed through to it.
        """
        me = currentThread()
        if self.__owner is me:
            self.__count = self.__count + 1
            if __debug__:
                self._note("%s.acquire(%s): recursive success", self, blocking)
            return 1
        rc = self.__block.acquire(blocking)
        if rc:
            self.__owner = me
            self.__count = 1
            if __debug__:
                self._note("%s.acquire(%s): initial success", self, blocking)
        else:
            if __debug__:
                self._note("%s.acquire(%s): failure", self, blocking)
        return rc

    __enter__ = acquire

    def release(self):
        """Undo one acquire(); free the underlying lock on the last one.

        Raises RuntimeError if the calling thread does not own the lock.
        """
        if self.__owner is not currentThread():
            raise RuntimeError("cannot release un-acquired lock")
        self.__count = count = self.__count - 1
        if not count:
            self.__owner = None
            self.__block.release()
            if __debug__:
                self._note("%s.release(): final release", self)
        else:
            if __debug__:
                self._note("%s.release(): non-final release", self)

    def __exit__(self, t, v, tb):
        self.release()

    # Internal methods used by condition variables

    def _acquire_restore(self, count_owner):
        """Re-acquire the lock and restore state saved by _release_save().

        'count_owner' is the (count, owner) pair returned by
        _release_save().  It is unpacked in the body rather than in the
        signature, because tuple parameters are not portable syntax.
        """
        count, owner = count_owner
        self.__block.acquire()
        self.__count = count
        self.__owner = owner
        if __debug__:
            self._note("%s._acquire_restore()", self)

    def _release_save(self):
        """Fully release the lock; return the (count, owner) pair held."""
        if __debug__:
            self._note("%s._release_save()", self)
        count = self.__count
        self.__count = 0
        owner = self.__owner
        self.__owner = None
        self.__block.release()
        return (count, owner)

    def _is_owned(self):
        """Return True if the calling thread is the current owner."""
        return self.__owner is currentThread()
|
151 |
|
152 |
|
def Condition(*args, **kwargs):
    """Factory function: build and return a new _Condition instance."""
    condition = _Condition(*args, **kwargs)
    return condition
|
155 |
|
class _Condition(_Verbose):
    """Condition variable associated with a lock.

    Each waiting thread blocks on its own private lock kept in an internal
    list; notify() releases those private locks to wake the waiters.
    """

    def __init__(self, lock=None, verbose=None):
        """Bind the condition to 'lock' (a new RLock when None)."""
        _Verbose.__init__(self, verbose)
        if lock is None:
            lock = RLock()
        self.__lock = lock
        # Export the lock's acquire() and release() methods
        self.acquire = lock.acquire
        self.release = lock.release
        # If the lock defines _release_save() and/or _acquire_restore(),
        # these override the default implementations (which just call
        # release() and acquire() on the lock).  Ditto for _is_owned().
        try:
            self._release_save = lock._release_save
        except AttributeError:
            pass
        try:
            self._acquire_restore = lock._acquire_restore
        except AttributeError:
            pass
        try:
            self._is_owned = lock._is_owned
        except AttributeError:
            pass
        self.__waiters = []

    def __enter__(self):
        return self.__lock.__enter__()

    def __exit__(self, *args):
        return self.__lock.__exit__(*args)

    def __repr__(self):
        return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters))

    def _release_save(self):
        self.__lock.release()           # No state to save

    def _acquire_restore(self, x):
        self.__lock.acquire()           # Ignore saved state

    def _is_owned(self):
        # Return True if lock is owned by currentThread.
        # This method is called only if __lock doesn't have _is_owned().
        if self.__lock.acquire(0):
            self.__lock.release()
            return False
        else:
            return True

    def wait(self, timeout=None):
        """Release the lock, block until notified or timed out, re-acquire.

        'timeout' is in seconds; None waits indefinitely.  Raises
        RuntimeError if the lock is not held on entry.  The lock state is
        always restored before returning or propagating an exception.
        """
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        waiter = _allocate_lock()
        waiter.acquire()
        self.__waiters.append(waiter)
        saved_state = self._release_save()
        gotit = False
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                waiter.acquire()
                gotit = True
                if __debug__:
                    self._note("%s.wait(): got it", self)
            else:
                # Balancing act:  We can't afford a pure busy loop, so we
                # have to sleep; but if we sleep the whole timeout time,
                # we'll be unresponsive.  The scheme here sleeps very
                # little at first, longer as time goes on, but never longer
                # than 0.05 s at a time (or the timeout time remaining).
                endtime = _time() + timeout
                delay = 0.0005 # 500 us -> initial delay of 1 ms
                while True:
                    gotit = waiter.acquire(0)
                    if gotit:
                        break
                    remaining = endtime - _time()
                    if remaining <= 0:
                        break
                    delay = min(delay * 2, remaining, .05)
                    _sleep(delay)
                if not gotit:
                    if __debug__:
                        self._note("%s.wait(%s): timed out", self, timeout)
                else:
                    if __debug__:
                        self._note("%s.wait(%s): got it", self, timeout)
        finally:
            self._acquire_restore(saved_state)
            if not gotit:
                # Timed out or interrupted: withdraw our waiter so that a
                # later notify() is not spent on a lock nobody is blocked
                # on.  notify() may already have removed it, hence the
                # ValueError guard.
                try:
                    self.__waiters.remove(waiter)
                except ValueError:
                    pass

    def notify(self, n=1):
        """Wake up to 'n' waiting threads.

        Raises RuntimeError if the lock is not held by the caller.
        """
        if not self._is_owned():
            raise RuntimeError("cannot notify on un-acquired lock")
        all_waiters = self.__waiters
        waiters = all_waiters[:n]
        if not waiters:
            if __debug__:
                self._note("%s.notify(): no waiters", self)
            return
        if __debug__:
            self._note("%s.notify(): notifying %d waiter%s", self, n,
                       n!=1 and "s" or "")
        for waiter in waiters:
            waiter.release()
            try:
                all_waiters.remove(waiter)
            except ValueError:
                # The waiter already removed itself from the list.
                pass

    def notifyAll(self):
        """Wake every thread currently waiting on this condition."""
        self.notify(len(self.__waiters))
|
269 |
|
270 |
|
def Semaphore(*args, **kwargs):
    """Factory function: build and return a new _Semaphore instance."""
    semaphore = _Semaphore(*args, **kwargs)
    return semaphore
|
273 |
|
class _Semaphore(_Verbose):
    """Counting semaphore guarded by a condition variable.

    The counter is decremented by acquire() and incremented by release();
    acquire() blocks (when allowed to) while the counter is zero.
    """

    # After Tim Peters' semaphore class, but not quite the same (no maximum)

    def __init__(self, value=1, verbose=None):
        """Set the initial counter; raises ValueError if 'value' < 0."""
        if value < 0:
            raise ValueError("semaphore initial value must be >= 0")
        _Verbose.__init__(self, verbose)
        self.__cond = Condition(Lock())
        self.__value = value

    def acquire(self, blocking=1):
        """Decrement the counter; return True on success.

        When the counter is zero, wait for a release() if 'blocking' is
        true, otherwise return False immediately.
        """
        rc = False
        self.__cond.acquire()
        # try/finally: wait() may raise (e.g. KeyboardInterrupt) after it
        # has re-acquired the lock; the lock must not stay held then.
        try:
            while self.__value == 0:
                if not blocking:
                    break
                if __debug__:
                    self._note("%s.acquire(%s): blocked waiting, value=%s",
                               self, blocking, self.__value)
                self.__cond.wait()
            else:
                self.__value = self.__value - 1
                if __debug__:
                    self._note("%s.acquire: success, value=%s",
                               self, self.__value)
                rc = True
        finally:
            self.__cond.release()
        return rc

    __enter__ = acquire

    def release(self):
        """Increment the counter and wake one waiting thread, if any."""
        self.__cond.acquire()
        try:
            self.__value = self.__value + 1
            if __debug__:
                self._note("%s.release: success, value=%s",
                           self, self.__value)
            self.__cond.notify()
        finally:
            self.__cond.release()

    def __exit__(self, t, v, tb):
        self.release()
|
317 |
|
318 |
|
def BoundedSemaphore(*args, **kwargs):
    """Factory function: build and return a new _BoundedSemaphore."""
    semaphore = _BoundedSemaphore(*args, **kwargs)
    return semaphore
|
321 |
|
class _BoundedSemaphore(_Semaphore):
    """Semaphore that checks that # releases is <= # acquires"""

    def __init__(self, value=1, verbose=None):
        _Semaphore.__init__(self, value, verbose)
        self._initial_value = value

    def release(self):
        """Increment the counter, refusing to exceed the initial value.

        Raises ValueError if the counter is already at its initial value.
        The check and the increment happen under the semaphore's own lock,
        so two threads releasing at once cannot both pass the check.
        """
        cond = self._Semaphore__cond
        cond.acquire()
        try:
            if self._Semaphore__value >= self._initial_value:
                raise ValueError("Semaphore released too many times")
            self._Semaphore__value = self._Semaphore__value + 1
            if __debug__:
                self._note("%s.release: success, value=%s",
                           self, self._Semaphore__value)
            cond.notify()
        finally:
            cond.release()
|
332 |
|
333 |
|
def Event(*args, **kwargs):
    """Factory function: build and return a new _Event instance."""
    event = _Event(*args, **kwargs)
    return event
|
336 |
|
class _Event(_Verbose):
    """Boolean flag that threads can wait on until it becomes true."""

    # After Tim Peters' event class (without is_posted())

    def __init__(self, verbose=None):
        _Verbose.__init__(self, verbose)
        self.__cond = Condition(Lock())
        self.__flag = False

    def isSet(self):
        """Return the current value of the flag."""
        return self.__flag

    def set(self):
        """Set the flag to true and wake every waiting thread."""
        self.__cond.acquire()
        try:
            self.__flag = True
            self.__cond.notifyAll()
        finally:
            self.__cond.release()

    def clear(self):
        """Reset the flag to false."""
        self.__cond.acquire()
        try:
            self.__flag = False
        finally:
            self.__cond.release()

    def wait(self, timeout=None):
        """Block until the flag is true or 'timeout' seconds have passed.

        Returns the flag as read under the lock, so a caller can tell a
        timeout (False) from a set event (True) without a separate
        isSet() call.  Earlier this method always returned None.
        """
        self.__cond.acquire()
        try:
            if not self.__flag:
                self.__cond.wait(timeout)
            return self.__flag
        finally:
            self.__cond.release()
|
371 |
|
# Helper to generate new thread names
_counter = 0
def _newname(template="Thread-%d"):
    """Return 'template' filled in with the next module-wide counter value."""
    global _counter
    _counter += 1
    return template % _counter
|
378 |
|
# Active thread administration
_limbo = {}     # Thread objects between start() and actually running
_active = {}    # maps thread id to Thread object
_active_limbo_lock = _allocate_lock()   # acquired around updates to both dicts
|
383 |
|
384 |
|
# Main class for threads

class Thread(_Verbose):
    """A thread of control.

    The activity is either the 'target' callable passed to the constructor
    or an overridden run() method.  start() launches it in a new low-level
    thread; join() waits for it to finish.
    """

    __initialized = False
    # Need to store a reference to sys.exc_info for printing
    # out exceptions when a thread tries to use a global var. during interp.
    # shutdown and thus raises an exception about trying to perform some
    # operation on/with a NoneType
    __exc_info = _sys.exc_info

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None):
        """Set up a not-yet-started thread.

        'group' must be None.  'target' is called by run() with 'args' and
        'kwargs'.  'name' defaults to a generated "Thread-N" name.
        """
        assert group is None, "group argument must be None for now"
        _Verbose.__init__(self, verbose)
        if kwargs is None:
            kwargs = {}
        self.__target = target
        self.__name = str(name or _newname())
        self.__args = args
        self.__kwargs = kwargs
        self.__daemonic = self._set_daemon()
        self.__started = False
        self.__stopped = False
        self.__block = Condition(Lock())
        self.__initialized = True
        # sys.stderr is not stored in the class like
        # sys.exc_info since it can be changed between instances
        self.__stderr = _sys.stderr

    def _set_daemon(self):
        # Overridden in _MainThread and _DummyThread
        return currentThread().isDaemon()

    def __repr__(self):
        assert self.__initialized, "Thread.__init__() was not called"
        status = "initial"
        if self.__started:
            status = "started"
        if self.__stopped:
            status = "stopped"
        if self.__daemonic:
            status = status + " daemon"
        return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status)

    def start(self):
        """Start the thread's activity in a new low-level thread.

        Raises RuntimeError if __init__() was not called or the thread was
        already started.  An exception from the low-level thread creation
        is propagated after the thread is taken out of _limbo again.
        """
        if not self.__initialized:
            raise RuntimeError("thread.__init__() not called")
        if self.__started:
            raise RuntimeError("thread already started")
        if __debug__:
            self._note("%s.start(): starting thread", self)
        _active_limbo_lock.acquire()
        try:
            _limbo[self] = self
        finally:
            _active_limbo_lock.release()
        try:
            _start_new_thread(self.__bootstrap, ())
        except Exception:
            # The thread never ran, so nothing else will ever remove it
            # from _limbo; do it here so enumerate()/activeCount() do not
            # report it forever.
            _active_limbo_lock.acquire()
            try:
                del _limbo[self]
            finally:
                _active_limbo_lock.release()
            raise
        self.__started = True
        _sleep(0.000001)    # 1 usec, to let the thread run (Solaris hack)

    def run(self):
        """Call the target, if any, with the stored args and kwargs."""
        if self.__target:
            self.__target(*self.__args, **self.__kwargs)

    def __bootstrap(self):
        # Wrapper around the real bootstrap code that ignores
        # exceptions during interpreter cleanup.  Those typically
        # happen when a daemon thread wakes up at an unfortunate
        # moment, finds the world around it destroyed, and raises some
        # random exception *** while trying to report the exception in
        # __bootstrap_inner() below ***.  Those random exceptions
        # don't help anybody, and they confuse users, so we suppress
        # them.  We suppress them only when it appears that the world
        # indeed has already been destroyed, so that exceptions in
        # __bootstrap_inner() during normal business hours are properly
        # reported.  Also, we only suppress them for daemonic threads;
        # if a non-daemonic encounters this, something else is wrong.
        try:
            self.__bootstrap_inner()
        except:
            if self.__daemonic and _sys is None:
                return
            raise

    def __bootstrap_inner(self):
        try:
            self.__started = True
            _active_limbo_lock.acquire()
            # try/finally: the outer 'finally' below re-acquires this
            # lock, so it must never be left held on an error path.
            try:
                _active[_get_ident()] = self
                del _limbo[self]
            finally:
                _active_limbo_lock.release()
            if __debug__:
                self._note("%s.__bootstrap(): thread started", self)

            if _trace_hook:
                if __debug__:
                    self._note("%s.__bootstrap(): registering trace hook",
                               self)
                _sys.settrace(_trace_hook)
            if _profile_hook:
                if __debug__:
                    self._note("%s.__bootstrap(): registering profile hook",
                               self)
                _sys.setprofile(_profile_hook)

            try:
                self.run()
            except SystemExit:
                if __debug__:
                    self._note("%s.__bootstrap(): raised SystemExit", self)
            except:
                if __debug__:
                    self._note("%s.__bootstrap(): unhandled exception", self)
                # If sys.stderr is no more (most likely from interpreter
                # shutdown) use self.__stderr.  Otherwise still use sys (as in
                # _sys) in case sys.stderr was redefined since the creation of
                # self.
                if _sys:
                    _sys.stderr.write("Exception in thread %s:\n%s\n" %
                                      (self.getName(), _format_exc()))
                else:
                    # Do the best job possible w/o a huge amt. of code to
                    # approximate a traceback (code ideas from
                    # Lib/traceback.py)
                    exc_type, exc_value, exc_tb = self.__exc_info()
                    try:
                        print>>self.__stderr, (
                            "Exception in thread " + self.getName() +
                            " (most likely raised during interpreter shutdown):")
                        print>>self.__stderr, (
                            "Traceback (most recent call last):")
                        while exc_tb:
                            print>>self.__stderr, (
                                '  File "%s", line %s, in %s' %
                                (exc_tb.tb_frame.f_code.co_filename,
                                    exc_tb.tb_lineno,
                                    exc_tb.tb_frame.f_code.co_name))
                            exc_tb = exc_tb.tb_next
                        print>>self.__stderr, ("%s: %s" % (exc_type, exc_value))
                    # Make sure that exc_tb gets deleted since it is a memory
                    # hog; deleting everything else is just for thoroughness
                    finally:
                        del exc_type, exc_value, exc_tb
            else:
                if __debug__:
                    self._note("%s.__bootstrap(): normal return", self)
        finally:
            _active_limbo_lock.acquire()
            try:
                self.__stop()
                try:
                    # We don't call self.__delete() because it also
                    # grabs _active_limbo_lock.
                    del _active[_get_ident()]
                except:
                    pass
            finally:
                _active_limbo_lock.release()

    def __stop(self):
        # Mark the thread as stopped and wake every thread blocked in join().
        self.__block.acquire()
        try:
            self.__stopped = True
            self.__block.notifyAll()
        finally:
            self.__block.release()

    def __delete(self):
        "Remove current thread from the dict of currently running threads."

        # Notes about running with dummy_thread:
        #
        # Must take care to not raise an exception if dummy_thread is being
        # used (and thus this module is being used as an instance of
        # dummy_threading).  dummy_thread.get_ident() always returns -1 since
        # there is only one thread if dummy_thread is being used.  Thus
        # len(_active) is always <= 1 here, and any Thread instance created
        # overwrites the (if any) thread currently registered in _active.
        #
        # An instance of _MainThread is always created by 'threading'.  This
        # gets overwritten the instant an instance of Thread is created; both
        # threads return -1 from dummy_thread.get_ident() and thus have the
        # same key in the dict.  So when the _MainThread instance created by
        # 'threading' tries to clean itself up when atexit calls this method
        # it gets a KeyError if another Thread instance was created.
        #
        # This all means that KeyError from trying to delete something from
        # _active if dummy_threading is being used is a red herring.  But
        # since it isn't if dummy_threading is *not* being used then don't
        # hide the exception.

        _active_limbo_lock.acquire()
        try:
            try:
                del _active[_get_ident()]
            except KeyError:
                if 'dummy_threading' not in _sys.modules:
                    raise
        finally:
            _active_limbo_lock.release()

    def join(self, timeout=None):
        """Wait until the thread stops or 'timeout' seconds have passed.

        Raises RuntimeError if __init__() was not called, the thread has
        not been started, or the thread tries to join itself.
        """
        if not self.__initialized:
            raise RuntimeError("Thread.__init__() not called")
        if not self.__started:
            raise RuntimeError("cannot join thread before it is started")
        if self is currentThread():
            raise RuntimeError("cannot join current thread")

        if __debug__:
            if not self.__stopped:
                self._note("%s.join(): waiting until thread stops", self)
        self.__block.acquire()
        try:
            if timeout is None:
                while not self.__stopped:
                    self.__block.wait()
                if __debug__:
                    self._note("%s.join(): thread stopped", self)
            else:
                deadline = _time() + timeout
                while not self.__stopped:
                    delay = deadline - _time()
                    if delay <= 0:
                        if __debug__:
                            self._note("%s.join(): timed out", self)
                        break
                    self.__block.wait(delay)
                else:
                    if __debug__:
                        self._note("%s.join(): thread stopped", self)
        finally:
            self.__block.release()

    def getName(self):
        assert self.__initialized, "Thread.__init__() not called"
        return self.__name

    def setName(self, name):
        assert self.__initialized, "Thread.__init__() not called"
        self.__name = str(name)

    def isAlive(self):
        """Return a true value if the thread was started and has not stopped."""
        assert self.__initialized, "Thread.__init__() not called"
        return self.__started and not self.__stopped

    def isDaemon(self):
        assert self.__initialized, "Thread.__init__() not called"
        return self.__daemonic

    def setDaemon(self, daemonic):
        """Set the daemon flag; only allowed before the thread is started."""
        if not self.__initialized:
            raise RuntimeError("Thread.__init__() not called")
        if self.__started:
            raise RuntimeError("cannot set daemon status of active thread")
        self.__daemonic = daemonic
|
634 |
|
# The timer class was contributed by Itamar Shtull-Trauring

def Timer(*args, **kwargs):
    """Factory function: build and return a new _Timer instance."""
    timer = _Timer(*args, **kwargs)
    return timer
|
639 |
|
class _Timer(Thread):
    """Call a function after a specified number of seconds:

    t = Timer(30.0, f, args=[], kwargs={})
    t.start()
    t.cancel() # stop the timer's action if it's still waiting
    """

    def __init__(self, interval, function, args=None, kwargs=None):
        """Store the delay and the call to make.

        'args' and 'kwargs' default to a fresh empty list and dict per
        instance; the defaults are None rather than mutable literals so
        that no container is shared between timers.
        """
        Thread.__init__(self)
        if args is None:
            args = []
        if kwargs is None:
            kwargs = {}
        self.interval = interval
        self.function = function
        self.args = args
        self.kwargs = kwargs
        self.finished = Event()

    def cancel(self):
        """Stop the timer if it hasn't finished yet"""
        self.finished.set()

    def run(self):
        """Wait 'interval' seconds, then call the function unless cancelled."""
        self.finished.wait(self.interval)
        if not self.finished.isSet():
            self.function(*self.args, **self.kwargs)
        self.finished.set()
|
665 |
|
# Special thread class to represent the main thread
# This is garbage collected through an exit handler

class _MainThread(Thread):
    """Thread object representing the thread that creates it.

    The instance marks itself as started and registers itself in _active
    under the identifier of the creating thread.
    """

    def __init__(self):
        Thread.__init__(self, name="MainThread")
        self._Thread__started = True
        ident = _get_ident()
        _active_limbo_lock.acquire()
        _active[ident] = self
        _active_limbo_lock.release()

    def _set_daemon(self):
        # The main thread is never a daemon.
        return False

    def _exitfunc(self):
        """Mark this thread stopped, then join all live non-daemon threads."""
        self._Thread__stop()
        pending = _pickSomeNonDaemonThread()
        if pending and __debug__:
            self._note("%s: waiting for other threads", self)
        while pending:
            pending.join()
            pending = _pickSomeNonDaemonThread()
        if __debug__:
            self._note("%s: exiting", self)
        self._Thread__delete()
|
693 |
|
def _pickSomeNonDaemonThread():
    """Return some live non-daemon thread, or None if there is none."""
    for candidate in enumerate():
        if candidate.isDaemon() or not candidate.isAlive():
            continue
        return candidate
    return None
|
699 |
|
700 |
|
# Dummy thread class to represent threads not started here.
# These aren't garbage collected when they die, nor can they be waited for.
# If they invoke anything in threading.py that calls currentThread(), they
# leave an entry in the _active dict forever after.
# Their purpose is to return *something* from currentThread().
# They are marked as daemon threads so we won't wait for them
# when we exit (conform previous semantics).

class _DummyThread(Thread):
    """Placeholder Thread object for a thread not created by this module."""

    def __init__(self):
        dummy_name = _newname("Dummy-%d")
        Thread.__init__(self, name=dummy_name)

        # Thread.__block consumes an OS-level locking primitive, which
        # can never be used by a _DummyThread.  Since a _DummyThread
        # instance is immortal, that's bad, so release this resource.
        del self._Thread__block

        self._Thread__started = True
        ident = _get_ident()
        _active_limbo_lock.acquire()
        _active[ident] = self
        _active_limbo_lock.release()

    def _set_daemon(self):
        # Dummy threads are always daemonic.
        return True

    def join(self, timeout=None):
        assert False, "cannot join a dummy thread"
|
729 |
|
730 |
|
# Global API functions

def currentThread():
    """Return the Thread object registered for the calling thread.

    A thread with no entry in _active gets a new _DummyThread, whose
    constructor registers itself.
    """
    try:
        thread_obj = _active[_get_ident()]
    except KeyError:
        thread_obj = _DummyThread()
    return thread_obj
|
739 |
|
def activeCount():
    """Return the number of Thread objects in _active plus _limbo."""
    _active_limbo_lock.acquire()
    total = len(_active) + len(_limbo)
    _active_limbo_lock.release()
    return total
|
745 |
|
def enumerate():
    """Return a list of the Thread objects in _active followed by _limbo."""
    _active_limbo_lock.acquire()
    threads = _active.values()
    threads.extend(_limbo.values())
    _active_limbo_lock.release()
    return threads
|
751 |
|
752 from thread import stack_size |
|
753 |
|
# Create the main thread object at import time and expose its exit
# function as threading._shutdown, for the interpreter (Py_Main) to call.

_shutdown = _MainThread()._exitfunc
|
759 |
|
760 # get thread-local implementation, either from the thread |
|
761 # module, or from the python fallback |
|
762 |
|
763 try: |
|
764 from thread import _local as local |
|
765 except ImportError: |
|
766 from _threading_local import local |
|
767 |
|
768 |
|
769 # Self-test code |
|
770 |
|
771 def _test(): |
|
772 |
|
773 class BoundedQueue(_Verbose): |
|
774 |
|
775 def __init__(self, limit): |
|
776 _Verbose.__init__(self) |
|
777 self.mon = RLock() |
|
778 self.rc = Condition(self.mon) |
|
779 self.wc = Condition(self.mon) |
|
780 self.limit = limit |
|
781 self.queue = deque() |
|
782 |
|
783 def put(self, item): |
|
784 self.mon.acquire() |
|
785 while len(self.queue) >= self.limit: |
|
786 self._note("put(%s): queue full", item) |
|
787 self.wc.wait() |
|
788 self.queue.append(item) |
|
789 self._note("put(%s): appended, length now %d", |
|
790 item, len(self.queue)) |
|
791 self.rc.notify() |
|
792 self.mon.release() |
|
793 |
|
794 def get(self): |
|
795 self.mon.acquire() |
|
796 while not self.queue: |
|
797 self._note("get(): queue empty") |
|
798 self.rc.wait() |
|
799 item = self.queue.popleft() |
|
800 self._note("get(): got %s, %d left", item, len(self.queue)) |
|
801 self.wc.notify() |
|
802 self.mon.release() |
|
803 return item |
|
804 |
|
805 class ProducerThread(Thread): |
|
806 |
|
807 def __init__(self, queue, quota): |
|
808 Thread.__init__(self, name="Producer") |
|
809 self.queue = queue |
|
810 self.quota = quota |
|
811 |
|
812 def run(self): |
|
813 from random import random |
|
814 counter = 0 |
|
815 while counter < self.quota: |
|
816 counter = counter + 1 |
|
817 self.queue.put("%s.%d" % (self.getName(), counter)) |
|
818 _sleep(random() * 0.00001) |
|
819 |
|
820 |
|
821 class ConsumerThread(Thread): |
|
822 |
|
823 def __init__(self, queue, count): |
|
824 Thread.__init__(self, name="Consumer") |
|
825 self.queue = queue |
|
826 self.count = count |
|
827 |
|
828 def run(self): |
|
829 while self.count > 0: |
|
830 item = self.queue.get() |
|
831 print item |
|
832 self.count = self.count - 1 |
|
833 |
|
834 NP = 3 |
|
835 QL = 4 |
|
836 NI = 5 |
|
837 |
|
838 Q = BoundedQueue(QL) |
|
839 P = [] |
|
840 for i in range(NP): |
|
841 t = ProducerThread(Q, NI) |
|
842 t.setName("Producer-%d" % (i+1)) |
|
843 P.append(t) |
|
844 C = ConsumerThread(Q, NI*NP) |
|
845 for t in P: |
|
846 t.start() |
|
847 _sleep(0.000001) |
|
848 C.start() |
|
849 for t in P: |
|
850 t.join() |
|
851 C.join() |
|
852 |
|
853 if __name__ == '__main__': |
|
854 _test() |