物无定味适口者珍,Python3并发场景(CPU密集/IO密集)任务的并发方式的场景抉择(多线程threading/多进程multiprocessing/协程asyncio)

简介: 一般情况下,大家对Python原生的并发/并行工作方式:进程、线程和协程的关系与区别都能讲清楚。甚至具体的对象名称、内置方法都可以如数家珍,这显然是极好的,但我们其实都忽略了一个问题,就是具体应用场景,三者的使用目的是一样的,话句话说,使用结果是一样的,都可以提高程序运行的效率,但到底那种场景用那种方式更好一点?这就好比,目前主流的汽车发动机变速箱无外乎三种:双离合、CVT以及传统AT。主机厂把它们搭载到不同的发动机和车型上,它们都是变速箱,都可以将发动机产生的动力作用到车轮上,但不同使用场景下到底该选择那种变速箱?这显然也是一个问题。

原文转载自「刘悦的技术博客」https://v3u.cn/a_id_221

一般情况下,大家对Python原生的并发/并行工作方式:进程、线程和协程的关系与区别都能讲清楚。甚至具体的对象名称、内置方法都可以如数家珍,这显然是极好的,但我们其实都忽略了一个问题,就是具体应用场景,三者的使用目的是一样的,话句话说,使用结果是一样的,都可以提高程序运行的效率,但到底那种场景用那种方式更好一点?

这就好比,目前主流的汽车发动机变速箱无外乎三种:双离合、CVT以及传统AT。主机厂把它们搭载到不同的发动机和车型上,它们都是变速箱,都可以将发动机产生的动力作用到车轮上,但不同使用场景下到底该选择那种变速箱?这显然也是一个问题。

所谓“无场景,不功能”,本次我们来讨论一下,具体的并发编程场景有哪些,并且对应到具体场景,应该怎么选择并发手段和方式。

什么是并发和并行?

在讨论场景之前,我们需要将多任务执行的方式进行一下分类,那就是并发方式和并行方式。教科书上告诉我们:并行是指两个或者多个事件在同一时刻发生;而并发是指两个或多个事件在同一时间间隔内发生。 在多道程序环境下,并发性是指在一段时间内宏观上有多个程序在同时运行,但在单处理机系统中,每一时刻却仅能有一道程序执行,故微观上这些程序只能是分时地交替执行。

好像有那么一点抽象,好吧,让我们务实一点,由于GIL全局解释器锁的存在,在Python编程领域,我们可以简单粗暴地将并发和并行用程序通过能否使用多核CPU来区分,能使用多核CPU就是并行,不能使用多核CPU,只能单核处理的,就是并发。就这么简单,是的,Python的GIL全局解释器锁帮我们把问题简化了, 这是Python的大幸?还是不幸?

Python中并发任务实现方式包含:多线程threading和协程asyncio,它们的共同点都是交替执行,而区别是多线程threading是抢占式的,而协程asyncio是协作式的,原理也很简单,只有一颗CPU可以用,而一颗CPU一次只能做一件事,所以只能靠不停地切换才能完成并发任务。

Python中并行任务的实现方式是多进程multiprocessing,通过multiprocessing库,Python可以在程序主进程中创建新的子进程。这里的一个进程可以被认为是一个几乎完全不同的程序,尽管从技术上讲,它们通常被定义为资源集合,其中资源包括内存、文件句柄等。换一种说法是,每个子进程都拥有自己的Python解释器,因此,Python中的并行任务可以使用一颗以上的CPU,每一颗CPU都可以跑一个进程,是真正的同时运行,而不需要切换,如此Python就可以完成并行任务。

什么时候使用并发?IO密集型任务

现在我们搞清楚了,Python里的并发运行方式就是多线程threading和协程asyncio,那么什么场景下使用它们?

一般情况下,任务场景,或者说的更准确一些,任务类型,无非两种:CPU密集型任务和IO密集型任务。

什么是IO密集型任务?IO就是Input-Output的缩写,说白了就是程序的输入和输出,想一想确实就是这样,您的电脑,它不就是这两种功能吗?用键盘、麦克风、摄像头输入数据,然后再用屏幕和音箱进行输出操作。

但输入和输出操作要比电脑中的CPU运行速度慢,换句话说,CPU得等着这些比它慢的输入和输出操作,说白了就是CPU运算一会,就得等这些IO操作,等IO操作完了,CPU才能继续运算一会,然后再等着IO操作,如图所示:

由此可知,并发适合这种IO操作密集和频繁的工作,因为就算CPU是苹果最新ARM架构的M2芯片,也没有用武之地。

另外,如果把IO密集型任务具象化,那就是我们经常操作的:硬盘读写(数据库读写)、网络请求、文件的打印等等。

并发方式的选择:多线程threading还是协程asyncio?

既然涉及硬盘读写(数据库读写)、网络请求、文件打印等任务都算并发任务,那我们就真正地实践一下,看看不同的并发方式到底能提升多少效率?

一个简单的小需求,对本站数据进行重复抓取操作,并计算首页数据文本的行数:

import requests  
import time  
  
  
def download_site(url, session):  
    with session.get(url) as response:  
        print(f"下载了{len(response.content)}行数据")  
  
  
def download_all_sites(sites):  
    with requests.Session() as session:  
        for url in sites:  
            download_site(url, session)  
  
  
if __name__ == "__main__":  
  
    sites = ["https://v3u.cn"] * 50  
    start_time = time.time()  
    download_all_sites(sites)  
    duration = time.time() - start_time  
    print(f"下载了 {len(sites)}次,执行了{duration}秒")

在不使用任何并发手段的前提下,程序返回:

下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了76347行数据  
下载了 50 次数据,执行了8.781155824661255秒  
[Finished in 9.6s]

这里程序的每一步都是同步操作,也就是说当第一次抓取网站首页时,剩下的49次都在等待。

接着使用多线程threading来改造程序:

import concurrent.futures  
import requests  
import threading  
import time  
  
  
thread_local = threading.local()  
  
  
def get_session():  
    if not hasattr(thread_local, "session"):  
        thread_local.session = requests.Session()  
    return thread_local.session  
  
  
def download_site(url):  
    session = get_session()  
    with session.get(url) as response:  
        print(f"下载了{len(response.content)}行数据")  
  
  
def download_all_sites(sites):  
    with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:  
        executor.map(download_site, sites)  
  
  
if __name__ == "__main__":  
  
    sites = ["https://v3u.cn"] * 50  
    start_time = time.time()  
    download_all_sites(sites)  
    duration = time.time() - start_time  
    print(f"下载了 {len(sites)}次,执行了{duration}秒")

这里通过with关键词开启线程池上下文管理器,并发8个线程进行下载,程序返回:

下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76161行数据  
下载了76424行数据  
下载了76161行数据  
下载了76161行数据  
下载了76161行数据  
下载了76161行数据  
下载了76161行数据  
下载了76161行数据  
下载了76161行数据  
下载了76161行数据  
下载了76161行数据  
下载了76161行数据  
下载了 50次,执行了7.680492877960205秒

很明显,效率上有所提升,事实上,每个线程其实是在不停“切换”着运行,这就节省了单线程每次等待爬取结果的时间:

由此带来了另外一个问题:上下文切换的时间开销。

让我们继续改造,用协程来一试锋芒,首先安装异步web请求库aiohttp:

pip3 install aiohttp

改写逻辑:

import asyncio  
import time  
import aiohttp  
  
  
async def download_site(session, url):  
    async with session.get(url) as response:  
        print(f"下载了{response.content_length}行数据")  
  
  
async def download_all_sites(sites):  
    async with aiohttp.ClientSession() as session:  
        tasks = []  
        for url in sites:  
            task = asyncio.ensure_future(download_site(session, url))  
            tasks.append(task)  
        await asyncio.gather(*tasks, return_exceptions=True)  
  
  
if __name__ == "__main__":  
    sites = ["https://v3u.cn"] * 50  
    start_time = time.time()  
    asyncio.run(download_all_sites(sites))  
    duration = time.time() - start_time  
    print(f"下载了 {len(sites)}次,执行了{duration}秒")

程序返回:



下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76424行数据  
下载了76161行数据  
下载了76424行数据  
下载了76161行数据  
下载了76161行数据  
下载了76161行数据  
下载了76161行数据  
下载了76161行数据  
下载了76161行数据  
下载了76161行数据  
下载了76161行数据  
下载了76161行数据  
下载了76161行数据

下载了 50次,执行了6.893810987472534秒

效率上百尺竿头更进一步,同样的使用with关键字操作上下文管理器,协程使用asyncio.ensure\_future()创建任务列表,该列表还负责启动它们。创建所有任务后,使用asyncio.gather()来保持会话上下文的实例,直到所有爬取任务完成。和多线程threading的区别是,协程并不需要切换上下文,因此每个任务所需的资源和创建时间要少得多,因此创建和运行更多的任务效率更高:

综上,并发逻辑归根结底是减少CPU等待的时间,也就是让CPU少等一会儿,而协程的工作方式显然让CPU等待的时间最少。

并行方式:多进程multiprocessing

再来试试多进程multiprocessing,并行能不能干并发的事?

import requests  
import multiprocessing  
import time  
  
session = None  
  
  
def set_global_session():  
    global session  
    if not session:  
        session = requests.Session()  
  
  
def download_site(url):  
    with session.get(url) as response:  
        name = multiprocessing.current_process().name  
        print(f"读了{len(response.content)}行")  
  
  
def download_all_sites(sites):  
    with multiprocessing.Pool(initializer=set_global_session) as pool:  
        pool.map(download_site, sites)  
  
  
if __name__ == "__main__":  
    sites = ["https://v3u.cn"] * 50  
    start_time = time.time()  
    download_all_sites(sites)  
    duration = time.time() - start_time  
    print(f"下载了 {len(sites)}次,执行了{duration}秒")

这里我们依然使用上下文管理器开启进程池,默认进程数匹配当前计算机的CPU核心数,也就是有几核就开启几个进程,程序返回:

读了76000行  
读了76241行  
读了76044行  
读了75894行  
读了76290行  
读了76312行  
读了76419行  
读了76753行  
读了76290行  
读了76290行  
读了76290行  
读了76290行  
读了76290行  
读了76290行  
读了76290行  
读了76290行  
读了76290行  
读了76290行  
读了76290行  
读了76290行  
读了76290行  
读了76290行  
读了76290行  
读了76290行  
读了76290行  
读了76290行  
读了76290行  
读了76290行  
读了76290行  
读了76290行  
读了76290行  
读了76290行  
读了76290行  
读了76290行  
读了76290行  
读了76290行  
读了76290行  
读了76290行  
读了76290行  
读了76290行  
读了76290行  
读了76290行  
读了76290行  
读了76290行  
读了76290行  
读了76290行  
读了76290行  
读了76290行  
读了76290行  
读了76290行  
下载了 50次,执行了8.195281982421875秒

虽然比同步程序要快,但无疑的,效率上要低于多线程和协程。为什么?因为多进程不适合IO密集型任务,虽然可以利用多核资源,但没有任何意义:

无论开多少进程,CPU都没有用武之地,多数情况下CPU都在等待IO操作,也就是说,多核反而拖累了IO程序的执行。

并行方式的选择:CPU密集型任务

什么是CPU密集型任务?这里我们可以使用逆定理:所有不涉及硬盘读写(数据库读写)、网络请求、文件打印等任务都算CPU密集型任务任务,说白了就是,计算型任务。

以求平方和为例子:

import time  
  
  
def cpu_bound(number):  
    return sum(i * i for i in range(number))  
  
  
def find_sums(numbers):  
    for number in numbers:  
        cpu_bound(number)  
  
  
if __name__ == "__main__":  
    numbers = [5_000_000 + x for x in range(20)]  
    start_time = time.time()  
    find_sums(numbers)  
    duration = time.time() - start_time  
    print(f"{duration}秒")

同步执行20次,需要花费多少时间?

4.466595888137817秒

再来试试并行方式:

import multiprocessing  
import time  
  
  
def cpu_bound(number):  
    return sum(i * i for i in range(number))  
  
  
def find_sums(numbers):  
    with multiprocessing.Pool() as pool:  
        pool.map(cpu_bound, numbers)  
  
  
if __name__ == "__main__":  
    numbers = [5_000_000 + x for x in range(20)]  
  
    start_time = time.time()  
    find_sums(numbers)  
    duration = time.time() - start_time  
    print(f"{duration}秒")

八核处理器,开八个进程开始跑:

1.1755797863006592秒

不言而喻,并行方式有效提高了计算效率。

最后,既然之前用并行方式运行了IO密集型任务,我们就再来试试用并发的方式运行CPU密集型任务:

import concurrent.futures  
import time  
  
  
def cpu_bound(number):  
    return sum(i * i for i in range(number))  
  
  
def find_sums(numbers):  
    with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:  
        executor.map(cpu_bound, numbers)  
  
  
if __name__ == "__main__":  
    numbers = [5_000_000 + x for x in range(20)]  
  
    start_time = time.time()  
    find_sums(numbers)  
    duration = time.time() - start_time  
    print(f"{duration}秒")

单进程开8个线程,走起:

4.452666759490967秒

如何?和并行方式运行IO密集型任务一样,可以运行,但是没有任何意义。为什么?因为没有任何IO操作了,CPU不需要等待了,CPU只要全力运算即可,所以你上多线程或者协程,无非就是画蛇添足、多此一举。

结语

有经验的汽修师傅会告诉你,想省油就选CVT和双离合,想质量稳定就选AT,经常高速上激烈驾驶就选双离合,经常市区内堵车就选CVT;同样地,作为经验丰富的后台研发,你也可以告诉汽修师傅,任何不需要CPU等待的任务就选择并行(multiprocessing)的处理方式,而需要CPU等待时间过长的任务,选择并发(threading/asyncio)。反过来,我就想用CVT在高速上飙车,用双离合在市区堵车,行不行?行,但没有意义,或者说的更准确一些,没有任何额外的收益;而用并发方式执行CPU密集型任务,用并行方式执行IO密集型任务行不行?也行,但依然没有任何额外的收益, 无他,唯物无定味,适口者珍矣。

原文转载自「刘悦的技术博客」 https://v3u.cn/a_id_221

相关文章
|
3月前
|
存储 Linux API
【Linux进程概念】—— 操作系统中的“生命体”,计算机里的“多线程”
在计算机系统的底层架构中,操作系统肩负着资源管理与任务调度的重任。当我们启动各类应用程序时,其背后复杂的运作机制便悄然展开。程序,作为静态的指令集合,如何在系统中实现动态执行?本文带你一探究竟!
【Linux进程概念】—— 操作系统中的“生命体”,计算机里的“多线程”
|
2月前
|
机器学习/深度学习 API Python
Python 高级编程与实战:深入理解网络编程与异步IO
在前几篇文章中,我们探讨了 Python 的基础语法、面向对象编程、函数式编程、元编程、性能优化、调试技巧、数据科学、机器学习、Web 开发和 API 设计。本文将深入探讨 Python 在网络编程和异步IO中的应用,并通过实战项目帮助你掌握这些技术。
|
3月前
|
数据采集 Java 数据处理
Python实用技巧:轻松驾驭多线程与多进程,加速任务执行
在Python编程中,多线程和多进程是提升程序效率的关键工具。多线程适用于I/O密集型任务,如文件读写、网络请求;多进程则适合CPU密集型任务,如科学计算、图像处理。本文详细介绍这两种并发编程方式的基本用法及应用场景,并通过实例代码展示如何使用threading、multiprocessing模块及线程池、进程池来优化程序性能。结合实际案例,帮助读者掌握并发编程技巧,提高程序执行速度和资源利用率。
104 0
|
6月前
|
存储 Java 数据库
如何处理线程池关闭时未完成的任务?
总之,处理线程池关闭时未完成的任务需要综合考虑多种因素,并根据实际情况选择合适的处理方式。通过合理的处理,可以最大程度地减少任务丢失和数据不一致等问题,确保系统的稳定运行和业务的顺利开展。
270 64
|
6月前
|
消息中间件 监控 Java
线程池关闭时未完成的任务如何保证数据的一致性?
保证线程池关闭时未完成任务的数据一致性需要综合运用多种方法和机制。通过备份与恢复、事务管理、任务状态记录与恢复、数据同步与协调、错误处理与补偿、监控与预警等手段的结合,以及结合具体业务场景进行分析和制定策略,能够最大程度地确保数据的一致性,保障系统的稳定运行和业务的顺利开展。同时,不断地优化和改进这些方法和机制,也是提高系统性能和可靠性的重要途径。
189 62
|
4月前
|
消息中间件 调度
如何区分进程、线程和协程?看这篇就够了!
本课程主要探讨操作系统中的进程、线程和协程的区别。进程是资源分配的基本单位,具有独立性和隔离性;线程是CPU调度的基本单位,轻量且共享资源,适合并发执行;协程更轻量,由程序自身调度,适合I/O密集型任务。通过学习这些概念,可以更好地理解和应用它们,以实现最优的性能和资源利用。
139 11
|
4月前
|
监控 Java
java异步判断线程池所有任务是否执行完
通过上述步骤,您可以在Java中实现异步判断线程池所有任务是否执行完毕。这种方法使用了 `CompletionService`来监控任务的完成情况,并通过一个独立线程异步检查所有任务的执行状态。这种设计不仅简洁高效,还能确保在大量任务处理时程序的稳定性和可维护性。希望本文能为您的开发工作提供实用的指导和帮助。
187 17
|
6月前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
131 12
|
6月前
|
并行计算 数据处理 调度
Python中的并发编程:探索多线程与多进程的奥秘####
本文深入探讨了Python中并发编程的两种主要方式——多线程与多进程,通过对比分析它们的工作原理、适用场景及性能差异,揭示了在不同应用需求下如何合理选择并发模型。文章首先简述了并发编程的基本概念,随后详细阐述了Python中多线程与多进程的实现机制,包括GIL(全局解释器锁)对多线程的影响以及多进程的独立内存空间特性。最后,通过实例演示了如何在Python项目中有效利用多线程和多进程提升程序性能。 ####
|
6月前
|
网络协议 物联网 API
Python网络编程:Twisted框架的异步IO处理与实战
【10月更文挑战第26天】Python 是一门功能强大且易于学习的编程语言,Twisted 框架以其事件驱动和异步IO处理能力,在网络编程领域独树一帜。本文深入探讨 Twisted 的异步IO机制,并通过实战示例展示其强大功能。示例包括创建简单HTTP服务器,展示如何高效处理大量并发连接。
111 1