「多线程大杀器」Python并发编程利器:ThreadPoolExecutor,让你一次性轻松开启多个线程,秒杀大量任务!

简介: 「多线程大杀器」Python并发编程利器:ThreadPoolExecutor,让你一次性轻松开启多个线程,秒杀大量任务!

随着程序复杂度和数据量的不断增加,传统的同步编程方式已经无法满足开发人员的需求。异步编程随之产生,能够提供更高的并发性能和更好的资源利用率。Python的concurrent.futures模块是一个很好的异步编程工具,它提供了一组接口,可以方便地进行并发编程。


Python中已经有了threading模块,为什么还需要这些线程池、进程池处理呢?以Python爬虫为例,需要控制同时爬取的线程数,比如我们创建了20甚至100个线程,而同时只允许5-10个线程在运行,但是20-100个线程都需要创建和销毁,线程的创建是需要消耗系统资源的,有没有更好的方案呢?


其实只需要同时创建运行5-10个线程就可以,每个线程各分配一个任务,剩下的任务排队等待,当某个线程完成了任务的时候,排队任务就可以安排给这个线程继续执行。

然而自己编写线程池很难写的比较完美,还需要考虑复杂情况下的线程同步,很容易发生死锁。而从Python3.2 开始,标准库为我们提供了concurrent.futures模块,它提供了ThreadPoolExecutorProcessPoolExecutor两个类,实现了对 threadingmultiprocessing 的进一步抽象,不仅可以帮我们自动调度线程,还可以做到:

  • 主线程可以获取某一个线程(或者任务的)的状态,以及返回值。
  • 当一个线程完成的时候,主线程能够立即知道。
  • 让多线程和多进程的编码接口一致。


简介


concurrent.futures 模块是 Python3.2 中引入的新模块,用于支持异步执行,以及在多核CPU和网络I/O中进行高效的并发编程。这个模块提供了ThreadPoolExecutor和ProcessPoolExecutor两个类,简化了跨平台异步编程的实现。


首先,让我们先来理解两种并发编程的方式:


1、多进程

当通过多进程来实现并发编程时,程序会将任务分配给多个进程,这些进程可以在不同的CPU上同时运行。进程之间是独立的,各自有自己的内存空间等,可以实现真正的并行执行。不过,进程之间的通信比较耗时,需要使用IPC(进程间通信)机制,而且进程之间的切换比线程之间的切换耗时,所以创建进程的代价较高。


2、多线程

当通过多线程来实现并发编程时,程序会将任务分配给多个线程,这些线程可以在同一个进程中的不同CPU核上同时运行。线程之间共享进程的内存空间,因此开销比较小。但是需要注意,在Python解释器中,线程是无法实现真正的并行执行,因为Python有GIL(全局解释器锁),它确保同时只有一个线程运行Python代码。因此,一个Python进程中的多个线程并不能并行执行,在使用多线程编程时不能完全利用多核CPU。


简单使用(案例及使用参数说明)


concurrent.futures 是Python中执行异步编程的重要工具,它提供了以下两个类:


1、ThreadPoolExecutor


ThreadPoolExecutor 创建一个线程池,任务可以提交到这个线程池中执行。ThreadPoolExecutorProcessPoolExecutor 更容易使用,且没有像进程那样的开销。它可以让我们在一个Python解释器中进行跨线程异步编程,因为它规避了GIL。


示例:

from concurrent.futures import ThreadPoolExecutor
def test(num):
    print("Threads" num)
# 新建ThreadPoolExecutor对象并指定最大的线程数量
with ThreadPoolExecutor(max_workers=3) as executor:
    # 提交多个任务到线程池中
    executor.submit(test, 1)
    executor.submit(test, 2)
    executor.submit(test, 3)

输出结果:

Thread 1
Thread 2
Thread 3

2、ProcessPoolExecutor


ProcessPoolExecutor 创建一个进程池,任务可以提交到这个进程池中执行。当对于单个任务的处理开销很大,例如大规模计算密集型应用,应该使用这个线程池。


示例:

from concurrent.futures import ProcessPoolExecutor
def test(num):
    print("Processs" num)
# 新建ProcessPoolExecutor对象并指定最大的进程数量
with ProcessPoolExecutor(max_workers=3) as executor:
    # 提交多个任务到进程池中
    executor.submit(test, 1)
    executor.submit(test, 2)
    executor.submit(test, 3)

输出结果:

Process 2
Process 1
Process 3

等待任务完成


1、ThreadPoolExecutor构造实例的时候,传入max_workers参数来设置线程池中最多能同时运行的线程数目。

2、使用submit函数来提交线程需要执行的任务(函数名和参数)到线程池中,并返回该任务的句柄,注意submit()不是阻塞的,而是立即返回。

3、通过submit函数返回的任务句柄,能够使用done()方法判断该任务是否结束。

4、使用cancel()方法可以取消提交的任务,如果任务已经在线程池中运行了,就取消不了。

5、使用result()方法可以获取任务的返回值。查看内部代码,发现这个方法是阻塞的。

在提交任务之后,我们通常需要等待它们完成,可以使用如下方法:


1、result()

用于获取 submit() 方法返回的 Future 对象的结果。该方法是同步的,Block主线程,直至得到结果或者抛异常。


示例:

from concurrent.futures import ThreadPoolExecutor
def test(num):
    print("Tasks" num)
# 新建ThreadPoolExecutor对象并指定最大的线程数量
with ThreadPoolExecutor(max_workers=3) as executor:
    # 提交多个任务到线程池中,并使用result方法等待任务完成
    future_1 = executor.submit(test, 1)
    future_2 = executor.submit(test, 2)
    future_3 = executor.submit(test, 3)
    print(future_1.result())

输出:

Task 1
Task 2
Task 3
None

2、add_done_callback()


给每个 submit() 返回的 Future 对象添加一个“完成时”的回调函数。主线程运行完毕而不需要等待任务完成,这个回调函数会在任务完成时自动执行。


示例:

from concurrent.futures import ThreadPoolExecutor
def callback(future):
    print("Task done? ", future.done())
    print("Result: ", future.result())
# 新建ThreadPoolExecutor对象并指定最大的线程数量
with ThreadPoolExecutor(max_workers=3) as executor:
    # 提交多个任务到线程池中,并添加“完成时”回调函数
    future_1 = executor.submit(pow, 2, 4)
    future_2 = executor.submit(pow, 3, 4)
    callback_future_1 = executor.submit(callback, future_1)

ThreadPoolExecutor类常用方法


ThreadPoolExecutorProcessPoolExecutor类下方法名大多都是同样的,只不过因为一个是线程方式、一个是进程方式,底层逻辑实现可能不同。由于我们在日常开发过程中,线程 ThreadPoolExecutor 使用的较多,所以以 ThreadPoolExecutor 为主要使用对象进行说明讲解


当使用 ThreadPoolExecutor 创建的线程池对象后,我们可以使用 submitmapshutdown等方法来操作线程池中的线程以及任务。


1、submit方法


ThreadPoolExecutorsubmit方法用于将任务提交到线程池中进行处理,该方法返回一个Future对象,代表将来会返回结果的值。submit方法的语法如下:

submit(fn, *args, **kwargs)

其中,fn参数是要执行的函数,*args**kwargs是fn的参数。


示例:

from concurrent.futures import ThreadPoolExecutor
def multiply(x, y):
    return x * y
with ThreadPoolExecutor(max_workers=3) as executor:
    future = executor.submit(multiply, 10, 5)
    print(future.result()) # 50

2、map方法


ThreadPoolExecutormap方法用于将函数应用于迭代器中的每个元素,该方法返回一个迭代器。map方法的语法如下:

map(func, *iterables, timeout=None, chunksize=1)

其中,func参数是要执行的函数,*iterables是一个或多个迭代器,timeoutchunksize是可选参数。


示例:

from concurrent.futures import ThreadPoolExecutor
def square(x):
    return x * x
def cube(x):
    return x * x * x
with ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(square, [1, 2, 3, 4, 5])
    for square_result in results:
        print(square_result)
    results = executor.map(cube, [1, 2, 3, 4, 5])
    for cube_result in results:
        print(cube_result)

3、shutdown方法


ThreadPoolExecutorshutdown方法用于关闭线程池,该方法在所有线程执行完毕后才会关闭线程池。shutdown方法的语法如下:

shutdown(wait=True)

其中,wait参数表示是否等待所有任务执行完毕后才关闭线程池,默认为True


示例:

from concurrent.futures import ThreadPoolExecutor
import time
def task(num):
    print("Task {} is running".format(num))
    time.sleep(1)
    return "Task {} is complete".format(num)
with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(task, i) for i in range(1, 4)]
    executor.shutdown()


源码分析


cocurrent.future模块中的future的意思是未来对象,可以把它理解为一个在未来完成的操作,这是异步编程的基础 。在线程池submit()之后,返回的就是这个future对象,返回的时候任务并没有完成,但会在将来完成。也可以称之为任务的返回容器,这个里面会存储任务的结果和状态。


那ThreadPoolExecutor内部是如何操作这个对象的呢?下面简单介绍 ThreadPoolExecutor 的部分代码:

1、init方法

init方法中主要重要的就是任务队列和线程集合,在其他方法中需要使用到。

image.png

init源码解析


2、submit方法


submit中有两个重要的对象,_base.Future()_WorkItem()对象,_WorkItem()对象负责运行任务和对**future对象进行设置,最后会将future对象返回,可以看到整个过程是立即返回的,没有阻塞。

image.png

submit源码解析


总结


在Python asyncio模块的基础之上,concurrent.futures模块为Python提供了一种简单高效的异步编程方式,它支持同步、线程、进程等多种并发执行方式,为开发人员提供了更加灵活高效的并发解决方案。我们可以使用submit、map、shutdown等方法来操作线程池中的线程以及任务,使用Future对象(异步编程的核心)来管理任务状态,更加方便地进行任务提交、状态管理和线程池的管理和控制。


在实际开发过程中,我们需要根据具体的应用场景,选择适当的异步编程工具和方式,以获得更好的效果。总之,concurrent.futures模块是Python异步编程中一个非常好的利器。

相关文章
|
4天前
|
并行计算 安全 Java
Python GIL(全局解释器锁)机制对多线程性能影响的深度分析
在Python开发中,GIL(全局解释器锁)一直备受关注。本文基于CPython解释器,探讨GIL的技术本质及其对程序性能的影响。GIL确保同一时刻只有一个线程执行代码,以保护内存管理的安全性,但也限制了多线程并行计算的效率。文章分析了GIL的必要性、局限性,并介绍了多进程、异步编程等替代方案。尽管Python 3.13计划移除GIL,但该特性至少要到2028年才会默认禁用,因此理解GIL仍至关重要。
36 16
Python GIL(全局解释器锁)机制对多线程性能影响的深度分析
|
30天前
|
监控 Java
java异步判断线程池所有任务是否执行完
通过上述步骤,您可以在Java中实现异步判断线程池所有任务是否执行完毕。这种方法使用了 `CompletionService`来监控任务的完成情况,并通过一个独立线程异步检查所有任务的执行状态。这种设计不仅简洁高效,还能确保在大量任务处理时程序的稳定性和可维护性。希望本文能为您的开发工作提供实用的指导和帮助。
87 17
|
28天前
|
存储 安全 数据可视化
用Python实现简单的任务自动化
本文介绍如何使用Python实现任务自动化,提高效率和准确性。通过三个实用案例展示:1. 使用`smtplib`和`schedule`库自动发送邮件提醒;2. 利用`shutil`和`os`库自动备份文件;3. 借助`requests`库自动下载网页内容。每个案例包含详细代码和解释,并附带注意事项。掌握这些技能有助于个人和企业优化流程、节约成本。
56 3
|
2月前
|
数据采集 存储 监控
21个Python脚本自动执行日常任务(2)
21个Python脚本自动执行日常任务(2)
118 7
21个Python脚本自动执行日常任务(2)
|
1月前
|
数据采集 消息中间件 Java
python并发编程:什么是并发编程?python对并发编程有哪些支持?
并发编程能够显著提升程序的效率和响应速度。例如,网络爬虫通过并发下载将耗时从1小时缩短至20分钟;APP页面加载时间从3秒优化到200毫秒。Python支持多线程、多进程、异步I/O和协程等并发编程方式,适用于不同场景。线程通信方式包括共享变量、消息传递和同步机制,如Lock、Queue等。Python的并发编程特性使其在处理大规模数据和高并发访问时表现出色,成为许多领域的首选语言。
|
2月前
|
Python
Python中的函数是**一种命名的代码块,用于执行特定任务或计算
Python中的函数是**一种命名的代码块,用于执行特定任务或计算
61 18
|
3月前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
70 12
|
2月前
|
数据采集 分布式计算 大数据
构建高效的数据管道:使用Python进行ETL任务
在数据驱动的世界中,高效地处理和移动数据是至关重要的。本文将引导你通过一个实际的Python ETL(提取、转换、加载)项目,从概念到实现。我们将探索如何设计一个灵活且可扩展的数据管道,确保数据的准确性和完整性。无论你是数据工程师、分析师还是任何对数据处理感兴趣的人,这篇文章都将成为你工具箱中的宝贵资源。
|
3月前
|
并行计算 数据处理 调度
Python中的并发编程:探索多线程与多进程的奥秘####
本文深入探讨了Python中并发编程的两种主要方式——多线程与多进程,通过对比分析它们的工作原理、适用场景及性能差异,揭示了在不同应用需求下如何合理选择并发模型。文章首先简述了并发编程的基本概念,随后详细阐述了Python中多线程与多进程的实现机制,包括GIL(全局解释器锁)对多线程的影响以及多进程的独立内存空间特性。最后,通过实例演示了如何在Python项目中有效利用多线程和多进程提升程序性能。 ####
|
4月前
|
Python
Python中的多线程与多进程
本文将探讨Python中多线程和多进程的基本概念、使用场景以及实现方式。通过对比分析,我们将了解何时使用多线程或多进程更为合适,并提供一些实用的代码示例来帮助读者更好地理解这两种并发编程技术。