线程并行和进程并行的概念
基于线程的并行(线程并行)是指在一个进程中创建多个线程,这些线程共享进程的资源,如内存空间、文件描述符等。由于多个线程共享同一个进程,因此线程之间的通信和同步相对容易实现。线程并行常用于处理I/O密集型任务,例如网络请求、文件读写等。
然而,线程并行也存在一些问题。首先,由于多个线程共享同一个进程,一个线程的错误可能会影响到其他线程的执行,导致整个进程崩溃或数据不一致。因此,在线程并行中需要特别关注线程安全性,使用适当的同步机制(如锁、信号量等)来保护共享资源的访问。其次,由于操作系统对进程和线程的管理方式不同,线程之间的切换和调度会带来一些开销,例如上下文切换的开销。此外,由于全局解释器锁(GIL)的存在,Python中的线程无法实现真正的并行执行,对于CPU密集型任务并没有性能上的优势。
importthreadingdefworker(num): """线程执行的任务"""print("Worker %d is running..."%num) # 创建3个线程,每个线程执行worker函数threads= [] foriinrange(3): t=threading.Thread(target=worker, args=(i,)) threads.append(t) t.start() # 等待所有线程执行完毕fortinthreads: t.join() print("All threads are finished.")
上述代码使用threading模块创建了3个线程,并让每个线程执行worker函数。每个线程都会打印出一个相应的消息。
首先,worker函数定义了线程的具体任务,它接受一个参数num,并在函数体内打印相应的消息。
然后,在主程序中,一个空的线程列表threads被创建。接下来的循环中,通过threading.Thread类创建了3个线程对象,并将它们的目标函数指定为worker函数,同时传递一个参数i作为worker函数的参数。每个线程对象被添加到threads列表中,并通过调用start()方法启动线程。
接着,通过循环遍历threads列表,并调用join()方法等待所有线程执行完毕,这样主线程会阻塞在这里直到所有子线程都执行完成。
最后,打印出"All threads are finished."的消息,表示所有线程已经执行完毕。
基于进程的并行(进程并行)是指在操作系统中创建多个进程,每个进程都有自己独立的内存空间、文件描述符等资源。不同进程之间的通信和同步相对困难,需要使用特定的机制(如管道、共享内存等)进行进程间通信(IPC)。进程并行常用于处理CPU密集型任务,例如数据处理、图像处理等。
由于每个进程都是独立的,进程之间相互隔离,一个进程的错误不会影响到其他进程的执行。因此,在进程并行中,更容易实现安全和稳定的并行执行。然而,由于每个进程都有自己独立的内存空间,进程之间的数据共享和通信相对复杂,需要使用IPC机制来实现进程间的数据交换。
另外,由于操作系统对进程的管理方式不同于线程,进程之间的切换和调度会产生一些开销,例如创建和销毁进程的开销,以及进程间的数据传输开销。此外,每个进程都需要占用一定的内存资源,因此进程并行可能会在内存占用方面带来一些开销。
importmultiprocessingdefworker(num): """进程执行的任务"""print("Worker %d is running..."%num) # 创建3个进程,每个进程执行worker函数processes= [] foriinrange(3): p=multiprocessing.Process(target=worker, args=(i,)) processes.append(p) p.start() # 等待所有进程执行完毕forpinprocesses: p.join() print("All processes are finished.")
上述代码使用multiprocessing模块创建了3个进程,并让每个进程执行worker函数。每个进程都会打印出一个相应的消息。
首先,worker函数定义了进程的具体任务,它接受一个参数num,并在函数体内打印相应的消息。
然后,在主程序中,一个空的进程列表processes被创建。接下来的循环中,通过multiprocessing.Process类创建了3个进程对象,并将它们的目标函数指定为worker函数,同时传递一个参数i作为worker函数的参数。每个进程对象被添加到processes列表中,并通过调用start()方法启动进程。
接着,通过循环遍历processes列表,并调用join()方法等待所有进程执行完毕,这样主进程会阻塞在这里直到所有子进程都执行完成。
最后,打印出"All processes are finished."的消息,表示所有进程已经执行完毕。
通过代码不难看出,Thread和Process都是Python中用于实现并发编程的重要模块,它们之间有些许不同之处,但也有很多相同的地方。
线程并行和进程并行的性能
一般来说,threading模块的性能比multiprocessing模块要好一些。这是因为threading模块是在用户级别上创建线程的,而multiprocessing模块则是通过操作系统级别的进程来实现并发的。因此,在多核CPU的情况下,使用threading模块可以更好地利用多核CPU的性能,从而获得更好的并发性能。
另外,由于threading模块是在用户级别上创建线程的,因此它的开销相对较小,创建和销毁线程的速度也比较快。相比之下,multiprocessing模块需要操作系统内核创建和维护进程,因此它的开销相对较大,创建和销毁进程的速度也比较慢。
需要留意的是,因为CPython全局解释器锁的原因,同一时刻只有一个线程可以执行Python代码,官方文档中有特意标注:
如果你想让你的应用更好地利用多核心计算机的计算资源,推荐你使用 multiprocessing 或 concurrent.futures.ProcessPoolExecutor。 但是,如果你想要同时运行多个 I/O 密集型任务,则多线程仍然是一个合适的模型。
importthreadingimporttimeimportglobclassCSVReaderThread(threading.Thread): def__init__(self, file_path, lock): super().__init__() self.file_path=file_pathself.result=Noneself.lock=lockdefrun(self): # 获取锁self.lock.acquire() try: # 打开CSV文件并读取数据withopen(self.file_path, 'r') asf: reader=csv.reader(f) self.result= [rowforrowinreader] # 打印读取的数据print(f"Data from {self.file_path} is completed.") exceptExceptionase: # 如果出现异常,则输出异常信息并释放锁,等待下一个线程继续执行print(f"Error reading {self.file_path}: {e}") self.lock.release() finally: # 释放锁self.lock.release() defread_csv_files(): files=glob.glob("*.csv") # 获取当前目录下所有CSV文件的路径# 创建多个线程并启动它们threads= [] max_threads=len(files) *5//4# 最多允许五个线程同时运行start_index=0end_index=max_threads+1whilestart_index<len(files): ifend_index>len(files): end_index=len(files) else: end_index+=1file_path=files[start_index:end_index] # 每次选取一个线程需要读取的文件路径范围t=CSVReaderThread(file_path[0], threading.Lock()) # 为每个线程创建一个锁对象threads.append(t) t.start() start_index+=end_index-start_index# 根据线程数量计算下一个线程开始的位置# 等待所有线程完成任务,包括中断的线程num_threads=len(threads) foriinrange(num_threads): threads[i].join() # 先等待正常完成的线程,再等待中断的线程
上述代码使用threading模块实现了一个读取多个CSV文件的并发任务。
首先定义了一个继承自threading.Thread的CSVReaderThread类,该类用于读取单个CSV文件的数据。在__init__方法中,传入文件路径和一个锁对象作为参数,并初始化了一些实例变量。
run方法是线程的主要执行逻辑。在执行之前,通过self.lock.acquire()获取锁,确保同一时间只有一个线程可以进入关键区域。然后尝试打开文件并读取其中的数据,使用csv.reader模块来解析CSV文件。读取的结果存储在self.result中。之后打印读取的数据,并在except块中处理可能发生的异常,打印错误信息并释放锁,以便其他线程可以继续执行。最后,无论是否发生异常,都通过self.lock.release()释放锁,确保下一个线程可以获取锁并执行。
read_csv_files函数是主程序的入口。首先使用glob.glob函数获取当前目录下所有的CSV文件路径,并存储在files列表中。接下来,创建一个空的线程列表threads,然后计算最大允许同时运行的线程数量,这里规定最多允许五个线程同时运行。使用start_index和end_index来迭代遍历files列表,并确定每个线程需要读取的文件范围。对于每个范围,创建一个CSVReaderThread对象,传入对应的文件路径和一个threading.Lock对象作为锁。将线程对象添加到threads列表中,并调用start()方法启动线程。
最后,使用num_threads记录线程的数量,通过循环遍历threads列表,依次调用join()方法等待所有线程完成任务,包括中断的线程。这样主线程会阻塞在这里,直到所有线程都执行完毕。
线程并行和进程并行的场景
Threading适用于小型、简单的并发任务,而Multiprocessing适用于大型、复杂的并发任务。
Threading
- 线程之间的通信比较简单,通常只需要使用全局变量或共享数据结构来实现。
- 线程之间的上下文切换比较频繁,因为每个线程都有自己的堆栈空间。
- 线程的数量相对较少,通常在几十到几百个之间。
Multiprocessing
- 需要在多个CPU核心上并行执行的任务,因为每个进程都有自己的独立内存空间和操作系统资源。
- 需要在不同的计算机上运行的任务,因为每个进程都有自己的独立地址空间。
- 需要高度同步的任务,因为每个进程都有自己的独立状态机。
- 需要处理大量数据的任务,因为每个进程都可以使用独立的内存空间来存储数据。
比如我们可以使用threading模块创建多个线程来处理客户端请求,每个线程负责不同的部分,如接收请求、解析请求、处理响应;使用multiprocessing模块创建多个进程来处理大型数据集。每个进程都调用worker()函数来读取数据、进行处理和写入结果。我们可以使用multiprocessing模块提供的共享内存来传递数据和结果。
threading和multiprocessing的底层实现
Threading是通过使用线程来实现并发的。线程是轻量级的执行单位,它们在同一个进程中共享相同的内存空间。因为线程共享内存,所以在多线程编程中需要注意对共享资源的访问控制,以避免竞争条件和数据不一致问题。在Python中,线程通过threading模块来创建和管理。
Thread的底层实现是基于操作系统的原生线程机制,如POSIX线程(pthread)或Windows线程。这些原生线程由操作系统内核来管理调度。Python的解释器会在这些线程之间进行切换,以实现并发执行的效果。由于全局解释器锁(GIL)的存在,Python的多线程并不能实现真正的并行执行,GIL是一个Python语言级别的锁,它确保在任何时候只有一个线程可以执行Python字节码。
即使有多个CPU核心,也只有一个线程可以获得CPU时间片,因此无法充分利用多核CPU的优势。
相比之下,Multiprocessing是通过使用多个进程来实现并发的。每个进程都拥有自己独立的内存空间和解释器实例,它们通过进程间通信(IPC)来进行数据交换。由于每个进程都有自己的GIL,所以可以实现真正的并行执行。在Python中,进程通过multiprocessing模块来创建和管理。
Multiprocessing的底层实现依赖于操作系统提供的进程创建和管理机制。它使用操作系统级别的调度器来管理进程间的切换和调度。因为每个进程有独立的内存空间,所以它们之间的数据共享需要通过特定的IPC机制来实现,如管道、共享内存或消息队列。