Flask嵌套启动子线程如何读取请求上下文?

简介: Flask嵌套启动子线程如何读取请求上下文?

如果你在Flask中启动过子线程,然后在子线程中读写过g对象或者尝试从request对象中读取url参数,那么,你肯定对下面这个报错不陌生:RuntimeError: Working outside of request context..

例如下面这段Flask代码:

import threading
from flask import Flask, request
app = Flask(__name__)
def inner_func():
    doc_id = request.args.get('doc_id', '')
    print(f'用户ID为:{doc_id}')
@app.route('/start_thread')
def start_thread():
    thread = threading.Thread(target=inner_func)
    thread.start()
    return {'success': True, 'msg': '获取用户ID成功!'}

请求/start_thread接口就会报错,如下图所示:

如果你在网上搜索flask thread RuntimeError: Working outside of request context. ,那么你可能会看到官方文档或者StackOverFlow上面提供了一个装饰器@copy_current_request_context。如下图所示:

照着它这样写,确实能解决问题,如下图所示:

但无论是官网还是StackOverFlow,它的例子都非常简单。但是我们知道,启动线程有很多种方法,例如:

# 方法一,启动简单线程
import threading
job = threading.Thread(target=函数名, args=(参数1, 参数2), kwargs={'参数3': xxx, '参数4': yyy})
job.start()
# 方法2,使用类定义线程
import threading
class Job(threading.Thread):
    def __init__(self, 参数):
        super().__init__()
    def run(self):
        print('子线程开始运行')
job = Job(参数)
job.start()
# 方法3,使用线程池
from multiprocessing.dummy import Pool
pool = Pool(5)  # 5个线程的线程池
pool.map(函数名, 参数列表)

网上的方法只能解决第一种写法的问题。如果想使用方法2和方法3启动子线程,代码应该怎么写呢?如果在子线程中又启动子线程,再用一次@copy_current_request_context还行吗?

相信我,你在网上搜索一下午,只有两种结果:一是找不到答案,二是找到的答案是晚于2023年1月14日的,因为是别人看了我这篇文章以后,再写的。

解答上面的问题前,还是说明一下我对于在后端启动子线程这个行为的观点。例如有些人喜欢在后端挂一个爬虫,请求接口以后,通过线程启动爬虫,爬虫开始爬数据。又或者,有些人在后端上面挂了一些复杂的程序代码,请求接口以后,后端启动子线程,在子线程中运行这些代码。

我一向是不建议在后端又启动子线程去做复杂操作的。无论你使用的是Flask还是Django还是FastAPI。正确的做法应该是使用消息队列,后端只是把触发任务的相关参数发送到消息队列中。下游真正的运行程序从消息队列读取到触发参数以后,开始运行。

但有时候,你可能综合考虑性价比,觉得再增加一个消息队列,成本太高;或者干脆是要赶工期,不得不先暂时使用多线程来解决问题,那么这篇文章将会极大帮助到你。

尽量不要在子线程中读取请求相关的参数

如果你的子线程不需要读写g对象,也不需要从请求中读取各种参数,那么你就可以关闭这篇文章了。因为你的子线程可以直接运行,不会遇到什么的问题,例如:

所以最好的解决方法,就是在启动子线程之前,提前先获取到子线程需要的每一个参数,然后把这些参数在启动子线程的时候作为函数参数传进去。如果你是从零开始写代码,那么一开始这样做,就可以帮你避免很多麻烦。

但如果你是修改已有的代码,并且嵌套太深,已经没有办法一层一层传入参数,或者代码量太大,不知道哪些地方悄悄调用了g对象或者读写了请求上下文,那么你可以继续往下看。

装饰闭包函数而不是一级函数

上面的简单多线程写法,有一个地方需要特别注意,被@copy_current_request_context装饰的子线程入口函数inner_func,必须是闭包函数,不能是一级函数。如下图所示:

如果不小心装饰了一级函数,就会报如下的错误:

线程池复制请求上下文

当我们使用multiprocessing.dummy来实现线程池时,代码如下:

from multiprocessing.dummy import Pool
from flask import Flask, request, copy_current_request_context, g
app = Flask(__name__)
@app.route('/start_thread', methods=['POST'])
def start_thread():
    @copy_current_request_context
    def crawl(doc_id):
        url_template = request.json.get('url_template', '')
        url = url_template.format(doc_id=doc_id)
        print(f'开始爬取:{url}')
    doc_id_list = [123, 456, 789, 111, 222, 333, 444]
    pool = Pool(3)
    pool.map(crawl, doc_id_list)
    return {'success': True, 'msg': '爬取文章成功!'}

运行效果如下图所示:

写法上整体跟threading.Thread启动简单线程的方法差不多。

用类定义线程时复制请求上下文

当我们额外定义了一个线程类时,需要把被装饰的闭包函数传入到子线程中,然后在子线程的run()方法中运行:

import threading
from flask import Flask, request, copy_current_request_context
app = Flask(__name__)
class Job(threading.Thread):
    def __init__(self, func):
        super().__init__()
        self.func = func
    def run(self):
        self.func()
@app.route('/start_thread', methods=['POST'])
def start_thread():
    @copy_current_request_context
    def runner():
        doc_id = request.json.get('doc_id', '')
        print(f'docId的值是:{doc_id}')
    job = Job(runner)
    job.start()
    return {'success': True, 'msg': '读取文章成功!'}

运行效果如下图所示:

嵌套子线程复制请求上下文

有时候,我们先创建了一个子线程,然后在子线程中,又需要创建孙线程。并且在孙线程中读取请求上下文。例如下面的代码:

import threading
from multiprocessing.dummy import Pool
from flask import Flask, request, copy_current_request_context
app = Flask(__name__)
def deep_func_runner(doc_id_list):
    @copy_current_request_context
    def deep_func(doc_id):
        category = request.args.get('category', '')
        url = f'https://www.kingname.info/{category}/{doc_id}'
        print(f'开始爬取:{url}')
    pool = Pool(3)
    pool.map(deep_func, doc_id_list)
@app.route('/start_thread', methods=['POST'])
def start_thread():
    @copy_current_request_context
    def runner():
        doc_id_list = [111, 222, 333, 444, 555, 666, 777, 888, 999]
        deep_func_runner(doc_id_list)
    job = threading.Thread(target=runner)
    job.start()
    return {'success': True, 'msg': '读取文章成功!'}

此时使用@copy_current_request_context就会报您一个错误:ValueError: <Token var=<ContextVar name='flask.request_ctx' at 0x103ef69a0> at 0x104446700> was created in a different Context。如下图所示:

这个时候,我们就需要额外再创建一个装饰器:

def copy_current_app_context(f):
    from flask.globals import _app_ctx_stack
    appctx = _app_ctx_stack.top
    def _(*args, **kwargs):
        with appctx:
            return f(*args, **kwargs)
    return _

@copy_current_app_context这个装饰器需要放到孙线程里面@copy_current_request_context的上面。完整的代码为:

import threading
from multiprocessing.dummy import Pool
from flask import Flask, request, copy_current_request_context
app = Flask(__name__)
def copy_current_app_context(f):
    from flask.globals import _app_ctx_stack
    appctx = _app_ctx_stack.top
    def _(*args, **kwargs):
        with appctx:
            return f(*args, **kwargs)
    return _
def deep_func_runner(doc_id_list):
    @copy_current_app_context
    @copy_current_request_context
    def deep_func(doc_id):
        category = request.args.get('category', '')
        url = f'https://www.kingname.info/{category}/{doc_id}'
        print(f'开始爬取:{url}')
    pool = Pool(3)
    pool.map(deep_func, doc_id_list)
@app.route('/start_thread', methods=['POST'])
def start_thread():
    @copy_current_request_context
    def runner():
        doc_id_list = [111, 222, 333, 444, 555, 666, 777, 888, 999]
        deep_func_runner(doc_id_list)
    job = threading.Thread(target=runner)
    job.start()
    return {'success': True, 'msg': '读取文章成功!'}

运行效果如下图所示,孙线程也正常启动了:

总结

  1. 非必要不在后端中创建子线程
  2. 创建子线程时,如果能把参数从外面传入,就不要让子线程自己去Flask的上下文读取
  3. @copy_current_request_context需要装饰闭包函数,不能装饰一级函数
  4. 嵌套子线程需要同时使用@copy_current_app_context@copy_current_request_context两个装饰器来装饰孙线程的闭包函数
目录
相关文章
|
8月前
|
网络协议 Linux C++
Linux C/C++ 开发(学习笔记十一 ):TCP服务器(并发网络网络编程 一请求一线程)
Linux C/C++ 开发(学习笔记十一 ):TCP服务器(并发网络网络编程 一请求一线程)
155 0
|
29天前
|
安全 Java API
java如何请求接口然后终止某个线程
通过本文的介绍,您应该能够理解如何在Java中请求接口并根据返回结果终止某个线程。合理使用标志位或 `interrupt`方法可以确保线程的安全终止,而处理好网络请求中的各种异常情况,可以提高程序的稳定性和可靠性。
48 6
|
8月前
|
设计模式 监控 安全
多线程设计模式【多线程上下文设计模式、Guarded Suspension 设计模式、 Latch 设计模式】(二)-全面详解(学习总结---从入门到深化)
多线程设计模式【多线程上下文设计模式、Guarded Suspension 设计模式、 Latch 设计模式】(二)-全面详解(学习总结---从入门到深化)
126 0
|
3月前
|
JSON 中间件 数据格式
五、Flask的请求和响应方法与常用技巧
五、Flask的请求和响应方法与常用技巧
117 0
|
3月前
|
安全 调度 C#
STA模型、同步上下文和多线程、异步调度
【10月更文挑战第19天】本文介绍了 STA 模型、同步上下文和多线程、异步调度的概念及其优缺点。STA 模型适用于单线程环境,确保资源访问的顺序性;同步上下文和多线程提高了程序的并发性和响应性,但增加了复杂性;异步调度提升了程序的响应性和资源利用率,但也带来了编程复杂性和错误处理的挑战。选择合适的模型需根据具体应用场景和需求进行权衡。
|
8月前
|
前端开发 数据库 Python
使用 Python 的 Web 框架(如 Django 或 Flask)来建立后端接口,用于处理用户的请求,从数据库中查找答案并返回给前端界面
【1月更文挑战第13天】使用 Python 的 Web 框架(如 Django 或 Flask)来建立后端接口,用于处理用户的请求,从数据库中查找答案并返回给前端界面
265 7
|
6月前
|
云计算
云计算问题之线程请求的栈深度大于虚拟机所允许的深度如何解决
云计算问题之线程请求的栈深度大于虚拟机所允许的深度如何解决
35 1
|
6月前
|
存储 前端开发 Java
(二)JVM成神路之剖析Java类加载子系统、双亲委派机制及线程上下文类加载器
上篇《初识Java虚拟机》文章中曾提及到:我们所编写的Java代码经过编译之后,会生成对应的class字节码文件,而在程序启动时会通过类加载子系统将这些字节码文件先装载进内存,然后再交由执行引擎执行。本文中则会对Java虚拟机的类加载机制以及执行引擎进行全面分析。
119 0
|
7月前
|
JSON API 数据格式
如何使用Flask request对象处理请求
在 Flask 中,request对象是处理 HTTP 请求的重要工具之一。它提供了许多属性和方法,可以帮助我们获取请求的相关信息和数据。本文将向你介绍request对象的常用方法以及如何在 Flask 应用程序中使用它。
153 3
|
6月前
|
存储 安全 Java
Java面试题:假设你正在开发一个Java后端服务,该服务需要处理高并发的用户请求,并且对内存使用效率有严格的要求,在多线程环境下,如何确保共享资源的线程安全?
Java面试题:假设你正在开发一个Java后端服务,该服务需要处理高并发的用户请求,并且对内存使用效率有严格的要求,在多线程环境下,如何确保共享资源的线程安全?
82 0