介绍
随着Web应用程序的发展和使用的增加,用例也变得多样化。我们现在正在建设和使用网站来执行比以往任何时候都更复杂的任务。其中一些任务可以进行处理,并将反馈立即转发给用户,而其他任务则需要稍后进行进一步处理和结果转发。越来越多地采用Internet访问和支持Internet的设备导致最终用户流量增加。
在本文中,我们将探讨Celery在Flask应用程序中安排后台任务的使用,以减轻资源密集型任务的负担并确定对最终用户的响应的优先级。
什么是任务队列?
任务队列是一种分配小的工作单元或任务的机制,可以在不干扰大多数基于Web的应用程序的请求-响应周期的情况下执行这些任务。
任务队列有助于委派工作,否则将在等待响应时降低应用程序的速度。它们还可以用于在主机或进程与用户交互时处理资源密集型任务。
示范
我们将构建一个Flask应用程序,该应用程序允许用户设置提醒,该提醒将在设定的时间传递到他们的电子邮件中。
我们还将提供自定义消息或提醒被调用并将消息发送给用户之前的时间的功能。
设定
与其他项目一样,我们的工作将在虚拟环境中进行 :
$ pipenv install --three$ pipenv shell
对于此项目,我们将需要安装Flask和Celery软件包以开始:
$ pipenv install flask celery
我们的Flask应用程序文件结构如下所示:
.├── Pipfile #管理我们的环境├── Pipfile.lock├── README.md├── __init__.py├── app.py # Flask应用程序的主要实现├── config.py # 托管配置├── requirements.txt # 储存我们的要求└── templates└── index.html # 登陆页面 1 directory, 8 files
让我们从创建Flask应用程序开始,该应用程序将呈现一个表单,该表单允许用户输入将来发送的消息的详细信息。
我们将以下内容添加到我们的app.py
文件中:
from flask import Flask, flash, render_template, request, redirect, url_for app = Flask(__name__)app.config.from_object("config")app.secret_key = app.config['SECRET_KEY'] @app.route('/', methods=['GET', 'POST'])def index():if request.method == 'GET':return render_template('index.html') elif request.method == 'POST':email = request.form['email']first_name = request.form['first_name']last_name = request.form['last_name']message = request.form['message']duration = request.form['duration']duration_unit = request.form['duration_unit'] flash(“Message scheduled”)return redirect(url_for('index')) if __name__ == '__main__':app.run(debug=True)
这是一个非常简单的应用程序,只需一条路由即可处理GET
和POST
请求表单。提交详细信息后,我们可以将数据交给计划工作的功能。
为了整理主应用程序文件,我们将配置变量放在单独的config.py
文件中,然后从文件中加载配置:
app.config.from_object("config")
我们的config.py
文件将与该app.py
文件位于同一文件夹中,并包含一些基本配置:
SECRET_KEY = 'very_very_secure_and_secret'# 更多配置
现在,让我们将目标网页实现为index.html
:
{% for message in get_flashed_messages() %}<p style="color: red;">{{ message }}</p>{% endfor %} <form method="POST">First Name: <input id="first_name" name="first_name" type="text">Last Name: <input id="last_name" name="last_name" type="text">Email: <input id="email" name="email" type="email">Message: <textarea id="textarea" name="message"></textarea>Duration: <input id="duration" name="duration" placeholder="Enter duration as a number. for example: 3" type="text"> <select name="duration_unit"><option value="" disabled selected>Choose the duration</option><option value="1">Minutes</option><option value="2">Hours</option><option value="3">Days</option></select> <button type="submit" name="action">Submit </button></form>
现在,我们可以启动我们的应用程序:
使用邮件发送电子邮件
为了从Flask应用程序发送电子邮件,我们将使用Flask-Mail库,该库如下所示添加到我们的项目中:
$ pipenv install flask-mail
有了Flask应用程序和表单,我们现在可以将Flask-Mail集成到我们的app.py
:
from flask_mail import Mail, Message app = Flask(__name__)app.config.from_object("config")app.secret_key = app.config['SECRET_KEY'] # 设置Flask-Mail集成 mail = Mail(app) def send_mail(data):""" Function to send emails."""with app.app_context():msg = Message("Ping!",sender="admin.ping",recipients=[data['email']])msg.body = data['message']mail.send(msg)
功能 send_main(data)
将接收要发送的消息和电子邮件的收件人,然后在经过指定的时间后将其调用以将电子邮件发送给用户。config.py
为了使Flask-Mail正常运行,我们还需要向我们添加以下变量:
# Flask-邮件MAIL_SERVER = 'smtp.googlemail.com'MAIL_PORT = 587MAIL_USE_TLS = TrueMAIL_USERNAME = 'mail-username'MAIL_PASSWORD = 'mail-password'
整合
在我们的Flask应用程序准备就绪并配备了电子邮件发送功能之后,我们现在可以集成Celery,以便计划在以后发送电子邮件。我们app.py
将再次被修改:
# 现有导入保持不变 from celery import Celery # Flask应用程序和flask-mail配置被截断 # 设置客户端 client = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])client.conf.update(app.config) # 将此装饰器添加到我们的send_mail函数中 @client.taskdef send_mail(data):#函数保持不变 @app.route('/', methods=['GET', 'POST'])def index():if request.method == 'GET':return render_template('index.html') elif request.method == 'POST':data = {}data['email'] = request.form['email']data['first_name'] = request.form['first_name']data['last_name'] = request.form['last_name']data['message'] = request.form['message']duration = int(request.form['duration'])duration_unit = request.form['duration_unit'] if duration_unit == 'minutes':duration *= 60elif duration_unit == 'hours':duration *= 3600elif duration_unit == 'days':duration *= 86400 send_mail.apply_async(args=[data], countdown=duration)flash(f"Email will be sent to {data['email']} in {request.form['duration']} {duration_unit}") return redirect(url_for('index'))
celery
通过附加消息传递代理的URL,我们导入并使用它在Flask应用程序中初始化Celery客户端。在我们的例子中,我们将使用Redis作为代理,因此我们将以下内容添加到我们的config.py
:
CELERY_BROKER_URL = 'redis://localhost:6379/0'CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
为了使我们的send_mail()
功能作为后台任务执行,我们将添加@client.task
装饰器,以便我们的Celery客户端会意识到这一点。
设置Celery客户端后,将修改还处理表单输入的主要功能。
首先,我们将send_mail()
函数的输入数据打包在字典中。然后,我们使用函数通过Celery Task Calling API调用邮件功能,该函数apply_async
接受函数所需的参数。
设置了一个可选countdown
参数,定义了运行代码和执行任务之间的延迟。
汇集
为了运行我们的项目,我们将需要两个终端,一个终端启动我们的Flask应用程序,另一个终端启动Celery worker,后者将在后台发送消息。
在第一个终端中启动Flask应用程序:
$ python app.py
在第二个终端中,启动虚拟环境,然后启动Celery worker:
# 启动virtualenv $ pipenv shell$ celery worker -A app.client --loglevel=info
如果一切顺利,我们将在运行Celery客户端的终端中获得以下反馈:
现在让我们导航到 http://localhost:5000
并填写详细信息,以计划在提交2分钟后到达的电子邮件。
在表格上方,将显示一条消息,指示将接收电子邮件的地址以及发送电子邮件的持续时间。在我们的Celery终端中,我们还将能够看到一个日志条目,表明我们的电子邮件已被调度:
[2019-10-23 16:27:25,399: INFO/MainProcess] Received task: app.send_mail[d65025c8-a291-40d0-aea2-e816cb40cd78] ETA:[2019-10-23 13:29:25.170622+00:00]
ETA
条目的部分显示何时send_email()
调用我们的函数,以及何时发送电子邮件。
因此,让我们为后台任务实现一个监视解决方案,以便我们可以查看任务,并注意出现问题以及未按计划执行任务的情况。
监控我们的群集
安装Flower
非常简单:
$ pipenv install flower
之前,我们在app.py
文件中指定了Celery客户的详细信息。我们需要将该客户传递给Flower以对其进行监控。
为此,我们需要打开第三个终端窗口,进入虚拟环境,然后启动监视工具:
$ pipenv shell$ flower -A app.client --port=5555
启动Flower时,我们通过将其传递给application(-A
)参数来指定Celery客户端,并通过该参数来指定要使用的端口--port
。
有了我们的监控功能后,让我们安排在仪表板上发送另一封电子邮件,然后导航到http://localhost:5555
,在以下位置我们会对此表示欢迎:
在此页面上,我们可以看到Celery集群中的工作人员列表,该列表当前仅由我们的机器组成。
要查看我们刚刚计划的电子邮件,请单击仪表板左上方的“ 任务”按钮,这将带我们到可以查看已计划的任务的页面:
在本部分中,我们可以看到我们已计划了两封电子邮件,并且已在计划的时间成功发送了一封电子邮件。出于测试目的,计划分别在1分钟和5分钟后发送电子邮件。
结论
我们已经成功建立了Celery集群并将其集成到我们的Flask应用程序中,该应用程序允许用户计划在将来的某个时间后发送电子邮件。
最受欢迎的见解