SpringBoot - 优雅的实现【异步编程】

简介: SpringBoot - 优雅的实现【异步编程】



概述

Spring3开始提供了@Async注解,我们只需要在方法上标注此注解,此方法即可实现异步调用。 除此之外, 还得需要一个配置类,通过@EnableAsync 来开启异步功能 。



V1.0 默认的实现

Step1 搞配置类,开启@EnableAsync

我们需要使用@EnableAsync来开启异步任务支持。

@EnableAsync注解可以直接放在SpringBoot启动类上,也可以单独放在其他配置类上。

我们这里选择单独搞个配置类

@Configuration
@EnableAsync
public class ThreadPoolTaskConfig {
}



Step2 搞方法标记 @Async注解

package com.artisan.jobs;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
/**
 * @author 小工匠
 * @version 1.0
 * @description: TODO
 * @date 2022/3/1 0:42
 * @mark: show me the code , change the world
 */
@Component
@Slf4j
public class AsyncJob {
    @Async 
    public void job1() throws InterruptedException {
        long beginTime = System.currentTimeMillis();
        Thread.sleep(2000);
        long endTime = System.currentTimeMillis();
        log.info("job1 cost {} ms", endTime - beginTime);
    }
    @Async 
    public void job2() throws InterruptedException {
        long beginTime = System.currentTimeMillis();
        Thread.sleep(2000);
        long endTime = System.currentTimeMillis();
        log.info("job2 cost {} ms", endTime - beginTime);
    }
}



Step3 搞调用

package com.artisan.controller;
import com.artisan.jobs.AsyncJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * @author 小工匠
 * @version 1.0
 * @description: TODO
 * @date 2022/3/1 0:44
 * @mark: show me the code , change the world
 */
@RestController
@RequestMapping("/async")
@Slf4j
public class AsyncController {
    @Autowired
    private AsyncJob asyncJob;
    @RequestMapping("/job")
    public String task() throws InterruptedException {
        long beginTime = System.currentTimeMillis();
        // 执行异步任务
        asyncJob.job1();
        asyncJob.job2();
        // 模拟业务耗时
        Thread.sleep(1000);
        long cost = System.currentTimeMillis() - beginTime;
        log.info("main cost {} ms", cost);
        return "Task Cost " + cost + " ms";
    }
}

@Async注解在默认情况下用的是SimpleAsyncTaskExecutor线,不是真正意义上的线程池。

所以,线程名称是 task-1 , task-2, task-3 , task-4…

2022-03-02 22:33:47.007 [http-nio-8080-exec-6] INFO  com.artisan.controller.AsyncController:39 - main cost 1001 ms
2022-03-02 22:33:47.675 [http-nio-8080-exec-2] INFO  com.artisan.controller.AsyncController:39 - main cost 1001 ms
2022-03-02 22:33:48.021 [task-4] INFO  com.artisan.jobs.AsyncJob:35 - job2 cost 2014 ms
2022-03-02 22:33:48.021 [task-3] INFO  com.artisan.jobs.AsyncJob:26 - job1 cost 2014 ms
2022-03-02 22:33:48.396 [http-nio-8080-exec-5] INFO  com.artisan.controller.AsyncController:39 - main cost 1015 ms
2022-03-02 22:33:48.678 [task-6] INFO  com.artisan.jobs.AsyncJob:35 - job2 cost 2004 ms
2022-03-02 22:33:48.678 [task-5] INFO  com.artisan.jobs.AsyncJob:26 - job1 cost 2004 ms
2022-03-02 22:33:49.004 [http-nio-8080-exec-3] INFO  com.artisan.controller.AsyncController:39 - main cost 1008 ms
2022-03-02 22:33:49.393 [task-8] INFO  com.artisan.jobs.AsyncJob:35 - job2 cost 2011 ms
2022-03-02 22:33:49.393 [task-7] INFO  com.artisan.jobs.AsyncJob:26 - job1 cost 2011 ms
2022-03-02 22:33:50.012 [task-9] INFO  com.artisan.jobs.AsyncJob:26 - job1 cost 2015 ms
2022-03-02 22:33:50.012 [task-10] INFO  com.artisan.jobs.AsyncJob:35 - job2 cost 2015 ms

可以看到,每次调用都会new一个线程。若系统中不断的创建线程…



Spring提供的线程池

名称 说明
SimpleAsyncTaskExecutor 这个类没有实现异步调用,只是一个同步操作。只适用于不需要多线程的地
ConcurrentTaskExecutor Executor的适配类,不推荐使用。如ThreadPoolTaskExecutor不满足要求时,才用考虑使用这个类
ThreadPoolTaskScheduler 可以使用cron表达式
ThreadPoolTaskExecutor 推荐。 是对java.util.concurrent.ThreadPoolExecutor的包装



V2.0 实现@Async的自定义线程池

package com.artisan.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
/**
 * @author 小工匠
 * @version 1.0
 * @description: 使用@EnableAsync来开启异步任务支持,@EnableAsync注解可以直接放在SpringBoot启动类上,也可以单独放在其他配置类上。
 * 我们这里选择使用单独的配置类AsyncConfiguration。
 * @date 2022/3/1 0:41
 * @mark: show me the code , change the world
 */
@Configuration
@EnableAsync
public class ThreadPoolTaskConfig {
    /**
     * 核心线程数(默认线程数)
     */
    private static final int CORE_POOL_SIZE = 5;
    /**
     * 最大线程数
     */
    private static final int MAX_POOL_SIZE = 10;
    /**
     * 允许线程空闲时间(单位:默认为秒)
     */
    private static final int KEEP_ALIVE_TIME = 10;
    /**
     * 缓冲队列大小
     */
    private static final int QUEUE_CAPACITY = 200;
    /**
     * 线程池名前缀
     */
    private static final String THREAD_NAME_PREFIX = "Async-Service-";
    /**
     * 自定义线程池
     *
     * @return
     */
    @Bean("customAsyncPoolTaskExecutor")
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(CORE_POOL_SIZE);
        executor.setMaxPoolSize(MAX_POOL_SIZE);
        executor.setQueueCapacity(KEEP_ALIVE_TIME);
        executor.setKeepAliveSeconds(QUEUE_CAPACITY);
        executor.setThreadNamePrefix(THREAD_NAME_PREFIX);
        /**
         * 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略
         * 通常有以下四种策略:
         * ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
         * ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。
         * ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
         * ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功
         */
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}

其他保持不变, 重启测试


V3.0 多个线程池处理

需求: 不同的业务,使用不同的线程池


多个线程池

package com.artisan.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
/**
 * @author 小工匠
 * @version 1.0
 * @description: 使用@EnableAsync来开启异步任务支持,@EnableAsync注解可以直接放在SpringBoot启动类上,也可以单独放在其他配置类上。
 * 我们这里选择使用单独的配置类ThreadPoolTaskConfig
 * @date 2022/3/1 0:41
 * @mark: show me the code , change the world
 */
@Configuration
@EnableAsync
public class ThreadPoolTaskConfig {
    /**
     * 核心线程数(默认线程数)
     */
    private static final int CORE_POOL_SIZE = 5;
    /**
     * 最大线程数
     */
    private static final int MAX_POOL_SIZE = 10;
    /**
     * 允许线程空闲时间(单位:默认为秒)
     */
    private static final int KEEP_ALIVE_TIME = 10;
    /**
     * 缓冲队列大小
     */
    private static final int QUEUE_CAPACITY = 200;
    /**
     * 线程池名前缀
     */
    private static final String THREAD_NAME_PREFIX = "Biz1_Async-Service-";
    /**
     * 线程池名前缀
     */
    private static final String THREAD_NAME_PREFIX_2= "Biz2_Async-Service-";
    /**
     * 自定义线程池
     *
     * @return
     */
    @Bean("tp1")
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(CORE_POOL_SIZE);
        executor.setMaxPoolSize(MAX_POOL_SIZE);
        executor.setQueueCapacity(KEEP_ALIVE_TIME);
        executor.setKeepAliveSeconds(QUEUE_CAPACITY);
        executor.setThreadNamePrefix(THREAD_NAME_PREFIX);
        /**
         * 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略
         * 通常有以下四种策略:
         * ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
         * ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。
         * ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
         * ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功
         */
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
    /**
     * 自定义线程池
     *
     * @return
     */
    @Bean("tp2")
    public ThreadPoolTaskExecutor taskExecutor2() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(CORE_POOL_SIZE);
        executor.setMaxPoolSize(MAX_POOL_SIZE);
        executor.setQueueCapacity(KEEP_ALIVE_TIME);
        executor.setKeepAliveSeconds(QUEUE_CAPACITY);
        executor.setThreadNamePrefix(THREAD_NAME_PREFIX_2);
        /**
         * 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略
         * 通常有以下四种策略:
         * ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
         * ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。
         * ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
         * ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功
         */
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}

配置 多个线程池, 然后 为@Async指定线程池名字即可实现 多个线程池处理

package com.artisan.jobs;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
/**
 * @author 小工匠
 * @version 1.0
 * @description: TODO
 * @date 2022/3/1 0:42
 * @mark: show me the code , change the world
 */
@Component
@Slf4j
public class AsyncJob {
    @Async("tp1")
    public void job1() throws InterruptedException {
        long beginTime = System.currentTimeMillis();
        Thread.sleep(2000);
        long endTime = System.currentTimeMillis();
        log.info("job1 cost {} ms", endTime - beginTime);
    }
    @Async("tp2")
    public void job2() throws InterruptedException {
        long beginTime = System.currentTimeMillis();
        Thread.sleep(2000);
        long endTime = System.currentTimeMillis();
        log.info("job2 cost {} ms", endTime - beginTime);
    }
    @Async()
    public void job3() throws InterruptedException {
        long beginTime = System.currentTimeMillis();
        Thread.sleep(2000);
        long endTime = System.currentTimeMillis();
        log.info("job3 cost {} ms", endTime - beginTime);
    }
}


默认线程池

@Async()没标注,用哪个?????? 当系统存在多个线程池时,我们也可以配置一个默认线程池 ,配置类让其实现AsyncConfigurer,并重写getAsyncExecutor()方法,指定默认线程池

package com.artisan.multi;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 * <p>
 * 实现AsyncConfigurer,并重写getAsyncExecutor()方法,指定默认线程池
 */
@Configuration
@EnableAsync
@Slf4j
public class DefaultAsyncConfiguration implements AsyncConfigurer {
    /**
     * 核心线程数(默认线程数)
     */
    private static final int CORE_POOL_SIZE = 2;
    /**
     * 最大线程数
     */
    private static final int MAX_POOL_SIZE = 10;
    /**
     * 允许线程空闲时间(单位:默认为秒)
     */
    private static final int KEEP_ALIVE_TIME = 10;
    /**
     * 缓冲队列大小
     */
    private static final int QUEUE_CAPACITY = 200;
    /**
     * 线程池名前缀
     */
    private static final String THREAD_NAME_PREFIX = "Default_Async-Service-";
    @Bean(name = "defaultPool")
    public ThreadPoolTaskExecutor executor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(CORE_POOL_SIZE);
        taskExecutor.setMaxPoolSize(MAX_POOL_SIZE);
        taskExecutor.setQueueCapacity(KEEP_ALIVE_TIME);
        taskExecutor.setKeepAliveSeconds(QUEUE_CAPACITY);
        taskExecutor.setThreadNamePrefix(THREAD_NAME_PREFIX);
        /**
         * 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略
         * 通常有以下四种策略:
         * ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
         * ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。
         * ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
         * ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功
         */
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        taskExecutor.initialize();
        return taskExecutor;
    }
    /**
     * 指定默认线程池
     */
    @Override
    public Executor getAsyncExecutor() {
        return executor();
    }
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return (ex, method, params) -> log.error("线程池执行任务发横未知错误,执行方法:{}", method.getName(), ex);
    }
}


验证一把

@RestController
@RequestMapping("/async")
@Slf4j
public class AsyncController {
    @Autowired
    private AsyncJob asyncJob;
    @RequestMapping("/job")
    public String task() throws InterruptedException {
        long beginTime = System.currentTimeMillis();
        // 执行异步任务
        asyncJob.job1();
        asyncJob.job2();
        asyncJob.job3();
        // 模拟业务耗时
        Thread.sleep(1000);
        long cost = System.currentTimeMillis() - beginTime;
        log.info("main cost {} ms", cost);
        return "Task Cost " + cost + " ms";
    }

完美匹配…


源码

https://github.com/yangshangwei/boot2


相关文章
|
11月前
|
缓存 NoSQL Java
SpringBoot 如何实现异步编程,老鸟们都这么玩的!(内含福利)
SpringBoot 如何实现异步编程,老鸟们都这么玩的!(内含福利)
164 0
|
设计模式 Java Spring
【SpringBoot技术专题】「Async&Future」异步编程机制以及功能分析讲解
【SpringBoot技术专题】「Async&Future」异步编程机制以及功能分析讲解
140 0
|
缓存 NoSQL Java
SpringBoot 如何异步编程,老鸟们都这么玩的
首先我们来看看在Spring中为什么要使用异步编程,它能解决什么问题?
444 1
SpringBoot 如何异步编程,老鸟们都这么玩的
|
22天前
|
Java Linux
Springboot 解决linux服务器下获取不到项目Resources下资源
Springboot 解决linux服务器下获取不到项目Resources下资源
|
29天前
|
Java API Spring
SpringBoot项目调用HTTP接口5种方式你了解多少?
SpringBoot项目调用HTTP接口5种方式你了解多少?
85 2
|
29天前
|
前端开发 JavaScript Java
6个SpringBoot 项目拿来就可以学习项目经验接私活
6个SpringBoot 项目拿来就可以学习项目经验接私活
35 0
|
2月前
|
前端开发 Java 关系型数据库
SpringBoot+MyBatis 天猫商城项目
SpringBoot+MyBatis 天猫商城项目
60 1
|
2月前
|
Java Maven 微服务
springboot项目开启远程调试-jar包
springboot项目开启远程调试-jar包
24 0
|
1天前
|
存储 Java 应用服务中间件
Springboot项目打war包部署到外置tomcat容器【详解版】
该文介绍了将Spring Boot应用改为war包并在外部Tomcat中部署的步骤:1) 修改pom.xml打包方式为war;2) 排除内置Tomcat依赖;3) 创建`ServletInitializer`类继承`SpringBootServletInitializer`;4) build部分需指定`finalName`;5) 使用`mvn clean package`打包,将war包放入外部Tomcat的webapps目录,通过startup脚本启动Tomcat并访问应用。注意,应用访问路径和静态资源引用需包含war包名。
|
6天前
|
Java Docker 容器
SpringBoot项目集成XXL-job
SpringBoot项目集成XXL-job