Python使用多进程并行加速业务操作 完整代码

简介: Python使用多进程并行加速业务操作 完整代码

Python使用多进程并行加速业务操作 完整代码






需求分析


  最近在对一个数据集进行处理,共2000条,每条去调一个第三方接口,耗时7-10秒。单线程处理一次要3.9-5.6小时,于是想着用多进程加速一下。 需求大致如下:


 1、能配置进程数目


 2、能加载要处理的数据


 3、能打印完善的日志


 4、多进程能共享处理后的数据结果,方便最终获取/导出


完整代码


  Python代码如下:(其中需要修改的地方加了TODO)在win和linux上都可以用


import logging
import math
import multiprocessing
import time
import pandas as pd
from contextlib import contextmanager
import threading
# 设置日志配置
logging.basicConfig(format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p', level=logging.INFO)
# 定义超时异常
class TimeoutException(Exception): pass
# 超时控制
@contextmanager
def time_limit(seconds):
    timer = threading.Timer(seconds, lambda: _raise_timeout_exception())
    def _raise_timeout_exception():
        raise TimeoutException("Timed out!")
    try:
        timer.start()
        yield
    finally:
        timer.cancel()
def process_data(i, data, results, lock):
    logging.info('------group: ' + str(i) + '------')
    logging.info('------len: ' + str(len(data)) + '------')
    for _, row in data.iterrows():
        if _ % (math.ceil(len(data) / 10.0)) == 0:
            logging.info('------group' + str(i) + ': ' + str(_) + '/' + str(len(data)) + '------')
        try:
            # 超时限制 TODO 秒数
            with time_limit(20):
                # 模拟任务 TODO 任务
                time.sleep(1)
                # 使用锁来保证对结果列表的进程安全访问
                lock.acquire()
                try:
                    # 将结果添加到共享的结果列表中 TODO 收集结果
                    results.append(row['id'])
                finally:
                    lock.release()
        except Exception as e:
            logging.info('------err: ' + str(e) + '------')
if __name__ == '__main__':
    # 手动设置并行进程数目 TODO 进程数目
    group_num = 8
    # 从电脑配置中设置并行进程数目
    # group_num = multiprocessing.cpu_count()
    # 读取数据 TODO 数据源
    data = pd.read_excel('data.xlsx')
    # 使用pandas平均划分数据
    grouped_data = data.groupby(data.index % group_num)
    # 定义共享的结果列表
    manager = multiprocessing.Manager()
    results = manager.list()
    # 创建锁
    lock = multiprocessing.Lock()
    start_time = time.time()
    # 定义多进程
    processes = []
    for i in range(group_num):
        p = multiprocessing.Process(target=process_data,
                                    args=(i, grouped_data.get_group(i).reset_index(), results, lock))
        processes.append(p)
    # 启动
    for _p in processes:
        _p.start()
    for _p in processes:
        _p.join()
    end_time = time.time()
    execution_time = end_time - start_time
    # 打印数据
    print(f"代码执行时间:{execution_time}秒")
    print(results)


  data.xlsx里面的数据是随便打的:


image.png


本demo性能分析


  16核CPU,执行上述代码,其中任务部分用了time.sleep(1)停了1秒,耗时分析如下:


进程数 耗时
1 29.317383289337158秒
4 8.288025140762329秒
8 5.77861475944519秒
14 4.941734313964844秒
16 5.262717008590698秒


可以看到加了多进程,加速效果还是比较明显的。


Python中单线程、多线程和多进程的效率对比实验


此处参考:http://blog.atomicer.cn/2016/09/30/Python


 我们知道,线程操作、进程操作一般分为CPU密集型操作、IO密集型操作、网络请求密集型操作。


 资料显示,如果多线程的进程是CPU密集型的,那多线程并不能有多少效率上的提升,相反还可能会因为线程的频繁切换,导致效率下降,推荐使用多进程;如果是IO密集型,多线程进程可以利用IO阻塞等待时的空闲时间执行其他线程,提升效率。所以我们根据实验对比不同场景的效率:


image.png


通过上面的结果,我们可以看到:


 多线程在IO密集型的操作下似乎也没有很大的优势(也许IO操作的任务再繁重一些就能体现出优势),在CPU密集型的操作下明显地比单线程线性执行性能更差,但是对于网络请求这种忙等阻塞线程的操作,多线程的优势便非常显著了


 多进程无论是在CPU密集型还是IO密集型以及网络请求密集型(经常发生线程阻塞的操作)中,都能体现出性能的优势。不过在类似网络请求密集型的操作上,与多线程相差无几,但却更占用CPU等资源,所以对于这种情况下,我们可以选择多线程来执行。

相关文章
|
1天前
|
SQL 并行计算 API
Dask是一个用于并行计算的Python库,它提供了类似于Pandas和NumPy的API,但能够在大型数据集上进行并行计算。
Dask是一个用于并行计算的Python库,它提供了类似于Pandas和NumPy的API,但能够在大型数据集上进行并行计算。
19 9
|
1天前
|
机器学习/深度学习 人工智能 数据挖掘
Numba是一个Python库,用于对Python代码进行即时(JIT)编译,以便在硬件上高效执行。
Numba是一个Python库,用于对Python代码进行即时(JIT)编译,以便在硬件上高效执行。
20 9
|
1天前
|
机器人 Shell 开发者
`roslibpy`是一个Python库,它允许非ROS(Robot Operating System)环境(如Web浏览器、移动应用等)与ROS环境进行交互。通过使用`roslibpy`,开发者可以编写Python代码来远程控制ROS节点,发布和订阅话题,以及调用服务。
`roslibpy`是一个Python库,它允许非ROS(Robot Operating System)环境(如Web浏览器、移动应用等)与ROS环境进行交互。通过使用`roslibpy`,开发者可以编写Python代码来远程控制ROS节点,发布和订阅话题,以及调用服务。
18 8
|
1天前
|
存储 缓存 算法
如何优化Python代码?
【7月更文挑战第14天】如何优化Python代码?
13 6
|
1天前
|
消息中间件 安全 数据处理
Python中的并发编程:理解多线程与多进程的区别与应用
在Python编程中,理解并发编程是提高程序性能和响应速度的关键。本文将深入探讨多线程和多进程的区别、适用场景及实际应用,帮助开发者更好地利用Python进行并发编程。
|
1天前
|
机器学习/深度学习 TensorFlow API
Keras是一个高层神经网络API,由Python编写,并能够在TensorFlow、Theano或CNTK之上运行。Keras的设计初衷是支持快速实验,能够用最少的代码实现想法,并且能够方便地在CPU和GPU上运行。
Keras是一个高层神经网络API,由Python编写,并能够在TensorFlow、Theano或CNTK之上运行。Keras的设计初衷是支持快速实验,能够用最少的代码实现想法,并且能够方便地在CPU和GPU上运行。
9 0
|
1天前
|
Unix Linux Python
`subprocess`模块是Python中用于生成新进程、连接到它们的输入/输出/错误管道,并获取它们的返回(退出)代码的模块。
`subprocess`模块是Python中用于生成新进程、连接到它们的输入/输出/错误管道,并获取它们的返回(退出)代码的模块。
6 0
|
1天前
|
Python
在Python中,`multiprocessing`模块提供了一种在多个进程之间共享数据和同步的机制。
在Python中,`multiprocessing`模块提供了一种在多个进程之间共享数据和同步的机制。
4 0
|
2月前
|
算法 编译器 开发者
如何提高Python代码的性能:优化技巧与实践
本文探讨了如何提高Python代码的性能,重点介绍了一些优化技巧与实践方法。通过使用适当的数据结构、算法和编程范式,以及利用Python内置的性能优化工具,可以有效地提升Python程序的执行效率,从而提升整体应用性能。本文将针对不同场景和需求,分享一些实用的优化技巧,并通过示例代码和性能测试结果加以说明。
|
28天前
|
算法 搜索推荐 开发者
解锁Python代码的速度之谜:性能瓶颈分析与优化实践
探索Python性能优化,关注解释器开销、GIL、数据结构选择及I/O操作。使用cProfile和line_profiler定位瓶颈,通过Cython减少解释器影响,多进程避开GIL,优化算法与数据结构,以及借助asyncio提升I/O效率。通过精准优化,Python可应对高性能计算挑战。【6月更文挑战第15天】
38 1