JAVA多线程以及Spring异步注解@Async

简介: JAVA多线程以及Spring异步注解@Async

关于多线程

参考1

Java中可以通过new Thread()来构造线程,但是通过直接new一个线程对象有如下缺点:

  1. 每次new Thread新建对象性能差。
  2. 线程缺乏统一管理,可能无限制新建线程,相互之间竞争,及可能占用过多系统资源导致死机或oom。
  3. 缺乏更多功能,如定时执行、定期执行、线程中断。
    相比之下,使用Java提供的四种线程池有以下好处:
  4. 重用存在的线程,减少对象创建、消亡的开销,性能佳。
  5. 可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞。
  6. 提供定时执行、定期执行、单线程、并发数控制等功能。

关于线程池

参考2

如何创建线程池

早期创建线程池的方式

在 JDK 1.5 之后推出了相关的 api,常见的创建线程池方式有以下几种:

  • Executors.newCachedThreadPool():无限线程池。
  • Executors.newFixedThreadPool(nThreads):创建固定大小的线程池。
  • Executors.newSingleThreadExecutor():创建单个线程的线程池。
  • Executors.newScheduledThreadPool():创建周期性执行某个任务的线程池 。
    但是为了更加明确线程池的运行规则,规避资源耗尽的风险,以上早期的jdk中创建的方式,在阿里代码规范中给出红色警告。

规范地使用线程池

更推荐使用java.util.concurrent包下的ThreadPoolExecutor,在其的构造函数中,给出如下参数:

  • int corePoolSize
    核心线程:线程池新建线程的时候,如果当前线程总数小于 corePoolSize ,则新建的是核心线程;如果超过corePoolSize,则新建的是非核心线程。
  • int maximumPoolSize
    该线程池中线程总数的最大值,线程总数 = 核心线程数 + 非核心线程数。
  • BlockingQueue workQueue
    该线程池中的任务队列:维护着等待执行的Runnable对象。当所有的核心线程都在干活时,新添加的任务会被添加到这个队列中等待处理,如果队列满了,则新建非核心线程执行任务。
  • ThreadFactory threadFactory
    线程的工厂类,用来创建线程。
  • RejectedExecutionHandler handler
    拒绝策略,当线程无法执行新任务时(一般是由于线程池中的线程数量已经达到最大数或者线程池关闭导致的),默认情况下,当线程池无法处理新线程时,会抛出一个RejectedExecutionException。

使用时要注意:

  1. 避免使用无界队列
  2. 明确拒绝任务时的行为
  • AbortPolicy 抛出RejectedExecutionException
  • DiscardPolicy 什么也不做,直接忽略
  • DiscardOldestPolicy 丢弃执行队列中最老的任务,尝试为当前提交的任务腾出位置
  • CallerRunsPolicy 直接由提交任务者执行这个任务
  1. 获取处理结果和异常
  2. 正确配置线程池参数
  1. 先看下机器的CPU核数,然后在设定具体参数:即CPU核数 = Runtime.getRuntime().availableProcessors()
  2. 分析下线程池处理的程序是CPU密集型,还是IO密集型:
  1. CPU密集型:核心线程数 = CPU核数 + 1
  2. IO密集型:核心线程数 = CPU核数 * 2
  1. 线程池线程任务尽量避免使用死循环调用方式

以上线程池中线程的数量分析的十分宽泛,以下的分析更为妥当:

N核服务器,通过执行业务的单线程分析出本地计算时间为x,等待时间为y(网络请求,本地IO等),则工作线程数(线程池线程数)设置为 N*(x+y)/x,能让CPU的利用率最大化。具体的分析:

参考3 线程数究竟设多少合理

spring线程池配置

参考4 spring线程池配置

  • 方式一: XML定义bean
<!-- 包路径扫描 -->
    <context:component-scan base-package="spring.task"/>
    <!-- Spring线程池 -->
    <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <!-- 核心线程数 -->
        <property name="corePoolSize" value="5" />
        <!-- 线程池维护线程的最大数量 -->
        <property name="maxPoolSize" value="10" />
        <!-- 允许的空闲时间, 默认60秒 -->
        <property name="keepAliveSeconds" value="60" />
        <!-- 任务队列 -->
        <property name="queueCapacity" value="50" />
        <!-- 线程超过空闲时间限制,均会退出直到线程数量为0 -->
        <property name="allowCoreThreadTimeOut" value="true"/>
        <!-- 对拒绝task的处理策略 -->
        <property name="rejectedExecutionHandler">
            <bean class="java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy" />
        </property>
    </bean>
当一个新任务来临时:
1)如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务;
2)如果此时线程池中的数量等于corePoolSize,但是缓冲队列workQueue未满,那么任务被放入缓冲队列;
3)如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maxPoolSize,建新的线程来处理被添加的任务;
4)如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maxPoolSize,那么通过handler所指定的策略来处理此任务;
5)当线程池中的线程数量大于corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,如果allowCoreThreadTimeOut为false,则线程数量维持在corePoolSize, 如果为true,则线程数量可最低降至0;
  • 方式二:使用task:executor方式
<?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:task="http://www.springframework.org/schema/task"
           xsi:schemaLocation=
                   "http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                   http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
     <!-- 包路径扫描 -->
    <context:component-scan base-package="spring.task"/>
    <!-- 定义线程池 -->
    <task:executor id="executor" pool-size="5" queue-capacity="10" rejection-policy="DISCARD_OLDEST"/>
</beans>

@Async注解

原先我们在代码中对线程池提交线程时,需要先在类中注入线程池对象,再调用其execute()方法,或submit()方法进行提交,不需要返回或者返回Future对象。而使用@Async注解,则只需要在其类或者方法上使用注解,则其自动会被spring提交到线程池中处理。

参考芋道源码5 Spring 异步调用,一行代码实现!舒服,不接受任何反驳~

作用域

@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Async {
    String value() default "";
}

通过其@Target定义可知,可以加在类上,或者方法上。若加在类上,则其所有方法都会以异步的方式执行,这样的粒度如果没设计好,可能就会过粗;若加在方法上,则该方法会被异步调用;

引入依赖

@Async注解在spring-framework中的spring-context工程里。

<!-- 引入 Spring Boot 依赖 间接引入了 spring-context -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

启动Spring异步注解

在启动类中使用注解@EnableAsync

@SpringBootApplication
@EnableAsync // 开启 @Async 的支持
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

无需返回的异步调用

@Override
    @Async
    public void noResultAsynFuncA() {
        long start = printStartTime();
        sleep(5);
        printEndTime(start);
    }

带Future返回的异步调用

  • 应注意,@Async注解若有返回,则尽量是Future<String>,否则在IDE中应有提示

Method annotated with @Async should return “void” or “Future-like” type.

@Override
    @Async
    public Future<String> resultAsynFuncA() {
        return AsyncResult.forValue(this.resultFuncA());
    }

应用ayml配置文件

在 spring.task.execution 配置项,Spring Task 调度任务的配置,对应 TaskExecutionProperties 配置类

spring:
  task:
    # Spring 执行器配置,对应 TaskExecutionProperties 配置类。对于 Spring 异步任务,会使用该执行器。
    execution:
      thread-name-prefix: task- # 线程池的线程名的前缀。默认为 task- ,建议根据自己应用来设置
      pool: # 线程池相关
        core-size: 8 # 核心线程数,线程池创建时候初始化的线程数。默认为 8 。
        max-size: 20 # 最大线程数,线程池最大的线程数,只有在缓冲队列满了之后,才会申请超过核心线程数的线程。默认为 Integer.MAX_VALUE
        keep-alive: 60s # 允许线程的空闲时间,当超过了核心线程之外的线程,在空闲时间到达之后会被销毁。默认为 60 秒
        queue-capacity: 200 # 缓冲队列大小,用来缓冲执行任务的队列的大小。默认为 Integer.MAX_VALUE 。
        allow-core-thread-timeout: true # 是否允许核心线程超时,即开启线程池的动态增长和缩小。默认为 true 。
      shutdown:
        await-termination: true # 应用关闭时,是否等待定时任务执行完成。默认为 false ,建议设置为 true
        await-termination-period: 60 # 等待任务完成的最大时长,单位为秒。默认为 0 ,根据自己应用来设置

异步异常处理器

实现AsyncUncaughtExceptionHandler接口

@Component
public class GlobalAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
    private Logger logger = LoggerFactory.getLogger(getClass());
    @Override
    public void handleUncaughtException(Throwable ex, Method method, Object... params) {
        logger.error("[handleUncaughtException][method({}) params({}) 发生异常]",
                method, params, ex);
    }
}

注意,AsyncUncaughtExceptionHandler 只能拦截返回类型非 Future 的异步调用方法。通过看 AsyncExecutionAspectSupport#handleError(Throwable ex, Method method, Object... params) 的源码,可以很容易得到这个结论

异步异常配置类

实现AsyncConfigurer接口

@Configuration
@EnableAsync // 开启 @Async 的支持
public class AsyncConfig implements AsyncConfigurer {
    @Autowired
    private GlobalAsyncExceptionHandler exceptionHandler;
    @Override
    public Executor getAsyncExecutor() {
        return null;
    }
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return exceptionHandler;
    }
}
  • 实现 #getAsyncUncaughtExceptionHandler() 方法,返回我们定义的 GlobalAsyncExceptionHandler 对象。
  • 实现 #getAsyncExecutor() 方法,返回 Spring Task 异步任务的默认执行器。这里,我们返回了 null ,并未定义默认执行器。所以最终会使用 TaskExecutionAutoConfiguration 自动化配置类创建出来的 ThreadPoolTaskExecutor 任务执行器,作为默认执行器。

配置多个线程池

实际使用中,不同业务放在不同的执行器中执行,就需要配置多个线程池。

应用yaml配置文件

spring:
  task:
    # Spring 执行器配置,对应 TaskExecutionProperties 配置类。对于 Spring 异步任务,会使用该执行器。
    execution-one:
      thread-name-prefix: task-one- # 线程池的线程名的前缀。默认为 task- ,建议根据自己应用来设置
      pool: # 线程池相关
        core-size: 8 # 核心线程数,线程池创建时候初始化的线程数。默认为 8 。
        max-size: 20 # 最大线程数,线程池最大的线程数,只有在缓冲队列满了之后,才会申请超过核心线程数的线程。默认为 Integer.MAX_VALUE
        keep-alive: 60s # 允许线程的空闲时间,当超过了核心线程之外的线程,在空闲时间到达之后会被销毁。默认为 60 秒
        queue-capacity: 200 # 缓冲队列大小,用来缓冲执行任务的队列的大小。默认为 Integer.MAX_VALUE 。
        allow-core-thread-timeout: true # 是否允许核心线程超时,即开启线程池的动态增长和缩小。默认为 true 。
      shutdown:
        await-termination: true # 应用关闭时,是否等待定时任务执行完成。默认为 false ,建议设置为 true
        await-termination-period: 60 # 等待任务完成的最大时长,单位为秒。默认为 0 ,根据自己应用来设置
    # Spring 执行器配置,对应 TaskExecutionProperties 配置类。对于 Spring 异步任务,会使用该执行器。
    execution-two:
      thread-name-prefix: task-two- # 线程池的线程名的前缀。默认为 task- ,建议根据自己应用来设置
      pool: # 线程池相关
        core-size: 8 # 核心线程数,线程池创建时候初始化的线程数。默认为 8 。
        max-size: 20 # 最大线程数,线程池最大的线程数,只有在缓冲队列满了之后,才会申请超过核心线程数的线程。默认为 Integer.MAX_VALUE
        keep-alive: 60s # 允许线程的空闲时间,当超过了核心线程之外的线程,在空闲时间到达之后会被销毁。默认为 60 秒
        queue-capacity: 200 # 缓冲队列大小,用来缓冲执行任务的队列的大小。默认为 Integer.MAX_VALUE 。
        allow-core-thread-timeout: true # 是否允许核心线程超时,即开启线程池的动态增长和缩小。默认为 true 。
      shutdown:
        await-termination: true # 应用关闭时,是否等待定时任务执行完成。默认为 false ,建议设置为 true
        await-termination-period: 60 # 等待任务完成的最大时长,单位为秒。默认为 0 ,根据自己应用来设置
  • 在 spring.task 配置项下,我们新增了 execution-one 和 execution-two 两个执行器的配置。

异步配置类

@Configuration
@EnableAsync // 开启 @Async 的支持
public class AsyncConfig {
    public static final String EXECUTOR_ONE_BEAN_NAME = "executor-one";
    public static final String EXECUTOR_TWO_BEAN_NAME = "executor-two";
    @Configuration
    public static class ExecutorOneConfiguration {
        @Bean(name = EXECUTOR_ONE_BEAN_NAME + "-properties")
        @Primary
        @ConfigurationProperties(prefix = "spring.task.execution-one") // 读取 spring.task.execution-one 配置到 TaskExecutionProperties 对象
        public TaskExecutionProperties taskExecutionProperties() {
            return new TaskExecutionProperties();
        }
        @Bean(name = EXECUTOR_ONE_BEAN_NAME)
        public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
            // 创建 TaskExecutorBuilder 对象
            TaskExecutorBuilder builder = createTskExecutorBuilder(this.taskExecutionProperties());
            // 创建 ThreadPoolTaskExecutor 对象
            return builder.build();
        }
    }
    @Configuration
    public static class ExecutorTwoConfiguration {
    // ExecutorOneConfiguration相似代码省略
        @Bean(name = EXECUTOR_TWO_BEAN_NAME + "-properties")
        @ConfigurationProperties(prefix = "spring.task.execution-two") // 读取 spring.task.execution-two 配置到 TaskExecutionProperties 对象
        public TaskExecutionProperties taskExecutionProperties() {
        }
        @Bean(name = EXECUTOR_TWO_BEAN_NAME)
        public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        }
    }
    private static TaskExecutorBuilder createTskExecutorBuilder(TaskExecutionProperties properties) {
        // Pool 属性
        TaskExecutionProperties.Pool pool = properties.getPool();
        TaskExecutorBuilder builder = new TaskExecutorBuilder();
        builder = builder.queueCapacity(pool.getQueueCapacity());
        builder = builder.corePoolSize(pool.getCoreSize());
        builder = builder.maxPoolSize(pool.getMaxSize());
        builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout());
        builder = builder.keepAlive(pool.getKeepAlive());
        // Shutdown 属性
        TaskExecutionProperties.Shutdown shutdown = properties.getShutdown();
        builder = builder.awaitTermination(shutdown.isAwaitTermination());
        builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod());
        // 其它基本属性
        builder = builder.threadNamePrefix(properties.getThreadNamePrefix());
//        builder = builder.customizers(taskExecutorCustomizers.orderedStream()::iterator);
//        builder = builder.taskDecorator(taskDecorator.getIfUnique());
        return builder;
    }
}

使用线程池

@Service
public class DemoService {
    @Async(AsyncConfig.EXECUTOR_ONE_BEAN_NAME)
    public Integer execute01() {
        logger.info("[execute01]");
        return 1;
    }
    @Async(AsyncConfig.EXECUTOR_TWO_BEAN_NAME)
    public Integer execute02() {
        logger.info("[execute02]");
        return 2;
    }
}
  • 在 @Async注解中显示声明使用哪一个线程池

总结

  1. 要用好线程池,需要懂其原理,@Async注解只是提供方便。
  2. AsyncUncaughtExceptionHandler全局异步异常处理类不能处理Future<T>,而@Async基本上尽量要求返回void或者Future-like,所以AsyncUncaughtExceptionHandler一般应该是用来处理返回void的场景。
目录
相关文章
|
5天前
|
Java 开发者
Java多线程编程中的常见误区与最佳实践####
本文深入剖析了Java多线程编程中开发者常遇到的几个典型误区,如对`start()`与`run()`方法的混淆使用、忽视线程安全问题、错误处理未同步的共享变量等,并针对这些问题提出了具体的解决方案和最佳实践。通过实例代码对比,直观展示了正确与错误的实现方式,旨在帮助读者构建更加健壮、高效的多线程应用程序。 ####
|
13天前
|
安全 Java 测试技术
Java并行流陷阱:为什么指定线程池可能是个坏主意
本文探讨了Java并行流的使用陷阱,尤其是指定线程池的问题。文章分析了并行流的设计思想,指出了指定线程池的弊端,并提供了使用CompletableFuture等替代方案。同时,介绍了Parallel Collector库在处理阻塞任务时的优势和特点。
|
4天前
|
安全 Java 开发者
Java 多线程并发控制:深入理解与实战应用
《Java多线程并发控制:深入理解与实战应用》一书详细解析了Java多线程编程的核心概念、并发控制技术及其实战技巧,适合Java开发者深入学习和实践参考。
|
4天前
|
Java 开发者
Java多线程编程的艺术与实践####
本文深入探讨了Java多线程编程的核心概念、应用场景及实践技巧。不同于传统的技术文档,本文以实战为导向,通过生动的实例和详尽的代码解析,引领读者领略多线程编程的魅力,掌握其在提升应用性能、优化资源利用方面的关键作用。无论你是Java初学者还是有一定经验的开发者,本文都将为你打开多线程编程的新视角。 ####
|
3天前
|
存储 安全 Java
Java多线程编程中的并发容器:深入解析与实战应用####
在本文中,我们将探讨Java多线程编程中的一个核心话题——并发容器。不同于传统单一线程环境下的数据结构,并发容器专为多线程场景设计,确保数据访问的线程安全性和高效性。我们将从基础概念出发,逐步深入到`java.util.concurrent`包下的核心并发容器实现,如`ConcurrentHashMap`、`CopyOnWriteArrayList`以及`BlockingQueue`等,通过实例代码演示其使用方法,并分析它们背后的设计原理与适用场景。无论你是Java并发编程的初学者还是希望深化理解的开发者,本文都将为你提供有价值的见解与实践指导。 --- ####
|
9天前
|
安全 Java 开发者
深入解读JAVA多线程:wait()、notify()、notifyAll()的奥秘
在Java多线程编程中,`wait()`、`notify()`和`notifyAll()`方法是实现线程间通信和同步的关键机制。这些方法定义在`java.lang.Object`类中,每个Java对象都可以作为线程间通信的媒介。本文将详细解析这三个方法的使用方法和最佳实践,帮助开发者更高效地进行多线程编程。 示例代码展示了如何在同步方法中使用这些方法,确保线程安全和高效的通信。
32 9
|
6天前
|
安全 Java 开发者
Java多线程编程中的常见问题与解决方案
本文深入探讨了Java多线程编程中常见的问题,包括线程安全问题、死锁、竞态条件等,并提供了相应的解决策略。文章首先介绍了多线程的基础知识,随后详细分析了每个问题的产生原因和典型场景,最后提出了实用的解决方案,旨在帮助开发者提高多线程程序的稳定性和性能。
|
12天前
|
存储 安全 Java
Java多线程编程的艺术:从基础到实践####
本文深入探讨了Java多线程编程的核心概念、应用场景及其实现方式,旨在帮助开发者理解并掌握多线程编程的基本技能。文章首先概述了多线程的重要性和常见挑战,随后详细介绍了Java中创建和管理线程的两种主要方式:继承Thread类与实现Runnable接口。通过实例代码,本文展示了如何正确启动、运行及同步线程,以及如何处理线程间的通信与协作问题。最后,文章总结了多线程编程的最佳实践,为读者在实际项目中应用多线程技术提供了宝贵的参考。 ####
|
9天前
|
监控 安全 Java
Java中的多线程编程:从入门到实践####
本文将深入浅出地探讨Java多线程编程的核心概念、应用场景及实践技巧。不同于传统的摘要形式,本文将以一个简短的代码示例作为开篇,直接展示多线程的魅力,随后再详细解析其背后的原理与实现方式,旨在帮助读者快速理解并掌握Java多线程编程的基本技能。 ```java // 简单的多线程示例:创建两个线程,分别打印不同的消息 public class SimpleMultithreading { public static void main(String[] args) { Thread thread1 = new Thread(() -> System.out.prin
|
12天前
|
Java
JAVA多线程通信:为何wait()与notify()如此重要?
在Java多线程编程中,`wait()` 和 `notify()/notifyAll()` 方法是实现线程间通信的核心机制。它们通过基于锁的方式,使线程在条件不满足时进入休眠状态,并在条件满足时被唤醒,从而确保数据一致性和同步。相比其他通信方式,如忙等待,这些方法更高效灵活。 示例代码展示了如何在生产者-消费者模型中使用这些方法实现线程间的协调和同步。
27 3
下一篇
无影云桌面