JAVA多线程以及Spring异步注解@Async

简介: JAVA多线程以及Spring异步注解@Async

关于多线程

参考1

Java中可以通过new Thread()来构造线程,但是通过直接new一个线程对象有如下缺点:

  1. 每次new Thread新建对象性能差。
  2. 线程缺乏统一管理,可能无限制新建线程,相互之间竞争,及可能占用过多系统资源导致死机或oom。
  3. 缺乏更多功能,如定时执行、定期执行、线程中断。
    相比之下,使用Java提供的四种线程池有以下好处:
  4. 重用存在的线程,减少对象创建、消亡的开销,性能佳。
  5. 可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞。
  6. 提供定时执行、定期执行、单线程、并发数控制等功能。

关于线程池

参考2

如何创建线程池

早期创建线程池的方式

在 JDK 1.5 之后推出了相关的 api,常见的创建线程池方式有以下几种:

  • Executors.newCachedThreadPool():无限线程池。
  • Executors.newFixedThreadPool(nThreads):创建固定大小的线程池。
  • Executors.newSingleThreadExecutor():创建单个线程的线程池。
  • Executors.newScheduledThreadPool():创建周期性执行某个任务的线程池 。
    但是为了更加明确线程池的运行规则,规避资源耗尽的风险,以上早期的jdk中创建的方式,在阿里代码规范中给出红色警告。

规范地使用线程池

更推荐使用java.util.concurrent包下的ThreadPoolExecutor,在其的构造函数中,给出如下参数:

  • int corePoolSize
    核心线程:线程池新建线程的时候,如果当前线程总数小于 corePoolSize ,则新建的是核心线程;如果超过corePoolSize,则新建的是非核心线程。
  • int maximumPoolSize
    该线程池中线程总数的最大值,线程总数 = 核心线程数 + 非核心线程数。
  • BlockingQueue workQueue
    该线程池中的任务队列:维护着等待执行的Runnable对象。当所有的核心线程都在干活时,新添加的任务会被添加到这个队列中等待处理,如果队列满了,则新建非核心线程执行任务。
  • ThreadFactory threadFactory
    线程的工厂类,用来创建线程。
  • RejectedExecutionHandler handler
    拒绝策略,当线程无法执行新任务时(一般是由于线程池中的线程数量已经达到最大数或者线程池关闭导致的),默认情况下,当线程池无法处理新线程时,会抛出一个RejectedExecutionException。

使用时要注意:

  1. 避免使用无界队列
  2. 明确拒绝任务时的行为
  • AbortPolicy 抛出RejectedExecutionException
  • DiscardPolicy 什么也不做,直接忽略
  • DiscardOldestPolicy 丢弃执行队列中最老的任务,尝试为当前提交的任务腾出位置
  • CallerRunsPolicy 直接由提交任务者执行这个任务
  1. 获取处理结果和异常
  2. 正确配置线程池参数
  1. 先看下机器的CPU核数,然后在设定具体参数:即CPU核数 = Runtime.getRuntime().availableProcessors()
  2. 分析下线程池处理的程序是CPU密集型,还是IO密集型:
  1. CPU密集型:核心线程数 = CPU核数 + 1
  2. IO密集型:核心线程数 = CPU核数 * 2
  1. 线程池线程任务尽量避免使用死循环调用方式

以上线程池中线程的数量分析的十分宽泛,以下的分析更为妥当:

N核服务器,通过执行业务的单线程分析出本地计算时间为x,等待时间为y(网络请求,本地IO等),则工作线程数(线程池线程数)设置为 N*(x+y)/x,能让CPU的利用率最大化。具体的分析:

参考3 线程数究竟设多少合理

spring线程池配置

参考4 spring线程池配置

  • 方式一: XML定义bean
<!-- 包路径扫描 -->
    <context:component-scan base-package="spring.task"/>
    <!-- Spring线程池 -->
    <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <!-- 核心线程数 -->
        <property name="corePoolSize" value="5" />
        <!-- 线程池维护线程的最大数量 -->
        <property name="maxPoolSize" value="10" />
        <!-- 允许的空闲时间, 默认60秒 -->
        <property name="keepAliveSeconds" value="60" />
        <!-- 任务队列 -->
        <property name="queueCapacity" value="50" />
        <!-- 线程超过空闲时间限制,均会退出直到线程数量为0 -->
        <property name="allowCoreThreadTimeOut" value="true"/>
        <!-- 对拒绝task的处理策略 -->
        <property name="rejectedExecutionHandler">
            <bean class="java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy" />
        </property>
    </bean>
当一个新任务来临时:
1)如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务;
2)如果此时线程池中的数量等于corePoolSize,但是缓冲队列workQueue未满,那么任务被放入缓冲队列;
3)如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maxPoolSize,建新的线程来处理被添加的任务;
4)如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maxPoolSize,那么通过handler所指定的策略来处理此任务;
5)当线程池中的线程数量大于corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,如果allowCoreThreadTimeOut为false,则线程数量维持在corePoolSize, 如果为true,则线程数量可最低降至0;
  • 方式二:使用task:executor方式
<?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:task="http://www.springframework.org/schema/task"
           xsi:schemaLocation=
                   "http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                   http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
     <!-- 包路径扫描 -->
    <context:component-scan base-package="spring.task"/>
    <!-- 定义线程池 -->
    <task:executor id="executor" pool-size="5" queue-capacity="10" rejection-policy="DISCARD_OLDEST"/>
</beans>

@Async注解

原先我们在代码中对线程池提交线程时,需要先在类中注入线程池对象,再调用其execute()方法,或submit()方法进行提交,不需要返回或者返回Future对象。而使用@Async注解,则只需要在其类或者方法上使用注解,则其自动会被spring提交到线程池中处理。

参考芋道源码5 Spring 异步调用,一行代码实现!舒服,不接受任何反驳~

作用域

@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Async {
    String value() default "";
}

通过其@Target定义可知,可以加在类上,或者方法上。若加在类上,则其所有方法都会以异步的方式执行,这样的粒度如果没设计好,可能就会过粗;若加在方法上,则该方法会被异步调用;

引入依赖

@Async注解在spring-framework中的spring-context工程里。

<!-- 引入 Spring Boot 依赖 间接引入了 spring-context -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

启动Spring异步注解

在启动类中使用注解@EnableAsync

@SpringBootApplication
@EnableAsync // 开启 @Async 的支持
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

无需返回的异步调用

@Override
    @Async
    public void noResultAsynFuncA() {
        long start = printStartTime();
        sleep(5);
        printEndTime(start);
    }

带Future返回的异步调用

  • 应注意,@Async注解若有返回,则尽量是Future<String>,否则在IDE中应有提示

Method annotated with @Async should return “void” or “Future-like” type.

@Override
    @Async
    public Future<String> resultAsynFuncA() {
        return AsyncResult.forValue(this.resultFuncA());
    }

应用ayml配置文件

在 spring.task.execution 配置项,Spring Task 调度任务的配置,对应 TaskExecutionProperties 配置类

spring:
  task:
    # Spring 执行器配置,对应 TaskExecutionProperties 配置类。对于 Spring 异步任务,会使用该执行器。
    execution:
      thread-name-prefix: task- # 线程池的线程名的前缀。默认为 task- ,建议根据自己应用来设置
      pool: # 线程池相关
        core-size: 8 # 核心线程数,线程池创建时候初始化的线程数。默认为 8 。
        max-size: 20 # 最大线程数,线程池最大的线程数,只有在缓冲队列满了之后,才会申请超过核心线程数的线程。默认为 Integer.MAX_VALUE
        keep-alive: 60s # 允许线程的空闲时间,当超过了核心线程之外的线程,在空闲时间到达之后会被销毁。默认为 60 秒
        queue-capacity: 200 # 缓冲队列大小,用来缓冲执行任务的队列的大小。默认为 Integer.MAX_VALUE 。
        allow-core-thread-timeout: true # 是否允许核心线程超时,即开启线程池的动态增长和缩小。默认为 true 。
      shutdown:
        await-termination: true # 应用关闭时,是否等待定时任务执行完成。默认为 false ,建议设置为 true
        await-termination-period: 60 # 等待任务完成的最大时长,单位为秒。默认为 0 ,根据自己应用来设置

异步异常处理器

实现AsyncUncaughtExceptionHandler接口

@Component
public class GlobalAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
    private Logger logger = LoggerFactory.getLogger(getClass());
    @Override
    public void handleUncaughtException(Throwable ex, Method method, Object... params) {
        logger.error("[handleUncaughtException][method({}) params({}) 发生异常]",
                method, params, ex);
    }
}

注意,AsyncUncaughtExceptionHandler 只能拦截返回类型非 Future 的异步调用方法。通过看 AsyncExecutionAspectSupport#handleError(Throwable ex, Method method, Object... params) 的源码,可以很容易得到这个结论

异步异常配置类

实现AsyncConfigurer接口

@Configuration
@EnableAsync // 开启 @Async 的支持
public class AsyncConfig implements AsyncConfigurer {
    @Autowired
    private GlobalAsyncExceptionHandler exceptionHandler;
    @Override
    public Executor getAsyncExecutor() {
        return null;
    }
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return exceptionHandler;
    }
}
  • 实现 #getAsyncUncaughtExceptionHandler() 方法,返回我们定义的 GlobalAsyncExceptionHandler 对象。
  • 实现 #getAsyncExecutor() 方法,返回 Spring Task 异步任务的默认执行器。这里,我们返回了 null ,并未定义默认执行器。所以最终会使用 TaskExecutionAutoConfiguration 自动化配置类创建出来的 ThreadPoolTaskExecutor 任务执行器,作为默认执行器。

配置多个线程池

实际使用中,不同业务放在不同的执行器中执行,就需要配置多个线程池。

应用yaml配置文件

spring:
  task:
    # Spring 执行器配置,对应 TaskExecutionProperties 配置类。对于 Spring 异步任务,会使用该执行器。
    execution-one:
      thread-name-prefix: task-one- # 线程池的线程名的前缀。默认为 task- ,建议根据自己应用来设置
      pool: # 线程池相关
        core-size: 8 # 核心线程数,线程池创建时候初始化的线程数。默认为 8 。
        max-size: 20 # 最大线程数,线程池最大的线程数,只有在缓冲队列满了之后,才会申请超过核心线程数的线程。默认为 Integer.MAX_VALUE
        keep-alive: 60s # 允许线程的空闲时间,当超过了核心线程之外的线程,在空闲时间到达之后会被销毁。默认为 60 秒
        queue-capacity: 200 # 缓冲队列大小,用来缓冲执行任务的队列的大小。默认为 Integer.MAX_VALUE 。
        allow-core-thread-timeout: true # 是否允许核心线程超时,即开启线程池的动态增长和缩小。默认为 true 。
      shutdown:
        await-termination: true # 应用关闭时,是否等待定时任务执行完成。默认为 false ,建议设置为 true
        await-termination-period: 60 # 等待任务完成的最大时长,单位为秒。默认为 0 ,根据自己应用来设置
    # Spring 执行器配置,对应 TaskExecutionProperties 配置类。对于 Spring 异步任务,会使用该执行器。
    execution-two:
      thread-name-prefix: task-two- # 线程池的线程名的前缀。默认为 task- ,建议根据自己应用来设置
      pool: # 线程池相关
        core-size: 8 # 核心线程数,线程池创建时候初始化的线程数。默认为 8 。
        max-size: 20 # 最大线程数,线程池最大的线程数,只有在缓冲队列满了之后,才会申请超过核心线程数的线程。默认为 Integer.MAX_VALUE
        keep-alive: 60s # 允许线程的空闲时间,当超过了核心线程之外的线程,在空闲时间到达之后会被销毁。默认为 60 秒
        queue-capacity: 200 # 缓冲队列大小,用来缓冲执行任务的队列的大小。默认为 Integer.MAX_VALUE 。
        allow-core-thread-timeout: true # 是否允许核心线程超时,即开启线程池的动态增长和缩小。默认为 true 。
      shutdown:
        await-termination: true # 应用关闭时,是否等待定时任务执行完成。默认为 false ,建议设置为 true
        await-termination-period: 60 # 等待任务完成的最大时长,单位为秒。默认为 0 ,根据自己应用来设置
  • 在 spring.task 配置项下,我们新增了 execution-one 和 execution-two 两个执行器的配置。

异步配置类

@Configuration
@EnableAsync // 开启 @Async 的支持
public class AsyncConfig {
    public static final String EXECUTOR_ONE_BEAN_NAME = "executor-one";
    public static final String EXECUTOR_TWO_BEAN_NAME = "executor-two";
    @Configuration
    public static class ExecutorOneConfiguration {
        @Bean(name = EXECUTOR_ONE_BEAN_NAME + "-properties")
        @Primary
        @ConfigurationProperties(prefix = "spring.task.execution-one") // 读取 spring.task.execution-one 配置到 TaskExecutionProperties 对象
        public TaskExecutionProperties taskExecutionProperties() {
            return new TaskExecutionProperties();
        }
        @Bean(name = EXECUTOR_ONE_BEAN_NAME)
        public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
            // 创建 TaskExecutorBuilder 对象
            TaskExecutorBuilder builder = createTskExecutorBuilder(this.taskExecutionProperties());
            // 创建 ThreadPoolTaskExecutor 对象
            return builder.build();
        }
    }
    @Configuration
    public static class ExecutorTwoConfiguration {
    // ExecutorOneConfiguration相似代码省略
        @Bean(name = EXECUTOR_TWO_BEAN_NAME + "-properties")
        @ConfigurationProperties(prefix = "spring.task.execution-two") // 读取 spring.task.execution-two 配置到 TaskExecutionProperties 对象
        public TaskExecutionProperties taskExecutionProperties() {
        }
        @Bean(name = EXECUTOR_TWO_BEAN_NAME)
        public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        }
    }
    private static TaskExecutorBuilder createTskExecutorBuilder(TaskExecutionProperties properties) {
        // Pool 属性
        TaskExecutionProperties.Pool pool = properties.getPool();
        TaskExecutorBuilder builder = new TaskExecutorBuilder();
        builder = builder.queueCapacity(pool.getQueueCapacity());
        builder = builder.corePoolSize(pool.getCoreSize());
        builder = builder.maxPoolSize(pool.getMaxSize());
        builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout());
        builder = builder.keepAlive(pool.getKeepAlive());
        // Shutdown 属性
        TaskExecutionProperties.Shutdown shutdown = properties.getShutdown();
        builder = builder.awaitTermination(shutdown.isAwaitTermination());
        builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod());
        // 其它基本属性
        builder = builder.threadNamePrefix(properties.getThreadNamePrefix());
//        builder = builder.customizers(taskExecutorCustomizers.orderedStream()::iterator);
//        builder = builder.taskDecorator(taskDecorator.getIfUnique());
        return builder;
    }
}

使用线程池

@Service
public class DemoService {
    @Async(AsyncConfig.EXECUTOR_ONE_BEAN_NAME)
    public Integer execute01() {
        logger.info("[execute01]");
        return 1;
    }
    @Async(AsyncConfig.EXECUTOR_TWO_BEAN_NAME)
    public Integer execute02() {
        logger.info("[execute02]");
        return 2;
    }
}
  • 在 @Async注解中显示声明使用哪一个线程池

总结

  1. 要用好线程池,需要懂其原理,@Async注解只是提供方便。
  2. AsyncUncaughtExceptionHandler全局异步异常处理类不能处理Future<T>,而@Async基本上尽量要求返回void或者Future-like,所以AsyncUncaughtExceptionHandler一般应该是用来处理返回void的场景。
目录
相关文章
|
1天前
|
安全 Java
Java中的并发编程:理解并发性与线程安全
Java作为一种广泛应用的编程语言,在并发编程方面具有显著的优势和特点。本文将探讨Java中的并发编程概念,重点关注并发性与线程安全,并提供一些实用的技巧和建议,帮助开发人员更好地理解和应用Java中的并发机制。
|
1天前
|
Java
Java中的多线程编程:基础知识与实战技巧
【5月更文挑战第6天】多线程编程是Java中的一个重要特性,它允许我们在一个程序中同时执行多个任务。本文将介绍Java多线程的基础知识,包括线程的创建、启动、同步和通信,以及如何在Java中实现多线程编程。通过实例代码和解析,帮助读者深入理解Java多线程编程的概念和应用。
|
1天前
|
Java 编译器 Android开发
Java注解你知多少?
Java注解你知多少?
6 1
|
1天前
|
IDE Java 数据库连接
Lombok注解大全
这些是Lombok中的一些常见注解,它们可以显著减少Java代码中的冗余代码,提高代码的可读性和可维护性。不过,在使用Lombok之前,请确保你的开发环境已经配置好支持Lombok,通常需要安装相应的插件或进行设置以使IDE(如Eclipse、IntelliJ IDEA)能够正确解析Lombok注解。
10 4
|
2天前
|
Java
Java中的多线程编程:基础知识与实践
【5月更文挑战第5天】在现代软件开发中,多线程编程是一个重要的概念,尤其是在Java这样的多平台、高性能的编程语言中。通过多线程,我们可以实现并行处理,提高程序的运行效率。本文将介绍Java中多线程编程的基础知识,包括线程的概念、创建和控制方法,以及一些常见的多线程问题和解决方案。
|
5天前
|
存储 缓存 前端开发
Java串口通信技术探究3:RXTX库线程 优化系统性能的SerialPortEventListener类
Java串口通信技术探究3:RXTX库线程 优化系统性能的SerialPortEventListener类
20 3
|
5天前
|
Java
JAVA难点包括异常处理、多线程、泛型和反射,以及复杂的分布式系统知识
JAVA难点包括异常处理、多线程、泛型和反射,以及复杂的分布式系统知识。入坑JAVA因它的面向对象特性、平台无关性、强大的标准库和活跃的社区支持。
21 2
|
5天前
|
Java 调度 开发者
Java中的多线程编程:基础与实践
【5月更文挑战第2天】本文将深入探讨Java中的多线程编程,从基础概念到实际应用,为读者提供全面的理解和实践指导。我们将首先介绍线程的基本概念和重要性,然后详细解析Java中实现多线程的两种主要方式:继承Thread类和实现Runnable接口。接着,我们将探讨线程同步的问题,包括synchronized关键字和Lock接口的使用。最后,我们将通过一个实际的生产者-消费者模型来演示多线程编程的实践应用。
|
5天前
|
安全 Java 程序员
Java中的多线程编程:从理论到实践
【5月更文挑战第2天】 在计算机科学中,多线程编程是一项重要的技术,它允许多个任务在同一时间段内并发执行。在Java中,多线程编程是通过创建并管理线程来实现的。本文将深入探讨Java中的多线程编程,包括线程的概念、如何创建和管理线程、以及多线程编程的一些常见问题和解决方案。
17 1
|
8天前
|
监控 安全 Java
【多线程学习】深入探究阻塞队列与生产者消费者模型和线程池常见面试题
【多线程学习】深入探究阻塞队列与生产者消费者模型和线程池常见面试题