阿里低调开源了一款Python版的 Arthas工具(PyFlightProfiler - 古法技能篇)

简介: PyFlightProfiler 是阿里巴巴开源的 Python 线上诊断利器,类比 Java 领域的 Arthas。它支持无侵入式方法观测、调用链追踪、GIL 监控、热重载修复与线程栈分析,底层基于 ptrace(Linux)或 sys.remote_exec(CPython 3.14+),让 Python 服务问题排查如“飞行记录仪”般实时精准。

b56a7962c04f4d0ea5ea411ac2cad0ad.png



先说Arthas:Java程序员的"瑞士军刀"



在Java开发者的工具箱里,Arthas(阿尔萨斯)绝对是一个如雷贯耳的名字。这款由阿里巴巴开源的Java诊断利器,凭借其无侵入、无需重启、实时监控等特性,拯救了无数个被线上Bug折磨的深夜。

Arthas的使用体验极为流畅:你只需要知道目标Java进程的PID,执行 java -jar arthas-boot.jar <pid>,就能进入一个交互式的诊断控制台。在这里,你可以:

  • 用 watch 命令实时观察某个方法的入参、返回值和执行耗时;
  • 用 trace 命令追踪方法内部的调用链路,精确定位每一步的耗时;
  • 用 stack 命令查看某个方法被谁调用,还原完整的调用栈;
  • 用 jad 命令反编译线上的字节码,确认部署的代码是否符合预期;
  • 用 redefine 命令热替换类的实现,在不重启的情况下修复Bug。

Arthas之所以能做到这一切,背后依赖的是JVM提供的两大利器:Java Attach API(用于动态挂载到运行中的JVM进程)和 Java Instrumentation API(用于在运行时对字节码进行增强和替换)。这套机制是JVM规范的一部分,成熟、稳定,且有官方背书。

然而,随着AI时代的到来,Python在生产环境中的比重急剧上升。无论是大模型推理服务、RAG应用,还是各种AI Agent,Python的身影无处不在。但Python开发者在面对线上问题时,往往只能依赖传统的日志打印,或者在本地艰难地复现问题。这种"加日志-重新部署-等待触发-查日志"的循环,不仅效率低下,在生产环境中更是一种奢侈。

就在最近,阿里巴巴低调地开源了一款名为 PyFlightProfiler 的工具。它就像是Python界的Arthas,为线上运行的Python应用程序提供了一种便捷、无侵入的排查手段。


bc608962747546d597fb75e276c32f37.png


PyFlightProfiler:Python的"飞行记录仪"



PyFlightProfiler 的名字颇有意境——"飞行记录仪"(Flight Profiler),暗示着它的使命:在程序"飞行"的过程中,默默记录下一切关键数据,以便在出现问题时能够还原现场。

PyFlightProfiler is a toolbox that lets you inspect the status of a running Python process. It allows developers to observe method execution, trace call paths, monitor GIL status, and more, without restarting the application.

其核心功能矩阵如下表所示:


命令

功能描述

对应Arthas命令

watch

观察方法的入参、返回值和执行耗时

watch

trace

追踪方法内部的调用链路

trace

stack

打印所有线程的Python调用栈

thread

tt

跨时间段记录方法调用并回放

tt

getglobal

查看模块全局变量和类静态变量

getstatic

vmtool

获取和操作运行中的类实例

vmtool

perf

生成火焰图进行性能热点分析

profiler

console

在目标进程中执行自定义Python脚本

ognl

mem

内存分析,包括内存差异和对象摘要

heapdump

gilstat

监控GIL的获取和释放耗时

(Python特有)

reload

热重载方法实现

redefine


安装方式极为简洁:


pip3 install flight_profiler


挂载到目标进程:


flight_profiler <pid>



使用场景:进程挂载解析



场景描述


线上服务突然CPU飙升,某个接口响应极慢。你不能重启服务,因为重启会破坏现场,甚至导致服务不可用。你需要立即"潜入"这个进程,一探究竟。

Arthas的做法


在Java中,Arthas利用了JVM提供的 Attach API。简单来说,JVM在启动时会创建一个特殊的Unix Domain Socket,Arthas通过向这个Socket发送指令,触发JVM加载一个外部的Java Agent(.jar 文件)。这个过程是JVM规范的一部分,安全且有保障。

PyFlightProfiler的原理图解

786b39cf3f7f41fcab00233f408dd256.png


Python并没有原生提供这样完善的机制(直到CPython 3.14才引入了 sys.remote_exec,但这是后话)。对于CPython 3.13及以下版本,PyFlightProfiler在Linux环境下采用了一种非常硬核的方式——ptrace 系统调用。

ptrace 是Linux内核提供的一个系统调用,它允许一个进程(Tracer)观察和控制另一个进程(Tracee)的执行,并检查和改变其内存和寄存器。GDB、strace 等经典调试工具都是基于 ptrace 实现的。它是操作系统层面最底层的进程控制机制之一。

让我们深入 csrc/attach/ 目录,看看是如何运作的。


第一步:暂停目标进程


在 ProcessTracer.cpp 中,通过 PTRACE_ATTACH 暂停目标进程:


// ProcessTracer.cpp

bool ProcessTracer::attach() {
    // 通过 ptrace 附加到目标进程,目标进程会收到 SIGSTOP 信号并暂停
    int result = ptrace(PTRACE_ATTACH, process_id_, NULL, NULL);
    CHECK_PTRACE_RESULT(result, PTRACE_ATTACH);
    int wait_status;
    // 等待目标进程真正停下来
    if (waitpid(process_id_, &wait_status, WUNTRACED) != process_id_) {
        return false;
    }
    is_attached_ = true;
    return true;
}


第二步:保存现场,准备注入


在 AttachAgent.cpp 中,保存目标进程的寄存器状态,并计算出 malloc、dlopen、free 等libc函数在目标进程中的地址:


// AttachAgent.cpp (节选)

// 保存原始寄存器状态,以便事后恢复
process_tracer_.getRegisters(&original_registers);
// 计算 malloc/dlopen/free 在目标进程中的地址
// 原理:两个进程加载了相同的 libc,函数偏移量相同
long current_libc_base = ProcessUtils::getLibcBaseAddress(getpid());
long target_libc_base  = ProcessUtils::getLibcBaseAddress(target_process_id_);
long target_malloc_addr = target_libc_base + (malloc_addr - current_libc_base);
long target_dlopen_addr = target_libc_base + (dlopen_addr - current_libc_base);
第三步:注入Shellcode,调用 dlopen
这是整个过程最精妙的部分。PyFlightProfiler 将一段汇编代码写入目标进程的内存,这段代码的作用是调用 dlopen 加载 Agent 动态库:
// AttachAgent.cpp 中的汇编 Shellcode(节选)
void load_shared_library(long malloc_addr, long free_addr,
                         long dlopen_addr, long lib_path_len) {
    asm(
        "and $0xfffffffffffffff0, %%rsp \n"  // 16字节对齐
        "callq *%%r8 \n"                     // 调用 malloc,分配内存存放库路径
        "int $3 \n"                          // 触发断点,让 Tracer 写入库路径字符串
        "pop %%r8 \n"                        // 恢复 dlopen 地址
        "mov %%rax, %%rdi \n"               // malloc 返回的内存地址作为 dlopen 的第一个参数
        "mov $0x1, %%rsi \n"               // RTLD_LAZY 标志
        "callq *%%r8 \n"                     // 调用 dlopen,加载 Agent .so 文件
        "int $3 \n"                          // 触发断点,通知 Tracer 加载完成
        // ... 调用 free 释放内存 ...
        ::: "memory", "rax", "rcx", "rdx", "rdi", "rsi", "r8", "r9"
    );
}


第三步:注入Shellcode,调用 dlopen


这里设计巧妙的地方是 PyFlightProfiler 将一段汇编代码写入目标进程的内存,这段代码的作用是调用 dlopen 加载 Agent 动态库:


// AttachAgent.cpp 中的汇编 Shellcode(节选)

void load_shared_library(long malloc_addr, long free_addr,
                         long dlopen_addr, long lib_path_len) {
    asm(
        "and $0xfffffffffffffff0, %%rsp \n"  // 16字节对齐
        "callq *%%r8 \n"                     // 调用 malloc,分配内存存放库路径
        "int $3 \n"                          // 触发断点,让 Tracer 写入库路径字符串
        "pop %%r8 \n"                        // 恢复 dlopen 地址
        "mov %%rax, %%rdi \n"               // malloc 返回的内存地址作为 dlopen 的第一个参数
        "mov $0x1, %%rsi \n"               // RTLD_LAZY 标志
        "callq *%%r8 \n"                     // 调用 dlopen,加载 Agent .so 文件
        "int $3 \n"                          // 触发断点,通知 Tracer 加载完成
        // ... 调用 free 释放内存 ...
        ::: "memory", "rax", "rcx", "rdx", "rdi", "rsi", "r8", "r9"
    );
}


第四步:恢复现场


Agent库被加载后,profiler_attach.cpp 中的 __attribute__((constructor)) 修饰的函数会自动执行,在目标进程中启动一个新的Python线程,运行 profiler_agent.py,建立起与客户端通信的Socket服务器。之后,Tracer 恢复目标进程的寄存器并 PTRACE_DETACH,目标进程恢复正常运行,毫无察觉地被"植入"了一个后台诊断服务。


// profiler_attach.cpp

// 这个函数在 .so 被 dlopen 加载时自动执行
__attribute__((constructor)) void profiler_attach_init() {
    // ... 读取参数 ...
    profiler_attach(py_code, port, base_addr);
}
static int start_thread() {
    // 获取 GIL 锁
    PyGILState_STATE old_gil_state = PyGILState_Ensure();
    boot->interp = PyThreadState_Get()->interp;
    // 启动一个新的 Python 线程来运行 Agent 代码
    ident = PyThread_start_new_thread(boot_entry, (void *)boot);
    // 释放 GIL 锁
    PyGILState_Release(old_gil_state);
    return 0;
}


对于CPython 3.14+,PyFlightProfiler 则直接使用了官方提供的 sys.remote_exec 接口,这是Python官方在 PEP-768中引入的安全远程代码执行机制,整个过程更加优雅和安全。



使用场景:方法观测——替换字节码



场景描述


某个推理服务的某个接口P99延迟异常高,你怀疑是某个特定的预处理函数在某些输入下会触发慢路径。你想实时观察这个函数的入参、返回值和执行耗时,但又不能修改代码重新部署。

使用方式


# 观察 __main__ 模块中 preprocess 函数的入参和返回值
watch __main__ preprocess --expr "args, return_obj" -f "cost > 100"

# 追踪 model_server 模块 InferenceService 类的 predict 方法内部调用链
trace model_server InferenceService predict


原理解析


f9e2689b957949a981f19300833aa103.png


在Java中,Arthas通过ASM框架在字节码层面插入探针代码。PyFlightProfiler 同样采用

了字节码替换技术,但它是纯Python实现的,其核心在 flight_profiler/common/bytecode_transformer.py 和 aop_decorator.py 中。


第一步:找到目标函数


aop_decorator.py 通过 importlib.import_module 和 inspect.getmembers 找到目标函数对象:


# aop_decorator.py

def find_module_function(module, method_name):
    for name, m in inspect.getmembers(
        module, lambda x: inspect.isfunction(x) or inspect.ismethod(x)
    ):
        if name == method_name:
            return m, False
    return None, False


第二步:生成Wrapper函数


watch_agent.py 中的 wrapper_generator 生成一个包装函数,它会在目标函数执行前后记录时间、参数和返回值:


# watch_agent.py (节选)

def wrapper_generator(watch_setting: WatchSetting):
    def watch_func(func):
        @functools.wraps(func)
        def wrapped(*args, **kwargs):
            if watch_setting.enter():
                s = time.time()
                try:
                    return_obj = func(*args, **kwargs)  # 调用原始函数
                    e = time.time()
                    # 记录并输出观测结果
                    watch_setting.dump_result(
                        int(s * 1000), target_obj,
                        (e - s) * 1000, return_obj, *args, **kwargs
                    )
                    return return_obj
                except Exception as ex:
                    # 记录异常信息
                    watch_setting.dump_error(...)
                    raise ex
        return wrapped
    return watch_func


第三步:字节码的"偷天换日"


这是整个机制最精妙的地方。PyFlightProfiler 并不是简单地替换函数引用(那样只对新的调用者生效),而是直接替换函数对象的 __code__ 属性。

在Python中,每个函数对象都有一个 __code__ 属性,它是一个 CodeType 对象,包含了函数的字节码指令、常量、变量名等所有编译后的信息。通过替换 __code__,可以在不改变函数引用的情况下,改变函数的行为。

bytecode_transformer.py 中的 _execute_bytecode_transform_intern 函数完成了这个精妙的替换:


# bytecode_transformer.py (核心逻辑简化版)

def _execute_bytecode_transform_intern(fn, wrapper_generator, wrapper_arg, ...):
    # 1. 生成 Wrapper 函数
    wrap_function = wrapper_generator(wrapper_arg)(fn)
    wrap_code = wrap_function.__code__
    # 2. Wrapper 函数通过闭包(LOAD_DEREF)引用了 watch_setting 和原始函数
    #    但我们不能直接用闭包,因为目标函数的自由变量数量是固定的
    #    所以要把 LOAD_DEREF 指令替换为 LOAD_CONST 指令
    #    将 watch_setting 和原始函数副本作为常量嵌入到新的字节码中
    alternate_code = replace_deref_with_const(wrap_code.co_code, ...)
    # 3. 创建原始函数的副本,避免无限递归
    copy_fn = types.FunctionType(fn.__code__, fn.__globals__, ...)
    # 4. 构造新的 CodeType 对象,将 watch_setting 和函数副本追加到常量池
    new_codeobj = types.CodeType(
        wrap_code.co_argcount, ...,
        alternate_code,                        # 替换后的字节码
        wrap_code.co_consts + (wrapper_arg, copy_fn),  # 追加常量
        ...
    )
    return new_codeobj
def transform_normal_method_by_aop_wrapper(fn, wrapper_generator, wrapper_arg, ...):
    new_codeobj = _execute_bytecode_transform_intern(fn, ...)
    # 直接替换函数的 __code__ 属性!
    fn.__code__ = new_codeobj
    


这种方式的妙处在于:由于修改的是函数对象本身的字节码,而不是某个模块中的引用,所以无论这个函数已经被多少个地方导入和缓存,这次修改都会全局生效。这正是它能够实现真正无侵入观测的关键。



使用场景:GIL状态监控



场景描述


你的多线程Python服务在高并发下性能急剧下降,top 命令显示CPU利用率很低,但线程数量很多。你怀疑是GIL(全局解释器锁)导致的线程争抢,但苦于没有量化数据。


使用方式


# 监控 GIL 状态,设置警告阈值(单位:毫秒)
gilstat --take-cost-warning 10 --hold-cost-warning 50 --stat-interval 5


原理解析

71b4e95957474f97915487e76327f206.png


GIL是CPython的底层C语言机制,纯Python代码无法直接感知它的状态。PyFlightProfiler 借助了 Frida 的底层引擎 frida-gum,在C层面对CPython的内核函数实施了拦截。

在 csrc/py_gil_intercept.cpp 中,它使用 gum_interceptor_attach 拦截了CPython内核中的两个关键函数:take_gil(获取GIL)和 drop_gil(释放GIL):


// py_gil_intercept.cpp (节选)

static void python_gil_listener_on_enter(GumInvocationListener *listener,
                                          GumInvocationContext *ic) {
    PythonGilHookId hook_id = (PythonGilHookId)(gsize)GUM_IC_GET_FUNC_DATA(ic, gpointer);
    pthread_t thread_id = pthread_self();
    switch (hook_id) {
    case PYTHON_GIL_HOOK_TAKE_GIL:
        gilStat->on_take_gil_enter(thread_id);  // 记录开始等待 GIL 的时间
        break;
    case PYTHON_GIL_HOOK_DROP_GIL:
        gilStat->on_drop_gil_enter(thread_id);  // 记录开始持有 GIL 的时间
        break;
    }
}
// 初始化拦截器
gum_interceptor_begin_transaction(interceptor);
gum_interceptor_attach(interceptor,
    GSIZE_TO_POINTER(take_gil_address),  // take_gil 函数地址
    listener,
    GSIZE_TO_POINTER(PYTHON_GIL_HOOK_TAKE_GIL));
gum_interceptor_attach(interceptor,
    GSIZE_TO_POINTER(drop_gil_address),  // drop_gil 函数地址
    listener,
    GSIZE_TO_POINTER(PYTHON_GIL_HOOK_DROP_GIL));
gum_interceptor_end_transaction(interceptor);


通过在 take_gil 的 enter/leave 回调中记录时间差,可以精确计算出每个线程等待GIL的时间(即GIL争抢开销);通过在 drop_gil 的 enter/leave 回调中记录时间差,可以计算出每个线程持有GIL的时间。这些数据对于诊断GIL相关的性能问题极为宝贵。



使用场景:热重载——不停机修复Bug



场景描述


线上发现了一个逻辑Bug,修复方案已经确定,但重新部署需要走完整的发布流程,至少需要半小时。你能不能先临时修复一下,让服务恢复正常?


使用方式


# 热重载 model_server 模块 InferenceService 类的 predict 方法
reload model_server InferenceService predict


原理解析

d651522660574468bf8e915aac45fa6c.png


reload_agent.py 中的 ReloadAgent.reload_function 实现了这一功能。其核心思路是:从磁盘上读取最新的源文件,解析出目标函数的源码,重新编译,然后替换运行中函数的 __code__。


# reload_agent.py (核心逻辑简化版)

class ReloadAgent:
    @staticmethod
    def reload_function(module_name, class_name, func_name, verbose):
        # 1. 找到目标函数对象
        method_target, _, module = find_method_by_mod_cls(
            module_name, class_name, func_name
        )
        # 2. 获取函数所在的源文件路径
        method_file_path = inspect.getfile(module)
        # 3. 用 AST 解析源文件,精确提取目标函数的源码
        class_source, method_source, has_decorators = \
            ASTMethodLocator.locate_cls_method_in_file(
                method_file_path, func_name, class_name
            )
        # 4. 将提取的源码包装成类,然后编译执行
        wrapped_source = f"class {class_name}:\n{method_source}"
        local_ns = {}
        exec(compile(wrapped_source, method_file_path, "exec"), {}, local_ns)
        new_cls = local_ns[class_name]
        new_method = getattr(new_cls, func_name)
        # 5. 将新函数的 __code__ 替换到原函数上
        method_target.__code__ = new_method.__code__
        


这个功能的前提是:你已经修改了磁盘上的源文件(但还没有重新部署)。reload 命令会读取最新的文件内容,重新编译,并将新的字节码注入到正在运行的函数中,实现真正的"热修复"。



使用场景:线程栈分析——快速定位阻塞



场景描述


服务无响应,但进程还在。你怀疑某个线程陷入了死锁或者长时间阻塞。你需要快速查看所有线程当前的调用栈。

使用方式


# 打印所有线程的 Python 调用栈
stack

# 打印所有异步协程的调用栈
stack async


原理解析


stack 命令的实现在 server_plugin_stack.py 中,它调用了CPython内置的 _Py_DumpTracebackThreads 函数,将所有线程的调用栈写入一个临时文件,然后读取并格式化输出。


# server_plugin_stack.py (节选)

async def do_action(self, param):
    # 通过符号解析找到 CPython 内置函数的地址
    addr = resolve_symbol_address("_Py_DumpTracebackThreads", os.getpid())
    # 调用 C 扩展,将所有线程栈 dump 到临时文件
    dump_all_threads_stack(tmp_fd, int(addr))
    with open(tmp_file_path, "r") as f:
        contents = f.readlines()
    # 将线程ID与线程名称关联,增强可读性
    await self.out_q.output_msg(
        Message(True, self.add_thread_name(contents))
    )
    


对于异步协程,它则通过访问 asyncio.tasks._all_tasks(Python 3.7-3.11)或 asyncio.tasks._scheduled_tasks(Python 3.12+)这些内部属性,遍历所有运行中的异步任务,并提取其调用栈帧。



结语



下面这张图是 PyFlightProfiler 的整体架构,如果你觉得这个项目对你有所帮助,请访问项目地址( https://github.com/alibaba/PyFlightProfiler)进行深度探索,(别忘了 star 哦: 🤭)。

e6b384ca529340a79bcf05f6a8305b31.png



参考资料:

[1] PyFlightProfiler GitHub Repository: https://github.com/alibaba/PyFlightProfiler

[2] Arthas - Alibaba Java Diagnostic Tool: https://arthas.aliyun.com/

[3] PEP 768 – Safe external debugger interface for CPython: https://peps.python.org/pep-0768/

[4] ptrace(2 ) - Linux manual page: https://man7.org/linux/man-pages/man2/ptrace.2.html

[5] Frida - Dynamic instrumentation toolkit: https://frida.re/


目录
相关文章
|
存储 缓存 文件存储
如何保证分布式文件系统的数据一致性
分布式文件系统需要向上层应用提供透明的客户端缓存,从而缓解网络延时现象,更好地支持客户端性能水平扩展,同时也降低对文件服务器的访问压力。当考虑客户端缓存的时候,由于在客户端上引入了多个本地数据副本(Replica),就相应地需要提供客户端对数据访问的全局数据一致性。
32686 78
如何保证分布式文件系统的数据一致性
|
前端开发 容器
HTML5+CSS3前端入门教程---从0开始通过一个商城实例手把手教你学习PC端和移动端页面开发第8章FlexBox布局(上)
HTML5+CSS3前端入门教程---从0开始通过一个商城实例手把手教你学习PC端和移动端页面开发第8章FlexBox布局
17736 19
|
设计模式 存储 监控
设计模式(C++版)
看懂UML类图和时序图30分钟学会UML类图设计原则单一职责原则定义:单一职责原则,所谓职责是指类变化的原因。如果一个类有多于一个的动机被改变,那么这个类就具有多于一个的职责。而单一职责原则就是指一个类或者模块应该有且只有一个改变的原因。bad case:IPhone类承担了协议管理(Dial、HangUp)、数据传送(Chat)。good case:里式替换原则定义:里氏代换原则(Liskov 
36674 19
设计模式(C++版)
|
存储 编译器 C语言
抽丝剥茧C语言(初阶 下)(下)
抽丝剥茧C语言(初阶 下)
|
机器学习/深度学习 人工智能 自然语言处理
带你简单了解Chatgpt背后的秘密:大语言模型所需要条件(数据算法算力)以及其当前阶段的缺点局限性
带你简单了解Chatgpt背后的秘密:大语言模型所需要条件(数据算法算力)以及其当前阶段的缺点局限性
24749 14
|
机器学习/深度学习 弹性计算 监控
重生之---我测阿里云U1实例(通用算力型)
阿里云产品全线降价的一力作,2023年4月阿里云推出新款通用算力型ECS云服务器Universal实例,该款服务器的真实表现如何?让我先测为敬!
36657 15
重生之---我测阿里云U1实例(通用算力型)
|
SQL 存储 弹性计算
Redis性能高30%,阿里云倚天ECS性能摸底和迁移实践
Redis在倚天ECS环境下与同规格的基于 x86 的 ECS 实例相比,Redis 部署在基于 Yitian 710 的 ECS 上可获得高达 30% 的吞吐量优势。成本方面基于倚天710的G8y实例售价比G7实例低23%,总性价比提高50%;按照相同算法,相对G8a,性价比为1.4倍左右。
|
存储 算法 Java
【分布式技术专题】「分布式技术架构」手把手教你如何开发一个属于自己的限流器RateLimiter功能服务
随着互联网的快速发展,越来越多的应用程序需要处理大量的请求。如果没有限制,这些请求可能会导致应用程序崩溃或变得不可用。因此,限流器是一种非常重要的技术,可以帮助应用程序控制请求的数量和速率,以保持稳定和可靠的运行。
29833 52

热门文章

最新文章

下一篇
开通oss服务