以为很熟悉CountDownLatch的使用了,没想到在生产环境翻车了

简介: 我们知道用来控制并发流程的同步工具,主要的作用是为了等待多个线程同时完成任务后,在进行主线程任务。

前言

大家好,我是小郭,之前分享了CountDownLatch的使用,我们知道用来控制并发流程的同步工具,主要的作用是为了等待多个线程同时完成任务后,在进行主线程任务。

万万没想到,在生产环境中竟然翻车了,因为没有考虑到一些场景,导致了CountDownLatch出现了问题,接下来来分享一下由于CountDownLatch导致的问题。

需求背景

先简单介绍下业务场景,针对用户批量下载的文件进行修改上传

网络异常,图片无法展示
|

为了提高执行的速度,所以在采用线程池去执行 下载-修改-上传 的操作,并在全部执行完之后统一提交保存文件地址到数据库,于是加入了CountDownLatch来进行控制。

具体实现

根据服务本身情况,自定义一个线程池

public static ExecutorService testExtcutor() {
        return new ThreadPoolExecutor(
                2,
                2,
                0L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1));
    }

模拟执行

public static void main(String[] args) {
        // 下载文件总数
        List<Integer> resultList = new ArrayList<>(100);
        IntStream.range(0,100).forEach(resultList::add);
        // 下载文件分段
        List<List<Integer>> split = CollUtil.split(resultList, 10);
        ExecutorService executorService = BaseThreadPoolExector.testExtcutor();
        CountDownLatch countDownLatch = new CountDownLatch(100);
        for (List<Integer> list : split) {
            executorService.execute(() -> {
                list.forEach(i ->{
                    try {
                        // 模拟业务操作
                        Thread.sleep(500);
                        System.out.println("任务进入");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        System.out.println(e.getMessage());
                    } finally {
                        System.out.println(countDownLatch.getCount());
                        countDownLatch.countDown();
                    }
                });
            });
        }
        try {
            countDownLatch.await();
            System.out.println("countDownLatch.await()");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

一开始我个人感觉没有什么问题,反正finally都能够做减一的操作,到最后调用await方法,进行主线程任务

Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@300ffa5d rejected from java.util.concurrent.ThreadPoolExecutor@1f17ae12[Running, pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 0]
  at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
  at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
  at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
  at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
  at Thread.executor.executorTestBlock.main(executorTestBlock.java:28)
任务进入
countDownLatch.countDown
任务进入
countDownLatch.countDown
任务进入
countDownLatch.countDown
任务进入
countDownLatch.countDown
任务进入
countDownLatch.countDown
任务进入
countDownLatch.countDown
任务进入
countDownLatch.countDown
任务进入
countDownLatch.countDown
任务进入
countDownLatch.countDown
任务进入
countDownLatch.countDown
任务进入
countDownLatch.countDown
任务进入
countDownLatch.countDown
任务进入
countDownLatch.countDown
任务进入
countDownLatch.countDown
任务进入
countDownLatch.countDown

由于任务数量较多,阻塞队列中已经塞满了,所以默认的拒绝策略,当队列满时,处理策略报错异常,

要注意这个异常是线程池,自己抛出的,不是我们循环里面打印出来的,

这也造成了,线上这个线程池被中断了,打断主线程,执行不到await

利用jstack,我们就能够看到有问题

"pool-1-thread-2" #12 prio=5 os_prio=31 tid=0x00007ff6198b7000 nid=0xa903 waiting on condition [0x0000700001c64000]
   java.lang.Thread.State: WAITING (parking)
  at sun.misc.Unsafe.park(Native Method)
  - parking to wait for  <0x000000076b2283f8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
  at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
  at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
"pool-1-thread-1" #11 prio=5 os_prio=31 tid=0x00007ff6198b6800 nid=0x5903 waiting on condition [0x0000700001b61000]
   java.lang.Thread.State: WAITING (parking)
  at sun.misc.Unsafe.park(Native Method)
  - parking to wait for  <0x000000076b2283f8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
  at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
  at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

解决方案

  1. 调大阻塞队列,但是问题来了,到底多少阻塞队列才是大呢,如果太大了会不由又造成内存溢出等其他的问题
  2. 在第一个的基础上,我们修改了拒绝策略,当触发拒绝策略的时候,用调用者所在的线程来执行任务
public static ThreadPoolExecutor queueExecutor(BlockingQueue<Runnable> workQueue){
        return new ThreadPoolExecutor(
                size,
                size,
                0L,
                TimeUnit.SECONDS,
                workQueue,
                new ThreadPoolExecutor.CallerRunsPolicy());
    }
  1. 你可能又会想说,会不会任务数量太多,导致调用者所在的线程执行不过来,任务提交的性能急剧下降
    那我们就应该自定义拒绝策略,将这下排队的消息记录下来,采用补偿机制的方式去执行
  2. 同时也要注意上面的那个异常是线程池抛出来的,我们自己也需要将线程池进行try catch,记录问题数据,并且在finally中执行countDownLatch.countDown来避免,线程池的使用

总结

目前根据业务部门的反馈,业务实际中任务数不很特别多的情况,所以暂时先采用了第二种方式去解决这个线上问题

在这里我们也可以看到,如果没有正确的关闭countDownLatch,可能会导致一直等待,这也是我们需要注意的。

工具虽然好,但是依然要注意他带来的问题,没有正确的去处理好,引发的一系列连锁反应。


相关文章
|
SQL 数据格式
视图有哪些特点?哪些使用场景?
视图有哪些特点?哪些使用场景?
|
11月前
|
缓存 监控 Java
深入解析java正则表达式
本文深入解析Java正则表达式的应用,从基础概念到实际开发技巧全面展开。正则表达式是一种强大的文本处理工具,广泛应用于格式验证、搜索替换等场景。Java通过`Pattern`和`Matcher`类支持正则表达式,`Pattern.compile()`方法将正则字符串编译为高效模式对象。文章详细介绍了核心类的功能、常用正则语法及实际案例(如邮箱和电话号码验证)。掌握这些内容,可显著提升文本处理能力,满足多种开发需求。
326 1
|
SQL 分布式计算 Java
【赵渝强老师】Hive的体系架构
Hive是基于Hadoop的数据仓库平台,提供SQL-like的HQL语言进行数据分析,无需编写复杂的Java代码。Hive支持丰富的数据模型,可将SQL语句转换为MapReduce任务在Yarn上运行,底层依赖HDFS存储数据。Hive可通过CLI、JDBC和Web界面执行SQL查询。
675 2
|
Java 数据库连接 数据库
深入探讨Java连接池技术如何通过复用数据库连接、减少连接建立和断开的开销,从而显著提升系统性能
在Java应用开发中,数据库操作常成为性能瓶颈。本文通过问题解答形式,深入探讨Java连接池技术如何通过复用数据库连接、减少连接建立和断开的开销,从而显著提升系统性能。文章介绍了连接池的优势、选择和使用方法,以及优化配置的技巧。
278 1
|
SQL JSON Go
Go - 基于 GORM 获取当前请求所执行的 SQL 信息
Go - 基于 GORM 获取当前请求所执行的 SQL 信息
482 3
IBSS、BSS和ESS之间的区别
【8月更文挑战第23天】
1582 0
|
存储 缓存 负载均衡
软件体系结构 - 数据分片(2)一致性哈希分片
【4月更文挑战第20天】软件体系结构 - 数据分片(2)一致性哈希分片
592 21
|
运维 Kubernetes 监控
一文读懂蓝绿发布、A/B 测试和金丝雀发布的优缺点
目前,业界已经总结出了几种常见的服务发布策略来解决版本升级过程中带来的流量有损问题。本文首先会对这些普遍的发布策略进行简单的原理解析,最后结合阿里云的云原生网关对这些发布策略进行实践。
3489 97
一文读懂蓝绿发布、A/B 测试和金丝雀发布的优缺点
|
机器学习/深度学习
【机器学习】特征筛选实例与代码详解
【机器学习】特征筛选实例与代码详解
787 0