java版gRPC实战之六:客户端动态获取服务端地址

简介: 当服务端地址发生改变时,客户端不改配置不重启,就能感知到服务端的变化吗?

欢迎访问我的GitHub

这里分类和汇总了欣宸的全部原创(含配套源码): https://github.com/zq2599/blog_demos

客户端为什么要动态获取服务端地址

本文是《java版gRPC实战》系列的第六篇,前面咱们在开发客户端应用时,所需的服务端地址都是按如下步骤设置的:

  • 在application.yml中配置,如下图:

在这里插入图片描述

  • 在用到gRPC的bean中,使用注解GrpcClient即可将Stub类注入到成员变量中:

在这里插入图片描述

  • 上述操作方式的优点是简单易用好配置,缺点也很明显:服务端的IP地址或者端口一旦有变化,就必须修改application.yml并重启客户端应用;

为什么不用注册中心

  • 您一定会想到解决上述问题最简单的方法就是使用注册中心,如nacos、eureka等,其实我也是这么想的,直到有一天,由于工作原因,我要在一个已有的gRPC微服务环境部署自己的应用,这个微服务环境并非java技术栈,而是基于golang的,他们都使用了go-zero框架( 老扎心了),这个go-zero框架没有提供java语言的SDK,因此,我只能服从go-zero框架的规则,从etcd中取得其他微服务的地址信息,才能调用其他gRPC服务端,如下图所示:

在这里插入图片描述

  • 如此一来,咱们之前那种在application.yml中配置服务端信息的方法就用不上了,本篇咱们来开发一个新的gRPC客户端应用,满足以下需求:
  1. 创建Stub对象的时候,服务端的信息不再来自注解GrpcClient,而是来自查询etcd的结果;
  2. etcd上的服务端信息有变化的时候,客户端可以及时更新,而不用重启应用;

本篇概览

  • 本篇要开发名为get-service-addr-from-etcd的springboot应用,该应用从etcd取得local-server应用的IP和端口,然后调用local-server的sayHello接口,如下图:

在这里插入图片描述

  1. 开发客户端应用;
  2. 部署gRPC服务端应用;
  3. 部署etcd;
  4. 模拟go-zero的规则,将服务端应用的IP地址和端口写入etcd;
  5. 启动客户端应用,验证能否正常调用服务端的服务;
  6. 重启服务端,重启的时候修改端口;
  7. 修改etcd中服务端的端口信息;
  8. 调用接口触发客户端重新实例化Stub对象;
  9. 验证客户端能否正常调用修改了端口的服务端服务;

源码下载

名称 链接 备注
项目主页 https://github.com/zq2599/blog_demos 该项目在GitHub上的主页
git仓库地址(https) https://github.com/zq2599/blog_demos.git 该项目源码的仓库地址,https协议
git仓库地址(ssh) git@github.com:zq2599/blog_demos.git 该项目源码的仓库地址,ssh协议
  • 这个git项目中有多个文件夹,《java版gRPC实战》系列的源码在grpc-tutorials文件夹下,如下图红框所示:

在这里插入图片描述

  • grpc-tutorials文件夹下有多个目录,本篇文章对应的客户端代码在get-service-addr-from-etcd目录下,如下图:

在这里插入图片描述

开发客户端应用

  • 在父工程的build.gradle文件中新增一行,这是etcd相关的库,如下图红框所示:

在这里插入图片描述

  • 在父工程grpc-turtorials下面新建名为get-service-addr-from-etcd的模块,其build.gradle内容如下:
plugins {
    id 'org.springframework.boot'
}

dependencies {
    implementation 'org.projectlombok:lombok'
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'net.devh:grpc-client-spring-boot-starter'
    implementation 'io.etcd:jetcd-core'
    implementation project(':grpc-lib')
}
  • 配置文件application.yml,设置自己的web端口号和应用名,另外grpc.etcdendpoints是etcd集群的地址信息:
server:
  port: 8084
spring:
  application:
    name: get-service-addr-from-etcd

grpc:
  # etcd的地址,从此处取得gRPC服务端的IP和端口
  etcdendpoints: 'http://192.168.72.128:2379,http://192.168.50.239:2380,http://192.168.50.239:2381'
  • 启动类DynamicServerAddressDemoApplication.java的代码就不贴了,普通的springboot启动类而已;
  • 新增StubWrapper.java文件,这是个spring bean,要重点关注的是simpleBlockingStub方法,当bean在spring注册的时候simpleBlockingStub方法会被执行,这样每当bean在spring注册时,都会从etcd查询gRPC服务端信息,然后创建SimpleBlockingStub对象:
package com.bolingcavalry.dynamicrpcaddr;

import com.bolingcavalry.grpctutorials.lib.SimpleGrpc;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KV;
import io.etcd.jetcd.kv.GetResponse;
import io.grpc.Channel;
import io.grpc.ManagedChannelBuilder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Arrays;
import static com.google.common.base.Charsets.UTF_8;

/**
 * @author will (zq2599@gmail.com)
 * @version 1.0
 * @description: 包装了SimpleBlockingStub实例的类,发起gRPC请求时需要用到SimpleBlockingStub实例
 * @date 2021/5/8 19:34
 */
@Component("stubWrapper")
@Data
@Slf4j
@ConfigurationProperties(prefix = "grpc")
public class StubWrapper {

    /**
     * 这是etcd中的一个key,该key对应的值是grpc服务端的地址信息
     */
    private static final String GRPC_SERVER_INFO_KEY = "/grpc/local-server";

    /**
     * 配置文件中写好的etcd地址
     */
    private String etcdendpoints;

    private SimpleGrpc.SimpleBlockingStub simpleBlockingStub;

    /**
     * 从etcd查询gRPC服务端的地址
     * @return
     */
    public String[] getGrpcServerInfo() {
        // 创建client类
        KV kvClient = Client.builder().endpoints(etcdendpoints.split(",")).build().getKVClient();

        GetResponse response = null;

        // 去etcd查询/grpc/local-server这个key的值
        try {
            response = kvClient.get(ByteSequence.from(GRPC_SERVER_INFO_KEY, UTF_8)).get();
        } catch (Exception exception) {
            log.error("get grpc key from etcd error", exception);
        }

        if (null==response || response.getKvs().isEmpty()) {
            log.error("empty value of key [{}]", GRPC_SERVER_INFO_KEY);
            return null;
        }

        // 从response中取得值
        String rawAddrInfo = response.getKvs().get(0).getValue().toString(UTF_8);

        // rawAddrInfo是“192.169.0.1:8080”这样的字符串,即一个IP和一个端口,用":"分割,
        // 这里用":"分割成数组返回
        return null==rawAddrInfo ? null : rawAddrInfo.split(":");
    }

    /**
     * 每次注册bean都会执行的方法,
     * 该方法从etcd取得gRPC服务端地址,
     * 用于实例化成员变量SimpleBlockingStub
     */
    @PostConstruct
    public void simpleBlockingStub() {
        // 从etcd获取地址信息
        String[] array = getGrpcServerInfo();

        log.info("create stub bean, array info from etcd {}", Arrays.toString(array));

        // 数组的第一个元素是gRPC服务端的IP地址,第二个元素是端口
        if (null==array || array.length<2) {
            log.error("can not get valid grpc address from etcd");
            return;
        }

        // 数组的第一个元素是gRPC服务端的IP地址
        String addr = array[0];
        // 数组的第二个元素是端口
        int port = Integer.parseInt(array[1]);

        // 根据刚才获取的gRPC服务端的地址和端口,创建channel
        Channel channel = ManagedChannelBuilder
                .forAddress(addr, port)
                .usePlaintext()
                .build();

        // 根据channel创建stub
        simpleBlockingStub = SimpleGrpc.newBlockingStub(channel);
    }
}
  • GrpcClientService是封装了StubWrapper的服务类:
package com.bolingcavalry.dynamicrpcaddr;

import com.bolingcavalry.grpctutorials.lib.HelloReply;
import com.bolingcavalry.grpctutorials.lib.HelloRequest;
import com.bolingcavalry.grpctutorials.lib.SimpleGrpc;
import io.grpc.StatusRuntimeException;
import lombok.Setter;
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class GrpcClientService {

    @Autowired(required = false)
    @Setter
    private StubWrapper stubWrapper;

    public String sendMessage(final String name) {
        // 很有可能simpleStub对象为null
        if (null==stubWrapper) {
            return "invalid SimpleBlockingStub, please check etcd configuration";
        }

        try {
            final HelloReply response = stubWrapper.getSimpleBlockingStub().sayHello(HelloRequest.newBuilder().setName(name).build());
            return response.getMessage();
        } catch (final StatusRuntimeException e) {
            return "FAILED with " + e.getStatus().getCode().name();
        }
    }
}
  • 新增一个controller类GrpcClientController,提供一个http接口,里面会调用GrpcClientService的方法,最终完成远程gRPC调用:
package com.bolingcavalry.dynamicrpcaddr;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class GrpcClientController {

    @Autowired
    private GrpcClientService grpcClientService;

    @RequestMapping("/")
    public String printMessage(@RequestParam(defaultValue = "will") String name) {
        return grpcClientService.sendMessage(name);
    }
}
  • 接下来新增一个controller类RefreshStubInstanceController,对外提供一个http接口refreshstub,作用是删掉stubWrapper这个bean,再重新注册一次,这样每当外部调用refreshstub接口,就可以从etcd取得服务端信息再重新实例化SimpleBlockingStub成员变量,这样就达到了客户端动态获取服务端地址的效果:
package com.bolingcavalry.dynamicrpcaddr;

import com.bolingcavalry.grpctutorials.lib.SimpleGrpc;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class RefreshStubInstanceController implements ApplicationContextAware {

    private ApplicationContext applicationContext;

    @Autowired
    private GrpcClientService grpcClientService;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    @RequestMapping("/refreshstub")
    public String refreshstub() {

        String beanName = "stubWrapper";

        //获取BeanFactory
        DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) applicationContext.getAutowireCapableBeanFactory();

        // 删除已有bean
        defaultListableBeanFactory.removeBeanDefinition(beanName);

        //创建bean信息.
        BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(StubWrapper.class);

        //动态注册bean.
        defaultListableBeanFactory.registerBeanDefinition(beanName, beanDefinitionBuilder.getBeanDefinition());

        // 更新引用关系(注意,applicationContext.getBean方法很重要,会触发StubWrapper实例化操作)
        grpcClientService.setStubWrapper(applicationContext.getBean(StubWrapper.class));

        return "Refresh success";
    }
}
  • 编码完成,开始验证;

部署gRPC服务端应用

部署gRPC服务端应用很简单,启动local-server应用即可:
在这里插入图片描述

部署etcd

  • 为了简化操作,我这里的etcd集群是用docker部署的,对应的docker-compose.yml文件内容如下:
version: '3'
services:
  etcd1:
    image: "quay.io/coreos/etcd:v3.4.7"
    entrypoint: /usr/local/bin/etcd
    command:
      - '--name=etcd1'
      - '--data-dir=/etcd_data'
      - '--initial-advertise-peer-urls=http://etcd1:2380'
      - '--listen-peer-urls=http://0.0.0.0:2380'
      - '--listen-client-urls=http://0.0.0.0:2379'
      - '--advertise-client-urls=http://etcd1:2379'
      - '--initial-cluster-token=etcd-cluster'
      - '--heartbeat-interval=250'
      - '--election-timeout=1250'
      - '--initial-cluster=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380'
      - '--initial-cluster-state=new'
    ports:
      - 2379:2379
    volumes:
      - ./store/etcd1/data:/etcd_data
  etcd2:
    image: "quay.io/coreos/etcd:v3.4.7"
    entrypoint: /usr/local/bin/etcd
    command:
      - '--name=etcd2'
      - '--data-dir=/etcd_data'
      - '--initial-advertise-peer-urls=http://etcd2:2380'
      - '--listen-peer-urls=http://0.0.0.0:2380'
      - '--listen-client-urls=http://0.0.0.0:2379'
      - '--advertise-client-urls=http://etcd2:2379'
      - '--initial-cluster-token=etcd-cluster'
      - '--heartbeat-interval=250'
      - '--election-timeout=1250'
      - '--initial-cluster=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380'
      - '--initial-cluster-state=new'
    ports:
      - 2380:2379
    volumes:
      - ./store/etcd2/data:/etcd_data
  etcd3:
    image: "quay.io/coreos/etcd:v3.4.7"
    entrypoint: /usr/local/bin/etcd
    command:
      - '--name=etcd3'
      - '--data-dir=/etcd_data'
      - '--initial-advertise-peer-urls=http://etcd3:2380'
      - '--listen-peer-urls=http://0.0.0.0:2380'
      - '--listen-client-urls=http://0.0.0.0:2379'
      - '--advertise-client-urls=http://etcd3:2379'
      - '--initial-cluster-token=etcd-cluster'
      - '--heartbeat-interval=250'
      - '--election-timeout=1250'
      - '--initial-cluster=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380'
      - '--initial-cluster-state=new'
    ports:
      - 2381:2379
    volumes:
      - ./store/etcd3/data:/etcd_data
  • 准备好上述文件后,执行docker-compose up -d即可创建集群;

将服务端应用的IP地址和端口写入etcd

  • 我这边local-server所在服务器IP是192.168.50.5,端口9898,所以执行以下命令将local-server信息写入etcd:
docker exec 08_etcd2_1 /usr/local/bin/etcdctl put /grpc/local-server 192.168.50.5:9898

启动客户端应用

  • 打开DynamicServerAddressDemoApplication.java,点击下图红框位置,即可启动客户端应用:

在这里插入图片描述

  • 注意下图红框中的日志,该日志证明客户端应用从etcd获取服务端信息成功:

在这里插入图片描述

  • 浏览器访问应用get-service-addr-from-etcd的http接口,成功收到响应,证明gRPC调用成功:

在这里插入图片描述

  • 去看local-server的控制台,如下图红框,证明远程调用确实执行了:

在这里插入图片描述

重启服务端,重启的时候修改端口

  • 为了验证动态获取服务端信息是否有效,咱们先把local-server应用的端口改一下,如下图红框,改成9899

在这里插入图片描述

  • 改完重启local-server,如下图红框,可见gRPC端口已经改为9899:

在这里插入图片描述

  • 这时候再访问get-service-addr-from-etcd的http接口,由于get-service-addr-from-etcd不知道local-server的监听端口发生了改变,因此还是去访问9898端口,毫无意外的返回了失败:

在这里插入图片描述

修改etcd中服务端的端口信息

现在执行以下命令,将etcd中的服务端信息改为正确的:

docker exec 08_etcd2_1 /usr/local/bin/etcdctl put /grpc/local-server 192.168.50.5:9899

调用接口触发客户端重新实例化Stub对象

  • 聪明的您一定知道接下来要做的事情了:让StubWrapper的bean重新在spring环境注册,也就是调用RefreshStubInstanceController提供的http接口refreshstub:

在这里插入图片描述

  • 查看get-service-addr-from-etcd应用的控制台,如下图红框,StubWrapper已经重新注册了,并且从etcd取得了最新的服务端信息:

在这里插入图片描述

验证客户端能否正常调用修改了端口的服务端服务

  • 再次访问get-service-addr-from-etcd应用的web接口,如下图,gRPC调用成功:

在这里插入图片描述

  • 至此,在不修改配置不重启服务的情况下,客户端也可以适应服务端的变化了,当然了,本文只是提供基本的操作参考,实际上的微服务环境会更复杂,例如refreshstub接口可能被其他服务调用,这样服务端有了变化可以更加及时地被更新,还有客户端本身也肯能是gRPC服务提供方,那也要把自己注册到etcd上去,还有利用etcd的watch功能监控指定的服务端是否一直存活,以及同一个gRPC服务的多个实例如何做负载均衡,等等,这些都要根据您的实际情况来定制;
  • 本篇内容过多,可见对于这些官方不支持的微服务环境,咱们自己去做注册发现的适配很费时费力的,如果设计和选型能自己做主,我们更倾向于使用现成的注册中心,接下来的文章,咱们就一起尝试使用eureka为gRPC提供注册发现服务;

欢迎关注阿里云开发者社区博客:程序员欣宸

学习路上,你不孤单,欣宸原创一路相伴...
相关文章
|
2月前
|
Java
Java开发实现图片URL地址检验,如何编码?
【10月更文挑战第14天】Java开发实现图片URL地址检验,如何编码?
92 4
|
2月前
|
存储 Java 开发者
Java Map实战:用HashMap和TreeMap轻松解决复杂数据结构问题!
【10月更文挑战第17天】本文深入探讨了Java中HashMap和TreeMap两种Map类型的特性和应用场景。HashMap基于哈希表实现,支持高效的数据操作且允许键值为null;TreeMap基于红黑树实现,支持自然排序或自定义排序,确保元素有序。文章通过具体示例展示了两者的实战应用,帮助开发者根据实际需求选择合适的数据结构,提高开发效率。
72 2
|
1天前
|
Java
Java基础却常被忽略:全面讲解this的实战技巧!
本次分享来自于一道Java基础的面试试题,对this的各种妙用进行了深度讲解,并分析了一些关于this的常见面试陷阱,主要包括以下几方面内容: 1.什么是this 2.this的场景化使用案例 3.关于this的误区 4.总结与练习
|
17天前
|
Java 程序员
Java基础却常被忽略:全面讲解this的实战技巧!
小米,29岁程序员,分享Java中`this`关键字的用法。`this`代表当前对象引用,用于区分成员变量与局部变量、构造方法间调用、支持链式调用及作为参数传递。文章还探讨了`this`在静态方法和匿名内部类中的使用误区,并提供了练习题。
19 1
|
28天前
|
安全 Java 开发者
Java 多线程并发控制:深入理解与实战应用
《Java多线程并发控制:深入理解与实战应用》一书详细解析了Java多线程编程的核心概念、并发控制技术及其实战技巧,适合Java开发者深入学习和实践参考。
52 6
|
27天前
|
存储 安全 Java
Java多线程编程中的并发容器:深入解析与实战应用####
在本文中,我们将探讨Java多线程编程中的一个核心话题——并发容器。不同于传统单一线程环境下的数据结构,并发容器专为多线程场景设计,确保数据访问的线程安全性和高效性。我们将从基础概念出发,逐步深入到`java.util.concurrent`包下的核心并发容器实现,如`ConcurrentHashMap`、`CopyOnWriteArrayList`以及`BlockingQueue`等,通过实例代码演示其使用方法,并分析它们背后的设计原理与适用场景。无论你是Java并发编程的初学者还是希望深化理解的开发者,本文都将为你提供有价值的见解与实践指导。 --- ####
|
2月前
|
存储 消息中间件 安全
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
【10月更文挑战第9天】本文介绍了如何利用JUC组件实现Java服务与硬件通过MQTT的同步通信(RRPC)。通过模拟MQTT通信流程,使用`LinkedBlockingQueue`作为消息队列,详细讲解了消息发送、接收及响应的同步处理机制,包括任务超时处理和内存泄漏的预防措施。文中还提供了具体的类设计和方法实现,帮助理解同步通信的内部工作原理。
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
|
1月前
|
存储 Java API
Java实现导出多个excel表打包到zip文件中,供客户端另存为窗口下载
Java实现导出多个excel表打包到zip文件中,供客户端另存为窗口下载
47 4
|
2月前
|
Web App开发 Java
使用java操作浏览器的工具selenium-java和webdriver下载地址
【10月更文挑战第12天】Selenium-java依赖包用于自动化Web测试,版本为3.141.59。ChromeDriver和EdgeDriver分别用于控制Chrome和Edge浏览器,需确保版本与浏览器匹配。示例代码展示了如何使用Selenium-java模拟登录CSDN,包括设置驱动路径、添加Cookies和获取页面源码。
131 6
|
2月前
|
JSON Java 开发工具
Java服务端集成Google FCM推送的注意事项和实际经验
本文分享了作者在公司APP海外发布过程中,选择Google FCM进行消息推送的集成经验。文章详细解析了Java集成FCM推送的多种实现方式,包括HTTP请求和SDK集成,并指出了通知栏消息和透传消息的区别与应用场景。同时,作者还探讨了Firebase项目的创建、配置和服务端集成的注意事项,帮助读者解决文档混乱和选择困难的问题。
91 1