Spring Boot 2.x基础教程:配置线程池的拒绝策略

简介: Spring Boot 2.x基础教程:配置线程池的拒绝策略

通过之前三篇关于Spring Boot异步任务实现的博文,我们分别学会了用@Async创建异步任务为异步任务配置线程池使用多个线程池隔离不同的异步任务。今天这篇,我们继续对上面的知识进行完善和优化!

如果你已经看过上面几篇内容并已经掌握之后,一起来思考下面这个问题:

假设,线程池配置为核心线程数2、最大线程数2、缓冲队列长度2。此时,有5个异步任务同时开始,会发生什么?

场景重现

我们先来把上面的假设用代码实现一下:

第一步:创建Spring Boot应用,根据上面的假设写好线程池配置。

@EnableAsync
@SpringBootApplication
public class Chapter78Application {
    public static void main(String[] args) {
        SpringApplication.run(Chapter78Application.class, args);
    }
    @EnableAsync
    @Configuration
    class TaskPoolConfig {
        @Bean
        public Executor taskExecutor1() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(2);
            executor.setMaxPoolSize(2);
            executor.setQueueCapacity(2);
            executor.setKeepAliveSeconds(60);
            executor.setThreadNamePrefix("executor-1-");
            return executor;
        }
    }
}

第二步:用@Async注解实现一个部分任务

@Slf4j
@Component
public class AsyncTasks {
    public static Random random = new Random();
    @Async("taskExecutor1")
    public CompletableFuture<String> doTaskOne(String taskNo) throws Exception {
        log.info("开始任务:{}", taskNo);
        long start = System.currentTimeMillis();
        Thread.sleep(random.nextInt(10000));
        long end = System.currentTimeMillis();
        log.info("完成任务:{},耗时:{} 毫秒", taskNo, end - start);
        return CompletableFuture.completedFuture("任务完成");
    }
}

第三步:编写测试用例

@Slf4j
@SpringBootTest
public class Chapter78ApplicationTests {
    @Autowired
    private AsyncTasks asyncTasks;
    @Test
    public void test2() throws Exception {
        // 线程池配置:core-2,max-2,queue=2,同时有5个任务,出现下面异常:
        // org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@59901c4d[Running, pool size = 2,
        // active threads = 0, queued tasks = 2, completed tasks = 4]] did not accept task: java.util.concurrent.CompletableFuture$AsyncSupply@408e96d9
        long start = System.currentTimeMillis();
        // 线程池1
        CompletableFuture<String> task1 = asyncTasks.doTaskOne("1");
        CompletableFuture<String> task2 = asyncTasks.doTaskOne("2");
        CompletableFuture<String> task3 = asyncTasks.doTaskOne("3");
        CompletableFuture<String> task4 = asyncTasks.doTaskOne("4");
        CompletableFuture<String> task5 = asyncTasks.doTaskOne("5");
        // 一起执行
        CompletableFuture.allOf(task1, task2, task3, task4, task5).join();
        long end = System.currentTimeMillis();
        log.info("任务全部完成,总耗时:" + (end - start) + "毫秒");
    }
}

执行一下,可以类似下面这样的日志信息:

2021-09-22 17:33:08.159  INFO 21119 --- [   executor-1-2] com.didispace.chapter78.AsyncTasks       : 开始任务:2
2021-09-22 17:33:08.159  INFO 21119 --- [   executor-1-1] com.didispace.chapter78.AsyncTasks       : 开始任务:1
org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@3e1a3801[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]] did not accept task: java.util.concurrent.CompletableFuture$AsyncSupply@64968732
  at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:324)
  at java.util.concurrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1604)
  at java.util.concurrent.CompletableFuture.supplyAsync(CompletableFuture.java:1830)
  at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.doSubmit(AsyncExecutionAspectSupport.java:274)
  at org.springframework.aop.interceptor.AsyncExecutionInterceptor.invoke(AsyncExecutionInterceptor.java:129)
  at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
  at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
  at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
  at com.didispace.chapter78.AsyncTasks$$EnhancerBySpringCGLIB$$c7e8d57b.doTaskOne(<generated>)
  at com.didispace.chapter78.Chapter78ApplicationTests.test2(Chapter78ApplicationTests.java:51)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
  at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
  at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
  at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
  at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
  at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
  at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
  at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
  at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
  at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
  at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
  at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
  at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
  at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
  at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
  at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
  at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
  at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
  at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)
  at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
  at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
  at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
  at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
  at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
  at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
  at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
  at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
  at java.util.ArrayList.forEach(ArrayList.java:1255)
  at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
  at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
  at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
  at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
  at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
  at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
  at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
  at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
  at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
  at java.util.ArrayList.forEach(ArrayList.java:1255)
  at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
  at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
  at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
  at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
  at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
  at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
  at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
  at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
  at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
  at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
  at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
  at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
  at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:108)
  at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
  at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
  at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
  at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
  at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96)
  at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75)
  at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:71)
  at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
  at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
  at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$AsyncSupply@64968732 rejected from java.util.concurrent.ThreadPoolExecutor@3e1a3801[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]
  at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
  at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
  at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
  at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:321)
  ... 74 more

从异常信息org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@3e1a3801[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]] did not accept task:中,可以很明确的知道,第5个任务因为超过了执行线程+缓冲队列长度,而被拒绝了。

所有,默认情况下,线程池的拒绝策略是:当线程池队列满了,会丢弃这个任务,并抛出异常。

配置拒绝策略

虽然线程池有默认的拒绝策略,但实际开发过程中,有些业务场景,直接拒绝的策略往往并不适用,有时候我们可能会选择舍弃最早开始执行而未完成的任务、也可能会选择舍弃刚开始执行而未完成的任务等更贴近业务需要的策略。所以,为线程池配置其他拒绝策略或自定义拒绝策略是很常见的需求,那么这个要怎么实现呢?

下面就来具体说说今天的正题,如何为线程池配置拒绝策略、如何自定义拒绝策略。

看下面这段代码的最后一行,setRejectedExecutionHandler方法就是为线程池设置拒绝策略的方法:

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//...其他线程池配置
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
在ThreadPoolExecutor中提供了4种线程的策略可以供开发者直接使用,你只需要像下面这样设置即可:
// AbortPolicy策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
// DiscardPolicy策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
// DiscardOldestPolicy策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
// CallerRunsPolicy策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

这四个策略对应的含义分别是:

  • AbortPolicy策略:默认策略,如果线程池队列满了丢掉这个任务并且抛出RejectedExecutionException异常。
  • DiscardPolicy策略:如果线程池队列满了,会直接丢掉这个任务并且不会有任何异常。
  • DiscardOldestPolicy策略:如果队列满了,会将最早进入队列的任务删掉腾出空间,再尝试加入队列。
  • CallerRunsPolicy策略:如果添加到线程池失败,那么主线程会自己去执行该任务,不会等待线程池中的线程去执行。

而如果你要自定义一个拒绝策略,那么可以这样写:

executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // 拒绝策略的逻辑
    }
});

当然如果你喜欢用Lamba表达式,也可以这样写:

executor.setRejectedExecutionHandler((r, executor1) -> {
    // 拒绝策略的逻辑
});

好了,今天的学习就到这里!

如果您学习过程中如遇困难?可以加入我们超高质量的Spring技术交流群,参与交流与讨论,更好的学习与进步!更多Spring Boot教程可以点击直达!,欢迎收藏与转发支持!

代码示例

本文的完整工程可以查看下面仓库中2.x目录下的chapter7-8工程:

如果您觉得本文不错,欢迎Star支持,您的关注是我坚持的动力!

目录
相关文章
|
3天前
|
Cloud Native Java Nacos
springcloud/springboot集成NACOS 做注册和配置中心以及nacos源码分析
通过本文,我们详细介绍了如何在 Spring Cloud 和 Spring Boot 中集成 Nacos 进行服务注册和配置管理,并对 Nacos 的源码进行了初步分析。Nacos 作为一个强大的服务注册和配置管理平台,为微服务架构提供
29 14
|
6天前
|
Java 数据库 开发者
详细介绍SpringBoot启动流程及配置类解析原理
通过对 Spring Boot 启动流程及配置类解析原理的深入分析,我们可以看到 Spring Boot 在启动时的灵活性和可扩展性。理解这些机制不仅有助于开发者更好地使用 Spring Boot 进行应用开发,还能够在面对问题时,迅速定位和解决问题。希望本文能为您在 Spring Boot 开发过程中提供有效的指导和帮助。
43 12
|
3天前
|
SQL 数据建模 BI
【YashanDB 知识库】用 yasldr 配置 Bulkload 模式作单线程迁移 300G 的业务数据到分布式数据库,迁移任务频繁出错
问题描述 详细版本:YashanDB Server Enterprise Edition Release 23.2.4.100 x86_64 6db1237 影响范围: 离线数据迁移场景,影响业务数据入库。 外场将部分 NewCIS 的报表业务放到分布式数据库,验证 SQL 性能水平。 操作系统环境配置: 125G 内存 32C CPU 2T 的 HDD 磁盘 问题出现的步骤/操作: 1、部署崖山分布式数据库 1mm 1cn 3dn 单线启动 yasldr 数据迁移任务,设置 32 线程的 bulk load 模式 2、观察 yasldr.log 是否出现如下错
|
3月前
|
Java 开发者 微服务
手写模拟Spring Boot自动配置功能
【11月更文挑战第19天】随着微服务架构的兴起,Spring Boot作为一种快速开发框架,因其简化了Spring应用的初始搭建和开发过程,受到了广大开发者的青睐。自动配置作为Spring Boot的核心特性之一,大大减少了手动配置的工作量,提高了开发效率。
83 0
|
1月前
|
负载均衡 IDE Java
SpringBoot整合XXL-JOB【04】- 以GLUE模式运行与执行器负载均衡策略
在本节中,我们将介绍XXL-JOB的GLUE模式和集群模式下的路由策略。GLUE模式允许直接在线上改造方法为定时任务,无需重新部署。通过一个测试方法,展示了如何在调度中心配置并使用GLUE模式执行定时任务。接着,我们探讨了多实例环境下的负载均衡策略,确保任务不会重复执行,并可通过修改路由策略(如轮训)实现任务在多个实例间的均衡分配。最后,总结了GLUE模式和负载均衡策略的应用,帮助读者更深入理解XXL-JOB的使用。
69 9
SpringBoot整合XXL-JOB【04】-  以GLUE模式运行与执行器负载均衡策略
|
4月前
|
消息中间件 Java 调度
Spring Boot 3.3 后台任务处理的高效策略
【10月更文挑战第18天】 在现代应用程序中,后台任务处理对于提升用户体验和系统性能至关重要。Spring Boot 3.3提供了多种机制来实现后台任务处理,包括异步方法、任务调度和使用消息系统。本文将探讨这些机制的最佳实践,帮助开发者提高应用程序的效率和响应速度。
85 0
|
1月前
|
JavaScript Java 程序员
SpringBoot自动配置及自定义Starter
Java程序员依赖Spring框架简化开发,但复杂的配置文件增加了负担。SpringBoot以“约定大于配置”理念简化了这一过程,通过引入各种Starter并加载默认配置,几乎做到开箱即用。
127 10
SpringBoot自动配置及自定义Starter
|
1月前
|
算法 安全 Java
Java线程调度揭秘:从算法到策略,让你面试稳赢!
在社招面试中,关于线程调度和同步的相关问题常常让人感到棘手。今天,我们将深入解析Java中的线程调度算法、调度策略,探讨线程调度器、时间分片的工作原理,并带你了解常见的线程同步方法。让我们一起破解这些面试难题,提升你的Java并发编程技能!
84 16
|
2月前
|
Java Maven Spring
SpringBoot配置跨模块扫描问题解决方案
在分布式项目中,使用Maven进行多模块开发时,某些模块(如xxx-common)没有启动类。如何将这些模块中的类注册为Spring管理的Bean对象?本文通过案例分析,介绍了两种解决方案:常规方案是通过`@SpringBootApplication(scanBasePackages)`指定扫描路径;推荐方案是保持各模块包结构一致(如com.xxx),利用SpringBoot默认扫描规则自动识别其他模块中的组件,简化配置。
SpringBoot配置跨模块扫描问题解决方案
|
2月前
|
NoSQL Java Redis
Spring Boot 自动配置机制:从原理到自定义
Spring Boot 的自动配置机制通过 `spring.factories` 文件和 `@EnableAutoConfiguration` 注解,根据类路径中的依赖和条件注解自动配置所需的 Bean,大大简化了开发过程。本文深入探讨了自动配置的原理、条件化配置、自定义自动配置以及实际应用案例,帮助开发者更好地理解和利用这一强大特性。
188 14