在项目中使用Celery
Project
项目文件结构:
src/ proj/__init__.py /celery.py /tasks.py
proj/celery.py
from celery import Celery app = Celery('proj', broker='amqp://', backend='rpc://', include=['proj.tasks']) # Optional configuration, see the application user guide. app.conf.update( result_expires=3600, ) if __name__ == '__main__': app.start()
Celery
的参数:
broker
:代理backend
:结果后端include
:worker
启动时要导入的模块列表。
然后我们定义了一些函数,并注册为任务@app.task
:
proj/tasks.py
from .celery import app @app.task def add(x, y): return x + y @app.task def mul(x, y): return x * y @app.task def xsum(numbers): return sum(numbers)
启动 worker
使用 celery 程序来启动 worker(需要在 proj上层目录(即src)下运行)
celery -A proj worker -l INFO
停止worker
Ctrl+C
后台运行worker:
守护程序脚本使用 celery multi
命令在后台启动一个或多个工作线程:
celery multi start w1 -A proj -l INFO
您也可以重启:
celery multi restart w1 -A proj -l INFO
或停止
celery multi stop w1 -A proj -l INFO
stop 命令是异步的,因此它不会等待工作线程关闭。您可能希望改用该 stopwait
命令,该命令可确保在退出之前完成所有当前正在执行的任务:
celery multi stopwait w1 -A proj -l INFO
默认情况下,它将在当前目录中创建 pid 和日志文件。为了防止多个工作线程相互叠加启动,建议您将这些工作线程放在一个专用目录中:
mkdir -p /var/run/celery mkdir -p /var/log/celery celery multi start w1 -A proj -l INFO --pidfile=/var/run/celery/%n.pid \ --logfile=/var/log/celery/%n%I.log
使用 multi 命令,您可以启动多个 worker,并且还有一个强大的命令行语法来指定不同 worker 的参数,例如:
celery multi start 10 -A proj -l INFO -Q:1-3 images,video -Q:4,5 data \ -Q default -L:4,5 debug
Calling Tasks(调用任务)
- 可以使用
delay()
调用任务
from proj.tasks import add add.delay(2, 2)
delay方法实际上是apply_async()
的快捷方式,add.delay(2, 2)
相当于add.apply_async((2, 2))
。
2. apply_async()
允许更多的选择,如运行时间(countdown),队列(queue):
add.apply_async((2, 2), queue='lopri', countdown=10)
- 直接调用任务将在当前进程中执行任务,因此不会发送任何消息:
add(2, 2)
delay
和 apply_async
方法返回一个 AsyncResult
实例,可用于跟踪任务执行状态。但为此,您需要启用结果后端(result backend),以便状态可以存储在某个地方。
配置了结果后端,就可通过res.get()
获得任务的返回值:
res = add.delay(2, 2) res.get(timeout=1)
如果任务引发异常,您还可以检查异常并回溯, result.get()
默认情况下会传播错误(Trackback…)。
要检查任务是成功还是失败,您必须在结果实例上使用相应的方法:
res.failed() #True res.successful() #False
可以通过查看任务state
判断任务是否失败:
res.state #'FAILURE'
任务的状态在成功执行的情况下会这样变化:
PENDING -> STARTED -> SUCCESS
如果重试任务,则各个阶段可能会变得更加复杂。为了演示,对于重试两次的任务,阶段将是:
PENDING -> STARTED -> RETRY -> STARTED -> RETRY -> STARTED -> SUCCESS
Canvas:设计工作流
您刚刚学会了如何使用delay
方法调用任务。但有时您可能希望将任务调用的签名
(signature)传递给另一个进程,或者作为参数传递给另一个函数,Celery 为此使用了一种称为签名(signature)的东西。
签名包装了单个任务调用的参数
和执行选项
,使其可以传递给函数,甚至可以序列化并通过网络发送。
您可以使用参数 (2, 2) 和countdiwb=10 秒为 add 任务创建签名:
add.signature((2, 2), countdown=10) #tasks.add(2, 2)
还有一个快捷方式来创建签名:
add.s(2, 2)
And there’s that calling API again…
签名实例还支持calling API,因此它具有delay和apply_async方法。
s1 = add.s(2, 2) res = s1.delay() res.get()
你也可以制作不完整的签名:
# incomplete partial: add(?, 2) s2 = add.s(2) res = s2.delay(8) res.get() 10
也可以在签名中添加关键字参数
3 = add.s(2, 2, debug=True) s3.delay(debug=False) # debug is now False.
这看上去很好,但是究竟能用来干什么呢?为此,需要介绍Canvas的一些基元。
The primitives
- group
- chain
- chord
- map
- starmap
- chunks
这些基元都是签名对象,因此它们可以组合在一起,组成复杂的工作流。
group
group并行调用任务列表,并返回一个特殊的结果实例,该实例允许你将结果作为组进行检查,并按顺序检索返回值。
from celery import group from proj.tasks import add group(add.s(i, i) for i in range(10))().get() #[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
Partial group
g = group(add.s(i) for i in range(10)) g(10).get() #[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
chain
任务可以链式调用,调用完一个任务后继续调用另一个任务:
from celery import chain from proj.tasks import add, mul # (4 + 4) * 8 chain(add.s(4, 4) | mul.s(8))().get() 64
partial chain:
#(? + 4) * 8 g = chain(add.s(4) | mul.s(8)) g(4).get() 64
链也可以这样写:
(add.s(4, 4) | mul.s(8))().get()
chord
chord是具有回调的group:
from celery import chord from proj.tasks import add, xsum chord((add.s(i, i) for i in range(10)), xsum.s())().get() 90
链接到另一个任务的组将自动转换为chord:
(group(add.s(i, i) for i in range(10)) | xsum.s())().get() 90
由于这些基元都是签名类型,因此它们几乎可以随心所欲地组合,例如:
upload_document.s(file) | group(apply_filter.s() for filter in filters)
Routing(路由)
Celery 支持 AMQP 提供的所有路由工具,但它也支持将消息发送到指定队列的简单路由。
task_routes
设置使您能够按名称路由任务,并将所有内容集中在一个位置:
app.conf.update( task_routes = { 'proj.tasks.add': {'queue': 'hipri'}, }, )
您还可以在运行时指定队列,方法是指定apply_async的queue
参数:
from proj.tasks import add add.apply_async((2, 2), queue='hipri')
然后,您可以通过指定 celery worker -Q 选项使worker从此队列中consume:
celery -A proj worker -Q hipri
您可以使用逗号分隔的列表指定多个队列。例如,您可以让辅worker同时consume默认队列(celery)和 hipri 队列
celery -A proj worker -Q hipri,celery
Remote Control 远程控制
如果您使用 RabbitMQ (AMQP)、Redis 或 Qpid 作为代理,则可以在运行时控制和检查工作线程。
例如,您可以查看worker正在处理的任务:
celery -A proj inspect active
这是通过使用广播消息来实现的,因此集群中的每个工作线程都会接收所有远程控制命令。
可用 --destination
选项指定worker对请求执行操作。以下是以逗号分隔的工作器主机名列表:
celery -A proj inspect active --destination=celery@example.com
Timezone
日期和时间,内部和消息中都使用UTC时区。
当worker受到消息时,会将UTC时间转换成本地时间。
(通常不需要手动设置时区)可以通过timezone
设置时区:
app.conf.timezone = 'Europe/London'
Optimization
默认配置未针对吞吐量进行优化。默认情况下,它尝试在许多短任务和较少的长任务之间走中间路线,这是吞吐量和公平计划之间的折衷。
What’s now?
现在您已经阅读了本文档,您应该继续阅读用户指南
还有一个API参考,可能会有用。