Passing Messages to Processes

As with threads, a common use pattern for multiple processes is to divide a job up among several workers to run in parallel. Effective use of multiple processes usually requires some communication between them, so that work can be divided and results can be aggregated. A simple way to communicate between processes with multiprocessing is to use a Queue to pass messages back and forth. Any object that can be serialized with pickle can pass through a Queue.

The example code follows:

import multiprocessing


class MyFancyClass:

    def __init__(self, name):
        self.name = name

    def do_something(self):
        proc_name = multiprocessing.current_process().name
        print('Doing something fancy in {} for {}'.format(proc_name, self.name))


def worker(q):
    # Block until a message arrives, then act on it
    obj = q.get()
    obj.do_something()


if __name__ == '__main__':
    queue = multiprocessing.Queue()

    # Start three workers and send one message for each
    workers = []
    for i in range(3):
        p = multiprocessing.Process(target=worker, args=(queue,))
        p.start()
        workers.append(p)
        queue.put(MyFancyClass('Fancy Dan %s' % i))

    # Wait for every worker to finish, not just the last one started
    queue.close()
    queue.join_thread()
    for p in workers:
        p.join()

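This short example starts three workers and passes a single message to each, then the main process waits for the workers to finish. Because all of the workers read from the same queue, any worker may receive any message, as the output shows.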

D:\Python39\python.exe C:/Users/admin/Desktop/multiprocessing_queue.py
Doing something fancy in Process-1 for Fancy Dan 0
Doing something fancy in Process-3 for Fancy Dan 1
Doing something fancy in Process-2 for Fancy Dan 2

Process finished with exit code 0
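
Since only objects that pickle can serialize may pass through a Queue, it can help to check an object before sending it. The following is a minimal sketch of such a check; the helper name is_picklable is hypothetical and is not part of multiprocessing, and the set of exceptions it catches is an assumption based on what pickle commonly raises.

```python
import pickle


class MyFancyClass:
    """Same shape as the class in the example above."""

    def __init__(self, name):
        self.name = name


def is_picklable(obj):
    """Return True if pickle can serialize obj, False otherwise.

    Hypothetical helper: the exception list is an assumption covering
    the errors pickle usually raises for unsupported objects.
    """
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


if __name__ == '__main__':
    # An instance of a class defined at module level can be pickled.
    print(is_picklable(MyFancyClass('Fancy Dan')))  # True
    # A lambda has no importable name, so pickle rejects it.
    print(is_picklable(lambda x: x * 2))  # False
```

A check like this only tells whether serialization succeeds in the sending process. The receiving process must still be able to import the class definition in order to rebuild the object.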

A more complex example shows how to manage several workers that consume data from a JoinableQueue and pass results back to the parent process. The poison pill technique is used to stop the workers. After setting up the real tasks, the main program adds one “stop” value per worker to the job queue. When a worker encounters the special value, it breaks out of its processing loop. The main process uses the task queue’s join() method to wait for all of the tasks to finish before processing the results.

import multiprocessing
import time


class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue


    def run(self):
        proc_name = self.name

        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown
                print('{}: Exiting'.format(proc_name))
                self.task_queue.task_done()
                break
            print('{}: {}'.format(proc_name, next_task))
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)

class Task:

    def __init__(self, a, b):
        self.a = a
        self.b = b

    def __call__(self):
        time.sleep(0.1)  # Pretend to take time to do the work
        return '{} * {} = {}'.format(
            self.a, self.b, self.a * self.b
        )

    def __str__(self):
        return '{} * {}'.format(self.a, self.b)


if __name__ == '__main__':
    # Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    # Start consumers
    num_consumers = multiprocessing.cpu_count() * 2
    print('Creating {} consumers'.format(num_consumers))
    consumers = [
        Consumer(tasks, results)
        for i in range(num_consumers)
    ]

    for w in consumers:
        w.start()

    # Enqueue jobs.
    num_jobs = 40
    for i in range(num_jobs):
        tasks.put(Task(i, i))

    # Add a poison pill for each consumer
    for i in range(num_consumers):
        tasks.put(None)

    # Wait for all of the tasks to finish
    tasks.join()

    # Start printing results
    while num_jobs:
        result = results.get()
        print('Result:', result)
        num_jobs -= 1

Although the jobs enter the queue in order, their execution occurs in parallel. Thus, there is no guarantee about the order in which they will be completed.

D:\Python39\python.exe C:/Users/admin/Desktop/multiprocessing_producer_consumer.py
Creating 40 consumers
Consumer-2: 0 * 0
Consumer-3: 1 * 1
Consumer-8: 2 * 2
Consumer-13: 3 * 3
Consumer-1: 4 * 4
Consumer-7: 5 * 5
Consumer-11: 6 * 6
Consumer-4: 7 * 7
Consumer-10: 8 * 8
Consumer-17: 9 * 9
Consumer-6: 10 * 10
Consumer-12: 11 * 11
Consumer-9: 12 * 12
Consumer-5: 13 * 13
Consumer-18: 14 * 14
Consumer-22: 15 * 15
Consumer-16: 16 * 16
Consumer-19: 17 * 17
Consumer-26: 18 * 18
Consumer-21: 19 * 19
Consumer-14: 20 * 20
Consumer-15: 21 * 21
Consumer-23: 22 * 22
Consumer-20: 23 * 23
Consumer-31: 24 * 24
Consumer-27: 25 * 25
Consumer-30: 26 * 26
Consumer-25: 27 * 27
Consumer-29: 28 * 28
Consumer-24: 29 * 29
Consumer-33: 30 * 30
Consumer-32: 31 * 31
Consumer-39: 32 * 32
Consumer-40: 33 * 33
Consumer-35: 34 * 34
Consumer-37: 35 * 35
Consumer-34: 36 * 36
Consumer-36: 37 * 37
Consumer-28: 38 * 38
Consumer-38: 39 * 39
Consumer-2: Exiting
Consumer-3: Exiting
Consumer-7: ExitingConsumer-1: Exiting
Consumer-11: Exiting
Consumer-13: Exiting

Consumer-4: Exiting
Consumer-8: Exiting
Consumer-9: Exiting
Consumer-6: Exiting
Consumer-5: Exiting
Consumer-10: Exiting
Consumer-17: Exiting
Consumer-12: Exiting
Consumer-14: Exiting
Consumer-22: Exiting
Consumer-19: Exiting
Consumer-18: Exiting
Consumer-26: Exiting
Consumer-16: Exiting
Consumer-21: Exiting
Consumer-30: Exiting
Consumer-25: Exiting
Consumer-20: Exiting
Consumer-27: Exiting
Consumer-29: Exiting
Consumer-15: Exiting
Consumer-31: Exiting
Consumer-23: Exiting
Consumer-40: Exiting
Consumer-39: Exiting
Consumer-38: Exiting
Consumer-32: Exiting
Consumer-37: Exiting
Consumer-35: Exiting
Consumer-28: Exiting
Consumer-36: Exiting
Consumer-33: Exiting
Consumer-34: Exiting
Consumer-24: Exiting
Result: 0 * 0 = 0
Result: 1 * 1 = 1
Result: 4 * 4 = 16
Result: 5 * 5 = 25
Result: 6 * 6 = 36
Result: 3 * 3 = 9
Result: 7 * 7 = 49
Result: 2 * 2 = 4
Result: 12 * 12 = 144
Result: 10 * 10 = 100
Result: 13 * 13 = 169
Result: 8 * 8 = 64
Result: 11 * 11 = 121
Result: 9 * 9 = 81
Result: 15 * 15 = 225
Result: 20 * 20 = 400
Result: 17 * 17 = 289
Result: 14 * 14 = 196
Result: 18 * 18 = 324
Result: 19 * 19 = 361
Result: 16 * 16 = 256
Result: 26 * 26 = 676
Result: 27 * 27 = 729
Result: 23 * 23 = 529
Result: 25 * 25 = 625
Result: 28 * 28 = 784
Result: 21 * 21 = 441
Result: 22 * 22 = 484
Result: 24 * 24 = 576
Result: 33 * 33 = 1089
Result: 32 * 32 = 1024
Result: 39 * 39 = 1521
Result: 31 * 31 = 961
Result: 35 * 35 = 1225
Result: 38 * 38 = 1444
Result: 34 * 34 = 1156
Result: 37 * 37 = 1369
Result: 36 * 36 = 1296
Result: 30 * 30 = 900
Result: 29 * 29 = 841

Process finished with exit code 0
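
Because completion order is not guaranteed, a program that needs results in submission order has to restore that order itself. One possible approach, sketched below, is to tag each job with its index and sort the collected results by that index. The names worker and in_submission_order, the worker count, and the job count are all illustrative assumptions rather than part of the example above.

```python
import multiprocessing


def worker(task_queue, result_queue):
    """Consume (index, value) pairs until a None poison pill arrives."""
    while True:
        item = task_queue.get()
        if item is None:
            task_queue.task_done()
            break
        index, value = item
        # Send the index back with the result so the parent can reorder
        result_queue.put((index, value * value))
        task_queue.task_done()


def in_submission_order(tagged_results):
    """Sort (index, result) pairs by index and drop the index."""
    return [result for _, result in sorted(tagged_results)]


if __name__ == '__main__':
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()
    num_workers = 4   # illustrative value
    num_jobs = 10     # illustrative value

    workers = [
        multiprocessing.Process(target=worker, args=(tasks, results))
        for _ in range(num_workers)
    ]
    for w in workers:
        w.start()

    # Enqueue jobs tagged with their position, then one pill per worker
    for i in range(num_jobs):
        tasks.put((i, i))
    for _ in range(num_workers):
        tasks.put(None)

    # Wait for all tasks, drain the results, then join the workers
    tasks.join()
    tagged = [results.get() for _ in range(num_jobs)]
    for w in workers:
        w.join()

    print(in_submission_order(tagged))
    # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

The results are drained before the workers are joined, since a process that has put items on a queue may not exit until that data has been consumed.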