python3.7+Tornado5.1.1+Celery3.1+Rabbitmq3.7.16实现异步队列任务

简介: 在之前的一篇文章中提到了用[Django+Celery+Redis实现了异步任务队列](https://v3u.cn/a_id_54),只不过消息中间件使用了redis,redis作为消息中间件可谓是差强人意,功能和性能上都不如Rabbitmq,所以本次使用tornado框架结合celery,同时消息中间件使用Rabbitmq来实现异步发邮件,并且使用flower来监控任务队列。

在之前的一篇文章中提到了用Django+Celery+Redis实现了异步任务队列,只不过消息中间件使用了redis,redis作为消息中间件可谓是差强人意,功能和性能上都不如Rabbitmq,所以本次使用tornado框架结合celery,同时消息中间件使用Rabbitmq来实现异步发邮件,并且使用flower来监控任务队列。

首先安装rabbitmq

Mac os直接运行brew命令安装

#安装服务
brew install rabbitmq
#启动服务
brew services start rabbitmq

Win10系统就要下载安装包进行安装了,由于rabbitmq是基于erlang的,所以要首先安装erlang

1、首先,下载并运行Erlang for Windows 安装程序 (地址:http://www.erlang.org/downloads)下载完毕并安装(注意:安装目录请选择默认目录)

2、下载 RabbitMQ,(地址:http://www.rabbitmq.com/download.html )(注意:安装目录请选择默认目录)

安装成功后,启用web管理UI,进入RabbitMQ Serverrabbitmq\_server-3.6.6sbin,输入命令rabbitmq-plugins enable rabbitmq\_management

在系统的开始菜单里找到RabbitMQ的启动菜单,启动服务

浏览器输入,http://localhost:15672/,使用默认用户guest/guest进入网页端控制台:

代表没有问题了

然后安装tornado和celery,注意指定版本号

pip3 install tornado==5.1.1
pip3 install celery ==3.1
pip3 install pika ==0.9.14
pip3 install tornado-celery
pip3 install flower 

需要注意一点,由于python3.7中async已经作为关键字存在,但是有的三方库还没有及时修正,导致它们自己声明的变量和系统关键字重名,所以我们要深入三方库的源码,帮他们修改async关键字为async\_my,需要修改的文件夹和文件包含但不限于:

/site-packages/pika/adapters/libev\_connection.py

/site-packages/celery下面的文件

/site-packages/kombu下面的文件夹

在tornado项目下新建一个任务队列文件task.py:

import time
from celery import Celery
from func_tool import mail

C_FORCE_ROOT=True

celery = Celery("tasks", broker="amqp://guest:guest@localhost:5672")
celery.conf.CELERY_RESULT_BACKEND = "amqp"


@celery.task
def sleep(seconds):
    time.sleep(float(seconds))
    return seconds

@celery.task
def sendmail(title,text,tomail):
    mail(title,text,tomail)
    return '发送邮件成功'

if __name__ == "__main__":
    celery.start()

然后编写服务端代码:

from celery import Celery
from tornado import gen
import tcelery
sys.path.append("..")
import task

#异步任务
class CeleryHandler(BaseHandler):
    @gen.coroutine
    def get(self):
        response = yield gen.Task(task.sendmail.apply_async,args=['你好','非常好','164850527@qq.com'])
        self.write('ok')
        self.finish()

路由器代码:

import tornado.web
from views import Index
import config


#路由 

class Application(tornado.web.Application):
    def __init__(self):
        handlers = [
            (r"/celery", Index.CeleryHandler)
        ]
        super(Application,self).__init__(handlers,**config.setting)

程序入口代码server.py:

import tornado.ioloop
import tornado.httpserver
import config
 
from application import Application
 
 
 
 
if __name__ == "__main__":
    print('启动...')
    app = Application()
    httpServer = tornado.httpserver.HTTPServer(app)
    # httpServer.listen(8888)
    #绑定端口
    httpServer.bind(config.options['port'])
    #开启5个子进程(默认1,若为None或者小于0,开启对应硬件的CPU核心数个子进程)
    httpServer.start(1)
    tornado.ioloop.IOLoop.current().start()

进入项目目录,分别启动tornado服务,celery服务,以及flower服务

python server.py
celery -A task worker --loglevel=info
celery flower -A task --broker=amqp://guest:guest@localhost:5672//

访问网址http://localhost:8000/celery 用来触发异步任务

后台服务显示任务返回值:

进入flower在线任务监控网址:http://localhost:5555/

至此,整个流程就走完了。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
数据采集 缓存 Java
Python vs Java:爬虫任务中的效率比较
Python vs Java:爬虫任务中的效率比较
|
2天前
|
Python
深入理解 Python 中的异步操作:async 和 await
Python 的异步编程通过 `async` 和 `await` 关键字处理 I/O 密集型任务,如网络请求和文件读写,显著提高性能。`async` 定义异步函数,返回 awaitable 对象;`await` 用于等待这些对象完成。本文介绍异步编程基础、`async` 和 `await` 的用法、常见模式(并发任务、异常处理、异步上下文管理器)及实战案例(如使用 aiohttp 进行异步网络请求),帮助你高效利用系统资源并提升程序性能。
17 7
|
27天前
|
数据采集 存储 监控
21个Python脚本自动执行日常任务(2)
21个Python脚本自动执行日常任务(2)
86 7
21个Python脚本自动执行日常任务(2)
|
3天前
|
SQL 网络协议 安全
Python异步: 什么时候使用异步?
Asyncio 是 Python 中用于异步编程的库,适用于协程、非阻塞 I/O 和异步任务。使用 Asyncio 的原因包括:1) 使用协程实现轻量级并发;2) 采用异步编程范式提高效率;3) 实现非阻塞 I/O 提升 I/O 密集型应用性能。然而,Asyncio 并不适合所有场景,特别是在 CPU 密集型任务或已有线程/进程方案的情况下。选择 Asyncio 应基于项目需求和技术优势。
|
30天前
|
Python
Python中的函数是**一种命名的代码块,用于执行特定任务或计算
Python中的函数是**一种命名的代码块,用于执行特定任务或计算
49 18
|
1月前
|
数据采集 分布式计算 大数据
构建高效的数据管道:使用Python进行ETL任务
在数据驱动的世界中,高效地处理和移动数据是至关重要的。本文将引导你通过一个实际的Python ETL(提取、转换、加载)项目,从概念到实现。我们将探索如何设计一个灵活且可扩展的数据管道,确保数据的准确性和完整性。无论你是数据工程师、分析师还是任何对数据处理感兴趣的人,这篇文章都将成为你工具箱中的宝贵资源。
|
1月前
|
数据采集 JSON 测试技术
Grequests,非常 Nice 的 Python 异步 HTTP 请求神器
在Python开发中,处理HTTP请求至关重要。`grequests`库基于`requests`,支持异步请求,通过`gevent`实现并发,提高性能。本文介绍了`grequests`的安装、基本与高级功能,如GET/POST请求、并发控制等,并探讨其在实际项目中的应用。
44 3
|
2月前
|
运维 监控 网络安全
自动化运维的崛起:如何利用Python脚本简化日常任务
【10月更文挑战第43天】在数字化时代的浪潮中,运维工作已从繁琐的手工操作转变为高效的自动化流程。本文将引导您了解如何运用Python编写脚本,以实现日常运维任务的自动化,从而提升工作效率和准确性。我们将通过一个实际案例,展示如何使用Python来自动部署应用、监控服务器状态并生成报告。文章不仅适合运维新手入门,也能为有经验的运维工程师提供新的视角和灵感。
|
2月前
|
运维 监控 Python
自动化运维:使用Python脚本简化日常任务
【10月更文挑战第36天】在数字化时代,运维工作的效率和准确性成为企业竞争力的关键。本文将介绍如何通过编写Python脚本来自动化日常的运维任务,不仅提高工作效率,还能降低人为错误的风险。从基础的文件操作到进阶的网络管理,我们将一步步展示Python在自动化运维中的应用,并分享实用的代码示例,帮助读者快速掌握自动化运维的核心技能。
104 3
|
2月前
|
运维 监控 Linux
自动化运维:如何利用Python脚本优化日常任务##
【10月更文挑战第29天】在现代IT运维中,自动化已成为提升效率、减少人为错误的关键技术。本文将介绍如何通过Python脚本来简化和自动化日常的运维任务,从而让运维人员能够专注于更高层次的工作。从备份管理到系统监控,再到日志分析,我们将一步步展示如何编写实用的Python脚本来处理这些任务。 ##