Passing Messages to Process
As with threads, a common use pattern for multiple processes is to divide a job up among several workers to run in parallel. Effective use of multiple processes usually requires some communication between them, so that work can be divided and results can be aggregated. A simple way to communicate between processes with multiprocessing is to use a Queue to pass messages back and forth. Any object that can be serialized with pickle can pass through a Queue.
The example codes are follow:
import multiprocessing
class MyFancyClass:
def __init__(self, name):
self.name = name
def do_something(self):
proc_name = multiprocessing.current_process().name
print('Doing something fancy in {} for {}'.format(proc_name, self.name))
def worker(q):
obj = q.get()
obj.do_something()
if __name__ == '__main__':
queue = multiprocessing.Queue()
for i in range(3):
p = multiprocessing.Process(target=worker, args=(queue,))
p.start()
queue.put(MyFancyClass('Fancy Dan %s' % i))
queue.close()
queue.join_thread()
p.join()
This short example passes just a single message to a single worker, then the main process waits for the workers to finish.
D:\Python39\python.exe C:/Users/admin/Desktop/multiprocessing_queue.py
Doing something fancy in Process-1 for Fancy Dan 0
Doing something fancy in Process-3 for Fancy Dan 1
Doing something fancy in Process-2 for Fancy Dan 2
Process finished with exit code 0
A more complex example shows how to manage several workers that are consuming data from a JoinableQueue and passing results back to the parent process. The poison pill technique is used to stop these workers. After setting up the real tasks, the main program adds one “stop” value per worker to the job queue. When a worker encounters the special value, it breaks out of its processing loop. The main process uses the task queue’s join()
import multiprocessing
import time
class Consumer(multiprocessing.Process):
def __init__(self, task_queue, result_queue):
multiprocessing.Process.__init__(self)
self.task_queue = task_queue
self.result_queue = result_queue
def run(self):
proc_name = self.name
while 1:
next_task = self.task_queue.get()
if next_task is None:
# Posion pill means shutdown
print('{}: Exiting'.format(proc_name))
self.task_queue.task_done()
break
print('{}: {}'.format(proc_name, next_task))
answer = next_task()
self.task_queue.task_done()
self.result_queue.put(answer)
class Task:
def __init__(self, a, b):
self.a = a
self.b = b
def __call__(self):
time.sleep(0.1) # Pretend to take time to do the work
return '{} * {} = {}'.format(
self.a, self.b, self.a * self.b
)
def __str__(self):
return '{} * {}'.format(self.a, self.b)
if __name__ == '__main__':
# Establish communication queues
tasks = multiprocessing.JoinableQueue()
results = multiprocessing.Queue()
# Start consumers
num_consumers = multiprocessing.cpu_count() * 2
print('Creating {} consumers'.format(num_consumers))
consumers = [
Consumer(tasks, results)
for i in range(num_consumers)
]
for w in consumers:
w.start()
# Enqueue jobs.
num_jobs = 40
for i in range(num_jobs):
tasks.put(Task(i, i))
# Add a poison pill for each consumer
for i in range(num_consumers):
tasks.put(None)
# Wait for all of the tasks to finish
tasks.join()
# Start printing results
while num_jobs:
result = results.get()
print('Result:', result)
num_jobs -= 1
Although the jobs enter the queue in order, their execution occurs in parallel. Thus, there is no guarantee about the order in which they will be completed.
D:\Python39\python.exe C:/Users/admin/Desktop/multiprocessing_producer_consumer.py
Creating 40 consumers
Consumer-2: 0 * 0
Consumer-3: 1 * 1
Consumer-8: 2 * 2
Consumer-13: 3 * 3
Consumer-1: 4 * 4
Consumer-7: 5 * 5
Consumer-11: 6 * 6
Consumer-4: 7 * 7
Consumer-10: 8 * 8
Consumer-17: 9 * 9
Consumer-6: 10 * 10
Consumer-12: 11 * 11
Consumer-9: 12 * 12
Consumer-5: 13 * 13
Consumer-18: 14 * 14
Consumer-22: 15 * 15
Consumer-16: 16 * 16
Consumer-19: 17 * 17
Consumer-26: 18 * 18
Consumer-21: 19 * 19
Consumer-14: 20 * 20
Consumer-15: 21 * 21
Consumer-23: 22 * 22
Consumer-20: 23 * 23
Consumer-31: 24 * 24
Consumer-27: 25 * 25
Consumer-30: 26 * 26
Consumer-25: 27 * 27
Consumer-29: 28 * 28
Consumer-24: 29 * 29
Consumer-33: 30 * 30
Consumer-32: 31 * 31
Consumer-39: 32 * 32
Consumer-40: 33 * 33
Consumer-35: 34 * 34
Consumer-37: 35 * 35
Consumer-34: 36 * 36
Consumer-36: 37 * 37
Consumer-28: 38 * 38
Consumer-38: 39 * 39
Consumer-2: Exiting
Consumer-3: Exiting
Consumer-7: ExitingConsumer-1: Exiting
Consumer-11: Exiting
Consumer-13: Exiting
Consumer-4: Exiting
Consumer-8: Exiting
Consumer-9: Exiting
Consumer-6: Exiting
Consumer-5: Exiting
Consumer-10: Exiting
Consumer-17: Exiting
Consumer-12: Exiting
Consumer-14: Exiting
Consumer-22: Exiting
Consumer-19: Exiting
Consumer-18: Exiting
Consumer-26: Exiting
Consumer-16: Exiting
Consumer-21: Exiting
Consumer-30: Exiting
Consumer-25: Exiting
Consumer-20: Exiting
Consumer-27: Exiting
Consumer-29: Exiting
Consumer-15: Exiting
Consumer-31: Exiting
Consumer-23: Exiting
Consumer-40: Exiting
Consumer-39: Exiting
Consumer-38: Exiting
Consumer-32: Exiting
Consumer-37: Exiting
Consumer-35: Exiting
Consumer-28: Exiting
Consumer-36: Exiting
Consumer-33: Exiting
Consumer-34: Exiting
Consumer-24: Exiting
Result: 0 * 0 = 0
Result: 1 * 1 = 1
Result: 4 * 4 = 16
Result: 5 * 5 = 25
Result: 6 * 6 = 36
Result: 3 * 3 = 9
Result: 7 * 7 = 49
Result: 2 * 2 = 4
Result: 12 * 12 = 144
Result: 10 * 10 = 100
Result: 13 * 13 = 169
Result: 8 * 8 = 64
Result: 11 * 11 = 121
Result: 9 * 9 = 81
Result: 15 * 15 = 225
Result: 20 * 20 = 400
Result: 17 * 17 = 289
Result: 14 * 14 = 196
Result: 18 * 18 = 324
Result: 19 * 19 = 361
Result: 16 * 16 = 256
Result: 26 * 26 = 676
Result: 27 * 27 = 729
Result: 23 * 23 = 529
Result: 25 * 25 = 625
Result: 28 * 28 = 784
Result: 21 * 21 = 441
Result: 22 * 22 = 484
Result: 24 * 24 = 576
Result: 33 * 33 = 1089
Result: 32 * 32 = 1024
Result: 39 * 39 = 1521
Result: 31 * 31 = 961
Result: 35 * 35 = 1225
Result: 38 * 38 = 1444
Result: 34 * 34 = 1156
Result: 37 * 37 = 1369
Result: 36 * 36 = 1296
Result: 30 * 30 = 900
Result: 29 * 29 = 841
Process finished with exit code 0