Python主进程hang住的两个原因

简介: 版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/feilengcui008/article/details/52831354 最近使用Python遇到两个非常不好定位的问题,表现都是Python主进程hang住。
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/feilengcui008/article/details/52831354

最近使用Python遇到两个非常不好定位的问题,表现都是Python主进程hang住。最终定位出一个是subprocess模块的问题,一个是threading.Timer线程的问题。


subprocess模块不当使用的问题

Python的subprocess比较强大,基本上能替换os.system、os.popen、commands.getstatusoutput的功能,但是在使用的过程中需要注意参数stdin/stdout/stderr使用subprocess.PIPE的情况,因为管道通常会有默认的buffer size(Linux x86_64下实测是64K,这里有个疑问io.DEFAULT_BUFFER_SIZE是8K,而ulimit -a的pipe size为512 * 8 = 4K?),父进程如果不使用communicate消耗掉子进程write pipe(stdout/stderr)中的数据,直接进入wait,此时子进程可能阻塞在了pipe的写上,从而导致父子进程都hang住。下面是测试代码。

# main.py
#!/usr/bin/env python
# encoding: utf-8

import subprocess
import os
import tempfile
import sys
import traceback
import commands


# both parent and child process will hang 
# if run.py stdout/stderr exceed 64K, since
# parent process is waiting child process exit
# but child process is blocked by writing pipe
def testSubprocessCallPipe():
    # call: just Popen().wait()
    p = subprocess.Popen(["python", "run.py"], 
        stdin=subprocess.PIPE, 
        stdout=subprocess.PIPE, 
        stderr=subprocess.PIPE)
    ret = p.wait()
    print ret


# will not hang since the parent process which
# call communicate will poll or thread to comsume
# the pipe buffer, so the child process can write
# all it's data to stdout or stderr pipe and it will
# not be blocked.
def testSubprocessCommunicate():
    p = subprocess.Popen(["python", "run.py"], 
        stdin=subprocess.PIPE, 
        stdout=subprocess.PIPE, 
        stderr=subprocess.PIPE)
    print p.communicate()[0]


# will not hang since sys.stdout and sys.stderr 
# don't have 64K default buffer limitation, child
# process can write all it's data to stdout or 
# stderr fd and exit
def testSubprocessCallStdout():
    # call: just Popen().wait()
    p = subprocess.Popen(["python", "run.py"], 
        stdin=sys.stdin, 
        stdout=sys.stdout, 
        stderr=sys.stderr)
    ret = p.wait()
    print ret


# will not hang since file has no limitation of 64K
def testSubprocessCallFile():
    stdout = tempfile.mktemp()
    stderr = tempfile.mktemp()
    print "stdout file %s" % (stdout,), "stderr file %s" % (stderr,)
    stdout = open(stdout, "w")
    stderr = open(stderr, "w")
    p = subprocess.Popen(["python", "run.py"], 
        stdin=None, 
        stdout=stdout, 
        stderr=stderr)
    ret = p.wait()
    print ret


print os.getpid()
# not hang
print "use file"
testSubprocessCallFile()
# not hang
print "use sys.stdout and sys.stderr"
testSubprocessCallStdout()
# not hang
print "use pipe and communicate"
testSubprocessCommunicate()
# hang
print "use pipe and call directly"
testSubprocessCallPipe()
# run.py
import os

print os.getpid()

string = ""
# > 64k will hang
for i in range(1024 * 64 - 4):
    string = string + "c"
# flush to my stdout which might 
# be sys.stdout/pipe/fd...
print string

另外,在subprocess模块源码中还注释说明了另外一种由于fork -> 子进程gc -> exec导致的进程hang住,详细信息可以阅读subprocess模块源码。


threading.Timer的使用不当的问题

定位步骤:

  • pstack 主进程,查看python语言源码的c调用栈,追踪主线程(图中线程1)的各个函数调用栈的python源码,猜测是阻塞在threading._shutdown方法上,修改threading模块源码,并添加日志,定位确实阻塞在_exitFunc的循环join thread上。
    这里写图片描述
    这里写图片描述

  • 线程2的表现是不断创建不断退出,为threading.start入口添加打印traceback,最终定位在一个模块的心跳计时器。调大心跳周期,观察步骤1中的线程id,确定是心跳计时器线程。注: approach 2中可用ctrl-c构造异常,构造hang住的情况。

  • 重现poc

import threading

import time
import sys


# approach 1
class TestClassA(object):
    timer = None
    count = 0
    def __del__(self):
        print "called del"
        if self.timer is not None:
            self.timer.cancel()

    def new_timer(self):
        # current reference 3 + getrefcount 1 = 4
        print "in new_timer: %d" % (sys.getrefcount(self))
        print "ffff"
        self.count += 1
        # my father timer thread exit, ref count -1, but start
        # a new thread will make it still 3
        self.timer = threading.Timer(1, self.new_timer)
        self.timer.start()

    def start_timer(self):
        self.timer = threading.Timer(1, self.new_timer)
        self.timer.start()

def test():
    t = TestClassA()
    print "enter test: %d" % (sys.getrefcount(t),)  # 2
    t.start_timer() # pass ref to a new timer thread through self.new_timer: 3
    print "before out test: %d" % (sys.getrefcount(t),) # 3


# approach 2
class TestClassB(object):
    timer = None
    count = 0
    def __del__(self):
        print "called del"

def func(*ins):
    print "fffff"
    ins[0].count += 1
    ins[0].timer = threading.Timer(1, func, ins) # will increase reference count of ins
    ins[0].timer.start()

def test_in_scope():
    t = TestClassB()
    print "enter test_in_scope: %d" % (sys.getrefcount(t))
    t.timer = threading.Timer(1, func, (t,))
    t.timer.start()
    while t.count < 4:
        time.sleep(1)
    #try:
    #    while t.count < 4:
    #        time.sleep(1)
    #except:
    #    pass

    # if we interrupt or raise some other exceptions and not catch that,
    # will hang
    t.timer.cancel()
    print "before exit test_in_scope: %d" % (sys.getrefcount(t))


# approachh 3
def test_closure():
    t = TestClassA()
    print "enter test_closure: %d" % (sys.getrefcount(t),)
    def func_inner():
        print "ffffffff"
        t.timer = threading.Timer(1, func_inner) # will increase reference count
        t.count += 1
        t.timer.start()
        print "in func: %d" % (sys.getrefcount(t))
    t.timer = threading.Timer(1, func_inner)
    t.timer.start()
    print "before out test_closure: %d" % (sys.getrefcount(t),)



#print "================= test approach 1 ==============="
#print "before test"
#test()
#print "after test"

print "================= test approach 2 ==============="
print "before test_in_scope"
test_in_scope()
print "after test_in_scope"


#print "================= test approach 3 ================"
#print "before test_closure"
#test_closure()
#print "after test_closure"


print "before exit main thread, it will wait and join all other threads"
sys.exit()
相关文章
|
2月前
|
并行计算 数据处理 调度
Python中的并发编程:探索多线程与多进程的奥秘####
本文深入探讨了Python中并发编程的两种主要方式——多线程与多进程,通过对比分析它们的工作原理、适用场景及性能差异,揭示了在不同应用需求下如何合理选择并发模型。文章首先简述了并发编程的基本概念,随后详细阐述了Python中多线程与多进程的实现机制,包括GIL(全局解释器锁)对多线程的影响以及多进程的独立内存空间特性。最后,通过实例演示了如何在Python项目中有效利用多线程和多进程提升程序性能。 ####
|
2月前
|
调度 iOS开发 MacOS
python多进程一文够了!!!
本文介绍了高效编程中的多任务原理及其在Python中的实现。主要内容包括多任务的概念、单核和多核CPU的多任务实现、并发与并行的区别、多任务的实现方式(多进程、多线程、协程等)。详细讲解了进程的概念、使用方法、全局变量在多个子进程中的共享问题、启动大量子进程的方法、进程间通信(队列、字典、列表共享)、生产者消费者模型的实现,以及一个实际案例——抓取斗图网站的图片。通过这些内容,读者可以深入理解多任务编程的原理和实践技巧。
127 1
|
3月前
|
Python
Python中的多线程与多进程
本文将探讨Python中多线程和多进程的基本概念、使用场景以及实现方式。通过对比分析,我们将了解何时使用多线程或多进程更为合适,并提供一些实用的代码示例来帮助读者更好地理解这两种并发编程技术。
|
2月前
|
监控 JavaScript 前端开发
python中的线程和进程(一文带你了解)
欢迎来到瑞雨溪的博客,这里是一位热爱JavaScript和Vue的大一学生分享技术心得的地方。如果你从我的文章中有所收获,欢迎关注我,我将持续更新更多优质内容,你的支持是我前进的动力!🎉🎉🎉
33 0
|
3月前
|
数据挖掘 程序员 调度
探索Python的并发编程:线程与进程的实战应用
【10月更文挑战第4天】 本文深入探讨了Python中实现并发编程的两种主要方式——线程和进程,通过对比分析它们的特点、适用场景以及在实际编程中的应用,为读者提供清晰的指导。同时,文章还介绍了一些高级并发模型如协程,并给出了性能优化的建议。
46 3
|
4月前
|
负载均衡 Java 调度
探索Python的并发编程:线程与进程的比较与应用
本文旨在深入探讨Python中的并发编程,重点比较线程与进程的异同、适用场景及实现方法。通过分析GIL对线程并发的影响,以及进程间通信的成本,我们将揭示何时选择线程或进程更为合理。同时,文章将提供实用的代码示例,帮助读者更好地理解并运用这些概念,以提升多任务处理的效率和性能。
74 3
|
3月前
|
存储 Python
Python中的多进程通信实践指南
Python中的多进程通信实践指南
40 0
|
4月前
|
消息中间件 安全 Kafka
Python IPC机制全攻略:让进程间通信变得像呼吸一样自然
【9月更文挑战第12天】在编程领域,进程间通信(IPC)是连接独立执行单元的关键技术。Python凭借简洁的语法和丰富的库支持,提供了多种IPC方案。本文将对比探讨Python的IPC机制,包括管道与消息队列、套接字与共享内存。管道适用于简单场景,而消息队列更灵活,适合高并发环境。套接字广泛用于网络通信,共享内存则在本地高效传输数据。通过示例代码展示`multiprocessing.Queue`的使用,帮助读者理解IPC的实际应用。希望本文能让你更熟练地选择和运用IPC机制。
76 10
|
4月前
|
安全 开发者 Python
Python IPC大揭秘:解锁进程间通信新姿势,让你的应用无界连接
【9月更文挑战第11天】在编程世界中,进程间通信(IPC)如同一座无形的桥梁,连接不同进程的信息孤岛,使应用无界而广阔。Python凭借其丰富的IPC机制,让开发者轻松实现进程间的无缝交流。本文将揭开Python IPC的神秘面纱,介绍几种关键的IPC技术:管道提供简单的单向数据传输,适合父子进程间通信;队列则是线程和进程安全的数据共享结构,支持多进程访问;共享内存允许快速读写大量数据,需配合锁机制确保一致性;套接字则能实现跨网络的通信,构建分布式系统。掌握这些技术,你的应用将不再受限于单个进程,实现更强大的功能。
77 6
|
4月前
|
监控 Ubuntu API
Python脚本监控Ubuntu系统进程内存的实现方式
通过这种方法,我们可以很容易地监控Ubuntu系统中进程的内存使用情况,对于性能分析和资源管理具有很大的帮助。这只是 `psutil`库功能的冰山一角,`psutil`还能够提供更多关于系统和进程的详细信息,强烈推荐进一步探索这个强大的库。
61 1