1、引言
小屌丝:鱼哥,你给我讲一讲并行处理呗。
小鱼:咋的, 你要进军xxx领域了呗?
小屌丝:我这不得与时俱进,紧跟鱼哥的步伐。
小鱼:别扯犊子。说实话,为啥要了解并行处理?
小屌丝:嘿嘿, 就是我想提速啊。
小鱼:… 什么提速?
小屌丝:额,鱼哥, 我指的是代码, 因为循环嵌套多了, 代码运行速度就慢了。
小鱼:哦哦哦, 我也没说别的 ,看你想的挺多啊。
小屌丝:… 我还不了解你。
小鱼:… 还想知道什么是算力吗?
小屌丝:想,非常想,真的好想。
小鱼:… ,我还是跟你说说并行处理吧。
2、并行处理
2.1 定义
并行处理是指同时使用多个处理器或计算机来执行一个任务或程序。
在并行处理中,任务被分解成多个子任务,这些子任务被同时处理,然后将结果合并以得出最终结果。
并行处理在许多领域都得到了广泛应用,如:
高性能计算
科学计算
机器学习
数据挖掘等
在这些领域中,处理大量数据或进行复杂的计算任务需要并行处理来提高效率和性能。
2.2 并行处理优缺点
优点包括:
提高计算效率和性能
减少等待时间和计算时间
提高系统的可靠性和可用性
有助于实现更高的资源利用率
缺点包括:
需要更多的处理器或计算机资源
可能会增加系统的复杂性和管理成本
可能会导致更高的错误率和故障率
2.3 并行处理的常用库
在Python中,常用的并行处理库有以下几个:
multiprocessing:这是Python标准库中的一个模块,提供了跨平台的多进程支持。它可以通过创建多个进程来实现并行处理。
threading:也是Python标准库中的一个模块,提供了多线程支持。与多进程不同,多线程是在同一个进程内的多个线程之间进行并行处理。
concurrent.futures:这是Python标准库中的一个模块,提供了高级的并行处理接口。它可以通过线程池或进程池来实现并行处理,简化了并行编程的复杂性。
joblib:这是一个第三方库,提供了简单而高效的并行处理工具。它可以通过多进程或多线程来实现并行处理,适用于科学计算和机器学习等领域。
dask:这是一个用于并行计算的第三方库,可以处理大规模数据集。它提供了类似于NumPy和Pandas的接口,可以在分布式集群上进行并行计算。
2.4 代码示例
这里,我们使用并行处理技术可以加速for循环的计算过程。
2.4.1 multiprocessing
代码示例
# -*- coding:utf-8 -*- # @Time : 2023-06-30 # @Author : Carl_DJ ''' 实现功能: 使用multiprocessing ,进行并行计算。 ''' import multiprocessing #定义了一个square函数,用于计算一个数的平方值 def square(num): return num ** 2 if __name__ == '__main__': # 原始列表 numbers = [1, 2, 3, 4, 5] # 使用for循环计算平方值 result_serial = [] for num in numbers: result_serial.append(square(num)) print("Serial result:", result_serial) # 使用并行处理计算平方值 pool = multiprocessing.Pool() result_parallel = pool.map(square, numbers) pool.close() pool.join() print("Parallel result:", result_parallel)
解析:
for循环:
是逐个计算每个元素的平方值,并将结果存储在result_serial列表中,也就是串行处理;
multiprocessing.Pool():
创建了一个进程池,该进程池可以并行处理任务;
通过调用pool.map()方法,我们将square函数应用于numbers列表中的每个元素,并将结果存储在result_parallel列表中;
这是并行计算的方式。
2.4.2 concurrent.futures
代码示例
# -*- coding:utf-8 -*- # @Time : 2023-06-30 # @Author : Carl_DJ ''' 实现功能: 使用concurrent.futures ,进行并行计算。 ''' import concurrent.futures def square(n): return n * n # 串行处理 def serial_processing(numbers): results = [] for num in numbers: results.append(square(num)) return results # 并行处理 def parallel_processing(numbers): results = [] with concurrent.futures.ThreadPoolExecutor() as executor: # 提交任务给线程池 futures = [executor.submit(square, num) for num in numbers] # 获取结果 for future in concurrent.futures.as_completed(futures): results.append(future.result()) return results if __name__ == '__main__': numbers = [1, 2, 3, 4, 5] # 串行处理 serial_results = serial_processing(numbers) print("Serial results:", serial_results) # 并行处理 parallel_results = parallel_processing(numbers) print("Parallel results:", parallel_results)
解析
for循环:
串行处理函数serial_processing使用for循环逐个计算每个元素的平方值,
并将结果存储在结果列表中;
并行处理
并行处理函数parallel_processing使用concurrent.futures.ThreadPoolExecutor创建一个线程池,
并使用executor.submit方法将任务提交给线程池进行并行处理,
最后,使用concurrent.futures.as_completed方法获取每个任务的结果,并将结果存储在结果列表中。
2.4.3 joblib
同样, 我们使用并行处理技术可以加速for循环的计算过程。
代码示例
# -*- coding:utf-8 -*- # @Time : 2023-06-30 # @Author : Carl_DJ ''' 实现功能: 使用joblib ,进行并行计算。 ''' from joblib import Parallel, delayed def square(x): return x**2 # 使用for循环串行处理 def serial_processing(input_list): result = [] for num in input_list: result.append(square(num)) return result # 使用joblib并行处理 def parallel_processing(input_list): num_cores = 4 # 设置并行处理的核心数 result = Parallel(n_jobs=num_cores)(delayed(square)(num) for num in input_list) return result # 测试代码 input_list = [1, 2, 3, 4, 5] print("Serial processing:", serial_processing(input_list)) print("Parallel processing:", parallel_processing(input_list))
2.4.4 threading
关于threading , 我们再熟悉不过了 。
这里,小鱼直接上代码。
代码示例
# -*- coding:utf-8 -*- # @Time : 2023-06-30 # @Author : Carl_DJ ''' 实现功能: 使用threading,进行并行计算。 ''' import threading #square_list_serial函数,使用for循环串行处理 def square_list_serial(input_list): result = [] for num in input_list: result.append(num ** 2) return result input_list = [1, 2, 3, 4, 5] result_serial = square_list_serial(input_list) print(result_serial) #并行处理 def square_list_parallel(input_list, result, index): for i, num in enumerate(input_list): result[index + i] = num ** 2 def square_list_parallel_threads(input_list): result = [0] * len(input_list) num_threads = 4 # 设置线程数量 threads = [] chunk_size = len(input_list) // num_threads for i in range(num_threads): start = i * chunk_size end = start + chunk_size if i < num_threads - 1 else len(input_list) thread = threading.Thread(target=square_list_parallel, args=(input_list[start:end], result, start)) threads.append(thread) thread.start() for thread in threads: thread.join() return result input_list = [1, 2, 3, 4, 5] result_parallel = square_list_parallel_threads(input_list) print(result_parallel)
当然,关于threading的并行处理,小鱼也写过一些博文,如:
《接口测试开发之:Python3,订单并发性能实战》
《Python3,我用这种方式讲解python模块,80岁的奶奶都说能理解。建议收藏 ~ ~》
…
3、总结
看到这里,今天的分享差不多就结束了。
今天主要介绍了Python常用的并行处理的 库:
multiprocessing
threading:
concurrent.futures
joblib
dask
并行处理的库并不止这几个, 我只是列举了几个常用的。
如果你有更好,或者你认为更奈斯的库,也可以在评论区留言, 一起讨论。