python使用Flask,Redis和Celery的异步任务

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: python使用Flask,Redis和Celery的异步任务

介绍

随着Web应用程序的发展和使用的增加,用例也变得多样化。我们现在正在建设和使用网站来执行比以往任何时候都更复杂的任务。其中一些任务可以进行处理,并将反馈立即转发给用户,而其他任务则需要稍后进行进一步处理和结果转发。越来越多地采用Internet访问和支持Internet的设备导致最终用户流量增加。

在本文中,我们将探讨Celery在Flask应用程序中安排后台任务的使用,以减轻资源密集型任务的负担并确定对最终用户的响应的优先级。


什么是任务队列?

任务队列是一种分配小的工作单元或任务的机制,可以在不干扰大多数基于Web的应用程序的请求-响应周期的情况下执行这些任务。

任务队列有助于委派工作,否则将在等待响应时降低应用程序的速度。它们还可以用于在主机或进程与用户交互时处理资源密集型任务。


示范

我们将构建一个Flask应用程序,该应用程序允许用户设置提醒,该提醒将在设定的时间传递到他们的电子邮件中。

我们还将提供自定义消息或提醒被调用并将消息发送给用户之前的时间的功能。

设定

与其他项目一样,我们的工作将在虚拟环境中进行 :


$ pipenv install --three$ pipenv shell

对于此项目,我们将需要安装Flask和Celery软件包以开始:

$ pipenv install flask celery

我们的Flask应用程序文件结构如下所示:

.├── Pipfile                    #管理我们的环境├── Pipfile.lock├── README.md├── __init__.py├── app.py                     # Flask应用程序的主要实现├── config.py                  # 托管配置├── requirements.txt           # 储存我们的要求└── templates└── index.html             # 登陆页面


1 directory, 8 files

让我们从创建Flask应用程序开始,该应用程序将呈现一个表单,该表单允许用户输入将来发送的消息的详细信息。

我们将以下内容添加到我们的app.py文件中:


from flask import Flask, flash, render_template, request, redirect, url_for
app = Flask(__name__)app.config.from_object("config")app.secret_key = app.config['SECRET_KEY']
@app.route('/', methods=['GET', 'POST'])def index():if request.method == 'GET':return render_template('index.html')
elif request.method == 'POST':email = request.form['email']first_name = request.form['first_name']last_name = request.form['last_name']message = request.form['message']duration = request.form['duration']duration_unit = request.form['duration_unit']
flash(“Message scheduled”)return redirect(url_for('index'))

if __name__ == '__main__':app.run(debug=True)

这是一个非常简单的应用程序,只需一条路由即可处理GETPOST请求表单。提交详细信息后,我们可以将数据交给计划工作的功能。

为了整理主应用程序文件,我们将配置变量放在单独的config.py文件中,然后从文件中加载配置:


app.config.from_object("config")

我们的config.py文件将与该app.py文件位于同一文件夹中,并包含一些基本配置:


SECRET_KEY = 'very_very_secure_and_secret'# 更多配置

现在,让我们将目标网页实现为index.html



{% for message in get_flashed_messages() %}<p style="color: red;">{{ message }}</p>{% endfor %}
<form method="POST">First Name: <input id="first_name" name="first_name" type="text">Last Name: <input id="last_name" name="last_name" type="text">Email: <input id="email" name="email" type="email">Message: <textarea id="textarea" name="message"></textarea>Duration: <input id="duration" name="duration" placeholder="Enter duration as a number. for example: 3" type="text">
<select name="duration_unit"><option value="" disabled selected>Choose the duration</option><option value="1">Minutes</option><option value="2">Hours</option><option value="3">Days</option></select>
<button type="submit" name="action">Submit </button></form>

现在,我们可以启动我们的应用程序:

1713288507803.png

使用邮件发送电子邮件

为了从Flask应用程序发送电子邮件,我们将使用Flask-Mail库,该库如下所示添加到我们的项目中:


$ pipenv install flask-mail

有了Flask应用程序和表单,我们现在可以将Flask-Mail集成到我们的app.py





from flask_mail import Mail, Message
app = Flask(__name__)app.config.from_object("config")app.secret_key = app.config['SECRET_KEY']
# 设置Flask-Mail集成
mail = Mail(app)
def send_mail(data):""" Function to send emails."""with app.app_context():msg = Message("Ping!",sender="admin.ping",recipients=[data['email']])msg.body = data['message']mail.send(msg)

功能 send_main(data)将接收要发送的消息和电子邮件的收件人,然后在经过指定的时间后将其调用以将电子邮件发送给用户。config.py为了使Flask-Mail正常运行,我们还需要向我们添加以下变量:

# Flask-邮件MAIL_SERVER = 'smtp.googlemail.com'MAIL_PORT = 587MAIL_USE_TLS = TrueMAIL_USERNAME = 'mail-username'MAIL_PASSWORD = 'mail-password'

整合

在我们的Flask应用程序准备就绪并配备了电子邮件发送功能之后,我们现在可以集成Celery,以便计划在以后发送电子邮件。我们app.py将再次被修改:




# 现有导入保持不变

from celery import Celery
# Flask应用程序和flask-mail配置被截断


# 设置客户端

client = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])client.conf.update(app.config)
# 将此装饰器添加到我们的send_mail函数中

@client.taskdef send_mail(data):#函数保持不变


@app.route('/', methods=['GET', 'POST'])def index():if request.method == 'GET':return render_template('index.html')
elif request.method == 'POST':data = {}data['email'] = request.form['email']data['first_name'] = request.form['first_name']data['last_name'] = request.form['last_name']data['message'] = request.form['message']duration = int(request.form['duration'])duration_unit = request.form['duration_unit']
if duration_unit == 'minutes':duration *= 60elif duration_unit == 'hours':duration *= 3600elif duration_unit == 'days':duration *= 86400
send_mail.apply_async(args=[data], countdown=duration)flash(f"Email will be sent to {data['email']} in {request.form['duration']} {duration_unit}")
return redirect(url_for('index'))

celery通过附加消息传递代理的URL,我们导入并使用它在Flask应用程序中初始化Celery客户端。在我们的例子中,我们将使用Redis作为代理,因此我们将以下内容添加到我们的config.py


CELERY_BROKER_URL = 'redis://localhost:6379/0'CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

为了使我们的send_mail()功能作为后台任务执行,我们将添加@client.task装饰器,以便我们的Celery客户端会意识到这一点。

设置Celery客户端后,将修改还处理表单输入的主要功能。

首先,我们将send_mail()函数的输入数据打包在字典中。然后,我们使用函数通过Celery Task Calling API调用邮件功能,该函数apply_async接受函数所需的参数。

设置了一个可选countdown参数,定义了运行代码和执行任务之间的延迟。

 

汇集

为了运行我们的项目,我们将需要两个终端,一个终端启动我们的Flask应用程序,另一个终端启动Celery worker,后者将在后台发送消息。

在第一个终端中启动Flask应用程序:


$ python app.py

在第二个终端中,启动虚拟环境,然后启动Celery worker:


# 启动virtualenv
$ pipenv shell$ celery worker -A app.client --loglevel=info

如果一切顺利,我们将在运行Celery客户端的终端中获得以下反馈:

 

现在让我们导航到 http://localhost:5000并填写详细信息,以计划在提交2分钟后到达的电子邮件。

在表格上方,将显示一条消息,指示将接收电子邮件的地址以及发送电子邮件的持续时间。在我们的Celery终端中,我们还将能够看到一个日志条目,表明我们的电子邮件已被调度:


[2019-10-23 16:27:25,399: INFO/MainProcess] Received task: app.send_mail[d65025c8-a291-40d0-aea2-e816cb40cd78]  ETA:[2019-10-23 13:29:25.170622+00:00]

ETA条目的部分显示何时send_email()调用我们的函数,以及何时发送电子邮件。

因此,让我们为后台任务实现一个监视解决方案,以便我们可以查看任务,并注意出现问题以及未按计划执行任务的情况。


监控我们的群集

安装Flower非常简单:


$ pipenv install flower

之前,我们在app.py文件中指定了Celery客户的详细信息。我们需要将该客户传递给Flower以对其进行监控。

为此,我们需要打开第三个终端窗口,进入虚拟环境,然后启动监视工具:


$ pipenv shell$ flower -A app.client --port=5555

启动Flower时,我们通过将其传递给application(-A)参数来指定Celery客户端,并通过该参数来指定要使用的端口--port

有了我们的监控功能后,让我们安排在仪表板上发送另一封电子邮件,然后导航到http://localhost:5555,在以下位置我们会对此表示欢迎:

在此页面上,我们可以看到Celery集群中的工作人员列表,该列表当前仅由我们的机器组成。

要查看我们刚刚计划的电子邮件,请单击仪表板左上方的“ 任务”按钮,这将带我们到可以查看已计划的任务的页面:

在本部分中,我们可以看到我们已计划了两封电子邮件,并且已在计划的时间成功发送了一封电子邮件。出于测试目的,计划分别在1分钟和5分钟后发送电子邮件。

 

结论

我们已经成功建立了Celery集群并将其集成到我们的Flask应用程序中,该应用程序允许用户计划在将来的某个时间后发送电子邮件。


最受欢迎的见解

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
6天前
|
运维 监控 Python
自动化运维:使用Python脚本简化日常任务
【10月更文挑战第36天】在数字化时代,运维工作的效率和准确性成为企业竞争力的关键。本文将介绍如何通过编写Python脚本来自动化日常的运维任务,不仅提高工作效率,还能降低人为错误的风险。从基础的文件操作到进阶的网络管理,我们将一步步展示Python在自动化运维中的应用,并分享实用的代码示例,帮助读者快速掌握自动化运维的核心技能。
18 3
|
8天前
|
开发框架 前端开发 JavaScript
利用Python和Flask构建轻量级Web应用的实战指南
利用Python和Flask构建轻量级Web应用的实战指南
25 2
|
13天前
|
运维 监控 Linux
自动化运维:如何利用Python脚本优化日常任务##
【10月更文挑战第29天】在现代IT运维中,自动化已成为提升效率、减少人为错误的关键技术。本文将介绍如何通过Python脚本来简化和自动化日常的运维任务,从而让运维人员能够专注于更高层次的工作。从备份管理到系统监控,再到日志分析,我们将一步步展示如何编写实用的Python脚本来处理这些任务。 ##
|
17天前
|
JSON API 数据格式
如何使用Python和Flask构建一个简单的RESTful API。Flask是一个轻量级的Web框架
本文介绍了如何使用Python和Flask构建一个简单的RESTful API。Flask是一个轻量级的Web框架,适合小型项目和微服务。文章从环境准备、创建基本Flask应用、定义资源和路由、请求和响应处理、错误处理等方面进行了详细说明,并提供了示例代码。通过这些步骤,读者可以快速上手构建自己的RESTful API。
25 2
|
19天前
|
安全 数据库 C++
Python Web框架比较:Django vs Flask vs Pyramid
Python Web框架比较:Django vs Flask vs Pyramid
28 1
|
19天前
|
JSON API 数据格式
构建RESTful APIs:使用Python和Flask
构建RESTful APIs:使用Python和Flask
26 1
|
21天前
|
关系型数据库 MySQL 数据处理
探索Python中的异步编程:从asyncio到异步数据库操作
在这个快节奏的技术世界里,效率和性能是关键。本文将带你深入Python的异步编程世界,从基础的asyncio库开始,逐步探索到异步数据库操作的高级应用。我们将一起揭开异步编程的神秘面纱,探索它如何帮助我们提升应用程序的性能和响应速度。
|
30天前
|
安全 数据库 C++
Python Web框架比较:Django vs Flask vs Pyramid
Python Web框架比较:Django vs Flask vs Pyramid
24 4
|
30天前
|
JSON API 数据格式
使用Python和Flask构建简单的RESTful API
【10月更文挑战第12天】使用Python和Flask构建简单的RESTful API
42 1
|
16天前
|
NoSQL 关系型数据库 MySQL
python协程+异步总结!
本文介绍了Python中的协程、asyncio模块以及异步编程的相关知识。首先解释了协程的概念和实现方法,包括greenlet、yield关键字、asyncio装饰器和async/await关键字。接着详细讲解了协程的意义和应用场景,如提高IO密集型任务的性能。文章还介绍了事件循环、Task对象、Future对象等核心概念,并提供了多个实战案例,包括异步Redis、MySQL操作、FastAPI框架和异步爬虫。最后提到了uvloop作为asyncio的高性能替代方案。通过这些内容,读者可以全面了解和掌握Python中的异步编程技术。
34 0