python-2.5.2/win32/Lib/Queue.py
changeset 0 ae805ac0140d
equal deleted inserted replaced
-1:000000000000 0:ae805ac0140d
       
     1 """A multi-producer, multi-consumer queue."""
       
     2 
       
     3 from time import time as _time
       
     4 from collections import deque
       
     5 
       
     6 __all__ = ['Empty', 'Full', 'Queue']
       
     7 
       
     8 class Empty(Exception):
       
     9     "Exception raised by Queue.get(block=0)/get_nowait()."
       
    10     pass
       
    11 
       
    12 class Full(Exception):
       
    13     "Exception raised by Queue.put(block=0)/put_nowait()."
       
    14     pass
       
    15 
       
    16 class Queue:
       
    17     """Create a queue object with a given maximum size.
       
    18 
       
    19     If maxsize is <= 0, the queue size is infinite.
       
    20     """
       
    21     def __init__(self, maxsize=0):
       
    22         try:
       
    23             import threading
       
    24         except ImportError:
       
    25             import dummy_threading as threading
       
    26         self._init(maxsize)
       
    27         # mutex must be held whenever the queue is mutating.  All methods
       
    28         # that acquire mutex must release it before returning.  mutex
       
    29         # is shared between the three conditions, so acquiring and
       
    30         # releasing the conditions also acquires and releases mutex.
       
    31         self.mutex = threading.Lock()
       
    32         # Notify not_empty whenever an item is added to the queue; a
       
    33         # thread waiting to get is notified then.
       
    34         self.not_empty = threading.Condition(self.mutex)
       
    35         # Notify not_full whenever an item is removed from the queue;
       
    36         # a thread waiting to put is notified then.
       
    37         self.not_full = threading.Condition(self.mutex)
       
    38         # Notify all_tasks_done whenever the number of unfinished tasks
       
    39         # drops to zero; thread waiting to join() is notified to resume
       
    40         self.all_tasks_done = threading.Condition(self.mutex)
       
    41         self.unfinished_tasks = 0
       
    42 
       
    43     def task_done(self):
       
    44         """Indicate that a formerly enqueued task is complete.
       
    45 
       
    46         Used by Queue consumer threads.  For each get() used to fetch a task,
       
    47         a subsequent call to task_done() tells the queue that the processing
       
    48         on the task is complete.
       
    49 
       
    50         If a join() is currently blocking, it will resume when all items
       
    51         have been processed (meaning that a task_done() call was received
       
    52         for every item that had been put() into the queue).
       
    53 
       
    54         Raises a ValueError if called more times than there were items
       
    55         placed in the queue.
       
    56         """
       
    57         self.all_tasks_done.acquire()
       
    58         try:
       
    59             unfinished = self.unfinished_tasks - 1
       
    60             if unfinished <= 0:
       
    61                 if unfinished < 0:
       
    62                     raise ValueError('task_done() called too many times')
       
    63                 self.all_tasks_done.notifyAll()
       
    64             self.unfinished_tasks = unfinished
       
    65         finally:
       
    66             self.all_tasks_done.release()
       
    67 
       
    68     def join(self):
       
    69         """Blocks until all items in the Queue have been gotten and processed.
       
    70 
       
    71         The count of unfinished tasks goes up whenever an item is added to the
       
    72         queue. The count goes down whenever a consumer thread calls task_done()
       
    73         to indicate the item was retrieved and all work on it is complete.
       
    74 
       
    75         When the count of unfinished tasks drops to zero, join() unblocks.
       
    76         """
       
    77         self.all_tasks_done.acquire()
       
    78         try:
       
    79             while self.unfinished_tasks:
       
    80                 self.all_tasks_done.wait()
       
    81         finally:
       
    82             self.all_tasks_done.release()
       
    83 
       
    84     def qsize(self):
       
    85         """Return the approximate size of the queue (not reliable!)."""
       
    86         self.mutex.acquire()
       
    87         n = self._qsize()
       
    88         self.mutex.release()
       
    89         return n
       
    90 
       
    91     def empty(self):
       
    92         """Return True if the queue is empty, False otherwise (not reliable!)."""
       
    93         self.mutex.acquire()
       
    94         n = self._empty()
       
    95         self.mutex.release()
       
    96         return n
       
    97 
       
    98     def full(self):
       
    99         """Return True if the queue is full, False otherwise (not reliable!)."""
       
   100         self.mutex.acquire()
       
   101         n = self._full()
       
   102         self.mutex.release()
       
   103         return n
       
   104 
       
   105     def put(self, item, block=True, timeout=None):
       
   106         """Put an item into the queue.
       
   107 
       
   108         If optional args 'block' is true and 'timeout' is None (the default),
       
   109         block if necessary until a free slot is available. If 'timeout' is
       
   110         a positive number, it blocks at most 'timeout' seconds and raises
       
   111         the Full exception if no free slot was available within that time.
       
   112         Otherwise ('block' is false), put an item on the queue if a free slot
       
   113         is immediately available, else raise the Full exception ('timeout'
       
   114         is ignored in that case).
       
   115         """
       
   116         self.not_full.acquire()
       
   117         try:
       
   118             if not block:
       
   119                 if self._full():
       
   120                     raise Full
       
   121             elif timeout is None:
       
   122                 while self._full():
       
   123                     self.not_full.wait()
       
   124             else:
       
   125                 if timeout < 0:
       
   126                     raise ValueError("'timeout' must be a positive number")
       
   127                 endtime = _time() + timeout
       
   128                 while self._full():
       
   129                     remaining = endtime - _time()
       
   130                     if remaining <= 0.0:
       
   131                         raise Full
       
   132                     self.not_full.wait(remaining)
       
   133             self._put(item)
       
   134             self.unfinished_tasks += 1
       
   135             self.not_empty.notify()
       
   136         finally:
       
   137             self.not_full.release()
       
   138 
       
   139     def put_nowait(self, item):
       
   140         """Put an item into the queue without blocking.
       
   141 
       
   142         Only enqueue the item if a free slot is immediately available.
       
   143         Otherwise raise the Full exception.
       
   144         """
       
   145         return self.put(item, False)
       
   146 
       
   147     def get(self, block=True, timeout=None):
       
   148         """Remove and return an item from the queue.
       
   149 
       
   150         If optional args 'block' is true and 'timeout' is None (the default),
       
   151         block if necessary until an item is available. If 'timeout' is
       
   152         a positive number, it blocks at most 'timeout' seconds and raises
       
   153         the Empty exception if no item was available within that time.
       
   154         Otherwise ('block' is false), return an item if one is immediately
       
   155         available, else raise the Empty exception ('timeout' is ignored
       
   156         in that case).
       
   157         """
       
   158         self.not_empty.acquire()
       
   159         try:
       
   160             if not block:
       
   161                 if self._empty():
       
   162                     raise Empty
       
   163             elif timeout is None:
       
   164                 while self._empty():
       
   165                     self.not_empty.wait()
       
   166             else:
       
   167                 if timeout < 0:
       
   168                     raise ValueError("'timeout' must be a positive number")
       
   169                 endtime = _time() + timeout
       
   170                 while self._empty():
       
   171                     remaining = endtime - _time()
       
   172                     if remaining <= 0.0:
       
   173                         raise Empty
       
   174                     self.not_empty.wait(remaining)
       
   175             item = self._get()
       
   176             self.not_full.notify()
       
   177             return item
       
   178         finally:
       
   179             self.not_empty.release()
       
   180 
       
   181     def get_nowait(self):
       
   182         """Remove and return an item from the queue without blocking.
       
   183 
       
   184         Only get an item if one is immediately available. Otherwise
       
   185         raise the Empty exception.
       
   186         """
       
   187         return self.get(False)
       
   188 
       
   189     # Override these methods to implement other queue organizations
       
   190     # (e.g. stack or priority queue).
       
   191     # These will only be called with appropriate locks held
       
   192 
       
   193     # Initialize the queue representation
       
   194     def _init(self, maxsize):
       
   195         self.maxsize = maxsize
       
   196         self.queue = deque()
       
   197 
       
   198     def _qsize(self):
       
   199         return len(self.queue)
       
   200 
       
   201     # Check whether the queue is empty
       
   202     def _empty(self):
       
   203         return not self.queue
       
   204 
       
   205     # Check whether the queue is full
       
   206     def _full(self):
       
   207         return self.maxsize > 0 and len(self.queue) == self.maxsize
       
   208 
       
   209     # Put a new item in the queue
       
   210     def _put(self, item):
       
   211         self.queue.append(item)
       
   212 
       
   213     # Get an item from the queue
       
   214     def _get(self):
       
   215         return self.queue.popleft()