一、Celery异步分布式
Celery 是一个python开发的异步分布式任务调度模块,是一个消息传输的中间件,可以理解为一个邮箱,每当应用程序调用celery的异步任务时,会向broker传递消息,然后celery的worker从中取消息
Celery 用于存储消息以及celery执行的一些消息和结果
对于brokers,官方推荐是rabbitmq和redis
对于backend,也就是指数据库,为了简单一般使用redis
使用redis连接url格式:
redis://:password@hostname:port/db_number
1)定义连接脚本tasks.py
1
2
3
4
5
6
7
8
9
|
#!/usr/bin/env python
from
celery
import
Celery
broker
=
"redis://192.168.2.230:6379/1"
backend
=
"redis://192.168.2.230:6379/2"
app
=
Celery(
"tasks"
, broker
=
broker, backend
=
backend)
@app
.task
def
add(x,y):
return
x
+
y
|
2)安装启动celery
pip install celery
pip install redis
启动方式:celery -A huang tasks -l info #-l 等同于 --loglevel
3)执行测试 huang.py
1
2
3
4
5
6
7
8
9
10
|
#!/usr/bin/env python
from
tasks
import
add
re
=
add.delay(
10
,
20
)
print
(re.result)
#任务返回值
print
(re.ready)
#如果任务被执行返回True,其他情况返回False
print
(re.get(timeout
=
1
))
#带参数的等待,最后返回结果
print
(re.status)
#任务当前状态
|
运行结果:
30
<bound method AsyncResult.ready of <AsyncResult: d2e0a2d8-cdd9-4fe3-a8bb-81fe3c53ba9a>>
30
SUCCESS
4)根据成功返回的key或celery界面输出的信息,查看redis存储
说明:停止celery服务,执行完huang.py之后,再启动celery服务也是有保存数据的
二、celery多进程
1)配置文件 celeryconfig.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
#!/usr/bin/env python
#-*- coding:utf-8 -*-
from
kombu
import
Exchange,Queue
BROKER_URL
=
"redis://192.168.2.230:6379/3"
CELERY_RESULT_BACKEND
=
"redis://192.168.2.230:6379/4"
CELERY_QUEUES
=
(
Queue(
"default"
,Exchange(
"default"
),routing_key
=
"default"
),
Queue(
"for_task_A"
,Exchange(
"for_task_A"
),routing_key
=
"for_task_A"
),
Queue(
"for_task_B"
,Exchange(
"for_task_B"
),routing_key
=
"for_task_B"
)
)
CELERY_ROUTES
=
{
'tasks.taskA'
:{
"queue"
:
"for_task_A"
,
"routing_key"
:
"for_task_A"
},
'tasks.taskB'
:{
"queue"
:
"for_task_B"
,
"routing_key"
:
"for_task_B"
}
}
|
2)tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
#!/usr/bin/env python
#-*- coding:utf-8 -*-
from
celery
import
Celery
app
=
Celery()
app.config_from_object(
"celeryconfig"
)
@app
.task
def
taskA(x,y):
return
x
+
y
@app
.task
def
taskB(x,y,z):
return
x
+
y
+
z
|
3)启动celery
celery -A tasks worker --loglevel info
4)执行脚本huang2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
#!/usr/bin/env python
#-*- coding:utf-8 -*-
from
tasks
import
taskA,taskB
re
=
taskA.delay(
10
,
20
)
print
(re.result)
#任务返回值
print
(re.ready)
#如果任务被执行返回True,其他情况返回False
print
(re.get(timeout
=
1
))
#带参数的等待,最后返回结果
print
(re.status)
#任务当前状态
re2
=
taskB.delay(
10
,
20
,
30
)
print
(re2.result)
print
(re2.ready)
print
(re2.get(timeout
=
1
))
print
(re2.status)
|
5)运行结果
None
<bound method AsyncResult.ready of <AsyncResult: e34a8490-05a7-473e-a082-f4956cabfc99>>
30
SUCCESS
None
<bound method AsyncResult.ready of <AsyncResult: 3c5cd839-dbe2-4e63-ba4e-86e8c79d943f>>
60
SUCCESS