Spring中ThreadPoolTaskExecutor的线程调度及问题

简介:
  • 问题现象

  • 原因分析

  • 任务调度逻辑

  • 汇总分析

  • 解决方案


问题现象

在我们的系统中,使用了这样的配置来开启异步操作:

spring配置

<task:annotation-driven executor="executor"

    scheduler="scheduler" />

<task:executor id="executor" pool-size="16-128"

    keep-alive="60" rejection-policy="CALLER_RUNS" queue-capacity="1000" />

客户端开启异步代码

@Async()

public Future<Result4Calculate> calculateByLendId(intdid) {

    // 标记1

    // 调用REST服务;监控调用时间。

}

获取Future后的处理

try {

    keplerOverdue = summay4Overdue.get(5, TimeUnit.SECONDS);

    // 后续处理

catch (Exception e) {

    // 标记2

    // 异常报警

}

 

然而在这种配置下,客户端在标记1处监控到的调用时间普遍在4s以内(平均时间不到1s,个别峰值请求会突破5s,全天超过5s的请求不到10个)。然而,在标记2处捕获到的超时异常却非常多(一天接近700+)。

问题出在哪儿?


 

原因分析

上述问题相关代码的调用时序如下图所示。

https://www.processon.com/view/link/585d381ee4b02e6c0ac86d66spacer.gif


其中,rest client 与rest server间的交互时间可以明确监控到,用时超过5s的非常少。但是,get方法却经常抛出超时异常。经过初步分析,问题出现在ThreadPoolTaskExecutor的任务调度过程中。


 

任务调度逻辑

使用<task:executor>注解得到的bean是ThreadPoolTaskExecutor的实例。这个类本身并不做调度,而是将调度工作委托给了ThreadPoolExecutor。后者的任务调度代码如下:

 

ThreadPoolExecutor任务调度代码

/**

 * Executes the given task sometime in the future.  The task

 * may execute in a new thread or in an existing pooled thread.

 *

 * If the task cannot be submitted for execution, either because this

 * executor has been shutdown or because its capacity has been reached,

 * the task is handled by the current {@code RejectedExecutionHandler}.

 *

 * @param command the task to execute

 * @throws RejectedExecutionException at discretion of

 *         {@code RejectedExecutionHandler}, if the task

 *         cannot be accepted for execution

 * @throws NullPointerException if {@code command} is null

 */

public void execute(Runnable command) {

    if (command == null)

        throw new NullPointerException();

    /*

     * Proceed in 3 steps:

     *

     * 1. If fewer than corePoolSize threads are running, try to

     * start a new thread with the given command as its first

     * task.  The call to addWorker atomically checks runState and

     * workerCount, and so prevents false alarms that would add

     * threads when it shouldn't, by returning false.

     *

     * 2. If a task can be successfully queued, then we still need

     * to double-check whether we should have added a thread

     * (because existing ones died since last checking) or that

     * the pool shut down since entry into this method. So we

     * recheck state and if necessary roll back the enqueuing if

     * stopped, or start a new thread if there are none.

     *

     * 3. If we cannot queue task, then we try to add a new

     * thread.  If it fails, we know we are shut down or saturated

     * and so reject the task.

     */

    int c = ctl.get();

    if (workerCountOf(c) < corePoolSize) {

        if (addWorker(command, true))

            return;

        c = ctl.get();

    }

    if (isRunning(c) && workQueue.offer(command)) {

        int recheck = ctl.get();

        if (! isRunning(recheck) && remove(command))

            reject(command);

        else if (workerCountOf(recheck) == 0)

            addWorker(nullfalse);

    }

    else if (!addWorker(command, false))

        reject(command);

}

通过其中的注释,我们可以知道它的核心调度逻辑如下(省略了一些检查等方法):

  1. 如果正在运行的线程数量小于corePoolSize(最小线程数),则尝试启动一个新线程,并将当入参command作为该线程的第一个task。否则进入步骤二。

  2. 如果没有按步骤1执行,那么尝试把入参command放入workQueue中。如果能成功入队,做后续处理;否则,进入步骤三。

  3. 如果没有按步骤2执行,那么将尝试创建一个新线程,然后做后续处理。

 

简单的说,当向ThreadPoolExecutor提交一个任务时,它会优先交给线程池中的现有线程;如果暂时没有可用的线程,那么它会将任务放到队列中;一般只有在队列满了的时候(导致无法成功入队),才会创建新线程来处理队列中的任务。

顺带一说,任务入队后,在某些条件下也会创建新线程。但新线程不会立即执行当前任务,而是从队列中获取一个任务并开始执行。


 

汇总分析

综上所述,我们可以确定以下信息:

  1. 根据系统配置,ThreadPoolExecutor中的corePoolSize = 16。


  2. 当并发数超过16时,ThreadPoolExecutor会按照步骤二进行任务调度,即把任务放入队列中,但没有及时创建新线程来执行这个任务

    这一点是推测。但同时,通过日志中的线程名称确认的线程池内线程数量没有增长。日志中,异步线程的id从executor-1、executor-2一直到executor-16,但17及以上的都没有出现过。


  3. 队列中的任务出现积压、时间累积,导致某一个任务超时后,后续大量任务都超时。但是超时并没有阻止任务执行;任务仍然会继续通过rest client调用rest server,并被监控代码记录下时间。
    任务在队列中积压、累积,是引发一天数百次异常、报警的原因。而监控代码并未监控到任务调度的时间,因此都没有出现超时。

spacer.gif


 

解决方案

初步考虑方案有三:

  1. 提高初始线程数。
    提高步并发的初始线程数(如将16-128调整为32-128)。以此减少新任务进入队列的几率。
    但是这个方案只是降低队列积压的风险,并不解决问题。


  2. 关闭队列。
    将队列大小调整为0,以此保证每一个新任务都有一个新线程来执行。
    这个方案的问题在于,并发压力大时,可能导致线程不够用。此时的异步调用会根据rejection-policy="CALLER_RUNS"的配置而变为同步调用。


  3. 更换线程池。
    使用优先创建新线程(而非优先入队列)的线程池。
    改动最大的方案。



本文转自 斯然在天边 51CTO博客,原文链接:http://blog.51cto.com/winters1224/1885666,如需转载请自行联系原作者
相关文章
|
5月前
|
监控 Java BI
《深入理解Spring》定时任务——自动化调度的时间管理者
Spring定时任务通过@Scheduled注解和Cron表达式实现灵活调度,支持固定频率、延迟执行及动态配置,结合线程池与异常处理可提升可靠性,适用于报表生成、健康检查等场景,助力企业级应用自动化。
|
6月前
|
NoSQL Java 调度
分布式锁与分布式锁使用 Redis 和 Spring Boot 进行调度锁(不带 ShedLock)
分布式锁是分布式系统中用于同步多节点访问共享资源的机制,防止并发操作带来的冲突。本文介绍了基于Spring Boot和Redis实现分布式锁的技术方案,涵盖锁的获取与释放、Redis配置、服务调度及多实例运行等内容,通过Docker Compose搭建环境,验证了锁的有效性与互斥特性。
520 0
分布式锁与分布式锁使用 Redis 和 Spring Boot 进行调度锁(不带 ShedLock)
|
资源调度 Java 调度
Spring Cloud Alibaba 集成分布式定时任务调度功能
定时任务在企业应用中至关重要,常用于异步数据处理、自动化运维等场景。在单体应用中,利用Java的`java.util.Timer`或Spring的`@Scheduled`即可轻松实现。然而,进入微服务架构后,任务可能因多节点并发执行而重复。Spring Cloud Alibaba为此发布了Scheduling模块,提供轻量级、高可用的分布式定时任务解决方案,支持防重复执行、分片运行等功能,并可通过`spring-cloud-starter-alibaba-schedulerx`快速集成。用户可选择基于阿里云SchedulerX托管服务或采用本地开源方案(如ShedLock)
472 1
|
算法 Unix Linux
linux线程调度策略
linux线程调度策略
463 0
|
算法 安全 Java
Java线程调度揭秘:从算法到策略,让你面试稳赢!
在社招面试中,关于线程调度和同步的相关问题常常让人感到棘手。今天,我们将深入解析Java中的线程调度算法、调度策略,探讨线程调度器、时间分片的工作原理,并带你了解常见的线程同步方法。让我们一起破解这些面试难题,提升你的Java并发编程技能!
545 16
|
Java Spring
spring多线程实现+合理设置最大线程数和核心线程数
本文介绍了手动设置线程池时的最大线程数和核心线程数配置方法,建议根据CPU核数及程序类型(CPU密集型或IO密集型)来合理设定。对于IO密集型,核心线程数设为CPU核数的两倍;CPU密集型则设为CPU核数加一。此外,还讨论了`maxPoolSize`、`keepAliveTime`、`allowCoreThreadTimeout`和`queueCapacity`等参数的设置策略,以确保线程池高效稳定运行。
2411 11
spring多线程实现+合理设置最大线程数和核心线程数
|
安全 Java 开发者
Spring容器中的bean是线程安全的吗?
Spring容器中的bean默认为单例模式,多线程环境下若操作共享成员变量,易引发线程安全问题。Spring未对单例bean做线程安全处理,需开发者自行解决。通常,Spring bean(如Controller、Service、Dao)无状态变化,故多为线程安全。若涉及线程安全问题,可通过编码或设置bean作用域为prototype解决。
353 1
|
开发框架 Java .NET
.net core 非阻塞的异步编程 及 线程调度过程
【11月更文挑战第12天】本文介绍了.NET Core中的非阻塞异步编程,包括其基本概念、实现方式及应用示例。通过`async`和`await`关键字,程序可在等待I/O操作时保持线程不被阻塞,提高性能。文章还详细说明了异步方法的基础示例、线程调度过程、延续任务机制、同步上下文的作用以及如何使用`Task.WhenAll`和`Task.WhenAny`处理多个异步任务的并发执行。
363 1
|
存储 Java 数据处理
进程中的线程调度
进程是应用程序运行的基本单位,包括主线程、用户线程和守护线程。计算机由存储器和处理器协同操作,操作系统设计为分时和分任务模式。在个人PC普及后,基于用户的时间片异步任务操作系统确保了更好的体验和性能。线程作为进程的调度单元,通过覆写`Thread`类的`run`方法来处理任务数据,并由系统调度框架统一管理。微服务架构进一步将应用分解为多个子服务,在不同节点上执行,提高数据处理效率与容错性,特别是在大规模数据存储和处理中表现显著。例如,利用微服务框架可以优化算法,加速业务逻辑处理,并在不同区块间分配海量数据存储任务。
|
安全 调度 C#
STA模型、同步上下文和多线程、异步调度
【10月更文挑战第19天】本文介绍了 STA 模型、同步上下文和多线程、异步调度的概念及其优缺点。STA 模型适用于单线程环境,确保资源访问的顺序性;同步上下文和多线程提高了程序的并发性和响应性,但增加了复杂性;异步调度提升了程序的响应性和资源利用率,但也带来了编程复杂性和错误处理的挑战。选择合适的模型需根据具体应用场景和需求进行权衡。
420 0

热门文章

最新文章