::: hljs-center
使用 Flask 和 Celery 构建异步任务处理
:::
Flask 是一个轻量级的 Python Web 框架,而 Celery 是一个强大的分布式任务队列系统。结合这两者,你可以实现高效的异步任务处理,适用于需要执行长时间运行任务的 Web 应用程序。
为什么使用 Celery?
在实际应用中,某些操作可能会耗费大量时间,例如发送电子邮件、大数据处理、图像处理等。如果将这些操作放在主线程中,会导致用户体验不佳,因为用户在等待响应时无法进行其他操作。Celery 允许你将这些任务异步化,从而提高用户体验和系统效率。
示例:Flask 与 Celery 的集成
以下是一个简单的示例,展示如何将 Flask 和 Celery 结合使用,以异步发送电子邮件。
环境准备
首先,确保你安装了 Flask 和 Celery:
pip install Flask Celery redis
我们将使用 Redis 作为 Celery 的消息代理(broker)。
创建 Flask 应用
创建一个名为 app.py 的文件,内容如下:
from flask import Flask, request, jsonify
from celery import Celery
import time
app = Flask(__name__)
# 配置 Celery
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0' # Redis 作为消息代理
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
@celery.task
def send_email(email_address):
"""模拟发送电子邮件"""
print(f"开始发送电子邮件到 {email_address}")
time.sleep(10) # 模拟延迟
print(f"电子邮件已发送到 {email_address}")
@app.route('/send-email', methods=['POST'])
def send_email_route():
email_address = request.json.get('email')
if not email_address:
return jsonify({
'error': '缺少电子邮件地址'}), 400
# 调用异步任务
send_email.delay(email_address)
return jsonify({
'message': '电子邮件发送任务已提交!'}), 202
if __name__ == '__main__':
app.run(debug=True)
启动 Redis
确保你的 Redis 服务正在运行,可以通过以下命令启动:
redis-server
启动 Celery Worker
在终端中,打开一个新的窗口并导航到包含 app.py 文件的目录,然后运行以下命令以启动 Celery worker:
celery -A app.celery worker --loglevel=info
启动 Flask 应用
在另一个终端窗口中,运行 Flask 应用:
python app.py
测试 API
可以使用 Postman 或 curl 来测试我们的 send-email 路由。例如,使用 curl 发送请求:
curl -X POST http://127.0.0.1:5000/send-email -H "Content-Type: application/json" -d '{"email": "test@example.com"}'
你应该会收到如下响应:
{
"message": "电子邮件发送任务已提交!"
}
同时,在 Celery worker 的终端中,你将看到发送电子邮件的日志输出:
[2023-10-01 12:00:00,000: INFO/MainProcess] Starting sending email to test@example.com
[2023-10-01 12:00:10,000: INFO/MainProcess] Email sent to test@example.com
总结
通过这个简单的示例,我们展示了如何使用 Flask 和 Celery 构建一个能够处理异步任务的 Web 应用。结合 Flask 的 web 功能和 Celery 的异步处理能力,你可以轻松地构建出响应迅速且功能强大的应用程序。这种架构特别适合处理长时间运行的任务,提高了用户的体验和系统的可扩展性。