Netty实战(十六)UDP广播事件(二)编写广播者和监视器

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: Netty 提供了大量的类来支持 UDP 应用程序的编写

一、编写广播者

Netty 提供了大量的类来支持 UDP 应用程序的编写。下面我们列出一些要用到的类型:

名 称 描 述
interface AddressedEnvelope<M, A extends SocketAddress>extends ReferenceCounted 定义一个消息,其包装了另一个消息并带有发送者和接收者地址。其中 M 是消息类型;A 是地址类型
class DefaultAddressedEnvelope<M, A extends SocketAddress>implements AddressedEnvelope<M,A> 提供了 interface AddressedEnvelope的默认实现
class DatagramPacketextends DefaultAddressedEnvelope<ByteBuf, InetSocketAddress> 扩展了 DefaultAddressedEnvelope 以使用 ByteBuf 作为消息数据容器
implements ByteBufHolder 扩展了 DefaultAddressedEnvelope 以使用 ByteBuf 作为消息数据容器
interface DatagramChannelextends Channel 扩展了 Netty 的 Channel 抽象以支持 UDP 的多播组管理
class NioDatagramChannnelextends AbstractNioMessageChannelimplements DatagramChannel 定义了一个能够发送和接收 AddressedEnvelope 消息的 Channel 类型

Netty 的 DatagramPacket 是一个简单的消息容器,DatagramChannel 实现用它来和远程节点通信。类似于在我们先前的类比中的明信片,它包含了接收者(和可选的发送者)的地址以及消息的有效负载本身。

要将 LogEvent 消息转换为 DatagramPacket,我们将需要一个编码器。但是没有必要从头开始编写我们自己的。我们将扩展 Netty 的 MessageToMessageEncoder,这个我们之前使用过。

下面我们看一下将要广播的信息:我们将展示正在广播的 3 个日志条目,每一个都将通过一个专门的 DatagramPacket进行广播。
1.png

这是该 LogEventBroadcaster 的 ChannelPipeline 的一个高级别视图,展示了 LogEvent 消息是如何流经它的。
2.png

我们会将所有的将要被传输的数据都被封装在了 LogEvent 消息中。LogEventBroadcaster 将把这些写入到 Channel 中,并通过 ChannelPipeline 发送它们,在那里它们将会被转换(编码)为 DatagramPacket 消息。最后,他们都将通过 UDP 被广播,并由远程节点(监视器)所捕获。

下面我们自定义一个MessageToMessageEncoder来执行上面所说的转换:

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramPacket;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.util.CharsetUtil;

import java.net.InetSocketAddress;
import java.util.List;

/**
 * Author: lhd
 * Data: 2023/6/13
 * Annotate:
 */
public class LogEventEncoder extends MessageToMessageEncoder<LogEvent> {
    private final InetSocketAddress remoteAddress;

    //LogEventEncoder 创建了即将被发送到指定的InetSocketAddress 的 DatagramPacket 消息
    public LogEventEncoder(InetSocketAddress remoteAddress) {
        this.remoteAddress = remoteAddress;
    }
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, LogEvent logEvent, List<Object> out) throws Exception {
        byte[] file = logEvent.getLogfile().getBytes(CharsetUtil.UTF_8);
        byte[] msg = logEvent.getMsg().getBytes(CharsetUtil.UTF_8);
        ByteBuf buf = channelHandlerContext.alloc()
                .buffer(file.length + msg.length + 1);
        //将文件名写入到 ByteBuf 中
        buf.writeBytes(file);
        //添加一个SEPARATOR
        buf.writeByte(LogEvent.SEPARATOR);
        //将日志消息写入ByteBuf 中
        buf.writeBytes(msg);
        //将一个拥有数据和目的地地址的新 DatagramPacket 添加到出站的消息列表中
        out.add(new DatagramPacket(buf, remoteAddress));
    }
}

在 LogEventEncoder 被实现之后,我们就该准备引导该服务器,其包括设置各种各样的 ChannelOption,以及在 ChannelPipeline 中安装所需要的 ChannelHandler,这些都要通过主类 LogEventBroadcaster 完成。


import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;

import java.io.File;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;

/**
 * Author: lhd
 * Data: 2023/6/13
 * Annotate: 广播者组件
 */
public class LogEventBroadcaster {
    private final EventLoopGroup group;
    private final Bootstrap bootstrap;
    private final File file;
    public LogEventBroadcaster(InetSocketAddress address, File file) {
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        //引导该 NioDatagramChannel(无连接的)
        bootstrap.group(group).channel(NioDatagramChannel.class)
                //设置 SO_BROADCAST 套接字选项
                .option(ChannelOption.SO_BROADCAST, true)
                .handler(new LogEventEncoder(address));
        this.file = file;
    }
    public void run() throws Exception {
        //绑定 Channel
        Channel ch = bootstrap.bind(0).sync().channel();
        long pointer = 0;
        //启动主处理循环
        for (;;) {
            long len = file.length();
            if (len < pointer) {
               // 文件已重置
                //如果有必要,将文件指针设置到该文件的最后一个字节
                pointer = len;
            } else if (len > pointer) {
                // 已添加内容
                RandomAccessFile raf = new RandomAccessFile(file, "r");
                //设置当前的文件指针,以确保没有任何的旧日志被发送
                raf.seek(pointer);
                String line;
                while ((line = raf.readLine()) != null) {
                    //对于每个日志条目,写入一个LogEvent到Channel 中
                    ch.writeAndFlush(new LogEvent(null, -1,
                            file.getAbsolutePath(), line));
                }
                //存储其在文件中的当前位置
                pointer = raf.getFilePointer();
                raf.close();
            }
            try {
                //休眠 1 秒,如果被中断,则退出循环;否则重新处理它
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.interrupted();
                break;
            }
        }
    }
    public void stop() {
        group.shutdownGracefully();
    }
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            throw new IllegalArgumentException();
        }
        //创建并启动一个新的LogEventBroadcaster的实例
        LogEventBroadcaster broadcaster = new LogEventBroadcaster(
                new InetSocketAddress("255.255.255.255",
                        Integer.parseInt(args[0])), new File(args[1]));
        try {
            broadcaster.run();
        }
        finally {
            broadcaster.stop();
        }
    }
}

这样就完成了该应用程序的广播者组件。

二、编写监视器

我们定义一个 LogEventMonitor。

这个程序将:
(1)接收由 LogEventBroadcaster 广播的 UDP DatagramPacket;
(2)将它们解码为 LogEvent 消息;
(3)将 LogEvent 消息写出到 System.out。

和之前一样,该逻辑由一组自定义的 ChannelHandler 实现,我们扩展 MessageToMessageDecoder。

下面是一个LogEventMonitor 的 ChannelPipeline,展示了LogEvent 是如何流经它的
3.png

综上所述,我们需要2个解码器来处理LogEvent ,它们分别是LogEventDecoder 、LogEventHandler。

LogEventDecoder :

1、ChannelPipeline 中的第一个解码器LogEventDecoder 负责将传入的DatagramPacket解码为LogEvent 消息(一个用于转换入站数据的任何 Netty 应用程序的典型设置)

下面我们开始写LogEventDecoder的实现:


import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramPacket;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.util.CharsetUtil;

import java.util.List;

/**
 * Author: lhd
 * Data: 2023/6/13
 * Annotate:LogEventDecoder
 */
public class LogEventDecoder extends MessageToMessageDecoder<DatagramPacket> {

    @Override
    protected void decode(ChannelHandlerContext ctx, DatagramPacket datagramPacket, List<Object> out) throws Exception {
        //获取对 DatagramPacket 中的数据(ByteBuf)的引用
        ByteBuf data = datagramPacket.content();
        //获取该 SEPARATOR 的索引
        int idx = data.indexOf(0, data.readableBytes(), LogEvent.SEPARATOR);
        //提取文件名
        String filename = data.slice(0, idx).toString(CharsetUtil.UTF_8);
        //提取日志消息
        String logMsg = data.slice(idx + 1, data.readableBytes()).toString(CharsetUtil.UTF_8);
        //构建一个新的 LogEvent 对象,并且将它添加到(已经解码的消息的)列表中
        LogEvent event = new LogEvent(datagramPacket.sender(),
                System.currentTimeMillis(), filename, logMsg);
        out.add(event);
    }
}

LogEventHandler:

2、第二个 ChannelHandler 的工作是对第一个 ChannelHandler 所创建的 LogEvent 消息执行一些处理。在这个场景下,它只是简单地将它们写出到 System.out。在真实世界的应用程序中,可能需要聚合来源于不同日志文件的事件,或者将它们发布到数据库中。

下面我们开始写LogEventHandler的实现:

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * Author: lhd
 * Data: 2023/6/13
 * Annotate: LogEventHandler
 */
public class LogEventHandler extends SimpleChannelInboundHandler<LogEvent> { //扩展 SimpleChannelInboundHandler 以处理 LogEvent 消息

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //当异常发生时,打印栈跟踪信息,并关闭对应的 Channe
        cause.printStackTrace();
        ctx.close();
    }
    @Override
    public void channelRead0(ChannelHandlerContext ctx, LogEvent event) throws Exception {
        //创建 StringBuilder,并且构建输出的字符串
        StringBuilder builder = new StringBuilder();
        builder.append(event.getReceivedTimestamp());
        builder.append(" [");
        builder.append(event.getSource().toString());
        builder.append("] [");
        builder.append(event.getLogfile());
        builder.append("] : ");
        builder.append(event.getMsg());
        //打印 LogEvent 的数据
        System.out.println(builder.toString());
    }
}

LogEventHandler 将以一种简单易读的格式打印 LogEvent 消息,包括以下的各项:

  • 发送方的 InetSocketAddress,其由 IP 地址和端口组成;
  • 生成 LogEvent 消息的日志文件的绝对路径名;
  • 实际上的日志消息,其代表日志文件中的一行。

按照我们之前讲过的内容,下面我们要做的是将LogEventDecoder 和LogEventHandler 安装到ChannelPipeline。


import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;

import java.net.InetSocketAddress;

/**
 * Author: lhd
 * Data: 2023/6/13
 * Annotate: 监视器
 */
public class LogEventMonitor {
    private final EventLoopGroup group;
    private final Bootstrap bootstrap;
    public LogEventMonitor(InetSocketAddress address) {
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(group)
                //引导该 NioDatagramChannel
                .channel(NioDatagramChannel.class)
                //设置套接字选项 SO_BROADCAST
                .option(ChannelOption.SO_BROADCAST, true)
                .handler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel channel) throws Exception {
                        ChannelPipeline pipeline = channel.pipeline();
                        //将 LogEventDecoder 和 LogEventHandler 添加到 ChannelPipeline 中
                        pipeline.addLast(new LogEventDecoder());
                        pipeline.addLast(new LogEventHandler());
                    }
                } )
                .localAddress(address);
    }
    
    //绑定 Channel。注意,DatagramChannel 是无连接的
    public Channel bind() {
        return bootstrap.bind().syncUninterruptibly().channel();
    }
    public void stop() {
        group.shutdownGracefully();
    }
    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            throw new IllegalArgumentException("Usage: LogEventMonitor <port>");
        }
        //构造一个新的LogEventMonitor
        LogEventMonitor monitor = new LogEventMonitor(
                new InetSocketAddress(Integer.parseInt(args[0])));
        try {
            Channel channel = monitor.bind();
            System.out.println("LogEventMonitor running");
            channel.closeFuture().sync();
        } finally {
            monitor.stop();
        }
    }
}

三、运行 LogEventBroadcaster 和 LogEventMonitor

编写完广播者和监视器后,我们在idea中分别启动它们,当我们看到控制台打印:”LogEventMonitor running“时,说明已经启动成功!

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
7月前
Netty实战: HTTP文件列表服务器
Netty实战: HTTP文件列表服务器
83 0
|
7月前
|
网络协议 Java 测试技术
阿里内部Netty实战小册,值得拥有
Netty 是一个高性能的 Java 网络通信框架,简化了网络编程并涵盖了最新的Web技术。它提供了一种抽象,降低了底层复杂性,使得更多开发者能接触网络编程。Netty 因其易用性、高效性和广泛的应用场景受到推崇,适合互联网行业从业者学习,有助于理解和开发基于Netty的系统。免费的《Netty实战小册》详细介绍了Netty的各个方面,包括概念、架构、编解码器、网络协议和实际案例,帮助读者深入理解和应用Netty。如需完整版小册,可点击链接获取。
阿里内部Netty实战小册,值得拥有
|
4月前
|
调度
Netty运行原理问题之事件调度工作的问题如何解决
Netty运行原理问题之事件调度工作的问题如何解决
|
5月前
|
存储 网络协议 Ubuntu
【Linux开发实战指南】基于UDP协议的即时聊天室:快速构建登陆、聊天与退出功能
UDP 是一种无连接的、不可靠的传输层协议,位于IP协议之上。它提供了最基本的数据传输服务,不保证数据包的顺序、可靠到达或无重复。与TCP(传输控制协议)相比,UDP具有较低的传输延迟,因为省去了建立连接和确认接收等过程,适用于对实时性要求较高、但能容忍一定数据丢失的场景,如在线视频、语音通话、DNS查询等。 链表 链表是一种动态数据结构,用于存储一系列元素(节点),每个节点包含数据字段和指向下一个节点的引用(指针)。链表分为单向链表、双向链表和循环链表等类型。与数组相比,链表在插入和删除操作上更为高效,因为它不需要移动元素,只需修改节点间的指针即可。但访问链表中的元素不如数组直接,通常需要从
309 2
|
4月前
|
编解码 NoSQL Redis
(十一)Netty实战篇:基于Netty框架打造一款高性能的IM即时通讯程序
关于Netty网络框架的内容,前面已经讲了两个章节,但总归来说难以真正掌握,毕竟只是对其中一个个组件进行讲解,很难让诸位将其串起来形成一条线,所以本章中则会结合实战案例,对Netty进行更深层次的学习与掌握,实战案例也并不难,一个非常朴素的IM聊天程序。
109 3
|
4月前
|
前端开发 网络协议
Netty实战巅峰:从零构建高性能IM即时通讯系统,解锁并发通信新境界
【8月更文挑战第3天】Netty是一款高性能、异步事件驱动的网络框架,适用于开发高并发网络应用,如即时通讯(IM)系统。本文将指导你利用Netty从零构建高性能IM程序,介绍Netty基础及服务器/客户端设计。服务器端使用`ServerBootstrap`启动,客户端通过`Bootstrap`连接服务器。示例展示了简单的服务器启动过程。通过深入学习,可进一步实现用户认证等功能,打造出更完善的IM系统。
195 1
|
4月前
|
移动开发 网络协议 算法
(十)Netty进阶篇:漫谈网络粘包、半包问题、解码器与长连接、心跳机制实战
在前面关于《Netty入门篇》的文章中,咱们已经初步对Netty这个著名的网络框架有了认知,本章的目的则是承接上文,再对Netty中的一些进阶知识进行阐述,毕竟前面的内容中,仅阐述了一些Netty的核心组件,想要真正掌握Netty框架,对于它我们应该具备更为全面的认知。
253 2
|
5月前
|
网络协议 网络架构
【网络编程入门】TCP与UDP通信实战:从零构建服务器与客户端对话(附简易源码,新手友好!)
在了解他们之前我们首先要知道网络模型,它分为两种,一种是OSI,一种是TCP/IP,当然他们的模型图是不同的,如下
227 1
|
3天前
|
监控 网络协议 网络性能优化
不再困惑!一文搞懂TCP与UDP的所有区别
本文介绍网络基础中TCP与UDP的区别及其应用场景。TCP是面向连接、可靠传输的协议,适用于HTTP、FTP等需要保证数据完整性的场景;UDP是无连接、不可靠但速度快的协议,适合DNS、RIP等对实时性要求高的应用。文章通过对比两者在连接方式、可靠性、速度、流量控制和数据包大小等方面的差异,帮助读者理解其各自特点与适用场景。
|
13天前
|
存储 网络协议 安全
用于 syslog 收集的协议:TCP、UDP、RELP
系统日志是从Linux/Unix设备及网络设备生成的日志,可通过syslog服务器集中管理。日志传输支持UDP、TCP和RELP协议。UDP无连接且不可靠,不推荐使用;TCP可靠,常用于rsyslog和syslog-ng;RELP提供可靠传输和反向确认。集中管理日志有助于故障排除和安全审计,EventLog Analyzer等工具可自动收集、解析和分析日志。