Passing Messages to Process

简介: Passing Messages to Process

Passing Messages to Process

As with threads, a common use pattern for multiple processes is to divide a job up among several workers to run in parallel. Effective use of multiple processes usually requires some communication between them, so that work can be divided and results can be aggregated. A simple way to communicate between processes with multiprocessing is to use a Queue to pass messages back and forth. Any object that can be serialized with pickle can pass through a Queue.

The example codes are follow:

import multiprocessing

class MyFancyClass:

def __init__(self, name):
    self.name = name


def do_something(self):
    proc_name = multiprocessing.current_process().name

    print('Doing something fancy in {} for {}'.format(proc_name, self.name))
def worker(q):
    obj = q.get()
    obj.do_something()

if __name__ == '__main__':

    queue = multiprocessing.Queue()
    for i in range(3):
        p = multiprocessing.Process(target=worker, args=(queue,))
        p.start()


        queue.put(MyFancyClass('Fancy Dan %s' % i))

    queue.close()
    queue.join_thread()
    p.join()

This short example passes just a single message to a single worker, then the main process waits for the workers to finish.

D:\Python39\python.exe C:/Users/admin/Desktop/multiprocessing_queue.py
Doing something fancy in Process-1 for Fancy Dan 0
Doing something fancy in Process-3 for Fancy Dan 1
Doing something fancy in Process-2 for Fancy Dan 2

Process finished with exit code 0

A more complex example shows how to manage several workers that are consuming data from a JoinableQueue and passing results back to the parent process. The poison pill technique is used to stop these workers. After setting up the real tasks, the main program adds one “stop” value per worker to the job queue. When a worker encounters the special value, it breaks out of its processing loop. The main process uses the task queue’s join()

import multiprocessing
import time


class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue


    def run(self):
        proc_name = self.name

        while 1:
            next_task = self.task_queue.get()
            if next_task is None:
                # Posion pill means shutdown
                print('{}: Exiting'.format(proc_name))
                self.task_queue.task_done()
                break
            print('{}: {}'.format(proc_name, next_task))
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)

class Task:

def __init__(self, a, b):
        self.a = a
        self.b = b

    def __call__(self):
        time.sleep(0.1)  # Pretend to take time to do the work
        return '{} * {} = {}'.format(
            self.a, self.b,  self.a * self.b
        )

    def __str__(self):
        return '{} * {}'.format(self.a, self.b)


if __name__ == '__main__':
    # Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    # Start consumers
    num_consumers = multiprocessing.cpu_count() * 2
    print('Creating {} consumers'.format(num_consumers))
    consumers = [
        Consumer(tasks, results)
        for i in range(num_consumers)
    ]

    for w in consumers:
        w.start()

    # Enqueue jobs.
    num_jobs = 40
    for i in range(num_jobs):
        tasks.put(Task(i, i))

    # Add a poison pill for each consumer
    for i in range(num_consumers):
        tasks.put(None)

    # Wait for all of the tasks to finish
    tasks.join()

    # Start printing results
    while num_jobs:
        result = results.get()
        print('Result:', result)
        num_jobs -= 1

Although the jobs enter the queue in order, their execution occurs in parallel. Thus, there is no guarantee about the order in which they will be completed.

D:\Python39\python.exe C:/Users/admin/Desktop/multiprocessing_producer_consumer.py
Creating 40 consumers
Consumer-2: 0 * 0
Consumer-3: 1 * 1
Consumer-8: 2 * 2
Consumer-13: 3 * 3
Consumer-1: 4 * 4
Consumer-7: 5 * 5
Consumer-11: 6 * 6
Consumer-4: 7 * 7
Consumer-10: 8 * 8
Consumer-17: 9 * 9
Consumer-6: 10 * 10
Consumer-12: 11 * 11
Consumer-9: 12 * 12
Consumer-5: 13 * 13
Consumer-18: 14 * 14
Consumer-22: 15 * 15
Consumer-16: 16 * 16
Consumer-19: 17 * 17
Consumer-26: 18 * 18
Consumer-21: 19 * 19
Consumer-14: 20 * 20
Consumer-15: 21 * 21
Consumer-23: 22 * 22
Consumer-20: 23 * 23
Consumer-31: 24 * 24
Consumer-27: 25 * 25
Consumer-30: 26 * 26
Consumer-25: 27 * 27
Consumer-29: 28 * 28
Consumer-24: 29 * 29
Consumer-33: 30 * 30
Consumer-32: 31 * 31
Consumer-39: 32 * 32
Consumer-40: 33 * 33
Consumer-35: 34 * 34
Consumer-37: 35 * 35
Consumer-34: 36 * 36
Consumer-36: 37 * 37
Consumer-28: 38 * 38
Consumer-38: 39 * 39
Consumer-2: Exiting
Consumer-3: Exiting
Consumer-7: ExitingConsumer-1: Exiting
Consumer-11: Exiting
Consumer-13: Exiting

Consumer-4: Exiting
Consumer-8: Exiting
Consumer-9: Exiting
Consumer-6: Exiting
Consumer-5: Exiting
Consumer-10: Exiting
Consumer-17: Exiting
Consumer-12: Exiting
Consumer-14: Exiting
Consumer-22: Exiting
Consumer-19: Exiting
Consumer-18: Exiting
Consumer-26: Exiting
Consumer-16: Exiting
Consumer-21: Exiting
Consumer-30: Exiting
Consumer-25: Exiting
Consumer-20: Exiting
Consumer-27: Exiting
Consumer-29: Exiting
Consumer-15: Exiting
Consumer-31: Exiting
Consumer-23: Exiting
Consumer-40: Exiting
Consumer-39: Exiting
Consumer-38: Exiting
Consumer-32: Exiting
Consumer-37: Exiting
Consumer-35: Exiting
Consumer-28: Exiting
Consumer-36: Exiting
Consumer-33: Exiting
Consumer-34: Exiting
Consumer-24: Exiting
Result: 0 * 0 = 0
Result: 1 * 1 = 1
Result: 4 * 4 = 16
Result: 5 * 5 = 25
Result: 6 * 6 = 36
Result: 3 * 3 = 9
Result: 7 * 7 = 49
Result: 2 * 2 = 4
Result: 12 * 12 = 144
Result: 10 * 10 = 100
Result: 13 * 13 = 169
Result: 8 * 8 = 64
Result: 11 * 11 = 121
Result: 9 * 9 = 81
Result: 15 * 15 = 225
Result: 20 * 20 = 400
Result: 17 * 17 = 289
Result: 14 * 14 = 196
Result: 18 * 18 = 324
Result: 19 * 19 = 361
Result: 16 * 16 = 256
Result: 26 * 26 = 676
Result: 27 * 27 = 729
Result: 23 * 23 = 529
Result: 25 * 25 = 625
Result: 28 * 28 = 784
Result: 21 * 21 = 441
Result: 22 * 22 = 484
Result: 24 * 24 = 576
Result: 33 * 33 = 1089
Result: 32 * 32 = 1024
Result: 39 * 39 = 1521
Result: 31 * 31 = 961
Result: 35 * 35 = 1225
Result: 38 * 38 = 1444
Result: 34 * 34 = 1156
Result: 37 * 37 = 1369
Result: 36 * 36 = 1296
Result: 30 * 30 = 900
Result: 29 * 29 = 841

Process finished with exit code 0
相关文章
|
4月前
|
PyTorch 算法框架/工具 Python
Traceback (most recent call last):WARNING: Dataset not found, nonexistent paths:
这篇文章描述了在使用YOLOv5进行训练时遇到的"Dataset not found"错误,分析了可能的原因,包括网络连接问题和数据集路径配置错误,并提供了相应的解决方法,如检查网络设置和确认数据集文件的存放位置。
Traceback (most recent call last):WARNING: Dataset not found, nonexistent paths:
|
6月前
(145) Table ‘./addon_collect_wukong_spider‘ is marked as crashed and should be repaired解决思路
(145) Table ‘./addon_collect_wukong_spider‘ is marked as crashed and should be repaired解决思路
29 0
|
7月前
Showing Recent Messages Command CodeSign failed with a nonzero exit code
Showing Recent Messages Command CodeSign failed with a nonzero exit code
79 0
|
存储 移动开发 自然语言处理
Document-Level event Extraction via human-like reading process 论文解读
文档级事件抽取(DEE)特别困难,因为它提出了两个挑战:论元分散和多事件。第一个挑战意味着一个事件记录的论元可能存在于文档中的不同句子中
102 0
config.guess: unable to guess system type、config.sub: missing argument
config.guess: unable to guess system type、config.sub: missing argument
143 0
|
应用服务中间件 Android开发
a configuration error occurred during startup. place verify the preference field whth the prompt:TomcatJDK name:
a configuration error occurred during startup. place verify the preference field whth the prompt:TomcatJDK name:
145 0
a configuration error occurred during startup. place verify the preference field whth the prompt:TomcatJDK name:
|
Java 网络架构 数据格式
Caused by: com.ctc.wstx.exc.WstxParsingException: Received non-all-whitespace CHARACTERS or CDATA event in nextTag()
webservice调用发生Caused by: com.ctc.wstx.exc.WstxParsingException: Received non-all-whitespace CHARACTERS or CDATA event in nextTag(), message中缺少了
4332 0