【自省】线程池里的定时任务跑的可欢了,可咋停掉特定的任务?

简介: 【自省】线程池里的定时任务跑的可欢了,可咋停掉特定的任务?

一、背景

《分布式锁主动续期的入门级实现-自省 | 简约而不简单》中通过【自省】的方式讨论了关于分布式锁自动续期功能的入门级实现方式,简单同步一下上下文:

  1. 客户端抢到分布式锁之后开始执行任务,执行完毕后再释放分布式锁。
  2. 持锁后因客户端异常未能把锁释放,会导致锁成为永恒锁。
  3. 为了避免这种情况,在创建锁的时候给锁指定一个过期时间。
  4. 到期之后锁会被自动删除掉,这个角度看是对锁资源的一种保护。
  5. 重点:但若锁过期被删除后,任务还没结束怎么办?
  6. 可以通过在一个额外的线程中主动推迟分布式锁的过期时间,下文也用续期一词来表述;避免当任务还没执行完,锁就被删除了。
  7. 但当分布式锁很多的情况下,每个锁都配置一个线程着实浪费,所以是否可以用线程池里的定时任务呢?

《【自省】使用Executors.xxx违反阿里Java代码规范,那还不写定时任务了?》 中仍然通过【自省】的方式讨论了也可以使用 ScheduledExecutorService#scheduleAtFixedRate来实现定时任务,它的运行机制大概是这样:

  • 如果上一个任务的执行时间大于等待时间,任务结束后,下一个任务马上执行。
  • 如果上一个任务的执行时间小于等待时间,任务结束后,下一个任务在(间隔时间-执行时间)后开始执行。

在分布式锁主动续期的场景下,它也满足如下定律:

二、理还乱?

ScheduledExecutorService#scheduleAtFixedRate逻辑看很简单,也很清晰,但任何事情都有两面性,把任务丢给线程池的方式,实现起来自然简单清晰,但肯定也有弊端。如果要把锁的功能做的健壮,总要从不断地自我质疑、自我反思中,理顺思路,寻找答案,我认为这属于自省式学习,以后也想尝试这种模式,一起再看看有啥问题:

  • 问题:锁主动释放的时候,续期的任务要关闭嘛?
    是的,当锁被用户主动关闭的时候,主动续期的任务是要主动取消掉的。
  • 问题:如果我不主动取消呢?
    对于不主动续期的锁,抢锁后配置一个合适的过期时间,到期之后锁自然会被释放;这种情况下,客户端本就没有续期任务需要取消。但如果有额外的线程|线程池在定时续期的话,锁用完了需要被释放掉,任务一定要主动取消掉。
  • 问题:可万一忘记了呢?
    有加锁解锁的代码模板,按照模板来;获取锁之后,在finally中执行释放锁的操作。
boolean lockResult = lockInstance.tryLock();
if(lockResult){
    //do work
}finally{
    lockInstance.unLock();
}
复制代码
  • 万一程序异常崩了,没执行finally呢?
    如果程序异常崩了,进程消失后,进程内的资源自然就都释放掉了:续期任务没有了,续期的线程|线程池也没有了。但锁资源就需要依赖锁服务,如 Redis ,在锁过期后主动释放掉锁资源。
  • 问题:关于停止任务,在前文独立线程的实现方式中,有介绍可通过中断机制;但是线程池里的任务怎么取消呢?
    遇事不决问百度,排名第一必有解

2Zmh5D.gif

咱得本意是取消一个任务,示例给出的方法是要把线程池关掉。

2Zmh5D.gif

  • 问题:取消一个任务,要把整个线程池都关掉?
    按照示例所给的办法是不行的,每个任务的取消,都要关闭整个线程池的话,若给每个任务都配有独立的取消能力,就需要给每个任务都配一个独立的线程池,这就跟每个锁配一个独立的线程没有区别了。
  • 问题:目标是多个任务共享一个线程池,怎么不关闭线程池而只关闭特定的任务呢?
    百度出来跟问题相关的文章本就不多,而多数文章提供的奇思妙招并不好使,笔者是浪费了一些时间的,但不能再耽误读者朋友的时间,直接给思路:解铃还须系铃人,scheduleAtFixedRate的返回值是是ScheduledFuture
  • 问题:看到 xxxFuture 是否想能想起Future接口的能力?
    猜测熟悉 get()方法的同学应该特别多,但不知道熟不熟悉cancel方法,如果看到这个方法感到惊喜,欢迎留言互动。
public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    ...
    V get() throws InterruptedException, ExecutionException;
    ...
}
复制代码
  • 问题:cancel方法好使嘛?
    不看理论看实效果,试试看:
public static void testCancel() throws InterruptedException {
    ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
    System.out.println(" start : " + LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
    ScheduledFuture<?> scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(() -> {
        System.out.println("  work : " + LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
    }, 5, 5, TimeUnit.SECONDS);
    TimeUnit.SECONDS.sleep(15);
    scheduledFuture.cancel(true);
    System.out.println("cancel : " + LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
    TimeUnit.SECONDS.sleep(30);
}
复制代码
  • 效果满足预期,成功取消了。
start : 2022-12-10T19:24:31.508
  work : 2022-12-10T19:24:36.538
  work : 2022-12-10T19:24:41.539
  work : 2022-12-10T19:24:46.541
cancel : 2022-12-10T19:24:46.541 //成功取消


  • 问题:cancel 里都做了什么呢?看源码可知,其内有两层核心逻辑:
  • 尝试取消正在执行的任务
  • 避免任务再被定时执行
public boolean cancel(boolean mayInterruptIfRunning) {
    // 1. 先调用父类FutureTask#cancel来取消任务。
    boolean cancelled = super.cancel(mayInterruptIfRunning);
    //2. 核心逻辑是从队列中删除该任务。
    if (cancelled && removeOnCancel && heapIndex >= 0)
        remove(this);
    return cancelled;
}
复制代码

至此,关于使用线程池来执行|取消续期任务,看起来已经没啥问题了;美丽的心情应该是这样的。

OK,稍微开心一下就好,还有问题呢,不想划开的,咱不等了。

三、新的思考

  • 问题:cancel的参数mayInterruptIfRunning 是什么意思?
    从父类cancel方法的注释中可以寻找到答案,如果是 true 的话,即代表尝试通过中断的方式来停止任务

If the task has already started, then the mayInterruptIfRunning parameter determines whether the thread executing this task should be interrupted in an attempt to stop the task.

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
    if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) {
        if (checkNotifyWaiters()) {
            notifyListeners();
        }
        return true;
    }
    return false;
}
复制代码
  • 问题:那就是说也可能抛出 InterruptedException 了?
    如果是抛出 InterruptedException ,示例中,并未看到程序测试有异常中断,也未看到有异常日志信息。
  • 问题:怎么有点玄学了,还能不是interrupt机制?
    在任务内尝试捕获一下看看:
public static void testExceptionCatch() throws InterruptedException {
    ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
    ScheduledFuture<?> scheduledFuture = null;
    System.out.println(" start : " + LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
    try {
        scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(() -> {
            System.out.println("  work : " + LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //throw new RuntimeException("");
        }, 5, 5, TimeUnit.SECONDS);
    }catch (Exception exp){
        exp.printStackTrace();
    }
    TimeUnit.SECONDS.sleep(15);
    scheduledFuture.cancel(true);
    System.out.println("cancel : " + LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
    TimeUnit.SECONDS.sleep(30);
}
复制代码
  • 结果中的信息 java.lang.InterruptedException: sleep interrupted 可以明确是任务内的逻辑是可通过中断机制实现的。
start : 2022-12-10T20:10:31.248
  work : 2022-12-10T20:10:36.276
  work : 2022-12-10T20:10:41.272
  work : 2022-12-10T20:10:46.277
cancel : 2022-12-10T20:10:46.277
java.lang.InterruptedException: sleep interrupted
        at java.lang.Thread.sleep(Native Method)
        at java.lang.Thread.sleep(Thread.java:340)
        at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
        at com.wushiyii.lock.ScheduleTest.lambda$testExceptionCatch$1(ScheduleTest.java:39)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
复制代码
  • 问题:之前实例中取消任务时,外部也无异常信息,线程池内部留着这个异常干嘛了呢?
    直接抛出异常试试看
public static void testExceptionCatch() throws InterruptedException {
    ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
    ScheduledFuture<?> scheduledFuture = null;
    System.out.println(" start : " + LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
    try {
        scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(() -> {
            System.out.println("  work : " + LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
            throw new RuntimeException("just throw ");
            //throw new RuntimeException("");
        }, 5, 5, TimeUnit.SECONDS);
    }catch (Exception exp){
        exp.printStackTrace();
    }
    TimeUnit.SECONDS.sleep(15);
    scheduledFuture.cancel(true);
    System.out.println("cancel : " + LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
    TimeUnit.SECONDS.sleep(30);
}
复制代码
  • 仔细观察能看出,结果变的有意思了,work只执行了一次,前文中的执行结果中work都执行了3次,这里却只执行了一次。
start : 2022-12-10T20:16:53.285
  work : 2022-12-10T20:16:58.307
cancel : 2022-12-10T20:17:08.305
复制代码
  • 问题:任务内抛出异常能导致定时任务失去定时执行的能力?是的,使用scheduleAtFixedRate有以下几个情况必须注意:
  1. 任务逻辑中未捕获的异常能导致本该定时执行的任务,后续不再执行。
  2. 任务逻辑中未捕获的异常不会外抛,外部感知不到。
  3. 任务逻辑中的异常,需在任务逻辑内捕获并记录,否则无处可知。
  • 看起来定时任务的使用的确是不能随心所欲的,毕竟大美也总是会说:

问题:那还有什么注意事项?

给线程池指定的线程数要合理,不要无限制的提交任务,也不要每提交一个任务就new一个线程池。还有...

老板亲情提示:面都坨了,快点吃,不然不好吃了。

嗯,【自省】也不能饿肚子,该吃饭了。

如果您还知道一些需要注意的玄妙机制,欢迎留言讨论;咱们下一篇再聊。

四、最后说一句

我是石页兄,如果这篇文章对您有帮助,或者有所启发的话,欢迎关注笔者的微信公众号【 架构染色 】进行交流和学习。您的支持是我坚持写作最大的动力。

相关文章
|
11天前
|
监控 Java
java异步判断线程池所有任务是否执行完
通过上述步骤,您可以在Java中实现异步判断线程池所有任务是否执行完毕。这种方法使用了 `CompletionService`来监控任务的完成情况,并通过一个独立线程异步检查所有任务的执行状态。这种设计不仅简洁高效,还能确保在大量任务处理时程序的稳定性和可维护性。希望本文能为您的开发工作提供实用的指导和帮助。
57 17
|
2月前
|
存储 Java 数据库
如何处理线程池关闭时未完成的任务?
总之,处理线程池关闭时未完成的任务需要综合考虑多种因素,并根据实际情况选择合适的处理方式。通过合理的处理,可以最大程度地减少任务丢失和数据不一致等问题,确保系统的稳定运行和业务的顺利开展。
138 64
|
2月前
|
消息中间件 监控 Java
线程池关闭时未完成的任务如何保证数据的一致性?
保证线程池关闭时未完成任务的数据一致性需要综合运用多种方法和机制。通过备份与恢复、事务管理、任务状态记录与恢复、数据同步与协调、错误处理与补偿、监控与预警等手段的结合,以及结合具体业务场景进行分析和制定策略,能够最大程度地确保数据的一致性,保障系统的稳定运行和业务的顺利开展。同时,不断地优化和改进这些方法和机制,也是提高系统性能和可靠性的重要途径。
130 62
|
2月前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
61 12
|
6月前
|
缓存 Java 调度
Java并发编程:深入解析线程池与Future任务
【7月更文挑战第9天】线程池和Future任务是Java并发编程中非常重要的概念。线程池通过重用线程减少了线程创建和销毁的开销,提高了资源利用率。而Future接口则提供了检查异步任务状态和获取任务结果的能力,使得异步编程更加灵活和强大。掌握这些概念,将有助于我们编写出更高效、更可靠的并发程序。
|
3月前
|
缓存 负载均衡 Java
c++写高性能的任务流线程池(万字详解!)
本文介绍了一种高性能的任务流线程池设计,涵盖多种优化机制。首先介绍了Work Steal机制,通过任务偷窃提高资源利用率。接着讨论了优先级任务,使不同优先级的任务得到合理调度。然后提出了缓存机制,通过环形缓存队列提升程序负载能力。Local Thread机制则通过预先创建线程减少创建和销毁线程的开销。Lock Free机制进一步减少了锁的竞争。容量动态调整机制根据任务负载动态调整线程数量。批量处理机制提高了任务处理效率。此外,还介绍了负载均衡、避免等待、预测优化、减少复制等策略。最后,任务组的设计便于管理和复用多任务。整体设计旨在提升线程池的性能和稳定性。
95 5
|
5月前
|
前端开发 JavaScript 大数据
React与Web Workers:开启前端多线程时代的钥匙——深入探索计算密集型任务的优化策略与最佳实践
【8月更文挑战第31天】随着Web应用复杂性的提升,单线程JavaScript已难以胜任高计算量任务。Web Workers通过多线程编程解决了这一问题,使耗时任务独立运行而不阻塞主线程。结合React的组件化与虚拟DOM优势,可将大数据处理等任务交由Web Workers完成,确保UI流畅。最佳实践包括定义清晰接口、加强错误处理及合理评估任务特性。这一结合不仅提升了用户体验,更为前端开发带来多线程时代的全新可能。
132 1
|
5月前
|
存储 监控 Java
|
6月前
|
Java Linux
Java演进问题之1:1线程模型对于I/O密集型任务如何解决
Java演进问题之1:1线程模型对于I/O密集型任务如何解决
|
5月前
|
Cloud Native Java 调度
项目环境测试问题之线程同步器会造成执行完任务的worker等待的情况如何解决
项目环境测试问题之线程同步器会造成执行完任务的worker等待的情况如何解决