python-2.5.2/win32/Lib/threading.py
changeset 0 ae805ac0140d
equal deleted inserted replaced
-1:000000000000 0:ae805ac0140d
       
     1 """Thread module emulating a subset of Java's threading model."""
       
     2 
       
     3 import sys as _sys
       
     4 
       
     5 try:
       
     6     import thread
       
     7 except ImportError:
       
     8     del _sys.modules[__name__]
       
     9     raise
       
    10 
       
    11 from time import time as _time, sleep as _sleep
       
    12 from traceback import format_exc as _format_exc
       
    13 from collections import deque
       
    14 
       
    15 # Rename some stuff so "from threading import *" is safe
       
    16 __all__ = ['activeCount', 'Condition', 'currentThread', 'enumerate', 'Event',
       
    17            'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
       
    18            'Timer', 'setprofile', 'settrace', 'local', 'stack_size']
       
    19 
       
    20 _start_new_thread = thread.start_new_thread
       
    21 _allocate_lock = thread.allocate_lock
       
    22 _get_ident = thread.get_ident
       
    23 ThreadError = thread.error
       
    24 del thread
       
    25 
       
    26 
       
    27 # Debug support (adapted from ihooks.py).
       
    28 # All the major classes here derive from _Verbose.  We force that to
       
    29 # be a new-style class so that all the major classes here are new-style.
       
    30 # This helps debugging (type(instance) is more revealing for instances
       
    31 # of new-style classes).
       
    32 
       
    33 _VERBOSE = False
       
    34 
       
    35 if __debug__:
       
    36 
       
    37     class _Verbose(object):
       
    38 
       
    39         def __init__(self, verbose=None):
       
    40             if verbose is None:
       
    41                 verbose = _VERBOSE
       
    42             self.__verbose = verbose
       
    43 
       
    44         def _note(self, format, *args):
       
    45             if self.__verbose:
       
    46                 format = format % args
       
    47                 format = "%s: %s\n" % (
       
    48                     currentThread().getName(), format)
       
    49                 _sys.stderr.write(format)
       
    50 
       
    51 else:
       
    52     # Disable this when using "python -O"
       
    53     class _Verbose(object):
       
    54         def __init__(self, verbose=None):
       
    55             pass
       
    56         def _note(self, *args):
       
    57             pass
       
    58 
       
    59 # Support for profile and trace hooks
       
    60 
       
    61 _profile_hook = None
       
    62 _trace_hook = None
       
    63 
       
    64 def setprofile(func):
       
    65     global _profile_hook
       
    66     _profile_hook = func
       
    67 
       
    68 def settrace(func):
       
    69     global _trace_hook
       
    70     _trace_hook = func
       
    71 
       
    72 # Synchronization classes
       
    73 
       
    74 Lock = _allocate_lock
       
    75 
       
    76 def RLock(*args, **kwargs):
       
    77     return _RLock(*args, **kwargs)
       
    78 
       
    79 class _RLock(_Verbose):
       
    80 
       
    81     def __init__(self, verbose=None):
       
    82         _Verbose.__init__(self, verbose)
       
    83         self.__block = _allocate_lock()
       
    84         self.__owner = None
       
    85         self.__count = 0
       
    86 
       
    87     def __repr__(self):
       
    88         owner = self.__owner
       
    89         return "<%s(%s, %d)>" % (
       
    90                 self.__class__.__name__,
       
    91                 owner and owner.getName(),
       
    92                 self.__count)
       
    93 
       
    94     def acquire(self, blocking=1):
       
    95         me = currentThread()
       
    96         if self.__owner is me:
       
    97             self.__count = self.__count + 1
       
    98             if __debug__:
       
    99                 self._note("%s.acquire(%s): recursive success", self, blocking)
       
   100             return 1
       
   101         rc = self.__block.acquire(blocking)
       
   102         if rc:
       
   103             self.__owner = me
       
   104             self.__count = 1
       
   105             if __debug__:
       
   106                 self._note("%s.acquire(%s): initial success", self, blocking)
       
   107         else:
       
   108             if __debug__:
       
   109                 self._note("%s.acquire(%s): failure", self, blocking)
       
   110         return rc
       
   111 
       
   112     __enter__ = acquire
       
   113 
       
   114     def release(self):
       
   115         if self.__owner is not currentThread():
       
   116             raise RuntimeError("cannot release un-aquired lock")
       
   117         self.__count = count = self.__count - 1
       
   118         if not count:
       
   119             self.__owner = None
       
   120             self.__block.release()
       
   121             if __debug__:
       
   122                 self._note("%s.release(): final release", self)
       
   123         else:
       
   124             if __debug__:
       
   125                 self._note("%s.release(): non-final release", self)
       
   126 
       
   127     def __exit__(self, t, v, tb):
       
   128         self.release()
       
   129 
       
   130     # Internal methods used by condition variables
       
   131 
       
   132     def _acquire_restore(self, (count, owner)):
       
   133         self.__block.acquire()
       
   134         self.__count = count
       
   135         self.__owner = owner
       
   136         if __debug__:
       
   137             self._note("%s._acquire_restore()", self)
       
   138 
       
   139     def _release_save(self):
       
   140         if __debug__:
       
   141             self._note("%s._release_save()", self)
       
   142         count = self.__count
       
   143         self.__count = 0
       
   144         owner = self.__owner
       
   145         self.__owner = None
       
   146         self.__block.release()
       
   147         return (count, owner)
       
   148 
       
   149     def _is_owned(self):
       
   150         return self.__owner is currentThread()
       
   151 
       
   152 
       
   153 def Condition(*args, **kwargs):
       
   154     return _Condition(*args, **kwargs)
       
   155 
       
   156 class _Condition(_Verbose):
       
   157 
       
   158     def __init__(self, lock=None, verbose=None):
       
   159         _Verbose.__init__(self, verbose)
       
   160         if lock is None:
       
   161             lock = RLock()
       
   162         self.__lock = lock
       
   163         # Export the lock's acquire() and release() methods
       
   164         self.acquire = lock.acquire
       
   165         self.release = lock.release
       
   166         # If the lock defines _release_save() and/or _acquire_restore(),
       
   167         # these override the default implementations (which just call
       
   168         # release() and acquire() on the lock).  Ditto for _is_owned().
       
   169         try:
       
   170             self._release_save = lock._release_save
       
   171         except AttributeError:
       
   172             pass
       
   173         try:
       
   174             self._acquire_restore = lock._acquire_restore
       
   175         except AttributeError:
       
   176             pass
       
   177         try:
       
   178             self._is_owned = lock._is_owned
       
   179         except AttributeError:
       
   180             pass
       
   181         self.__waiters = []
       
   182 
       
   183     def __enter__(self):
       
   184         return self.__lock.__enter__()
       
   185 
       
   186     def __exit__(self, *args):
       
   187         return self.__lock.__exit__(*args)
       
   188 
       
   189     def __repr__(self):
       
   190         return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters))
       
   191 
       
   192     def _release_save(self):
       
   193         self.__lock.release()           # No state to save
       
   194 
       
   195     def _acquire_restore(self, x):
       
   196         self.__lock.acquire()           # Ignore saved state
       
   197 
       
   198     def _is_owned(self):
       
   199         # Return True if lock is owned by currentThread.
       
   200         # This method is called only if __lock doesn't have _is_owned().
       
   201         if self.__lock.acquire(0):
       
   202             self.__lock.release()
       
   203             return False
       
   204         else:
       
   205             return True
       
   206 
       
   207     def wait(self, timeout=None):
       
   208         if not self._is_owned():
       
   209             raise RuntimeError("cannot wait on un-aquired lock")
       
   210         waiter = _allocate_lock()
       
   211         waiter.acquire()
       
   212         self.__waiters.append(waiter)
       
   213         saved_state = self._release_save()
       
   214         try:    # restore state no matter what (e.g., KeyboardInterrupt)
       
   215             if timeout is None:
       
   216                 waiter.acquire()
       
   217                 if __debug__:
       
   218                     self._note("%s.wait(): got it", self)
       
   219             else:
       
   220                 # Balancing act:  We can't afford a pure busy loop, so we
       
   221                 # have to sleep; but if we sleep the whole timeout time,
       
   222                 # we'll be unresponsive.  The scheme here sleeps very
       
   223                 # little at first, longer as time goes on, but never longer
       
   224                 # than 20 times per second (or the timeout time remaining).
       
   225                 endtime = _time() + timeout
       
   226                 delay = 0.0005 # 500 us -> initial delay of 1 ms
       
   227                 while True:
       
   228                     gotit = waiter.acquire(0)
       
   229                     if gotit:
       
   230                         break
       
   231                     remaining = endtime - _time()
       
   232                     if remaining <= 0:
       
   233                         break
       
   234                     delay = min(delay * 2, remaining, .05)
       
   235                     _sleep(delay)
       
   236                 if not gotit:
       
   237                     if __debug__:
       
   238                         self._note("%s.wait(%s): timed out", self, timeout)
       
   239                     try:
       
   240                         self.__waiters.remove(waiter)
       
   241                     except ValueError:
       
   242                         pass
       
   243                 else:
       
   244                     if __debug__:
       
   245                         self._note("%s.wait(%s): got it", self, timeout)
       
   246         finally:
       
   247             self._acquire_restore(saved_state)
       
   248 
       
   249     def notify(self, n=1):
       
   250         if not self._is_owned():
       
   251             raise RuntimeError("cannot notify on un-aquired lock")
       
   252         __waiters = self.__waiters
       
   253         waiters = __waiters[:n]
       
   254         if not waiters:
       
   255             if __debug__:
       
   256                 self._note("%s.notify(): no waiters", self)
       
   257             return
       
   258         self._note("%s.notify(): notifying %d waiter%s", self, n,
       
   259                    n!=1 and "s" or "")
       
   260         for waiter in waiters:
       
   261             waiter.release()
       
   262             try:
       
   263                 __waiters.remove(waiter)
       
   264             except ValueError:
       
   265                 pass
       
   266 
       
   267     def notifyAll(self):
       
   268         self.notify(len(self.__waiters))
       
   269 
       
   270 
       
   271 def Semaphore(*args, **kwargs):
       
   272     return _Semaphore(*args, **kwargs)
       
   273 
       
   274 class _Semaphore(_Verbose):
       
   275 
       
   276     # After Tim Peters' semaphore class, but not quite the same (no maximum)
       
   277 
       
   278     def __init__(self, value=1, verbose=None):
       
   279         if value < 0:
       
   280             raise ValueError("semaphore initial value must be >= 0")
       
   281         _Verbose.__init__(self, verbose)
       
   282         self.__cond = Condition(Lock())
       
   283         self.__value = value
       
   284 
       
   285     def acquire(self, blocking=1):
       
   286         rc = False
       
   287         self.__cond.acquire()
       
   288         while self.__value == 0:
       
   289             if not blocking:
       
   290                 break
       
   291             if __debug__:
       
   292                 self._note("%s.acquire(%s): blocked waiting, value=%s",
       
   293                            self, blocking, self.__value)
       
   294             self.__cond.wait()
       
   295         else:
       
   296             self.__value = self.__value - 1
       
   297             if __debug__:
       
   298                 self._note("%s.acquire: success, value=%s",
       
   299                            self, self.__value)
       
   300             rc = True
       
   301         self.__cond.release()
       
   302         return rc
       
   303 
       
   304     __enter__ = acquire
       
   305 
       
   306     def release(self):
       
   307         self.__cond.acquire()
       
   308         self.__value = self.__value + 1
       
   309         if __debug__:
       
   310             self._note("%s.release: success, value=%s",
       
   311                        self, self.__value)
       
   312         self.__cond.notify()
       
   313         self.__cond.release()
       
   314 
       
   315     def __exit__(self, t, v, tb):
       
   316         self.release()
       
   317 
       
   318 
       
   319 def BoundedSemaphore(*args, **kwargs):
       
   320     return _BoundedSemaphore(*args, **kwargs)
       
   321 
       
   322 class _BoundedSemaphore(_Semaphore):
       
   323     """Semaphore that checks that # releases is <= # acquires"""
       
   324     def __init__(self, value=1, verbose=None):
       
   325         _Semaphore.__init__(self, value, verbose)
       
   326         self._initial_value = value
       
   327 
       
   328     def release(self):
       
   329         if self._Semaphore__value >= self._initial_value:
       
   330             raise ValueError, "Semaphore released too many times"
       
   331         return _Semaphore.release(self)
       
   332 
       
   333 
       
   334 def Event(*args, **kwargs):
       
   335     return _Event(*args, **kwargs)
       
   336 
       
   337 class _Event(_Verbose):
       
   338 
       
   339     # After Tim Peters' event class (without is_posted())
       
   340 
       
   341     def __init__(self, verbose=None):
       
   342         _Verbose.__init__(self, verbose)
       
   343         self.__cond = Condition(Lock())
       
   344         self.__flag = False
       
   345 
       
   346     def isSet(self):
       
   347         return self.__flag
       
   348 
       
   349     def set(self):
       
   350         self.__cond.acquire()
       
   351         try:
       
   352             self.__flag = True
       
   353             self.__cond.notifyAll()
       
   354         finally:
       
   355             self.__cond.release()
       
   356 
       
   357     def clear(self):
       
   358         self.__cond.acquire()
       
   359         try:
       
   360             self.__flag = False
       
   361         finally:
       
   362             self.__cond.release()
       
   363 
       
   364     def wait(self, timeout=None):
       
   365         self.__cond.acquire()
       
   366         try:
       
   367             if not self.__flag:
       
   368                 self.__cond.wait(timeout)
       
   369         finally:
       
   370             self.__cond.release()
       
   371 
       
   372 # Helper to generate new thread names
       
   373 _counter = 0
       
   374 def _newname(template="Thread-%d"):
       
   375     global _counter
       
   376     _counter = _counter + 1
       
   377     return template % _counter
       
   378 
       
   379 # Active thread administration
       
   380 _active_limbo_lock = _allocate_lock()
       
   381 _active = {}    # maps thread id to Thread object
       
   382 _limbo = {}
       
   383 
       
   384 
       
   385 # Main class for threads
       
   386 
       
   387 class Thread(_Verbose):
       
   388 
       
   389     __initialized = False
       
   390     # Need to store a reference to sys.exc_info for printing
       
   391     # out exceptions when a thread tries to use a global var. during interp.
       
   392     # shutdown and thus raises an exception about trying to perform some
       
   393     # operation on/with a NoneType
       
   394     __exc_info = _sys.exc_info
       
   395 
       
   396     def __init__(self, group=None, target=None, name=None,
       
   397                  args=(), kwargs=None, verbose=None):
       
   398         assert group is None, "group argument must be None for now"
       
   399         _Verbose.__init__(self, verbose)
       
   400         if kwargs is None:
       
   401             kwargs = {}
       
   402         self.__target = target
       
   403         self.__name = str(name or _newname())
       
   404         self.__args = args
       
   405         self.__kwargs = kwargs
       
   406         self.__daemonic = self._set_daemon()
       
   407         self.__started = False
       
   408         self.__stopped = False
       
   409         self.__block = Condition(Lock())
       
   410         self.__initialized = True
       
   411         # sys.stderr is not stored in the class like
       
   412         # sys.exc_info since it can be changed between instances
       
   413         self.__stderr = _sys.stderr
       
   414 
       
   415     def _set_daemon(self):
       
   416         # Overridden in _MainThread and _DummyThread
       
   417         return currentThread().isDaemon()
       
   418 
       
   419     def __repr__(self):
       
   420         assert self.__initialized, "Thread.__init__() was not called"
       
   421         status = "initial"
       
   422         if self.__started:
       
   423             status = "started"
       
   424         if self.__stopped:
       
   425             status = "stopped"
       
   426         if self.__daemonic:
       
   427             status = status + " daemon"
       
   428         return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status)
       
   429 
       
   430     def start(self):
       
   431         if not self.__initialized:
       
   432             raise RuntimeError("thread.__init__() not called")
       
   433         if self.__started:
       
   434             raise RuntimeError("thread already started")
       
   435         if __debug__:
       
   436             self._note("%s.start(): starting thread", self)
       
   437         _active_limbo_lock.acquire()
       
   438         _limbo[self] = self
       
   439         _active_limbo_lock.release()
       
   440         _start_new_thread(self.__bootstrap, ())
       
   441         self.__started = True
       
   442         _sleep(0.000001)    # 1 usec, to let the thread run (Solaris hack)
       
   443 
       
   444     def run(self):
       
   445         if self.__target:
       
   446             self.__target(*self.__args, **self.__kwargs)
       
   447 
       
   448     def __bootstrap(self):
       
   449         # Wrapper around the real bootstrap code that ignores
       
   450         # exceptions during interpreter cleanup.  Those typically
       
   451         # happen when a daemon thread wakes up at an unfortunate
       
   452         # moment, finds the world around it destroyed, and raises some
       
   453         # random exception *** while trying to report the exception in
       
   454         # __bootstrap_inner() below ***.  Those random exceptions
       
   455         # don't help anybody, and they confuse users, so we suppress
       
   456         # them.  We suppress them only when it appears that the world
       
   457         # indeed has already been destroyed, so that exceptions in
       
   458         # __bootstrap_inner() during normal business hours are properly
       
   459         # reported.  Also, we only suppress them for daemonic threads;
       
   460         # if a non-daemonic encounters this, something else is wrong.
       
   461         try:
       
   462             self.__bootstrap_inner()
       
   463         except:
       
   464             if self.__daemonic and _sys is None:
       
   465                 return
       
   466             raise
       
   467 
       
   468     def __bootstrap_inner(self):
       
   469         try:
       
   470             self.__started = True
       
   471             _active_limbo_lock.acquire()
       
   472             _active[_get_ident()] = self
       
   473             del _limbo[self]
       
   474             _active_limbo_lock.release()
       
   475             if __debug__:
       
   476                 self._note("%s.__bootstrap(): thread started", self)
       
   477 
       
   478             if _trace_hook:
       
   479                 self._note("%s.__bootstrap(): registering trace hook", self)
       
   480                 _sys.settrace(_trace_hook)
       
   481             if _profile_hook:
       
   482                 self._note("%s.__bootstrap(): registering profile hook", self)
       
   483                 _sys.setprofile(_profile_hook)
       
   484 
       
   485             try:
       
   486                 self.run()
       
   487             except SystemExit:
       
   488                 if __debug__:
       
   489                     self._note("%s.__bootstrap(): raised SystemExit", self)
       
   490             except:
       
   491                 if __debug__:
       
   492                     self._note("%s.__bootstrap(): unhandled exception", self)
       
   493                 # If sys.stderr is no more (most likely from interpreter
       
   494                 # shutdown) use self.__stderr.  Otherwise still use sys (as in
       
   495                 # _sys) in case sys.stderr was redefined since the creation of
       
   496                 # self.
       
   497                 if _sys:
       
   498                     _sys.stderr.write("Exception in thread %s:\n%s\n" %
       
   499                                       (self.getName(), _format_exc()))
       
   500                 else:
       
   501                     # Do the best job possible w/o a huge amt. of code to
       
   502                     # approximate a traceback (code ideas from
       
   503                     # Lib/traceback.py)
       
   504                     exc_type, exc_value, exc_tb = self.__exc_info()
       
   505                     try:
       
   506                         print>>self.__stderr, (
       
   507                             "Exception in thread " + self.getName() +
       
   508                             " (most likely raised during interpreter shutdown):")
       
   509                         print>>self.__stderr, (
       
   510                             "Traceback (most recent call last):")
       
   511                         while exc_tb:
       
   512                             print>>self.__stderr, (
       
   513                                 '  File "%s", line %s, in %s' %
       
   514                                 (exc_tb.tb_frame.f_code.co_filename,
       
   515                                     exc_tb.tb_lineno,
       
   516                                     exc_tb.tb_frame.f_code.co_name))
       
   517                             exc_tb = exc_tb.tb_next
       
   518                         print>>self.__stderr, ("%s: %s" % (exc_type, exc_value))
       
   519                     # Make sure that exc_tb gets deleted since it is a memory
       
   520                     # hog; deleting everything else is just for thoroughness
       
   521                     finally:
       
   522                         del exc_type, exc_value, exc_tb
       
   523             else:
       
   524                 if __debug__:
       
   525                     self._note("%s.__bootstrap(): normal return", self)
       
   526         finally:
       
   527             _active_limbo_lock.acquire()
       
   528             try:
       
   529                 self.__stop()
       
   530                 try:
       
   531                     # We don't call self.__delete() because it also
       
   532                     # grabs _active_limbo_lock.
       
   533                     del _active[_get_ident()]
       
   534                 except:
       
   535                     pass
       
   536             finally:
       
   537                 _active_limbo_lock.release()
       
   538 
       
   539     def __stop(self):
       
   540         self.__block.acquire()
       
   541         self.__stopped = True
       
   542         self.__block.notifyAll()
       
   543         self.__block.release()
       
   544 
       
   545     def __delete(self):
       
   546         "Remove current thread from the dict of currently running threads."
       
   547 
       
   548         # Notes about running with dummy_thread:
       
   549         #
       
   550         # Must take care to not raise an exception if dummy_thread is being
       
   551         # used (and thus this module is being used as an instance of
       
   552         # dummy_threading).  dummy_thread.get_ident() always returns -1 since
       
   553         # there is only one thread if dummy_thread is being used.  Thus
       
   554         # len(_active) is always <= 1 here, and any Thread instance created
       
   555         # overwrites the (if any) thread currently registered in _active.
       
   556         #
       
   557         # An instance of _MainThread is always created by 'threading'.  This
       
   558         # gets overwritten the instant an instance of Thread is created; both
       
   559         # threads return -1 from dummy_thread.get_ident() and thus have the
       
   560         # same key in the dict.  So when the _MainThread instance created by
       
   561         # 'threading' tries to clean itself up when atexit calls this method
       
   562         # it gets a KeyError if another Thread instance was created.
       
   563         #
       
   564         # This all means that KeyError from trying to delete something from
       
   565         # _active if dummy_threading is being used is a red herring.  But
       
   566         # since it isn't if dummy_threading is *not* being used then don't
       
   567         # hide the exception.
       
   568 
       
   569         _active_limbo_lock.acquire()
       
   570         try:
       
   571             try:
       
   572                 del _active[_get_ident()]
       
   573             except KeyError:
       
   574                 if 'dummy_threading' not in _sys.modules:
       
   575                     raise
       
   576         finally:
       
   577             _active_limbo_lock.release()
       
   578 
       
   579     def join(self, timeout=None):
       
   580         if not self.__initialized:
       
   581             raise RuntimeError("Thread.__init__() not called")
       
   582         if not self.__started:
       
   583             raise RuntimeError("cannot join thread before it is started")
       
   584         if self is currentThread():
       
   585             raise RuntimeError("cannot join current thread")
       
   586 
       
   587         if __debug__:
       
   588             if not self.__stopped:
       
   589                 self._note("%s.join(): waiting until thread stops", self)
       
   590         self.__block.acquire()
       
   591         try:
       
   592             if timeout is None:
       
   593                 while not self.__stopped:
       
   594                     self.__block.wait()
       
   595                 if __debug__:
       
   596                     self._note("%s.join(): thread stopped", self)
       
   597             else:
       
   598                 deadline = _time() + timeout
       
   599                 while not self.__stopped:
       
   600                     delay = deadline - _time()
       
   601                     if delay <= 0:
       
   602                         if __debug__:
       
   603                             self._note("%s.join(): timed out", self)
       
   604                         break
       
   605                     self.__block.wait(delay)
       
   606                 else:
       
   607                     if __debug__:
       
   608                         self._note("%s.join(): thread stopped", self)
       
   609         finally:
       
   610             self.__block.release()
       
   611 
       
   612     def getName(self):
       
   613         assert self.__initialized, "Thread.__init__() not called"
       
   614         return self.__name
       
   615 
       
   616     def setName(self, name):
       
   617         assert self.__initialized, "Thread.__init__() not called"
       
   618         self.__name = str(name)
       
   619 
       
   620     def isAlive(self):
       
   621         assert self.__initialized, "Thread.__init__() not called"
       
   622         return self.__started and not self.__stopped
       
   623 
       
   624     def isDaemon(self):
       
   625         assert self.__initialized, "Thread.__init__() not called"
       
   626         return self.__daemonic
       
   627 
       
   628     def setDaemon(self, daemonic):
       
   629         if not self.__initialized:
       
   630             raise RuntimeError("Thread.__init__() not called")
       
   631         if self.__started:
       
   632             raise RuntimeError("cannot set daemon status of active thread");
       
   633         self.__daemonic = daemonic
       
   634 
       
   635 # The timer class was contributed by Itamar Shtull-Trauring
       
   636 
       
   637 def Timer(*args, **kwargs):
       
   638     return _Timer(*args, **kwargs)
       
   639 
       
   640 class _Timer(Thread):
       
   641     """Call a function after a specified number of seconds:
       
   642 
       
   643     t = Timer(30.0, f, args=[], kwargs={})
       
   644     t.start()
       
   645     t.cancel() # stop the timer's action if it's still waiting
       
   646     """
       
   647 
       
   648     def __init__(self, interval, function, args=[], kwargs={}):
       
   649         Thread.__init__(self)
       
   650         self.interval = interval
       
   651         self.function = function
       
   652         self.args = args
       
   653         self.kwargs = kwargs
       
   654         self.finished = Event()
       
   655 
       
   656     def cancel(self):
       
   657         """Stop the timer if it hasn't finished yet"""
       
   658         self.finished.set()
       
   659 
       
   660     def run(self):
       
   661         self.finished.wait(self.interval)
       
   662         if not self.finished.isSet():
       
   663             self.function(*self.args, **self.kwargs)
       
   664         self.finished.set()
       
   665 
       
   666 # Special thread class to represent the main thread
       
   667 # This is garbage collected through an exit handler
       
   668 
       
   669 class _MainThread(Thread):
       
   670 
       
   671     def __init__(self):
       
   672         Thread.__init__(self, name="MainThread")
       
   673         self._Thread__started = True
       
   674         _active_limbo_lock.acquire()
       
   675         _active[_get_ident()] = self
       
   676         _active_limbo_lock.release()
       
   677 
       
   678     def _set_daemon(self):
       
   679         return False
       
   680 
       
   681     def _exitfunc(self):
       
   682         self._Thread__stop()
       
   683         t = _pickSomeNonDaemonThread()
       
   684         if t:
       
   685             if __debug__:
       
   686                 self._note("%s: waiting for other threads", self)
       
   687         while t:
       
   688             t.join()
       
   689             t = _pickSomeNonDaemonThread()
       
   690         if __debug__:
       
   691             self._note("%s: exiting", self)
       
   692         self._Thread__delete()
       
   693 
       
   694 def _pickSomeNonDaemonThread():
       
   695     for t in enumerate():
       
   696         if not t.isDaemon() and t.isAlive():
       
   697             return t
       
   698     return None
       
   699 
       
   700 
       
   701 # Dummy thread class to represent threads not started here.
       
   702 # These aren't garbage collected when they die, nor can they be waited for.
       
   703 # If they invoke anything in threading.py that calls currentThread(), they
       
   704 # leave an entry in the _active dict forever after.
       
   705 # Their purpose is to return *something* from currentThread().
       
   706 # They are marked as daemon threads so we won't wait for them
       
   707 # when we exit (this conforms to the previous semantics).
       
   708 
       
   709 class _DummyThread(Thread):
       
   710 
       
   711     def __init__(self):
       
   712         Thread.__init__(self, name=_newname("Dummy-%d"))
       
   713 
       
   714         # Thread.__block consumes an OS-level locking primitive, which
       
   715         # can never be used by a _DummyThread.  Since a _DummyThread
       
   716         # instance is immortal, that's bad, so release this resource.
       
   717         del self._Thread__block
       
   718 
       
   719         self._Thread__started = True
       
   720         _active_limbo_lock.acquire()
       
   721         _active[_get_ident()] = self
       
   722         _active_limbo_lock.release()
       
   723 
       
   724     def _set_daemon(self):
       
   725         return True
       
   726 
       
   727     def join(self, timeout=None):
       
   728         assert False, "cannot join a dummy thread"
       
   729 
       
   730 
       
   731 # Global API functions
       
   732 
       
   733 def currentThread():
       
   734     try:
       
   735         return _active[_get_ident()]
       
   736     except KeyError:
       
   737         ##print "currentThread(): no current thread for", _get_ident()
       
   738         return _DummyThread()
       
   739 
       
   740 def activeCount():
       
   741     _active_limbo_lock.acquire()
       
   742     count = len(_active) + len(_limbo)
       
   743     _active_limbo_lock.release()
       
   744     return count
       
   745 
       
   746 def enumerate():
       
   747     _active_limbo_lock.acquire()
       
   748     active = _active.values() + _limbo.values()
       
   749     _active_limbo_lock.release()
       
   750     return active
       
   751 
       
   752 from thread import stack_size
       
   753 
       
# Create the main thread object (constructing _MainThread also registers
# it in _active) and make its _exitfunc bound method available for the
# interpreter (Py_Main) as threading._shutdown.

_shutdown = _MainThread()._exitfunc
       
   759 
       
   760 # get thread-local implementation, either from the thread
       
   761 # module, or from the python fallback
       
   762 
       
   763 try:
       
   764     from thread import _local as local
       
   765 except ImportError:
       
   766     from _threading_local import local
       
   767 
       
   768 
       
   769 # Self-test code
       
   770 
       
   771 def _test():
       
   772 
       
   773     class BoundedQueue(_Verbose):
       
   774 
       
   775         def __init__(self, limit):
       
   776             _Verbose.__init__(self)
       
   777             self.mon = RLock()
       
   778             self.rc = Condition(self.mon)
       
   779             self.wc = Condition(self.mon)
       
   780             self.limit = limit
       
   781             self.queue = deque()
       
   782 
       
   783         def put(self, item):
       
   784             self.mon.acquire()
       
   785             while len(self.queue) >= self.limit:
       
   786                 self._note("put(%s): queue full", item)
       
   787                 self.wc.wait()
       
   788             self.queue.append(item)
       
   789             self._note("put(%s): appended, length now %d",
       
   790                        item, len(self.queue))
       
   791             self.rc.notify()
       
   792             self.mon.release()
       
   793 
       
   794         def get(self):
       
   795             self.mon.acquire()
       
   796             while not self.queue:
       
   797                 self._note("get(): queue empty")
       
   798                 self.rc.wait()
       
   799             item = self.queue.popleft()
       
   800             self._note("get(): got %s, %d left", item, len(self.queue))
       
   801             self.wc.notify()
       
   802             self.mon.release()
       
   803             return item
       
   804 
       
   805     class ProducerThread(Thread):
       
   806 
       
   807         def __init__(self, queue, quota):
       
   808             Thread.__init__(self, name="Producer")
       
   809             self.queue = queue
       
   810             self.quota = quota
       
   811 
       
   812         def run(self):
       
   813             from random import random
       
   814             counter = 0
       
   815             while counter < self.quota:
       
   816                 counter = counter + 1
       
   817                 self.queue.put("%s.%d" % (self.getName(), counter))
       
   818                 _sleep(random() * 0.00001)
       
   819 
       
   820 
       
   821     class ConsumerThread(Thread):
       
   822 
       
   823         def __init__(self, queue, count):
       
   824             Thread.__init__(self, name="Consumer")
       
   825             self.queue = queue
       
   826             self.count = count
       
   827 
       
   828         def run(self):
       
   829             while self.count > 0:
       
   830                 item = self.queue.get()
       
   831                 print item
       
   832                 self.count = self.count - 1
       
   833 
       
   834     NP = 3
       
   835     QL = 4
       
   836     NI = 5
       
   837 
       
   838     Q = BoundedQueue(QL)
       
   839     P = []
       
   840     for i in range(NP):
       
   841         t = ProducerThread(Q, NI)
       
   842         t.setName("Producer-%d" % (i+1))
       
   843         P.append(t)
       
   844     C = ConsumerThread(Q, NI*NP)
       
   845     for t in P:
       
   846         t.start()
       
   847         _sleep(0.000001)
       
   848     C.start()
       
   849     for t in P:
       
   850         t.join()
       
   851     C.join()
       
   852 
       
   853 if __name__ == '__main__':
       
   854     _test()