异步编程 - 08 Spring框架中的异步执行_TaskExecutor接口和@Async应用篇

本文涉及的产品
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 异步编程 - 08 Spring框架中的异步执行_TaskExecutor接口和@Async应用篇


概述


在Spring Framework中分别使用TaskExecutorTaskScheduler接口提供异步执行和任务调度的抽象。

这里我们着重了解基于TaskExecutor支撑的注解@Async是如何实现异步处理的。



Spring中对TaskExecutor的抽象


Spring 2.0版本中提供了一种新的处理执行器(executors)的抽象,即TaskExecutor接口。TaskExecutor接口 与java.util.concurrent.Executor是等价的,其只有一个接口。

public interface TaskExecutor {
    void execute(Runnable task);
}

该接口具有单个方法execute(Runnable task),该方法基于线程池的语义和配置接收要执行的任务。



Spring框架内置的TaskExecutor实现。



SimpleAsyncTaskExecutor

这种TaskExecutor接口的实现不会复用线程,对应每个请求会新创建一个对应的线程来执行。它支持的并发限制将阻止任何超出限制的调用,这个可以通过调用setConcurrencyLimit方法来限制并发数,默认是不限制并发数的。


SyncTaskExecutor

这种TaskExecutor接口的实现不会异步地执行提交的任务,而是会同步使用调用线程来执行,这种实现主要用于没有必要多线程进行处理的情况,比如在进行简单的单元测试时。



ConcurrentTaskExecutor

这种TaskExecutor接口的实现是对JDK5中的java.util.concurrent.Executor的一个包装,通过setConcurrentExecutor(Executor concurrentExecutor)接口可以设置一个JUC中的线程池到其内部来做适配。


还有一个替代方案ThreadPoolTaskExecutor,它通过bean属性的方式配置Executor线程池的属性。一般很少会用到Concurrent TaskExecutor,但如果ThreadPoolTaskExecutor不够健壮满足不了你的需求,那么ConcurrentTaskExecutor也是一种选择。



SimpleThreadPoolTaskExecutor

这个实现实际上是Quartz的SimpleThreadPool的子类,它监听Spring的生命周期回调。当你有一个可能需要Quartz和非Quartz组件共享的线程池时,通常会使用该实现。



ThreadPoolTaskExecutor

该实现只能在Java 5环境中使用,其也是该环境中最常用的实现。它公开了bean属性,用于配置java.util.concurrent.ThreadPoolExecutor并将其包装在TaskExecutor中。如果你需要一些高级的接口,例如ScheduledThreadPoolExecutor,建议使用Concurrent TaskExecutor。



TimerTaskExecutor

该实现使用单个java.util.Timer对象作为其内部异步线程来执行任务。它与SyncTaskExecutor的不同之处在于,该实现对所有提交的任务都在Timer内的单独线程中执行,尽管提交的多个任务的执行是顺序同步的。



小结

如上,Spring框架本身提供了很多TaskExecutor的实现,但是如果不符合你的需要,你可以通过实现TaskExecutor接口来定制自己的执行器。




如何在Spring中使用异步执行


使用TaskExecutor实现异步执行


在Spring中TaskExecutor的实现类是以JavaBeans的方式提供服务的,比如下面这个例子,我们通过xml方式向Spring容器中注入了TaskExecutor的实现者ThreadPoolTaskExecutor的实例。

   <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <!--1. 核心线程个数-->
        <property name="corePoolSize" value="5" />
        <!--2.最大线程个数 -->
        <property name="maxPoolSize" value="10" />
        <!--3.超过核心线程个数的线程空闲多久被回收 -->
        <property name="keepAliveSeconds" value="60" />
        <!--4.缓存队列大小 -->
        <property name="queueCapacity" value="20" />
        <!--5.拒绝策略 -->
        <property name="rejectedExecutionHandler">
            <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRuns
Policy" />
        </property>
    </bean>


·如上代码我们向Spring容器中注入了一个ThreadPoolTaskExecutor处理器实例,其配置属性与Java并发包中的线程池ThreadPoolExecutor类似。


·其中代码1、2将处理器中核心线程个数设置为5,最大线程个数设置为10。


·代码3设置了线程池中非核心线程空闲60s后会被自动回收。


·代码4设置了线程池阻塞队列的大小为20。


·代码5设置了线程池的拒绝策略,这里设置为CallerRunsPolicy,意为当线程池中的队列满了,并且所有线程都在忙碌的时候,如果此时向处理器提交了新的任务,则新的任务不再是异步执行,而是使用调用线程来执行。


当我们向Spring容器中注入了TaskExecutor的实例后,我们就可以在Spring容器中使用它。

<bean id="asyncExecutorExample"
    class="com.jiaduo.async.AsyncProgram.AsyncExecutorExample">
    <property name="taskExecutor" ref="taskExecutor" />
</bean>


·如上代码通过xml方式向Spring容器注入了AsyncExecutorExample的实例,并且其属性taskExecutor注入了上面创建的名称为taskExecutor的执行器,下面我们看看AsyncExecutorExample的代码。

public class AsyncExecutorExample {
    private class MessagePrinterTask implements Runnable {
        private String message;
        public MessagePrinterTask(String message) {
            this.message = message;
        }
        public void run() {
            try {
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName() + " " + message);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    public TaskExecutor getTaskExecutor() {
        return taskExecutor;
    }
    public void setTaskExecutor(TaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }
    // 线程池执行器
    private TaskExecutor taskExecutor;
    public void printMessages() {
        for (int i = 0; i < 6; i++) {
            taskExecutor.execute(new MessagePrinterTask("Message" + i));
        }
    }
}

上述代码的AsyncExecutorExample中有一个类型为TaskExecutor的属性,我们通过setter访问器注入了该属性,其有一个printMessages方法用来触发异步任务执行,这里的异步任务被封装为MessagePrinterTask,其在run方法内先休眠1s模拟任务执行,然后打印输出。


下面我们看看如何把上面的内容组成可执行的程序,首先需要把上面两个xml配置汇总到beans.xml里面,代码如下所示。


<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context-2.5.xsd">
    <bean id="taskExecutor"
        class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        ...
    </bean>
    <bean id="asyncExecutorExample"
        class="com.jiaduo.async.AsyncProgram.AsyncExecutorExample">
        <property name="taskExecutor" ref="taskExecutor" />
    </bean>
</beans>


然后我们需要编写的测试代码如下所示。

public static void main(String arg[]) throws InterruptedException {
    // 1.创建容器上下文
    ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext(
            new String[] { "beans.xml" });
    // 2.获取AsyncExecutorExample实例并调用打印方法
    System.out.println(Thread.currentThread().getName() + " begin ");
    AsyncExecutorExample asyncExecutorExample = applicationContext.getBean(AsyncExecutorExample.class);
    asyncExecutorExample.printMessages();
    System.out.println(Thread.currentThread().getName() + " end ");
}   


·代码1使用ClassPathXmlApplicationContext创建了一个Spring容器上下文,并且以beans.xml作为容器中bean的元数据。


·代码2从容器上下文中获取AsyncExecutorExample的实例,并且调用了print-Messages方法。由于printMessages方法内的6个任务提交到了执行器线程进行处理,所以main函数所在线程调用printMessages方法后马上返回,然后具体任务是由执行器中的线程执行的。


·运行上面代码,一个可能的输出为:


main begin 
main end 
taskExecutor-1 Message0
taskExecutor-3 Message2
taskExecutor-2 Message1
taskExecutor-5 Message4
taskExecutor-4 Message3
taskExecutor-1 Message5


可知具体任务是在执行器线程中执行的,而不是在main函数所在线程中执行的。运行上面的代码后,虽然main函数所在线程会马上结束,并且异步任务也执行完了,但是JVM进程并没有退出,这是因为执行器ThreadPoolTaskExecutor中的线程都是用户线程而不是Deamon线程。而JVM退出的条件是进程中不含有任何用户线程,所以我们要与使用Java并发包中的线程池一样,需要显式关闭线程池。


为此我们在AsyncExecutorExample中添加shutdown方法:


public void shutdown() {
    if (taskExecutor instanceof ThreadPoolTaskExecutor) {
        ((ThreadPoolTaskExecutor) taskExecutor).shutdown();
    }
}


然后在测试类的main函数最后添加如下代码:

// 3.关闭执行器,释放线程
asyncExecutorExample.shutdown();


添加代码后,运行测试代码,输出如下所示。

main begin 
main end 
java.lang.InterruptedException: sleep interrupted
    at java.lang.Thread.sleep(Native Method)
    at com.jiaduo.async.AsyncProgram.AsyncExecutorExample$MessagePrinterTask.run(AsyncExecutorExample.java:17)
...


如上可知我们的任务都被中断了(因为我们的任务中调用了sleep方法),这是因为默认情况下执行器ThreadPoolTaskExecutor中的变量waitForTasksToComplete OnShutdown为false,意为关闭执行器时不等待正在执行的任务执行完毕就中断执行任务的线程。所以我们需要修改ThreadPoolTaskExecutor注入的配置,代码如下所示。


<bean id="taskExecutor"
    class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    ...
    <property name="waitForTasksToCompleteOnShutdown"
        value="true"></property>
</bean>


如上配置在注入ThreadPoolTaskExecutor的配置属性最后添加了变量waitForTasksTo  CompleteOnShutdown为true的配置,然后运行测试类,就会发现等异步任务执行完毕后,当前jvm进程就不存在了,这说明执行器已经被优雅地退出了。



使用注解@Async实现异步执行


在Spring中可以在方法上添加@Async注释,以便异步执行该方法。换句话说,调用线程将在调用含有@Async注释的方法时立即返回,并且该方法的实际执行将发生在Spring的TaskExecutor异步处理器线程中。需要注意的是,该注解@Async默认是不会解析的,你可以使用如下两种方式开启该注解的解析。



·基于xml配置Bean时需要加入如下配置,才可以开启异步处理:

<task:annotation-driven  />


·在基于注解的情况下可以添加如下注解来启动异步处理:

@EnableAsync


下面我们看看如何使用第一种方式开启并使用异步执行,首先我们需要在beans-annotation.xml中配置如下代码。

<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context-2.5.xsd
       http://www.springframework.org/schema/task
       http://www.springframework.org/schema/task/spring-task.xsd">
    <!--1.开启Async注解的解析 -->
    <task:annotation-driven />
    <!--2.注入业务Bean -->
    <bean id="asyncCommentExample"
        class="com.jiaduo.async.AsyncProgram.AsyncAnnotationExample">
    </bean>
</beans>


如上代码1通过配置开启了对注解Async的解析,代码2注入了我们的业务Bean,其代码如下所示。

public class AsyncAnnotationExample {
    @Async
    public void printMessages() {
        for (int i = 0; i < 6; i++) {
            try {
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName() + " 、
Message" + i);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}


如上代码的printMessages方法添加了@Async注解,方法内循环6次,循环中先让执行线程休眠1s,然后打印输出。

下面我们组合上面的代码片段形成一个可执行程序进行测试,测试代码如下所示。

public static void main(String arg[]) throws InterruptedException {
    // 1.创建容器上下文
    ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext(
            new String[] { "beans-annotation.xml" });
    // 2. 获取AsyncAnnotationExample实例并调用打印方法
    System.out.println(Thread.currentThread().getName() + " begin ");
    AsyncAnnotationExample asyncCommentExample = applicationContext.getBean(AsyncAnnotationExample.class);
    asyncCommentExample.printMessages();
    System.out.println(Thread.currentThread().getName() + " end ");
}


如上代码1使用beans-annotation.xml作为容器Bean的元数据创建了Spring上下文,代码2从中获取了AsyncAnnotationExample的实例,然后调用其printMessages,main线程调用该方法后,该方法会马上返回,printMessages内的任务是使用Spring框架内的默认执行器SimpleAsyncTaskExecutor中的线程来执行的。运行上面代码的一个可能的输出结果如下所示。


main begin 
main end 
SimpleAsyncTaskExecutor-1 Message0
SimpleAsyncTaskExecutor-1 Message1
SimpleAsyncTaskExecutor-1 Message2
SimpleAsyncTaskExecutor-1 Message3
SimpleAsyncTaskExecutor-1 Message4
SimpleAsyncTaskExecutor-1 Message5


可知具体执行异步任务的是SimpleAsyncTaskExecutor中的线程,而不是main函数所在线程。当然我们可以指定自己的执行器来执行我们的异步任务,这需要我们在xml配置自己的执行器,代码如下所示。

<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
    ...
    <!--0.创建自己的业务线程池处理器 -->
    <task:executor id="myexecutor" pool-size="5" />
    <!--1.开启Async注解的解析 -->
    <task:annotation-driven executor="myexecutor"/>
    <!--2.注入业务Bean -->
    <bean id="asyncCommentExample"
        class="com.jiaduo.async.AsyncProgram.AsyncAnnotationExample">
    </bean>
</beans>


如上代码0为我们创建了自己的线程池处理器,代码1则把我们的线程池处理器作为异步任务的处理器,运行如上代码,可以看到一个可能的输出结果如下:

main begin 
main end 
myexecutor-1 Message0
myexecutor-1 Message1
myexecutor-1 Message2
myexecutor-1 Message3
myexecutor-1 Message4
myexecutor-1 Message5


由如上代码可知,异步任务是使用我们自己的线程池执行器执行的。

下面我们看看第二种方式是如何使用注解方式开启异步处理的,首先我们需要在xml里面进行如下配置。

<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context-2.5.xsd
       http://www.springframework.org/schema/task
       http://www.springframework.org/schema/task/spring-task.xsd">
    <!--1.扫描bean的包路径 -->
    <context:component-scan
        base-package="com.jiaduo.async.AsyncProgram" />
</beans


如上代码1配置了包扫描路径,框架会扫描该包下面含有@Component注解的从Bean到Spring的容器。

然后要在AsyncAnnotationExample类中加上如下注解。

@EnableAsync//开启异步执行
@Component//把该Bean注入Spring容器
public class AsyncAnnotationExample {
    @Async
    public void printMessages() {
        ...
    }
}


如上代码使用了注解@EnableAsync开启异步执行。

另外需要注意的是@Async注解本身也是有参数的,比如我们可以在某一个需要异步处理的方法上加@Async,注解时指定使用哪一个线程池处理器来进行异步处理。

@Async("bizExecutor")
void doSomething(String s) {
....
}


如上代码指定了方法doSomething使用名称为bizExecutor的线程池处理器来执行异步任务。


上面我们讲解的异步任务都是没有返回结果的,其实基于@Async注解的异步处理也是支持返回值的,但是返回值类型必须是Future或者其子类类型的,比如返回的Future类型可以是普通的java.util.concurrent.Future类型,也可以是Spring框架的org.springframework.util.concurrent.ListenableFuture类型,或者JDK8中的java.util.concurrent.CompletableFuture类型,又或者Spring中的AsyncResult类型等。这提供了异步执行的好处,以便调用者可以在调用Future上的get()之前处理其他任务。


如下代码展示了在AsyncAnnotationExample中,方法doSomething是如何在具有返回值的方法上使用注解@Async的。

@Async
public CompletableFuture<String> doSomething() {
    // 1.创建future
    CompletableFuture<String> result = new CompletableFuture<String>();
    // 2.模拟任务执行
    try {
        Thread.sleep(5000);
        System.out.println(Thread.currentThread().getName() + "doSomething");
    } catch (Exception e) {
        e.printStackTrace();
    }
    result.complete("done");
    // 3.返回结果
    return result;
}


代码1创建了一个CompletableFuture类型的Future实例,代码2休眠5s模拟任务执行,然后设置Future的执行结果,代码3则返回Future对象。

下面修改我们的测试代码对其进行测试,代码如下所示。

public static void main(String arg[]) throws InterruptedException {
    // 1.创建容器上下文
    ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext(
            new String[] { "beans-annotation.xml" });
    // 2. 获取AsyncExecutorExample实例并调用打印方法
    System.out.println(Thread.currentThread().getName() + " begin ");
    AsyncAnnotationExample asyncCommentExample = applicationContext.getBean(AsyncAnnotationExample.class);
    // 3.获取异步future并设置回调
    CompletableFuture<String> resultFuture = asyncCommentExample.doSomething();
    resultFuture.whenComplete(new BiConsumer<String, Throwable>() {
        @Override
        public void accept(String t, Throwable u) {
            if (null == u) {
                System.out.println(Thread.currentThread().getName() + " " + t);
            } else {
                System.out.println("error:" + u.getLocalizedMessage());
            }
        }
    });
    System.out.println(Thread.currentThread().getName() + " end ");
}


代码3的main函数所在线程调用了AsyncAnnotationExample的doSomething方法,该方法会马上返回一个CompletableFuture,我们在其上设置了回调函数,之后main线程就退出了,最终doSomething方法内的代码就是使用处理器线程池中的线程来执行的,并当执行完毕后回调我们设置的回调函数。


运行上面代码的输出如下所示。


main begin 
main end 
SimpleAsyncTaskExecutor-1doSomething
SimpleAsyncTaskExecutor-1 done


如上代码可知,doSomething方法的执行是使用SimpleAsyncTaskExecutor线程池处理器来执行的,而不是main函数所在线程进行执行。


最后看看使用@Async注解遇到异常时该如何处理。当@Async方法具有Future类型返回值时,很容易管理在方法执行期间抛出的异常,因为会在调用get方法等待结果时抛出该异常。但是对于void返回类型来说,异常未被捕获且无法传输。这时候可以提供AsyncUncaughtExceptionHandler来处理该类异常。以下示例显示了如何执行该操作。


  public class MyAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler {
    @Override
    public void handleUncaughtException(Throwable ex, Method method, Object... params) {
        // handle exception
    }
}


然后我们在xml里面配置即可:

    <task:annotation-driven
        exception-handler="myAsyncUncaughtExceptionHandler" />
    <bean id="myAsyncUncaughtExceptionHandler" 
class="com.artisan.async.AsyncProgram.MyAsyncUncaughtExceptionHandler"></bean>


如上代码的xml配置首先创建了实例myAsyncUncaughtExceptionHandler,然后将其设置到注解annotation-driven中,在异步任务中抛出异常时会在MyAsyncUncaught ExceptionHandler的handleUncaughtException方法中得到处理。


由上可知基于@Async注解实现异步执行的方式时,大大简化了我们异步编程的运算负担,我们不必再显式地创建线程池并把任务手动提交到线程池内,只要直接在需要异步执行的方法上添加@Async注解即可。当然,当我们需要使用自己的线程池来异步执行标注@Async的方法时,还是需要显式创建线程池的,但这时并不需要显式提交任务到线程池。

相关文章
|
17天前
|
设计模式 XML Java
【23种设计模式·全精解析 | 自定义Spring框架篇】Spring核心源码分析+自定义Spring的IOC功能,依赖注入功能
本文详细介绍了Spring框架的核心功能,并通过手写自定义Spring框架的方式,深入理解了Spring的IOC(控制反转)和DI(依赖注入)功能,并且学会实际运用设计模式到真实开发中。
【23种设计模式·全精解析 | 自定义Spring框架篇】Spring核心源码分析+自定义Spring的IOC功能,依赖注入功能
|
13天前
|
Java 开发者 Spring
理解和解决Spring框架中的事务自调用问题
事务自调用问题是由于 Spring AOP 代理机制引起的,当方法在同一个类内部自调用时,事务注解将失效。通过使用代理对象调用、将事务逻辑分离到不同类中或使用 AspectJ 模式,可以有效解决这一问题。理解和解决这一问题,对于保证 Spring 应用中的事务管理正确性至关重要。掌握这些技巧,可以提高开发效率和代码的健壮性。
43 13
|
25天前
|
IDE Java 测试技术
互联网应用主流框架整合之Spring Boot开发
通过本文的介绍,我们详细探讨了Spring Boot开发的核心概念和实践方法,包括项目结构、数据访问层、服务层、控制层、配置管理、单元测试以及部署与运行。Spring Boot通过简化配置和强大的生态系统,使得互联网应用的开发更加高效和可靠。希望本文能够帮助开发者快速掌握Spring Boot,并在实际项目中灵活应用。
46 5
|
1月前
|
缓存 Java 数据库连接
Spring框架中的事件机制:深入理解与实践
Spring框架是一个广泛使用的Java企业级应用框架,提供了依赖注入、面向切面编程(AOP)、事务管理、Web应用程序开发等一系列功能。在Spring框架中,事件机制是一种重要的通信方式,它允许不同组件之间进行松耦合的通信,提高了应用程序的可维护性和可扩展性。本文将深入探讨Spring框架中的事件机制,包括不同类型的事件、底层原理、应用实践以及优缺点。
67 8
|
Java Spring
spring框架之AOP模块(面向切面),附带通知类型---超详细介绍
spring框架之AOP模块(面向切面),附带通知类型---超详细介绍
140 0
|
缓存 监控 Java
Spring框架之AOP(面向切面编程)
Spring框架之AOP(面向切面编程)
64 0
|
6月前
|
分布式计算 Java MaxCompute
详解 Java 限流接口实现问题之在Spring框架中使用AOP来实现基于注解的限流问题如何解决
详解 Java 限流接口实现问题之在Spring框架中使用AOP来实现基于注解的限流问题如何解决
|
7月前
|
设计模式 SQL Java
Spring框架第四章(AOP概念及相关术语)
Spring框架第四章(AOP概念及相关术语)
|
8月前
|
安全 Java 开发者
在Spring框架中,IoC和AOP是如何实现的?
【4月更文挑战第30天】在Spring框架中,IoC和AOP是如何实现的?
97 0
|
XML 设计模式 安全
【Spring框架四】——Spring AOP 注解实现和xml方式实现1
【Spring框架四】——Spring AOP 注解实现和xml方式实现
168 0