血的教训--如何正确使用线程池submit和execute方法

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 血的教训--如何正确使用线程池submit和execute方法


血的教训之背景:使用线程池对存量数据进行迁移,但是总有一批数据迁移失败,无异常日志打印

凶案起因

听说parallelStream并行流是个好东西,由于日常开发stream串行流的场景比较多,这次需要写迁移程序刚好可以用得上,那还不赶紧拿来装*一下,此时不装更待何时。机智的我还知道在 JVM 的后台,使用通用的 fork/join 池来完成上述功能,该池是所有并行流共享的,默认情况,fork/join 池会为每个处理器分配一个线程,对应的变通方案就是创建自己的线程池如

ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
pool.submit(() -> {
            list.parallelStream().collect(Collectors.toList());
        });
复制代码

于是地雷就是从这里埋下的。

submit还是execute

public static void main(String[] args) throws InterruptedException, ExecutionException {
        final ExecutorService pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
        List<Integer> list = Lists.newArrayList(1, 2, 3, null);
        //1.使用submit
        pool.submit(() -> {
            list.parallelStream().map(a -> a.toString()).collect(Collectors.toList());
        });
        TimeUnit.SECONDS.sleep(3);
        //2.使用 execute
        pool.execute(() -> {
            list.parallelStream().map(a -> a.toString()).collect(Collectors.toList());
        });
        //3.使用submit,调用get()
        pool.submit(() -> {
            list.parallelStream().map(a -> a.toString()).collect(Collectors.toList());
        }).get();
        TimeUnit.SECONDS.sleep(3);
    }
复制代码

读者自行跑一下上面的用例,会发现单独使用submit方法的并不会打印出错误日志,而使用execute方法打印出了错误日志,但是对submit返回的FutureJoinTask调用get()方法,又会抛出异常。于是真相大白,部分批次中的数据存在脏数据,为null值,遍历到该null值的时候出现了异常,但是异常日志在submit方法中给catch住,没有打印出来(心痛的感觉),而被捕获的异常,被包装在返回的结果类FutureJoinTask中,并没有再次抛出。

如果不需要异步返回结果,请不要用submit 方法

结论先行,我犯的错误就是,浅显的认为submitexecute的区别就只是一个有返回异步结果,一个没有返回一步结果,但是事实是残酷的。submit()中逻辑一定包含了将异步任务抛出的异常捕获,而因为使用方法不当而导致该异常没有再次抛出。

现在提出一个问题,ForkJoinPool#submit()中返回的ForkJoinTask可以获取异步任务的结果,现这个异步抛出了异常,我们尝试获取该任务的结果会是如何? 我们直接看ForkJoinTask#get()的源码。

public final V get() throws InterruptedException, ExecutionException {
    int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
        doJoin() : externalInterruptibleAwaitDone();
    Throwable ex;
    if ((s &= DONE_MASK) == CANCELLED)
        throw new CancellationException();
    //这里可以直接看到,异步任务出现异常会在调用get()获取结果的时候,会被包装成ExecutionException再次抛出
    if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
        throw new ExecutionException(ex);
    return getRawResult();
}
复制代码

异步任务出现异常会在调用get()获取结果的时候,会被包装成ExecutionException再次抛出,但是异常是在哪里被捕获的呢?万变不离其宗,所有线程的线程都需要重写Thread#run()方法, 投递到ForkJoinPool的线程会被包装成ForkJoinWorkerThread,因此我们看一下ForkJoinWorkerThread#run()的实现.

public void run() {
    if (workQueue.array == null) { // only run once
        Throwable exception = null;
        try {
            onStart();
            pool.runWorker(workQueue);
        } catch (Throwable ex) {
            //出现异常,捕获,再次抛出会在调用ForkJoinTask#get()的时候
            exception = ex;
        } finally {
            try {
                onTermination(exception);
            } catch (Throwable ex) {
                if (exception == null)
                    exception = ex;
            } finally {
                pool.deregisterWorker(this, exception);
            }
        }
    }
}
复制代码

上面的分析是基于ForkJoinPool的,是不是所有的线程池的submitexecute方法的实现都是类似这样,我们常用的线程池ThreadPoolThread实现会是怎样的,同样的思路,我们需要找到投递到ThreadPoolThread的异步任务最终被包装为哪个Thread的子类或者是实现java.lang.Runnable#run,答案就是java.util.concurrent.FutureTask

public void run() {
      ...
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    //捕获异常
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } 
     ....
    }
复制代码

总结

java.util.concurrent.ExecutorService#submit(java.lang.Runnable)为何线程池会有这种设定,实际上我们的思路不应该局限于线程池,而是放在获取异步任务结果,异常是否也是属于异步结果FutureTask作为JDK提供的并发工具类的实现中,已经给出了很好的答案,即获取异步任务结果,异常也是属于异步结果,如果异步任务出现运行时异常,那么在获取该任务的结果时,该异常会被重新包装抛出



相关实践学习
通过日志服务实现云资源OSS的安全审计
本实验介绍如何通过日志服务实现云资源OSS的安全审计。
目录
相关文章
|
9月前
|
缓存 安全 Java
【JavaEE】——单例模式引起的多线程安全问题:“饿汉/懒汉”模式,及解决思路和方法(面试高频)
单例模式下,“饿汉模式”,“懒汉模式”,单例模式下引起的线程安全问题,解锁思路和解决方法
|
9月前
|
Java 程序员 调度
【JavaEE】线程创建和终止,Thread类方法,变量捕获(7000字长文)
创建线程的五种方式,Thread常见方法(守护进程.setDaemon() ,isAlive),start和run方法的区别,如何提前终止一个线程,标志位,isinterrupted,变量捕获
|
11月前
|
Java 开发者
在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口
【10月更文挑战第20天】在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口。本文揭示了这两种方式的微妙差异和潜在陷阱,帮助你更好地理解和选择适合项目需求的线程创建方式。
184 3
|
11月前
|
Java 开发者
在Java多线程编程中,选择合适的线程创建方法至关重要
【10月更文挑战第20天】在Java多线程编程中,选择合适的线程创建方法至关重要。本文通过案例分析,探讨了继承Thread类和实现Runnable接口两种方法的优缺点及适用场景,帮助开发者做出明智的选择。
97 2
|
11月前
|
安全 Java
Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧
【10月更文挑战第20天】Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧,包括避免在循环外调用wait()、优先使用notifyAll()、确保线程安全及处理InterruptedException等,帮助读者更好地掌握这些方法的应用。
140 1
|
11月前
|
Java 开发者
Java多线程初学者指南:介绍通过继承Thread类与实现Runnable接口两种方式创建线程的方法及其优缺点
【10月更文挑战第20天】Java多线程初学者指南:介绍通过继承Thread类与实现Runnable接口两种方式创建线程的方法及其优缺点,重点解析为何实现Runnable接口更具灵活性、资源共享及易于管理的优势。
213 1
|
11月前
|
Java
在Java多线程编程中,`wait()`和`notify()`方法的相遇如同一场奇妙的邂逅
在Java多线程编程中,`wait()`和`notify()`方法的相遇如同一场奇妙的邂逅。它们用于线程间通信,使线程能够协作完成任务。通过这些方法,生产者和消费者线程可以高效地管理共享资源,确保程序的有序运行。正确使用这些方法需要遵循同步规则,避免虚假唤醒等问题。示例代码展示了如何在生产者-消费者模型中使用`wait()`和`notify()`。
97 1
|
2月前
|
安全 算法 Java
Java 多线程:线程安全与同步控制的深度解析
本文介绍了 Java 多线程开发的关键技术,涵盖线程的创建与启动、线程安全问题及其解决方案,包括 synchronized 关键字、原子类和线程间通信机制。通过示例代码讲解了多线程编程中的常见问题与优化方法,帮助开发者提升程序性能与稳定性。
114 0
|
2月前
|
数据采集 监控 调度
干货分享“用 多线程 爬取数据”:单线程 + 协程的效率反超 3 倍,这才是 Python 异步的正确打开方式
在 Python 爬虫中,多线程因 GIL 和切换开销效率低下,而协程通过用户态调度实现高并发,大幅提升爬取效率。本文详解协程原理、实战对比多线程性能,并提供最佳实践,助你掌握异步爬虫核心技术。
|
3月前
|
Java 数据挖掘 调度
Java 多线程创建零基础入门新手指南:从零开始全面学习多线程创建方法
本文从零基础角度出发,深入浅出地讲解Java多线程的创建方式。内容涵盖继承`Thread`类、实现`Runnable`接口、使用`Callable`和`Future`接口以及线程池的创建与管理等核心知识点。通过代码示例与应用场景分析,帮助读者理解每种方式的特点及适用场景,理论结合实践,轻松掌握Java多线程编程 essentials。
211 5