08篇 要给Nacos的UDP通信功能点个赞

简介: 08篇 要给Nacos的UDP通信功能点个赞

Nacos在服务注册功能中使用到了UDP的通信方式,主要功能就是用来辅助服务实例变化时对客户端进行通知。然而,对于大多数使用Nacos的程序员来说,可能还不知道这个功能,更别说灵活运用了。


看完整个源码的实现,还是要为这一功能点个赞的,可以说非常巧妙和实用。但在实现上有一些不足,文末会进行指出。


本篇文章就带大家从源码层面来分析一下Nacos 2.0中是如何基于UDP协议来实现服务实例变更的通知。


UDP通知基本原理

在分析源码之前,先来从整体上看一下Nacos中UDP的实现原理。


image.png我们知道,UDP协议通信是双向的,没有所谓的客户端和服务端,因此在客户端和服务器端都会开启UDP的监听。客户端是单独开启一个线程来处理UDP消息的。当采用HTTP协议与注册中心通信时,,在客户端调用服务订阅接口时,会将客户端的UPD信息(IP和端口)上送到注册中心,注册中心以PushClient对象来进行封装和存储。


当注册中心有实例变化时,会发布一个ServiceChangeEvent事件,注册中心监听到这个事件之后,会遍历存储的PushClient,基于UDP协议对客户端进行通知。客户端接收到UDP通知,即可更新本地缓存的实例列表。


前面我们已经知道,基于HTTP协议进行服务注册时,会有一个实例更新的时间差,因为是通过客户端定时拉取服务器中的实例列表。如果拉取太频繁,注册中心压力比较大,如果拉取的周期比较长,实例的变化又没办法快速感知到。而UDP协议的通知,恰恰弥补了这一缺点,所以说,要为基于UDP通知这个功能点个赞。


下面就来看看源码层面是如何实现的。


客户端UDP通知监听与处理

客户端在实例化NamingHttpClientProxy时,在其构造方法中会初始化PushReceiver。


public NamingHttpClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListManager serverListManager,
        Properties properties, ServiceInfoHolder serviceInfoHolder) {
    // ...
    // 构建BeatReactor
    this.beatReactor = new BeatReactor(this, properties);
    // 构建UDP端口监听
    this.pushReceiver = new PushReceiver(serviceInfoHolder);
    // ...
}

PushReceiver的构造方法,如下:

public PushReceiver(ServiceInfoHolder serviceInfoHolder) {
    try {
        // 持有ServiceInfoHolder引用
        this.serviceInfoHolder = serviceInfoHolder;
        // 获取UDP端口
        String udpPort = getPushReceiverUdpPort();
        // 根据端口情况,构建DatagramSocket,如果未设置端口,则采用随机端口
        if (StringUtils.isEmpty(udpPort)) {
            this.udpSocket = new DatagramSocket();
        } else {
            this.udpSocket = new DatagramSocket(new InetSocketAddress(Integer.parseInt(udpPort)));
        }
        // 创建只有一个线程的ScheduledExecutorService
        this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("com.alibaba.nacos.naming.push.receiver");
                return thread;
            }
        });
        // 执行线程,PushReceiver实现了Runnable接口
        this.executorService.execute(this);
    } catch (Exception e) {
        NAMING_LOGGER.error("[NA] init udp socket failed", e);
    }
}

PushReceiver的构造方法做了以下操作:


第一、持有ServiceInfoHolder对象引用;

第二、获取UDP端口;

第三、实例化DatagramSocket对象,用于发送和接收Socket数据;

第四,创建线程池,并执行PushReceiver(实现了Runnable接口);

既然PushReceiver实现了Runnable接口,run方法肯定是需要重新实现的:


@Override
public void run() {
    while (!closed) {
        try {
            // byte[] is initialized with 0 full filled by default
            byte[] buffer = new byte[UDP_MSS];
            // 创建DatagramPacket用于存储接收到的报文
            DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
            // 接收报文,在未接收到报文时会进行线程阻塞
            udpSocket.receive(packet);
            // 将报文转换为json格式
            String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();
            NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
            // 将json格式的报文转换为PushPacket对象
            PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
            String ack;
            // 如果符合条件,则调用ServiceInfoHolder进行接收报文处理,并返回应答报文
            if (PUSH_PACKAGE_TYPE_DOM.equals(pushPacket.type) || PUSH_PACKAGE_TYPE_SERVICE.equals(pushPacket.type)) {
                serviceInfoHolder.processServiceInfo(pushPacket.data);
                // send ack to server
                ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":"
                        + "\"\"}";
            } else if (PUSH_PACKAGE_TYPE_DUMP.equals(pushPacket.type)) {
                // dump data to server
                ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":"
                        + "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(serviceInfoHolder.getServiceInfoMap()))
                        + "\"}";
            } else {
                // do nothing send ack only
                ack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                        + "\", \"data\":" + "\"\"}";
            }
            // 发送应答报文
            udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length,
                    packet.getSocketAddress()));
        } catch (Exception e) {
            if (closed) {
                return;
            }
            NAMING_LOGGER.error("[NA] error while receiving push data", e);
        }
    }
}

PushReceiver#run方法主要处理了以下操作:


第一、构建DatagramPacket用于接收报文数据;

第二、通过DatagramSocket#receive方法阻塞等待报文的到来;

第三、DatagramSocket#receive接收到报文之后,方法继续执行;

第四、解析JSON格式的报文为PushPacket对象;

第五、判断报文类型,调用ServiceInfoHolder#processServiceInfo处理接收到的报文信息,在该方法中会将PushPacket转化为ServiceInfo对象;

第六、封装ACK信息(即应答报文信息);

第七、通过DatagramSocket发送应答报文;

上面我们看到了Nacos客户端是如何基于UDP进行报文的监听和处理的,但并未找到客户端是如何将UDP信息上送给注册中心的。下面我们就来梳理一下,上送UDP信息的逻辑。


客户端上送UDP信息

在NamingHttpClientProxy中存储了UDP_PORT_PARAM,即UDP的端口参数信息。


UDP端口信息通过实例查询类接口进行传递,比如:查询实例列表、查询单个健康实例、查询所有实例、订阅接口、订阅的更新任务UpdateTask等接口。在这些方法中都调用了NamingClientProxy#queryInstancesOfService方法。


NamingHttpClientProxy中的queryInstancesOfService方法实现:


@Override
public ServiceInfo queryInstancesOfService(String serviceName, String groupName, String clusters, int udpPort,
        boolean healthyOnly) throws NacosException {
    final Map<String, String> params = new HashMap<String, String>(8);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, NamingUtils.getGroupedName(serviceName, groupName));
    params.put(CLUSTERS_PARAM, clusters);
    // 获取UDP端口
    params.put(UDP_PORT_PARAM, String.valueOf(udpPort));
    params.put(CLIENT_IP_PARAM, NetUtils.localIP());
    params.put(HEALTHY_ONLY_PARAM, String.valueOf(healthyOnly));
    String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);
    if (StringUtils.isNotEmpty(result)) {
        return JacksonUtils.toObj(result, ServiceInfo.class);
    }
    return new ServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), clusters);
}

但查看源码会发现,查询实例列表、查询单个健康实例、查询所有实例、订阅的更新任务UpdateTask中,UDP端口传递的参数值均为0。只有HTTP协议的订阅接口取值为PushReceiver中的UDP端口号。


@Override

public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {

   return queryInstancesOfService(serviceName, groupName, clusters, pushReceiver.getUdpPort(), false);

}

1

2

3

4

在上面的代码中我们已经知道PushReceiver中有一个getPushReceiverUdpPort的方法:


public static String getPushReceiverUdpPort() {

   return System.getenv(PropertyKeyConst.PUSH_RECEIVER_UDP_PORT);

}

1

2

3

很明显,UDP的端口是通过环境变量设置的,对应的key为“push.receiver.udp.port”。


而在1.4.2版本中,HostReactor中的NamingProxy成员变量的queryList方法也会传递UDP端口:


public void updateService(String serviceName, String clusters) throws NacosException {
    ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
    try {
        String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
        if (StringUtils.isNotEmpty(result)) {
            processServiceJson(result);
        }
    } finally {
        // ...
    }
}

关于1.4.2版本中的实现,大家自行看源码即可,这里不再展开。

完成了客户端UDP基本信息的传递,再来看看服务器端是如何接收和存储这些信息的。

UDP服务存储

服务器端在获取实例列表的接口中,对UDP端口进行了处理。

@GetMapping("/list")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
public Object list(HttpServletRequest request) throws Exception {
    // ...
    // 如果没有获得UDP端口信息,则默认端口为0
    int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
    // ...
    // 客户端的IP、UDP端口封装到Subscriber对象中
    Subscriber subscriber = new Subscriber(clientIP + ":" + udpPort, agent, app, clientIP, namespaceId, serviceName,
            udpPort, clusters);
    return getInstanceOperator().listInstance(namespaceId, serviceName, subscriber, clusters, healthyOnly);
}

在getInstanceOperator()方法中会获得当前采用的哪个协议,然后选择对应的处理类:

/**
 * 判断并返回采用V1版本或V2版本的操作服务
 * @return V1:Jraft协议(服务器端);V2:gRpc协议(客户端)
 */
private InstanceOperator getInstanceOperator() {
    return upgradeJudgement.isUseGrpcFeatures() ? instanceServiceV2 : instanceServiceV1;
}

这里具体的实现类为InstanceOperatorServiceImpl:

@Override
public ServiceInfo listInstance(String namespaceId, String serviceName, Subscriber subscriber, String cluster,
        boolean healthOnly) throws Exception {
    ClientInfo clientInfo = new ClientInfo(subscriber.getAgent());
    String clientIP = subscriber.getIp();
    ServiceInfo result = new ServiceInfo(serviceName, cluster);
    Service service = serviceManager.getService(namespaceId, serviceName);
    long cacheMillis = switchDomain.getDefaultCacheMillis();
    // now try to enable the push
    try {
        // 处理支持UDP协议的客户端信息
        if (subscriber.getPort() > 0 && pushService.canEnablePush(subscriber.getAgent())) {
            subscriberServiceV1.addClient(namespaceId, serviceName, cluster, subscriber.getAgent(),
                    new InetSocketAddress(clientIP, subscriber.getPort()), pushDataSource, StringUtils.EMPTY,
                    StringUtils.EMPTY);
            cacheMillis = switchDomain.getPushCacheMillis(serviceName);
        }
    } catch (Exception e) {
        // ...
    }
    // ...
}

当UDP端口大于0,且agent参数定义的客户端支持UDP,则将对应的客户端信息封装到InetSocketAddress对象中,然后放入NamingSubscriberServiceV1Impl中(该类已经被废弃,看后续如何调整该方法实现)。


在NamingSubscriberServiceV1Impl中,会将对应的参数封装为PushClient,存放在Map当中。


public void addClient(String namespaceId, String serviceName, String clusters, String agent,
        InetSocketAddress socketAddr, DataSource dataSource, String tenant, String app) {
    PushClient client = new PushClient(namespaceId, serviceName, clusters, agent, socketAddr, dataSource, tenant,
            app);
    addClient(client);
}

addClient方法会将PushClient信息存放到ConcurrentMap<String, ConcurrentMap<String, PushClient>>当中:

private final ConcurrentMap<String, ConcurrentMap<String, PushClient>> clientMap = new ConcurrentHashMap<>();
public void addClient(PushClient client) {
        // client is stored by key 'serviceName' because notify event is driven by serviceName change
        String serviceKey = UtilsAndCommons.assembleFullServiceName(client.getNamespaceId(), client.getServiceName());
        ConcurrentMap<String, PushClient> clients = clientMap.get(serviceKey);
        if (clients == null) {
            clientMap.putIfAbsent(serviceKey, new ConcurrentHashMap<>(1024));
            clients = clientMap.get(serviceKey);
        }
        PushClient oldClient = clients.get(client.toString());
        if (oldClient != null) {
            oldClient.refresh();
        } else {
            PushClient res = clients.putIfAbsent(client.toString(), client);
           // ...
        }
    }

此时,UDP的IP、端口信息已经封装到PushClient当中,并存储在NamingSubscriberServiceV1Impl的成员变量当中。


注册中心的UDP通知

当服务端发现某个实例发生了变化,比如主动注销了,会发布一个ServiceChangeEvent事件,UdpPushService会监听到该事件,并进行业务处理。


在UdpPushService的onApplicationEvent方法中,会根据PushClient的具体情况进行移除或发送UDP通知。onApplicationEvent中核心逻辑代码如下:


ConcurrentMap<String, PushClient> clients = subscriberServiceV1.getClientMap()
        .get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
if (MapUtils.isEmpty(clients)) {
    return;
}
Map<String, Object> cache = new HashMap<>(16);
long lastRefTime = System.nanoTime();
for (PushClient client : clients.values()) {
    // 移除僵尸客户端
    if (client.zombie()) {
        Loggers.PUSH.debug("client is zombie: " + client);
        clients.remove(client.toString());
        Loggers.PUSH.debug("client is zombie: " + client);
        continue;
    }
    AckEntry ackEntry;
    String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
    byte[] compressData = null;
    Map<String, Object> data = null;
    if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
        org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
        compressData = (byte[]) (pair.getValue0());
        data = (Map<String, Object>) pair.getValue1();
    }
    // 封装AckEntry对象
    if (compressData != null) {
        ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
    } else {
        ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
        if (ackEntry != null) {
            cache.put(key, new org.javatuples.Pair<>(ackEntry.getOrigin().getData(), ackEntry.getData()));
        }
    }
    // 通过UDP通知其他客户端
    udpPush(ackEntry);
}

事件处理的核心逻辑是就是先判断PushClient的状态信息,如果已经是僵尸客户端,则移除。然后将发送UDP的报文信息和接收客户端的信息封装为AckEntry对象,然后调用udpPush方法,进行UDP消息的发送。


注册中心的UDP接收

在看客户端源码的时候,我们看到客户端不仅会接收UDP请求,而且还会进行应答。那么注册中心怎么接收应答呢?也在UdpPushService类中,该类内部的静态代码块初始化一个UDP的DatagramSocket,用来接收消息:


static {
    try {
        udpSocket = new DatagramSocket();
        Receiver receiver = new Receiver();
        Thread inThread = new Thread(receiver);
        inThread.setDaemon(true);
        inThread.setName("com.alibaba.nacos.naming.push.receiver");
        inThread.start();
    } catch (SocketException e) {
        Loggers.SRV_LOG.error("[NACOS-PUSH] failed to init push service");
    }
}

Receiver是一个内部类,实现了Runnable接口,在其run方法中主要就是接收报文信息,然后进行报文消息的判断,根据判断结果,操作本地Map中数据。


UDP设计不足

文章最开始就写到,UDP的设计非常棒,即弥补了HTTP定时拉取的不足,又不至于太影响性能。但目前Nacos在UDP方面有一些不足,也可能是个人的吹毛求疵吧。


第一,文档中没有明确说明UDP的功能如何使用,这导致很多使用者在使用时并不知道UDP功能的存在,以及使用的限制条件。


第二,对云服务不友好。客户端的UDP端口可以自定义,但服务器端的UDP端口是随机获取到。在云服务中,即便是内网服务,UDP端口也是被防火墙限制的。如果服务端的UDP端口是随机获取(客户端默认也是),那么UDP的通信将直接被防火墙拦截掉,而用户根本看不到任何异常(UDP协议不关注客户端是否收到消息)。


至于这两点,说起来算是瑕不掩瑜,读完源码或读过我这篇文章的朋友大概已经知道怎么用了。后续可以给官方提一个Issue,看看是否可以改进。


小结

本文重点从三个方面讲解的Nacos基于UDP的服务实例变更通知:


第一,客户端监听UDP端口,当接收注册中心发来的服务实例变化,可以及时的更新本地的实例缓存;


第二,客户端通过订阅接口,将自身的UDP信息发送给注册中心,注册中心进行存储;


第三,注册中心中实例发生了变化,通过事件机制,将变更信息通过UDP协议发送给客户端。


经过本篇文章,想必你不仅了解了Nacos中UDP协议的通知机制。同时,也开拓了一个新的思路,即如何使用UDP,在什么场景下使用UDP,以及在云服务中使用UDP可能会存在的问题。如果这篇文章对你有帮助,关注或点赞都可以。



相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
目录
相关文章
|
2月前
|
Dubbo Cloud Native 应用服务中间件
阿里云的 Dubbo 和 Nacos 深度整合,提供了高效的服务注册与发现、配置管理等关键功能,简化了微服务治理,提升了系统的灵活性和可靠性。
在云原生时代,微服务架构成为主流。阿里云的 Dubbo 和 Nacos 深度整合,提供了高效的服务注册与发现、配置管理等关键功能,简化了微服务治理,提升了系统的灵活性和可靠性。示例代码展示了如何在项目中实现两者的整合,通过 Nacos 动态调整服务状态和配置,适应多变的业务需求。
75 2
|
6月前
|
存储 网络协议 Ubuntu
【Linux开发实战指南】基于UDP协议的即时聊天室:快速构建登陆、聊天与退出功能
UDP 是一种无连接的、不可靠的传输层协议,位于IP协议之上。它提供了最基本的数据传输服务,不保证数据包的顺序、可靠到达或无重复。与TCP(传输控制协议)相比,UDP具有较低的传输延迟,因为省去了建立连接和确认接收等过程,适用于对实时性要求较高、但能容忍一定数据丢失的场景,如在线视频、语音通话、DNS查询等。 链表 链表是一种动态数据结构,用于存储一系列元素(节点),每个节点包含数据字段和指向下一个节点的引用(指针)。链表分为单向链表、双向链表和循环链表等类型。与数组相比,链表在插入和删除操作上更为高效,因为它不需要移动元素,只需修改节点间的指针即可。但访问链表中的元素不如数组直接,通常需要从
329 2
|
3月前
|
网络协议 Linux 网络性能优化
Linux C/C++之TCP / UDP通信
这篇文章详细介绍了Linux下C/C++语言实现TCP和UDP通信的方法,包括网络基础、通信模型、编程示例以及TCP和UDP的优缺点比较。
78 0
Linux C/C++之TCP / UDP通信
|
2月前
|
数据管理 Nacos 开发者
"Nacos架构深度解析:一篇文章带你掌握业务层四大核心功能,服务注册、配置管理、元数据与健康检查一网打尽!"
【10月更文挑战第23天】Nacos 是一个用于服务注册发现和配置管理的平台,支持动态服务发现、配置管理、元数据管理和健康检查。其业务层包括服务注册与发现、配置管理、元数据管理和健康检查四大核心功能。通过示例代码展示了如何在业务层中使用Nacos,帮助开发者构建高可用、动态扩展的微服务生态系统。
143 0
|
4月前
|
C语言
C语言 网络编程(八)并发的UDP服务端 以进程完成功能
这段代码展示了如何使用多进程处理 UDP 客户端和服务端通信。客户端通过发送登录请求与服务端建立连接,并与服务端新建的子进程进行数据交换。服务端则负责接收请求,验证登录信息,并创建子进程处理客户端的具体请求。子进程会创建一个新的套接字与客户端通信,实现数据收发功能。此方案有效利用了多进程的优势,提高了系统的并发处理能力。
|
4月前
|
C语言
C语言 网络编程(七)UDP通信创建流程
本文档详细介绍了使用 UDP 协议进行通信的过程,包括创建套接字、发送与接收消息等关键步骤。首先,通过 `socket()` 函数创建套接字,并设置相应的参数。接着,使用 `sendto()` 函数向指定地址发送数据。为了绑定地址,需要调用 `bind()` 函数。接收端则通过 `recvfrom()` 函数接收数据并获取发送方的地址信息。文档还提供了完整的代码示例,展示了如何实现 UDP 的发送端和服务端功能。
|
4月前
|
C语言
C语言 网络编程(九)并发的UDP服务端 以线程完成功能
这是一个基于UDP协议的客户端和服务端程序,其中服务端采用多线程并发处理客户端请求。客户端通过UDP向服务端发送登录请求,并根据登录结果与服务端的新子线程进行后续交互。服务端在主线程中接收客户端请求并创建新线程处理登录验证及后续通信,子线程创建新的套接字并与客户端进行数据交换。该程序展示了如何利用线程和UDP实现简单的并发服务器架构。
|
5月前
|
网络协议 数据处理 C语言
网络编程进阶:UDP通信
网络编程进阶:UDP通信
313 0
|
6月前
|
网络协议 网络架构
【网络编程入门】TCP与UDP通信实战:从零构建服务器与客户端对话(附简易源码,新手友好!)
在了解他们之前我们首先要知道网络模型,它分为两种,一种是OSI,一种是TCP/IP,当然他们的模型图是不同的,如下
248 1
|
6月前
|
网络协议 Java 数据处理
(一)Java网络编程之计网基础、TCP-IP协议簇、TCP、UDP协议及腾讯QQ通信原理综述
就目前而言,多数网络编程的系列的文章都在围绕着计算机网络体系进行阐述,但其中太多理论概念,对于大部分开发者而言,用途甚微。因此,在本系列中则会以实际开发者的工作为核心,从Java程序员的角度出发,详细解读Java的网络编程核心内容。
118 0