gRPC 基于zookeeper实现负载均衡

本文涉及的产品
应用型负载均衡 ALB,每月750个小时 15LCU
网络型负载均衡 NLB,每月750个小时 15LCU
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: gRPC是一个高性能、通用的开源 RPC 框架,其由 Google 主要面向移动应用开发并基于HTTP/2协议标准而设计,基于ProtoBuf(Protocol Buffers) 序列化协议开发,且支持众多开发语言。gRPC通过其插件机制,可以很灵活的实现负载均衡、调用链、健康检查、权限认证等模块,本文主要介绍如何通过gRPC定义的接口实现负载均衡功能。

导读

gRPC是一个高性能、通用的开源 RPC 框架,其由 Google 主要面向移动应用开发并基于HTTP/2协议标准而设计,基于ProtoBuf(Protocol Buffers) 序列化协议开发,且支持众多开发语言。

gRPC通过其插件机制,可以很灵活的实现负载均衡、调用链、健康检查、权限认证等模块,本文主要介绍如何通过gRPC定义的接口实现负载均衡功能。

负载均衡方案

RPC服务的除了解决跨语言调用的问题、模块解耦,重要的一点是通过模块的微服务化,可以水平扩展RPC服务的节点,应用层通过异步调用多个服务,降低应用层的延时。

由于RPC服务之间是无状态的,可以水平增加机器,扩展其服务能力。但是,如何利用多个节点呢?

通常做法,可以在RPC前面加上loadbalance(LB),LB 后面挂上对应的服务节点。应用层直接访问LB,RPC节点隐藏在LB后面,对应用层不可见。 这种好处比较明显, 只要把LB的地址以及端口提供给应用层即可, 应用层不用关心LB算法,大大降低了应用调用的复杂性。这种方式在web等短连接应用是比较好的解决方案,因为在重新进行LB连接的时候,可以重新选择后端的服务。

但是,对于长链接的服务来说,这就有很大的问题。
比如,一个长链接应用A 连接到LB上, LB 随机转发到后端的RPC服务B上,由于A是长链接,后面的所有请求都会转到B上,那么就不能起到负载均衡的作用了。你可能想到,最好跟RPC服务保持长链接,不用每次调用都进行连接,释放连接。 gRPC没有提供这种负载均衡的组件, 但是暴露了负载均衡的接口,只要extends NameResolverProvider类,实现接口方法,就能很方便的实现负载均衡模块。

关于gRPC 负载均衡的基础介绍,请参考 https://juejin.im/post/5cd6e69ff265da03a85addb6

下面介绍如何通过zookeeper实现负载均衡的NameResolver

ZkNameResolverProvider实现

public class ZkNameResolverProvider extends NameResolverProvider {
    @Override
    protected boolean isAvailable() {
        return true;
    }

    @Override
    protected int priority() {
        return 5;
    }

    @Nullable
    @Override
    public NameResolver newNameResolver(URI targetUri, Attributes params) {
        return new ZkNameResolver(targetUri);
    }

    @Override
    public String getDefaultScheme() {
        return "zk";
    }
}

ZkNameResolver实现

public class ZkNameResolver extends NameResolver implements Watcher {
    private URI zkUri;
    private ZooKeeper zoo;
    private Listener listener;
    private final int ZK_CONN_TIMEOUT = 3000;
    private final String ZK_PATH = "/grpc_server_list";

    ZkNameResolver(URI zkUri) {
        this.zkUri = zkUri;
    }

    @Override
    public String getServiceAuthority() {
        return zkUri.getAuthority();
    }

    @Override
    public void start(Listener listener) {
        this.listener = listener;
        final CountDownLatch latch = new CountDownLatch(1);
        String zkAddr = zkUri.getHost() + ":" + zkUri.getPort();
        System.out.printf("connect to zookeeper server %s", zkAddr);
        try {
            this.zoo = new ZooKeeper(zkAddr, ZK_CONN_TIMEOUT, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        latch.countDown();
                    }
                }
            });
        } catch (IOException e) {
            System.out.printf(e);
            System.out.printf("connect to zookeeper failed, JVM exited [%s]", e.getMessage());
            System.exit(1);
        }
        try {
            latch.await();
            System.out.printf("connect to zookeeper succeed");
        } catch (InterruptedException e) {
            System.out.printf(e);
            System.out.printf("CountDownLatch interrupted, JVM exited [%s]", e.getMessage());
            System.exit(1);
        }
        try {
            Stat stat = zoo.exists(ZK_PATH, true);
            if (stat == null) {
                System.out.printf("%s not exists", ZK_PATH);
            } else {
                System.out.printf("%s exists", ZK_PATH);
            }
        } catch (KeeperException | InterruptedException e) {
            System.out.printf(e);
        }

        try {
            List<String> children = zoo.getChildren(ZK_PATH, this);
            addServersToListener(children);
        } catch (KeeperException | InterruptedException e) {
            System.out.printf(e);
            System.out.printf("get children of %s failed [%s], JVM exited", ZK_PATH, e.getMessage());
            System.exit(1);
        }
    }
    // 把zookeeper ZK_PATH的子节点作为rpc的节点地址,注册到gRPC负载均衡服务中
    private void addServersToListener(List<String> servers) {
        System.out.printf("rpc servers:%s", servers);
        ArrayList<EquivalentAddressGroup> addressGroups = new ArrayList<EquivalentAddressGroup>();
        for (String server : servers) {
            List<SocketAddress> socketAddresses = new ArrayList<SocketAddress>();
            String[] address = server.split(":");
            socketAddresses.add(new InetSocketAddress(address[0], Integer.parseInt(address[1])));
            addressGroups.add(new EquivalentAddressGroup(socketAddresses));
        }
        if (addressGroups.size() > 0) {
            listener.onAddresses(addressGroups, Attributes.EMPTY);
        } else {
            System.out.printf("No servers find, keep looking");
        }
    }

    @Override
    public void shutdown() {
        try {
            zoo.close();
        } catch (InterruptedException e) {
            System.out.printf(e);
        }
    }

    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Event.EventType.None) {
            System.out.printf("Zookeeper connection expired");
        } else {
            try {
                List<String> children = zoo.getChildren(ZK_PATH, false);
                addServersToListener(children);
                zoo.getChildren(ZK_PATH, true);
            } catch (Exception e) {
                System.out.printf(e);
            }
        }
    }
}
  • 把rpc服务的host:port作为zookeeper的path(/grpc_server_list/rpc_host:50010),zookeeper监听path的创建删除等事件
  • 在rpc节点有上线或者下线时,动态把节点信息从zookeeper上添加或者删除
  • 通过listener.onAddresses把rpc服务地址注册到gRPC负载均衡上

channel 创建

this.channel = ManagedChannelBuilder
// 配置zk地址
.forTarget("zk://zkhost:2181")
// 配置NameResolverProvider实现类
.nameResolverFactory(new ZkNameResolverProvider())
.enableRetry()
.maxRetryAttempts(5)
.keepAliveTime(5, TimeUnit.MINUTES)
.keepAliveWithoutCalls(true)
.keepAliveTimeout(10, TimeUnit.MINUTES)
.idleTimeout(24, TimeUnit.HOURS)
// 轮询策略
.loadBalancerFactory(RoundRobinLoadBalancerFactory.getInstance())
.usePlaintext()
.build();
  • forTarget("zk://zkhost:2181")配置zookeeper链接地址
  • nameResolverFactory(new ZkNameResolverProvider())配置NameResolverProvider实现类,让gRPC通过ZkNameResolverProvider查找可用的服务节点地址
  • 调用rpc服务,会根据配置的loadBalancerFactory(RoundRobinLoadBalancerFactory.getInstance())策略进行轮询调用对应的后端服务

总结

gRPC 提供了非常灵活的的负载均衡接口,通过实现接口, 可以很方便的实现负载均衡。 通过自定义的负载均衡机制,可以保证调用方与每个rpc保持长链接,大大提高了rpc的网络开销,同时轮询到每个rpc服务上,扩展了rpc的响应能力。
通过zookeeper可以watch机制,监听特定path(/grpc_server_list)子节点的增加或者删除,动态实现服务的注册于下线,大大提高了后端服务水平扩展的便捷性。

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
4月前
|
负载均衡 算法 微服务
基于gRPC的注册发现与负载均衡的原理和实战
基于gRPC的注册发现与负载均衡的原理和实战
|
5月前
|
消息中间件 存储 负载均衡
消息队列 MQ使用问题之如何在grpc客户端中设置负载均衡器
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
负载均衡 算法 Java
分布式系列教程(10) -分布式协调工具Zookeeper(负载均衡原理实现)
分布式系列教程(10) -分布式协调工具Zookeeper(负载均衡原理实现)
120 0
|
缓存 负载均衡 算法
Nginx实现负载均衡(整合SpringBoot小demo)
Nginx实现负载均衡(整合SpringBoot小demo)
372 4
Nginx实现负载均衡(整合SpringBoot小demo)
|
负载均衡 Linux 调度
使用keepalived(HA)+LVS实现高可用负载均衡群集,调度器的双机热备
使用keepalived(HA)+LVS实现高可用负载均衡群集,调度器的双机热备
191 1
使用keepalived(HA)+LVS实现高可用负载均衡群集,调度器的双机热备
|
负载均衡 网络协议 应用服务中间件
nginx实现负载均衡
nginx实现负载均衡
339 0
nginx实现负载均衡
|
弹性计算 负载均衡 Kubernetes
【视频】第四讲-负载均衡ALB+实验三-使用ALB实现灰度发布|学习笔记
快速学习【视频】第四讲-负载均衡ALB+实验三-使用ALB实现灰度发布。
【视频】第四讲-负载均衡ALB+实验三-使用ALB实现灰度发布|学习笔记
|
域名解析 tengine 负载均衡
使用nginx的负载均衡机制实现用户无感更新服务
用户请求的转发是接口服务在部署时必须要做的一步。
|
负载均衡 Java 开发者
自定义实现负载均衡|学习笔记
快速学习自定义实现负载均衡
自定义实现负载均衡|学习笔记
|
存储 负载均衡 NoSQL
nginx反向代理做负载均衡以及使用redis实现session共享配置详解
nginx反向代理做负载均衡以及使用redis实现session共享配置详解
566 0
nginx反向代理做负载均衡以及使用redis实现session共享配置详解