44. Python Celery多实例 定时任务

简介:

celery是一个分布式的任务调度模块,那么celery是如何和分布式挂钩呢?

celery可以支持多台不同的计算机执行不同的任务或者相同的任务。

如果要说celery的分布式应用的话,就要提到celery的消息路由机制,提到AMQP协议。

具体可以查看AMQP文档详细了解。

简单理解:

可以有多个"消息队列"(message Queue),不同的消息可以指定发送给不同的Message Queue,

而这是通过Exchange来实现的,发送消息到"消息队列"中时,可以指定routiing_key,Exchange通过routing_key来吧消息路由(routes)到不同的"消息队列"中去。

如图:

clipboard.png

exchange 对应 一个消息队列(queue),即:通过"消息路由"的机制使exchange对应queue,每个queue对应每个worker

写个例子:

vim demon3.py

1
2
3
4
5
6
7
8
9
10
11
12
from  celery  import  Celery
app  =  Celery()
app.config_from_object( "celeryconfig" )
@app .task
def  taskA(x, y):
     return  *  y
@app .task
def  taskB(x, y, z):
     return  +  +  z
@app .task
def  add(x, y):
     return  +  y

vim celeryconfig.py

1
2
3
4
5
6
7
8
9
10
11
12
13
from  kombu  import  Queue
BORKER_URL  =  "redis://192.168.48.131:6379/1"               #1库
CELERY_RESULT_BACKEND  =  "redis://192.168.48.131:6379/2"    #2库
CELERY_QUEUES  =  {
     Queue( "default" , Exchange( "default" ), routing_key  =  "default" ),
     Queue( "for_task_A" , Exchange( "for_task_A" ), routing_key  =  "for_task_A" ),
     Queue( "for_task_B" , Exchange( "for_task_B" ), routing_key  =  "for_task_B" )
}
#路由
CELERY_ROUTES  =  {
     "demon3.taskA" :{ "queue" "for_task_A" ,   "routing_key" "for_task_A" },
     "demon3.taskB" :{ "queue" "for_task_B" ,   "routing_key" "for_task_B" }
}

下面把两个脚本导入服务器:

指定taskA启动一个worker:

1
# celery -A demon3 worker -l info -n workerA.%h -Q for_task_A

同理:

1
# celery -A demon3 worker -l info -n workerB.%h -Q for_task_B

下面远程客户端调用:新文件

vim remote.py

1
2
3
4
5
6
7
8
9
10
11
12
from  demon3  import  *
r1  =  taskA.delay( 10 20 )
print  (r1.result)
print  (r1.status)
r2  =  taskB.delay( 10 20 30 )
time.sleep( 1 )
prnit (r2.result)
print  (r2.status)
#print (dir(r2))
r3  =  add.delay( 100 200 )
print  (r3.result)
print  (r3.status)   #PENDING

看到状态是PENDING,表示没有执行,这个是因为没有celeryconfig.py文件中指定改route到哪一个Queue中,所以会被发动到默认的名字celery的Queue中,但是我们还没有启动worker执行celery中的任务。

下面,我们来启动一个worker来执行celery队列中的任务

1
# celery -A tasks worker -l info -n worker.%h -Q celery      ##默认的

可以看到这行的结果为success

print(re3.status)    #SUCCESS


定时任务:

Celery 与 定时任务

在celery中执行定时任务非常简单,只需要设置celery对象中的CELERYBEAT_SCHEDULE属性即可。

下面我们接着在配置文件:celeryconfig.py,添加关于 CELERYBEAT_SCHEDULE 变量到脚本中去:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
CELERY_TIMEZONE  =  'UTC'
CELERYBEAT_SCHEDULE  =  {
     'taskA_schedule'  : {
         'task' : 'tasks.taskA' ,
         'schedule' : 20 ,
         'args' :( 5 , 6 )
     },
'taskB_scheduler'  : {
     'task' : "tasks.taskB" ,
     "schedule" : 200 ,
     "args" :( 10 , 20 , 30 )
     },
'add_schedule' : {
     "task" : "tasks.add" ,
     "schedule" : 10 ,
     "args" :( 1 , 2 )
     }
}

注意格式,否则会有问题

启动:

celery -A demon3 worker -l info -n workerA.%h -Q for_task_A

celery -A demon3 worker -l info -n workerB.%h -Q for_task_B

celery -A tasks worker -l info -n worker.%h -Q celery

celery -A demon3 beat



本文转自 听丶飞鸟说 51CTO博客,原文链接:http://blog.51cto.com/286577399/2052690


相关文章
|
8月前
|
存储 Java 调度
Python定时任务实战:APScheduler从入门到精通
APScheduler是Python强大的定时任务框架,通过触发器、执行器、任务存储和调度器四大组件,灵活实现各类周期性任务。支持内存、数据库、Redis等持久化存储,适用于Web集成、数据抓取、邮件发送等场景,解决传统sleep循环的诸多缺陷,助力构建稳定可靠的自动化系统。(238字)
1422 1
|
算法 Python
Apriori算法的Python实例演示
经过运行,你会看到一些集合出现,每个集合的支持度也会给出。这些集合就是你想要的,经常一起被购买的商品组合。不要忘记,`min_support`参数将决定频繁项集的数量和大小,你可以根据自己的需要进行更改。
472 18
|
前端开发 搜索推荐 编译器
【01】python开发之实例开发讲解-如何获取影视网站中经过保护后的视频-用python如何下载无法下载的视频资源含m3u8-python插件之dlp-举例几种-详解优雅草央千澈
【01】python开发之实例开发讲解-如何获取影视网站中经过保护后的视频-用python如何下载无法下载的视频资源含m3u8-python插件之dlp-举例几种-详解优雅草央千澈
1394 34
【01】python开发之实例开发讲解-如何获取影视网站中经过保护后的视频-用python如何下载无法下载的视频资源含m3u8-python插件之dlp-举例几种-详解优雅草央千澈
|
人工智能 编译器 Python
python已经安装有其他用途如何用hbuilerx配置环境-附带实例demo-python开发入门之hbuilderx编译器如何配置python环境—hbuilderx配置python环境优雅草央千澈
python已经安装有其他用途如何用hbuilerx配置环境-附带实例demo-python开发入门之hbuilderx编译器如何配置python环境—hbuilderx配置python环境优雅草央千澈
368 0
python已经安装有其他用途如何用hbuilerx配置环境-附带实例demo-python开发入门之hbuilderx编译器如何配置python环境—hbuilderx配置python环境优雅草央千澈
|
数据挖掘 vr&ar C++
让UE自动运行Python脚本:实现与实例解析
本文介绍如何配置Unreal Engine(UE)以自动运行Python脚本,提高开发效率。通过安装Python、配置UE环境及使用第三方插件,实现Python与UE的集成。结合蓝图和C++示例,展示自动化任务处理、关卡生成及数据分析等应用场景。
1755 5
|
Python
Python中的push方法详解与实例
Python中的push方法详解与实例
338 3
|
测试技术 API 数据安全/隐私保护
Python连接到Jira实例、登录、查询、修改和创建bug
通过使用Python和Jira的REST API,可以方便地连接到Jira实例并进行各种操作,包括查询、修改和创建Bug。`jira`库提供了简洁的接口,使得这些操作变得简单易行。无论是自动化测试还是开发工作流的集成,这些方法都可以极大地提高效率和准确性。希望通过本文的介绍,您能够更好地理解和应用这些技术。
1660 0
|
数据可视化 Python
Python绘制基频曲线——实例解析与应用探讨
Python绘制基频曲线——实例解析与应用探讨
176 0
|
Python 容器
AutoDL Python实现 自动续签 防止实例过期释放 小脚本 定时任务 apscheduler requests
AutoDL Python实现 自动续签 防止实例过期释放 小脚本 定时任务 apscheduler requests
706 0
|
存储 NoSQL Java
万字总结!Python 实现定时任务的八种方案(上)
万字总结!Python 实现定时任务的八种方案
3692 0
万字总结!Python 实现定时任务的八种方案(上)

推荐镜像

更多