一个线程罢工的诡异事件

简介: 线上某个应用里业务逻辑没有执行,导致的结果是数据库里的某些数据没有更新。

背景


简单理了下如图:



  1. 有一个生产线程一直源源不断的往队列写数据。


  1. 消费线程也一直不停的取出数据后写入后续的业务线程池。


  1. 业务线程池里的线程会对每个任务进行入库操作。


整个过程还是比较清晰的,就是一个典型的生产者消费者模型。


尝试定位


接下来便是尝试定位这个问题,首先例行检查了以下几项:


  • 是否内存有内存溢出?


  • 应用 GC 是否有异常?


通过日志以及监控发现以上两项都是正常的。


紧接着便 dump 了线程快照查看业务线程池中的线程都在干啥。



结果发现所有业务线程池都处于 waiting 状态,队列也是空的。


同时生产者使用的队列却已经满了,没有任何消费迹象。


结合上面的流程图不难发现应该是消费队列的 Consumer 出问题了,导致上游的队列不能消费,下有的业务线程池没事可做。


review 代码


于是查看了消费代码的业务逻辑,同时也发现消费线程是一个单线程



结合之前的线程快照,我发现这个消费线程也是处于 waiting 状态,和后面的业务线程池一模一样。


他做的事情基本上就是对消息解析,之后丢到后面的业务线程池中,没有发现什么特别的地方。


但是由于里面的分支特别多(switch case),看着有点头疼;所以我与写这个业务代码的同学沟通后他告诉我确实也只是入口处解析了一下数据,后续所有的业务逻辑都是丢到线程池中处理的,于是我便带着这个前提去排查了(埋下了伏笔)。


因为这里消费的队列其实是一个 disruptor 队列;它和我们常用的 BlockQueue 不太一样,不是由开发者自定义一个消费逻辑进行处理的;而是在初始化队列时直接丢一个线程池进去,它会在内部使用这个线程池进行消费,同时回调一个方法,在这个方法里我们写自己的消费逻辑。


所以对于开发者而言,这个消费逻辑其实是一个黑盒。


于是在我反复 review 了消费代码中的数据解析逻辑发现不太可能出现问题后,便开始疯狂怀疑是不是 disruptor 自身的问题导致这个消费线程罢工了。


再翻了一阵 disruptor 的源码后依旧没发现什么问题后我咨询对 disruptor 较熟的@咖啡拿铁,在他的帮助下在本地模拟出来和生产一样的情况。


本地模拟



本地也是创建了一个单线程的线程池,分别执行了两个任务。


  • 第一个任务没啥好说的,就是简单的打印。


  • 第二个任务会对一个数进行累加,加到 10 之后就抛出一个未捕获的异常。


接着我们来运行一下。



发现当任务中抛出一个没有捕获的异常时,线程池中的线程就会处于 waiting 状态,同时所有的堆栈都和生产相符。


细心的朋友会发现正常运行的线程名称和异常后处于 waiting 状态的线程名称是不一样的,这个后续分析。


解决问题



当加入异常捕获后又如何呢?



程序肯定会正常运行。


同时会发现所有的任务都是由一个线程完成的。


虽说就是加了一行代码,但我们还是要搞清楚这里面的门门道道。


源码分析


于是只有直接 debug 线程池的源码最快了;



通过刚才的异常堆栈我们进入到 ThreadPoolExecutor.java:1142 处。


  • 发现线程池已经帮我们做了异常捕获,但依然会往上抛。


  • finally 块中会执行 processWorkerExit(w, completedAbruptly) 方法。



看过之前《如何优雅的使用和理解线程池》的朋友应该还会有印象。


线程池中的任务都会被包装为一个内部 Worker 对象执行。


processWorkerExit 可以简单的理解为是把当前运行的线程销毁(workers.remove(w))、同时新增(addWorker())一个 Worker 对象接着处理;


就像是哪个零件坏掉后重新换了一个新的接着工作,但是旧零件负责的任务就没有了。


接下来看看 addWorker() 做了什么事情:



只看这次比较关心的部分;添加成功后会直接执行他的 start() 的方法。



由于 Worker 实现了 Runnable 接口,所以本质上就是调用了 runWorker() 方法。


runWorker() 其实就是上文 ThreadPoolExecutor 抛出异常时的那个方法。



它会从队列里一直不停的获取待执行的任务,也就是 getTask();在 getTask 也能看出它会一直从内置的队列取出任务。


而一旦队列是空的,它就会 waitingworkQueue.take(),也就是我们从堆栈中发现的 1067 行代码。


线程名字的变化



上文还提到了异常后的线程名称发生了改变,其实在 addWorker() 方法中可以看到 new Worker()时就会重新命名线程的名称,默认就是把后缀的计数+1。


这样一切都能解释得通了,真相只有一个:


在单个线程的线程池中一但抛出了未被捕获的异常时,线程池会回收当前的线程并创建一个新的 Worker; 它也会一直不断的从队列里获取任务来执行,但由于这是一个消费线程,根本没有生产者往里边丢任务,所以它会一直 waiting 在从队列里获取任务处,所以也就造成了线上的队列没有消费,业务线程池没有执行的问题。


总结


所以之后线上的那个问题加上异常捕获之后也变得正常了,但我还是有点纳闷的是:


既然后续所有的任务都是在线程池中执行的,也就是纯异步了,那即便是出现异常也不会抛到消费线程中啊。


结果发现在上文提到的众多 switch case 中,最后一个竟然是直接操作的数据库,导致一个非空字段报错了🤬!!


虽然这个问题改动很小解决了,但复盘整个过程还是有许多需要改进的:


  1. 消费队列的线程名称竟然和业务线程的前缀一样,导致我光找它就花了许多时间,命名必须得调整。


  1. 开发规范,防御式编程大家需要养成习惯。


  1. 未知的技术栈需要谨慎,比如 disruptor,之前的团队应该只是看了个高性能的介绍就直接使用,并没有深究其原理;导致出现问题后对它拿不准。


实例代码:


github.com/crossoverJi…


相关文章
|
1月前
SDL事件处理以及线程使用(2)
SDL库中事件处理和多线程编程的基本概念和示例代码,包括如何使用SDL事件循环来处理键盘和鼠标事件,以及如何创建和管理线程、互斥锁和条件变量。
27 1
SDL事件处理以及线程使用(2)
|
3月前
|
数据采集 Java Python
python 递归锁、信号量、事件、线程队列、进程池和线程池、回调函数、定时器
python 递归锁、信号量、事件、线程队列、进程池和线程池、回调函数、定时器
|
4月前
|
设计模式 存储 安全
Java面试题:设计一个线程安全的单例类并解释其内存占用情况?使用Java多线程工具类实现一个高效的线程池,并解释其背后的原理。结合观察者模式与Java并发框架,设计一个可扩展的事件处理系统
Java面试题:设计一个线程安全的单例类并解释其内存占用情况?使用Java多线程工具类实现一个高效的线程池,并解释其背后的原理。结合观察者模式与Java并发框架,设计一个可扩展的事件处理系统
61 1
|
4月前
|
设计模式 存储 缓存
Java面试题:结合单例模式与Java内存模型,设计一个线程安全的单例类?使用内存屏障与Java并发工具类,实现一个高效的并发缓存系统?结合观察者模式与Java并发框架,设计一个可扩展的事件处理系统
Java面试题:结合单例模式与Java内存模型,设计一个线程安全的单例类?使用内存屏障与Java并发工具类,实现一个高效的并发缓存系统?结合观察者模式与Java并发框架,设计一个可扩展的事件处理系统
37 0
|
6月前
|
存储 JSON 运维
【运维】Powershell 服务器系统管理信息总结(进程、线程、磁盘、内存、网络、CPU、持续运行时间、系统账户、日志事件)
【运维】Powershell 服务器系统管理信息总结(进程、线程、磁盘、内存、网络、CPU、持续运行时间、系统账户、日志事件)
158 0
|
消息中间件 缓存 资源调度
【Java项目】使用Nacos实现动态线程池技术以及Nacos配置文件更新监听事件
【Java项目】使用Nacos实现动态线程池技术以及Nacos配置文件更新监听事件
392 0
|
数据采集 监控 NoSQL
一日一技:Python多线程的事件监控
一日一技:Python多线程的事件监控
161 0
|
数据采集 监控 NoSQL
一日一技:Python多线程的事件监控
一日一技:Python多线程的事件监控
236 0
|
消息中间件 监控 安全
单线程事件处理器ControllerEventManager
单线程事件处理器,Controller端定义的一个组件。该组件内置了一个专属线程,负责处理其他线程发送过来的Controller事件。还定义了一些管理方法,为专属线程输送待处理事件。
77 0
|
消息中间件 Java Shell
spring学习笔记(二)spring中的事件及多线程
spring学习笔记(二)spring中的事件及多线程
221 0
spring学习笔记(二)spring中的事件及多线程