Python多进程教程

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介:

Python多进程教程


Python2.6版本中新添了multiprocessing模块。它最初由Jesse Noller和Richard Oudkerk定义在PEP 371中。就像你能通过threading模块衍生线程一样,multiprocessing 模块允许你衍生进程。这里用到的思想:因为你现在能衍生进程,所以你能够避免使用全局解释器锁(GIL),并且充分利用机器的多个处理器。

多进程包也包含一些根本不在threading 模块中的API。比如:有一个灵活的Pool类能让你在多个输入下并行化地执行函数。我们将在后面的小节讲解Pool类。我们将以multiprocessing模块的Process类开始讲解。

开始学习multiprocessing模块

Process这个类和threading模块中的Thread类很像。让我们创建一系列调用相同函数的进程,并且看看它是如何工作的。


 
 
  1. import os 
  2.  
  3. from multiprocessing import Process 
  4.  
  5. def doubler(number): 
  6.  
  7.     ""
  8.  
  9.     A doubling function that can be used by a process 
  10.  
  11.     ""
  12.  
  13.     result = number * 2 
  14.  
  15.     proc = os.getpid() 
  16.  
  17.     print('{0} doubled to {1} by process id: {2}'.format( 
  18.  
  19.         number, result, proc)) 
  20.  
  21. if __name__ == '__main__'
  22.  
  23.     numbers = [5, 10, 15, 20, 25] 
  24.  
  25.     procs = [] 
  26.  
  27.     for index, number in enumerate(numbers): 
  28.  
  29.         proc = Process(target=doubler, args=(number,)) 
  30.  
  31.         procs.append(proc) 
  32.  
  33.         proc.start() 
  34.  
  35.     for proc in procs: 
  36.  
  37.         proc.join() 

对于上面的例子,我们导入Process类、创建一个叫doubler的函数。在函数中,我们将传入的数字乘上2。我们也用Python的os模块来获取当前进程的ID(pid)。这个ID将告诉我们哪个进程正在调用doubler函数。然后,在下面的代码块中,我们实例化了一系列的Process类并且启动它们。最后一个循环只是调用每个进程的join()方法,该方法告诉Python等待进程直到它结束。如果你需要结束一个进程,你可以调用它的terminate()方法。

当你运行上面的代码,你应该看到和下面类似的输出结果:


 
 
  1. 5 doubled to 10 by process id: 10468 
  2.  
  3. 10 doubled to 20 by process id: 10469 
  4.  
  5. 15 doubled to 30 by process id: 10470 
  6.  
  7. 20 doubled to 40 by process id: 10471 
  8.  
  9. 25 doubled to 50 by process id: 10472 

有时候,你最好给你的进程取一个易于理解的名字 。幸运的是,Process类确实允许你访问同样的进程。让我们来看看如下例子:


 
 
  1. import os 
  2.  
  3. from multiprocessing import Process, current_process 
  4.  
  5. def doubler(number): 
  6.  
  7.     ""
  8.  
  9.     A doubling function that can be used by a process 
  10.  
  11.     ""
  12.  
  13.     result = number * 2 
  14.  
  15.     proc_name = current_process().name 
  16.  
  17.     print('{0} doubled to {1} by: {2}'.format( 
  18.  
  19.         number, result, proc_name)) 
  20.  
  21. if __name__ == '__main__'
  22.  
  23.     numbers = [5, 10, 15, 20, 25] 
  24.  
  25.     procs = [] 
  26.  
  27.     proc = Process(target=doubler, args=(5,)) 
  28.  
  29.     for index, number in enumerate(numbers): 
  30.  
  31.         proc = Process(target=doubler, args=(number,)) 
  32.  
  33.         procs.append(proc) 
  34.  
  35.         proc.start() 
  36.  
  37.     proc = Process(target=doubler, name='Test', args=(2,)) 
  38.  
  39.     proc.start() 
  40.  
  41.     procs.append(proc) 
  42.  
  43.     for proc in procs: 
  44.  
  45.         proc.join() 

这一次,我们多导入了current_process。current_process基本上和threading模块的current_thread是类似的东西。我们用它来获取正在调用我们的函数的线程的名字。你将注意到我们没有给前面的5个进程设置名字。然后我们将第6个进程的名字设置为“Test”。

让我们看看我们将得到什么样的输出结果:


 
 
  1. 5 doubled to 10 by: Process-2 
  2.  
  3. 10 doubled to 20 by: Process-3 
  4.  
  5. 15 doubled to 30 by: Process-4 
  6.  
  7. 20 doubled to 40 by: Process-5 
  8.  
  9. 25 doubled to 50 by: Process-6 
  10.  
  11. 2 doubled to 4 by: Test 

输出结果说明:默认情况下,multiprocessing模块给每个进程分配了一个编号,而该编号被用来组成进程的名字的一部分。当然,如果我们给定了名字的话,并不会有编号被添加到名字中。

multiprocessing模块支持锁,它和threading模块做的方式一样。你需要做的只是导入Lock,获取它,做一些事,释放它。


 
 
  1. from multiprocessing import Process, Lock 
  2.  
  3. def printer(item, lock): 
  4.  
  5.     ""
  6.  
  7.     Prints out the item that was passed in 
  8.  
  9.     ""
  10.  
  11.     lock.acquire() 
  12.  
  13.     try: 
  14.  
  15.         print(item) 
  16.  
  17.     finally: 
  18.  
  19.         lock.release() 
  20.  
  21. if __name__ == '__main__'
  22.  
  23.     lock = Lock() 
  24.  
  25.     items = ['tango''foxtrot', 10] 
  26.  
  27.     for item in items: 
  28.  
  29.         p = Process(target=printer, args=(item, lock)) 
  30.  
  31.         p.start() 

我们在这里创建了一个简单的用于打印函数,你输入什么,它就输出什么。为了避免线程之间互相阻塞,我们使用Lock对象。代码循环列表中的三个项并为它们各自都创建一个进程。每一个进程都将调用我们的函数,并且每次遍历到的那一项作为参数传入函数。因为我们现在使用了锁,所以队列中下一个进程将一直阻塞,直到之前的进程释放锁。

日志

为进程创建日志与为线程创建日志有一些不同。它们存在不同是因为Python的logging包不使用共享锁的进程,因此有可能以来自不同进程的信息作为结束的标志。让我们试着给前面的例子添加基本的日志。代码如下:


 
 
  1. import logging 
  2.  
  3. import multiprocessing 
  4.  
  5. from multiprocessing import Process, Lock 
  6.  
  7. def printer(item, lock): 
  8.  
  9.     ""
  10.  
  11.     Prints out the item that was passed in 
  12.  
  13.     ""
  14.  
  15.     lock.acquire() 
  16.  
  17.     try: 
  18.  
  19.         print(item) 
  20.  
  21.     finally: 
  22.  
  23.         lock.release() 
  24.  
  25. if __name__ == '__main__'
  26.  
  27.     lock = Lock() 
  28.  
  29.     items = ['tango''foxtrot', 10] 
  30.  
  31.     multiprocessing.log_to_stderr() 
  32.  
  33.     logger = multiprocessing.get_logger() 
  34.  
  35.     logger.setLevel(logging.INFO) 
  36.  
  37.     for item in items: 
  38.  
  39.         p = Process(target=printer, args=(item, lock)) 
  40.  
  41.         p.start() 

最简单的添加日志的方法通过推送它到stderr实现。我们能通过调用thelog_to_stderr() 函数来实现该方法。然后我们调用get_logger 函数获得一个logger实例,并将它的日志等级设为INFO。之后的代码是相同的。需要提示下这里我并没有调用join()方法。取而代之的:当它退出,父线程将自动调用join()方法。

当你这么做了,你应该得到类似下面的输出:


 
 
  1. [INFO/Process-1] child process calling self.run() 
  2.  
  3. tango 
  4.  
  5. [INFO/Process-1] process shutting down 
  6.  
  7. [INFO/Process-1] process exiting with exitcode 0 
  8.  
  9. [INFO/Process-2] child process calling self.run() 
  10.  
  11. [INFO/MainProcess] process shutting down 
  12.  
  13. foxtrot 
  14.  
  15. [INFO/Process-2] process shutting down 
  16.  
  17. [INFO/Process-3] child process calling self.run() 
  18.  
  19. [INFO/Process-2] process exiting with exitcode 0 
  20.  
  21. 10 
  22.  
  23. [INFO/MainProcess] calling join() for process Process-3 
  24.  
  25. [INFO/Process-3] process shutting down 
  26.  
  27. [INFO/Process-3] process exiting with exitcode 0 
  28.  
  29. [INFO/MainProcess] calling join() for process Process-2 

现在如果你想要保存日志到硬盘中,那么这件事就显得有些棘手。你能在Python的logging Cookbook阅读一些有关那类话题。

Pool类

Pool类被用来代表一个工作进程池。它有让你将任务转移到工作进程的方法。让我们看下面一个非常简单的例子。


 
 
  1. from multiprocessing import Pool 
  2.  
  3. def doubler(number): 
  4.  
  5.     return number * 2 
  6.  
  7. if __name__ == '__main__'
  8.  
  9.     numbers = [5, 10, 20] 
  10.  
  11.     pool = Pool(processes=3) 
  12.  
  13.     print(pool.map(doubler, numbers)) 

基本上执行上述代码之后,一个Pool的实例被创建,并且该实例创建了3个工作进程。然后我们使用map 方法将一个函数和一个可迭代对象映射到每个进程。最后我们打印出这个例子的结果:[10, 20, 40]。

你也能通过apply_async方法获得池中进程的运行结果:


 
 
  1. from multiprocessing import Pool 
  2.  
  3. def doubler(number): 
  4.  
  5.     return number * 2 
  6.  
  7. if __name__ == '__main__'
  8.  
  9.     pool = Pool(processes=3) 
  10.  
  11.     result = pool.apply_async(doubler, (25,)) 
  12.  
  13.     print(result.get(timeout=1)) 

我们上面做的事实际上就是请求进程的运行结果。那就是get函数的用途。它尝试去获取我们的结果。你能够注意到我们设置了timeout,这是为了预防我们调用的函数发生异常的情况。毕竟我们不想要它被无限期地阻塞。

进程通信

当遇到进程间通信的情况,multiprocessing 模块提供了两个主要的方法:Queues 和 Pipes。Queue 实现上既是线程安全的也是进程安全的。让我们看一个相当简单的并且基于 Queue的例子。代码来自于我的文章(threading articles)。


 
 
  1. from multiprocessing import Process, Queue 
  2.  
  3. sentinel = -1 
  4.  
  5. def creator(data, q): 
  6.  
  7.     ""
  8.  
  9.     Creates data to be consumed and waits for the consumer 
  10.  
  11.     to finish processing 
  12.  
  13.     ""
  14.  
  15.     print('Creating data and putting it on the queue'
  16.  
  17.     for item in data: 
  18.  
  19.         q.put(item) 
  20.  
  21. def my_consumer(q): 
  22.  
  23.     ""
  24.  
  25.     Consumes some data and works on it 
  26.  
  27.     In this caseall it does is double the input 
  28.  
  29.     ""
  30.  
  31.     while True
  32.  
  33.         data = q.get() 
  34.  
  35.         print('data found to be processed: {}'.format(data)) 
  36.  
  37.         processed = data * 2 
  38.  
  39.         print(processed) 
  40.  
  41.         if data is sentinel: 
  42.  
  43.             break 
  44.  
  45. if __name__ == '__main__'
  46.  
  47.     q = Queue() 
  48.  
  49.     data = [5, 10, 13, -1] 
  50.  
  51.     process_one = Process(target=creator, args=(data, q)) 
  52.  
  53.     process_two = Process(target=my_consumer, args=(q,)) 
  54.  
  55.     process_one.start() 
  56.  
  57.     process_two.start() 
  58.  
  59.     q.close() 
  60.  
  61.     q.join_thread() 
  62.  
  63.     process_one.join() 
  64.  
  65.     process_two.join() 

在这里我们只需要导入Queue和Process。Queue用来创建数据和添加数据到队列中,Process用来消耗数据并执行它。通过使用Queue的put()和get()方法,我们就能添加数据到Queue、从Queue获取数据。代码的最后一块只是创建了Queue 对象以及两个Process对象,并且运行它们。你能注意到我们在进程对象上调用join()方法,而不是在Queue本身上调用。

总结

我们这里有大量的资料。你已经学习如何使用multiprocessing模块指定不变的函数、使用Queues在进程间通信、给进程命名等很多事。在Python文档中也有很多本文没有接触到的知识点,因此也务必深入了解下文档。与此同时,你现在知道如何用Python利用你电脑所有的处理能力了!


作者:佚名

来源:51CTO

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
11天前
|
机器学习/深度学习 算法 搜索推荐
从理论到实践,Python算法复杂度分析一站式教程,助你轻松驾驭大数据挑战!
【10月更文挑战第4天】在大数据时代,算法效率至关重要。本文从理论入手,介绍时间复杂度和空间复杂度两个核心概念,并通过冒泡排序和快速排序的Python实现详细分析其复杂度。冒泡排序的时间复杂度为O(n^2),空间复杂度为O(1);快速排序平均时间复杂度为O(n log n),空间复杂度为O(log n)。文章还介绍了算法选择、分而治之及空间换时间等优化策略,帮助你在大数据挑战中游刃有余。
37 4
|
7天前
|
数据采集 Web App开发 数据可视化
Python爬虫教程:Selenium可视化爬虫的快速入门
Python爬虫教程:Selenium可视化爬虫的快速入门
|
16天前
|
数据可视化 IDE 开发工具
【Python篇】PyQt5 超详细教程——由入门到精通(中篇二)
【Python篇】PyQt5 超详细教程——由入门到精通(中篇二)
168 13
|
11天前
|
数据挖掘 程序员 调度
探索Python的并发编程:线程与进程的实战应用
【10月更文挑战第4天】 本文深入探讨了Python中实现并发编程的两种主要方式——线程和进程,通过对比分析它们的特点、适用场景以及在实际编程中的应用,为读者提供清晰的指导。同时,文章还介绍了一些高级并发模型如协程,并给出了性能优化的建议。
21 3
|
16天前
|
监控 数据可视化 搜索推荐
【Python篇】matplotlib超详细教程-由入门到精通(下篇)2
【Python篇】matplotlib超详细教程-由入门到精通(下篇)
28 8
|
14天前
|
缓存 测试技术 Apache
告别卡顿!Python性能测试实战教程,JMeter&Locust带你秒懂性能优化💡
【10月更文挑战第1天】告别卡顿!Python性能测试实战教程,JMeter&Locust带你秒懂性能优化💡
45 4
|
16天前
|
数据可视化 API 数据处理
【Python篇】matplotlib超详细教程-由入门到精通(上篇)
【Python篇】matplotlib超详细教程-由入门到精通(上篇)
59 5
|
16天前
|
编解码 数据可视化 IDE
【Python篇】matplotlib超详细教程-由入门到精通(下篇)1
【Python篇】matplotlib超详细教程-由入门到精通(下篇)
26 3
|
16天前
|
数据可视化 IDE 开发者
【Python篇】PyQt5 超详细教程——由入门到精通(终篇)
【Python篇】PyQt5 超详细教程——由入门到精通(终篇)
21 1
|
7天前
|
存储 Python
Python中的多进程通信实践指南
Python中的多进程通信实践指南
11 0