异步调用
在Dubbo中发起异步调用机制,从而提高对应的服务的调用的吞吐能力和调用机制
特性说明
技术背景
从2.7.0开始,Dubbo 的所有异步编程接口开始以CompletableFuture为基础,基于NIO的非阻塞实现并行调用,客户端不需要启动多线程即可完成并行调用多个远程服务,相对多线程开销较小。
使用场景
使用CompletableFuture修饰的接口
- 首先需要服务提供者事先定义CompletableFuture签名的服务,接口定义指南如下:
- Provider端异步执行将阻塞的业务从Dubbo内部线程池切换到业务自定义线程,避免Dubbo线程池的过度占用,有助于避免不同服务间的互相影响,异步执行无异于节省资源或提升RPC响应性能,因为如果业务执行需要阻塞,则始终还是要有线程来负责执行。
- Provider端异步执行和Consumer端异步调用是相互独立的,任意正交组合两端配置
- Consumer同步 - Provider同步
- Consumer异步 - Provider同步
- Consumer同步 - Provider异步
- Consumer异步 - Provider异步
Maven依赖配置
xml
复制代码
<properties> <source.level>1.8</source.level> <target.level>1.8</target.level> <dubbo.version>3.0.2.1</dubbo.version> <spring.version>4.3.16.RELEASE</spring.version> <junit.version>4.12</junit.version> <maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-framework-bom</artifactId> <version>${spring.version}</version> <type>pom</type> <scope>import</scope> </dependency> <dependency> <groupId>org.apache.dubbo</groupId> <artifactId>dubbo-bom</artifactId> <version>${dubbo.version}</version> <type>pom</type> <scope>import</scope> </dependency> <dependency> <groupId>org.apache.dubbo</groupId> <artifactId>dubbo-dependencies-zookeeper</artifactId> <version>${dubbo.version}</version> <type>pom</type> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>${junit.version}</version> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.apache.dubbo</groupId> <artifactId>dubbo</artifactId> </dependency> <dependency> <groupId>org.apache.dubbo</groupId> <artifactId>dubbo-dependencies-zookeeper</artifactId> <type>pom</type> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <scope>test</scope> </dependency> </dependencies> <profiles> <!-- For jdk 11 above JavaEE annotation --> <profile> <id>javax.annotation</id> <activation> <jdk>[1.11,)</jdk> </activation> <dependencies> <dependency> <groupId>javax.annotation</groupId> <artifactId>javax.annotation-api</artifactId> <version>1.3.2</version> </dependency> </dependencies> </profile> </profiles> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>${maven-compiler-plugin.version}</version> <configuration> <source>${source.level}</source> <target>${target.level}</target> </configuration> </plugin> </plugins> </build>
方案1:定义CompletableFuture修饰的接口
服务接口定义
java
复制代码
public interface AsyncService { CompletableFuture<String> execute(String name); }
服务接口实现(服务提供者)
java
复制代码
public class AsyncServiceImpl implements AsyncService { @Override public CompletableFuture<String> execute(String name) { return CompletableFuture.supplyAsync(() -> { //TODO 功能实现机制 try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } return "async response from provider."; }); } }
服务接口返回
通过return CompletableFuture.supplyAsync() ,业务执行已从Dubbo线程切换到业务线程,避免了对 Dubbo 线程池的阻塞。
方案2:使用AsyncContext
Dubbo提供了一个类似 Servlet 3.0 的异步接口AsyncContext,在没有 CompletableFuture修饰接口的情况下,也可以实现 Provider端的异步执行。
服务接口定义
java
复制代码
public interface AsyncService { String execute(String name); }
服务暴露,和普通服务完全一致
xml
复制代码
<bean id="asyncService" class="org.apache.dubbo.samples.governance.impl.AsyncServiceImpl"/> <dubbo:service interface="org.apache.dubbo.samples.governance.api.AsyncService" ref="asyncService"/>
服务实现(服务提供者)
java
复制代码
public class AsyncServiceImpl implements AsyncService { public String execute(String name) { final AsyncContext asyncContext = RpcContext.startAsync(); new Thread(() -> { // 如果要使用上下文,则必须要放在第一句执行 asyncContext.signalContextSwitch(); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } // 写回响应 asyncContext.write("Hello " + name + ", response from provider."); }).start(); return null; } }
注意接口的返回类型是 CompletableFuture。
XML引用服务(服务消费者者)
xml
复制代码
<dubbo:reference id="asyncService" timeout="10000" interface="com.alibaba.dubbo.samples.async.api.AsyncService"/>
调用远程服务(服务消费者)
xml
复制代码
// 调用直接返回CompletableFuture // 获取到对应的asyncService服务实现类对象 CompletableFuture<String> future = asyncService.execute("async call request"); // 增加回调 future.whenComplete((v, t) -> { if (t != null) { t.printStackTrace(); } else { System.out.println("Response: " + v); } }); // 早于结果输出 System.out.println("Executed before response return.");
方案3:使用RpcContext
在 consumer.xml 中配置
xml
复制代码
<dubbo:reference id="asyncService" interface="org.apache.dubbo.samples.governance.api.AsyncService"> <dubbo:method name="execute" async="true" /> </dubbo:reference>
调用代码
java
复制代码
// 此调用会立即返回null asyncService.execute("world"); // 拿到调用的Future引用,当结果返回后,会被通知和设置到此Future CompletableFuture<String> helloFuture = RpcContext.getServiceContext().getCompletableFuture(); // 为Future添加回调 helloFuture.whenComplete((retValue, exception) -> { if (exception == null) { System.out.println(retValue); } else { exception.printStackTrace(); } });
或者,也可以这样做异步调用
java
复制代码
CompletableFuture<String> future = RpcContext.getServiceContext().asyncCall( () -> { asyncService.sayHello("oneway call request1"); } ); future.get();
异步总是不等待返回,你也可以设置是否等待消息发出
xml
复制代码
<dubbo:method name="findFoo" async="true" sent="true" />
- sent="true" 等待消息发出,消息发送失败将抛出异常。
如果你只是想异步,完全忽略返回值,可以配置 return="false",以减少 Future 对象的创建和管理成本
xml
复制代码
<dubbo:method name="findFoo" async="true" return="false" />
- sent="false" 不等待消息发出,将消息放入 IO 队列,即刻返回。