|
1 :mod:`multiprocessing` --- Process-based "threading" interface |
|
2 ============================================================== |
|
3 |
|
4 .. module:: multiprocessing |
|
5 :synopsis: Process-based "threading" interface. |
|
6 |
|
7 .. versionadded:: 2.6 |
|
8 |
|
9 |
|
10 Introduction |
|
11 ---------------------- |
|
12 |
|
13 :mod:`multiprocessing` is a package that supports spawning processes using an |
|
14 API similar to the :mod:`threading` module. The :mod:`multiprocessing` package |
|
15 offers both local and remote concurrency, effectively side-stepping the |
|
16 :term:`Global Interpreter Lock` by using subprocesses instead of threads. Due |
|
17 to this, the :mod:`multiprocessing` module allows the programmer to fully |
|
18 leverage multiple processors on a given machine. It runs on both Unix and |
|
19 Windows. |
|
20 |
|
21 .. warning:: |
|
22 |
|
23 Some of this package's functionality requires a functioning shared semaphore |
|
24 implementation on the host operating system. Without one, the |
|
25 :mod:`multiprocessing.synchronize` module will be disabled, and attempts to |
|
26 import it will result in an :exc:`ImportError`. See |
|
27 :issue:`3770` for additional information. |
|
28 |
|
29 .. note:: |
|
30 |
|
Functionality within this package requires that the ``__main__`` module be
importable by the children. This is covered in :ref:`multiprocessing-programming`;
however, it is worth pointing out here. This means that some examples, such
as the :class:`multiprocessing.Pool` examples, will not work in the
interactive interpreter. For example::
|
36 |
|
37 >>> from multiprocessing import Pool |
|
38 >>> p = Pool(5) |
|
39 >>> def f(x): |
|
...     return x*x
|
41 ... |
|
42 >>> p.map(f, [1,2,3]) |
|
43 Process PoolWorker-1: |
|
44 Process PoolWorker-2: |
|
45 Traceback (most recent call last): |
|
46 Traceback (most recent call last): |
|
47 AttributeError: 'module' object has no attribute 'f' |
|
48 AttributeError: 'module' object has no attribute 'f' |
|
49 AttributeError: 'module' object has no attribute 'f' |
|
50 |
|
51 |
|
52 The :class:`Process` class |
|
53 ~~~~~~~~~~~~~~~~~~~~~~~~~~ |
|
54 |
|
55 In :mod:`multiprocessing`, processes are spawned by creating a :class:`Process` |
|
56 object and then calling its :meth:`~Process.start` method. :class:`Process` |
|
57 follows the API of :class:`threading.Thread`. A trivial example of a |
|
58 multiprocess program is :: |
|
59 |
|
from multiprocessing import Process

def f(name):
    print 'hello', name

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()
|
69 |
|
70 To show the individual process IDs involved, here is an expanded example:: |
|
71 |
|
from multiprocessing import Process
import os

def info(title):
    print title
    print 'module name:', __name__
    print 'parent process:', os.getppid()
    print 'process id:', os.getpid()

def f(name):
    info('function f')
    print 'hello', name

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()
|
90 |
|
91 For an explanation of why (on Windows) the ``if __name__ == '__main__'`` part is |
|
92 necessary, see :ref:`multiprocessing-programming`. |
|
93 |
|
94 |
|
95 |
|
96 Exchanging objects between processes |
|
97 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
|
98 |
|
99 :mod:`multiprocessing` supports two types of communication channel between |
|
100 processes: |
|
101 |
|
102 **Queues** |
|
103 |
|
104 The :class:`Queue` class is a near clone of :class:`Queue.Queue`. For |
|
105 example:: |
|
106 |
|
from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print q.get()    # prints "[42, None, 'hello']"
    p.join()
|
118 |
|
119 Queues are thread and process safe. |
|
120 |
|
121 **Pipes** |
|
122 |
|
123 The :func:`Pipe` function returns a pair of connection objects connected by a |
|
124 pipe which by default is duplex (two-way). For example:: |
|
125 |
|
from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print parent_conn.recv()   # prints "[42, None, 'hello']"
    p.join()
|
138 |
|
139 The two connection objects returned by :func:`Pipe` represent the two ends of |
|
140 the pipe. Each connection object has :meth:`~Connection.send` and |
|
141 :meth:`~Connection.recv` methods (among others). Note that data in a pipe |
|
142 may become corrupted if two processes (or threads) try to read from or write |
|
143 to the *same* end of the pipe at the same time. Of course there is no risk |
|
144 of corruption from processes using different ends of the pipe at the same |
|
145 time. |
|
146 |
|
147 |
|
148 Synchronization between processes |
|
149 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
|
150 |
|
151 :mod:`multiprocessing` contains equivalents of all the synchronization |
|
152 primitives from :mod:`threading`. For instance one can use a lock to ensure |
|
153 that only one process prints to standard output at a time:: |
|
154 |
|
from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    print 'hello world', i
    l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()
|
167 |
|
Without using the lock, output from the different processes is liable to get all
|
169 mixed up. |
|
170 |
|
171 |
|
172 Sharing state between processes |
|
173 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
|
174 |
|
175 As mentioned above, when doing concurrent programming it is usually best to |
|
176 avoid using shared state as far as possible. This is particularly true when |
|
177 using multiple processes. |
|
178 |
|
179 However, if you really do need to use some shared data then |
|
180 :mod:`multiprocessing` provides a couple of ways of doing so. |
|
181 |
|
182 **Shared memory** |
|
183 |
|
184 Data can be stored in a shared memory map using :class:`Value` or |
|
185 :class:`Array`. For example, the following code :: |
|
186 |
|
from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print num.value
    print arr[:]
|
204 |
|
205 will print :: |
|
206 |
|
207 3.1415927 |
|
208 [0, -1, -2, -3, -4, -5, -6, -7, -8, -9] |
|
209 |
|
210 The ``'d'`` and ``'i'`` arguments used when creating ``num`` and ``arr`` are |
|
211 typecodes of the kind used by the :mod:`array` module: ``'d'`` indicates a |
|
212 double precision float and ``'i'`` indicates a signed integer. These shared |
|
213 objects will be process and thread safe. |
|
214 |
|
215 For more flexibility in using shared memory one can use the |
|
216 :mod:`multiprocessing.sharedctypes` module which supports the creation of |
|
217 arbitrary ctypes objects allocated from shared memory. |
|
218 |
|
219 **Server process** |
|
220 |
|
221 A manager object returned by :func:`Manager` controls a server process which |
|
222 holds Python objects and allows other processes to manipulate them using |
|
223 proxies. |
|
224 |
|
225 A manager returned by :func:`Manager` will support types :class:`list`, |
|
226 :class:`dict`, :class:`Namespace`, :class:`Lock`, :class:`RLock`, |
|
227 :class:`Semaphore`, :class:`BoundedSemaphore`, :class:`Condition`, |
|
228 :class:`Event`, :class:`Queue`, :class:`Value` and :class:`Array`. For |
|
229 example, :: |
|
230 |
|
from multiprocessing import Process, Manager

def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    manager = Manager()

    d = manager.dict()
    l = manager.list(range(10))

    p = Process(target=f, args=(d, l))
    p.start()
    p.join()

    print d
    print l
|
251 |
|
252 will print :: |
|
253 |
|
254 {0.25: None, 1: '1', '2': 2} |
|
255 [9, 8, 7, 6, 5, 4, 3, 2, 1, 0] |
|
256 |
|
257 Server process managers are more flexible than using shared memory objects |
|
258 because they can be made to support arbitrary object types. Also, a single |
|
259 manager can be shared by processes on different computers over a network. |
|
260 They are, however, slower than using shared memory. |
|
261 |
|
262 |
|
263 Using a pool of workers |
|
264 ~~~~~~~~~~~~~~~~~~~~~~~ |
|
265 |
|
266 The :class:`~multiprocessing.pool.Pool` class represents a pool of worker |
|
processes. It has methods which allow tasks to be offloaded to the worker
|
268 processes in a few different ways. |
|
269 |
|
270 For example:: |
|
271 |
|
from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)            # start 4 worker processes
    result = pool.apply_async(f, [10])  # evaluate "f(10)" asynchronously
    print result.get(timeout=1)         # prints "100" unless your computer is *very* slow
    print pool.map(f, range(10))        # prints "[0, 1, 4,..., 81]"
|
282 |
|
283 |
|
284 Reference |
|
285 --------- |
|
286 |
|
287 The :mod:`multiprocessing` package mostly replicates the API of the |
|
288 :mod:`threading` module. |
|
289 |
|
290 |
|
291 :class:`Process` and exceptions |
|
292 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
|
293 |
|
294 .. class:: Process([group[, target[, name[, args[, kwargs]]]]]) |
|
295 |
|
296 Process objects represent activity that is run in a separate process. The |
|
297 :class:`Process` class has equivalents of all the methods of |
|
298 :class:`threading.Thread`. |
|
299 |
|
300 The constructor should always be called with keyword arguments. *group* |
|
301 should always be ``None``; it exists solely for compatibility with |
|
302 :class:`threading.Thread`. *target* is the callable object to be invoked by |
|
303 the :meth:`run()` method. It defaults to ``None``, meaning nothing is |
|
304 called. *name* is the process name. By default, a unique name is constructed |
|
305 of the form 'Process-N\ :sub:`1`:N\ :sub:`2`:...:N\ :sub:`k`' where N\ |
|
306 :sub:`1`,N\ :sub:`2`,...,N\ :sub:`k` is a sequence of integers whose length |
|
307 is determined by the *generation* of the process. *args* is the argument |
|
308 tuple for the target invocation. *kwargs* is a dictionary of keyword |
|
309 arguments for the target invocation. By default, no arguments are passed to |
|
310 *target*. |
|
311 |
|
312 If a subclass overrides the constructor, it must make sure it invokes the |
|
313 base class constructor (:meth:`Process.__init__`) before doing anything else |
|
314 to the process. |
|
315 |
|
316 .. method:: run() |
|
317 |
|
318 Method representing the process's activity. |
|
319 |
|
320 You may override this method in a subclass. The standard :meth:`run` |
|
321 method invokes the callable object passed to the object's constructor as |
|
322 the target argument, if any, with sequential and keyword arguments taken |
|
323 from the *args* and *kwargs* arguments, respectively. |
|
324 |
|
325 .. method:: start() |
|
326 |
|
327 Start the process's activity. |
|
328 |
|
329 This must be called at most once per process object. It arranges for the |
|
330 object's :meth:`run` method to be invoked in a separate process. |
|
331 |
|
332 .. method:: join([timeout]) |
|
333 |
|
334 Block the calling thread until the process whose :meth:`join` method is |
|
335 called terminates or until the optional timeout occurs. |
|
336 |
|
337 If *timeout* is ``None`` then there is no timeout. |
|
338 |
|
339 A process can be joined many times. |
|
340 |
|
341 A process cannot join itself because this would cause a deadlock. It is |
|
342 an error to attempt to join a process before it has been started. |
|
343 |
|
344 .. attribute:: name |
|
345 |
|
346 The process's name. |
|
347 |
|
348 The name is a string used for identification purposes only. It has no |
|
349 semantics. Multiple processes may be given the same name. The initial |
|
350 name is set by the constructor. |
|
351 |
|
.. method:: is_alive()
|
353 |
|
354 Return whether the process is alive. |
|
355 |
|
356 Roughly, a process object is alive from the moment the :meth:`start` |
|
357 method returns until the child process terminates. |
|
358 |
|
359 .. attribute:: daemon |
|
360 |
|
The process's daemon flag, a Boolean value. This must be set before
|
362 :meth:`start` is called. |
|
363 |
|
364 The initial value is inherited from the creating process. |
|
365 |
|
366 When a process exits, it attempts to terminate all of its daemonic child |
|
367 processes. |
|
368 |
|
369 Note that a daemonic process is not allowed to create child processes. |
|
370 Otherwise a daemonic process would leave its children orphaned if it gets |
|
371 terminated when its parent process exits. |
|
372 |
|
In addition to the :class:`threading.Thread` API, :class:`Process` objects
|
374 also support the following attributes and methods: |
|
375 |
|
376 .. attribute:: pid |
|
377 |
|
378 Return the process ID. Before the process is spawned, this will be |
|
379 ``None``. |
|
380 |
|
381 .. attribute:: exitcode |
|
382 |
|
383 The child's exit code. This will be ``None`` if the process has not yet |
|
384 terminated. A negative value *-N* indicates that the child was terminated |
|
385 by signal *N*. |
|
386 |
|
387 .. attribute:: authkey |
|
388 |
|
389 The process's authentication key (a byte string). |
|
390 |
|
391 When :mod:`multiprocessing` is initialized the main process is assigned a |
|
random string using :func:`os.urandom`.
|
393 |
|
394 When a :class:`Process` object is created, it will inherit the |
|
395 authentication key of its parent process, although this may be changed by |
|
396 setting :attr:`authkey` to another byte string. |
|
397 |
|
398 See :ref:`multiprocessing-auth-keys`. |
|
399 |
|
400 .. method:: terminate() |
|
401 |
|
402 Terminate the process. On Unix this is done using the ``SIGTERM`` signal; |
|
403 on Windows :cfunc:`TerminateProcess` is used. Note that exit handlers and |
|
404 finally clauses, etc., will not be executed. |
|
405 |
|
406 Note that descendant processes of the process will *not* be terminated -- |
|
407 they will simply become orphaned. |
|
408 |
|
409 .. warning:: |
|
410 |
|
411 If this method is used when the associated process is using a pipe or |
|
412 queue then the pipe or queue is liable to become corrupted and may |
|
become unusable by other processes. Similarly, if the process has
|
414 acquired a lock or semaphore etc. then terminating it is liable to |
|
415 cause other processes to deadlock. |
|
416 |
|
Note that the :meth:`start`, :meth:`join` and :meth:`is_alive` methods and
the :attr:`exitcode` attribute should only be used by the process that created
the process object.
|
420 |
|
421 Example usage of some of the methods of :class:`Process`:: |
|
422 |
|
423 >>> import multiprocessing, time, signal |
|
424 >>> p = multiprocessing.Process(target=time.sleep, args=(1000,)) |
|
425 >>> print p, p.is_alive() |
|
426 <Process(Process-1, initial)> False |
|
427 >>> p.start() |
|
428 >>> print p, p.is_alive() |
|
429 <Process(Process-1, started)> True |
|
430 >>> p.terminate() |
|
431 >>> print p, p.is_alive() |
|
432 <Process(Process-1, stopped[SIGTERM])> False |
|
433 >>> p.exitcode == -signal.SIGTERM |
|
434 True |
|
435 |
|
436 |
|
437 .. exception:: BufferTooShort |
|
438 |
|
439 Exception raised by :meth:`Connection.recv_bytes_into()` when the supplied |
|
440 buffer object is too small for the message read. |
|
441 |
|
442 If ``e`` is an instance of :exc:`BufferTooShort` then ``e.args[0]`` will give |
|
443 the message as a byte string. |
|
444 |
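For example, one way this exception can arise (a minimal sketch using a plain
:func:`Pipe`; the buffer is deliberately too small)::

from multiprocessing import Pipe, BufferTooShort
import array

a, b = Pipe()
a.send_bytes('this message will not fit')
buf = array.array('c', ' ' * 5)       # writable buffer that is too small
try:
    b.recv_bytes_into(buf)
except BufferTooShort, e:
    print 'complete message was:', repr(e.args[0])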
|
445 |
|
446 Pipes and Queues |
|
447 ~~~~~~~~~~~~~~~~ |
|
448 |
|
449 When using multiple processes, one generally uses message passing for |
|
450 communication between processes and avoids having to use any synchronization |
|
451 primitives like locks. |
|
452 |
|
453 For passing messages one can use :func:`Pipe` (for a connection between two |
|
454 processes) or a queue (which allows multiple producers and consumers). |
|
455 |
|
456 The :class:`Queue` and :class:`JoinableQueue` types are multi-producer, |
|
457 multi-consumer FIFO queues modelled on the :class:`Queue.Queue` class in the |
|
458 standard library. They differ in that :class:`Queue` lacks the |
|
459 :meth:`~Queue.Queue.task_done` and :meth:`~Queue.Queue.join` methods introduced |
|
460 into Python 2.5's :class:`Queue.Queue` class. |
|
461 |
|
462 If you use :class:`JoinableQueue` then you **must** call |
|
463 :meth:`JoinableQueue.task_done` for each task removed from the queue or else the |
|
464 semaphore used to count the number of unfinished tasks may eventually overflow |
|
465 raising an exception. |
|
466 |
|
467 Note that one can also create a shared queue by using a manager object -- see |
|
468 :ref:`multiprocessing-managers`. |
|
469 |
|
470 .. note:: |
|
471 |
|
472 :mod:`multiprocessing` uses the usual :exc:`Queue.Empty` and |
|
473 :exc:`Queue.Full` exceptions to signal a timeout. They are not available in |
|
474 the :mod:`multiprocessing` namespace so you need to import them from |
|
475 :mod:`Queue`. |
|
476 |
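For example, a timed :meth:`~Queue.put` or :meth:`~Queue.get` might be handled
like this (a minimal sketch)::

from multiprocessing import Queue
from Queue import Empty, Full

q = Queue(maxsize=1)
q.put('spam')
try:
    q.put('more spam', timeout=1)     # the queue is already full
except Full:
    print 'put timed out'
try:
    q.get(timeout=1)                  # returns 'spam'
    q.get(timeout=1)                  # nothing left
except Empty:
    print 'get timed out'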
|
477 |
|
478 .. warning:: |
|
479 |
|
480 If a process is killed using :meth:`Process.terminate` or :func:`os.kill` |
|
481 while it is trying to use a :class:`Queue`, then the data in the queue is |
|
likely to become corrupted. This may cause other processes to get an
exception when they try to use the queue later on.
|
484 |
|
485 .. warning:: |
|
486 |
|
487 As mentioned above, if a child process has put items on a queue (and it has |
|
not used :meth:`Queue.cancel_join_thread`), then that process will
|
489 not terminate until all buffered items have been flushed to the pipe. |
|
490 |
|
491 This means that if you try joining that process you may get a deadlock unless |
|
492 you are sure that all items which have been put on the queue have been |
|
493 consumed. Similarly, if the child process is non-daemonic then the parent |
|
494 process may hang on exit when it tries to join all its non-daemonic children. |
|
495 |
|
496 Note that a queue created using a manager does not have this issue. See |
|
497 :ref:`multiprocessing-programming`. |
|
498 |
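The deadlock described above can be avoided by consuming everything from the
queue before joining the producer. A minimal sketch of the safe ordering::

from multiprocessing import Process, Queue

def producer(q):
    for i in range(100):
        q.put(i)

if __name__ == '__main__':
    q = Queue()
    p = Process(target=producer, args=(q,))
    p.start()
    results = [q.get() for i in range(100)]   # drain the queue first...
    p.join()                                  # ...then joining cannot deadlock
    print len(results)                        # prints "100"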
|
499 For an example of the usage of queues for interprocess communication see |
|
500 :ref:`multiprocessing-examples`. |
|
501 |
|
502 |
|
503 .. function:: Pipe([duplex]) |
|
504 |
|
505 Returns a pair ``(conn1, conn2)`` of :class:`Connection` objects representing |
|
506 the ends of a pipe. |
|
507 |
|
508 If *duplex* is ``True`` (the default) then the pipe is bidirectional. If |
|
509 *duplex* is ``False`` then the pipe is unidirectional: ``conn1`` can only be |
|
510 used for receiving messages and ``conn2`` can only be used for sending |
|
511 messages. |
|
512 |
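For example, with a one-way pipe only one direction works (a minimal sketch;
the names ``recv_end`` and ``send_end`` are just illustrative)::

from multiprocessing import Pipe

recv_end, send_end = Pipe(duplex=False)
send_end.send('ping')
print recv_end.recv()      # prints "ping"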
|
513 |
|
514 .. class:: Queue([maxsize]) |
|
515 |
|
516 Returns a process shared queue implemented using a pipe and a few |
|
517 locks/semaphores. When a process first puts an item on the queue a feeder |
|
518 thread is started which transfers objects from a buffer into the pipe. |
|
519 |
|
520 The usual :exc:`Queue.Empty` and :exc:`Queue.Full` exceptions from the |
|
521 standard library's :mod:`Queue` module are raised to signal timeouts. |
|
522 |
|
523 :class:`Queue` implements all the methods of :class:`Queue.Queue` except for |
|
524 :meth:`~Queue.Queue.task_done` and :meth:`~Queue.Queue.join`. |
|
525 |
|
526 .. method:: qsize() |
|
527 |
|
528 Return the approximate size of the queue. Because of |
|
529 multithreading/multiprocessing semantics, this number is not reliable. |
|
530 |
|
531 Note that this may raise :exc:`NotImplementedError` on Unix platforms like |
|
532 Mac OS X where ``sem_getvalue()`` is not implemented. |
|
533 |
|
534 .. method:: empty() |
|
535 |
|
536 Return ``True`` if the queue is empty, ``False`` otherwise. Because of |
|
537 multithreading/multiprocessing semantics, this is not reliable. |
|
538 |
|
539 .. method:: full() |
|
540 |
|
541 Return ``True`` if the queue is full, ``False`` otherwise. Because of |
|
542 multithreading/multiprocessing semantics, this is not reliable. |
|
543 |
|
544 .. method:: put(item[, block[, timeout]]) |
|
545 |
|
546 Put item into the queue. If the optional argument *block* is ``True`` |
|
547 (the default) and *timeout* is ``None`` (the default), block if necessary until |
|
548 a free slot is available. If *timeout* is a positive number, it blocks at |
|
549 most *timeout* seconds and raises the :exc:`Queue.Full` exception if no |
|
550 free slot was available within that time. Otherwise (*block* is |
|
551 ``False``), put an item on the queue if a free slot is immediately |
|
552 available, else raise the :exc:`Queue.Full` exception (*timeout* is |
|
553 ignored in that case). |
|
554 |
|
555 .. method:: put_nowait(item) |
|
556 |
|
557 Equivalent to ``put(item, False)``. |
|
558 |
|
559 .. method:: get([block[, timeout]]) |
|
560 |
|
561 Remove and return an item from the queue. If optional args *block* is |
|
562 ``True`` (the default) and *timeout* is ``None`` (the default), block if |
|
563 necessary until an item is available. If *timeout* is a positive number, |
|
564 it blocks at most *timeout* seconds and raises the :exc:`Queue.Empty` |
|
565 exception if no item was available within that time. Otherwise (block is |
|
566 ``False``), return an item if one is immediately available, else raise the |
|
567 :exc:`Queue.Empty` exception (*timeout* is ignored in that case). |
|
568 |
|
569 .. method:: get_nowait() |
|
|
571 |
|
572 Equivalent to ``get(False)``. |
|
573 |
|
574 :class:`multiprocessing.Queue` has a few additional methods not found in |
|
575 :class:`Queue.Queue`. These methods are usually unnecessary for most |
|
576 code: |
|
577 |
|
578 .. method:: close() |
|
579 |
|
580 Indicate that no more data will be put on this queue by the current |
|
581 process. The background thread will quit once it has flushed all buffered |
|
582 data to the pipe. This is called automatically when the queue is garbage |
|
583 collected. |
|
584 |
|
585 .. method:: join_thread() |
|
586 |
|
587 Join the background thread. This can only be used after :meth:`close` has |
|
588 been called. It blocks until the background thread exits, ensuring that |
|
589 all data in the buffer has been flushed to the pipe. |
|
590 |
|
591 By default if a process is not the creator of the queue then on exit it |
|
592 will attempt to join the queue's background thread. The process can call |
|
593 :meth:`cancel_join_thread` to make :meth:`join_thread` do nothing. |
|
594 |
|
595 .. method:: cancel_join_thread() |
|
596 |
|
597 Prevent :meth:`join_thread` from blocking. In particular, this prevents |
|
598 the background thread from being joined automatically when the process |
|
599 exits -- see :meth:`join_thread`. |
|
600 |
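For example, a producing process can flush and close its end of the queue
explicitly before exiting (a minimal sketch; the helper name ``producer`` is
illustrative)::

from multiprocessing import Process, Queue

def producer(q):
    for i in range(10):
        q.put(i)
    q.close()           # no more data will be put by this process
    q.join_thread()     # block until the feeder thread has flushed the buffer

if __name__ == '__main__':
    q = Queue()
    p = Process(target=producer, args=(q,))
    p.start()
    print [q.get() for i in range(10)]
    p.join()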
|
601 |
|
602 .. class:: JoinableQueue([maxsize]) |
|
603 |
|
604 :class:`JoinableQueue`, a :class:`Queue` subclass, is a queue which |
|
605 additionally has :meth:`task_done` and :meth:`join` methods. |
|
606 |
|
607 .. method:: task_done() |
|
608 |
|
609 Indicate that a formerly enqueued task is complete. Used by queue consumer |
|
610 threads. For each :meth:`~Queue.get` used to fetch a task, a subsequent |
|
611 call to :meth:`task_done` tells the queue that the processing on the task |
|
612 is complete. |
|
613 |
|
614 If a :meth:`~Queue.join` is currently blocking, it will resume when all |
|
615 items have been processed (meaning that a :meth:`task_done` call was |
|
616 received for every item that had been :meth:`~Queue.put` into the queue). |
|
617 |
|
618 Raises a :exc:`ValueError` if called more times than there were items |
|
619 placed in the queue. |
|
620 |
|
621 |
|
622 .. method:: join() |
|
623 |
|
624 Block until all items in the queue have been gotten and processed. |
|
625 |
|
626 The count of unfinished tasks goes up whenever an item is added to the |
|
627 queue. The count goes down whenever a consumer thread calls |
|
628 :meth:`task_done` to indicate that the item was retrieved and all work on |
|
629 it is complete. When the count of unfinished tasks drops to zero, |
|
630 :meth:`~Queue.join` unblocks. |
|
631 |
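For example, :class:`JoinableQueue` lets a producer block until every queued
item has been handled (a minimal sketch; ``worker`` is an illustrative name)::

from multiprocessing import Process, JoinableQueue

def worker(q):
    while True:
        item = q.get()
        print 'processing', item
        q.task_done()          # one task_done() per get()

if __name__ == '__main__':
    q = JoinableQueue()
    w = Process(target=worker, args=(q,))
    w.daemon = True            # the worker exits with the main process
    w.start()
    for i in range(5):
        q.put(i)
    q.join()                   # blocks until every item is marked done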
|
632 |
|
633 Miscellaneous |
|
634 ~~~~~~~~~~~~~ |
|
635 |
|
636 .. function:: active_children() |
|
637 |
|
Return a list of all live children of the current process.

Calling this has the side effect of "joining" any processes which have
already finished.
|
642 |
|
643 .. function:: cpu_count() |
|
644 |
|
645 Return the number of CPUs in the system. May raise |
|
646 :exc:`NotImplementedError`. |
|
647 |
|
648 .. function:: current_process() |
|
649 |
|
650 Return the :class:`Process` object corresponding to the current process. |
|
651 |
|
652 An analogue of :func:`threading.current_thread`. |
|
653 |
|
654 .. function:: freeze_support() |
|
655 |
|
656 Add support for when a program which uses :mod:`multiprocessing` has been |
|
657 frozen to produce a Windows executable. (Has been tested with **py2exe**, |
|
658 **PyInstaller** and **cx_Freeze**.) |
|
659 |
|
660 One needs to call this function straight after the ``if __name__ == |
|
661 '__main__'`` line of the main module. For example:: |
|
662 |
|
from multiprocessing import Process, freeze_support

def f():
    print 'hello world!'

if __name__ == '__main__':
    freeze_support()
    Process(target=f).start()
|
671 |
|
672 If the ``freeze_support()`` line is missed out then trying to run the frozen |
|
673 executable will raise :exc:`RuntimeError`. |
|
674 |
|
675 If the module is being run normally by the Python interpreter then |
|
676 :func:`freeze_support` has no effect. |
|
677 |
|
678 .. function:: set_executable() |
|
679 |
|
Sets the path of the Python interpreter to use when starting a child process.
(By default :data:`sys.executable` is used). Embedders will probably need to
do something like ::

set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))
|
685 |
|
686 before they can create child processes. (Windows only) |
|
687 |
|
688 |
|
689 .. note:: |
|
690 |
|
691 :mod:`multiprocessing` contains no analogues of |
|
692 :func:`threading.active_count`, :func:`threading.enumerate`, |
|
693 :func:`threading.settrace`, :func:`threading.setprofile`, |
|
694 :class:`threading.Timer`, or :class:`threading.local`. |
|
695 |
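A quick interactive check of some of the helpers above (the CPU count shown
is, of course, machine dependent)::

>>> from multiprocessing import active_children, cpu_count, current_process
>>> current_process().name
'MainProcess'
>>> active_children()
[]
>>> cpu_count()
2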
|
696 |
|
697 Connection Objects |
|
698 ~~~~~~~~~~~~~~~~~~ |
|
699 |
|
700 Connection objects allow the sending and receiving of picklable objects or |
|
701 strings. They can be thought of as message oriented connected sockets. |
|
702 |
|
Connection objects are usually created using :func:`Pipe` -- see also
|
704 :ref:`multiprocessing-listeners-clients`. |
|
705 |
|
706 .. class:: Connection |
|
707 |
|
708 .. method:: send(obj) |
|
709 |
|
710 Send an object to the other end of the connection which should be read |
|
711 using :meth:`recv`. |
|
712 |
|
713 The object must be picklable. |
|
714 |
|
715 .. method:: recv() |
|
716 |
|
717 Return an object sent from the other end of the connection using |
|
718 :meth:`send`. Raises :exc:`EOFError` if there is nothing left to receive |
|
719 and the other end was closed. |
|
720 |
|
721 .. method:: fileno() |
|
722 |
|
723 Returns the file descriptor or handle used by the connection. |
|
724 |
|
725 .. method:: close() |
|
726 |
|
727 Close the connection. |
|
728 |
|
729 This is called automatically when the connection is garbage collected. |
|
730 |
|
731 .. method:: poll([timeout]) |
|
732 |
|
733 Return whether there is any data available to be read. |
|
734 |
|
735 If *timeout* is not specified then it will return immediately. If |
|
736 *timeout* is a number then this specifies the maximum time in seconds to |
|
737 block. If *timeout* is ``None`` then an infinite timeout is used. |
|
738 |
|
739 .. method:: send_bytes(buffer[, offset[, size]]) |
|
740 |
|
741 Send byte data from an object supporting the buffer interface as a |
|
742 complete message. |
|
743 |
|
744 If *offset* is given then data is read from that position in *buffer*. If |
|
745 *size* is given then that many bytes will be read from buffer. |
|
746 |
|
747 .. method:: recv_bytes([maxlength]) |
|
748 |
|
749 Return a complete message of byte data sent from the other end of the |
|
750 connection as a string. Raises :exc:`EOFError` if there is nothing left |
|
751 to receive and the other end has closed. |
|
752 |
|
753 If *maxlength* is specified and the message is longer than *maxlength* |
|
754 then :exc:`IOError` is raised and the connection will no longer be |
|
755 readable. |
|
756 |
|
757 .. method:: recv_bytes_into(buffer[, offset]) |
|
758 |
|
759 Read into *buffer* a complete message of byte data sent from the other end |
|
760 of the connection and return the number of bytes in the message. Raises |
|
761 :exc:`EOFError` if there is nothing left to receive and the other end was |
|
762 closed. |
|
763 |
|
764 *buffer* must be an object satisfying the writable buffer interface. If |
|
*offset* is given then the message will be written into the buffer from
that position. *offset* must be a non-negative integer less than the
length of *buffer* (in bytes).
|
768 |
|
769 If the buffer is too short then a :exc:`BufferTooShort` exception is |
|
770 raised and the complete message is available as ``e.args[0]`` where ``e`` |
|
771 is the exception instance. |
|
772 |
|
773 |
|
774 For example: |
|
775 |
|
776 >>> from multiprocessing import Pipe |
|
777 >>> a, b = Pipe() |
|
778 >>> a.send([1, 'hello', None]) |
|
779 >>> b.recv() |
|
780 [1, 'hello', None] |
|
781 >>> b.send_bytes('thank you') |
|
782 >>> a.recv_bytes() |
|
783 'thank you' |
|
784 >>> import array |
|
785 >>> arr1 = array.array('i', range(5)) |
|
786 >>> arr2 = array.array('i', [0] * 10) |
|
787 >>> a.send_bytes(arr1) |
|
788 >>> count = b.recv_bytes_into(arr2) |
|
789 >>> assert count == len(arr1) * arr1.itemsize |
|
790 >>> arr2 |
|
791 array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0]) |
|
792 |
|
793 |
|
794 .. warning:: |
|
795 |
|
796 The :meth:`Connection.recv` method automatically unpickles the data it |
|
797 receives, which can be a security risk unless you can trust the process |
|
798 which sent the message. |
|
799 |
|
800 Therefore, unless the connection object was produced using :func:`Pipe` you |
|
801 should only use the :meth:`~Connection.recv` and :meth:`~Connection.send` |
|
802 methods after performing some sort of authentication. See |
|
803 :ref:`multiprocessing-auth-keys`. |
|
804 |
|
805 .. warning:: |
|
806 |
|
807 If a process is killed while it is trying to read or write to a pipe then |
|
808 the data in the pipe is likely to become corrupted, because it may become |
|
809 impossible to be sure where the message boundaries lie. |
|
810 |
|
811 |
|
812 Synchronization primitives |
|
813 ~~~~~~~~~~~~~~~~~~~~~~~~~~ |
|
814 |
|
815 Generally synchronization primitives are not as necessary in a multiprocess |
|
816 program as they are in a multithreaded program. See the documentation for |
|
the :mod:`threading` module.
|
818 |
|
819 Note that one can also create synchronization primitives by using a manager |
|
820 object -- see :ref:`multiprocessing-managers`. |
|
821 |
|
822 .. class:: BoundedSemaphore([value]) |
|
823 |
|
824 A bounded semaphore object: a clone of :class:`threading.BoundedSemaphore`. |
|
825 |
|
826 (On Mac OS X this is indistinguishable from :class:`Semaphore` because |
|
827 ``sem_getvalue()`` is not implemented on that platform). |
|
828 |
|
829 .. class:: Condition([lock]) |
|
830 |
|
831 A condition variable: a clone of :class:`threading.Condition`. |
|
832 |
|
833 If *lock* is specified then it should be a :class:`Lock` or :class:`RLock` |
|
834 object from :mod:`multiprocessing`. |
|
835 |
|
836 .. class:: Event() |
|
837 |
|
838 A clone of :class:`threading.Event`. |
|
839 |
|
840 .. class:: Lock() |
|
841 |
|
842 A non-recursive lock object: a clone of :class:`threading.Lock`. |
|
843 |
|
844 .. class:: RLock() |
|
845 |
|
846 A recursive lock object: a clone of :class:`threading.RLock`. |
|
847 |
|
848 .. class:: Semaphore([value]) |
|
849 |
|
A semaphore object: a clone of :class:`threading.Semaphore`.
|
851 |
|
852 .. note:: |
|
853 |
|
854 The :meth:`acquire` method of :class:`BoundedSemaphore`, :class:`Lock`, |
|
855 :class:`RLock` and :class:`Semaphore` has a timeout parameter not supported |
|
856 by the equivalents in :mod:`threading`. The signature is |
|
857 ``acquire(block=True, timeout=None)`` with keyword parameters being |
|
858 acceptable. If *block* is ``True`` and *timeout* is not ``None`` then it |
|
859 specifies a timeout in seconds. If *block* is ``False`` then *timeout* is |
|
860 ignored. |
|
861 |
|
Note that on Mac OS X ``sem_timedwait`` is unsupported, so timeout arguments
|
863 for these will be ignored. |
|
864 |
|
865 .. note:: |
|
866 |
|
867 If the SIGINT signal generated by Ctrl-C arrives while the main thread is |
|
868 blocked by a call to :meth:`BoundedSemaphore.acquire`, :meth:`Lock.acquire`, |
|
869 :meth:`RLock.acquire`, :meth:`Semaphore.acquire`, :meth:`Condition.acquire` |
|
870 or :meth:`Condition.wait` then the call will be immediately interrupted and |
|
871 :exc:`KeyboardInterrupt` will be raised. |
|
872 |
|
873 This differs from the behaviour of :mod:`threading` where SIGINT will be |
|
874 ignored while the equivalent blocking calls are in progress. |
|
875 |
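For example, the extended :meth:`acquire` signature allows a timed wait for a
lock held by another process (a minimal sketch; as noted above, the timeout is
ignored on platforms without ``sem_timedwait``)::

from multiprocessing import Process, Lock
import time

def hold(l):
    l.acquire()
    time.sleep(2)
    l.release()

if __name__ == '__main__':
    lock = Lock()
    p = Process(target=hold, args=(lock,))
    p.start()
    time.sleep(0.5)                            # let the child grab the lock
    if lock.acquire(block=True, timeout=0.5):  # give up after half a second
        print 'acquired the lock'
        lock.release()
    else:
        print 'timed out waiting for the lock'
    p.join()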
|
876 |
|
877 Shared :mod:`ctypes` Objects |
|
878 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
|
879 |
|
880 It is possible to create shared objects using shared memory which can be |
|
881 inherited by child processes. |
|
882 |
|
.. function:: Value(typecode_or_type, *args[, lock])
|
884 |
|
885 Return a :mod:`ctypes` object allocated from shared memory. By default the |
|
886 return value is actually a synchronized wrapper for the object. |
|
887 |
|
888 *typecode_or_type* determines the type of the returned object: it is either a |
|
889 ctypes type or a one character typecode of the kind used by the :mod:`array` |
|
890 module. *\*args* is passed on to the constructor for the type. |
|
891 |
|
892 If *lock* is ``True`` (the default) then a new lock object is created to |
|
893 synchronize access to the value. If *lock* is a :class:`Lock` or |
|
894 :class:`RLock` object then that will be used to synchronize access to the |
|
895 value. If *lock* is ``False`` then access to the returned object will not be |
|
896 automatically protected by a lock, so it will not necessarily be |
|
897 "process-safe". |
|
898 |
|
899 Note that *lock* is a keyword-only argument. |
|
900 |
|
901 .. function:: Array(typecode_or_type, size_or_initializer, *, lock=True) |
|
902 |
|
903 Return a ctypes array allocated from shared memory. By default the return |
|
904 value is actually a synchronized wrapper for the array. |
|
905 |
|
906 *typecode_or_type* determines the type of the elements of the returned array: |
|
907 it is either a ctypes type or a one character typecode of the kind used by |
|
908 the :mod:`array` module. If *size_or_initializer* is an integer, then it |
|
909 determines the length of the array, and the array will be initially zeroed. |
|
910 Otherwise, *size_or_initializer* is a sequence which is used to initialize |
|
911 the array and whose length determines the length of the array. |
|
912 |
|
913 If *lock* is ``True`` (the default) then a new lock object is created to |
|
914 synchronize access to the value. If *lock* is a :class:`Lock` or |
|
915 :class:`RLock` object then that will be used to synchronize access to the |
|
916 value. If *lock* is ``False`` then access to the returned object will not be |
|
917 automatically protected by a lock, so it will not necessarily be |
|
918 "process-safe". |
|
919 |
|
Note that *lock* is a keyword-only argument.
|
921 |
|
Note that an array of :data:`ctypes.c_char` has ``value`` and ``raw``
|
923 attributes which allow one to use it to store and retrieve strings. |
|
924 |
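Because the synchronized wrapper returned when *lock* is not ``False`` has
:meth:`get_lock` and :meth:`get_obj` methods (see
:func:`multiprocessing.sharedctypes.synchronized` below), a read-modify-write
update can be made atomic. A minimal sketch of a shared counter::

from multiprocessing import Process, Value

def increment(counter):
    for i in range(1000):
        with counter.get_lock():      # make the read-modify-write atomic
            counter.value += 1

if __name__ == '__main__':
    counter = Value('i', 0)
    workers = [Process(target=increment, args=(counter,)) for i in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print counter.value               # prints "4000"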
|
925 |
|
926 The :mod:`multiprocessing.sharedctypes` module |
|
927 >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> |
|
928 |
|
929 .. module:: multiprocessing.sharedctypes |
|
930 :synopsis: Allocate ctypes objects from shared memory. |
|
931 |
|
932 The :mod:`multiprocessing.sharedctypes` module provides functions for allocating |
|
933 :mod:`ctypes` objects from shared memory which can be inherited by child |
|
934 processes. |
|
935 |
|
936 .. note:: |
|
937 |
|
938 Although it is possible to store a pointer in shared memory remember that |
|
939 this will refer to a location in the address space of a specific process. |
|
940 However, the pointer is quite likely to be invalid in the context of a second |
|
941 process and trying to dereference the pointer from the second process may |
|
942 cause a crash. |
|
943 |
|
944 .. function:: RawArray(typecode_or_type, size_or_initializer) |
|
945 |
|
946 Return a ctypes array allocated from shared memory. |
|
947 |
|
948 *typecode_or_type* determines the type of the elements of the returned array: |
|
949 it is either a ctypes type or a one character typecode of the kind used by |
|
950 the :mod:`array` module. If *size_or_initializer* is an integer then it |
|
951 determines the length of the array, and the array will be initially zeroed. |
|
952 Otherwise *size_or_initializer* is a sequence which is used to initialize the |
|
953 array and whose length determines the length of the array. |
|
954 |
|
955 Note that setting and getting an element is potentially non-atomic -- use |
|
956 :func:`Array` instead to make sure that access is automatically synchronized |
|
957 using a lock. |
|
958 |
|
959 .. function:: RawValue(typecode_or_type, *args) |
|
960 |
|
961 Return a ctypes object allocated from shared memory. |
|
962 |
|
963 *typecode_or_type* determines the type of the returned object: it is either a |
|
964 ctypes type or a one character typecode of the kind used by the :mod:`array` |
|
module. *\*args* is passed on to the constructor for the type.
|
966 |
|
967 Note that setting and getting the value is potentially non-atomic -- use |
|
968 :func:`Value` instead to make sure that access is automatically synchronized |
|
969 using a lock. |
|
970 |
|
Note that an array of :data:`ctypes.c_char` has ``value`` and ``raw``
|
972 attributes which allow one to use it to store and retrieve strings -- see |
|
973 documentation for :mod:`ctypes`. |
|
974 |
|
975 .. function:: Array(typecode_or_type, size_or_initializer[, *args[, lock]]) |
|
976 |
|
977 The same as :func:`RawArray` except that depending on the value of *lock* a |
|
978 process-safe synchronization wrapper may be returned instead of a raw ctypes |
|
979 array. |
|
980 |
|
981 If *lock* is ``True`` (the default) then a new lock object is created to |
|
982 synchronize access to the value. If *lock* is a :class:`Lock` or |
|
983 :class:`RLock` object then that will be used to synchronize access to the |
|
984 value. If *lock* is ``False`` then access to the returned object will not be |
|
985 automatically protected by a lock, so it will not necessarily be |
|
986 "process-safe". |
|
987 |
|
988 Note that *lock* is a keyword-only argument. |
|
989 |
|
990 .. function:: Value(typecode_or_type, *args[, lock]) |
|
991 |
|
992 The same as :func:`RawValue` except that depending on the value of *lock* a |
|
993 process-safe synchronization wrapper may be returned instead of a raw ctypes |
|
994 object. |
|
995 |
|
996 If *lock* is ``True`` (the default) then a new lock object is created to |
|
997 synchronize access to the value. If *lock* is a :class:`Lock` or |
|
998 :class:`RLock` object then that will be used to synchronize access to the |
|
999 value. If *lock* is ``False`` then access to the returned object will not be |
|
1000 automatically protected by a lock, so it will not necessarily be |
|
1001 "process-safe". |
|
1002 |
|
1003 Note that *lock* is a keyword-only argument. |
|
1004 |
|
1005 .. function:: copy(obj) |
|
1006 |
|
1007 Return a ctypes object allocated from shared memory which is a copy of the |
|
1008 ctypes object *obj*. |
|
1009 |
|
1010 .. function:: synchronized(obj[, lock]) |
|
1011 |
|
1012 Return a process-safe wrapper object for a ctypes object which uses *lock* to |
|
1013 synchronize access. If *lock* is ``None`` (the default) then a |
|
1014 :class:`multiprocessing.RLock` object is created automatically. |
|
1015 |
|
1016 A synchronized wrapper will have two methods in addition to those of the |
|
1017 object it wraps: :meth:`get_obj` returns the wrapped object and |
|
1018 :meth:`get_lock` returns the lock object used for synchronization. |
|
1019 |
|
1020 Note that accessing the ctypes object through the wrapper can be a lot slower |
|
1021 than accessing the raw ctypes object. |
|
1022 |
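For instance, an existing raw object can be wrapped after the fact (a minimal
sketch)::

from multiprocessing.sharedctypes import RawValue, synchronized

raw = RawValue('i', 0)
shared = synchronized(raw)        # wrapped with an automatically created RLock
with shared.get_lock():
    shared.value += 1
print shared.get_obj().value      # prints "1"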
|
1023 |
|
1024 The table below compares the syntax for creating shared ctypes objects from |
|
1025 shared memory with the normal ctypes syntax. (In the table ``MyStruct`` is some |
|
1026 subclass of :class:`ctypes.Structure`.) |
|
1027 |
|
==================== ========================== ===========================
ctypes               sharedctypes using type    sharedctypes using typecode
==================== ========================== ===========================
c_double(2.4)        RawValue(c_double, 2.4)    RawValue('d', 2.4)
MyStruct(4, 6)       RawValue(MyStruct, 4, 6)
(c_short * 7)()      RawArray(c_short, 7)       RawArray('h', 7)
(c_int * 3)(9, 2, 8) RawArray(c_int, (9, 2, 8)) RawArray('i', (9, 2, 8))
==================== ========================== ===========================
|
1036 |
|
1037 |
|
1038 Below is an example where a number of ctypes objects are modified by a child |
|
1039 process:: |
|
1040 |
|
from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_double

class Point(Structure):
    _fields_ = [('x', c_double), ('y', c_double)]

def modify(n, x, s, A):
    n.value **= 2
    x.value **= 2
    s.value = s.value.upper()
    for a in A:
        a.x **= 2
        a.y **= 2

if __name__ == '__main__':
    lock = Lock()

    n = Value('i', 7)
    x = Value(c_double, 1.0/3.0, lock=False)
    s = Array('c', 'hello world', lock=lock)
    A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)

    p = Process(target=modify, args=(n, x, s, A))
    p.start()
    p.join()

    print n.value
    print x.value
    print s.value
    print [(a.x, a.y) for a in A]
|
1072 |
|
1073 |
|
1074 .. highlightlang:: none |
|
1075 |
|
1076 The results printed are :: |
|
1077 |
|
1078 49 |
|
1079 0.1111111111111111 |
|
1080 HELLO WORLD |
|
1081 [(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)] |
|
1082 |
|
1083 .. highlightlang:: python |
|
1084 |
|
1085 |
|
1086 .. _multiprocessing-managers: |
|
1087 |
|
1088 Managers |
|
1089 ~~~~~~~~ |
|
1090 |
|
1091 Managers provide a way to create data which can be shared between different |
|
1092 processes. A manager object controls a server process which manages *shared |
|
1093 objects*. Other processes can access the shared objects by using proxies. |
|
1094 |
|
1095 .. function:: multiprocessing.Manager() |
|
1096 |
|
1097 Returns a started :class:`~multiprocessing.managers.SyncManager` object which |
|
1098 can be used for sharing objects between processes. The returned manager |
|
1099 object corresponds to a spawned child process and has methods which will |
|
1100 create shared objects and return corresponding proxies. |
|
1101 |
|
1102 .. module:: multiprocessing.managers |
|
:synopsis: Share data between processes with shared objects.
|
1104 |
|
Manager processes will be shut down as soon as they are garbage collected or
|
1106 their parent process exits. The manager classes are defined in the |
|
1107 :mod:`multiprocessing.managers` module: |
|
1108 |
|
1109 .. class:: BaseManager([address[, authkey]]) |
|
1110 |
|
1111 Create a BaseManager object. |
|
1112 |
|
1113 Once created one should call :meth:`start` or :meth:`serve_forever` to ensure |
|
1114 that the manager object refers to a started manager process. |
|
1115 |
|
1116 *address* is the address on which the manager process listens for new |
|
1117 connections. If *address* is ``None`` then an arbitrary one is chosen. |
|
1118 |
|
1119 *authkey* is the authentication key which will be used to check the validity |
|
1120 of incoming connections to the server process. If *authkey* is ``None`` then |
|
``current_process().authkey`` is used. Otherwise *authkey* is used and it
|
1122 must be a string. |
|
1123 |
|
1124 .. method:: start() |
|
1125 |
|
1126 Start a subprocess to start the manager. |
|
1127 |
|
1128 .. method:: serve_forever() |
|
1129 |
|
1130 Run the server in the current process. |
|
1131 |
|
1132 .. method:: from_address(address, authkey) |
|
1133 |
|
1134 A class method which creates a manager object referring to a pre-existing |
|
1135 server process which is using the given address and authentication key. |
|
1136 |
|
1137 .. method:: get_server() |
|
1138 |
|
1139 Returns a :class:`Server` object which represents the actual server under |
|
1140 the control of the Manager. The :class:`Server` object supports the |
|
1141 :meth:`serve_forever` method:: |
|
1142 |
|
1143 >>> from multiprocessing.managers import BaseManager |
|
>>> m = BaseManager(address=('', 50000), authkey='abc')
|
1145 >>> server = m.get_server() |
|
>>> server.serve_forever()
|
1147 |
|
:class:`Server` additionally has an :attr:`address` attribute.
|
1149 |
|
1150 .. method:: connect() |
|
1151 |
|
1152 Connect a local manager object to a remote manager process:: |
|
1153 |
|
1154 >>> from multiprocessing.managers import BaseManager |
|
>>> m = BaseManager(address=('127.0.0.1', 50000), authkey='abc')
|
1156 >>> m.connect() |
|
1157 |
|
1158 .. method:: shutdown() |
|
1159 |
|
1160 Stop the process used by the manager. This is only available if |
|
1161 :meth:`start` has been used to start the server process. |
|
1162 |
|
1163 This can be called multiple times. |
|
1164 |
|
1165 .. method:: register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]]) |
|
1166 |
|
1167 A classmethod which can be used for registering a type or callable with |
|
1168 the manager class. |
|
1169 |
|
1170 *typeid* is a "type identifier" which is used to identify a particular |
|
1171 type of shared object. This must be a string. |
|
1172 |
|
1173 *callable* is a callable used for creating objects for this type |
|
1174 identifier. If a manager instance will be created using the |
|
1175 :meth:`from_address` classmethod or if the *create_method* argument is |
|
1176 ``False`` then this can be left as ``None``. |
|
1177 |
|
1178 *proxytype* is a subclass of :class:`BaseProxy` which is used to create |
|
1179 proxies for shared objects with this *typeid*. If ``None`` then a proxy |
|
1180 class is created automatically. |
|
1181 |
|
1182 *exposed* is used to specify a sequence of method names which proxies for |
|
1183 this typeid should be allowed to access using |
|
:meth:`BaseProxy._call_method`. (If *exposed* is ``None`` then
|
1185 :attr:`proxytype._exposed_` is used instead if it exists.) In the case |
|
1186 where no exposed list is specified, all "public methods" of the shared |
|
1187 object will be accessible. (Here a "public method" means any attribute |
|
1188 which has a :meth:`__call__` method and whose name does not begin with |
|
1189 ``'_'``.) |
|
1190 |
|
1191 *method_to_typeid* is a mapping used to specify the return type of those |
|
1192 exposed methods which should return a proxy. It maps method names to |
|
1193 typeid strings. (If *method_to_typeid* is ``None`` then |
|
1194 :attr:`proxytype._method_to_typeid_` is used instead if it exists.) If a |
|
1195 method's name is not a key of this mapping or if the mapping is ``None`` |
|
1196 then the object returned by the method will be copied by value. |
|
1197 |
|
1198 *create_method* determines whether a method should be created with name |
|
1199 *typeid* which can be used to tell the server process to create a new |
|
1200 shared object and return a proxy for it. By default it is ``True``. |
|
1201 |
|
1202 :class:`BaseManager` instances also have one read-only property: |
|
1203 |
|
1204 .. attribute:: address |
|
1205 |
|
1206 The address used by the manager. |
|
1207 |
|
1208 |
|
1209 .. class:: SyncManager |
|
1210 |
|
1211 A subclass of :class:`BaseManager` which can be used for the synchronization |
|
1212 of processes. Objects of this type are returned by |
|
1213 :func:`multiprocessing.Manager`. |
|
1214 |
|
1215 It also supports creation of shared lists and dictionaries. |
|
1216 |
|
1217 .. method:: BoundedSemaphore([value]) |
|
1218 |
|
1219 Create a shared :class:`threading.BoundedSemaphore` object and return a |
|
1220 proxy for it. |
|
1221 |
|
1222 .. method:: Condition([lock]) |
|
1223 |
|
1224 Create a shared :class:`threading.Condition` object and return a proxy for |
|
1225 it. |
|
1226 |
|
1227 If *lock* is supplied then it should be a proxy for a |
|
1228 :class:`threading.Lock` or :class:`threading.RLock` object. |
|
1229 |
|
1230 .. method:: Event() |
|
1231 |
|
1232 Create a shared :class:`threading.Event` object and return a proxy for it. |
|
1233 |
|
1234 .. method:: Lock() |
|
1235 |
|
1236 Create a shared :class:`threading.Lock` object and return a proxy for it. |
|
1237 |
|
1238 .. method:: Namespace() |
|
1239 |
|
1240 Create a shared :class:`Namespace` object and return a proxy for it. |
|
1241 |
|
1242 .. method:: Queue([maxsize]) |
|
1243 |
|
1244 Create a shared :class:`Queue.Queue` object and return a proxy for it. |
|
1245 |
|
1246 .. method:: RLock() |
|
1247 |
|
1248 Create a shared :class:`threading.RLock` object and return a proxy for it. |
|
1249 |
|
1250 .. method:: Semaphore([value]) |
|
1251 |
|
1252 Create a shared :class:`threading.Semaphore` object and return a proxy for |
|
1253 it. |
|
1254 |
|
1255 .. method:: Array(typecode, sequence) |
|
1256 |
|
1257 Create an array and return a proxy for it. |
|
1258 |
|
1259 .. method:: Value(typecode, value) |
|
1260 |
|
1261 Create an object with a writable ``value`` attribute and return a proxy |
|
1262 for it. |
|
1263 |
|
1264 .. method:: dict() |
|
1265 dict(mapping) |
|
1266 dict(sequence) |
|
1267 |
|
1268 Create a shared ``dict`` object and return a proxy for it. |
|
1269 |
|
1270 .. method:: list() |
|
1271 list(sequence) |
|
1272 |
|
1273 Create a shared ``list`` object and return a proxy for it. |
|
1274 |
|
1275 |
|
1276 Namespace objects |
|
1277 >>>>>>>>>>>>>>>>> |
|
1278 |
|
1279 A namespace object has no public methods, but does have writable attributes. |
|
1280 Its representation shows the values of its attributes. |
|
1281 |
|
1282 However, when using a proxy for a namespace object, an attribute beginning with |
|
1283 ``'_'`` will be an attribute of the proxy and not an attribute of the referent:: |
|
1284 |
|
1285 >>> manager = multiprocessing.Manager() |
|
1286 >>> Global = manager.Namespace() |
|
1287 >>> Global.x = 10 |
|
1288 >>> Global.y = 'hello' |
|
1289 >>> Global._z = 12.3 # this is an attribute of the proxy |
|
1290 >>> print Global |
|
1291 Namespace(x=10, y='hello') |
|
1292 |
|
1293 |
|
1294 Customized managers |
|
1295 >>>>>>>>>>>>>>>>>>> |
|
1296 |
|
To create one's own manager, one creates a subclass of :class:`BaseManager` and
uses the :meth:`~BaseManager.register` classmethod to register new types or
|
1299 callables with the manager class. For example:: |
|
1300 |
|
from multiprocessing.managers import BaseManager

class MathsClass(object):
    def add(self, x, y):
        return x + y
    def mul(self, x, y):
        return x * y

class MyManager(BaseManager):
    pass

MyManager.register('Maths', MathsClass)

if __name__ == '__main__':
    manager = MyManager()
    manager.start()
    maths = manager.Maths()
    print maths.add(4, 3)         # prints 7
    print maths.mul(7, 8)         # prints 56
|
1320 |
|
1321 |
|
1322 Using a remote manager |
|
1323 >>>>>>>>>>>>>>>>>>>>>> |
|
1324 |
|
1325 It is possible to run a manager server on one machine and have clients use it |
|
1326 from other machines (assuming that the firewalls involved allow it). |
|
1327 |
|
1328 Running the following commands creates a server for a single shared queue which |
|
1329 remote clients can access:: |
|
1330 |
|
1331 >>> from multiprocessing.managers import BaseManager |
|
1332 >>> import Queue |
|
1333 >>> queue = Queue.Queue() |
|
1334 >>> class QueueManager(BaseManager): pass |
|
1335 ... |
|
1336 >>> QueueManager.register('get_queue', callable=lambda:queue) |
|
1337 >>> m = QueueManager(address=('', 50000), authkey='abracadabra') |
|
1338 >>> s = m.get_server() |
|
>>> s.serve_forever()
|
1340 |
|
1341 One client can access the server as follows:: |
|
1342 |
|
1343 >>> from multiprocessing.managers import BaseManager |
|
1344 >>> class QueueManager(BaseManager): pass |
|
1345 ... |
|
1346 >>> QueueManager.register('get_queue') |
|
1347 >>> m = QueueManager(address=('foo.bar.org', 50000), authkey='abracadabra') |
|
1348 >>> m.connect() |
|
1349 >>> queue = m.get_queue() |
|
1350 >>> queue.put('hello') |
|
1351 |
|
1352 Another client can also use it:: |
|
1353 |
|
1354 >>> from multiprocessing.managers import BaseManager |
|
1355 >>> class QueueManager(BaseManager): pass |
|
1356 ... |
|
>>> QueueManager.register('get_queue')
>>> m = QueueManager.from_address(address=('foo.bar.org', 50000), authkey='abracadabra')
>>> queue = m.get_queue()
|
1360 >>> queue.get() |
|
1361 'hello' |
|
1362 |
|
1363 Local processes can also access that queue, using the code from above on the |
|
1364 client to access it remotely:: |
|
1365 |
|
1366 >>> from multiprocessing import Process, Queue |
|
1367 >>> from multiprocessing.managers import BaseManager |
|
>>> class Worker(Process):
...     def __init__(self, q):
...         self.q = q
...         super(Worker, self).__init__()
...     def run(self):
...         self.q.put('local hello')
...
|
1375 >>> queue = Queue() |
|
1376 >>> w = Worker(queue) |
|
1377 >>> w.start() |
|
1378 >>> class QueueManager(BaseManager): pass |
|
1379 ... |
|
1380 >>> QueueManager.register('get_queue', callable=lambda: queue) |
|
1381 >>> m = QueueManager(address=('', 50000), authkey='abracadabra') |
|
1382 >>> s = m.get_server() |
|
1383 >>> s.serve_forever() |
|
1384 |
|
1385 Proxy Objects |
|
1386 ~~~~~~~~~~~~~ |
|
1387 |
|
1388 A proxy is an object which *refers* to a shared object which lives (presumably) |
|
1389 in a different process. The shared object is said to be the *referent* of the |
|
1390 proxy. Multiple proxy objects may have the same referent. |
|
1391 |
|
1392 A proxy object has methods which invoke corresponding methods of its referent |
|
1393 (although not every method of the referent will necessarily be available through |
|
1394 the proxy). A proxy can usually be used in most of the same ways that its |
|
1395 referent can:: |
|
1396 |
|
1397 >>> from multiprocessing import Manager |
|
1398 >>> manager = Manager() |
|
1399 >>> l = manager.list([i*i for i in range(10)]) |
|
1400 >>> print l |
|
1401 [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] |
|
1402 >>> print repr(l) |
|
1403 <ListProxy object, typeid 'list' at 0xb799974c> |
|
1404 >>> l[4] |
|
1405 16 |
|
1406 >>> l[2:5] |
|
1407 [4, 9, 16] |
|
1408 |
|
1409 Notice that applying :func:`str` to a proxy will return the representation of |
|
1410 the referent, whereas applying :func:`repr` will return the representation of |
|
1411 the proxy. |
|
1412 |
|
1413 An important feature of proxy objects is that they are picklable so they can be |
|
1414 passed between processes. Note, however, that if a proxy is sent to the |
|
1415 corresponding manager's process then unpickling it will produce the referent |
|
1416 itself. This means, for example, that one shared object can contain a second:: |
|
1417 |
|
1418 >>> a = manager.list() |
|
1419 >>> b = manager.list() |
|
1420 >>> a.append(b) # referent of a now contains referent of b |
|
1421 >>> print a, b |
|
1422 [[]] [] |
|
1423 >>> b.append('hello') |
|
1424 >>> print a, b |
|
1425 [['hello']] ['hello'] |
|
1426 |
|
1427 .. note:: |
|
1428 |
|
1429 The proxy types in :mod:`multiprocessing` do nothing to support comparisons |
|
1430 by value. So, for instance, :: |
|
1431 |
|
1432 manager.list([1,2,3]) == [1,2,3] |
|
1433 |
|
1434 will return ``False``. One should just use a copy of the referent instead |
|
1435 when making comparisons. |
|
1436 |
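For instance, one way to make such a comparison (a minimal sketch, assuming a
``manager`` created as in the session above) is to compare an ordinary copy of
the referent::

    >>> l = manager.list([1, 2, 3])
    >>> l == [1, 2, 3]                # compares the proxy object itself
    False
    >>> list(l) == [1, 2, 3]          # compare a copy of the referent instead
    True
    >>> l._getvalue() == [1, 2, 3]    # _getvalue() also returns a copy
    True
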
|
1437 .. class:: BaseProxy |
|
1438 |
|
1439 Proxy objects are instances of subclasses of :class:`BaseProxy`. |
|
1440 |
|
.. method:: _callmethod(methodname[, args[, kwds]])
|
1442 |
|
1443 Call and return the result of a method of the proxy's referent. |
|
1444 |
|
1445 If ``proxy`` is a proxy whose referent is ``obj`` then the expression :: |
|
1446 |
|
proxy._callmethod(methodname, args, kwds)
|
1448 |
|
1449 will evaluate the expression :: |
|
1450 |
|
1451 getattr(obj, methodname)(*args, **kwds) |
|
1452 |
|
1453 in the manager's process. |
|
1454 |
|
1455 The returned value will be a copy of the result of the call or a proxy to |
|
1456 a new shared object -- see documentation for the *method_to_typeid* |
|
1457 argument of :meth:`BaseManager.register`. |
|
1458 |
|
If an exception is raised by the call, then it is re-raised by
:meth:`_callmethod`. If some other exception is raised in the manager's
process then this is converted into a :exc:`RemoteError` exception and is
raised by :meth:`_callmethod`.
|
1463 |
|
Note in particular that an exception will be raised if *methodname* has
not been *exposed*.
|
1466 |
|
An example of the usage of :meth:`_callmethod`::
|
1468 |
|
1469 >>> l = manager.list(range(10)) |
|
>>> l._callmethod('__len__')
|
1471 10 |
|
>>> l._callmethod('__getslice__', (2, 7)) # equiv to `l[2:7]`
|
1473 [2, 3, 4, 5, 6] |
|
>>> l._callmethod('__getitem__', (20,)) # equiv to `l[20]`
|
1475 Traceback (most recent call last): |
|
1476 ... |
|
1477 IndexError: list index out of range |
|
1478 |
|
.. method:: _getvalue()
|
1480 |
|
1481 Return a copy of the referent. |
|
1482 |
|
1483 If the referent is unpicklable then this will raise an exception. |
|
1484 |
|
1485 .. method:: __repr__ |
|
1486 |
|
1487 Return a representation of the proxy object. |
|
1488 |
|
1489 .. method:: __str__ |
|
1490 |
|
1491 Return the representation of the referent. |
|
1492 |
|
1493 |
|
1494 Cleanup |
|
1495 >>>>>>> |
|
1496 |
|
1497 A proxy object uses a weakref callback so that when it gets garbage collected it |
|
1498 deregisters itself from the manager which owns its referent. |
|
1499 |
|
1500 A shared object gets deleted from the manager process when there are no longer |
|
1501 any proxies referring to it. |
|
1502 |
|
1503 |
|
1504 Process Pools |
|
1505 ~~~~~~~~~~~~~ |
|
1506 |
|
1507 .. module:: multiprocessing.pool |
|
1508 :synopsis: Create pools of processes. |
|
1509 |
|
1510 One can create a pool of processes which will carry out tasks submitted to it |
|
1511 with the :class:`Pool` class. |
|
1512 |
|
1513 .. class:: multiprocessing.Pool([processes[, initializer[, initargs]]]) |
|
1514 |
|
1515 A process pool object which controls a pool of worker processes to which jobs |
|
1516 can be submitted. It supports asynchronous results with timeouts and |
|
1517 callbacks and has a parallel map implementation. |
|
1518 |
|
1519 *processes* is the number of worker processes to use. If *processes* is |
|
1520 ``None`` then the number returned by :func:`cpu_count` is used. If |
|
1521 *initializer* is not ``None`` then each worker process will call |
|
1522 ``initializer(*initargs)`` when it starts. |
|
1523 |
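For example, an *initializer* can be used to set up per-worker state before any
tasks run. The following is a minimal sketch; the names ``init_worker`` and
``work`` are illustrative::

    from multiprocessing import Pool

    _config = None

    def init_worker(value):
        # runs once in each worker process, just after it starts
        global _config
        _config = value

    def work(x):
        return '%s-%d' % (_config, x)

    if __name__ == '__main__':
        pool = Pool(processes=2, initializer=init_worker, initargs=('job',))
        print pool.map(work, range(3))   # prints "['job-0', 'job-1', 'job-2']"
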
|
1524 .. method:: apply(func[, args[, kwds]]) |
|
1525 |
|
1526 Equivalent of the :func:`apply` builtin function. It blocks till the |
|
1527 result is ready. |
|
1528 |
|
1529 .. method:: apply_async(func[, args[, kwds[, callback]]]) |
|
1530 |
|
1531 A variant of the :meth:`apply` method which returns a result object. |
|
1532 |
|
1533 If *callback* is specified then it should be a callable which accepts a |
|
1534 single argument. When the result becomes ready *callback* is applied to |
|
1535 it (unless the call failed). *callback* should complete immediately since |
|
1536 otherwise the thread which handles the results will get blocked. |
|
1537 |
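A minimal sketch of using a callback (the ``square`` and ``collect`` names
are illustrative) might look like this::

    from multiprocessing import Pool

    results = []

    def square(x):
        return x*x

    def collect(value):
        # runs in the parent process, on the thread that handles results
        results.append(value)

    if __name__ == '__main__':
        pool = Pool(processes=2)
        for i in range(5):
            pool.apply_async(square, (i,), callback=collect)
        pool.close()
        pool.join()
        print sorted(results)            # prints "[0, 1, 4, 9, 16]"
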
|
1538 .. method:: map(func, iterable[, chunksize]) |
|
1539 |
|
1540 A parallel equivalent of the :func:`map` builtin function. It blocks till |
|
1541 the result is ready. |
|
1542 |
|
1543 This method chops the iterable into a number of chunks which it submits to |
|
1544 the process pool as separate tasks. The (approximate) size of these |
|
1545 chunks can be specified by setting *chunksize* to a positive integer. |
|
1546 |
|
1547 .. method:: map_async(func, iterable[, chunksize[, callback]]) |
|
1548 |
|
1549 A variant of the :meth:`map` method which returns a result object. |
|
1550 |
|
1551 If *callback* is specified then it should be a callable which accepts a |
|
1552 single argument. When the result becomes ready *callback* is applied to |
|
1553 it (unless the call failed). *callback* should complete immediately since |
|
1554 otherwise the thread which handles the results will get blocked. |
|
1555 |
|
1556 .. method:: imap(func, iterable[, chunksize]) |
|
1557 |
|
1558 An equivalent of :func:`itertools.imap`. |
|
1559 |
|
1560 The *chunksize* argument is the same as the one used by the :meth:`.map` |
|
1561 method. For very long iterables using a large value for *chunksize* can |
|
make the job complete **much** faster than using the default value of
|
1563 ``1``. |
|
1564 |
|
1565 Also if *chunksize* is ``1`` then the :meth:`next` method of the iterator |
|
1566 returned by the :meth:`imap` method has an optional *timeout* parameter: |
|
1567 ``next(timeout)`` will raise :exc:`multiprocessing.TimeoutError` if the |
|
1568 result cannot be returned within *timeout* seconds. |
|
1569 |
|
1570 .. method:: imap_unordered(func, iterable[, chunksize]) |
|
1571 |
|
1572 The same as :meth:`imap` except that the ordering of the results from the |
|
1573 returned iterator should be considered arbitrary. (Only when there is |
|
1574 only one worker process is the order guaranteed to be "correct".) |
|
1575 |
|
1576 .. method:: close() |
|
1577 |
|
1578 Prevents any more tasks from being submitted to the pool. Once all the |
|
1579 tasks have been completed the worker processes will exit. |
|
1580 |
|
1581 .. method:: terminate() |
|
1582 |
|
1583 Stops the worker processes immediately without completing outstanding |
|
1584 work. When the pool object is garbage collected :meth:`terminate` will be |
|
1585 called immediately. |
|
1586 |
|
1587 .. method:: join() |
|
1588 |
|
1589 Wait for the worker processes to exit. One must call :meth:`close` or |
|
1590 :meth:`terminate` before using :meth:`join`. |
|
1591 |
|
1592 |
|
1593 .. class:: AsyncResult |
|
1594 |
|
1595 The class of the result returned by :meth:`Pool.apply_async` and |
|
1596 :meth:`Pool.map_async`. |
|
1597 |
|
1598 .. method:: get([timeout]) |
|
1599 |
|
1600 Return the result when it arrives. If *timeout* is not ``None`` and the |
|
1601 result does not arrive within *timeout* seconds then |
|
1602 :exc:`multiprocessing.TimeoutError` is raised. If the remote call raised |
|
1603 an exception then that exception will be reraised by :meth:`get`. |
|
1604 |
|
1605 .. method:: wait([timeout]) |
|
1606 |
|
1607 Wait until the result is available or until *timeout* seconds pass. |
|
1608 |
|
1609 .. method:: ready() |
|
1610 |
|
1611 Return whether the call has completed. |
|
1612 |
|
1613 .. method:: successful() |
|
1614 |
|
1615 Return whether the call completed without raising an exception. Will |
|
1616 raise :exc:`AssertionError` if the result is not ready. |
|
1617 |
|
1618 The following example demonstrates the use of a pool:: |
|
1619 |
|
1620 from multiprocessing import Pool |
|
1621 |
|
1622 def f(x): |
|
1623 return x*x |
|
1624 |
|
1625 if __name__ == '__main__': |
|
1626 pool = Pool(processes=4) # start 4 worker processes |
|
1627 |
|
1628 result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously |
|
1629 print result.get(timeout=1) # prints "100" unless your computer is *very* slow |
|
1630 |
|
1631 print pool.map(f, range(10)) # prints "[0, 1, 4,..., 81]" |
|
1632 |
|
1633 it = pool.imap(f, range(10)) |
|
1634 print it.next() # prints "0" |
|
1635 print it.next() # prints "1" |
|
1636 print it.next(timeout=1) # prints "4" unless your computer is *very* slow |
|
1637 |
|
1638 import time |
|
1639 result = pool.apply_async(time.sleep, (10,)) |
|
1640 print result.get(timeout=1) # raises TimeoutError |
|
1641 |
|
1642 |
|
1643 .. _multiprocessing-listeners-clients: |
|
1644 |
|
1645 Listeners and Clients |
|
1646 ~~~~~~~~~~~~~~~~~~~~~ |
|
1647 |
|
1648 .. module:: multiprocessing.connection |
|
1649 :synopsis: API for dealing with sockets. |
|
1650 |
|
1651 Usually message passing between processes is done using queues or by using |
|
1652 :class:`Connection` objects returned by :func:`Pipe`. |
|
1653 |
|
1654 However, the :mod:`multiprocessing.connection` module allows some extra |
|
1655 flexibility. It basically gives a high level message oriented API for dealing |
|
1656 with sockets or Windows named pipes, and also has support for *digest |
|
1657 authentication* using the :mod:`hmac` module. |
|
1658 |
|
1659 |
|
1660 .. function:: deliver_challenge(connection, authkey) |
|
1661 |
|
1662 Send a randomly generated message to the other end of the connection and wait |
|
1663 for a reply. |
|
1664 |
|
1665 If the reply matches the digest of the message using *authkey* as the key |
|
1666 then a welcome message is sent to the other end of the connection. Otherwise |
|
1667 :exc:`AuthenticationError` is raised. |
|
1668 |
|
.. function:: answer_challenge(connection, authkey)
|
1670 |
|
1671 Receive a message, calculate the digest of the message using *authkey* as the |
|
1672 key, and then send the digest back. |
|
1673 |
|
1674 If a welcome message is not received, then :exc:`AuthenticationError` is |
|
1675 raised. |
|
1676 |
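These two functions are normally called implicitly by :class:`Listener` and
:func:`Client`, but they can also be used directly. The following sketch
authenticates the two ends of a pipe within a single process; the key
``'secret'`` is illustrative::

    import threading
    from multiprocessing import Pipe
    from multiprocessing.connection import deliver_challenge, answer_challenge

    a, b = Pipe()

    # answer the challenge on one end in a separate thread ...
    t = threading.Thread(target=answer_challenge, args=(b, 'secret'))
    t.start()

    # ... while this end issues it; a mismatched key would raise
    # AuthenticationError
    deliver_challenge(a, 'secret')
    t.join()
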
|
1677 .. function:: Client(address[, family[, authenticate[, authkey]]]) |
|
1678 |
|
1679 Attempt to set up a connection to the listener which is using address |
|
1680 *address*, returning a :class:`~multiprocessing.Connection`. |
|
1681 |
|
The type of the connection is determined by the *family* argument, but this can
|
1683 generally be omitted since it can usually be inferred from the format of |
|
1684 *address*. (See :ref:`multiprocessing-address-formats`) |
|
1685 |
|
If *authenticate* is ``True`` or *authkey* is a string then digest
authentication is used. The key used for authentication will be either
*authkey* or ``current_process().authkey`` if *authkey* is ``None``.
|
1689 If authentication fails then :exc:`AuthenticationError` is raised. See |
|
1690 :ref:`multiprocessing-auth-keys`. |
|
1691 |
|
1692 .. class:: Listener([address[, family[, backlog[, authenticate[, authkey]]]]]) |
|
1693 |
|
1694 A wrapper for a bound socket or Windows named pipe which is 'listening' for |
|
1695 connections. |
|
1696 |
|
1697 *address* is the address to be used by the bound socket or named pipe of the |
|
1698 listener object. |
|
1699 |
|
1700 *family* is the type of socket (or named pipe) to use. This can be one of |
|
1701 the strings ``'AF_INET'`` (for a TCP socket), ``'AF_UNIX'`` (for a Unix |
|
1702 domain socket) or ``'AF_PIPE'`` (for a Windows named pipe). Of these only |
|
1703 the first is guaranteed to be available. If *family* is ``None`` then the |
|
1704 family is inferred from the format of *address*. If *address* is also |
|
1705 ``None`` then a default is chosen. This default is the family which is |
|
1706 assumed to be the fastest available. See |
|
1707 :ref:`multiprocessing-address-formats`. Note that if *family* is |
|
1708 ``'AF_UNIX'`` and address is ``None`` then the socket will be created in a |
|
1709 private temporary directory created using :func:`tempfile.mkstemp`. |
|
1710 |
|
1711 If the listener object uses a socket then *backlog* (1 by default) is passed |
|
1712 to the :meth:`listen` method of the socket once it has been bound. |
|
1713 |
|
1714 If *authenticate* is ``True`` (``False`` by default) or *authkey* is not |
|
1715 ``None`` then digest authentication is used. |
|
1716 |
|
1717 If *authkey* is a string then it will be used as the authentication key; |
|
otherwise it must be ``None``.
|
1719 |
|
1720 If *authkey* is ``None`` and *authenticate* is ``True`` then |
|
1721 ``current_process().authkey`` is used as the authentication key. If |
|
*authkey* is ``None`` and *authenticate* is ``False`` then no
|
1723 authentication is done. If authentication fails then |
|
1724 :exc:`AuthenticationError` is raised. See :ref:`multiprocessing-auth-keys`. |
|
1725 |
|
1726 .. method:: accept() |
|
1727 |
|
1728 Accept a connection on the bound socket or named pipe of the listener |
|
1729 object and return a :class:`Connection` object. If authentication is |
|
1730 attempted and fails, then :exc:`AuthenticationError` is raised. |
|
1731 |
|
1732 .. method:: close() |
|
1733 |
|
1734 Close the bound socket or named pipe of the listener object. This is |
|
1735 called automatically when the listener is garbage collected. However it |
|
1736 is advisable to call it explicitly. |
|
1737 |
|
1738 Listener objects have the following read-only properties: |
|
1739 |
|
1740 .. attribute:: address |
|
1741 |
|
1742 The address which is being used by the Listener object. |
|
1743 |
|
1744 .. attribute:: last_accepted |
|
1745 |
|
1746 The address from which the last accepted connection came. If this is |
|
1747 unavailable then it is ``None``. |
|
1748 |
|
1749 |
|
The module defines the following exception:
|
1751 |
|
1752 .. exception:: AuthenticationError |
|
1753 |
|
1754 Exception raised when there is an authentication error. |
|
1755 |
|
1756 |
|
1757 **Examples** |
|
1758 |
|
1759 The following server code creates a listener which uses ``'secret password'`` as |
|
1760 an authentication key. It then waits for a connection and sends some data to |
|
1761 the client:: |
|
1762 |
|
1763 from multiprocessing.connection import Listener |
|
1764 from array import array |
|
1765 |
|
1766 address = ('localhost', 6000) # family is deduced to be 'AF_INET' |
|
1767 listener = Listener(address, authkey='secret password') |
|
1768 |
|
1769 conn = listener.accept() |
|
1770 print 'connection accepted from', listener.last_accepted |
|
1771 |
|
1772 conn.send([2.25, None, 'junk', float]) |
|
1773 |
|
1774 conn.send_bytes('hello') |
|
1775 |
|
1776 conn.send_bytes(array('i', [42, 1729])) |
|
1777 |
|
1778 conn.close() |
|
1779 listener.close() |
|
1780 |
|
1781 The following code connects to the server and receives some data from the |
|
1782 server:: |
|
1783 |
|
1784 from multiprocessing.connection import Client |
|
1785 from array import array |
|
1786 |
|
1787 address = ('localhost', 6000) |
|
1788 conn = Client(address, authkey='secret password') |
|
1789 |
|
1790 print conn.recv() # => [2.25, None, 'junk', float] |
|
1791 |
|
1792 print conn.recv_bytes() # => 'hello' |
|
1793 |
|
1794 arr = array('i', [0, 0, 0, 0, 0]) |
|
1795 print conn.recv_bytes_into(arr) # => 8 |
|
1796 print arr # => array('i', [42, 1729, 0, 0, 0]) |
|
1797 |
|
1798 conn.close() |
|
1799 |
|
1800 |
|
1801 .. _multiprocessing-address-formats: |
|
1802 |
|
1803 Address Formats |
|
1804 >>>>>>>>>>>>>>> |
|
1805 |
|
1806 * An ``'AF_INET'`` address is a tuple of the form ``(hostname, port)`` where |
|
1807 *hostname* is a string and *port* is an integer. |
|
1808 |
|
1809 * An ``'AF_UNIX'`` address is a string representing a filename on the |
|
1810 filesystem. |
|
1811 |
|
1812 * An ``'AF_PIPE'`` address is a string of the form |
|
1813 ``r'\\\\.\\pipe\\PipeName'``. To use :func:`Client` to connect to a named |
|
pipe on a remote computer called *ServerName* one should use an address of the
|
1815 form ``r'\\\\ServerName\\pipe\\PipeName'`` instead. |
|
1816 |
|
1817 Note that any string beginning with two backslashes is assumed by default to be |
|
1818 an ``'AF_PIPE'`` address rather than an ``'AF_UNIX'`` address. |
|
1819 |
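For example, the family of a :class:`Listener` can usually be left implicit
because it is inferred from the address (a minimal sketch; the port number is
arbitrary)::

    from multiprocessing.connection import Listener

    listener = Listener(('localhost', 6000))   # an 'AF_INET' address
    print listener.address                     # e.g. ('127.0.0.1', 6000)
    listener.close()
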
|
1820 |
|
1821 .. _multiprocessing-auth-keys: |
|
1822 |
|
1823 Authentication keys |
|
1824 ~~~~~~~~~~~~~~~~~~~ |
|
1825 |
|
1826 When one uses :meth:`Connection.recv`, the data received is automatically |
|
1827 unpickled. Unfortunately unpickling data from an untrusted source is a security |
|
1828 risk. Therefore :class:`Listener` and :func:`Client` use the :mod:`hmac` module |
|
1829 to provide digest authentication. |
|
1830 |
|
1831 An authentication key is a string which can be thought of as a password: once a |
|
1832 connection is established both ends will demand proof that the other knows the |
|
1833 authentication key. (Demonstrating that both ends are using the same key does |
|
1834 **not** involve sending the key over the connection.) |
|
1835 |
|
If authentication is requested but no authentication key is specified then the
return value of ``current_process().authkey`` is used (see
:class:`~multiprocessing.Process`). This value will automatically be inherited
by any :class:`~multiprocessing.Process` object that the current process
creates. This means that (by default) all processes of a multi-process program
will share a single authentication key which can be used when setting up
connections between themselves.
|
1843 |
|
1844 Suitable authentication keys can also be generated by using :func:`os.urandom`. |
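
For instance (a sketch; the generated key would have to be made available, by
some out-of-band means, to every process that needs to connect)::

    import os
    from multiprocessing.connection import Listener

    authkey = os.urandom(20)     # 20 random bytes as a shared secret
    listener = Listener(('localhost', 6000), authkey=authkey)
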
|
1845 |
|
1846 |
|
1847 Logging |
|
1848 ~~~~~~~ |
|
1849 |
|
1850 Some support for logging is available. Note, however, that the :mod:`logging` |
|
1851 package does not use process shared locks so it is possible (depending on the |
|
1852 handler type) for messages from different processes to get mixed up. |
|
1853 |
|
1854 .. currentmodule:: multiprocessing |
|
1855 .. function:: get_logger() |
|
1856 |
|
1857 Returns the logger used by :mod:`multiprocessing`. If necessary, a new one |
|
1858 will be created. |
|
1859 |
|
1860 When first created the logger has level :data:`logging.NOTSET` and has a |
|
1861 handler which sends output to :data:`sys.stderr` using format |
|
1862 ``'[%(levelname)s/%(processName)s] %(message)s'``. (The logger allows use of |
|
the non-standard ``'%(processName)s'`` format.) Messages sent to this logger
|
1864 will not by default propagate to the root logger. |
|
1865 |
|
1866 Note that on Windows child processes will only inherit the level of the |
|
1867 parent process's logger -- any other customization of the logger will not be |
|
1868 inherited. |
|
1869 |
|
1870 Below is an example session with logging turned on:: |
|
1871 |
|
1872 >>> import multiprocessing, logging |
|
>>> logger = multiprocessing.get_logger()
|
1874 >>> logger.setLevel(logging.INFO) |
|
1875 >>> logger.warning('doomed') |
|
1876 [WARNING/MainProcess] doomed |
|
1877 >>> m = multiprocessing.Manager() |
|
1878 [INFO/SyncManager-1] child process calling self.run() |
|
1879 [INFO/SyncManager-1] manager bound to '\\\\.\\pipe\\pyc-2776-0-lj0tfa' |
|
1880 >>> del m |
|
1881 [INFO/MainProcess] sending shutdown message to manager |
|
1882 [INFO/SyncManager-1] manager exiting with exitcode 0 |
|
1883 |
|
1884 |
|
1885 The :mod:`multiprocessing.dummy` module |
|
1886 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
|
1887 |
|
1888 .. module:: multiprocessing.dummy |
|
1889 :synopsis: Dumb wrapper around threading. |
|
1890 |
|
1891 :mod:`multiprocessing.dummy` replicates the API of :mod:`multiprocessing` but is |
|
1892 no more than a wrapper around the :mod:`threading` module. |
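
This makes it easy to switch between processes and threads. For example, the
pool example above could be run with a pool of worker threads simply by
changing the import (a minimal sketch)::

    from multiprocessing.dummy import Pool   # same interface, but uses threads

    def f(x):
        return x*x

    if __name__ == '__main__':
        pool = Pool(4)                        # a pool of four worker *threads*
        print pool.map(f, range(10))          # prints "[0, 1, 4, ..., 81]"
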
|
1893 |
|
1894 |
|
1895 .. _multiprocessing-programming: |
|
1896 |
|
1897 Programming guidelines |
|
1898 ---------------------- |
|
1899 |
|
1900 There are certain guidelines and idioms which should be adhered to when using |
|
1901 :mod:`multiprocessing`. |
|
1902 |
|
1903 |
|
1904 All platforms |
|
1905 ~~~~~~~~~~~~~ |
|
1906 |
|
1907 Avoid shared state |
|
1908 |
|
1909 As far as possible one should try to avoid shifting large amounts of data |
|
1910 between processes. |
|
1911 |
|
1912 It is probably best to stick to using queues or pipes for communication |
|
1913 between processes rather than using the lower level synchronization |
|
1914 primitives from the :mod:`threading` module. |
|
1915 |
|
1916 Picklability |
|
1917 |
|
1918 Ensure that the arguments to the methods of proxies are picklable. |
|
1919 |
|
1920 Thread safety of proxies |
|
1921 |
|
1922 Do not use a proxy object from more than one thread unless you protect it |
|
1923 with a lock. |
|
1924 |
|
1925 (There is never a problem with different processes using the *same* proxy.) |
|
1926 |
|
1927 Joining zombie processes |
|
1928 |
|
1929 On Unix when a process finishes but has not been joined it becomes a zombie. |
|
1930 There should never be very many because each time a new process starts (or |
|
1931 :func:`active_children` is called) all completed processes which have not |
|
1932 yet been joined will be joined. Also calling a finished process's |
|
1933 :meth:`Process.is_alive` will join the process. Even so it is probably good |
|
1934 practice to explicitly join all the processes that you start. |
|
1935 |
|
1936 Better to inherit than pickle/unpickle |
|
1937 |
|
1938 On Windows many types from :mod:`multiprocessing` need to be picklable so |
|
1939 that child processes can use them. However, one should generally avoid |
|
1940 sending shared objects to other processes using pipes or queues. Instead |
|
you should arrange the program so that a process which needs access to a
|
1942 shared resource created elsewhere can inherit it from an ancestor process. |
|
1943 |
|
1944 Avoid terminating processes |
|
1945 |
|
1946 Using the :meth:`Process.terminate` method to stop a process is liable to |
|
1947 cause any shared resources (such as locks, semaphores, pipes and queues) |
|
1948 currently being used by the process to become broken or unavailable to other |
|
1949 processes. |
|
1950 |
|
1951 Therefore it is probably best to only consider using |
|
1952 :meth:`Process.terminate` on processes which never use any shared resources. |
|
1953 |
|
1954 Joining processes that use queues |
|
1955 |
|
1956 Bear in mind that a process that has put items in a queue will wait before |
|
1957 terminating until all the buffered items are fed by the "feeder" thread to |
|
1958 the underlying pipe. (The child process can call the |
|
1959 :meth:`Queue.cancel_join_thread` method of the queue to avoid this behaviour.) |
|
1960 |
|
1961 This means that whenever you use a queue you need to make sure that all |
|
1962 items which have been put on the queue will eventually be removed before the |
|
1963 process is joined. Otherwise you cannot be sure that processes which have |
|
1964 put items on the queue will terminate. Remember also that non-daemonic |
|
processes will automatically be joined.
|
1966 |
|
1967 An example which will deadlock is the following:: |
|
1968 |
|
1969 from multiprocessing import Process, Queue |
|
1970 |
|
1971 def f(q): |
|
1972 q.put('X' * 1000000) |
|
1973 |
|
1974 if __name__ == '__main__': |
|
1975 queue = Queue() |
|
1976 p = Process(target=f, args=(queue,)) |
|
1977 p.start() |
|
1978 p.join() # this deadlocks |
|
1979 obj = queue.get() |
|
1980 |
|
1981 A fix here would be to swap the last two lines round (or simply remove the |
|
1982 ``p.join()`` line). |
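
For instance, the ``__main__`` block of the example would then read::

    if __name__ == '__main__':
        queue = Queue()
        p = Process(target=f, args=(queue,))
        p.start()
        obj = queue.get()   # drain the queue first ...
        p.join()            # ... and only then join the process
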
|
1983 |
|
1984 Explicitly pass resources to child processes |
|
1985 |
|
On Unix a child process can make use of a shared resource created in a
parent process by referring to it as a global. However, it is better to pass
the object as an argument to the constructor for the child process.
|
1989 |
|
1990 Apart from making the code (potentially) compatible with Windows this also |
|
1991 ensures that as long as the child process is still alive the object will not |
|
1992 be garbage collected in the parent process. This might be important if some |
|
1993 resource is freed when the object is garbage collected in the parent |
|
1994 process. |
|
1995 |
|
1996 So for instance :: |
|
1997 |
|
1998 from multiprocessing import Process, Lock |
|
1999 |
|
2000 def f(): |
|
2001 ... do something using "lock" ... |
|
2002 |
|
2003 if __name__ == '__main__': |
|
2004 lock = Lock() |
|
2005 for i in range(10): |
|
2006 Process(target=f).start() |
|
2007 |
|
2008 should be rewritten as :: |
|
2009 |
|
2010 from multiprocessing import Process, Lock |
|
2011 |
|
2012 def f(l): |
|
2013 ... do something using "l" ... |
|
2014 |
|
2015 if __name__ == '__main__': |
|
2016 lock = Lock() |
|
2017 for i in range(10): |
|
2018 Process(target=f, args=(lock,)).start() |
|
2019 |
|
2020 |
|
2021 Windows |
|
2022 ~~~~~~~ |
|
2023 |
|
2024 Since Windows lacks :func:`os.fork` it has a few extra restrictions: |
|
2025 |
|
2026 More picklability |
|
2027 |
|
2028 Ensure that all arguments to :meth:`Process.__init__` are picklable. This |
|
2029 means, in particular, that bound or unbound methods cannot be used directly |
|
2030 as the ``target`` argument on Windows --- just define a function and use |
|
2031 that instead. |
|
2032 |
|
2033 Also, if you subclass :class:`Process` then make sure that instances will be |
|
2034 picklable when the :meth:`Process.start` method is called. |
|
2035 |
|
2036 Global variables |
|
2037 |
|
2038 Bear in mind that if code run in a child process tries to access a global |
|
2039 variable, then the value it sees (if any) may not be the same as the value |
|
2040 in the parent process at the time that :meth:`Process.start` was called. |
|
2041 |
|
2042 However, global variables which are just module level constants cause no |
|
2043 problems. |
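
A small sketch of the pitfall (the names are illustrative)::

    from multiprocessing import Process

    value = 0

    def show():
        print 'value =', value   # prints 1 on Unix but 0 on Windows

    if __name__ == '__main__':
        value = 1                # not re-executed when a child imports the module
        p = Process(target=show)
        p.start()
        p.join()
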
|
2044 |
|
2045 Safe importing of main module |
|
2046 |
|
2047 Make sure that the main module can be safely imported by a new Python |
|
interpreter without causing unintended side effects (such as starting a new
|
2049 process). |
|
2050 |
|
2051 For example, under Windows running the following module would fail with a |
|
2052 :exc:`RuntimeError`:: |
|
2053 |
|
2054 from multiprocessing import Process |
|
2055 |
|
2056 def foo(): |
|
2057 print 'hello' |
|
2058 |
|
2059 p = Process(target=foo) |
|
2060 p.start() |
|
2061 |
|
2062 Instead one should protect the "entry point" of the program by using ``if |
|
2063 __name__ == '__main__':`` as follows:: |
|
2064 |
|
2065 from multiprocessing import Process, freeze_support |
|
2066 |
|
2067 def foo(): |
|
2068 print 'hello' |
|
2069 |
|
2070 if __name__ == '__main__': |
|
2071 freeze_support() |
|
2072 p = Process(target=foo) |
|
2073 p.start() |
|
2074 |
|
2075 (The ``freeze_support()`` line can be omitted if the program will be run |
|
2076 normally instead of frozen.) |
|
2077 |
|
2078 This allows the newly spawned Python interpreter to safely import the module |
|
2079 and then run the module's ``foo()`` function. |
|
2080 |
|
2081 Similar restrictions apply if a pool or manager is created in the main |
|
2082 module. |
|
2083 |
|
2084 |
|
2085 .. _multiprocessing-examples: |
|
2086 |
|
2087 Examples |
|
2088 -------- |
|
2089 |
|
2090 Demonstration of how to create and use customized managers and proxies: |
|
2091 |
|
2092 .. literalinclude:: ../includes/mp_newtype.py |
|
2093 |
|
2094 |
|
2095 Using :class:`Pool`: |
|
2096 |
|
2097 .. literalinclude:: ../includes/mp_pool.py |
|
2098 |
|
2099 |
|
2100 Synchronization types like locks, conditions and queues: |
|
2101 |
|
2102 .. literalinclude:: ../includes/mp_synchronize.py |
|
2103 |
|
2104 |
|
An example showing how to use queues to feed tasks to a collection of worker processes and
|
2106 collect the results: |
|
2107 |
|
2108 .. literalinclude:: ../includes/mp_workers.py |
|
2109 |
|
2110 |
|
2111 An example of how a pool of worker processes can each run a |
|
2112 :class:`SimpleHTTPServer.HttpServer` instance while sharing a single listening |
|
2113 socket. |
|
2114 |
|
2115 .. literalinclude:: ../includes/mp_webserver.py |
|
2116 |
|
2117 |
|
2118 Some simple benchmarks comparing :mod:`multiprocessing` with :mod:`threading`: |
|
2119 |
|
2120 .. literalinclude:: ../includes/mp_benchmarks.py |
|
2121 |
|
2122 An example/demo of how to use the :class:`managers.SyncManager`, :class:`Process` |
|
2123 and others to build a system which can distribute processes and work via a |
|
2124 distributed queue to a "cluster" of machines on a network, accessible via SSH. |
|
2125 You will need to have private key authentication for all hosts configured for |
|
2126 this to work. |
|
2127 |
|
2128 .. literalinclude:: ../includes/mp_distributing.py |