《Python分布式计算》第2章 异步编程 (Distributed Computing with Python)

简介: 序言第1章 并行和分布式计算介绍第2章 异步编程第3章 Python的并行计算第4章 Celery分布式应用第5章 云平台部署Python第6章 超级计算机群使用Python第7章 测试和调试分布式应用第8章 继续学习从本章开始,终于开始写代码了!本书中所有的代码都适用于Python 3.5及以上版本。

序言
第1章 并行和分布式计算介绍
第2章 异步编程
第3章 Python的并行计算
第4章 Celery分布式应用
第5章 云平台部署Python
第6章 超级计算机群使用Python
第7章 测试和调试分布式应用
第8章 继续学习


从本章开始,终于开始写代码了!本书中所有的代码都适用于Python 3.5及以上版本。当模块、语句或语法结构不适用于以前的版本时(比如Python 2.7),会在本章中指出。进行一些修改,本书代码也可以运行在Python 2.x版本上。

先回顾下上一章的知识。我们已经学到,改变算法的结构可以让其运行在本地计算机,或运行在集群上。即使是在一台计算机上运行,我们也可以使用多线程或多进程,让子程序运行在多个CPU上。

现在暂时不考虑多CPU,先看一下单线程/进程。与传统的同步编程相比,异步编程或非阻塞编程,可以使性能获得极大提高。

任何包含多任务的程序,它的每个每个任务都在执行一个操作。我们可以把这些任务当做功能或方法,也可以把几个任务合并看做一个功能。例如,将总任务细分、在屏幕打印内容、或从网络抓取信息,等等。

看一下传统程序中的这些任务是如何使用一个CPU的。考虑一个原生的实例,它有四个任务:A、B、C、D。这些任务具体是做什么在这里不重要。我们可以假设这四个任务是关于计算和I/O操作的。安排这四个任务的最直观的方式是序列化。下图展示了这四个任务对CPU的使用:

img_892cfa3e5d09b43282ff0fdcf92b7dd6.png

我们看到,当每个任务都执行I/O操作时,CPU处于空闲状态,等待任务进行计算。这使得CPU大部分时间处于闲置状态。

重点是,从不同组件,例如硬盘、内存和网络,向CPU传递数据的速度相差极大(几个数量级)。

这就会造成任何进行大量I/O操作(访问硬盘、网络通讯等等)的代码都极有可能造成CPU大部分时间闲置,如上图所示。

理想的状态应该是安排一下任务,当一个任务等待I/O时,它处于悬停状态,就让另一个任务接管CPU。这就是异步(也称为事件驱动)编程。

下图生动地展示了用异步编程的方式安排四个任务:


img_9e2fa1738e31bdb07cf3991cfd0b8e73.png

任务仍然是序列的,但是不再各自占用CPU直到任务结束,任务不需要计算时,它们会自发地放弃CPU。尽管CPU仍有闲置,程序的总运行时间明显缩短了。

使用多线程在不同的线程并行运行,也可以达到同样的效果。但是,有一个显著的不同:使用多线程时,是由操作系统决定哪个线程处于运行或悬停。然而,在异步编程中,每个任务可以自己决定是否放弃CPU。

另外,单单使用异步编程,我们不能做出真正的并发:同一时间仅仅有一个任务在运行,消除了竞争条件。当然,我们可以混合使用多线程/多进程和异步编程。

另一点要注意的是,异步编程更善于处理I/O密集型任务,而不是CPU密集型任务(暂停任务不会使性能提高)。

协程

在Python中,让一个功能中途暂停的关键是使用协程。为了理解协程,先要理解生成器generator,要理解生成器,先要理解迭代器iterator。

大部分Python开发者都熟悉对类进行迭代(例如,字符串、列表、元组、文件对象等等):

>>> for i in range(3):
...     print(i)
...
1
2
>>> for line in open('exchange_rates_v1.py'):
...     print(line, end='')
... 
#!/usr/bin/env python3
import itertools
import time
import urllib.request
…

我们可以将各种对象(不仅仅是列表和字符串)进行迭代的原因是迭代协议。迭代协议定义了迭代的标准格式:一个执行__iter____next__(或Python 2.x中的 __iter__next)的对象就是一个迭代器,可以进行迭代操作,如下所示:

class MyIterator(object):
    def __init__(self, xs):
        self.xs = xs

    def __iter__(self):
        return self

    def __next__(self):
        if self.xs:
            return self.xs.pop(0)
        else:
            raise StopIteration

for i in MyIterator([0, 1, 2]):
    print(i)

结果如下所示:

1
2
3

我们能对MyIterator中的实例进行循环的原因是,它用__iter____next__方法,运行了迭代协议:前者返回了迭代的对象,后者逐个返回了序列中的元素。

为了进一步理解协议是如何工作的,我们手动分解这个循环,如下所示:

itrtr = MyIterator([3, 4, 5, 6])

print(next(itrtr))
print(next(itrtr))
print(next(itrtr))
print(next(itrtr))

print(next(itrtr))

运行结果如下:

3
4
5
6
Traceback (most recent call last):
  File "iteration.py", line 32, in <module>
    print(next(itrtr))
  File "iteration.py", line 19, in __next__
  raise StopIteration
StopIteration

我们实例化了MyIterator,然后为了获取它的值,我们多次调用了next()。当序列到头时,next()会抛出异常StopIteration。Python中的for循环使用了同样的机制,它调用迭代器的next(),通过获取异常StopIteration得知何时停止。

生成器就是一个callable,它生成一个结果序列,而不是返回结果。这是通过产生(通过yield关键字)值而不是返回值,见下面的例子(generators.py):

def mygenerator(n):
    while n:
        n -= 1
        yield n

if __name__ == '__main__':
    for i in mygenerator(3):
        print(i)

结果如下:

2
1
0

这是一个使用yield使mygenerator成为生成器的简单例子,它的功能并不简单。调用generator函数并不开始生成序列,只是产生一个generator对象,见如下shell语句:

>>> from generators import mygenerator
>>> mygenerator(5)
<generator object mygenerator at 0x101267b48>

为了激活generator对象,需要调用next(),见如下代码:

>>> g = mygenerator(2)
>>> next(g)
1
>>> next(g)
0
>>> next(g)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration

每个next()从生成的序列产生一个值,直到序列为空,也就是获得异常StopIteration时。迭代器的行为也是类似的。本质上,生成器是简化的迭代器,免去了定义类中__iter____next__的方法。

另外,生成器是一次性操作,不能重复生成的序列。若要重复序列,必须再次调用generator函数。

用来在generator函数中产生序列值的yield表达式,还可以在等号右边使用,以消除值。这样就可以得到协程。协程就是一类函数,它可以通过yield,在指定位置暂停或继续任务。

需要注意,尽管协程是强化的生成器,在概念意义上并不等于生成器。原因是,协程与迭代无关。另一不同点,生成器产生值,而协程消除值。

让我们做一些协程,看看如何使用。协程有三种主要的结构,如下所示:

  • yield(): 用来暂停协程的执行
  • send(): 用来向协程传递数据(以让协程继续执行)
  • close():用来关闭协程

下面代码展示了协程的使用(coroutines.py):

def complain_about(substring):
    print('Please talk to me!')
    try:
        while True:
            text = (yield)
            if substring in text:
                print('Oh no: I found a %s again!'
                      % (substring))
    except GeneratorExit:
        print('Ok, ok: I am quitting.')

我们先定义个一个协程,它就是一个函数,名字是complain_about,它有一个参数:一个字符串。打印一句话之后,进入一个无限循环,由try except控制退出,即只有通过异常才能退出。利用异常GeneratorExit,当获得这个异常时就会退出。

循环的主体十分简单,使用yield来获取数据,存储在变量text中。然后,我们检测substring是否在text中。如果在的话,弹出一条新语句。

下面代码展示了在shell中如何使用这个协程:

>>> from coroutines import complain_about
>>> c = complain_about('Ruby')
>>> next(c)
Please talk to me!
>>> c.send('Test data')
>>> c.send('Some more random text')
>>> c.send('Test data with Ruby somewhere in it')
Oh no: I found a Ruby again!
>>> c.send('Stop complaining about Ruby or else!')
Oh no: I found a Ruby again!
>>> c.close()
Ok, ok: I am quitting.

执行complain_about('Ruby')产生了协程。为了使用新建的协程,我们用next()调用它,与在生成器中所做的相同。只有调用next()之后,才在屏幕上看到Please talk to me!

这时,协程到达了text = (yield),意味着它暂停了执行。控制点返回了shell,我们就可以向协程发送数据了。我们使用send()方法发送数据,如下所示:

>>> c.send('Test data')
>>> c.send('Some more random text')
>>> c.send('Test data with Ruby somewhere in it')
Oh no: I found a Ruby again!

每次调用send()方法都使代码到达下一个yield。在我们的例子中,到达while循环的下一次迭代,返回text = (yield)。这里,控制点返回shell。

我们可以调用close()方法停止协程,它可以在协程内部抛出异常GeneratorExit。此时,协程唯一能做的就是清理数据并退出。下面的代码展示了如何结束协程:

>>> c.close()
Ok, ok: I am quitting.

如果将try except部分注释掉,就不会获得GeneratorExit异常。但是协程还是会停止,如下所示:

>>> def complain_about2(substring):
...     print('Please talk to me!')
...     while True:
...         text = (yield)
...         if substring in text:
...             print('Oh no: I found a %s again!'
...                   % (substring))
... 
>>> c = complain_about2('Ruby')
>>> next(c)
Please talk to me!
>>> c.close()
>>> c.send('This will crash')
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration
>>> next(c)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration

我们看到,一旦关闭协程,对象仍会保持,但是用途为零:不能向它发送数据,也不能调用next()使用它。

当使用协程时,许多人觉得必须要用next()很繁琐,转而使用装饰器,避免多余的调用,如下所示:

>>> def coroutine(fn):
...     def wrapper(*args, **kwargs):
...         c = fn(*args, **kwargs)
...         next(c)
...         return c
...     return wrapper
... 
>>> @coroutine
... def complain_about2(substring):
...     print('Please talk to me!')
...     while True:
...         text = (yield)
...         if substring in text:
...             print('Oh no: I found a %s again!'
...                   % (substring))
... 
>>> c = complain_about2('JavaScript')
Please talk to me!
>>> c.send('Test data with JavaScript somewhere in it')
Oh no: I found a JavaScript again!
>>> c.close()

协程的层级结构可以很复杂,可以让一个协程向其它多个协程发送数据,或从多个源接收数据。这在网络集群编程和系统编程中很有用(为了提高性能),可以用纯Python高效代替大多数Unix工具。

一个异步实例

为了简单又有趣,让我们写一个工具,可以对指定的文件,统计某个词的出现次数。使用之前的协程做基础,再添加一些功能。

在Linux和Mac OS X上,可以使用grep命令获得同样的结果。我们先下载一个大的文本文件,用作输入的数据。我们选择的是Project Gutenberg上列夫托尔斯泰所写的《战争与和平》,它的地址是http://www.gutenberg.org/cache/epub/2600/pg2600.txt

下面代码展示了如何下载(译者注:win上使用Git Bash):

$ curl -sO http://www.gutenberg.org/cache/epub/2600/pg2600.txt
$ wc pg2600.txt
   65007  566320 3291648 pg2600.txt

接下来,我们统计love一词出现的次数,忽略大小写,如下所示(译者注:会有编码问题):

$ time (grep -io love pg2600.txt | wc -l)
677
(grep -io love pg2600.txt) 0.11s user 0.00s system 98% cpu 0.116 total

现在使用Python的协程来做(grep.py):

def coroutine(fn):
    def wrapper(*args, **kwargs):
        c = fn(*args, **kwargs)
        next(c)
        return c
    return wrapper


def cat(f, case_insensitive, child):
    if case_insensitive:
        line_processor = lambda l: l.lower()
    else:
        line_processor = lambda l: l

    for line in f:
        child.send(line_processor(line))


@coroutine
def grep(substring, case_insensitive, child):
    if case_insensitive:
        substring = substring.lower()
    while True:
        text = (yield)
        child.send(text.count(substring))


@coroutine
def count(substring):
    n = 0
    try:
        while True:
            n += (yield)
    except GeneratorExit:
        print(substring, n)


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('-i', action='store_true',
                        dest='case_insensitive')
    parser.add_argument('pattern', type=str)
    parser.add_argument('infile', type=argparse.FileType('r'))

    args = parser.parse_args()

    cat(args.infile, args.case_insensitive,
        grep(args.pattern, args.case_insensitive,
             count(args.pattern)))

分析代码之前,我们先运行一下,和grep进行比较:

$ time python3.5 grep.py -i love pg2600.txt
love 677
python3.5 grep.py -i love pg2600.txt  0.09s user 0.01s system 97% cpu 0.097 total

可以看到,使用协程的纯Python版本与使用grepwc命令的Unix相比,十分具有竞争力。当然,Unix的grep命令远比Python版本强大。不能简单宣称Python比C语言快!但是,Python的结果也是让人满意的。

来分析下代码。首先,再次执行coroutine的装饰器。之后,将总任务分解成三块:

  • 逐行读取文件(通过cat函数)
  • 统计每行中substring的出现次数(grep协程)
  • 求和并打印数据(count协程)

在脚本文件的主体部分,我们解析命令行选项,将cat结果传给grep,将grep结果传给count,就像操作普通的Unix工具。

实现这个链条极其简单。我们将接收数据的协程当做参数(前面例子的child),传递给产生数据的函数或协程。然后,在数据源中,调用协程的send方法。

第一个函数cat,作为整个函数的数据源,它逐行读取文件,将每行发送给grep
child.send(line))。如果匹配是大小写不敏感的,不需要进行转换;如果大小写敏感,则都转化为小写。

grep命令是我们的第一个协程。这里,进入一个无限循环,持续获取数据(text = (yield)),统计substringtext中的出现次数,,将次数发送给写一个协程(即count):child.send(text.count(substring)))

count协程用总次数n,从grep获取数据,对总次数进行求和,n += (yield)。它捕获发送给各个协程关闭时的GeneratorExit异常(在我们的例子中,到达文件最后就会出现异常),以判断何时打印这个substringn

当把协程组织为更复杂的结构时,会更有趣。比如,我们可以统计多个单词出现的次数。

下面的代码展示了一种这样做的方法,通过一个额外的协程负责广播,将输入数据发送给任意数目的子协程(mgrep.py):

def coroutine(fn):
    def wrapper(*args, **kwargs):
        c = fn(*args, **kwargs)
        next(c)
        return c
    return wrapper

def cat(f, case_insensitive, child):
    if case_insensitive:
        line_processor = lambda l: l.lower()
    else:
        line_processor = lambda l: l

    for line in f:
        child.send(line_processor(line))

@coroutine
def grep(substring, case_insensitive, child):
    if case_insensitive:
        substring = substring.lower()
    while True:
        text = (yield)
        child.send(text.count(substring))

@coroutine
def count(substring):
    n = 0
    try:
        while True:
            n += (yield)
    except GeneratorExit:
        print(substring, n)

@coroutine
def fanout(children):
    while True:
        data = (yield)
        for child in children:
            child.send(data)

if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('-i', action='store_true', dest='case_insensitive')
    parser.add_argument('patterns', type=str, nargs='+',)
    parser.add_argument('infile', type=argparse.FileType('r'))

    args = parser.parse_args()

    cat(args.infile, args.case_insensitive,
        fanout([grep(p, args.case_insensitive,
                     count(p)) for p in args.patterns]))

代码看上去和之前的例子差不多。让我们分析一下差别。我们定义了一个广播器:fanoutfanout()协程使用一列协程作为输入,自身位于一个无限循环中。当收到数据后(data = (yield)),便将数据分发给注册的协程(for child in children: child.send(data))。

不用修改catgrepcount的代码,我们就可以利用原有的代码来搜索任意个数的字符串了!

它的性能依旧很好,如下所示:

$ time python3.5 mgrep.py -i love hate hope pg2600.txt
hate 103
love 677
hope 158
python3.5 mgrep.py -i love hate hope pg2600.txt  0.16s user 0.01s system 98% cpu 0.166 total

总结

Python从1.5.2版本之后引入了asyncoreasynchat模块,开始支持异步编程。2.5版本引入了yield,可以向协程传递数据,简化了代码、加强了性能。Python 3.4 引入了一个新的库进行异步I/O,称作asyncio

Python 3.5通过async defawait,引入了真正的协程类型。感兴趣的读者可以继续研究Python的新扩展。一句警告:异步编程是一个强大的工具,可以极大地提高I/O密集型代码的性能。但是异步编程也是存在问题的,而且还相当复杂。

任何异步代码都要精心选择非阻塞的库,以防使用阻塞代码。并且要运行一个协程规划期(因为OS不能像规划线程一样规划协程),包括写一个事件循环和其它事务。读异步代码会有一定困难,即使我们的最简单的例子也很难一眼看懂。所以,一定要小心!


序言
第1章 并行和分布式计算介绍
第2章 异步编程
第3章 Python的并行计算
第4章 Celery分布式应用
第5章 云平台部署Python
第6章 超级计算机群使用Python
第7章 测试和调试分布式应用
第8章 继续学习


目录
相关文章
|
1月前
|
机器学习/深度学习 边缘计算 人工智能
第二届边缘计算与并行、分布式计算国际学术会议(ECPDC 2025) 2025 2nd international Conference on Edge Computing, Parallel and Distributed Computing
第二届边缘计算与并行、分布式计算国际学术会议(ECPDC 2025) 2025 2nd international Conference on Edge Computing, Parallel and Distributed Computing 机器学习 计算学习理论 数据挖掘 科学计算 计算应用 数字图像处理 人工智能
64 6
|
10天前
|
机器学习/深度学习 数据可视化 TensorFlow
使用Python实现深度学习模型的分布式训练
使用Python实现深度学习模型的分布式训练
127 73
|
5天前
|
人工智能 分布式计算 数据处理
云产品评测:MaxFrame — 分布式Python计算服务的最佳实践与体验
阿里云推出的MaxFrame是一款高性能分布式计算平台,专为大规模数据处理和AI应用设计。它提供了强大的Python编程接口,支持分布式Pandas操作,显著提升数据处理速度(3-5倍)。MaxFrame在大语言模型数据处理中表现出色,具备高效内存管理和任务调度能力。然而,在开通流程、API文档及功能集成度方面仍有改进空间。总体而言,MaxFrame在易用性和计算效率上具有明显优势,但在开放性和社区支持方面有待加强。
29 9
|
7天前
|
分布式计算 大数据 数据处理
技术评测:MaxCompute MaxFrame——阿里云自研分布式计算框架的Python编程接口
随着大数据和人工智能技术的发展,数据处理的需求日益增长。阿里云推出的MaxCompute MaxFrame(简称“MaxFrame”)是一个专为Python开发者设计的分布式计算框架,它不仅支持Python编程接口,还能直接利用MaxCompute的云原生大数据计算资源和服务。本文将通过一系列最佳实践测评,探讨MaxFrame在分布式Pandas处理以及大语言模型数据处理场景中的表现,并分析其在实际工作中的应用潜力。
33 2
|
8天前
|
人工智能 分布式计算 数据处理
云产品评测:分布式Python计算服务MaxFrame
云产品评测:分布式Python计算服务MaxFrame
37 3
|
1月前
|
API 调度 开发者
探索Python中的异步编程:从asyncio到Trio
本文将带你深入Python异步编程的心脏地带,从asyncio的基本概念到Trio的高级特性,我们将一起揭开Python异步编程的神秘面纱,并探讨它们如何改变我们的编程方式。
|
2天前
|
分布式计算 数据处理 MaxCompute
分布式Python计算服务MaxFrame使用心得
大家好,我是V哥。MaxFrame是阿里云自研的分布式计算框架,专为Python开发者设计,支持大规模数据处理和AI模型开发。MaxFrame适用于快速进行数据处理、数据科学和交互式探索,支持按量付费及包年包月两种计费方式。通过两个案例(金融数据清洗和大语言模型预处理),展示了MaxFrame在大规模数据处理中的显著性能提升。安装MaxFrame客户端只需简单几步,轻松开启高效数据处理之旅。欢迎关注威哥爱编程,一起交流技术心得!
|
1月前
|
API 开发者 Python
探索Python中的异步编程:Asyncio与Tornado的对决
在这个快节奏的世界里,Python开发者面临着一个挑战:如何让代码跑得更快?本文将带你走进Python异步编程的两大阵营——Asyncio和Tornado,探讨它们如何帮助我们提升性能,以及在实际应用中如何选择。我们将通过一场虚拟的“对决”,比较这两个框架的性能和易用性,让你在异步编程的战场上做出明智的选择。
|
1月前
|
并行计算 调度 开发者
探索Python中的异步编程:从基础到实战
在Python的世界里,异步编程是一种让程序运行更加高效、响应更快的技术。本文不仅会介绍异步编程的基本概念和原理,还将通过具体代码示例展示如何在Python中实现异步操作。无论你是初学者还是有经验的开发者,都能从中获益,了解如何运用这一技术优化你的项目。
|
1月前
|
数据处理 Python
探索Python中的异步编程:从基础到实战
在Python的世界中,“速度”不仅是赛车手的追求。本文将带你领略Python异步编程的魅力,从原理到实践,我们不单单是看代码,更通过实例感受它的威力。你将学会如何用更少的服务器资源做更多的事,就像是在厨房里同时烹饪多道菜而不让任何一道烧焦。准备好了吗?让我们开始这场技术烹饪之旅。