|
1 """A multi-producer, multi-consumer queue.""" |
|
2 |
|
3 from time import time as _time |
|
4 from collections import deque |
|
5 |
|
6 __all__ = ['Empty', 'Full', 'Queue'] |
|
7 |
|
8 class Empty(Exception): |
|
9 "Exception raised by Queue.get(block=0)/get_nowait()." |
|
10 pass |
|
11 |
|
12 class Full(Exception): |
|
13 "Exception raised by Queue.put(block=0)/put_nowait()." |
|
14 pass |
|
15 |
|
16 class Queue: |
|
17 """Create a queue object with a given maximum size. |
|
18 |
|
19 If maxsize is <= 0, the queue size is infinite. |
|
20 """ |
|
21 def __init__(self, maxsize=0): |
|
22 try: |
|
23 import threading |
|
24 except ImportError: |
|
25 import dummy_threading as threading |
|
26 self._init(maxsize) |
|
27 # mutex must be held whenever the queue is mutating. All methods |
|
28 # that acquire mutex must release it before returning. mutex |
|
29 # is shared between the three conditions, so acquiring and |
|
30 # releasing the conditions also acquires and releases mutex. |
|
31 self.mutex = threading.Lock() |
|
32 # Notify not_empty whenever an item is added to the queue; a |
|
33 # thread waiting to get is notified then. |
|
34 self.not_empty = threading.Condition(self.mutex) |
|
35 # Notify not_full whenever an item is removed from the queue; |
|
36 # a thread waiting to put is notified then. |
|
37 self.not_full = threading.Condition(self.mutex) |
|
38 # Notify all_tasks_done whenever the number of unfinished tasks |
|
39 # drops to zero; thread waiting to join() is notified to resume |
|
40 self.all_tasks_done = threading.Condition(self.mutex) |
|
41 self.unfinished_tasks = 0 |
|
42 |
|
43 def task_done(self): |
|
44 """Indicate that a formerly enqueued task is complete. |
|
45 |
|
46 Used by Queue consumer threads. For each get() used to fetch a task, |
|
47 a subsequent call to task_done() tells the queue that the processing |
|
48 on the task is complete. |
|
49 |
|
50 If a join() is currently blocking, it will resume when all items |
|
51 have been processed (meaning that a task_done() call was received |
|
52 for every item that had been put() into the queue). |
|
53 |
|
54 Raises a ValueError if called more times than there were items |
|
55 placed in the queue. |
|
56 """ |
|
57 self.all_tasks_done.acquire() |
|
58 try: |
|
59 unfinished = self.unfinished_tasks - 1 |
|
60 if unfinished <= 0: |
|
61 if unfinished < 0: |
|
62 raise ValueError('task_done() called too many times') |
|
63 self.all_tasks_done.notifyAll() |
|
64 self.unfinished_tasks = unfinished |
|
65 finally: |
|
66 self.all_tasks_done.release() |
|
67 |
|
68 def join(self): |
|
69 """Blocks until all items in the Queue have been gotten and processed. |
|
70 |
|
71 The count of unfinished tasks goes up whenever an item is added to the |
|
72 queue. The count goes down whenever a consumer thread calls task_done() |
|
73 to indicate the item was retrieved and all work on it is complete. |
|
74 |
|
75 When the count of unfinished tasks drops to zero, join() unblocks. |
|
76 """ |
|
77 self.all_tasks_done.acquire() |
|
78 try: |
|
79 while self.unfinished_tasks: |
|
80 self.all_tasks_done.wait() |
|
81 finally: |
|
82 self.all_tasks_done.release() |
|
83 |
|
84 def qsize(self): |
|
85 """Return the approximate size of the queue (not reliable!).""" |
|
86 self.mutex.acquire() |
|
87 n = self._qsize() |
|
88 self.mutex.release() |
|
89 return n |
|
90 |
|
91 def empty(self): |
|
92 """Return True if the queue is empty, False otherwise (not reliable!).""" |
|
93 self.mutex.acquire() |
|
94 n = self._empty() |
|
95 self.mutex.release() |
|
96 return n |
|
97 |
|
98 def full(self): |
|
99 """Return True if the queue is full, False otherwise (not reliable!).""" |
|
100 self.mutex.acquire() |
|
101 n = self._full() |
|
102 self.mutex.release() |
|
103 return n |
|
104 |
|
105 def put(self, item, block=True, timeout=None): |
|
106 """Put an item into the queue. |
|
107 |
|
108 If optional args 'block' is true and 'timeout' is None (the default), |
|
109 block if necessary until a free slot is available. If 'timeout' is |
|
110 a positive number, it blocks at most 'timeout' seconds and raises |
|
111 the Full exception if no free slot was available within that time. |
|
112 Otherwise ('block' is false), put an item on the queue if a free slot |
|
113 is immediately available, else raise the Full exception ('timeout' |
|
114 is ignored in that case). |
|
115 """ |
|
116 self.not_full.acquire() |
|
117 try: |
|
118 if not block: |
|
119 if self._full(): |
|
120 raise Full |
|
121 elif timeout is None: |
|
122 while self._full(): |
|
123 self.not_full.wait() |
|
124 else: |
|
125 if timeout < 0: |
|
126 raise ValueError("'timeout' must be a positive number") |
|
127 endtime = _time() + timeout |
|
128 while self._full(): |
|
129 remaining = endtime - _time() |
|
130 if remaining <= 0.0: |
|
131 raise Full |
|
132 self.not_full.wait(remaining) |
|
133 self._put(item) |
|
134 self.unfinished_tasks += 1 |
|
135 self.not_empty.notify() |
|
136 finally: |
|
137 self.not_full.release() |
|
138 |
|
139 def put_nowait(self, item): |
|
140 """Put an item into the queue without blocking. |
|
141 |
|
142 Only enqueue the item if a free slot is immediately available. |
|
143 Otherwise raise the Full exception. |
|
144 """ |
|
145 return self.put(item, False) |
|
146 |
|
147 def get(self, block=True, timeout=None): |
|
148 """Remove and return an item from the queue. |
|
149 |
|
150 If optional args 'block' is true and 'timeout' is None (the default), |
|
151 block if necessary until an item is available. If 'timeout' is |
|
152 a positive number, it blocks at most 'timeout' seconds and raises |
|
153 the Empty exception if no item was available within that time. |
|
154 Otherwise ('block' is false), return an item if one is immediately |
|
155 available, else raise the Empty exception ('timeout' is ignored |
|
156 in that case). |
|
157 """ |
|
158 self.not_empty.acquire() |
|
159 try: |
|
160 if not block: |
|
161 if self._empty(): |
|
162 raise Empty |
|
163 elif timeout is None: |
|
164 while self._empty(): |
|
165 self.not_empty.wait() |
|
166 else: |
|
167 if timeout < 0: |
|
168 raise ValueError("'timeout' must be a positive number") |
|
169 endtime = _time() + timeout |
|
170 while self._empty(): |
|
171 remaining = endtime - _time() |
|
172 if remaining <= 0.0: |
|
173 raise Empty |
|
174 self.not_empty.wait(remaining) |
|
175 item = self._get() |
|
176 self.not_full.notify() |
|
177 return item |
|
178 finally: |
|
179 self.not_empty.release() |
|
180 |
|
181 def get_nowait(self): |
|
182 """Remove and return an item from the queue without blocking. |
|
183 |
|
184 Only get an item if one is immediately available. Otherwise |
|
185 raise the Empty exception. |
|
186 """ |
|
187 return self.get(False) |
|
188 |
|
189 # Override these methods to implement other queue organizations |
|
190 # (e.g. stack or priority queue). |
|
191 # These will only be called with appropriate locks held |
|
192 |
|
193 # Initialize the queue representation |
|
194 def _init(self, maxsize): |
|
195 self.maxsize = maxsize |
|
196 self.queue = deque() |
|
197 |
|
198 def _qsize(self): |
|
199 return len(self.queue) |
|
200 |
|
201 # Check whether the queue is empty |
|
202 def _empty(self): |
|
203 return not self.queue |
|
204 |
|
205 # Check whether the queue is full |
|
206 def _full(self): |
|
207 return self.maxsize > 0 and len(self.queue) == self.maxsize |
|
208 |
|
209 # Put a new item in the queue |
|
210 def _put(self, item): |
|
211 self.queue.append(item) |
|
212 |
|
213 # Get an item from the queue |
|
214 def _get(self): |
|
215 return self.queue.popleft() |