如何优雅地实现Python通用多线程/进程并行模块

简介:

当单线程性能不足时,我们通常会使用多线程/多进程去加速运行。而这些代码往往多得令人绝望,需要考虑:

  • 如何创建线程执行的函数?
  • 如何收集结果?若希望结果从子线程返回主线程,则还要使用队列
  • 如何取消执行? 直接kill掉所有线程?信号如何传递?
  • 是否需要线程池? 否则反复创建线程的成本过高了

不仅如此,若改为多进程或协程,代码还要继续修改。若多处使用并行,则这些代码还会重复很多遍,非常痛苦。

于是,我们考虑将并行的所有逻辑封装到一个模块之内,向外部提供像串行执行一样的编程体验,还能彻底解决上面所述的疑难问题。所有代码不足180行。

GitHub地址:

https://github.com/ferventdesert/multi_yielder

使用时非常简洁:

def xprint(x): 
    time.sleep(1)  # mock a long time task 
    yield x*x   
i=0
for item in multi_yield(xrange(100)),xprint, process_mode,3:
    i+=1
    print(item)
    if i>10:
        break

上面的代码会使用三个进程,并行地打印1-10的平方。当打印完10之后,进程自动回收释放。就像串行程序一样简单。

1. 先实现串行任务

我们通常会将任务分割为很多个子块,从而方便并行。因此可以将任务抽象为生成器。类似下面的操作,每个seed都是任务的种子。

def get_generator():
    for seed in 100:
        yield seed

任务本身的定义,则可以通过一个接受种子的函数来实现:

def worker(seed):
    # some long time task
    return seed*seed # just example

那么实现串行任务就像这样:

for seed in get_generator(n):
    print worker(seed)

进一步地,可以将其抽象为下面的函数:

def serial_yield(genenator,worker):
    for seed in generator():
        yield worker(seed)

该函数通过传入生成器函数(generator)和任务的定义(worker函数),即可再返回一个生成器。消费时:

for result in serial_yield(your_genenator, your_worker):
    print(result)

我们看到,通过定义高阶函数,serial_yield就像map函数,对seed进行加工后输出。

2. 定义并行任务

考虑如下场景: boss负责分发任务到任务队列,多个worker从任务队列捞数据,处理完之后,再写入结果队列。主线程从结果队列中取结果即可。

我们定义如下几种执行模式:

  • async: 异步/多协程
  • thread: 多线程
  • process: 多进程

使用Python创建worker的代码如下,func是任务的定义(是个函数)

    def factory(func, args=None, name='task'):
        if args is None:
            args = ()
        if mode == process_mode:
            return multiprocessing.Process(name=name, target=func, args=args)
        if mode == thread_mode:
            import threading
            t = threading.Thread(name=name, target=func, args=args)
            t.daemon = True
            return t
        if mode == async_mode:
            import gevent
            return gevent.spawn(func, *args)

创建队列的代码如下,注意seeds可能是无穷流,因此需要限定队列的长度,当入队列发现队列已满时,则任务需要阻塞。

  def queue_factory(size):
        if mode == process_mode:
            return multiprocessing.Queue(size)
        elif mode == thread_mode:
            return Queue(size)
        elif mode == async_mode:
            from gevent import queue
            return queue.Queue(size)

什么时候任务可以终止? 我们罗列如下几种情况:

  • 所有的seed都已经被消费完了
  • 外部传入了结束请求

对第一种情况,我们让boss在seed消费完之后,在队列里放入多个Empty标志,worker收到Empty之后,就会自动退出,下面是boss的实现逻辑:

    def _boss(task_generator, task_queue, worker_count):
        for task in task_generator:
            task_queue.put(task)
        for i in range(worker_count):
            task_queue.put(Empty)
        print('worker boss finished')

再定义worker的逻辑:

    def _worker(task_queue, result_queue, gene_func):
        import time
        try:
            while not stop_wrapper.is_stop():
                if task_queue.empty():
                    time.sleep(0.01)
                    continue
                task = task.get()
                if task == Empty:
                    result_queue.put(Empty)
                    break
                if task == Stop:
                    break
                for item in gene_func(task):
                    result_queue.put(item)
            print ('worker worker is stop')
        except Exception as e:
            logging.exception(e)
            print ('worker exception, quit')

简单吧?但是这样会有问题,这个后面再说,我们把剩余的代码写完。

再定义multi_yield的主要代码。 代码非常好理解,创建任务和结果队列,再创建boss和worker线程(或进程/协程)并启动,之后不停地从结果队列里取数据就可以了。

 def multi_yield(customer_func, mode=thread_mode, worker_count=1, generator=None, queue_size=10):
        workers = []
        result_queue = queue_factory(queue_size)
        task_queue = queue_factory(queue_size)

        main = factory(_boss, args=(generator, task_queue, worker_count), name='_boss')
        for process_id in range(0, worker_count):
            name = 'worker_%s' % (process_id)
            p = factory(_worker, args=(task_queue, result_queue, customer_func), name=name)
            workers.append(p)
        main.start()

        for r in workers:
            r.start()
        count = 0
        while not should_stop():
            data = result_queue.get()
            if data is Empty:
                count += 1
                if count == worker_count:
                    break
                continue
            if data is Stop:
                break
            else:
                yield data

这样从外部消费时,即可:

def xprint(x):
    time.sleep(1)
    yield x

i=0
for item in multi_yield(xprint, process_mode,3,xrange(100)):
    i+=1
    print(item)
    if i>10:
        break

这样我们就实现了一个与serial_yield功能类似的multi_yield。可以定义多个worker,从队列中领任务,而不需重复地创建和销毁,更不需要线程池。当然,代码不完全,运行时可能出问题。但以上代码已经说明了核心的功能。完整的代码可以在文末找到。

但是你也会发现很严重的问题:

  • 当从外部break时,内部的线程并不会自动停止
  • 我们无法判断队列的长度,若队列满,那么put操作会永远卡死在那里,任务都不会结束。

3. 改进任务停止逻辑

最开始想到的,是通过在multi_yield函数参数中添加一个返回bool的函数,这样当外部break时,同时将该函数的返回值置为True,内部检测到该标志位后强制退出。伪代码如下:

_stop=False
def can_stop():
    return _stop

for item in multi_yield(xprint, process_mode,3,xrange(100),can_stop):
    i+=1
    print(item)
    if i>10:
        _stop=True
        break

但这样并不优雅,引入了更多的函数作为参数,还必须手工控制变量值,非常繁琐。在多进程模式下,stop标志位还如何解决?

我们希望外部在循环时执行了break后,会自动通知内部的生成器。实现方法似乎就是with语句,即contextmanager.

我们实现以下的包装类:

class Yielder(object):
    def __init__(self, dispose):
        self.dispose = dispose

    def __enter__(self):
        pass

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.dispose()

它实现了with的原语,参数是dispose函数,作用是退出with代码块后的回收逻辑。

由于值类型的标志位无法在多进程环境中传递,我们再创建StopWrapper类,用于管理停止标志和回收资源:

   class Stop_Wrapper():
        def __init__(self):
            self.stop_flag = False
            self.workers=[]

        def is_stop(self):
            return self.stop_flag

        def stop(self):
            self.stop_flag = True
            for process in self.workers:
                if isinstance(process,multiprocessing.Process):
                    process.terminate()
                    

最后的问题是,如何解决队列满或空时,put/get的无限等待问题呢?考虑包装一下put/get:包装在while True之中,每隔两秒get/put,这样即使阻塞时,也能保证可以检查退出标志位。所有线程在主线程结束后,最迟也能在2s内自动退出。

def safe_queue_get(queue, is_stop_func=None, timeout=2):
    while True:
        if is_stop_func is not None and is_stop_func():
            return Stop
        try:
            data = queue.get(timeout=timeout)
            return data
        except:
            continue


def safe_queue_put(queue, item, is_stop_func=None, timeout=2):
    while True:
        if is_stop_func is not None and is_stop_func():
            return Stop
        try:
            queue.put(item, timeout=timeout)
            return item
        except:
            continue
            

如何使用呢?我们只需在multi_yield的yield语句之外加上一行就可以了:

    with Yielder(stop_wrapper.stop):
        # create queue,boss,worker, then start all
        # ignore repeat code
        while not should_stop():
            data = safe_queue_get(result_queue, should_stop)
            if data is Empty:
                count += 1
                if count == worker_count:
                    break
                continue
            if data is Stop:
                break
            else:
                yield data

仔细阅读上面的代码, 外部循环时退出循环,则会自动触发stop_wrapper的stop操作,回收全部资源,而不需通过外部的标志位传递!这样调用方在心智完全不需有额外的负担。

实现生成器和上下文管理器的编程语言,都可以通过上述方式实现自动协程资源回收。笔者也实现了一个C#版本的,有兴趣欢迎交流。

这样,我们就能像文章开头那样,实现并行的迭代器操作了。

4. 结语

完整代码在:

https://github.com/ferventdesert/multi_yielder/blob/master/src/multi_yielder.py

一些实现的细节很有趣,我们借助在函数中定义函数,可以不用复杂的类去承担职责,而仅仅只需函数。而类似的思想,在函数式编程中非常常见。

该工具已经被笔者的流式语言etlpy所集成。但是依然有较多改进的空间,如没有集成分布式执行模式。

欢迎留言交流。


作者:热情的沙漠
出处:http://www.cnblogs.com/buptzym/
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

分类: 开源项目


本文转自FerventDesert博客园博客,原文链接:http://www.cnblogs.com/buptzym/p/6933868.html,如需转载请自行联系原作者
目录
相关文章
|
1月前
|
调度 开发者 Python
深入浅出操作系统:进程与线程的奥秘
在数字世界的底层,操作系统扮演着不可或缺的角色。它如同一位高效的管家,协调和控制着计算机硬件与软件资源。本文将拨开迷雾,深入探索操作系统中两个核心概念——进程与线程。我们将从它们的诞生谈起,逐步剖析它们的本质、区别以及如何影响我们日常使用的应用程序性能。通过简单的比喻,我们将理解这些看似抽象的概念,并学会如何在编程实践中高效利用进程与线程。准备好跟随我一起,揭开操作系统的神秘面纱,让我们的代码运行得更加流畅吧!
|
1月前
|
消息中间件 Unix Linux
【C语言】进程和线程详解
在现代操作系统中,进程和线程是实现并发执行的两种主要方式。理解它们的区别和各自的应用场景对于编写高效的并发程序至关重要。
62 6
|
1月前
|
调度 开发者
深入理解:进程与线程的本质差异
在操作系统和计算机编程领域,进程和线程是两个核心概念。它们在程序执行和资源管理中扮演着至关重要的角色。本文将深入探讨进程与线程的区别,并分析它们在现代软件开发中的应用和重要性。
63 5
|
1月前
|
算法 调度 开发者
深入理解操作系统:进程与线程的管理
在数字世界的复杂编织中,操作系统如同一位精明的指挥家,协调着每一个音符的奏响。本篇文章将带领读者穿越操作系统的幕后,探索进程与线程管理的奥秘。从进程的诞生到线程的舞蹈,我们将一起见证这场微观世界的华丽变奏。通过深入浅出的解释和生动的比喻,本文旨在揭示操作系统如何高效地处理多任务,确保系统的稳定性和效率。让我们一起跟随代码的步伐,走进操作系统的内心世界。
|
1月前
|
调度 开发者
核心概念解析:进程与线程的对比分析
在操作系统和计算机编程领域,进程和线程是两个基本而核心的概念。它们是程序执行和资源管理的基础,但它们之间存在显著的差异。本文将深入探讨进程与线程的区别,并分析它们在现代软件开发中的应用和重要性。
60 4
|
2月前
|
数据采集 存储 数据处理
Python中的多线程编程及其在数据处理中的应用
本文深入探讨了Python中多线程编程的概念、原理和实现方法,并详细介绍了其在数据处理领域的应用。通过对比单线程与多线程的性能差异,展示了多线程编程在提升程序运行效率方面的显著优势。文章还提供了实际案例,帮助读者更好地理解和掌握多线程编程技术。
|
2月前
|
并行计算 数据处理 调度
Python中的并发编程:探索多线程与多进程的奥秘####
本文深入探讨了Python中并发编程的两种主要方式——多线程与多进程,通过对比分析它们的工作原理、适用场景及性能差异,揭示了在不同应用需求下如何合理选择并发模型。文章首先简述了并发编程的基本概念,随后详细阐述了Python中多线程与多进程的实现机制,包括GIL(全局解释器锁)对多线程的影响以及多进程的独立内存空间特性。最后,通过实例演示了如何在Python项目中有效利用多线程和多进程提升程序性能。 ####
|
2月前
|
Linux 调度 C语言
深入理解操作系统:进程和线程的管理
【10月更文挑战第32天】本文旨在通过浅显易懂的语言和实际代码示例,带领读者探索操作系统中进程与线程的奥秘。我们将从基础知识出发,逐步深入到它们在操作系统中的实现和管理机制,最终通过实践加深对这一核心概念的理解。无论你是编程新手还是希望复习相关知识的资深开发者,这篇文章都将为你提供有价值的见解。
|
2月前
|
Java
java小知识—进程和线程
进程 进程是程序的一次执行过程,是系统运行的基本单位,因此进程是动态的。系统运行一个程序即是一个进程从创建,运行到消亡的过程。简单来说,一个进程就是一个执行中的程序,它在计算机中一个指令接着一个指令地执行着,同时,每个进程还占有某些系统资源如CPU时间,内存空间,文件,文件,输入输出设备的使用权等等。换句话说,当程序在执行时,将会被操作系统载入内存中。 线程 线程,与进程相似,但线程是一个比进程更小的执行单位。一个进程在其执行的过程中产生多个线程。与进程不同的是同类的多个线程共享同一块内存空间和一组系统资源,所以系统在产生一个线程,或是在各个线程之间做切换工作时,负担要比
33 1
|
2月前
深入理解操作系统:进程与线程的管理
【10月更文挑战第30天】操作系统是计算机系统的核心,它负责管理计算机硬件资源,为应用程序提供基础服务。本文将深入探讨操作系统中进程和线程的概念、区别以及它们在资源管理中的作用。通过本文的学习,读者将能够更好地理解操作系统的工作原理,并掌握进程和线程的管理技巧。
46 2
下一篇
开通oss服务