下面我们简单看看WebAsyncTask的源码,非常简单,就是个包装:
public class WebAsyncTask<V> implements BeanFactoryAware { // 正常执行的函数(通过WebAsyncTask的构造函数可以传进来) private final Callable<V> callable; // 处理超时时间(ms),可通过构造函数指定,也可以不指定(不会有超时处理) private Long timeout; // 执行任务的执行器。可以构造函数设置进来,手动指定。 private AsyncTaskExecutor executor; // 若设置了,会根据此名称去IoC容器里找这个Bean (和上面二选一) // 若传了executorName,请务必调用set方法设置beanFactory private String executorName; private BeanFactory beanFactory; // 超时的回调 private Callable<V> timeoutCallback; // 发生错误的回调 private Callable<V> errorCallback; // 完成的回调(不管超时还是错误都会执行) private Runnable completionCallback; ... // 这是获取执行器的逻辑 @Nullable public AsyncTaskExecutor getExecutor() { if (this.executor != null) { return this.executor; } else if (this.executorName != null) { Assert.state(this.beanFactory != null, "BeanFactory is required to look up an executor bean by name"); return this.beanFactory.getBean(this.executorName, AsyncTaskExecutor.class); } else { return null; } } public void onTimeout(Callable<V> callback) { this.timeoutCallback = callback; } public void onError(Callable<V> callback) { this.errorCallback = callback; } public void onCompletion(Runnable callback) { this.completionCallback = callback; } // 最终执行超时回调、错误回调、完成回调都是通过这个拦截器实现的 CallableProcessingInterceptor getInterceptor() { return new CallableProcessingInterceptor() { @Override public <T> Object handleTimeout(NativeWebRequest request, Callable<T> task) throws Exception { return (timeoutCallback != null ? timeoutCallback.call() : CallableProcessingInterceptor.RESULT_NONE); } @Override public <T> Object handleError(NativeWebRequest request, Callable<T> task, Throwable t) throws Exception { return (errorCallback != null ? errorCallback.call() : CallableProcessingInterceptor.RESULT_NONE); } @Override public <T> void afterCompletion(NativeWebRequest request, Callable<T> task) throws Exception { if (completionCallback != null) { completionCallback.run(); } } }; } }
WebAsyncTask 的异步编程 API。相比于 @Async 注解,WebAsyncTask 提供更加健全的 超时处理 和 异常处理 支持。但是@Async也有更优秀的地方,就是他不仅仅能用于controller中~~~~(任意地方)
DeferredResult案例:
DeferredResult使用方式与Callable类似,但在返回结果上不一样,它返回的时候实际结果可能没有生成,实际的结果可能会在另外的线程里面设置到DeferredResult中去。
这个特性非常非常的重要,对后面实现复杂的功能(比如服务端推技术、订单过期时间处理、长轮询、模拟MQ的功能等等高级应用)
官方给的Demo如下:
自己写个非常粗糙的Demo:
@Controller @RequestMapping("/async/controller") public class AsyncHelloController { private List<DeferredResult<String>> deferredResultList = new ArrayList<>(); @ResponseBody @GetMapping("/hello") public DeferredResult<String> helloGet() throws Exception { DeferredResult<String> deferredResult = new DeferredResult<>(); //先存起来,等待触发 deferredResultList.add(deferredResult); return deferredResult; } @ResponseBody @GetMapping("/setHelloToAll") public void helloSet() throws Exception { // 让所有hold住的请求给与响应 deferredResultList.forEach(d -> d.setResult("say hello to all")); } }
我们第一个请求/hello
,会先deferredResult
存起来,然后前端页面是一直等待(转圈状态)的。知道我发第二个请求:setHelloToAll
,所有的相关页面才会有响应~~
执行过程
官方:
1.controller 返回一个DeferredResult,我们把它保存到内存里或者List里面(供后续访问)
2.Spring MVC调用request.startAsync(),开启异步处理
3.与此同时将DispatcherServlet里的拦截器、Filter等等都马上退出主线程,但是response仍然保持打开的状态
4.应用通过另外一个线程(可能是MQ消息、定时任务等)给DeferredResult set值。然后Spring MVC会把这个请求再次派发给servlet容器
5.DispatcherServlet再次被调用,然后处理后续的标准流程
简单看看源码:
public class DeferredResult<T> { private static final Object RESULT_NONE = new Object() // 超时时间(ms) 可以不配置 @Nullable private final Long timeout; // 相当于超时的话的,传给回调函数的值 private final Object timeoutResult; // 这三种回调也都是支持的 private Runnable timeoutCallback; private Consumer<Throwable> errorCallback; private Runnable completionCallback; // 这个比较强大,就是能把我们结果再交给这个自定义的函数处理了 他是个@FunctionalInterface private DeferredResultHandler resultHandler; private volatile Object result = RESULT_NONE; private volatile boolean expired = false; // 判断这个DeferredResult是否已经被set过了(被set过的对象,就可以移除了嘛) // 如果expired表示已经过期了你还没set,也是返回false的 // Spring4.0之后提供的 public final boolean isSetOrExpired() { return (this.result != RESULT_NONE || this.expired); } // 没有isSetOrExpired 强大,建议使用上面那个 public boolean hasResult() { return (this.result != RESULT_NONE); } // 还可以获得set进去的结果 @Nullable public Object getResult() { Object resultToCheck = this.result; return (resultToCheck != RESULT_NONE ? resultToCheck : null); } public void onTimeout(Runnable callback) { this.timeoutCallback = callback; } public void onError(Consumer<Throwable> callback) { this.errorCallback = callback; } public void onCompletion(Runnable callback) { this.completionCallback = callback; } // 如果你的result还需要处理,可以这是一个resultHandler,会对你设置进去的结果进行处理 public final void setResultHandler(DeferredResultHandler resultHandler) { Assert.notNull(resultHandler, "DeferredResultHandler is required"); // Immediate expiration check outside of the result lock if (this.expired) { return; } Object resultToHandle; synchronized (this) { // Got the lock in the meantime: double-check expiration status if (this.expired) { return; } resultToHandle = this.result; if (resultToHandle == RESULT_NONE) { // No result yet: store handler for processing once it comes in this.resultHandler = resultHandler; return; } } try { resultHandler.handleResult(resultToHandle); } catch (Throwable ex) { logger.debug("Failed to handle existing result", ex); } } // 我们发现,这里调用是private方法setResultInternal,我们设置进来的结果result,会经过它的处理 // 而它的处理逻辑也很简单,如果我们提供了resultHandler,它会把这个值进一步的交给我们的resultHandler处理 // 若我们没有提供此resultHandler,那就保存下这个result即可 public boolean setResult(T result) { return setResultInternal(result); } private boolean setResultInternal(Object result) { // Immediate expiration check outside of the result lock if (isSetOrExpired()) { return false; } DeferredResultHandler resultHandlerToUse; synchronized (this) { // Got the lock in the meantime: double-check expiration status if (isSetOrExpired()) { return false; } // At this point, we got a new result to process this.result = result; resultHandlerToUse = this.resultHandler; if (resultHandlerToUse == null) { this.resultHandler = null; } } resultHandlerToUse.handleResult(result); return true; } // 发生错误了,也可以设置一个值。这个result会被记下来,当作result // 注意这个和setResult的唯一区别,这里入参是Object类型,而setResult只能set规定的指定类型 // 定义成Obj是有原因的:因为我们一般会把Exception等异常对象放进来。。。 public boolean setErrorResult(Object result) { return setResultInternal(result); } // 拦截器 注意最终finally里面,都可能会调用我们的自己的处理器resultHandler(若存在的话) // afterCompletion不会调用resultHandler~~~~~~~~~~~~~ final DeferredResultProcessingInterceptor getInterceptor() { return new DeferredResultProcessingInterceptor() { @Override public <S> boolean handleTimeout(NativeWebRequest request, DeferredResult<S> deferredResult) { boolean continueProcessing = true; try { if (timeoutCallback != null) { timeoutCallback.run(); } } finally { if (timeoutResult != RESULT_NONE) { continueProcessing = false; try { setResultInternal(timeoutResult); } catch (Throwable ex) { logger.debug("Failed to handle timeout result", ex); } } } return continueProcessing; } @Override public <S> boolean handleError(NativeWebRequest request, DeferredResult<S> deferredResult, Throwable t) { try { if (errorCallback != null) { errorCallback.accept(t); } } finally { try { setResultInternal(t); } catch (Throwable ex) { logger.debug("Failed to handle error result", ex); } } return false; } @Override public <S> void afterCompletion(NativeWebRequest request, DeferredResult<S> deferredResult) { expired = true; if (completionCallback != null) { completionCallback.run(); } } }; } // 内部函数式接口 DeferredResultHandler @FunctionalInterface public interface DeferredResultHandler { void handleResult(Object result); } }
DeferredResult的超时处理,采用委托机制,也就是在实例DeferredResult时给予一个超时时长(毫秒),同时在onTimeout中委托(传入)一个新的处理线程(我们可以认为是超时线程);当超时时间到来,DeferredResult启动超时线程,超时线程处理业务,封装返回数据,给DeferredResult赋值(正确返回的或错误返回的)