MQ-消息堆积-JDK Bug导致线程阻塞案例分析

本文涉及的产品
云原生网关 MSE Higress,422元/月
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
应用实时监控服务-可观测链路OpenTelemetry版,每月50GB免费额度
简介: 一个JDK BUG导致系统LOAD高的案例分析

背景介绍

业务介绍

在某学习APP浏览文章,客户端会将浏览的文章信息上传到服务端,服务端将浏览信息最终存储到HBase;

在某学习APP首页点击【我的】->【历史】,会展示用户浏览文章的历史记录。

技术介绍

服务端的服务是【阅读历史离线服务】,从metaq消费用户阅读文章的信息,解析、处理相关业务逻辑,最后存储到HBase。

问题现象

ECS监控

两台机器【xx-xxxx-xxx-xxx-xxx-xxx-6、xx-xxx-xxx-xxx-xxx-xxx-1】在早高峰的时候Load很高,CPU使用率正常。

metaq监控

造成消息消费的慢,每天早上都有大量消息堆积,导致用户看不到自己的阅读历史。

问题分析

基本情况

【阅读历史离线服务】共有x台ECS,每台ECS配置是8c16g。其中x台机器正常,2台机器不正常。

排查思路

找不同

分析不正常机器与正常机器有哪些差异:对比了【应用程序版本】、【应用程序配置】、【JVM配置参数】、【JDK版本】、【操作系统版本】,发现【JDK版本】不一致。

正常机器:

openjdk version "1.8.0_171"
OpenJDK Runtime Environment (build 1.8.0_171-b10)
OpenJDK 64-Bit Server VM (build 25.171-b10, mixed mode)

异常机器:

openjdk version "1.8.0_222"
OpenJDK Runtime Environment (build 1.8.0_222-b10)
OpenJDK 64-Bit Server VM (build 25.222-b10, mixed mode)

到此初步定位不同机器运行状态不一致的现象是由于【JDK版本】不一致造成的,所以将【问题机器的JDK版本】替换为【正常机器的JDK版本】问题就可以解决了。

定位问题代码

但是问题的根因还需要尝试排查一下,既然是【JDK版本】不一致造成的,那么会不会是【1.8.0_222】这个版本中有BUG,刚好我们写的程序触发了这个BUG?

所以接下来需要弄清楚程序运行过程中执行了哪些业务逻辑、这些业务逻辑涉及到了哪些JDK API,直接想到的工具是arthas profiler,下面是抓到的热点方法。

通过对比【异常机器】与【正常机器】的热点方法,发现Runtime.getRuntime().availableProcessors()很可疑:

业务相关代码:

CompletableFuture<Result> completableFuture = //业务逻辑,调用hbase-client中api
completableFuture.whenCompleteAsync((result, t) -> {
   //业务逻辑处理            
}, Pool.getSubmitPool()).exceptionally((t) -> {
   //业务逻辑处理
}).get();

CompletableFuture相关代码:

/**
     * Waits if necessary for this future to complete, and then
     * returns its result.
     *
     * @return the result value
     * @throws CancellationException if this future was cancelled
     * @throws ExecutionException if this future completed exceptionally
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     */
    public T get() throws InterruptedException, ExecutionException {
        Object r;
        return reportGet((r = result) == null ? waitingGet(true) : r);
    }
/**
     * Returns raw result after waiting, or null if interruptible and
     * interrupted.
     */
    private Object waitingGet(boolean interruptible) {
        Signaller q = null;
        boolean queued = false;
        int spins = -1;
        Object r;
        while ((r = result) == null) {
            if (spins < 0)
                spins = (Runtime.getRuntime().availableProcessors() > 1) ?
                    1 << 8 : 0; // Use brief spin-wait on multiprocessors
            else if (spins > 0) {
                if (ThreadLocalRandom.nextSecondarySeed() >= 0)
                    --spins;
            }
            else if (q == null)
                q = new Signaller(interruptible, 0L, 0L);
            else if (!queued)
                queued = tryPushStack(q);
            else if (interruptible && q.interruptControl < 0) {
                q.thread = null;
                cleanStack();
                return null;
            }
            else if (q.thread != null && result == null) {
                try {
                    ForkJoinPool.managedBlock(q);
                } catch (InterruptedException ie) {
                    q.interruptControl = -1;
                }
            }
        }
        if (q != null) {
            q.thread = null;
            if (q.interruptControl < 0) {
                if (interruptible)
                    r = null; // report interruption
                else
                    Thread.currentThread().interrupt();
            }
        }
        postComplete();
        return r;
    }

猜测验证

public class Processors {
    public static void main(String []args) {
        int availableProcessors = Runtime.getRuntime().availableProcessors();
        System.out.println("Available Processors: " + availableProcessors);
        for(int i = 0;i < availableProcessors;i++){
            Thread t = new Thread(()-> {
                    while (true){
                        try {
                            int ps = Runtime.getRuntime().availableProcessors();
                            Thread.sleep(1L);
                        }catch (Exception e){
                            e.printStackTrace();
                        }
                    }
            });
            t.start();
        }
    }
}

将验证代码在【JDK版本】为【1.8.0_222】的机器上运行,随即复现了线上问题。

定位根因

那么【1.8.0_222】与【1.8.0_171】版本在Runtime.getRuntime().availableProcessors()的实现上有什么差别呢?【1.8.0_222】增加了容器环境的逻辑,比【1.8.0_171】复杂了很多。

最后我们看看在https://bugs.openjdk.java.net/的解释吧。

总结:

Runtime.getRuntime().availableProcessors()在不同JDK版本上的实现是没有问题的,CompletableFuture.waitingGet在【1.8.0_222】版本上,没有测试到Runtime.getRuntime().availableProcessors()对性能的影响,导致了性能问题。

解决方法

openjdk在1.8.0_191~1.8.0_222之间的版本都存在问题,换成1.8.0_191之前,或1.8.0_232及以后的版本可以。

问题根因

CompletableFuture.get()的实现方式在一些jdk版本存在缺陷,

详情见:CompletableFuture should not call Runtime.availableProcessors on fast path

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
3月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
229 2
|
2月前
|
Oracle Java 关系型数据库
CentOS 7.6操作系统部署JDK实战案例
这篇文章介绍了在CentOS 7.6操作系统上通过多种方式部署JDK的详细步骤,包括使用yum安装openjdk、基于rpm包和二进制包安装Oracle JDK,并提供了配置环境变量的方法。
273 80
|
26天前
|
安全 Java
Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧
【10月更文挑战第20天】Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧,包括避免在循环外调用wait()、优先使用notifyAll()、确保线程安全及处理InterruptedException等,帮助读者更好地掌握这些方法的应用。
17 1
|
1月前
|
Dubbo Java 应用服务中间件
剖析Tomcat线程池与JDK线程池的区别和联系!
剖析Tomcat线程池与JDK线程池的区别和联系!
109 0
剖析Tomcat线程池与JDK线程池的区别和联系!
|
26天前
|
监控 数据可视化 Java
如何使用JDK自带的监控工具JConsole来监控线程池的内存使用情况?
如何使用JDK自带的监控工具JConsole来监控线程池的内存使用情况?
|
2月前
|
监控 数据可视化 Java
使用JDK自带的监控工具JConsole来监控线程池的内存使用情况
使用JDK自带的监控工具JConsole来监控线程池的内存使用情况
|
2月前
|
安全 Java 调度
python3多线程实战(python3经典编程案例)
该文章提供了Python3中多线程的应用实例,展示了如何利用Python的threading模块来创建和管理线程,以实现并发执行任务。
44 0
|
3月前
|
消息中间件 存储 数据中心
RocketMQ的长轮询(Long Polling)实现分析
文章深入分析了RocketMQ的长轮询实现机制,长轮询结合了推送(push)和拉取(pull)两种消息消费模式的优点,通过客户端和服务端的配合,确保了消息的实时性同时将主动权保留在客户端。文中首先解释了长轮询的基本概念和实现步骤,然后通过一个简单的实例模拟了长轮询的过程,最后详细介绍了RocketMQ中DefaultMQPushConsumer的长轮询实现方式,包括PullMessage服务、PullMessageProcessor服务和PullCallback回调的工作原理。
115 1
|
3月前
|
缓存 Java 调度
【Java 并发秘籍】线程池大作战:揭秘 JDK 中的线程池家族!
【8月更文挑战第24天】Java的并发库提供多种线程池以应对不同的多线程编程需求。本文通过实例介绍了四种主要线程池:固定大小线程池、可缓存线程池、单一线程线程池及定时任务线程池。固定大小线程池通过预设线程数管理任务队列;可缓存线程池能根据需要动态调整线程数量;单一线程线程池确保任务顺序执行;定时任务线程池支持周期性或延时任务调度。了解并正确选用这些线程池有助于提高程序效率和资源利用率。
52 2
|
3月前
|
消息中间件 安全 Kafka
"深入实践Kafka多线程Consumer:案例分析、实现方式、优缺点及高效数据处理策略"
【8月更文挑战第10天】Apache Kafka是一款高性能的分布式流处理平台,以高吞吐量和可扩展性著称。为提升数据处理效率,常采用多线程消费Kafka数据。本文通过电商订单系统的案例,探讨了多线程Consumer的实现方法及其利弊,并提供示例代码。案例展示了如何通过并行处理加快订单数据的处理速度,确保数据正确性和顺序性的同时最大化资源利用。多线程Consumer有两种主要模式:每线程一个实例和单实例多worker线程。前者简单易行但资源消耗较大;后者虽能解耦消息获取与处理,却增加了系统复杂度。通过合理设计,多线程Consumer能够有效支持高并发数据处理需求。
169 4

相关产品

  • 云消息队列 MQ