【小家Spring】高性能关键技术之---体验Spring MVC的异步模式(Callable、WebAsyncTask、DeferredResult) 基础使用篇(中)

简介: 【小家Spring】高性能关键技术之---体验Spring MVC的异步模式(Callable、WebAsyncTask、DeferredResult) 基础使用篇(中)

下面我们简单看看WebAsyncTask的源码,非常简单,就是个包装:


public class WebAsyncTask<V> implements BeanFactoryAware {
  // 正常执行的函数(通过WebAsyncTask的构造函数可以传进来)
  private final Callable<V> callable;
  // 处理超时时间(ms),可通过构造函数指定,也可以不指定(不会有超时处理)
  private Long timeout;
  // 执行任务的执行器。可以构造函数设置进来,手动指定。
  private AsyncTaskExecutor executor;
  // 若设置了,会根据此名称去IoC容器里找这个Bean (和上面二选一)  
  // 若传了executorName,请务必调用set方法设置beanFactory
  private String executorName;
  private BeanFactory beanFactory;
  // 超时的回调
  private Callable<V> timeoutCallback;
  // 发生错误的回调
  private Callable<V> errorCallback;
  // 完成的回调(不管超时还是错误都会执行)
  private Runnable completionCallback;
  ...
  // 这是获取执行器的逻辑
  @Nullable
  public AsyncTaskExecutor getExecutor() {
    if (this.executor != null) {
      return this.executor;
    } else if (this.executorName != null) {
      Assert.state(this.beanFactory != null, "BeanFactory is required to look up an executor bean by name");
      return this.beanFactory.getBean(this.executorName, AsyncTaskExecutor.class);
    } else {
      return null;
    }
  }
  public void onTimeout(Callable<V> callback) {
    this.timeoutCallback = callback;
  }
  public void onError(Callable<V> callback) {
    this.errorCallback = callback;
  }
  public void onCompletion(Runnable callback) {
    this.completionCallback = callback;
  }
  // 最终执行超时回调、错误回调、完成回调都是通过这个拦截器实现的
  CallableProcessingInterceptor getInterceptor() {
    return new CallableProcessingInterceptor() {
      @Override
      public <T> Object handleTimeout(NativeWebRequest request, Callable<T> task) throws Exception {
        return (timeoutCallback != null ? timeoutCallback.call() : CallableProcessingInterceptor.RESULT_NONE);
      }
      @Override
      public <T> Object handleError(NativeWebRequest request, Callable<T> task, Throwable t) throws Exception {
        return (errorCallback != null ? errorCallback.call() : CallableProcessingInterceptor.RESULT_NONE);
      }
      @Override
      public <T> void afterCompletion(NativeWebRequest request, Callable<T> task) throws Exception {
        if (completionCallback != null) {
          completionCallback.run();
        }
      }
    };
  }
}


WebAsyncTask 的异步编程 API。相比于 @Async 注解,WebAsyncTask 提供更加健全的 超时处理 和 异常处理 支持。但是@Async也有更优秀的地方,就是他不仅仅能用于controller中~~~~(任意地方)


DeferredResult案例:


DeferredResult使用方式与Callable类似,但在返回结果上不一样,它返回的时候实际结果可能没有生成,实际的结果可能会在另外的线程里面设置到DeferredResult中去。


这个特性非常非常的重要,对后面实现复杂的功能(比如服务端推技术、订单过期时间处理、长轮询、模拟MQ的功能等等高级应用)


官方给的Demo如下:

image.png


自己写个非常粗糙的Demo:


@Controller
@RequestMapping("/async/controller")
public class AsyncHelloController {
    private List<DeferredResult<String>> deferredResultList = new ArrayList<>();
    @ResponseBody
    @GetMapping("/hello")
    public DeferredResult<String> helloGet() throws Exception {
        DeferredResult<String> deferredResult = new DeferredResult<>();
        //先存起来,等待触发
        deferredResultList.add(deferredResult);
        return deferredResult;
    }
    @ResponseBody
    @GetMapping("/setHelloToAll")
    public void helloSet() throws Exception {
        // 让所有hold住的请求给与响应
        deferredResultList.forEach(d -> d.setResult("say hello to all"));
    }
}


我们第一个请求/hello,会先deferredResult存起来,然后前端页面是一直等待(转圈状态)的。知道我发第二个请求:setHelloToAll,所有的相关页面才会有响应~~


执行过程


官方:

image.png


1.controller 返回一个DeferredResult,我们把它保存到内存里或者List里面(供后续访问)


2.Spring MVC调用request.startAsync(),开启异步处理


3.与此同时将DispatcherServlet里的拦截器、Filter等等都马上退出主线程,但是response仍然保持打开的状态


4.应用通过另外一个线程(可能是MQ消息、定时任务等)给DeferredResult set值。然后Spring MVC会把这个请求再次派发给servlet容器


5.DispatcherServlet再次被调用,然后处理后续的标准流程


简单看看源码:

public class DeferredResult<T> {
  private static final Object RESULT_NONE = new Object()
  // 超时时间(ms) 可以不配置
  @Nullable
  private final Long timeout;
  // 相当于超时的话的,传给回调函数的值
  private final Object timeoutResult;
  // 这三种回调也都是支持的
  private Runnable timeoutCallback;
  private Consumer<Throwable> errorCallback;
  private Runnable completionCallback;
  // 这个比较强大,就是能把我们结果再交给这个自定义的函数处理了 他是个@FunctionalInterface
  private DeferredResultHandler resultHandler;
  private volatile Object result = RESULT_NONE;
  private volatile boolean expired = false;
  // 判断这个DeferredResult是否已经被set过了(被set过的对象,就可以移除了嘛)
  // 如果expired表示已经过期了你还没set,也是返回false的
  // Spring4.0之后提供的
  public final boolean isSetOrExpired() {
    return (this.result != RESULT_NONE || this.expired);
  }
  // 没有isSetOrExpired 强大,建议使用上面那个
  public boolean hasResult() {
    return (this.result != RESULT_NONE);
  }
  // 还可以获得set进去的结果
  @Nullable
  public Object getResult() {
    Object resultToCheck = this.result;
    return (resultToCheck != RESULT_NONE ? resultToCheck : null);
  }
  public void onTimeout(Runnable callback) {
    this.timeoutCallback = callback;
  }
  public void onError(Consumer<Throwable> callback) {
    this.errorCallback = callback;
  }
  public void onCompletion(Runnable callback) {
    this.completionCallback = callback;
  }
  // 如果你的result还需要处理,可以这是一个resultHandler,会对你设置进去的结果进行处理
  public final void setResultHandler(DeferredResultHandler resultHandler) {
    Assert.notNull(resultHandler, "DeferredResultHandler is required");
    // Immediate expiration check outside of the result lock
    if (this.expired) {
      return;
    }
    Object resultToHandle;
    synchronized (this) {
      // Got the lock in the meantime: double-check expiration status
      if (this.expired) {
        return;
      }
      resultToHandle = this.result;
      if (resultToHandle == RESULT_NONE) {
        // No result yet: store handler for processing once it comes in
        this.resultHandler = resultHandler;
        return;
      }
    }
    try {
      resultHandler.handleResult(resultToHandle);
    } catch (Throwable ex) {
      logger.debug("Failed to handle existing result", ex);
    }
  }
  // 我们发现,这里调用是private方法setResultInternal,我们设置进来的结果result,会经过它的处理
  // 而它的处理逻辑也很简单,如果我们提供了resultHandler,它会把这个值进一步的交给我们的resultHandler处理
  // 若我们没有提供此resultHandler,那就保存下这个result即可
  public boolean setResult(T result) {
    return setResultInternal(result);
  }
  private boolean setResultInternal(Object result) {
    // Immediate expiration check outside of the result lock
    if (isSetOrExpired()) {
      return false;
    }
    DeferredResultHandler resultHandlerToUse;
    synchronized (this) {
      // Got the lock in the meantime: double-check expiration status
      if (isSetOrExpired()) {
        return false;
      }
      // At this point, we got a new result to process
      this.result = result;
      resultHandlerToUse = this.resultHandler;
      if (resultHandlerToUse == null) {
        this.resultHandler = null;
      }
    }
    resultHandlerToUse.handleResult(result);
    return true;
  }
  // 发生错误了,也可以设置一个值。这个result会被记下来,当作result
  // 注意这个和setResult的唯一区别,这里入参是Object类型,而setResult只能set规定的指定类型
  // 定义成Obj是有原因的:因为我们一般会把Exception等异常对象放进来。。。
  public boolean setErrorResult(Object result) {
    return setResultInternal(result);
  }
  // 拦截器 注意最终finally里面,都可能会调用我们的自己的处理器resultHandler(若存在的话)
  // afterCompletion不会调用resultHandler~~~~~~~~~~~~~
  final DeferredResultProcessingInterceptor getInterceptor() {
    return new DeferredResultProcessingInterceptor() {
      @Override
      public <S> boolean handleTimeout(NativeWebRequest request, DeferredResult<S> deferredResult) {
        boolean continueProcessing = true;
        try {
          if (timeoutCallback != null) {
            timeoutCallback.run();
          }
        } finally {
          if (timeoutResult != RESULT_NONE) {
            continueProcessing = false;
            try {
              setResultInternal(timeoutResult);
            } catch (Throwable ex) {
              logger.debug("Failed to handle timeout result", ex);
            }
          }
        }
        return continueProcessing;
      }
      @Override
      public <S> boolean handleError(NativeWebRequest request, DeferredResult<S> deferredResult, Throwable t) {
        try {
          if (errorCallback != null) {
            errorCallback.accept(t);
          }
        } finally {
          try {
            setResultInternal(t);
          } catch (Throwable ex) {
            logger.debug("Failed to handle error result", ex);
          }
        }
        return false;
      }
      @Override
      public <S> void afterCompletion(NativeWebRequest request, DeferredResult<S> deferredResult) {
        expired = true;
        if (completionCallback != null) {
          completionCallback.run();
        }
      }
    };
  }
  // 内部函数式接口 DeferredResultHandler
  @FunctionalInterface
  public interface DeferredResultHandler {
    void handleResult(Object result);
  }
}


DeferredResult的超时处理,采用委托机制,也就是在实例DeferredResult时给予一个超时时长(毫秒),同时在onTimeout中委托(传入)一个新的处理线程(我们可以认为是超时线程);当超时时间到来,DeferredResult启动超时线程,超时线程处理业务,封装返回数据,给DeferredResult赋值(正确返回的或错误返回的)

相关文章
|
人工智能 Java Serverless
【MCP教程系列】搭建基于 Spring AI 的 SSE 模式 MCP 服务并自定义部署至阿里云百炼
本文详细介绍了如何基于Spring AI搭建支持SSE模式的MCP服务,并成功集成至阿里云百炼大模型平台。通过四个步骤实现从零到Agent的构建,包括项目创建、工具开发、服务测试与部署。文章还提供了具体代码示例和操作截图,帮助读者快速上手。最终,将自定义SSE MCP服务集成到百炼平台,完成智能体应用的创建与测试。适合希望了解SSE实时交互及大模型集成的开发者参考。
14139 60
|
6月前
|
SQL Java 数据库连接
Spring Data JPA 技术深度解析与应用指南
本文档全面介绍 Spring Data JPA 的核心概念、技术原理和实际应用。作为 Spring 生态系统中数据访问层的关键组件,Spring Data JPA 极大简化了 Java 持久层开发。本文将深入探讨其架构设计、核心接口、查询派生机制、事务管理以及与 Spring 框架的集成方式,并通过实际示例展示如何高效地使用这一技术。本文档约1500字,适合有一定 Spring 和 JPA 基础的开发者阅读。
630 0
|
7月前
|
前端开发 Java API
利用 Spring WebFlux 技术打造高效非阻塞 API 的完整开发方案与实践技巧
本文介绍了如何使用Spring WebFlux构建高效、可扩展的非阻塞API,涵盖响应式编程核心概念、技术方案设计及具体实现示例,适用于高并发场景下的API开发。
550 0
|
5月前
|
前端开发 Java 微服务
《深入理解Spring》:Spring、Spring MVC与Spring Boot的深度解析
Spring Framework是Java生态的基石,提供IoC、AOP等核心功能;Spring MVC基于其构建,实现Web层MVC架构;Spring Boot则通过自动配置和内嵌服务器,极大简化了开发与部署。三者层层演进,Spring Boot并非替代,而是对前者的高效封装与增强,适用于微服务与快速开发,而深入理解Spring Framework有助于更好驾驭整体技术栈。
|
6月前
|
消息中间件 存储 Java
RabbitMQ 和 Spring Cloud Stream 实现异步通信
本文介绍了在微服务架构中,如何利用 RabbitMQ 作为消息代理,并结合 Spring Cloud Stream 实现高效的异步通信。内容涵盖异步通信的优势、RabbitMQ 的核心概念与特性、Spring Cloud Stream 的功能及其与 RabbitMQ 的集成方式。通过这种组合,开发者可以构建出具备高可用性、可扩展性和弹性的分布式系统,满足现代应用对快速响应和可靠消息传递的需求。
355 2
RabbitMQ 和 Spring Cloud Stream 实现异步通信
|
6月前
|
监控 安全 Java
Spring Cloud 微服务治理技术详解与实践指南
本文档全面介绍 Spring Cloud 微服务治理框架的核心组件、架构设计和实践应用。作为 Spring 生态系统中构建分布式系统的标准工具箱,Spring Cloud 提供了一套完整的微服务解决方案,涵盖服务发现、配置管理、负载均衡、熔断器等关键功能。本文将深入探讨其核心组件的工作原理、集成方式以及在实际项目中的最佳实践,帮助开发者构建高可用、可扩展的分布式系统。
383 1
|
6月前
|
监控 Kubernetes Cloud Native
Spring Batch 批处理框架技术详解与实践指南
本文档全面介绍 Spring Batch 批处理框架的核心架构、关键组件和实际应用场景。作为 Spring 生态系统中专门处理大规模数据批处理的框架,Spring Batch 为企业级批处理作业提供了可靠的解决方案。本文将深入探讨其作业流程、组件模型、错误处理机制、性能优化策略以及与现代云原生环境的集成方式,帮助开发者构建高效、稳定的批处理系统。
667 1
|
6月前
|
Java 数据库连接 开发者
Spring Framework 核心技术详解
本文档旨在深入解析 Java Spring Framework 的核心技术原理与应用。与侧重于快速开发的 Spring Boot 不同,本文将聚焦于 Spring 框架本身的设计理念、核心容器、控制反转(IoC)、面向切面编程(AOP)、数据访问与事务管理等基础且强大的模块。通过理解这些核心概念,开发者能够更深刻地领悟 Spring 生态系统的设计哲学,并具备解决复杂企业级应用开发问题的能力。
430 4
|
6月前
|
前端开发 Java 开发者
MVC 架构模式技术详解与实践
本文档旨在全面解析软件工程中经典且至关重要的 MVC(Model-View-Controller) 架构模式。内容将深入探讨 MVC 的核心思想、三大组件的职责与交互关系、其优势与劣势,并重点分析其在现代 Web 开发中的具体实现,特别是以 Spring MVC 框架为例,详解其请求处理流程、核心组件及基本开发实践。通过本文档,读者将能够深刻理解 MVC 的设计哲学,并掌握基于该模式进行 Web 应用开发的能力。
1126 1

热门文章

最新文章