《Python分布式计算》 第4章 Celery分布式应用 (Distributed Computing with Python)

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
全局流量管理 GTM,标准版 1个月
简介: 序言第1章 并行和分布式计算介绍第2章 异步编程第3章 Python的并行计算第4章 Celery分布式应用第5章 云平台部署Python第6章 超级计算机群使用Python第7章 测试和调试分布式应用第8章 继续学习本章是前面某些知识点的延续。

序言
第1章 并行和分布式计算介绍
第2章 异步编程
第3章 Python的并行计算
第4章 Celery分布式应用
第5章 云平台部署Python
第6章 超级计算机群使用Python
第7章 测试和调试分布式应用
第8章 继续学习


本章是前面某些知识点的延续。特别的,本章以实例详细的探讨了异步编程和分布式计算。本章关注Celery,一个复杂的用于构建分布应用的Python框架。最后,对比了Celery的对手:PyroPython-RQ

此时,你应该已经明白了并行、分布和异步编程的基本含义。如果没有的话,最好再学习下前面几章。

搭建多机环境

学习Celery和其它Python包之前,先来搭建测试环境。我们开发的是分布应用,因此需要多机环境。

可以使用至少两台联网机器的读者可以跳过这部分。其余读者,请继续阅读。对于后者,仍然有免费或便宜的解决方案。

其一是在主机上使用虚拟机VM(例如VirtualBox,https://www.virtualbox.org)。创建几个VM,安装Linux,让它们在后台运行。因为它们不需要图像化桌面,所以可以很轻量,使用少量RAM和CPU即可。

另一方法是买几个便宜的小型计算机主板,比如树莓派(https://www.raspberrypi.org),在它上面安装Linux,连上局域网。

第三种方案是用云服务器,比如Amazon EC2,使用它的虚拟机。如果使用这种方法,要确认这些包的端口在防火墙是打开的。

无论是用哪种方法,紧跟着的问题就是没有在集群上安装完整的DNS。最便捷的方法是在所有机器上编辑/etc/hosts文件。查看IP地址,为每台机器起一个名字,并将它们添加到/etc/hosts

我在Mac主机上使用了两个虚拟机,这是我的hosts文件:

$ cat /etc/hosts
##
# Host Database
#
# localhost is used to configure the loopback interface
# when the system is booting.  Do not change this entry.
##
127.0.0.1 localhost
255.255.255.255 broadcasthost
::1             localhost 
fe80::1%lo0 localhost

# Development VMs
192.168.123.150 ubuntu1 ubuntu1.local
192.168.123.151 ubuntu2 ubuntu2.local

相似的,这是我的两个虚拟机(运行Ubuntu 15.04)上的host文件:

$ cat /etc/hosts
127.0.0.1 localhost
192.168.123.151 ubuntu2
192.168.123.150 ubuntu1

# The following lines are desirable for IPv6 capable hosts
::1     ip6-localhost ip6-loopback
fe00::0 ip6-localnet
ff00::0 ip6-mcastprefix
ff02::1 ip6-allnodes
ff02::2 ip6-allrouters

你要确保hosts文件上的IP地址和名字是要使用的机器。本书,会命名这些机器命名为HOST1、HOST2、HOST3等等。

搭建好多机环境之后,就可以开始写分布应用了。

安装Celery

目前为止,我们用的都是Python的标准库,Celery(http://www.celeryproject.org)是用到的第一个第三方库。Celery是一个分布任务队列,就是一个以队列为基础的系统,和之前的某些例子很像。它还是分布式的,意味着工作进程和保存结果的和请求的队列,在不同机器上。

首先安装Celery和它的依赖。在每台机器上建立一个虚拟环境(起名为book),代码如下(环境是Unix):

$ pip install virtualenvwrapper

如果这个命令被拒绝,可以加上sudo,用超级用户权限来安装virtualenvwrapper,代码如下:

$ sudo pip install virtualenvwrapper

sudo命令会向你询问Unix用户密码。或者,可以用下面代码安装virtualenvwrapper

$ pip install --user virtualenvwrapper

不管使用哪种方法,完成安装virtualenvwrapper之后,都需要配置它,定义三个环境变量(用于bash类的shell,假定virtualenvwrapper安装在/usr/local/bin):

$ export WORKON_HOME=$HOME/venvs
$ export PROJECT_HOME=$HOME/workspace
$ source /usr/local/bin/virtualenvwrapper.sh

你需要修改前置路径,来决定虚拟环境所在的位置($WORKON_HOME)和代码的根目录($PROJECT_HOME)。virtualenvwrapper.sh的路径也可能需要变动。这三行代码最好添加到相关的shell启动文件(例如,~/.bashrc~/.profile)。

做好了前面的设置,我们就可以创建要使用的虚拟环境了,如下所示:

$ mkvirtualenv book --python=`which python3.5`

这个命令会在$WORKON_HOME之下建立新的虚拟环境,名字是book,使用的是Python 3.5。以后,可以用下面命令启动这个虚拟环境:

$ workon book

使用虚拟环境的好处是,可以在里面安装所有需要的包,而不污染系统的Python。以后不再需要这个虚拟环境时,可以方便的删除(参考rmvirtualenv命令)。

现在就可以安装Celery了。和以前一样,(在每台机器上)使用pip

$ pip install celery

该命令可以在激活的虚拟环境中下载、解压、安装所有的依赖。

快完成了,现在只需安装配置一个中间代理,Celery用它主持任务队列,并向工作进程(只有一台机器,HOST1)发送消息。从文档中可以看到,Celery支持多种中间代理,包括SQLAlchemyhttp://www.sqlalchemy.org),用以本地开发和测试。这里推荐使用的中间代理是RabbitMQhttps://www.rabbitmq.com)。

https://www.rabbitmq.com上有安装指导、文档和下载。在Mac主机上,安装的最简方法是使用homebrewhttp://brew.sh),如下所示:

$ brew install rabbitmq

对于Windows用户,最好使用官方的安装包。对于Linux,官方也提供了安装包。

安装好RabbitMQ之后,就可以立即使用了。这里还有一个简单的配置步骤,因为在例子中,访问队列不会创建用户和密码。只要编辑RabbitMQ的配置文件(通常位于/usr/local/etc/rabbitmq/rabbitmq.config),添加下面的条目,允许网络中的默认guest账户:

[
  {rabbit, [{loopback_users, []}]}
].

手动启动RabbitMQ,如下所示(服务器脚本可能不在$PATH环境,通常存储在/usr/local/sbin):

$ sudo rabbitmq-server

sudo会向你询问用户密码。对于我们的例子,我们不会进一步配置中间代理,使用默认访客账户就行。

注意:感兴趣的读者可以在http://www.rabbitmq.com/admin-guide.html阅读RabbitMQ的管理指导。

到这里,我们就安装好了所有需要的东西,可以开始使用Celery了。有另外一个依赖,也值得考虑安装,尽管不是严格需要的,尤其是我们只想使用Celery。它是结果后台,即Celery的工作进程用其存储计算的结果。它就是Redis(http://redis.io)。安装Redis是非必须的,但极力推荐安装,和RabbitMQ类似,Redis运行在另一台机器上,称作HOST2。

Redis的安装十分简单,安装代码适用于Linux,Mac OS X和Windows。我们在Mac上用homebrew安装,如下:

$ brew install redis

在其它操作系统上,例如Linux,可以方便的用二进制码安装(例如对于Ubuntu,sudo apt-get install redis-server)。

启动Redis的命令如下:

$ sudo redis-server

本章剩下的部分会假定结果后台存在,如果没有安装,会到时指出配置和代码的不同。同时,任何在生产环境中使用Celery的人,都应该考虑使用结果后台。

测试安装

快速尝试一个例子,以验证Celery是正确安装的。我们需要四个终端窗口,三个不同的机器(命名为HOST1、HOST2、HOST3和HOST4)。在HOST1的窗口启动RabbitMQ(确保rabbitmq-server路径正确):

HOST1 $ sudo /usr/local/sbin/rabbitmq-server

在HOST2的窗口,启动Redis(没安装的话,跳到下一段):

HOST2 $ sudo /usr/local/bin/redis-server

最后,在HOST3的窗口,创建如下Python文件(记得使用workon book激活虚拟环境),命名为test.py

import celery

app = celery.Celery('test',
                        broker='amqp://HOST1',
                        backend='redis://HOST2')

@app.task
def echo(message):
    return message

这段代码很简单。先引入了Celery包,然后定义了一个Celery应用(app),名字是test。这个应用使用HOST1的中间代理RabbitMQ和HOST2的Redis数据库的默认账户和消息队列。

要是想用RabbitMQ作为结果后台而不用Redis,需要修改前面的代码,将backend进行如下修改:

import celery

app = celery.Celery('test',
                        broker='amqp://HOST1',
                        backend=amqp://HOST1')

@app.task
def echo(message):
    return message

有了应用实例,就可以用它装饰远程的worker(使用装饰器@app.task)。在这个例子中,我们装饰一个简单的函数,它可以返回传递给它的消息(echo)。

之后,在终端HOST3,建立worker池,如下所示:

HOST3 $ celery -A test worker --loglevel=info

记得要在test.py的目录(或将PYTHONPATH环境变量指向test.py的目录),好让Celery可以引入代码。

celery命令会默认启动CPU数目相同的worker进程。worker会使用test模块中的应用app(我们可以使用实例的名字celery -A test.app worker),并使用INFO等级在控制台显示日志。在我的电脑上(有HyperThreading的四核电脑),Celery默认启用了八个worker进程。

在HOST4终端,复制test.py代码,启动book虚拟环境,在test.py目录打开Python shell,如下所示:

HOST4 $ python3.5
Python 3.5.0 (v3.5.0:374f501f4567, Sep 12 2015, 11:00:19)
[GCC 4.2.1 (Apple Inc. build 5666) (dot 3)] on darwin
Type "help", "copyright", "credits" or "license" for more information.

从复制的test模块引入echo函数,如下:

>>> from test import echo

我们现在可以像普通Python函数一样调用echoecho可以直接在本地(即HOST4)运行,如下所示:

>>> res = echo('Python rocks!')
>>> print(res)
Python rocks!

为了让HOST3的worker进程运行echo()函数,我们不能像之前那样直接调用。我们需要调用它的delay方法(装饰器@app.task注入的),见下面的命令:

>>> res = echo.delay('Python rocks!'); print(type(res)); print(res)
<class 'celery.result.AsyncResult'>
1423ec2b-b6c7-4c16-8769-e62e09c1fced
>>> res.ready()
True
>>> res.result
'Python rocks!'

我们看到,调用echo.delay('Python rocks!')不会返回字符串。相反,它在任务队列(运行在HOST1的RabbitMQ服务器)中安排了一个请求以执行echo函数,并返回Future,准确的说是AsyncResult(Celery的Future)。正如concurrent.futures模块,这个对象是一个异步调用结果的占位符。在我们的例子中,异步调用的是我们安插在任务队列的echo函数,调用它的是其它位置的Celery的worker进程(我们的例子中是HOST3)。

我们可以查询AsyncResult对象来确定它们是否预备好。如果是的话,我们可以访问它们的结果,在我们的例子中是字符串'Python rocks!'。

切换到启用worker进程的窗口,我们可以看到worker池接收到了echo任务请求,如下所示:

[2015-11-10 08:30:12,869: INFO/MainProcess] Received task: test.echo[1423ec2b-b6c7-4c16-8769-e62e09c1fced]
[2015-11-10 08:30:12,886: INFO/MainProcess] Task test.echo[1423ec2b-b6c7-4c16-8769-e62e09c1fced] succeeded in 0.01469148206524551s: 'Python rocks!'

我们现在可以退出Python shell和worker进程(在发起celery worker命令的终端窗口按CTRL+C):Celery安装成功。

Celery介绍

什么是分布式任务队列,Celery是怎么运行分布式任务队列的呢?分布式任务队列这种架构已经存在一定时间了。这是一种master-worker架构,有一个中间件层,中间件层使用多个任务请求队列(即任务队列),和一个用于存储结果的队列(即结果后台)。

主进程(也叫作clientproducer)将任务请求安插到某个任务队列,从结果后台获取数据。worker进程订阅任务队列以明确任务是什么,并把结果放到结果后台。

这是一个简单灵活的架构。主进程不需要知道有多少个可用的worker,也不需要知道worker运行在哪台机器。它只需要知道队列在哪,以及如何发送任务请求。

worker进程也是如此。它们不需要知道任务请求来自何处,也不需要知道结果用来做什么。它们只需知道从哪里取得任务,存储在哪里。

这样的优点是worker的数量、种类、形态可以随意变化,而不对总系统的功能产生影响(但会影响性能和延迟)。分布式任务队列可以方便地进行扩展(添加新worker),规划优先级(给队列定义不同的优先级,给不同的队列安排不同数量的worker)。

另一个优点是,这个去耦合化的系统在原则上,worker和producer可以用不同语言来写。例如,Python代码生成的工作由C语言写的worker进程来做,这样性能是最高的。

Celery使用了第三方、健壮的、实地验证的系统来做它的队列和结果后台。推荐的中间代理是RabbitMQ,我们之前用过。RabbitMQ是一个非常复杂的消息代理,有许多特性,本书不会对它做深入探索。结果后台也是如此,它可以是一个简单的RabbitMQ队列,或者更优的,使用专门的服务比如Redis。

下图展示了典型的使用RabbitMQ和Redis的Celery应用架构:

img_78395d932aa807b7f5be6624c98871a4.png

每个方框中的进程(即RabbitMQ、Redis、worker和master.py)都可以运行在不同的机器上。小型的安装方案是将RabbitMQ和Redis放在同一个主机上,worker几点可能只有一个或两个。大型方案会使用更多的机器,或者专门的服务器。

更复杂的Celery应用

我们用Celery做两个简单有趣的应用。第一个仿照第3章中汇率例子,第二个是一个分布式排序算法。

我们还是使用四台机器(HOST1、HOST2、HOST3、HOST4)。和以前一样,HOST1运行RabbitMQ,HOST2运行Redis,HOST3运行Celery的worker,HOST运行主代码。

先从简单的例子开始。创建一个Python文件(celery/currency.py),代码如下(如果你没有使用Redis,记得修改backend'amqp://HOST1'):

import celery
import urllib.request


app = celery.Celery('currency',
                    broker='amqp://HOST1',
                    backend='redis://HOST2')


URL = 'http://finance.yahoo.com/d/quotes.csv?s={}=X&f=p'

@app.task
def get_rate(pair, url_tmplt=URL):
    with urllib.request.urlopen(url_tmplt.format(pair)) as res:
        body = res.read()
    return (pair, float(body.strip()))

if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('pairs', type=str, nargs='+')
    args = parser.parse_args()

    results = [get_rate.delay(pair) for pair in args.pairs]
    for result in results:
        pair, rate = result.get()
        print(pair, rate)

这段代码和第3章的多线程版本差不多。主要的区别是,因为使用的是Celery,我们不需要创建队列,Celery负责建立队列。另外,除了为每个汇率对建一个线程,我们只需让worker负责从队列获取任务请求,执行相应的函数请求,完毕之后返回结果。

探讨调用的行为是有益的,比如成功的调用、由于缺少worker而不工作的调用、失败且抛出异常的调用。我们从成功的调用开始。

echo的例子一样,在各自的终端启动RabbitMQ和Redis(通过redis-serverrabbitmq-server命令)。

然后,在worker主机(HOST3)上,复制currency.py文件,切换到它的目录,创建worker池(记住,Celery启动的worker数目尽可能和CPU核数一样多):

HOST3 $ celery -A currency worker --loglevel=info

最后,复制相同的文件到HOST4,并运行如下:

HOST4 $ python3.5 currency.py EURUSD CHFUSD GBPUSD GBPEUR CADUSD CADEUR
EURUSD 1.0644
CHFUSD 0.986
GBPUSD 1.5216
GBPEUR 1.4296
CADUSD 0.751
CADEUR 0.7056

一切工作正常,我么得到了五个汇率。如果查看启动worker池的主机(HOST3),我们会看到类似下图的日志:

img_923523252ef690292ecf24caeb02cec3.png

这是日志等级loglevel=info时,Celery worker的日志。每个任务都被分配了一个独立ID(例如GBP兑USD的任务ID是f8658917-868c-4eb5-b744-6aff997c6dd2),基本的时间信息也被打印了出来。

如果没有可用的worker呢?最简单的方法是停止worker(在终端窗口按CTRL+C),返回HOST4的currency.py,如下所示:

OST4 $ python3.5 currency.py EURUSD CHFUSD GBPUSD GBPEUR CADUSD CADEUR

什么都没发生,currency.py一直处于等待worker的状态。这样的状态可能也可能不是我们想要的:其一,让文件等待而不发生崩溃,是很方便的;其二,我们可能想在一定时间后,停止等待。可以在result.get()timeout参数。

例如,修改代码,使用result.get(timeout=1),会有如下结果(还是在没有worker的情况下):

HOST4 $ python3.5 currency.py EURUSD CHFUSD GBPUSD GBPEUR CADUSD CADEUR
 Traceback (most recent call last):
  File "currency.py", line 29, in <module>
    pair, rate = result.get(timeout=1)
  File "/venvs/book/lib/python3.5/site-packages/celery/result.py", line 169, in get
    no_ack=no_ack,
  File " /venvs/book/lib/python3.5/site-packages/celery/backends/base.py", line 226, in wait_for
    raise TimeoutError('The operation timed out.')
celery.exceptions.TimeoutError: The operation timed out.

当然,我们应该总是使用超时,以捕获对应的异常,作为错误处理的策略。

要记住,默认下,任务队列是持续的,它的日志不会停止(Celery允许用户定制)。这意味着,如果我们现在启动了一些worker,它们就会开始从队列获取悬挂的任务,并返回结果。我们可以用如下命令清空队列:

HOST4 $ celery purge
WARNING: This will remove all tasks from queue: celery.
         There is no undo for this operation!

(to skip this prompt use the -f option)

Are you sure you want to delete all tasks (yes/NO)? yes
Purged 12 messages from 1 known task queue.

接下来看任务产生异常的情况。修改HOST3的currency.py文件,让get_rate抛出一个异常,如下所示:

@app.task
def get_rate(pair, url_tmplt=URL):
    raise Exception('Booo!')

现在,重启HOST3的worker池(即HOST3 $ celery -A currency worker --loglevel=info),然后在HOST4启动主程序:

HOST4 $ python3.5 currency.py EURUSD CHFUSD GBPUSD GBPEUR CADUSD CADEUR
Traceback (most recent call last):
  File "currency.py", line 31, in <module>
    pair, rate = result.get(timeout=1)
  File "/Users/fpierfed/Documents/venvs/book/lib/python3.5/site-packages/celery/result.py", line 175, in get
    raise meta['result']
Exception: Booo!

所有的worker都抛出了异常,异常传递到了调用的代码,在首次调用result.get()返回。

任务抛出任何异常,我们都要小心。远程运行的代码失败的原因可能有很多,不一定和代码本身有关,因此需要谨慎应对。

Celery可以用如下的方法提供帮助:我们可以用timeout获取结果;重新提交失败的任务(参考task装饰器的retry参数)。还可以取消任务请求(参考任务的apply_async方法的expires参数,它比之前我们用过的delay功能强大)。

有时,任务图会很复杂。一项任务的结果还要传递给另一个任务。Celery支持复杂的调用方式,但是会有性能损耗。

用第二个例子来探讨:一个分布式的归并排序算法。这是包含两个文件的长代码:一个是算法本身(mergesory.py),一个是主代码(main.py)。

归并排序是一个简单的基于递归二分输入列表的算法,将两个部分排序,再将结果合并。建立一个新的Python文件(celery/mergesort.py),代码如下:

import celery


app = celery.Celery('mergesort',
                        broker='amqp://HOST1',
                        backend='redis://HOST2')

@app.task
def sort(xs):
    lenxs = len(xs)
    if(lenxs <= 1):
        return(xs)

    half_lenxs = lenxs // 2
    left = xs[:half_lenxs]
    right = xs[half_lenxs:]
    return(merge(sort(left), sort(right)))

def merge(left, right):
    nleft = len(left)
    nright = len(right)

    merged = []
    i = 0
    j = 0
    while i < nleft and j < nright:
        if(left[i] < right[j]):
            merged.append(left[i])
            i += 1
        else:
            merged.append(right[j])
            j += 1
    return merged + left[i:] + right[j:]

这段代码很直白。Celery应用命名为app,它使用RabbitMQ作为任务队列,使用Redis作为结果后台。然后,定义了sort算法,它使用了附属的merge函数以合并两个排好序的子列表,成为一个排好序的单列表。

对于主代码,另建一个文件(celery/main.py),它的代码如下:

#!/usr/bin/env python3.5
import random
import time
from celery import group
from mergesort import sort, merge


# Create a list of 1,000,000 elements in random order.
sequence = list(range(1000000))
random.shuffle(sequence)

t0 = time.time()

# Split the sequence in a number of chunks and process those 
# independently.
n = 4
l = len(sequence) // n
subseqs = [sequence[i * l:(i + 1) * l] for i in range(n - 1)]
subseqs.append(sequence[(n - 1) * l:])

# Ask the Celery workers to sort each sub-sequence.
# Use a group to run the individual independent tasks as a unit of work.
partials = group(sort.s(seq) for seq in subseqs)().get()

# Merge all the individual sorted sub-lists into our final result.
result = partials[0]
for partial in partials[1:]:
    result = merge(result, partial)

dt = time.time() - t0
print('Distributed mergesort took %.02fs' % (dt))

# Do the same thing locally and compare the times.
t0 = time.time()
truth = sort(sequence)
dt = time.time() - t0
print('Local mergesort took %.02fs' % (dt))

# Final sanity checks.
assert result == truth
assert result == sorted(sequence)

我们先生成一个足够长的无序(random.shuffle)整数序列(sequence = list(range(1000000)))。然后,分成长度相近的子列表(n=4)。

有了子列表,就可以对它们进行并行处理(假设至少有四个可用的worker)。问题是,我们要知道什么时候这些列表排序好了,好进行合并。

Celery提供了多种方法让任务协同执行,group是其中之一。它可以在一个虚拟的任务里,将并发的任务捆绑执行。group的返回值是GroupResult(与类AsyncResult的层级相同)。如果没有结果后台,GroupResult get()方法是必须要有的。当组中所有的任务完成并返回值,group方法会获得一个任务签名(用参数调用任务s()方法,比如代码中的sort.s(seq))的列表。任务签名是Celery把任务当做参数,传递给其它任务(但不执行)的机制。

剩下的代码是在本地合并排好序的列表,每次合并两个。进行完分布式排序,我们再用相同的算法重新排序原始列表。最后,对比归并排序结果与内建的sorted调用。

要运行这个例子,需要启动RabbitMQ和Redis。然后,在HOST3启动一些worker,如下所示:

HOST3 $ celery -A mergesort worker --loglevel=info

记得拷贝mergesort.py文件,并切换到其目录运行(或者,定义PYTHONPATH指向它所在的位置)。

之后,在HOST4上运行:

HOST4 $ python3.5 main.py
Distributed mergesort took 10.84s
Local mergesort took 26.18s

查看Celery日志,我们看到worker池接收并执行了n个任务,结果发回给了caller。

性能和预想的不一样。使用多进程(使用multiprocessingconcurrent.futures)来运行,与前面相比,可以有n倍的性能提升(7秒,使用四个worker)。

这是因为Celery同步耗时长,最好在只有不得不用的时候再使用。Celery持续询问组中的部分结果是否准备好,好进行后续的工作。这会非常消耗资源。

生产环境中使用Celery

下面是在生产环境中使用Celery的tips。

第一个建议是在Celery应用中使用配置模块,而不要在worker代码中进行配置。假设,配置文件是config.py,可以如下将其传递给Celery应用:

import celery
app = celery.Celery('mergesort')
app.config_from_object('config')

然后,与其他可能相关的配置指令一起,在config.py中添加:

BROKER_URL = 'amqp://HOST1'
CELERY_RESULT_BACKEND = 'redis://HOST2'

关于性能的建议是,使用至少两个队列,好让任务按照执行时间划分优先级。使用多个队列,将任务划分给合适的队列,是分配worker的简便方法。Celery提供了详尽的方法将任务划分给队列。分成两步:首先,配置Celery应用,启动worker,如下所示:

# In config.py
CELERY_ROUTES = {project.task1': {'queue': 'queue1'},
                    'project.task2': {'queue': 'queue2'}}

为了在队列中启动worker,在不同的机器中使用下面的代码:

HOST3 $ celery –A project worker –Q queue1
HOST5 $ celery –A project worker –Q queue2

使用Celery命令行工具的-c标志,可以控制worker池的大小,例如,启动一个有八个worker的池:

HOST3 $ celery –A project worker –c 8

说道worker,要注意,Celery默认使用多进程模块启动worker池。这意味着,每个worker都是一个完整的Python进程。如果某些worker只处理I/O密集型任务,可以将它们转换成协程或多线程,像前面的例子。这样做的话,可以使用-P标志,如下所示:

$ celery –A project worker –P threads

使用线程和协程可以节省资源,但不利于CPU制约型任务,如前面的菲波那切数列的例子。

谈到性能,应该尽量避免同步原语(如前面的group()),除非非用不可。当同步无法回避时,好的方法是使用结果后台(如Redis)。另外,如果可能的话,要避免传递复杂的对象给远程任务,因为这些对象需要序列化和去序列化,通常很耗时。

额外的,如果不需要某个任务的结果,应该确保Celery不去获取这些结果。这是通过装饰器@task(ignore_result=True)来做的。如果所有的任务结果都忽略了,就不必定义结果后台。这可以让性能大幅提高。

除此之外,还要指出,如何启动worker、在哪里运行worker、如何确保它们持续运行是很重要的。默认的方法是使用工具,例如supervisord (http://supervisord.org) ,来管理worker进程。

Celery带有一个supervisord的配置案例(在安装文件的extra/supervisord目录)。一个监督的优秀方案是flower(https://github.com/mher/flower),一个worker的网络控制和监督工具。

最后,RabbitMQ和Redis结合起来,是一个很好的中间代理和结果后台解决方案,适用于大多数项目。

Celery的替代方案:Python-RQ

Celery的轻量简易替代方案之一是 Python-RQ (http://python-rq.org)。它单单基于Redis作为任务队列和结果后台。没有复杂任务或任务路由,使用它很好。

因为Celery和Python-RQ在概念上很像,让我们立即重写一个之前的例子。新建一个文件(rq/currency.py),代码如下:

import urllib.request

URL = 'http://finance.yahoo.com/d/quotes.csv?s={}=X&f=p'

def get_rate(pair, url_tmplt=URL):
    # raise Exception('Booo!')

    with urllib.request.urlopen(url_tmplt.format(pair)) as res:
        body = res.read()
    return (pair, float(body.strip()))

这就是之前的汇率例子的代码。区别是,与Celery不同,这段代码不需要依赖Python-RQ或Redis。将这段代码拷贝到worker节点(HOST3)。

主程序也同样简单。新建一个Python文件(rq/main.py),代码如下:

#!/usr/bin/env python3
import argparse
import redis
import rq
from currency import get_rate

parser = argparse.ArgumentParser()
parser.add_argument('pairs', type=str, nargs='+')
args = parser.parse_args()

conn = redis.Redis(host='HOST2')
queue = rq.Queue(connection=conn)

jobs = [queue.enqueue(get_rate, pair) for pair in args.pairs]

for job in jobs:
    while job.result is None:
        pass
    print(*job.result)

我们在这里看到Python-RQ是怎么工作的。我们需要连接Redis服务器(HOST2),然后将新建的连接对象传递给Queue类构造器。结果Queue对象用来向其提交任务请求。这是通过传递函数对象和其它参数给queue.enqueue

函数排队调用的结果是job实例,它是个异步调用占位符,之前见过多次。

因为Python-RQ没有Celery的阻塞AsyncResult.get()方法,我们要手动建一个事件循环,持续向job实例查询,以确认是否它们的result不是None这种方法不推荐在生产环境中使用,因为持续的查询会浪费资源,查询不足会浪费时间,但对于这个简易例子没有问题。

为了运行代码,首先要安装Python-RQ,用pip进行安装:

$ pip install rq

在所有机器上都要安装。然后,在HOST2运行Redis:

$ sudo redis-server

在HOST3上,启动一些worker。Python-RQ不自动启动worker池。启动多个worker的简易的方法是使用一个文件(start_workers.py):

#!/usr/bin/env python3
import argparse
import subprocess

def terminate(proc, timeout=.5):
    """
    Perform a two-step termination of process `proc`: send a SIGTERM
    and, after `timeout` seconds, send a SIGKILL. This should give 
    `proc` enough time to do any necessary cleanup.
    """
    if proc.poll() is None:
        proc.terminate()
        try:
            proc.wait(timeout)
        except subprocess.TimeoutExpired:
            proc.kill()
    return

parser = argparse.ArgumentParser()
parser.add_argument('N', type=int)
args = parser.parse_args()

workers = []
for _ in range(args.N):
    workers.append(subprocess.Popen(['rqworker',
                                            '-u', 'redis://yippy']))
try:
    running = [w for w in workers if w.poll() is None]
    while running:
        proc = running.pop(0)
        try:
            proc.wait(timeout=1.)
        except subprocess.TimeoutExpired:
            running.append(proc)
except KeyboardInterrupt:
    for w in workers:
        terminate(w)

这个文件会启动用户指定书目的Python-RQ worker进程(通过使用rqworker脚本,Python-RQ源码的一部分),通过Ctrl+C杀死进程。更健壮的方法是使用类似之前提过的supervisord工具。

在HOST3上运行:

HOST3 $ ./start_workers.py 6

现在可以运行代码。在HOST4,运行main.py

HOST4 $ python3.5 main.py EURUSD CHFUSD GBPUSD GBPEUR CADUSD CADEUR
EURUSD 1.0635
CHFUSD 0.9819
GBPUSD 1.5123
GBPEUR 1.422
CADUSD 0.7484
CADEUR 0.7037

效果与Celery相同。

Celery的替代方案:Pyro

Pyro (http://pythonhosted.org/Pyro4/)的意思是Python Remote Objects,是1998年创建的一个包。因此,它十分稳定,且功能完备。

Pyro使用的任务分布方法与Celery和Python-RQ十分不同,它是在网络中将Python对象作为服务器。然后创建它们的代理对象,让调用代码可以将其看做本地对象。这个架构在90年代末的系统很流行,比如COBRA和Java RMI。

Pyro掩盖了代码中的对象是本地还是远程的,是让人诟病的一点。原因是,远程代码运行错误的原因很多,当远程代码隐藏在代理对象后面执行,就不容易发现错误。

另一个诟病的地方是,Pyro在点对点网络(不是所有主机名都可以解析)中,或者UDP广播无效的网络中,很难正确运行。

尽管如此,大多数开发者认为Pyro非常简易,在生产环境中足够健壮。

Pyro安装很简单,它是纯Python写的,依赖只有几个,使用pip:

$ pip install pyro4

这个命令会安装Pyro 4.x和Serpent,后者是Pyro用来编码和解码Python对象的序列器。

用Pyro重写之前的汇率例子,要比用Python-RQ复杂,它需要另一个软件:Pyro nameserver。但是,不需要中间代理和结果后台,因为Pyro对象之间可以直接进行通讯。

Pyro运行原理如下。每个远程访问的对象都封装在处于连接监听的socket服务器框架中。每当调用远程对象中的方法,被调用的方法,连同它的参数,就被序列化并发送到适当的对象/服务器上。此时,远程对象执行被请求的任务,经由相同的连接,将结果发回到(同样是序列化的)调用它的代码。

因为每个远程对象自身就可以调用远程对象,这个架构可以是相当去中心化的。另外,一旦建立通讯,对象之间就是p2p的,这与分布式任务队列的轻度耦合架构十分不同。另一点,每个远程对象既可以做master,也可以做worker。

接下来重写汇率的例子,来看看具体是怎么运行的。建立一个Python文件(pyro/worker.py),代码如下:

import urllib.request
import Pyro4

URL = 'http://finance.yahoo.com/d/quotes.csv?s={}=X&f=p'

@Pyro4.expose(instance_mode="percall")
class Worker(object):
    def get_rate(self, pair, url_tmplt=URL):
        with urllib.request.urlopen(url_tmplt.format(pair)) as res:
            body = res.read()
        return (pair, float(body.strip()))

# Create a Pyro daemon which will run our code.
daemon = Pyro4.Daemon()
uri = daemon.register(Worker)
Pyro4.locateNS().register('MyWorker', uri)

# Sit in an infinite loop accepting connections
print('Accepting connections')
try:
    daemon.requestLoop()
except KeyboardInterrupt:
    daemon.shutdown()
print('All done')

worker的代码和之前的很像,不同点是将get_rate函数变成了Worker类的一个方法。变动的原因是,Pyro允许导出类的实例,但不能导出函数。

剩下的代码是Pyro特有的。我们需要一个Daemon实例(它本质上是后台的网络服务器),它会获得类,并在网络上发布,好让其它的代码可以调用方法。分成两步来做:首先,创建一个类Pyro4.Daemon的实例,然后添加类,通过将其传递给register方法。

每个Pyro的Daemon实例可以隐藏任意数目的类。内部,需要的话,Daemon对象会创建隐藏类的实例(也就是说,如果没有代码需要这个类,相应的Daemon对象就不会将其实例化)。

每一次网络连接,Daemon对象默认会实例化一次注册的类,如果要进行并发任务,这样就不可以。可以通过装饰注册的类修改,@Pyro4.expose(instance_mode=...)

instance_mode支持的值有三个:singlesessionpercall。使用single意味Daemon只为类创建一个实例,使用它应付所有的客户请求。也可以通过注册一个类的实例(而不是类本身)。

使用session可以采用默认模式:每个client连接都会得到一个新的实例,client始终都会使用它。使用instance_mode="percall",会为每个远程方法调用建立一个新实例。

无论创建实例的模式是什么,用Daemon对象注册一个类(或实例)都会返回一个唯一的识别符(即URI),其它代码可以用识别符连接对象。我们可以手动传递URI,但更方便的方法是在Pyro nameserver中存储它,这样通过两步来做。先找到nameserver,然后给URI注册一个名字。在前面的代码中,是通过下面来做的:

Pyro4.locateNS().register('MyWorker', uri)

nameserver的运行类似Python的字典,注册两个名字相同的URI,第二个URI就会覆盖第一个。另外,我们看到,client代码使用存储在nameserver中的名字控制了许多远程对象。这意味着,命名需要特别的留意,尤其是当许多worker进程提供的功能相同时。

最后,在前面的代码中,我们用daemon.requestLoop()进入了一个Daemon事件循环。Daemon对象会在无限循环中服务client的请求。

对于client,创建一个Python文件(pyro/main.py),它的代码如下:

#!/usr/bin/env python3
import argparse
import time
import Pyro4

parser = argparse.ArgumentParser()
parser.add_argument('pairs', type=str, nargs='+')
args = parser.parse_args()

# Retrieve the rates sequentially.
t0 = time.time()
worker = Pyro4.Proxy("PYRONAME:MyWorker")

for pair in args.pairs:
    print(worker.get_rate(pair))
print('Sync calls: %.02f seconds' % (time.time() - t0))

# Retrieve the rates concurrently.
t0 = time.time()
worker = Pyro4.Proxy("PYRONAME:MyWorker")
async_worker = Pyro4.async(worker)

results = [async_worker.get_rate(pair) for pair in args.pairs]
for result in results:
    print(result.value)
print('Async calls: %.02f seconds' % (time.time() - t0))

可以看到,client把相同的工作做了两次。这么做的原因是展示Pyro两种调用方式:同步和异步。

来看代码,我们使用argparse包从命令行获得汇率对。然后,对于同步的方式,通过名字worker = Pyro4.Proxy("PYRONAME:MyWorker")获得了一些远程worker对象。前缀PYRONAME:告诉Pyro在nameserver中该寻找哪个名字。这样可以避免手动定位nameserver。

一旦有了worker对象,可以把它当做本地的worker类的实例,向其调用方法。这就是我们在第一个循环中做的:

for pair in args.pairs:
    print(worker.get_rate(pair))

对每个worker.get_rate(pair)声明,Proxy对象会用它的远程Daemon对象连接,发送请求,以运行get_rate(pair)。我们例子中的Daemon对象,每次会创建一个Worker类的的实例,并调用它的get_rate(pair)方法。结果序列化之后发送给client,然后打印出来。每个调用都是同步的,任务完成后会封锁。

在第二个循环中,做了同样的事,但是使用的是异步调用。我们需要向远程的类创建一个Proxy对象,然后,将它封装在一个异步handler中。这就是下面代码的功能:

worker = Pyro4.Proxy("PYRONAME:MyWorker")
async_worker = Pyro4.async(worker)

我们现在可以在后台用async_worker获取汇率。每次调用async_worker.get_rate(pair)是非阻塞的,会返回一个Pyro4.futures.FutureResult的实例,它和concurrent.futures模块中Future对象很像。访问它的value需要等待,直到相应的异步调用完成。

为了运行这个例子,需要三台机器的三个窗口:一个是nameserver(HOST1),一个是Worker类和它的Daemon(HOST2),第三个(HOST3)是client(即main.py)。

在第一个终端,启动nameserver,如下:

HOST1 $ pyro4-ns --host 0.0.0.0
Broadcast server running on 0.0.0.0:9091
NS running on 0.0.0.0:9090 (0.0.0.0)
Warning: HMAC key not set. Anyone can connect to this server!
URI = PYRO:Pyro.NameServer@0.0.0.0:9090

简单来说,nameserver绑定为0.0.0.0,任何人都可以连接它。我们没有设置认证,因此在倒数第二行弹出了一个警告。

nameserver运行起来了,在第二个终端启动worker:

HOST2 $ python3.5 worker.py
Accepting connections

Daemon对象接收连接,现在去第三个终端窗口运行client代码,如下:

HOST3 $ python3.5 main.py EURUSD CHFUSD GBPUSD GBPEUR CADUSD CADEUR
('EURUSD', 1.093)
('CHFUSD', 1.0058)
('GBPUSD', 1.5141)
('GBPEUR', 1.3852)
('CADUSD', 0.7493)
('CADEUR', 0.6856)
Sync calls: 1.55 seconds
('EURUSD', 1.093)
('CHFUSD', 1.0058)
('GBPUSD', 1.5141)
('GBPEUR', 1.3852)
('CADUSD', 0.7493)
('CADEUR', 0.6856)
Async calls: 0.29 seconds

结果和预想一致,IO限制型代码可以方便的进行扩展,异步代码的速度六倍于同步代码。

这里,还有几个提醒。第一是,Pyro的Daemon实例要能解析主机的名字。如果不能解析,那么它只能接受127.0.0.1的连接,这意味着,不能被远程连接(只能本地连接)。解决方案是将其与运行的主机进行IP绑定,确保它不是环回地址。可以用下面的Python代码选择一个可用的IP:

from socket import gethostname, gethostbyname_ex

ips = [ip for ip in gethostbyname_ex(gethostname())[-1] 
        if ip != '127.0.0.1']
ip = ips.pop()

另一个要考虑的是:作为Pyro使用“直接连接被命名对象”方法的结果,很难像Celery和Python-RQ那样直接启动一批worker。在Pyro中,必须用不同的名字命名worker,然后用名字进行连接(通过代理)。这就是为什么,Pyro的client用一个mini的规划器来向可用的worker分配工作。

另一个要注意的是,nameserver不会跟踪worker的断开,因此,用名字寻找一个URI对象不代表对应的远程Daemon对象是真实运行的。最好总是这样对待Pyro调用:远程服务器的调用可能成功,也可能不成功。

记住这些点,就可以用Pyro搭建复杂的网络和分布式应用。

总结

这一章很长。我们学习了Celery,他是一个强大的包,用以构建Python分布式应用。然后学习了Python-RQ,一个轻量且简易的替代方案。两个包都是使用分布任务队列架构,它是用多个机器来运行相同系统的分布式任务。

然后介绍了另一个替代方案,Pyro。Pyro的机理不同,它使用的是代理方式和远程过程调用(RPC)。

两种方案都有各自的优点,你可以选择自己喜欢的。

下一章会学习将分布式应用部署到云平台,会很有趣。


序言
第1章 并行和分布式计算介绍
第2章 异步编程
第3章 Python的并行计算
第4章 Celery分布式应用
第5章 云平台部署Python
第6章 超级计算机群使用Python
第7章 测试和调试分布式应用
第8章 继续学习


目录
相关文章
|
3月前
|
机器学习/深度学习 边缘计算 人工智能
第二届边缘计算与并行、分布式计算国际学术会议(ECPDC 2025) 2025 2nd international Conference on Edge Computing, Parallel and Distributed Computing
第二届边缘计算与并行、分布式计算国际学术会议(ECPDC 2025) 2025 2nd international Conference on Edge Computing, Parallel and Distributed Computing 机器学习 计算学习理论 数据挖掘 科学计算 计算应用 数字图像处理 人工智能
104 6
|
1月前
|
分布式计算 DataWorks 大数据
分布式Python计算服务MaxFrame测评
一文带你入门分布式Python计算服务MaxFrame
89 23
分布式Python计算服务MaxFrame测评
|
1月前
|
分布式计算 DataWorks 数据处理
产品测评 | 上手分布式Python计算服务MaxFrame产品最佳实践
MaxFrame是阿里云自研的分布式计算框架,专为大数据处理设计,提供高效便捷的Python开发体验。其主要功能包括Python编程接口、直接利用MaxCompute资源、与MaxCompute Notebook集成及镜像管理功能。本文基于MaxFrame最佳实践,详细介绍了在DataWorks中使用MaxFrame创建数据源、PyODPS节点和MaxFrame会话的过程,并展示了如何通过MaxFrame实现分布式Pandas处理和大语言模型数据处理。测评反馈指出,虽然MaxFrame具备强大的数据处理能力,但在文档细节和新手友好性方面仍有改进空间。
|
1月前
|
数据采集 人工智能 分布式计算
🚀 MaxFrame 产品深度体验评测:Python 分布式计算的未来
在数据驱动的时代,大数据分析和AI模型训练对数据预处理的效率要求极高。传统的Pandas工具在小数据集下表现出色,但面对大规模数据时力不从心。阿里云推出的Python分布式计算框架MaxFrame,以“Pandas风格”为核心设计理念,旨在降低分布式计算门槛,同时支持超大规模数据处理。MaxFrame不仅保留了Pandas的操作习惯,还通过底层优化实现了高效的分布式调度、内存管理和容错机制,并深度集成阿里云大数据生态。本文将通过实践评测,全面解析MaxFrame的能力与价值,展示其在大数据和AI场景中的卓越表现。
61 4
🚀 MaxFrame 产品深度体验评测:Python 分布式计算的未来
|
1月前
|
人工智能 分布式计算 大数据
MaxFrame 产品评测:大数据与AI融合的Python分布式计算框架
MaxFrame是阿里云MaxCompute推出的自研Python分布式计算框架,支持大规模数据处理与AI应用。它提供类似Pandas的API,简化开发流程,并兼容多种机器学习库,加速模型训练前的数据准备。MaxFrame融合大数据和AI,提升效率、促进协作、增强创新能力。尽管初次配置稍显复杂,但其强大的功能集、性能优化及开放性使其成为现代企业与研究机构的理想选择。未来有望进一步简化使用门槛并加强社区建设。
80 7
|
1月前
|
SQL 分布式计算 数据处理
云产品评测|分布式Python计算服务MaxFrame | 在本地环境中使用MaxFrame + 基于MaxFrame实现大语言模型数据处理
本文基于官方文档,介绍了由浅入深的两个部分实操测试,包括在本地环境中使用MaxFrame & 基于MaxFrame实现大语言模型数据处理,对步骤有详细说明。体验下来对MaxCompute的感受是很不错的,值得尝试并使用!
53 1
|
1月前
|
SQL 分布式计算 DataWorks
MaxCompute MaxFrame评测 | 分布式Python计算服务MaxFrame(完整操作版)
在当今数字化迅猛发展的时代,数据信息的保存与分析对企业决策至关重要。MaxCompute MaxFrame是阿里云自研的分布式计算框架,支持Python编程接口、兼容Pandas接口并自动进行分布式计算。通过MaxCompute的海量计算资源,企业可以进行大规模数据处理、可视化数据分析及科学计算等任务。本文将详细介绍如何开通MaxCompute和DataWorks服务,并使用MaxFrame进行数据操作。包括创建项目、绑定数据源、编写PyODPS 3节点代码以及执行SQL查询等内容。最后,针对使用过程中遇到的问题提出反馈建议,帮助用户更好地理解和使用MaxFrame。
|
1月前
|
分布式计算 数据处理 MaxCompute
云产品评测|分布式Python计算服务MaxFrame
云产品评测|分布式Python计算服务MaxFrame
74 2
|
1月前
|
人工智能 分布式计算 数据处理
有奖评测,基于分布式 Python 计算服务 MaxFrame 进行数据处理
阿里云MaxCompute MaxFrame推出分布式Python计算服务MaxFrame评测活动,助力开发者高效完成大规模数据处理、可视化探索及ML/AI开发。活动时间为2024年12月17日至2025年1月31日,参与者需体验MaxFrame并发布评测文章,有机会赢取精美礼品。
|
2月前
|
人工智能 分布式计算 数据处理
云产品评测:MaxFrame — 分布式Python计算服务的最佳实践与体验
阿里云推出的MaxFrame是一款高性能分布式计算平台,专为大规模数据处理和AI应用设计。它提供了强大的Python编程接口,支持分布式Pandas操作,显著提升数据处理速度(3-5倍)。MaxFrame在大语言模型数据处理中表现出色,具备高效内存管理和任务调度能力。然而,在开通流程、API文档及功能集成度方面仍有改进空间。总体而言,MaxFrame在易用性和计算效率上具有明显优势,但在开放性和社区支持方面有待加强。
66 9

热门文章

最新文章

推荐镜像

更多