JAVA多线程以及Spring异步注解@Async

简介: JAVA多线程以及Spring异步注解@Async

关于多线程

参考1

Java中可以通过new Thread()来构造线程,但是通过直接new一个线程对象有如下缺点:

  1. 每次new Thread新建对象性能差。
  2. 线程缺乏统一管理,可能无限制新建线程,相互之间竞争,及可能占用过多系统资源导致死机或oom。
  3. 缺乏更多功能,如定时执行、定期执行、线程中断。
    相比之下,使用Java提供的四种线程池有以下好处:
  4. 重用存在的线程,减少对象创建、消亡的开销,性能佳。
  5. 可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞。
  6. 提供定时执行、定期执行、单线程、并发数控制等功能。

关于线程池

参考2

如何创建线程池

早期创建线程池的方式

在 JDK 1.5 之后推出了相关的 api,常见的创建线程池方式有以下几种:

  • Executors.newCachedThreadPool():无限线程池。
  • Executors.newFixedThreadPool(nThreads):创建固定大小的线程池。
  • Executors.newSingleThreadExecutor():创建单个线程的线程池。
  • Executors.newScheduledThreadPool():创建周期性执行某个任务的线程池 。
    但是为了更加明确线程池的运行规则,规避资源耗尽的风险,以上早期的jdk中创建的方式,在阿里代码规范中给出红色警告。

规范地使用线程池

更推荐使用java.util.concurrent包下的ThreadPoolExecutor,在其的构造函数中,给出如下参数:

  • int corePoolSize
    核心线程:线程池新建线程的时候,如果当前线程总数小于 corePoolSize ,则新建的是核心线程;如果超过corePoolSize,则新建的是非核心线程。
  • int maximumPoolSize
    该线程池中线程总数的最大值,线程总数 = 核心线程数 + 非核心线程数。
  • BlockingQueue workQueue
    该线程池中的任务队列:维护着等待执行的Runnable对象。当所有的核心线程都在干活时,新添加的任务会被添加到这个队列中等待处理,如果队列满了,则新建非核心线程执行任务。
  • ThreadFactory threadFactory
    线程的工厂类,用来创建线程。
  • RejectedExecutionHandler handler
    拒绝策略,当线程无法执行新任务时(一般是由于线程池中的线程数量已经达到最大数或者线程池关闭导致的),默认情况下,当线程池无法处理新线程时,会抛出一个RejectedExecutionException。

使用时要注意:

  1. 避免使用无界队列
  2. 明确拒绝任务时的行为
  • AbortPolicy 抛出RejectedExecutionException
  • DiscardPolicy 什么也不做,直接忽略
  • DiscardOldestPolicy 丢弃执行队列中最老的任务,尝试为当前提交的任务腾出位置
  • CallerRunsPolicy 直接由提交任务者执行这个任务
  1. 获取处理结果和异常
  2. 正确配置线程池参数
  1. 先看下机器的CPU核数,然后在设定具体参数:即CPU核数 = Runtime.getRuntime().availableProcessors()
  2. 分析下线程池处理的程序是CPU密集型,还是IO密集型:
  1. CPU密集型:核心线程数 = CPU核数 + 1
  2. IO密集型:核心线程数 = CPU核数 * 2
  1. 线程池线程任务尽量避免使用死循环调用方式

以上线程池中线程的数量分析的十分宽泛,以下的分析更为妥当:

N核服务器,通过执行业务的单线程分析出本地计算时间为x,等待时间为y(网络请求,本地IO等),则工作线程数(线程池线程数)设置为 N*(x+y)/x,能让CPU的利用率最大化。具体的分析:

参考3 线程数究竟设多少合理

spring线程池配置

参考4 spring线程池配置

  • 方式一: XML定义bean
<!-- 包路径扫描 -->
    <context:component-scan base-package="spring.task"/>
    <!-- Spring线程池 -->
    <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <!-- 核心线程数 -->
        <property name="corePoolSize" value="5" />
        <!-- 线程池维护线程的最大数量 -->
        <property name="maxPoolSize" value="10" />
        <!-- 允许的空闲时间, 默认60秒 -->
        <property name="keepAliveSeconds" value="60" />
        <!-- 任务队列 -->
        <property name="queueCapacity" value="50" />
        <!-- 线程超过空闲时间限制,均会退出直到线程数量为0 -->
        <property name="allowCoreThreadTimeOut" value="true"/>
        <!-- 对拒绝task的处理策略 -->
        <property name="rejectedExecutionHandler">
            <bean class="java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy" />
        </property>
    </bean>
当一个新任务来临时:
1)如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务;
2)如果此时线程池中的数量等于corePoolSize,但是缓冲队列workQueue未满,那么任务被放入缓冲队列;
3)如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maxPoolSize,建新的线程来处理被添加的任务;
4)如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maxPoolSize,那么通过handler所指定的策略来处理此任务;
5)当线程池中的线程数量大于corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,如果allowCoreThreadTimeOut为false,则线程数量维持在corePoolSize, 如果为true,则线程数量可最低降至0;
  • 方式二:使用task:executor方式
<?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:task="http://www.springframework.org/schema/task"
           xsi:schemaLocation=
                   "http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                   http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
     <!-- 包路径扫描 -->
    <context:component-scan base-package="spring.task"/>
    <!-- 定义线程池 -->
    <task:executor id="executor" pool-size="5" queue-capacity="10" rejection-policy="DISCARD_OLDEST"/>
</beans>

@Async注解

原先我们在代码中对线程池提交线程时,需要先在类中注入线程池对象,再调用其execute()方法,或submit()方法进行提交,不需要返回或者返回Future对象。而使用@Async注解,则只需要在其类或者方法上使用注解,则其自动会被spring提交到线程池中处理。

参考芋道源码5 Spring 异步调用,一行代码实现!舒服,不接受任何反驳~

作用域

@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Async {
    String value() default "";
}

通过其@Target定义可知,可以加在类上,或者方法上。若加在类上,则其所有方法都会以异步的方式执行,这样的粒度如果没设计好,可能就会过粗;若加在方法上,则该方法会被异步调用;

引入依赖

@Async注解在spring-framework中的spring-context工程里。

<!-- 引入 Spring Boot 依赖 间接引入了 spring-context -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

启动Spring异步注解

在启动类中使用注解@EnableAsync

@SpringBootApplication
@EnableAsync // 开启 @Async 的支持
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

无需返回的异步调用

@Override
    @Async
    public void noResultAsynFuncA() {
        long start = printStartTime();
        sleep(5);
        printEndTime(start);
    }

带Future返回的异步调用

  • 应注意,@Async注解若有返回,则尽量是Future<String>,否则在IDE中应有提示

Method annotated with @Async should return “void” or “Future-like” type.

@Override
    @Async
    public Future<String> resultAsynFuncA() {
        return AsyncResult.forValue(this.resultFuncA());
    }

应用ayml配置文件

在 spring.task.execution 配置项,Spring Task 调度任务的配置,对应 TaskExecutionProperties 配置类

spring:
  task:
    # Spring 执行器配置,对应 TaskExecutionProperties 配置类。对于 Spring 异步任务,会使用该执行器。
    execution:
      thread-name-prefix: task- # 线程池的线程名的前缀。默认为 task- ,建议根据自己应用来设置
      pool: # 线程池相关
        core-size: 8 # 核心线程数,线程池创建时候初始化的线程数。默认为 8 。
        max-size: 20 # 最大线程数,线程池最大的线程数,只有在缓冲队列满了之后,才会申请超过核心线程数的线程。默认为 Integer.MAX_VALUE
        keep-alive: 60s # 允许线程的空闲时间,当超过了核心线程之外的线程,在空闲时间到达之后会被销毁。默认为 60 秒
        queue-capacity: 200 # 缓冲队列大小,用来缓冲执行任务的队列的大小。默认为 Integer.MAX_VALUE 。
        allow-core-thread-timeout: true # 是否允许核心线程超时,即开启线程池的动态增长和缩小。默认为 true 。
      shutdown:
        await-termination: true # 应用关闭时,是否等待定时任务执行完成。默认为 false ,建议设置为 true
        await-termination-period: 60 # 等待任务完成的最大时长,单位为秒。默认为 0 ,根据自己应用来设置

异步异常处理器

实现AsyncUncaughtExceptionHandler接口

@Component
public class GlobalAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
    private Logger logger = LoggerFactory.getLogger(getClass());
    @Override
    public void handleUncaughtException(Throwable ex, Method method, Object... params) {
        logger.error("[handleUncaughtException][method({}) params({}) 发生异常]",
                method, params, ex);
    }
}

注意,AsyncUncaughtExceptionHandler 只能拦截返回类型非 Future 的异步调用方法。通过看 AsyncExecutionAspectSupport#handleError(Throwable ex, Method method, Object... params) 的源码,可以很容易得到这个结论

异步异常配置类

实现AsyncConfigurer接口

@Configuration
@EnableAsync // 开启 @Async 的支持
public class AsyncConfig implements AsyncConfigurer {
    @Autowired
    private GlobalAsyncExceptionHandler exceptionHandler;
    @Override
    public Executor getAsyncExecutor() {
        return null;
    }
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return exceptionHandler;
    }
}
  • 实现 #getAsyncUncaughtExceptionHandler() 方法,返回我们定义的 GlobalAsyncExceptionHandler 对象。
  • 实现 #getAsyncExecutor() 方法,返回 Spring Task 异步任务的默认执行器。这里,我们返回了 null ,并未定义默认执行器。所以最终会使用 TaskExecutionAutoConfiguration 自动化配置类创建出来的 ThreadPoolTaskExecutor 任务执行器,作为默认执行器。

配置多个线程池

实际使用中,不同业务放在不同的执行器中执行,就需要配置多个线程池。

应用yaml配置文件

spring:
  task:
    # Spring 执行器配置,对应 TaskExecutionProperties 配置类。对于 Spring 异步任务,会使用该执行器。
    execution-one:
      thread-name-prefix: task-one- # 线程池的线程名的前缀。默认为 task- ,建议根据自己应用来设置
      pool: # 线程池相关
        core-size: 8 # 核心线程数,线程池创建时候初始化的线程数。默认为 8 。
        max-size: 20 # 最大线程数,线程池最大的线程数,只有在缓冲队列满了之后,才会申请超过核心线程数的线程。默认为 Integer.MAX_VALUE
        keep-alive: 60s # 允许线程的空闲时间,当超过了核心线程之外的线程,在空闲时间到达之后会被销毁。默认为 60 秒
        queue-capacity: 200 # 缓冲队列大小,用来缓冲执行任务的队列的大小。默认为 Integer.MAX_VALUE 。
        allow-core-thread-timeout: true # 是否允许核心线程超时,即开启线程池的动态增长和缩小。默认为 true 。
      shutdown:
        await-termination: true # 应用关闭时,是否等待定时任务执行完成。默认为 false ,建议设置为 true
        await-termination-period: 60 # 等待任务完成的最大时长,单位为秒。默认为 0 ,根据自己应用来设置
    # Spring 执行器配置,对应 TaskExecutionProperties 配置类。对于 Spring 异步任务,会使用该执行器。
    execution-two:
      thread-name-prefix: task-two- # 线程池的线程名的前缀。默认为 task- ,建议根据自己应用来设置
      pool: # 线程池相关
        core-size: 8 # 核心线程数,线程池创建时候初始化的线程数。默认为 8 。
        max-size: 20 # 最大线程数,线程池最大的线程数,只有在缓冲队列满了之后,才会申请超过核心线程数的线程。默认为 Integer.MAX_VALUE
        keep-alive: 60s # 允许线程的空闲时间,当超过了核心线程之外的线程,在空闲时间到达之后会被销毁。默认为 60 秒
        queue-capacity: 200 # 缓冲队列大小,用来缓冲执行任务的队列的大小。默认为 Integer.MAX_VALUE 。
        allow-core-thread-timeout: true # 是否允许核心线程超时,即开启线程池的动态增长和缩小。默认为 true 。
      shutdown:
        await-termination: true # 应用关闭时,是否等待定时任务执行完成。默认为 false ,建议设置为 true
        await-termination-period: 60 # 等待任务完成的最大时长,单位为秒。默认为 0 ,根据自己应用来设置
  • 在 spring.task 配置项下,我们新增了 execution-one 和 execution-two 两个执行器的配置。

异步配置类

@Configuration
@EnableAsync // 开启 @Async 的支持
public class AsyncConfig {
    public static final String EXECUTOR_ONE_BEAN_NAME = "executor-one";
    public static final String EXECUTOR_TWO_BEAN_NAME = "executor-two";
    @Configuration
    public static class ExecutorOneConfiguration {
        @Bean(name = EXECUTOR_ONE_BEAN_NAME + "-properties")
        @Primary
        @ConfigurationProperties(prefix = "spring.task.execution-one") // 读取 spring.task.execution-one 配置到 TaskExecutionProperties 对象
        public TaskExecutionProperties taskExecutionProperties() {
            return new TaskExecutionProperties();
        }
        @Bean(name = EXECUTOR_ONE_BEAN_NAME)
        public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
            // 创建 TaskExecutorBuilder 对象
            TaskExecutorBuilder builder = createTskExecutorBuilder(this.taskExecutionProperties());
            // 创建 ThreadPoolTaskExecutor 对象
            return builder.build();
        }
    }
    @Configuration
    public static class ExecutorTwoConfiguration {
    // ExecutorOneConfiguration相似代码省略
        @Bean(name = EXECUTOR_TWO_BEAN_NAME + "-properties")
        @ConfigurationProperties(prefix = "spring.task.execution-two") // 读取 spring.task.execution-two 配置到 TaskExecutionProperties 对象
        public TaskExecutionProperties taskExecutionProperties() {
        }
        @Bean(name = EXECUTOR_TWO_BEAN_NAME)
        public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        }
    }
    private static TaskExecutorBuilder createTskExecutorBuilder(TaskExecutionProperties properties) {
        // Pool 属性
        TaskExecutionProperties.Pool pool = properties.getPool();
        TaskExecutorBuilder builder = new TaskExecutorBuilder();
        builder = builder.queueCapacity(pool.getQueueCapacity());
        builder = builder.corePoolSize(pool.getCoreSize());
        builder = builder.maxPoolSize(pool.getMaxSize());
        builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout());
        builder = builder.keepAlive(pool.getKeepAlive());
        // Shutdown 属性
        TaskExecutionProperties.Shutdown shutdown = properties.getShutdown();
        builder = builder.awaitTermination(shutdown.isAwaitTermination());
        builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod());
        // 其它基本属性
        builder = builder.threadNamePrefix(properties.getThreadNamePrefix());
//        builder = builder.customizers(taskExecutorCustomizers.orderedStream()::iterator);
//        builder = builder.taskDecorator(taskDecorator.getIfUnique());
        return builder;
    }
}

使用线程池

@Service
public class DemoService {
    @Async(AsyncConfig.EXECUTOR_ONE_BEAN_NAME)
    public Integer execute01() {
        logger.info("[execute01]");
        return 1;
    }
    @Async(AsyncConfig.EXECUTOR_TWO_BEAN_NAME)
    public Integer execute02() {
        logger.info("[execute02]");
        return 2;
    }
}
  • 在 @Async注解中显示声明使用哪一个线程池

总结

  1. 要用好线程池,需要懂其原理,@Async注解只是提供方便。
  2. AsyncUncaughtExceptionHandler全局异步异常处理类不能处理Future<T>,而@Async基本上尽量要求返回void或者Future-like,所以AsyncUncaughtExceptionHandler一般应该是用来处理返回void的场景。
目录
相关文章
|
2月前
|
XML Java 数据格式
SpringBoot入门(8) - 开发中还有哪些常用注解
SpringBoot入门(8) - 开发中还有哪些常用注解
56 0
|
27天前
|
XML Java 编译器
Java注解的底层源码剖析与技术认识
Java注解(Annotation)是Java 5引入的一种新特性,它提供了一种在代码中添加元数据(Metadata)的方式。注解本身并不是代码的一部分,它们不会直接影响代码的执行,但可以在编译、类加载和运行时被读取和处理。注解为开发者提供了一种以非侵入性的方式为代码提供额外信息的手段,这些信息可以用于生成文档、编译时检查、运行时处理等。
62 7
|
8天前
|
Java Spring
【Spring】方法注解@Bean,配置类扫描路径
@Bean方法注解,如何在同一个类下面定义多个Bean对象,配置扫描路径
132 73
|
3天前
|
Java Spring 容器
【SpringFramework】Spring IoC-基于注解的实现
本文主要记录基于Spring注解实现IoC容器和DI相关知识。
35 21
|
8天前
|
存储 Java Spring
【Spring】获取Bean对象需要哪些注解
@Conntroller,@Service,@Repository,@Component,@Configuration,关于Bean对象的五个常用注解
|
8天前
|
Java Spring
【Spring配置】idea编码格式导致注解汉字无法保存
问题一:对于同一个项目,我们在使用idea的过程中,使用汉字注解完后,再打开该项目,汉字变成乱码问题二:本来a项目中,汉字注解调试好了,没有乱码了,但是创建出来的新的项目,写的注解又成乱码了。
|
1月前
|
Java 开发者 微服务
Spring Boot 入门:简化 Java Web 开发的强大工具
Spring Boot 是一个开源的 Java 基础框架,用于创建独立、生产级别的基于Spring框架的应用程序。它旨在简化Spring应用的初始搭建以及开发过程。
57 6
Spring Boot 入门:简化 Java Web 开发的强大工具
|
1月前
|
Java 编译器 数据库
Java 中的注解(Annotations):代码中的 “元数据” 魔法
Java注解是代码中的“元数据”标签,不直接参与业务逻辑,但在编译或运行时提供重要信息。本文介绍了注解的基础语法、内置注解的应用场景,以及如何自定义注解和结合AOP技术实现方法执行日志记录,展示了注解在提升代码质量、简化开发流程和增强程序功能方面的强大作用。
76 5
|
2月前
|
前端开发 Java Spring
Spring MVC核心:深入理解@RequestMapping注解
在Spring MVC框架中,`@RequestMapping`注解是实现请求映射的核心,它将HTTP请求映射到控制器的处理方法上。本文将深入探讨`@RequestMapping`注解的各个方面,包括其注解的使用方法、如何与Spring MVC的其他组件协同工作,以及在实际开发中的应用案例。
47 4
|
2月前
|
前端开发 Java 开发者
Spring MVC中的请求映射:@RequestMapping注解深度解析
在Spring MVC框架中,`@RequestMapping`注解是实现请求映射的关键,它将HTTP请求映射到相应的处理器方法上。本文将深入探讨`@RequestMapping`注解的工作原理、使用方法以及最佳实践,为开发者提供一份详尽的技术干货。
134 2