【SpringBoot技术专题】「Async&Future」异步编程机制以及功能分析讲解

简介: 【SpringBoot技术专题】「Async&Future」异步编程机制以及功能分析讲解

本文内容


  • Future 模式介绍以及核心思想
  • 核心线程数、最大线程数的区别,队列容量代表什么;
  • ThreadPoolTaskExecutor 饱和策略;
  • SpringBoot 异步编程实战,搞懂代码的执行逻辑。




Future 模式


  • 异步编程在处理耗时操作以及多任务处理的场景下非常有用,我们可以更好的让我们的系统利用好机器的CPU和内存,提高它们的利用率。


  • 多线程设计模式有很多种,Future模式是多线程开发中较为常见的一种设计模式,本文也是基于这种模式来说明 SpringBoot 对于异步编程的知识。


实战之前我先简单介绍一下Future模式的核心思想吧!。



Future的核心思想


Future模式的核心思想是异步调用 。当我们执行一个方法时,假如这个方法中有多个耗时的任务需要同时去做,而且又不着急等待这个结果时可以让客户端立即返回然后,后台慢慢去计算任务。当然你也可以选择等这些任务都执行完了,再返回给客户端。



SpringBoot 异步编程实战


如果我们需要在Spring/SpringBoot实现异步编程的话,通过Spring提供的两个注解会让这件事情变的非常简单。


  1. @EnableAsync:通过在配置类或者Main类上加@EnableAsync开启对异步方法的支持。
  2. @Async 可以作用在类上或者方法上,作用在类上代表这个类的所有方法都是异步方法。



TaskExecutor


  • 很多人对于TaskExecutor 不是太了解,所以我们花一点篇幅先介绍一下这个东西。从名字就能看出它是任务的执行者,它领导执行着线程来处理任务,就像司令官一样,而我们的线程就好比一只只军队一样,这些军队可以异步对敌人进行打击。
  • Spring提供了TaskExecutor接口作为任务执行者的抽象,它和java.util.concurrent包下的Executor接口很像。稍微不同的 TaskExecutor接口用到了 Java 8 的语法@FunctionalInterface声明这个接口是一个函数式接口。
org.springframework.core.task.TaskExecutor
@FunctionalInterface
public interface TaskExecutor extends Executor {
    void execute(Runnable var1);
}
复制代码


如果没有自定义Executor, Spring 将创建一个 SimpleAsyncTaskExecutor 并使用它。



自定义AsyncConfigurer


import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
  private static final int CORE_POOL_SIZE = 6;
  private static final int MAX_POOL_SIZE = 10;
  private static final int QUEUE_CAPACITY = 100;
  @Bean
  public Executor taskExecutor() {
    // Spring 默认配置是核心线程数大小为1,最大线程容量大小不受限制,队列容量也不受限制。
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    // 核心线程数
    executor.setCorePoolSize(CORE_POOL_SIZE);
    // 最大线程数
    executor.setMaxPoolSize(MAX_POOL_SIZE);
    // 队列大小
    executor.setQueueCapacity(QUEUE_CAPACITY);
    // 当最大池已满时,此策略保证不会丢失任务请求,但是可能会影响应用程序整体性能。
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    executor.setThreadNamePrefix("My ThreadPoolTaskExecutor-");
    executor.initialize();
    return executor;
  }
}
复制代码



ThreadPoolTaskExecutor常见概念


  • Core Pool Size : 核心线程数线程数定义了最小可以同时运行的线程数量。
  • Queue Capacity : 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,信任就会被存放在队列中。
  • Maximum Pool Size : 当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。


一般情况下不会将队列大小设为:Integer.MAX_VALUE,也不会将核心线程数和最大线程数设为同样的大小,这样的话最大线程数的设置都没什么意义了,你也无法确定当前CPU和内存利用率具体情况如何。


如果队列已满并且当前同时运行的线程数达到最大线程数的时候,如果再有新任务过来会发生什么呢?Spring默认使用的是 ThreadPoolExecutor.AbortPolicy(ThreadPoolExecutor 将抛出 RejectedExecutionException 来拒绝新来的任务 ,这代表你将丢失对这个任务的处理。)


对于可伸缩的应用程序,建议使用 ThreadPoolExecutor.CallerRunsPolicy,当最大池被填满时,此策略为我们提供可伸缩队列。



ThreadPoolTaskExecutor 饱和策略定义:


如果当前同时运行的线程数量达到最大线程数量时,ThreadPoolTaskExecutor 定义一些策略:


  • ThreadPoolExecutor.AbortPolicy:抛出 RejectedExecutionException来拒绝新任务的处理。
  • ThreadPoolExecutor.CallerRunsPolicy:调用执行自己的线程运行任务。您不会任务请求。但是这种策略会降低对于新任务提交速度,影响程序的整体性能。
  • 另外,这个策略喜欢增加队列容量。如果您的应用程序可以承受此延迟并且你不能任务丢弃任何一个任务请求的话,你可以选择这个策略。
  • ThreadPoolExecutor.DiscardPolicy: 不处理新任务,直接丢弃掉。
  • ThreadPoolExecutor.DiscardOldestPolicy: 此策略将丢弃最早的未处理的任务请求。
  1. 编写一个异步的方法



给这个方法加上了@Async注解来告诉 Spring 它是一个异步的方法。另外,这个方法的返回值CompletableFuture.completedFuture(results)这代表我们需要返回结果,也就是说程序必须把任务执行完成之后再返回给用户。



请留意completableFutureTask方法中的第一行打印日志这句代码,后面分析程序中会用到,很重要!

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
@Service
public class AsyncService {
  private static final Logger logger = LoggerFactory.getLogger(AsyncService.class);
  private List<String> movies =
      new ArrayList<>(
          Arrays.asList(
              "Forrest Gump",
              "Titanic",
              "Spirited Away",
              "The Shawshank Redemption",
              "Zootopia",
              "Farewell ",
              "Joker",
              "Crawl"));
  /** 示范使用:找到特定字符/字符串开头的电影 */
  @Async
  public CompletableFuture<List<String>> completableFutureTask(String start) {
    // 打印日志
    logger.warn(Thread.currentThread().getName() + "start this task!");
    // 找到特定字符/字符串开头的电影
    List<String> results =
        movies.stream().filter(movie -> movie.startsWith(start)).collect(Collectors.toList());
    // 模拟这是一个耗时的任务
    try {
      Thread.sleep(1000L);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    //返回一个已经用给定值完成的新的CompletableFuture。
    return CompletableFuture.completedFuture(results);
  }
}
复制代码



  1. 测试编写的异步方法
@RestController
@RequestMapping("/async")
public class AsyncController {
  @Autowired 
  AsyncService asyncService;
  @GetMapping("/movies")
  public String completableFutureTask() throws ExecutionException, InterruptedException {
    //开始时间
    long start = System.currentTimeMillis();
    // 开始执行大量的异步任务
    List<String> words = Arrays.asList("F", "T", "S", "Z", "J", "C");
    List<CompletableFuture<List<String>>> completableFutureList = words.stream()
            .map(word -> asyncService.completableFutureTask(word))
            .collect(Collectors.toList());
    // CompletableFuture.join() 方法可以获取他们的结果并将结果连接起来
    List<List<String>> results = completableFutureList.stream().map(CompletableFuture::join).collect(Collectors.toList());
    // 打印结果以及运行程序运行花费时间
    System.out.println("Elapsed time: " + (System.currentTimeMillis() - start));
    return results.toString();
  }
}
复制代码

请求这个接口,控制台打印出下面的内容:

2019-10-01 13:50:17.007  WARN 18793 --- [lTaskExecutor-1] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-1start this task!
2019-10-01 13:50:17.007  WARN 18793 --- [lTaskExecutor-6] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-6start this task!
2019-10-01 13:50:17.007  WARN 18793 --- [lTaskExecutor-5] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-5start this task!
2019-10-01 13:50:17.007  WARN 18793 --- [lTaskExecutor-4] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-4start this task!
2019-10-01 13:50:17.007  WARN 18793 --- [lTaskExecutor-3] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-3start this task!
2019-10-01 13:50:17.007  WARN 18793 --- [lTaskExecutor-2] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-2start this task!
Elapsed time: 1010
复制代码


首先我们可以看到处理所有任务花费的时间大概是 1 s。这与我们自定义的

ThreadPoolTaskExecutor 有关,我们配置的核心线程数是 6 ,然后通过通过下面的代码模拟分配了 6 个任务给系统执行。这样每个线程都会被分配到一个任务,每个任务执行花费时间是 1 s ,所以处理 6 个任务的总花费时间是 1 s。


List<String> words = Arrays.asList("F", "T", "S", "Z", "J", "C");  
List<CompletableFuture<List<String>>> completableFutureList =
        words.stream()
            .map(word -> asyncService.completableFutureTask(word))
            .collect(Collectors.toList());
复制代码

试着去把核心线程数的数量改为 3 ,再次请求这个接口你会发现处理所有任务花费的时间大概是 2 s。




特殊情况 无需返回值

另外,从上面的运行结果可以看出,当所有任务执行完成之后才返回结果。这种情况对应于我们需要返回结果给客户端请求的情况下,假如我们不需要返回任务执行结果给客户端的话呢? 就比如我们上传一个大文件到系统,上传之后只要大文件格式符合要求我们就上传成功。普通情况下我们需要等待文件上传完毕再返回给用户消息,但是这样会很慢。采用异步的话,当用户上传之后就立马返回给用户消息,然后系统再默默去处理上传任务。这样也会增加一点麻烦,因为文件可能会上传失败,所以系统也需要一点机制来补偿这个问题,比如当上传遇到问题的时候,发消息通知用户。

下面会演示一下客户端不需要返回结果的情况:


将completableFutureTask方法变为 void 类型
@Async
public void completableFutureTask(String start) {
  ......
  //这里可能是系统对任务执行结果的处理,比如存入到数据库等等......
  //doSomeThingWithResults(results);
}
复制代码



Controller 代码修改如下:
@GetMapping("/movies")
  public String completableFutureTask() throws ExecutionException, InterruptedException {
    // Start the clock
    long start = System.currentTimeMillis();
    // Kick of multiple, asynchronous lookups
    List<String> words = Arrays.asList("F", "T", "S", "Z", "J", "C");
        words.stream()
            .forEach(word -> asyncService.completableFutureTask(word));
    // Wait until they are all done
    // Print results, including elapsed time
    System.out.println("Elapsed time: " + (System.currentTimeMillis() - start));
    return "Done";
  }
复制代码



请求这个接口,控制台打印出下面的内容:

Elapsed time: 0
2019-10-01 14:02:44.052  WARN 19051 --- [lTaskExecutor-4] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-4start this task!
2019-10-01 14:02:44.052  WARN 19051 --- [lTaskExecutor-3] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-3start this task!
2019-10-01 14:02:44.052  WARN 19051 --- [lTaskExecutor-2] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-2start this task!
2019-10-01 14:02:44.052  WARN 19051 --- [lTaskExecutor-1] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-1start this task!
2019-10-01 14:02:44.052  WARN 19051 --- [lTaskExecutor-6] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-6start this task!
2019-10-01 14:02:44.052  WARN 19051 --- [lTaskExecutor-5] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-5start this task!
复制代码

可以看到系统会直接返回给用户结果,然后系统才真正开始执行任务。



相关文章
|
12天前
|
JavaScript 前端开发 Java
基于SpringBoot+Vue实现前后端交互功能(详解Vue框架机制)
基于SpringBoot+Vue实现前后端交互功能(详解Vue框架机制)
|
12天前
|
Java 测试技术 数据库
基于SpringBoot+HTML实现登录注册功能模块
基于SpringBoot+HTML实现登录注册功能模块
|
3天前
|
SQL Java 关系型数据库
基于SpringBoot使用MyBatisPlus,MyBatisPlus标准数据层开发(CRUD)、MyBatisPlus分页功能的使用
基于SpringBoot使用MyBatisPlus,MyBatisPlus标准数据层开发(CRUD)、MyBatisPlus分页功能的使用
12 2
|
7天前
|
监控 NoSQL Java
java云MES 系统源码Java+ springboot+ mysql 一款基于云计算技术的企业级生产管理系统
MES系统是生产企业对制造执行系统实施的重点在智能制造执行管理领域,而MES系统特点中的可伸缩、信息精确、开放、承接、安全等也传递出:MES在此管理领域中无可替代的“王者之尊”。MES制造执行系统特点集可伸缩性、精确性、开放性、承接性、经济性与安全性于一体,帮助企业解决生产中遇到的实际问题,降低运营成本,快速适应企业不断的制造执行管理需求,使得企业已有基础设施与一切可用资源实现高度集成,提升企业投资的有效性。
49 5
|
12天前
|
开发框架 监控 Java
深入探索Spring Boot的监控、管理和测试功能及实战应用
【5月更文挑战第14天】Spring Boot是一个快速开发框架,提供了一系列的功能模块,包括监控、管理和测试等。本文将深入探讨Spring Boot中监控、管理和测试功能的原理与应用,并提供实际应用场景的示例。
21 2
|
12天前
|
XML Java 开发者
springboot 启动原理、启动过程、启动机制的介绍
【5月更文挑战第13天】Spring Boot 是一种基于 Java 的框架,用于创建独立的、生产级别的 Spring 应用程序。它的主要目标是简化 Spring 应用的初始搭建和开发过程,同时提供一系列大型项目常见的非功能性特征(如嵌入式服务器、安全性、度量、健康检查和外部化配置)。
28 3
|
12天前
|
Java 关系型数据库 MySQL
一套java+ spring boot与vue+ mysql技术开发的UWB高精度工厂人员定位全套系统源码有应用案例
UWB (ULTRA WIDE BAND, UWB) 技术是一种无线载波通讯技术,它不采用正弦载波,而是利用纳秒级的非正弦波窄脉冲传输数据,因此其所占的频谱范围很宽。一套UWB精确定位系统,最高定位精度可达10cm,具有高精度,高动态,高容量,低功耗的应用。
36 0
一套java+ spring boot与vue+ mysql技术开发的UWB高精度工厂人员定位全套系统源码有应用案例
|
12天前
|
存储 数据可视化 安全
Java全套智慧校园系统源码springboot+elmentui +Quartz可视化校园管理平台系统源码 建设智慧校园的5大关键技术
智慧校园指的是以物联网为基础的智慧化的校园工作、学习和生活一体化环境,这个一体化环境以各种应用服务系统为载体,将教学、科研、管理和校园生活进行充分融合。无处不在的网络学习、融合创新的网络科研、透明高效的校务治理、丰富多彩的校园文化、方便周到的校园生活。简而言之,“要做一个安全、稳定、环保、节能的校园。
50 6
|
12天前
|
SQL Java 数据库连接
一文带你快速学会SpringBoot工程下MaBatis对数据的增删改查功能!
在SpringBoot项目中,已配置好Mybatis和Lombok,数据库tb_user有四条初始数据。需求是按ID删除用户。首先在UserMapper接口添加@Delete注解的删除方法,然后在单元测试类中测试此方法,成功删除ID为4的用户。删除方法可选返回影响的记录数,此处用void。参数名在#{...}内可自定义。通过配置mybatis日志在控制台显示SQL操作。
|
12天前
|
Java 容器
SpringBoot使用配置注解开启自动配置功能&整合spring-boot-configuration-processor
SpringBoot使用配置注解开启自动配置功能&整合spring-boot-configuration-processor
21 0