python使用Flask,Redis和Celery的异步任务

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: python使用Flask,Redis和Celery的异步任务

介绍

随着Web应用程序的发展和使用的增加,用例也变得多样化。我们现在正在建设和使用网站来执行比以往任何时候都更复杂的任务。其中一些任务可以进行处理,并将反馈立即转发给用户,而其他任务则需要稍后进行进一步处理和结果转发。越来越多地采用Internet访问和支持Internet的设备导致最终用户流量增加。

在本文中,我们将探讨Celery在Flask应用程序中安排后台任务的使用,以减轻资源密集型任务的负担并确定对最终用户的响应的优先级。


什么是任务队列?

任务队列是一种分配小的工作单元或任务的机制,可以在不干扰大多数基于Web的应用程序的请求-响应周期的情况下执行这些任务。

任务队列有助于委派工作,否则将在等待响应时降低应用程序的速度。它们还可以用于在主机或进程与用户交互时处理资源密集型任务。


示范

我们将构建一个Flask应用程序,该应用程序允许用户设置提醒,该提醒将在设定的时间传递到他们的电子邮件中。

我们还将提供自定义消息或提醒被调用并将消息发送给用户之前的时间的功能。

设定

与其他项目一样,我们的工作将在虚拟环境中进行 :


$ pipenv install --three$ pipenv shell

对于此项目,我们将需要安装Flask和Celery软件包以开始:

$ pipenv install flask celery

我们的Flask应用程序文件结构如下所示:

.├── Pipfile                    #管理我们的环境├── Pipfile.lock├── README.md├── __init__.py├── app.py                     # Flask应用程序的主要实现├── config.py                  # 托管配置├── requirements.txt           # 储存我们的要求└── templates└── index.html             # 登陆页面


1 directory, 8 files

让我们从创建Flask应用程序开始,该应用程序将呈现一个表单,该表单允许用户输入将来发送的消息的详细信息。

我们将以下内容添加到我们的app.py文件中:


from flask import Flask, flash, render_template, request, redirect, url_for
app = Flask(__name__)app.config.from_object("config")app.secret_key = app.config['SECRET_KEY']
@app.route('/', methods=['GET', 'POST'])def index():if request.method == 'GET':return render_template('index.html')
elif request.method == 'POST':email = request.form['email']first_name = request.form['first_name']last_name = request.form['last_name']message = request.form['message']duration = request.form['duration']duration_unit = request.form['duration_unit']
flash(“Message scheduled”)return redirect(url_for('index'))

if __name__ == '__main__':app.run(debug=True)

这是一个非常简单的应用程序,只需一条路由即可处理GETPOST请求表单。提交详细信息后,我们可以将数据交给计划工作的功能。

为了整理主应用程序文件,我们将配置变量放在单独的config.py文件中,然后从文件中加载配置:


app.config.from_object("config")

我们的config.py文件将与该app.py文件位于同一文件夹中,并包含一些基本配置:


SECRET_KEY = 'very_very_secure_and_secret'# 更多配置

现在,让我们将目标网页实现为index.html



{% for message in get_flashed_messages() %}<p style="color: red;">{{ message }}</p>{% endfor %}
<form method="POST">First Name: <input id="first_name" name="first_name" type="text">Last Name: <input id="last_name" name="last_name" type="text">Email: <input id="email" name="email" type="email">Message: <textarea id="textarea" name="message"></textarea>Duration: <input id="duration" name="duration" placeholder="Enter duration as a number. for example: 3" type="text">
<select name="duration_unit"><option value="" disabled selected>Choose the duration</option><option value="1">Minutes</option><option value="2">Hours</option><option value="3">Days</option></select>
<button type="submit" name="action">Submit </button></form>

现在,我们可以启动我们的应用程序:

1713288507803.png

使用邮件发送电子邮件

为了从Flask应用程序发送电子邮件,我们将使用Flask-Mail库,该库如下所示添加到我们的项目中:


$ pipenv install flask-mail

有了Flask应用程序和表单,我们现在可以将Flask-Mail集成到我们的app.py





from flask_mail import Mail, Message
app = Flask(__name__)app.config.from_object("config")app.secret_key = app.config['SECRET_KEY']
# 设置Flask-Mail集成
mail = Mail(app)
def send_mail(data):""" Function to send emails."""with app.app_context():msg = Message("Ping!",sender="admin.ping",recipients=[data['email']])msg.body = data['message']mail.send(msg)

功能 send_main(data)将接收要发送的消息和电子邮件的收件人,然后在经过指定的时间后将其调用以将电子邮件发送给用户。config.py为了使Flask-Mail正常运行,我们还需要向我们添加以下变量:

# Flask-邮件MAIL_SERVER = 'smtp.googlemail.com'MAIL_PORT = 587MAIL_USE_TLS = TrueMAIL_USERNAME = 'mail-username'MAIL_PASSWORD = 'mail-password'

整合

在我们的Flask应用程序准备就绪并配备了电子邮件发送功能之后,我们现在可以集成Celery,以便计划在以后发送电子邮件。我们app.py将再次被修改:




# 现有导入保持不变

from celery import Celery
# Flask应用程序和flask-mail配置被截断


# 设置客户端

client = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])client.conf.update(app.config)
# 将此装饰器添加到我们的send_mail函数中

@client.taskdef send_mail(data):#函数保持不变


@app.route('/', methods=['GET', 'POST'])def index():if request.method == 'GET':return render_template('index.html')
elif request.method == 'POST':data = {}data['email'] = request.form['email']data['first_name'] = request.form['first_name']data['last_name'] = request.form['last_name']data['message'] = request.form['message']duration = int(request.form['duration'])duration_unit = request.form['duration_unit']
if duration_unit == 'minutes':duration *= 60elif duration_unit == 'hours':duration *= 3600elif duration_unit == 'days':duration *= 86400
send_mail.apply_async(args=[data], countdown=duration)flash(f"Email will be sent to {data['email']} in {request.form['duration']} {duration_unit}")
return redirect(url_for('index'))

celery通过附加消息传递代理的URL,我们导入并使用它在Flask应用程序中初始化Celery客户端。在我们的例子中,我们将使用Redis作为代理,因此我们将以下内容添加到我们的config.py


CELERY_BROKER_URL = 'redis://localhost:6379/0'CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

为了使我们的send_mail()功能作为后台任务执行,我们将添加@client.task装饰器,以便我们的Celery客户端会意识到这一点。

设置Celery客户端后,将修改还处理表单输入的主要功能。

首先,我们将send_mail()函数的输入数据打包在字典中。然后,我们使用函数通过Celery Task Calling API调用邮件功能,该函数apply_async接受函数所需的参数。

设置了一个可选countdown参数,定义了运行代码和执行任务之间的延迟。

 

汇集

为了运行我们的项目,我们将需要两个终端,一个终端启动我们的Flask应用程序,另一个终端启动Celery worker,后者将在后台发送消息。

在第一个终端中启动Flask应用程序:


$ python app.py

在第二个终端中,启动虚拟环境,然后启动Celery worker:


# 启动virtualenv
$ pipenv shell$ celery worker -A app.client --loglevel=info

如果一切顺利,我们将在运行Celery客户端的终端中获得以下反馈:

 

现在让我们导航到 http://localhost:5000并填写详细信息,以计划在提交2分钟后到达的电子邮件。

在表格上方,将显示一条消息,指示将接收电子邮件的地址以及发送电子邮件的持续时间。在我们的Celery终端中,我们还将能够看到一个日志条目,表明我们的电子邮件已被调度:


[2019-10-23 16:27:25,399: INFO/MainProcess] Received task: app.send_mail[d65025c8-a291-40d0-aea2-e816cb40cd78]  ETA:[2019-10-23 13:29:25.170622+00:00]

ETA条目的部分显示何时send_email()调用我们的函数,以及何时发送电子邮件。

因此,让我们为后台任务实现一个监视解决方案,以便我们可以查看任务,并注意出现问题以及未按计划执行任务的情况。


监控我们的群集

安装Flower非常简单:


$ pipenv install flower

之前,我们在app.py文件中指定了Celery客户的详细信息。我们需要将该客户传递给Flower以对其进行监控。

为此,我们需要打开第三个终端窗口,进入虚拟环境,然后启动监视工具:


$ pipenv shell$ flower -A app.client --port=5555

启动Flower时,我们通过将其传递给application(-A)参数来指定Celery客户端,并通过该参数来指定要使用的端口--port

有了我们的监控功能后,让我们安排在仪表板上发送另一封电子邮件,然后导航到http://localhost:5555,在以下位置我们会对此表示欢迎:

在此页面上,我们可以看到Celery集群中的工作人员列表,该列表当前仅由我们的机器组成。

要查看我们刚刚计划的电子邮件,请单击仪表板左上方的“ 任务”按钮,这将带我们到可以查看已计划的任务的页面:

在本部分中,我们可以看到我们已计划了两封电子邮件,并且已在计划的时间成功发送了一封电子邮件。出于测试目的,计划分别在1分钟和5分钟后发送电子邮件。

 

结论

我们已经成功建立了Celery集群并将其集成到我们的Flask应用程序中,该应用程序允许用户计划在将来的某个时间后发送电子邮件。


最受欢迎的见解

相关文章
|
27天前
|
供应链 并行计算 算法
1行Python搞定高频任务!26个实用技巧解决日常+进阶需求
本文整理了26个Python极简技巧,涵盖日常高频操作与进阶玩法,助你用最少代码高效解决问题,提升编程效率。适合各阶段Python学习者参考。
70 27
|
5月前
|
存储 NoSQL Redis
阿里面试:Redis 为啥那么快?怎么实现的100W并发?说出了6大架构,面试官跪地: 纯内存 + 尖端结构 + 无锁架构 + EDA架构 + 异步日志 + 集群架构
阿里面试:Redis 为啥那么快?怎么实现的100W并发?说出了6大架构,面试官跪地: 纯内存 + 尖端结构 + 无锁架构 + EDA架构 + 异步日志 + 集群架构
阿里面试:Redis 为啥那么快?怎么实现的100W并发?说出了6大架构,面试官跪地: 纯内存 + 尖端结构 +  无锁架构 +  EDA架构  + 异步日志 + 集群架构
|
11月前
|
数据采集 缓存 Java
Python vs Java:爬虫任务中的效率比较
Python vs Java:爬虫任务中的效率比较
|
11月前
|
监控 并行计算 数据处理
构建高效Python应用:并发与异步编程的实战秘籍,IO与CPU密集型任务一网打尽!
在Python编程的征途中,面对日益增长的性能需求,如何构建高效的应用成为了每位开发者必须面对的课题。并发与异步编程作为提升程序性能的两大法宝,在处理IO密集型与CPU密集型任务时展现出了巨大的潜力。今天,我们将深入探讨这些技术的最佳实践,助你打造高效Python应用。
170 0
|
7月前
|
数据采集 Java 数据处理
Python实用技巧:轻松驾驭多线程与多进程,加速任务执行
在Python编程中,多线程和多进程是提升程序效率的关键工具。多线程适用于I/O密集型任务,如文件读写、网络请求;多进程则适合CPU密集型任务,如科学计算、图像处理。本文详细介绍这两种并发编程方式的基本用法及应用场景,并通过实例代码展示如何使用threading、multiprocessing模块及线程池、进程池来优化程序性能。结合实际案例,帮助读者掌握并发编程技巧,提高程序执行速度和资源利用率。
350 0
|
9月前
|
数据采集 存储 监控
21个Python脚本自动执行日常任务(2)
21个Python脚本自动执行日常任务(2)
21个Python脚本自动执行日常任务(2)
|
8月前
|
存储 安全 数据可视化
用Python实现简单的任务自动化
本文介绍如何使用Python实现任务自动化,提高效率和准确性。通过三个实用案例展示:1. 使用`smtplib`和`schedule`库自动发送邮件提醒;2. 利用`shutil`和`os`库自动备份文件;3. 借助`requests`库自动下载网页内容。每个案例包含详细代码和解释,并附带注意事项。掌握这些技能有助于个人和企业优化流程、节约成本。
308 3
|
9月前
|
Python
Python中的函数是**一种命名的代码块,用于执行特定任务或计算
Python中的函数是**一种命名的代码块,用于执行特定任务或计算
165 18
|
9月前
|
数据采集 分布式计算 大数据
构建高效的数据管道:使用Python进行ETL任务
在数据驱动的世界中,高效地处理和移动数据是至关重要的。本文将引导你通过一个实际的Python ETL(提取、转换、加载)项目,从概念到实现。我们将探索如何设计一个灵活且可扩展的数据管道,确保数据的准确性和完整性。无论你是数据工程师、分析师还是任何对数据处理感兴趣的人,这篇文章都将成为你工具箱中的宝贵资源。
|
10月前
|
运维 监控 网络安全
自动化运维的崛起:如何利用Python脚本简化日常任务
【10月更文挑战第43天】在数字化时代的浪潮中,运维工作已从繁琐的手工操作转变为高效的自动化流程。本文将引导您了解如何运用Python编写脚本,以实现日常运维任务的自动化,从而提升工作效率和准确性。我们将通过一个实际案例,展示如何使用Python来自动部署应用、监控服务器状态并生成报告。文章不仅适合运维新手入门,也能为有经验的运维工程师提供新的视角和灵感。

推荐镜像

更多