kafka-python 执行两次初始化导致进程卡住

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: Python logging库重复初始化导致进程卡住, 原因竟是因为kafkaProducer初始化时存在未解开的锁导致的

Python logging库重复初始化导致进程卡住


### 前置知识

1. python的logging库

   Python 的 logging 库是一个灵活且强大的日志记录工具,用于在应用程序中捕获、记录和处理日志信息。它提供了一种配置日志记录的方式,可以满足不同需求的应用程序。


   以下是 logging 库的一些关键概念和组件:


   Logger(记录器): 记录器是日志记录的入口点,负责发出各种日志消息。


   Handler(处理器): 处理器将日志消息发送到目标,如控制台、文件或网络。


   Formatter(格式化器): 格式化器定义日志输出的格式,用于美化和定制日志消息。


   Level(级别): 级别用于控制日志消息的重要性,包括 DEBUG、INFO、WARNING、ERROR 和 CRITICAL。


   Filter(过滤器): 过滤器允许更精细地控制哪些日志消息被记录。


   配置文件: 日志配置文件提供一种灵活的配置方式,允许通过文件而非代码进行日志配置。

 

2. python的celery框架

   Celery 是一个开源的分布式任务队列系统,用于处理大量的异步任务。它允许你将任务从应用程序中分离出来,异步地执行它们,提高应用程序的性能和可伸缩性。Celery主要用于处理耗时的任务,如发送电子邮件、生成报告、处理图像等。


   以下是 Celery 的一些主要特性和概念:


   分布式任务队列: Celery 是一个分布式系统,用于处理异步任务,将任务分发到多个工作节点。


   异步任务: 允许将任务提交到队列,实现异步执行,提高应用性能和响应速度。


   任务调度: 支持定时任务调度,类似于 cron,可以在未来的特定时间执行任务。


   消息代理: 与多种消息代理(如 RabbitMQ、Redis、Amazon SQS)集成,用于在应用程序和工作节点之间传递任务消息。


   结果存储: 可将任务执行的结果保存在不同的后端存储中,例如数据库、缓存等。


   任务重试: 具备自动重试机制,可配置任务在失败时进行重试。


   监控和管理: 提供工具和界面用于监控和管理任务队列,包括 Web 界面和命令行工具。


   多语言支持: 主要用于 Python,但提供了多语言客户端库,支持其他编程语言的集成。

 

3. python连接kafka的库python-kakfa


`

kafka-python ` 是一个用于在 Python 中与 Apache Kafka 集成的客户端库。它提供了 `KafkaProducer` 类用于将消息发送到 Kafka 主题,以及 `KafkaConsumer` 类用于从 Kafka 主题中消费消息。通过这个库,你可以方便地在 Python 中与 Kafka 集群进行通信,实现消息的发布和订阅功能。`kafka-python` 还支持各种配置选项,允许你调整客户端的行为,以满足特定需求。



### 现象描述


python的celery启动后, celery worker 进程卡住, 无法处理任务

并且没有任何日志输出



### 原因概述


我们有一个代码仓库,


既有定时任务的代码, 又有Api应用的代码, 有同事加了一个定时任务, 不小心引入的Api的一个util代码, 导致执行了两次init_logger()初始化日志器方法,(一次定时任务本身的, 一次依赖链中的Api的)


init_logger()里面其中包含了一个初始化日志处理器(发送邮件)名为EmailHandler


EmailHandler里面初始化了kafkaProducer


但是执行第二次init_logger()的时候, 有这么一个逻辑, 执行了这个函数_clearExistingHandlers()


_clearExistingHandlers()这个函数的作用是清空已有的日志处理器列表


这个时间kafkaProducer刚刚在前一次初始化好相关资源, 还有相关的锁没有被释放


这个时候去清EmailHandler,就会导致那个锁没有释放, 无法创建第二个实例, 导致进程卡主没有日志


### 源码分析

/venv/lib/python3.7/site-packages/kafka/producer/kafka.py <br>

line 445

```python

def close(self, timeout=None):

   """Close the producer.


   This method sends a ProducerClose request to the Kafka broker and

   waits for acknowledgement. It is a good practice to call this method

   before the Python process exits to ensure any outstanding messages

   are delivered.


   Args:

       timeout (float): Maximum time to wait for the producer to complete

           the close operation, in seconds. If `None`, the method will

           block until the close is complete.


   Raises:

       KafkaException: If the producer close failed.

   """

   # ...


   # Acquire the lock to ensure thread safety during close

   with self._lock:

       # Check if the producer is already closed

       if self._closed:

           return


       # Set _closed flag to True to mark the producer as closed

       self._closed = True


   # ...


   # Wait for the background thread to finish

   if timeout is not None:

       self._sender_thread.join(timeout)


   # Release the lock

   with self._lock:

       # Check if the producer is already closed

       if self._closed:

           return


       # ...


   # ...


```

概括


```python


with self._lock::通过 with 语句,获取 _lock 锁,确保在多线程环境下的线程安全性。


if self._closed::检查生产者是否已经关闭,如果已经关闭,直接返回,避免重复关闭。


self._closed = True:将 _closed 标志设置为 True,表示生产者已关闭。


self._sender_thread.join(timeout):等待后台线程完成。_sender_thread 是一个在生产者初始化时启动的后台线程,负责异步发送消息到 Kafka broker。


with self._lock::再次获取锁,确保在关闭期间不会有其他线程对生产者进行操作。


if self._closed::再次检查生产者是否已经关闭,避免重复关闭。


```


此部分代码主要是为了确保在多线程环境下,对生产者的关闭操作是线程安全的,并等待后台线程完成。这有助于确保在关闭过程中不会出现竞态条件,从而确保生产者的关闭操作是可靠的。



### 排查步骤


由于我们的应用部署在华为云中, 所以日志使用的是华为云LTS, 而LTS没有采集到任何日志, 所以

手动进入k8s的pod中, 执行`kubectl logs -f` 查看日志, 还是什么日志也没有


然后执行了`kubectl exec -it podname -n -- bash`进入pod, <br>

手动启动celery任务`celery -A tasks.app worker -l`

启动后打印了几行初始化日志后, 进程卡主, CTRL+C中断程序后, 打印出了错误的堆栈信息



image.png



### 重现步骤


```python


from kafka import producer

from config.config import ConfigInfo as Config

import time

bootstrap_servers = [f'{Config.KAFKA_QUOTE_MAIL_HOST1}:{Config.KAFKA_QUOTE_MAIL_PORT}',

                    f'{Config.KAFKA_QUOTE_MAIL_HOST2}:{Config.KAFKA_QUOTE_MAIL_PORT}',

                    f'{Config.KAFKA_QUOTE_MAIL_HOST3}:{Config.KAFKA_QUOTE_MAIL_PORT}']


kp = producer.KafkaProducer(bootstrap_servers=bootstrap_servers)


kp.close()


print('ok!')

```


这样就会报错, 如果close前面等待一段时间, 就不会报错


```python


from kafka import producer

from config.config import ConfigInfo as Config

import time

bootstrap_servers = [f'{Config.KAFKA_QUOTE_MAIL_HOST1}:{Config.KAFKA_QUOTE_MAIL_PORT}',

                    f'{Config.KAFKA_QUOTE_MAIL_HOST2}:{Config.KAFKA_QUOTE_MAIL_PORT}',

                    f'{Config.KAFKA_QUOTE_MAIL_HOST3}:{Config.KAFKA_QUOTE_MAIL_PORT}']


kp = producer.KafkaProducer(bootstrap_servers=bootstrap_servers)

time.sleep(1)

kp.close()


print('ok!')

```



### 解决方案


避免重复执行kafkaPruducer的销毁和初始化 <br>

应用发版后, 不仅需要检查应用运行状态, 还要检查是否有日志输出


相关文章
|
2月前
|
监控 编译器 Python
如何利用Python杀进程并保持驻留后台检测
本教程介绍如何使用Python编写进程监控与杀进程脚本,结合psutil库实现后台驻留、定时检测并强制终止指定进程。内容涵盖基础杀进程、多进程处理、自动退出机制、管理员权限启动及图形界面设计,并提供将脚本打包为exe的方法,适用于需持续清理顽固进程的场景。
|
4月前
|
存储 数据采集 大数据
Python推导式进阶指南:优雅初始化序列的科学与艺术
本文系统讲解Python推导式的用法与技巧,涵盖列表、字典、集合推导式及生成器表达式。通过代码示例和性能对比,展示推导式在数据结构初始化中的优势:简洁高效、执行速度快30%-50%。文章分析基础语法、核心应用场景(如序列构造、键值对转换、去重运算)及嵌套使用,并探讨使用边界与最佳实践,强调可读性优先原则。最后指出,合理运用推导式能显著提升代码质量和处理效率,同时避免过度复杂化的陷阱。
99 0
|
7月前
|
数据采集 Java 数据处理
Python实用技巧:轻松驾驭多线程与多进程,加速任务执行
在Python编程中,多线程和多进程是提升程序效率的关键工具。多线程适用于I/O密集型任务,如文件读写、网络请求;多进程则适合CPU密集型任务,如科学计算、图像处理。本文详细介绍这两种并发编程方式的基本用法及应用场景,并通过实例代码展示如何使用threading、multiprocessing模块及线程池、进程池来优化程序性能。结合实际案例,帮助读者掌握并发编程技巧,提高程序执行速度和资源利用率。
318 0
|
10月前
|
并行计算 数据处理 调度
Python中的并发编程:探索多线程与多进程的奥秘####
本文深入探讨了Python中并发编程的两种主要方式——多线程与多进程,通过对比分析它们的工作原理、适用场景及性能差异,揭示了在不同应用需求下如何合理选择并发模型。文章首先简述了并发编程的基本概念,随后详细阐述了Python中多线程与多进程的实现机制,包括GIL(全局解释器锁)对多线程的影响以及多进程的独立内存空间特性。最后,通过实例演示了如何在Python项目中有效利用多线程和多进程提升程序性能。 ####
|
10月前
|
调度 iOS开发 MacOS
python多进程一文够了!!!
本文介绍了高效编程中的多任务原理及其在Python中的实现。主要内容包括多任务的概念、单核和多核CPU的多任务实现、并发与并行的区别、多任务的实现方式(多进程、多线程、协程等)。详细讲解了进程的概念、使用方法、全局变量在多个子进程中的共享问题、启动大量子进程的方法、进程间通信(队列、字典、列表共享)、生产者消费者模型的实现,以及一个实际案例——抓取斗图网站的图片。通过这些内容,读者可以深入理解多任务编程的原理和实践技巧。
552 1
|
11月前
|
Python
Python中的多线程与多进程
本文将探讨Python中多线程和多进程的基本概念、使用场景以及实现方式。通过对比分析,我们将了解何时使用多线程或多进程更为合适,并提供一些实用的代码示例来帮助读者更好地理解这两种并发编程技术。
|
11月前
|
数据挖掘 程序员 调度
探索Python的并发编程:线程与进程的实战应用
【10月更文挑战第4天】 本文深入探讨了Python中实现并发编程的两种主要方式——线程和进程,通过对比分析它们的特点、适用场景以及在实际编程中的应用,为读者提供清晰的指导。同时,文章还介绍了一些高级并发模型如协程,并给出了性能优化的建议。
126 3
|
10月前
|
监控 JavaScript 前端开发
python中的线程和进程(一文带你了解)
欢迎来到瑞雨溪的博客,这里是一位热爱JavaScript和Vue的大一学生分享技术心得的地方。如果你从我的文章中有所收获,欢迎关注我,我将持续更新更多优质内容,你的支持是我前进的动力!🎉🎉🎉
140 0
|
11月前
|
Python
深入解析 Python 中的对象创建与初始化:__new__ 与 __init__ 方法
深入解析 Python 中的对象创建与初始化:__new__ 与 __init__ 方法
110 1
|
12月前
|
负载均衡 Java 调度
探索Python的并发编程:线程与进程的比较与应用
本文旨在深入探讨Python中的并发编程,重点比较线程与进程的异同、适用场景及实现方法。通过分析GIL对线程并发的影响,以及进程间通信的成本,我们将揭示何时选择线程或进程更为合理。同时,文章将提供实用的代码示例,帮助读者更好地理解并运用这些概念,以提升多任务处理的效率和性能。
176 3

热门文章

最新文章

推荐镜像

更多