python使用Flask,Redis和Celery的异步任务

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: python使用Flask,Redis和Celery的异步任务

介绍

随着Web应用程序的发展和使用的增加,用例也变得多样化。我们现在正在建设和使用网站来执行比以往任何时候都更复杂的任务。其中一些任务可以进行处理,并将反馈立即转发给用户,而其他任务则需要稍后进行进一步处理和结果转发。越来越多地采用Internet访问和支持Internet的设备导致最终用户流量增加。

在本文中,我们将探讨Celery在Flask应用程序中安排后台任务的使用,以减轻资源密集型任务的负担并确定对最终用户的响应的优先级。


什么是任务队列?

任务队列是一种分配小的工作单元或任务的机制,可以在不干扰大多数基于Web的应用程序的请求-响应周期的情况下执行这些任务。

任务队列有助于委派工作,否则将在等待响应时降低应用程序的速度。它们还可以用于在主机或进程与用户交互时处理资源密集型任务。


示范

我们将构建一个Flask应用程序,该应用程序允许用户设置提醒,该提醒将在设定的时间传递到他们的电子邮件中。

我们还将提供自定义消息或提醒被调用并将消息发送给用户之前的时间的功能。

设定

与其他项目一样,我们的工作将在虚拟环境中进行 :


$ pipenv install --three$ pipenv shell

对于此项目,我们将需要安装Flask和Celery软件包以开始:

$ pipenv install flask celery

我们的Flask应用程序文件结构如下所示:

.├── Pipfile                    #管理我们的环境├── Pipfile.lock├── README.md├── __init__.py├── app.py                     # Flask应用程序的主要实现├── config.py                  # 托管配置├── requirements.txt           # 储存我们的要求└── templates└── index.html             # 登陆页面


1 directory, 8 files

让我们从创建Flask应用程序开始,该应用程序将呈现一个表单,该表单允许用户输入将来发送的消息的详细信息。

我们将以下内容添加到我们的app.py文件中:


from flask import Flask, flash, render_template, request, redirect, url_for
app = Flask(__name__)app.config.from_object("config")app.secret_key = app.config['SECRET_KEY']
@app.route('/', methods=['GET', 'POST'])def index():if request.method == 'GET':return render_template('index.html')
elif request.method == 'POST':email = request.form['email']first_name = request.form['first_name']last_name = request.form['last_name']message = request.form['message']duration = request.form['duration']duration_unit = request.form['duration_unit']
flash(“Message scheduled”)return redirect(url_for('index'))

if __name__ == '__main__':app.run(debug=True)

这是一个非常简单的应用程序,只需一条路由即可处理GETPOST请求表单。提交详细信息后,我们可以将数据交给计划工作的功能。

为了整理主应用程序文件,我们将配置变量放在单独的config.py文件中,然后从文件中加载配置:


app.config.from_object("config")

我们的config.py文件将与该app.py文件位于同一文件夹中,并包含一些基本配置:


SECRET_KEY = 'very_very_secure_and_secret'# 更多配置

现在,让我们将目标网页实现为index.html



{% for message in get_flashed_messages() %}<p style="color: red;">{{ message }}</p>{% endfor %}
<form method="POST">First Name: <input id="first_name" name="first_name" type="text">Last Name: <input id="last_name" name="last_name" type="text">Email: <input id="email" name="email" type="email">Message: <textarea id="textarea" name="message"></textarea>Duration: <input id="duration" name="duration" placeholder="Enter duration as a number. for example: 3" type="text">
<select name="duration_unit"><option value="" disabled selected>Choose the duration</option><option value="1">Minutes</option><option value="2">Hours</option><option value="3">Days</option></select>
<button type="submit" name="action">Submit </button></form>

现在,我们可以启动我们的应用程序:

1713288507803.png

使用邮件发送电子邮件

为了从Flask应用程序发送电子邮件,我们将使用Flask-Mail库,该库如下所示添加到我们的项目中:


$ pipenv install flask-mail

有了Flask应用程序和表单,我们现在可以将Flask-Mail集成到我们的app.py





from flask_mail import Mail, Message
app = Flask(__name__)app.config.from_object("config")app.secret_key = app.config['SECRET_KEY']
# 设置Flask-Mail集成
mail = Mail(app)
def send_mail(data):""" Function to send emails."""with app.app_context():msg = Message("Ping!",sender="admin.ping",recipients=[data['email']])msg.body = data['message']mail.send(msg)

功能 send_main(data)将接收要发送的消息和电子邮件的收件人,然后在经过指定的时间后将其调用以将电子邮件发送给用户。config.py为了使Flask-Mail正常运行,我们还需要向我们添加以下变量:

# Flask-邮件MAIL_SERVER = 'smtp.googlemail.com'MAIL_PORT = 587MAIL_USE_TLS = TrueMAIL_USERNAME = 'mail-username'MAIL_PASSWORD = 'mail-password'

整合

在我们的Flask应用程序准备就绪并配备了电子邮件发送功能之后,我们现在可以集成Celery,以便计划在以后发送电子邮件。我们app.py将再次被修改:




# 现有导入保持不变

from celery import Celery
# Flask应用程序和flask-mail配置被截断


# 设置客户端

client = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])client.conf.update(app.config)
# 将此装饰器添加到我们的send_mail函数中

@client.taskdef send_mail(data):#函数保持不变


@app.route('/', methods=['GET', 'POST'])def index():if request.method == 'GET':return render_template('index.html')
elif request.method == 'POST':data = {}data['email'] = request.form['email']data['first_name'] = request.form['first_name']data['last_name'] = request.form['last_name']data['message'] = request.form['message']duration = int(request.form['duration'])duration_unit = request.form['duration_unit']
if duration_unit == 'minutes':duration *= 60elif duration_unit == 'hours':duration *= 3600elif duration_unit == 'days':duration *= 86400
send_mail.apply_async(args=[data], countdown=duration)flash(f"Email will be sent to {data['email']} in {request.form['duration']} {duration_unit}")
return redirect(url_for('index'))

celery通过附加消息传递代理的URL,我们导入并使用它在Flask应用程序中初始化Celery客户端。在我们的例子中,我们将使用Redis作为代理,因此我们将以下内容添加到我们的config.py


CELERY_BROKER_URL = 'redis://localhost:6379/0'CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

为了使我们的send_mail()功能作为后台任务执行,我们将添加@client.task装饰器,以便我们的Celery客户端会意识到这一点。

设置Celery客户端后,将修改还处理表单输入的主要功能。

首先,我们将send_mail()函数的输入数据打包在字典中。然后,我们使用函数通过Celery Task Calling API调用邮件功能,该函数apply_async接受函数所需的参数。

设置了一个可选countdown参数,定义了运行代码和执行任务之间的延迟。

 

汇集

为了运行我们的项目,我们将需要两个终端,一个终端启动我们的Flask应用程序,另一个终端启动Celery worker,后者将在后台发送消息。

在第一个终端中启动Flask应用程序:


$ python app.py

在第二个终端中,启动虚拟环境,然后启动Celery worker:


# 启动virtualenv
$ pipenv shell$ celery worker -A app.client --loglevel=info

如果一切顺利,我们将在运行Celery客户端的终端中获得以下反馈:

 

现在让我们导航到 http://localhost:5000并填写详细信息,以计划在提交2分钟后到达的电子邮件。

在表格上方,将显示一条消息,指示将接收电子邮件的地址以及发送电子邮件的持续时间。在我们的Celery终端中,我们还将能够看到一个日志条目,表明我们的电子邮件已被调度:


[2019-10-23 16:27:25,399: INFO/MainProcess] Received task: app.send_mail[d65025c8-a291-40d0-aea2-e816cb40cd78]  ETA:[2019-10-23 13:29:25.170622+00:00]

ETA条目的部分显示何时send_email()调用我们的函数,以及何时发送电子邮件。

因此,让我们为后台任务实现一个监视解决方案,以便我们可以查看任务,并注意出现问题以及未按计划执行任务的情况。


监控我们的群集

安装Flower非常简单:


$ pipenv install flower

之前,我们在app.py文件中指定了Celery客户的详细信息。我们需要将该客户传递给Flower以对其进行监控。

为此,我们需要打开第三个终端窗口,进入虚拟环境,然后启动监视工具:


$ pipenv shell$ flower -A app.client --port=5555

启动Flower时,我们通过将其传递给application(-A)参数来指定Celery客户端,并通过该参数来指定要使用的端口--port

有了我们的监控功能后,让我们安排在仪表板上发送另一封电子邮件,然后导航到http://localhost:5555,在以下位置我们会对此表示欢迎:

在此页面上,我们可以看到Celery集群中的工作人员列表,该列表当前仅由我们的机器组成。

要查看我们刚刚计划的电子邮件,请单击仪表板左上方的“ 任务”按钮,这将带我们到可以查看已计划的任务的页面:

在本部分中,我们可以看到我们已计划了两封电子邮件,并且已在计划的时间成功发送了一封电子邮件。出于测试目的,计划分别在1分钟和5分钟后发送电子邮件。

 

结论

我们已经成功建立了Celery集群并将其集成到我们的Flask应用程序中,该应用程序允许用户计划在将来的某个时间后发送电子邮件。


最受欢迎的见解

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
6天前
|
Python
python3之flask快速入门教程Demo
python3之flask快速入门教程Demo
27 6
|
7天前
|
开发框架 数据库 开发者
Web开发新境界:用Python玩转Django和Flask!
【6月更文挑战第12天】Python的Web开发框架Django和Flask各有千秋。Django是全能型框架,适合快速开发大型应用,提供ORM、模板引擎、URL路由和后台管理等全面功能。Flask则轻量级且灵活,适用于小型到中型应用,以其简单易用、高度可扩展和灵活路由著称。两者结合使用,能应对各种Web开发需求。
|
22小时前
|
关系型数据库 MySQL 数据库
如何使用Python的Flask框架来构建一个简单的Web应用
如何使用Python的Flask框架来构建一个简单的Web应用
5 0
|
3天前
|
运维 监控 API
自动化运维实践指南:Python脚本优化服务器管理任务
本文探讨了Python在自动化运维中的应用,介绍了使用Python脚本优化服务器管理的四个关键步骤:1) 安装必备库如paramiko、psutil和requests;2) 使用paramiko进行远程命令执行;3) 利用psutil监控系统资源;4) 结合requests自动化软件部署。这些示例展示了Python如何提升运维效率和系统稳定性。
24 8
|
6天前
|
消息中间件 监控 调度
构建Python中的分布式系统结合Celery与RabbitMQ
在当今的软件开发中,构建高效的分布式系统是至关重要的。Python作为一种流行的编程语言,提供了许多工具和库来帮助开发人员构建分布式系统。其中,Celery和RabbitMQ是两个强大的工具,它们结合在一起可以为你的Python应用程序提供可靠的异步任务队列和消息传递机制。
|
7天前
|
Python
并发编程,Python让你轻松驾驭多线程与异步IO!
【6月更文挑战第12天】本文探讨了Python中的并发编程,包括多线程和异步IO。通过`threading`模块展示了多线程编程,创建并运行多个线程以并发执行任务。同时,使用`asyncio`库演示了异步IO编程,允许在单线程中高效处理多个IO操作。两个示例代码详细解释了如何在Python中实现并发,展现了其在提升程序性能和响应速度方面的潜力。
|
9天前
|
调度 数据库 开发者
在Python编程中,并发编程和异步IO是两个重要的概念,它们对于提高程序性能和响应速度具有至关重要的作用
【6月更文挑战第10天】本文介绍了Python并发编程和异步IO,包括并发编程的基本概念如多线程、多进程和协程。线程和进程可通过threading及multiprocessing模块管理,但多线程受限于GIL。协程利用asyncio模块实现非阻塞IO,适合处理IO密集型任务。异步IO基于事件循环,能提高服务器并发处理能力,适用于网络编程和文件操作等场景。异步IO与多线程、多进程在不同任务中有各自优势,开发者应根据需求选择合适的技术。
18 0
|
10天前
|
DataWorks 监控 API
DataWorks产品使用合集之在赋值节点上面为什么不能使用全局变量o或odps
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
10天前
|
SQL DataWorks 监控
DataWorks产品使用合集之有没有办法用python获取到那几个任务的实例再调度
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
12天前
|
缓存 NoSQL Redis
Python与Redis:提升性能,确保可靠性,掌握最佳实践
Python与Redis:提升性能,确保可靠性,掌握最佳实践