Python基于线程的并行和基于进程并行详解

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
实时数仓Hologres,5000CU*H 100GB 3个月
智能开放搜索 OpenSearch行业算法版,1GB 20LCU 1个月
简介: 当涉及到并行编程时,Python标准库提供了两种不同的方式:基于线程的并行(threading)和基于进程的并行(multiprocessing)。下面我将从概念、性能、使用场景和底层实现等方面对它们进行解释和比较。

线程并行和进程并行的概念

基于线程的并行(线程并行)是指在一个进程中创建多个线程,这些线程共享进程的资源,如内存空间、文件描述符等。由于多个线程共享同一个进程,因此线程之间的通信和同步相对容易实现。线程并行常用于处理I/O密集型任务,例如网络请求、文件读写等。

然而,线程并行也存在一些问题。首先,由于多个线程共享同一个进程,一个线程的错误可能会影响到其他线程的执行,导致整个进程崩溃或数据不一致。因此,在线程并行中需要特别关注线程安全性,使用适当的同步机制(如锁、信号量等)来保护共享资源的访问。其次,由于操作系统对进程和线程的管理方式不同,线程之间的切换和调度会带来一些开销,例如上下文切换的开销。此外,由于全局解释器锁(GIL)的存在,Python中的线程无法实现真正的并行执行,对于CPU密集型任务并没有性能上的优势。

importthreadingdefworker(num):
"""线程执行的任务"""print("Worker %d is running..."%num)
# 创建3个线程,每个线程执行worker函数threads= []
foriinrange(3):
t=threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()
# 等待所有线程执行完毕fortinthreads:
t.join()
print("All threads are finished.")

上述代码使用threading模块创建了3个线程,并让每个线程执行worker函数。每个线程都会打印出一个相应的消息。

首先,worker函数定义了线程的具体任务,它接受一个参数num,并在函数体内打印相应的消息。

然后,在主程序中,一个空的线程列表threads被创建。接下来的循环中,通过threading.Thread类创建了3个线程对象,并将它们的目标函数指定为worker函数,同时传递一个参数i作为worker函数的参数。每个线程对象被添加到threads列表中,并通过调用start()方法启动线程。

接着,通过循环遍历threads列表,并调用join()方法等待所有线程执行完毕,这样主线程会阻塞在这里直到所有子线程都执行完成。

最后,打印出"All threads are finished."的消息,表示所有线程已经执行完毕。

基于进程的并行(进程并行)是指在操作系统中创建多个进程,每个进程都有自己独立的内存空间、文件描述符等资源。不同进程之间的通信和同步相对困难,需要使用特定的机制(如管道、共享内存等)进行进程间通信(IPC)。进程并行常用于处理CPU密集型任务,例如数据处理、图像处理等。

由于每个进程都是独立的,进程之间相互隔离,一个进程的错误不会影响到其他进程的执行。因此,在进程并行中,更容易实现安全和稳定的并行执行。然而,由于每个进程都有自己独立的内存空间,进程之间的数据共享和通信相对复杂,需要使用IPC机制来实现进程间的数据交换。

另外,由于操作系统对进程的管理方式不同于线程,进程之间的切换和调度会产生一些开销,例如创建和销毁进程的开销,以及进程间的数据传输开销。此外,每个进程都需要占用一定的内存资源,因此进程并行可能会在内存占用方面带来一些开销。

importmultiprocessingdefworker(num):
"""进程执行的任务"""print("Worker %d is running..."%num)
# 创建3个进程,每个进程执行worker函数processes= []
foriinrange(3):
p=multiprocessing.Process(target=worker, args=(i,))
processes.append(p)
p.start()
# 等待所有进程执行完毕forpinprocesses:
p.join()
print("All processes are finished.")

上述代码使用multiprocessing模块创建了3个进程,并让每个进程执行worker函数。每个进程都会打印出一个相应的消息。

首先,worker函数定义了进程的具体任务,它接受一个参数num,并在函数体内打印相应的消息。

然后,在主程序中,一个空的进程列表processes被创建。接下来的循环中,通过multiprocessing.Process类创建了3个进程对象,并将它们的目标函数指定为worker函数,同时传递一个参数i作为worker函数的参数。每个进程对象被添加到processes列表中,并通过调用start()方法启动进程。

接着,通过循环遍历processes列表,并调用join()方法等待所有进程执行完毕,这样主进程会阻塞在这里直到所有子进程都执行完成。

最后,打印出"All processes are finished."的消息,表示所有进程已经执行完毕。

通过代码不难看出,Thread和Process都是Python中用于实现并发编程的重要模块,它们之间有些许不同之处,但也有很多相同的地方。

线程并行和进程并行的性能

一般来说,threading模块的性能比multiprocessing模块要好一些。这是因为threading模块是在用户级别上创建线程的,而multiprocessing模块则是通过操作系统级别的进程来实现并发的。因此,在多核CPU的情况下,使用threading模块可以更好地利用多核CPU的性能,从而获得更好的并发性能。

另外,由于threading模块是在用户级别上创建线程的,因此它的开销相对较小,创建和销毁线程的速度也比较快。相比之下,multiprocessing模块需要操作系统内核创建和维护进程,因此它的开销相对较大,创建和销毁进程的速度也比较慢。

需要留意的是,因为CPython全局解释器锁的原因,同一时刻只有一个线程可以执行Python代码,官方文档中有特意标注:

如果你想让你的应用更好地利用多核心计算机的计算资源,推荐你使用 multiprocessing 或 concurrent.futures.ProcessPoolExecutor。 但是,如果你想要同时运行多个 I/O 密集型任务,则多线程仍然是一个合适的模型。

importthreadingimporttimeimportglobclassCSVReaderThread(threading.Thread):
def__init__(self, file_path, lock):
super().__init__()
self.file_path=file_pathself.result=Noneself.lock=lockdefrun(self):
# 获取锁self.lock.acquire()
try:
# 打开CSV文件并读取数据withopen(self.file_path, 'r') asf:
reader=csv.reader(f)
self.result= [rowforrowinreader]
# 打印读取的数据print(f"Data from {self.file_path} is completed.")
exceptExceptionase:
# 如果出现异常,则输出异常信息并释放锁,等待下一个线程继续执行print(f"Error reading {self.file_path}: {e}")
self.lock.release()
finally:
# 释放锁self.lock.release()
defread_csv_files():
files=glob.glob("*.csv") # 获取当前目录下所有CSV文件的路径# 创建多个线程并启动它们threads= []
max_threads=len(files) *5//4# 最多允许五个线程同时运行start_index=0end_index=max_threads+1whilestart_index<len(files):
ifend_index>len(files):
end_index=len(files)
else:
end_index+=1file_path=files[start_index:end_index] # 每次选取一个线程需要读取的文件路径范围t=CSVReaderThread(file_path[0], threading.Lock()) # 为每个线程创建一个锁对象threads.append(t)
t.start()
start_index+=end_index-start_index# 根据线程数量计算下一个线程开始的位置# 等待所有线程完成任务,包括中断的线程num_threads=len(threads)
foriinrange(num_threads):
threads[i].join() # 先等待正常完成的线程,再等待中断的线程

上述代码使用threading模块实现了一个读取多个CSV文件的并发任务。

首先定义了一个继承自threading.Thread的CSVReaderThread类,该类用于读取单个CSV文件的数据。在__init__方法中,传入文件路径和一个锁对象作为参数,并初始化了一些实例变量。

run方法是线程的主要执行逻辑。在执行之前,通过self.lock.acquire()获取锁,确保同一时间只有一个线程可以进入关键区域。然后尝试打开文件并读取其中的数据,使用csv.reader模块来解析CSV文件。读取的结果存储在self.result中。之后打印读取的数据,并在except块中处理可能发生的异常,打印错误信息并释放锁,以便其他线程可以继续执行。最后,无论是否发生异常,都通过self.lock.release()释放锁,确保下一个线程可以获取锁并执行。

read_csv_files函数是主程序的入口。首先使用glob.glob函数获取当前目录下所有的CSV文件路径,并存储在files列表中。接下来,创建一个空的线程列表threads,然后计算最大允许同时运行的线程数量,这里规定最多允许五个线程同时运行。使用start_index和end_index来迭代遍历files列表,并确定每个线程需要读取的文件范围。对于每个范围,创建一个CSVReaderThread对象,传入对应的文件路径和一个threading.Lock对象作为锁。将线程对象添加到threads列表中,并调用start()方法启动线程。

最后,使用num_threads记录线程的数量,通过循环遍历threads列表,依次调用join()方法等待所有线程完成任务,包括中断的线程。这样主线程会阻塞在这里,直到所有线程都执行完毕。

线程并行和进程并行的场景

Threading适用于小型、简单的并发任务,而Multiprocessing适用于大型、复杂的并发任务。

Threading

  • 线程之间的通信比较简单,通常只需要使用全局变量或共享数据结构来实现。
  • 线程之间的上下文切换比较频繁,因为每个线程都有自己的堆栈空间。
  • 线程的数量相对较少,通常在几十到几百个之间。

Multiprocessing

  • 需要在多个CPU核心上并行执行的任务,因为每个进程都有自己的独立内存空间和操作系统资源。
  • 需要在不同的计算机上运行的任务,因为每个进程都有自己的独立地址空间。
  • 需要高度同步的任务,因为每个进程都有自己的独立状态机。
  • 需要处理大量数据的任务,因为每个进程都可以使用独立的内存空间来存储数据。

比如我们可以使用threading模块创建多个线程来处理客户端请求,每个线程负责不同的部分,如接收请求、解析请求、处理响应;使用multiprocessing模块创建多个进程来处理大型数据集。每个进程都调用worker()函数来读取数据、进行处理和写入结果。我们可以使用multiprocessing模块提供的共享内存来传递数据和结果。

threading和multiprocessing的底层实现

Threading是通过使用线程来实现并发的。线程是轻量级的执行单位,它们在同一个进程中共享相同的内存空间。因为线程共享内存,所以在多线程编程中需要注意对共享资源的访问控制,以避免竞争条件和数据不一致问题。在Python中,线程通过threading模块来创建和管理。

Thread的底层实现是基于操作系统的原生线程机制,如POSIX线程(pthread)或Windows线程。这些原生线程由操作系统内核来管理调度。Python的解释器会在这些线程之间进行切换,以实现并发执行的效果。由于全局解释器锁(GIL)的存在,Python的多线程并不能实现真正的并行执行,GIL是一个Python语言级别的锁,它确保在任何时候只有一个线程可以执行Python字节码。

即使有多个CPU核心,也只有一个线程可以获得CPU时间片,因此无法充分利用多核CPU的优势。

相比之下,Multiprocessing是通过使用多个进程来实现并发的。每个进程都拥有自己独立的内存空间和解释器实例,它们通过进程间通信(IPC)来进行数据交换。由于每个进程都有自己的GIL,所以可以实现真正的并行执行。在Python中,进程通过multiprocessing模块来创建和管理。

Multiprocessing的底层实现依赖于操作系统提供的进程创建和管理机制。它使用操作系统级别的调度器来管理进程间的切换和调度。因为每个进程有独立的内存空间,所以它们之间的数据共享需要通过特定的IPC机制来实现,如管道、共享内存或消息队列。

目录
相关文章
|
2月前
|
数据采集 存储 JSON
Python爬取知乎评论:多线程与异步爬虫的性能优化
Python爬取知乎评论:多线程与异步爬虫的性能优化
|
2月前
|
人工智能 安全 调度
Python并发编程之线程同步详解
并发编程在Python中至关重要,线程同步确保多线程程序正确运行。本文详解线程同步机制,包括互斥锁、信号量、事件、条件变量和队列,探讨全局解释器锁(GIL)的影响及解决线程同步问题的最佳实践,如避免全局变量、使用线程安全数据结构、精细化锁的使用等。通过示例代码帮助开发者理解并提升多线程程序的性能与可靠性。
|
2月前
|
监控 编译器 Python
如何利用Python杀进程并保持驻留后台检测
本教程介绍如何使用Python编写进程监控与杀进程脚本,结合psutil库实现后台驻留、定时检测并强制终止指定进程。内容涵盖基础杀进程、多进程处理、自动退出机制、管理员权限启动及图形界面设计,并提供将脚本打包为exe的方法,适用于需持续清理顽固进程的场景。
|
2月前
|
数据采集 监控 调度
干货分享“用 多线程 爬取数据”:单线程 + 协程的效率反超 3 倍,这才是 Python 异步的正确打开方式
在 Python 爬虫中,多线程因 GIL 和切换开销效率低下,而协程通过用户态调度实现高并发,大幅提升爬取效率。本文详解协程原理、实战对比多线程性能,并提供最佳实践,助你掌握异步爬虫核心技术。
|
3月前
|
JSON 算法 Java
打造终端里的下载利器:Python实现可恢复式多线程下载器
在数字时代,大文件下载已成为日常需求。本文教你用Python打造专业级下载器,支持断点续传、多线程加速、速度限制等功能,显著提升终端下载体验。内容涵盖智能续传、多线程分块下载、限速控制及Rich库构建现代终端界面,助你从零构建高效下载工具。
171 1
|
2月前
|
数据采集 存储 Java
多线程Python爬虫:加速大规模学术文献采集
多线程Python爬虫:加速大规模学术文献采集
|
3月前
|
数据采集 网络协议 前端开发
Python多线程爬虫模板:从原理到实战的完整指南
多线程爬虫通过并发请求大幅提升数据采集效率,适用于大规模网页抓取。本文详解其原理与实现,涵盖任务队列、线程池、会话保持、异常处理、反爬对抗等核心技术,并提供可扩展的Python模板代码,助力高效稳定的数据采集实践。
123 0
|
6月前
|
数据采集 存储 安全
Python爬虫实战:利用短效代理IP爬取京东母婴纸尿裤数据,多线程池并行处理方案详解
本文分享了一套结合青果网络短效代理IP和多线程池技术的电商数据爬取方案,针对京东母婴纸尿裤类目商品信息进行高效采集。通过动态代理IP规避访问限制,利用多线程提升抓取效率,同时确保数据采集的安全性和合法性。方案详细介绍了爬虫开发步骤、网页结构分析及代码实现,适用于大规模电商数据采集场景。
|
7月前
|
数据采集 Java 数据处理
Python实用技巧:轻松驾驭多线程与多进程,加速任务执行
在Python编程中,多线程和多进程是提升程序效率的关键工具。多线程适用于I/O密集型任务,如文件读写、网络请求;多进程则适合CPU密集型任务,如科学计算、图像处理。本文详细介绍这两种并发编程方式的基本用法及应用场景,并通过实例代码展示如何使用threading、multiprocessing模块及线程池、进程池来优化程序性能。结合实际案例,帮助读者掌握并发编程技巧,提高程序执行速度和资源利用率。
276 0

推荐镜像

更多