阻塞与非阻塞客户端

本文涉及的产品
传统型负载均衡 CLB,每月750个小时 15LCU
网络型负载均衡 NLB,每月750个小时 15LCU
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
简介: 本文主要讲解响应式编程模式下,阻塞与非阻塞式客户端的正确使用方式。

阻塞与非阻塞

阻塞是指程序会一直等待该进程或线程完成当前任务期间不做其它事情。而非阻塞,是指当前线程在处理一些事情的同时,还可以处理其它的事情,并不需要等待当前事件完成才执行其它事件。

阻塞与非阻塞客户端

对于请求当中,我们有需要借助一些请求封装的客户端,这里可以分为两大类:阻塞式、非阻塞式。

阻塞式客户端以常见的 RestTemplate为例,这是一种常见的客户端请求封装,要创建负载平衡RestTemplate,下面看看其Bean:

@LoadBalanced
@Bean
public RestTemplate restTemplate() {
   return new RestTemplate();
}

在底层,RestTemplate 使用了基于每个请求对应一个线程模型(thread-per-request)的 Java Servlet API。在阻塞客户端中,这意味着,直到 Web 客户端收到响应之前,线程都将一直被阻塞下去。而阻塞带来的问题是:每个线程都消耗了一定的内存和 CPU 周期。

如果在并发下,等待结果的请求迟早都会堆积起来。这样,程序将创建很多线程,这些线程将耗尽线程池或占用所有可用内存。由于频繁的 CPU 线程切换,我们还会遇到性能下降的问题。

这在 Spring5 中,提出了一种新的客户端抽象:反应式客户端 WebClient,而 WebClient 使用了 Spring Reactive Framework 所提供的异步非阻塞解决方案。所以,当 RestTemplate创建一个个新的线程时,Webclient是为其创建类似task的线程,并且在底层, Reactive 框架将对这些 task 进行排队,并且仅在适当的响应可用时再执行它们。WebClient 是 Spring WebFlux 库的一部分。所以,我们还可以使用了流畅的函数式 API 编程,并将响应类型作为声明来进行组合。如果需要使用 WebClient,同样可以创建:

@Bean
@LoadBalanced
public WebClient.Builder loadBalancedWebClientBuilder() {
   return WebClient.builder();
}

案例

假设这里有一个响应非常慢的服务rest-service,我们分别用阻塞式、非阻塞式客户端来测试一下。

阻塞式

我们利用 RestTemplate实现阻塞式请求:

@Bean
@LoadBalanced
public RestTemplate restTemplate() {
return new RestTemplate();
}


@Autowired
RestTemplate restTemplate;

@GetMapping("/getClientRes")
public Response<Object> getClientRes() throws Exception {
   System.out.println("block api enter");
HttpHeaders headers = new HttpHeaders();
MediaType type = MediaType.parseMediaType("application/json; charset=UTF-8");
headers.setContentType(type);
headers.add("Accept", MediaType.APPLICATION_JSON.toString());
HttpEntity<String> formEntity = new HttpEntity<String>(null, headers);
String body = "";
try {
 ResponseEntity<String> responseEntity = restTemplate.exchange("http://diff-ns-service-service/getservicedetail?servicename=cas-server-service",
   HttpMethod.GET, formEntity, String.class);
 System.out.println(JSON.toJSONString(responseEntity));
 if (responseEntity.getStatusCodeValue() == 200) {
     System.out.println("block api exit");
  return Response.ok(responseEntity.getBody());
 }
} catch (Exception e) {
 System.out.println(e.getMessage());
}
System.out.println("block api failed, exit");
return Response.error("failed");
}

在启动服务请求后,发现其打印:

block api enter

[{"host":"10.244.0.55","instanceId":"71f96128-3bb1-11ec-97e6-ac1f6ba00d36","metadata":{"kubectl.kubernetes.io/last-applied-configuration":"{\"apiVersion\":\"v1\",\"kind\":\"Service\",\"metadata\":{\"annotations\":{},\"name\":\"cas-server-service\",\"namespace\":\"system-server\"},\"spec\":{\"ports\":[{\"name\":\"cas-server01\",\"port\":2000,\"targetPort\":\"cas-server01\"}],\"selector\":{\"app\":\"cas-server\"}}}\n","port.cas-server01":"2000","k8s_namespace":"system-server"},"namespace":"system-server","port":2000,"scheme":"http","secure":false,"serviceId":"cas-server-service","uri":"http://10.244.0.55:2000"},{"host":"10.244.0.56","instanceId":"71fc1c14-3bb1-11ec-97e6-ac1f6ba00d36","metadata":{"$ref":"$[0].metadata"},"namespace":"system-server","port":2000,"scheme":"http","secure":false,"serviceId":"cas-server-service","uri":"http://10.244.0.56:2000"}]

block api exit

非阻塞式

上面的打印符合我们的逾期,接下来我们来看看非阻塞、反应式客户端请求:

@Bean
@LoadBalanced
public WebClient.Builder loadBalancedWebClientBuilder() {
   return WebClient.builder();
}


@GetMapping(value = "/getClientResByWebClient", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Mono<String> getClientResByWebClient() throws Exception {
   System.out.println("no block api enter");
Mono<String> resp = webClientBuilder.build().get()
   .uri("http://diff-ns-service-service/getservicedetail?servicename=cas-server-service").retrieve()
   .bodyToMono(String.class);
 resp.subscribe(body -> System.out.println(body.toString()));
 System.out.println("no block api exit");
 return resp;
}

执行完代码后,看打印:

no block api enter

no block api exit

[{"host":"10.244.0.55","instanceId":"71f96128-3bb1-11ec-97e6-ac1f6ba00d36","metadata":{"kubectl.kubernetes.io/last-applied-configuration":"{\"apiVersion\":\"v1\",\"kind\":\"Service\",\"metadata\":{\"annotations\":{},\"name\":\"cas-server-service\",\"namespace\":\"system-server\"},\"spec\":{\"ports\":[{\"name\":\"cas-server01\",\"port\":2000,\"targetPort\":\"cas-server01\"}],\"selector\":{\"app\":\"cas-server\"}}}\n","port.cas-server01":"2000","k8s_namespace":"system-server"},"namespace":"system-server","port":2000,"scheme":"http","secure":false,"serviceId":"cas-server-service","uri":"http://10.244.0.55:2000"},{"host":"10.244.0.56","instanceId":"71fc1c14-3bb1-11ec-97e6-ac1f6ba00d36","metadata":{"$ref":"$[0].metadata"},"namespace":"system-server","port":2000,"scheme":"http","secure":false,"serviceId":"cas-server-service","uri":"http://10.244.0.56:2000"}]

在本例中,WebClient 返回一个 Mono 生产者后完成方法的执行。如果一旦结果可用,发布者将开始向其订阅者发送数据。调用这个API的客户端(浏览器)也将订阅返回的 Mono 对象。

阻塞式转非阻塞式

可以将前面的阻塞式请求,直接转为非阻塞请求,前提是你使用的是 Spring5,此时,可以直接这样来写,贴代码:

@GetMapping("/hello")
public Mono<String> hello() {
   return Mono.fromCallable(() -> restTemplate.getForObject("http://diff-ns-service-service/all/getService", String.class))
           .subscribeOn(Schedulers.elastic());
}

这样后,在请求访问时,直接返回了提供者服务返回的信息体:

{"result":{"status":200,"code":0,"msg":"success"},"data":[{"host":"10.244.0.55","instanceId":"71f96128-3bb1-11ec-97e6-ac1f6ba00d36","metadata":{"kubectl.kubernetes.io/last-applied-configuration":"{\"apiVersion\":\"v1\",\"kind\":\"Service\",\"metadata\":{\"annotations\":{},\"name\":\"cas-server-service\",\"namespace\":\"system-server\"},\"spec\":{\"ports\":[{\"name\":\"cas-server01\",\"port\":2000,\"targetPort\":\"cas-server01\"}],\"selector\":{\"app\":\"cas-server\"}}}\n","port.cas-server01":"2000","k8s_namespace":"system-server"},"namespace":"system-server","port":2000,"scheme":"http","secure":false,"serviceId":"cas-server-service","uri":"http://10.244.0.55:2000"},{"host":"10.244.0.56","instanceId":"71fc1c14-3bb1-11ec-97e6-ac1f6ba00d36","metadata":{"$ref":"$[0].metadata"},"namespace":"system-server","port":2000,"scheme":"http","secure":false,"serviceId":"cas-server-service","uri":"http://10.244.0.56:2000"}]}

这里需要注意的是,请求时,需要直接返回服务提供者的标准信息体,不能再作二次封装返回,否则,只能拿到信息:

{"result":{"status":200,"code":0,"msg":"success"},"data":{"scanAvailable":true}}

表示本次 callable 为true,但这不是我们需要的信息,我们还是需要其本身返回的业务数据。所以需要提供者的返回标准化,因为直接将信息返回给可接收的浏览器等前端。

自定义返回体

从前面例子,我们可以看到:当发送请求获取响应后会直接返回给订阅者(浏览器),但有时候,我们可能需要作一些自定义的返回体,比如加一些状态码、说明、描述等。

此时,我们该如何处理呢?我们可以看到在 webClient 中,提供了 block 函数,该函数在返回信息时,可以阻塞其返回给前端,可以通过其来封装返回结果:

@GetMapping("/test2")
public Response<String> test2() {
   Mono<String> resp = webClientBuilder.build()
                   .get()
                   .uri("http://diff-ns-service-service/all/getService")
                   .retrieve()
                   .bodyToMono(String.class);

   return Response.ok(resp.block(Duration.ofSeconds(2)));
}

这里在等待异常出现给定一个最大的时间,超时将抛出异常:RunTimeException。这样,我们就可以自定义消息体返回格式了。

{"result":{"status":200,"code":0,"msg":"success"},"data":""\"[{\\\"host\\\":\\\"10.244.0.55\\\",\\\"instanceId\\\":\\\"71f96128-3bb1-11ec-97e6-ac1f6ba00d36\\\",\\\"metadata\\\":{\\\"kubectl.kubernetes.io/last-applied-configuration\\\":\\\"{\\\\\\\"apiVersion\\\\\\\":\\\\\\\"v1\\\\\\\",\\\\\\\"kind\\\\\\\":\\\\\\\"Service\\\\\\\",\\\\\\\"metadata\\\\\\\":{\\\\\\\"annotations\\\\\\\":{},\\\\\\\"name\\\\\\\":\\\\\\\"cas-server-service\\\\\\\",\\\\\\\"namespace\\\\\\\":\\\\\\\"system-server\\\\\\\"},\\\\\\\"spec\\\\\\\":{\\\\\\\"ports\\\\\\\":[{\\\\\\\"name\\\\\\\":\\\\\\\"cas-server01\\\\\\\",\\\\\\\"port\\\\\\\":2000,\\\\\\\"targetPort\\\\\\\":\\\\\\\"cas-server01\\\\\\\"}],\\\\\\\"selector\\\\\\\":{\\\\\\\"app\\\\\\\":\\\\\\\"cas-server\\\\\\\"}}}\\\\n\\\",\\\"port.cas-server01\\\":\\\"2000\\\",\\\"k8s_namespace\\\":\\\"system-server\\\"},\\\"namespace\\\":\\\"system-server\\\",\\\"port\\\":2000,\\\"scheme\\\":\\\"http\\\",\\\"secure\\\":false,\\\"serviceId\\\":\\\"cas-server-service\\\",\\\"uri\\\":\\\"http://10.244.0.55:2000\\\"},{\\\"host\\\":\\\"10.244.0.56\\\",\\\"instanceId\\\":\\\"71fc1c14-3bb1-11ec-97e6-ac1f6ba00d36\\\",\\\"metadata\\\":{\\\"$ref\\\":\\\"$[0].metadata\\\"},\\\"namespace\\\":\\\"system-server\\\",\\\"port\\\":2000,\\\"scheme\\\":\\\"http\\\",\\\"secure\\\":false,\\\"serviceId\\\":\\\"cas-server-service\\\",\\\"uri\\\":\\\"http://10.244.0.56:2000\\\"}]\"""}

当然,如果对方的消息体已经按照我们标准的格式输出了,我们可以直接返回这个消息体。

结论

在大部分场景下, RestTemplate 还是继续被使用的,但有些场景下,反应式非阻塞请求还是必须的,系统资源要少得多。WebClient 不失为是一个更好的选择。

相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
相关文章
|
UED
阻塞与非阻塞问题
阻塞与非阻塞是计算机领域中常见的两种I/O模型,用来描述在进行输入输出操作时程序的行为方式。下面将详细介绍阻塞和非阻塞的概念、特点、应用场景以及它们之间的区别。
116 0
阻塞式/非阻塞式与同步/异步的区别
阻塞式/非阻塞式与同步/异步的区别
74 0
理解阻塞、非阻塞与同步、异步的区别
理解阻塞、非阻塞与同步、异步的区别
理解阻塞、非阻塞与同步、异步的区别
|
Linux
Linux网络编程之阻塞与非阻塞
Linux网络编程之阻塞与非阻塞
187 0
长连接 短连接 异步 同步 )阻塞与非阻塞详解
一。通信方式 主要有以下三大类: (一)SERVER/CLIENT方式 1.一个Client方连接一个Server方,或称点对点(peer to peer): 2.多个Client方连接一个Server方,这也是通常的并发服务器方式。
1502 2
|
缓存 Java
同步 异步 阻塞 非阻塞
在高性能的IO体系设计中,有几个名词概念常常会使我们感到迷惑不解。具体如下:  序号 问题 1 什么是同步? 2 什么是异步? 3 什么是阻塞? 4 什么是非阻塞? 5 什么是同步阻塞? 6 什么是同步非阻塞? 7 什么是异步阻塞? 8 什么是异步非阻塞? 散仙不才,在查了一部分资料后,愿试着以通俗易懂的方式
1865 1
同步、异步、阻塞、非阻塞
同步和异步关注的是消息通信机制 (synchronous communication/ asynchronous communication) 真正意义上的 异步IO 是说内核直接将数据拷贝至用户态的内存单元,再通知程序直接去读取数据。
985 0
同步,异步,阻塞和非阻塞
同步,异步,阻塞和非阻塞的理解
1650 0