MQ-消息堆积-JDK Bug导致线程阻塞案例分析

本文涉及的产品
Serverless 应用引擎 SAE,800核*时 1600GiB*时
容器镜像服务 ACR,镜像仓库100个 不限时长
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 一个JDK BUG导致系统LOAD高的案例分析

背景介绍

业务介绍

在某学习APP浏览文章,客户端会将浏览的文章信息上传到服务端,服务端将浏览信息最终存储到HBase;

在某学习APP首页点击【我的】->【历史】,会展示用户浏览文章的历史记录。

技术介绍

服务端的服务是【阅读历史离线服务】,从metaq消费用户阅读文章的信息,解析、处理相关业务逻辑,最后存储到HBase。

问题现象

ECS监控

两台机器【xx-xxxx-xxx-xxx-xxx-xxx-6、xx-xxx-xxx-xxx-xxx-xxx-1】在早高峰的时候Load很高,CPU使用率正常。

metaq监控

造成消息消费的慢,每天早上都有大量消息堆积,导致用户看不到自己的阅读历史。

问题分析

基本情况

【阅读历史离线服务】共有x台ECS,每台ECS配置是8c16g。其中x台机器正常,2台机器不正常。

排查思路

找不同

分析不正常机器与正常机器有哪些差异:对比了【应用程序版本】、【应用程序配置】、【JVM配置参数】、【JDK版本】、【操作系统版本】,发现【JDK版本】不一致。

正常机器:

openjdk version "1.8.0_171"
OpenJDK Runtime Environment (build 1.8.0_171-b10)
OpenJDK 64-Bit Server VM (build 25.171-b10, mixed mode)

异常机器:

openjdk version "1.8.0_222"
OpenJDK Runtime Environment (build 1.8.0_222-b10)
OpenJDK 64-Bit Server VM (build 25.222-b10, mixed mode)

到此初步定位不同机器运行状态不一致的现象是由于【JDK版本】不一致造成的,所以将【问题机器的JDK版本】替换为【正常机器的JDK版本】问题就可以解决了。

定位问题代码

但是问题的根因还需要尝试排查一下,既然是【JDK版本】不一致造成的,那么会不会是【1.8.0_222】这个版本中有BUG,刚好我们写的程序触发了这个BUG?

所以接下来需要弄清楚程序运行过程中执行了哪些业务逻辑、这些业务逻辑涉及到了哪些JDK API,直接想到的工具是arthas profiler,下面是抓到的热点方法。

通过对比【异常机器】与【正常机器】的热点方法,发现Runtime.getRuntime().availableProcessors()很可疑:

业务相关代码:

CompletableFuture<Result> completableFuture = //业务逻辑,调用hbase-client中api
completableFuture.whenCompleteAsync((result, t) -> {
   //业务逻辑处理            
}, Pool.getSubmitPool()).exceptionally((t) -> {
   //业务逻辑处理
}).get();

CompletableFuture相关代码:

/**
     * Waits if necessary for this future to complete, and then
     * returns its result.
     *
     * @return the result value
     * @throws CancellationException if this future was cancelled
     * @throws ExecutionException if this future completed exceptionally
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     */
    public T get() throws InterruptedException, ExecutionException {
        Object r;
        return reportGet((r = result) == null ? waitingGet(true) : r);
    }
/**
     * Returns raw result after waiting, or null if interruptible and
     * interrupted.
     */
    private Object waitingGet(boolean interruptible) {
        Signaller q = null;
        boolean queued = false;
        int spins = -1;
        Object r;
        while ((r = result) == null) {
            if (spins < 0)
                spins = (Runtime.getRuntime().availableProcessors() > 1) ?
                    1 << 8 : 0; // Use brief spin-wait on multiprocessors
            else if (spins > 0) {
                if (ThreadLocalRandom.nextSecondarySeed() >= 0)
                    --spins;
            }
            else if (q == null)
                q = new Signaller(interruptible, 0L, 0L);
            else if (!queued)
                queued = tryPushStack(q);
            else if (interruptible && q.interruptControl < 0) {
                q.thread = null;
                cleanStack();
                return null;
            }
            else if (q.thread != null && result == null) {
                try {
                    ForkJoinPool.managedBlock(q);
                } catch (InterruptedException ie) {
                    q.interruptControl = -1;
                }
            }
        }
        if (q != null) {
            q.thread = null;
            if (q.interruptControl < 0) {
                if (interruptible)
                    r = null; // report interruption
                else
                    Thread.currentThread().interrupt();
            }
        }
        postComplete();
        return r;
    }

猜测验证

public class Processors {
    public static void main(String []args) {
        int availableProcessors = Runtime.getRuntime().availableProcessors();
        System.out.println("Available Processors: " + availableProcessors);
        for(int i = 0;i < availableProcessors;i++){
            Thread t = new Thread(()-> {
                    while (true){
                        try {
                            int ps = Runtime.getRuntime().availableProcessors();
                            Thread.sleep(1L);
                        }catch (Exception e){
                            e.printStackTrace();
                        }
                    }
            });
            t.start();
        }
    }
}

将验证代码在【JDK版本】为【1.8.0_222】的机器上运行,随即复现了线上问题。

定位根因

那么【1.8.0_222】与【1.8.0_171】版本在Runtime.getRuntime().availableProcessors()的实现上有什么差别呢?【1.8.0_222】增加了容器环境的逻辑,比【1.8.0_171】复杂了很多。

最后我们看看在https://bugs.openjdk.java.net/的解释吧。

总结:

Runtime.getRuntime().availableProcessors()在不同JDK版本上的实现是没有问题的,CompletableFuture.waitingGet在【1.8.0_222】版本上,没有测试到Runtime.getRuntime().availableProcessors()对性能的影响,导致了性能问题。

解决方法

openjdk在1.8.0_191~1.8.0_222之间的版本都存在问题,换成1.8.0_191之前,或1.8.0_232及以后的版本可以。

问题根因

CompletableFuture.get()的实现方式在一些jdk版本存在缺陷,

详情见:CompletableFuture should not call Runtime.availableProcessors on fast path

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
13天前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
59 2
|
8天前
|
Oracle Java 关系型数据库
CentOS 7.6操作系统部署JDK实战案例
这篇文章介绍了在CentOS 7.6操作系统上通过多种方式部署JDK的详细步骤,包括使用yum安装openjdk、基于rpm包和二进制包安装Oracle JDK,并提供了配置环境变量的方法。
161 80
|
15天前
|
消息中间件 存储 数据中心
RocketMQ的长轮询(Long Polling)实现分析
文章深入分析了RocketMQ的长轮询实现机制,长轮询结合了推送(push)和拉取(pull)两种消息消费模式的优点,通过客户端和服务端的配合,确保了消息的实时性同时将主动权保留在客户端。文中首先解释了长轮询的基本概念和实现步骤,然后通过一个简单的实例模拟了长轮询的过程,最后详细介绍了RocketMQ中DefaultMQPushConsumer的长轮询实现方式,包括PullMessage服务、PullMessageProcessor服务和PullCallback回调的工作原理。
38 1
|
1月前
|
消息中间件 Arthas Java
RocketMQ—一次连接namesvr失败的案例分析
项目组在使用RocketMQ时遇到Consumer连接Name Server失败的问题,异常显示连接特定地址失败。通过Arthas工具逐步分析代码执行路径,定位到创建Channel返回空值导致异常。进一步跟踪发现,问题源于Netty组件在初始化`ByteBufAllocator`时出现错误。分析依赖后确认存在Netty版本冲突。解决方法为排除冲突的Netty包,仅保留兼容版本。
105 0
RocketMQ—一次连接namesvr失败的案例分析
【多线程面试题十二】、阻塞线程的方式有哪些?
线程阻塞的方式包括调用sleep()方法、阻塞式IO操作、等待同步监视器的获取、等待通知(notify),以及慎用的suspend()方法。
|
25天前
|
消息中间件 固态存储 RocketMQ
RocketMQ消息堆积常见场景与处理方案
文章分析了在使用RocketMQ时消息堆积的常见场景,如消费者注册失败或消费速度慢于生产速度,并提供了相应的处理方案,包括提高消费并行度、批量消费、跳过非重要消息以及优化消费代码业务逻辑等。
|
3月前
|
消息中间件 负载均衡 开发工具
消息队列 MQ产品使用合集之当一个服务出现堆积后,为什么不把后面的流量负载到其它服务上
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ产品使用合集之当一个服务出现堆积后,为什么不把后面的流量负载到其它服务上
|
2月前
|
消息中间件 数据安全/隐私保护 RocketMQ
消息队列 MQ使用问题之遇到消费速度是固定的并且导致了堆积,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
消息中间件 监控 物联网
消息队列 MQ使用问题之如何获取和处理消息堆积数据
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
消息中间件 容灾 物联网
【RocketMQ系列十四】RocketMQ中消息堆积如何处理
【RocketMQ系列十四】RocketMQ中消息堆积如何处理
789 3

相关产品

  • 云消息队列 MQ
  • 下一篇
    DDNS