一、编写广播者
Netty 提供了大量的类来支持 UDP 应用程序的编写。下面我们列出一些要用到的类型:
名 称 | 描 述 | |
---|---|---|
interface AddressedEnvelope<M, A extends SocketAddress>extends ReferenceCounted | 定义一个消息,其包装了另一个消息并带有发送者和接收者地址。其中 M 是消息类型;A 是地址类型 | |
class DefaultAddressedEnvelope<M, A extends SocketAddress>implements AddressedEnvelope<M,A> | 提供了 interface AddressedEnvelope的默认实现 | |
class DatagramPacketextends DefaultAddressedEnvelope<ByteBuf, InetSocketAddress> | 扩展了 DefaultAddressedEnvelope 以使用 ByteBuf 作为消息数据容器 | |
implements ByteBufHolder | 扩展了 DefaultAddressedEnvelope 以使用 ByteBuf 作为消息数据容器 | |
interface DatagramChannelextends Channel | 扩展了 Netty 的 Channel 抽象以支持 UDP 的多播组管理 | |
class NioDatagramChannnelextends AbstractNioMessageChannelimplements DatagramChannel | 定义了一个能够发送和接收 AddressedEnvelope 消息的 Channel 类型 |
Netty 的 DatagramPacket 是一个简单的消息容器,DatagramChannel 实现用它来和远程节点通信。类似于在我们先前的类比中的明信片,它包含了接收者(和可选的发送者)的地址以及消息的有效负载本身。
要将 LogEvent 消息转换为 DatagramPacket,我们将需要一个编码器。但是没有必要从头开始编写我们自己的。我们将扩展 Netty 的 MessageToMessageEncoder,这个我们之前使用过。
下面我们看一下将要广播的信息:我们将展示正在广播的 3 个日志条目,每一个都将通过一个专门的 DatagramPacket进行广播。
这是该 LogEventBroadcaster 的 ChannelPipeline 的一个高级别视图,展示了 LogEvent 消息是如何流经它的。
我们会将所有的将要被传输的数据都被封装在了 LogEvent 消息中。LogEventBroadcaster 将把这些写入到 Channel 中,并通过 ChannelPipeline 发送它们,在那里它们将会被转换(编码)为 DatagramPacket 消息。最后,他们都将通过 UDP 被广播,并由远程节点(监视器)所捕获。
下面我们自定义一个MessageToMessageEncoder来执行上面所说的转换:
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramPacket;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.util.CharsetUtil;
import java.net.InetSocketAddress;
import java.util.List;
/**
* Author: lhd
* Data: 2023/6/13
* Annotate:
*/
public class LogEventEncoder extends MessageToMessageEncoder<LogEvent> {
private final InetSocketAddress remoteAddress;
//LogEventEncoder 创建了即将被发送到指定的InetSocketAddress 的 DatagramPacket 消息
public LogEventEncoder(InetSocketAddress remoteAddress) {
this.remoteAddress = remoteAddress;
}
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, LogEvent logEvent, List<Object> out) throws Exception {
byte[] file = logEvent.getLogfile().getBytes(CharsetUtil.UTF_8);
byte[] msg = logEvent.getMsg().getBytes(CharsetUtil.UTF_8);
ByteBuf buf = channelHandlerContext.alloc()
.buffer(file.length + msg.length + 1);
//将文件名写入到 ByteBuf 中
buf.writeBytes(file);
//添加一个SEPARATOR
buf.writeByte(LogEvent.SEPARATOR);
//将日志消息写入ByteBuf 中
buf.writeBytes(msg);
//将一个拥有数据和目的地地址的新 DatagramPacket 添加到出站的消息列表中
out.add(new DatagramPacket(buf, remoteAddress));
}
}
在 LogEventEncoder 被实现之后,我们就该准备引导该服务器,其包括设置各种各样的 ChannelOption,以及在 ChannelPipeline 中安装所需要的 ChannelHandler,这些都要通过主类 LogEventBroadcaster 完成。
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;
import java.io.File;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
/**
* Author: lhd
* Data: 2023/6/13
* Annotate: 广播者组件
*/
public class LogEventBroadcaster {
private final EventLoopGroup group;
private final Bootstrap bootstrap;
private final File file;
public LogEventBroadcaster(InetSocketAddress address, File file) {
group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
//引导该 NioDatagramChannel(无连接的)
bootstrap.group(group).channel(NioDatagramChannel.class)
//设置 SO_BROADCAST 套接字选项
.option(ChannelOption.SO_BROADCAST, true)
.handler(new LogEventEncoder(address));
this.file = file;
}
public void run() throws Exception {
//绑定 Channel
Channel ch = bootstrap.bind(0).sync().channel();
long pointer = 0;
//启动主处理循环
for (;;) {
long len = file.length();
if (len < pointer) {
// 文件已重置
//如果有必要,将文件指针设置到该文件的最后一个字节
pointer = len;
} else if (len > pointer) {
// 已添加内容
RandomAccessFile raf = new RandomAccessFile(file, "r");
//设置当前的文件指针,以确保没有任何的旧日志被发送
raf.seek(pointer);
String line;
while ((line = raf.readLine()) != null) {
//对于每个日志条目,写入一个LogEvent到Channel 中
ch.writeAndFlush(new LogEvent(null, -1,
file.getAbsolutePath(), line));
}
//存储其在文件中的当前位置
pointer = raf.getFilePointer();
raf.close();
}
try {
//休眠 1 秒,如果被中断,则退出循环;否则重新处理它
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.interrupted();
break;
}
}
}
public void stop() {
group.shutdownGracefully();
}
public static void main(String[] args) throws Exception {
if (args.length != 2) {
throw new IllegalArgumentException();
}
//创建并启动一个新的LogEventBroadcaster的实例
LogEventBroadcaster broadcaster = new LogEventBroadcaster(
new InetSocketAddress("255.255.255.255",
Integer.parseInt(args[0])), new File(args[1]));
try {
broadcaster.run();
}
finally {
broadcaster.stop();
}
}
}
这样就完成了该应用程序的广播者组件。
二、编写监视器
我们定义一个 LogEventMonitor。
这个程序将:
(1)接收由 LogEventBroadcaster 广播的 UDP DatagramPacket;
(2)将它们解码为 LogEvent 消息;
(3)将 LogEvent 消息写出到 System.out。
和之前一样,该逻辑由一组自定义的 ChannelHandler 实现,我们扩展 MessageToMessageDecoder。
下面是一个LogEventMonitor 的 ChannelPipeline,展示了LogEvent 是如何流经它的
综上所述,我们需要2个解码器来处理LogEvent ,它们分别是LogEventDecoder 、LogEventHandler。
LogEventDecoder :
1、ChannelPipeline 中的第一个解码器LogEventDecoder 负责将传入的DatagramPacket解码为LogEvent 消息(一个用于转换入站数据的任何 Netty 应用程序的典型设置)
下面我们开始写LogEventDecoder的实现:
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramPacket;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.util.CharsetUtil;
import java.util.List;
/**
* Author: lhd
* Data: 2023/6/13
* Annotate:LogEventDecoder
*/
public class LogEventDecoder extends MessageToMessageDecoder<DatagramPacket> {
@Override
protected void decode(ChannelHandlerContext ctx, DatagramPacket datagramPacket, List<Object> out) throws Exception {
//获取对 DatagramPacket 中的数据(ByteBuf)的引用
ByteBuf data = datagramPacket.content();
//获取该 SEPARATOR 的索引
int idx = data.indexOf(0, data.readableBytes(), LogEvent.SEPARATOR);
//提取文件名
String filename = data.slice(0, idx).toString(CharsetUtil.UTF_8);
//提取日志消息
String logMsg = data.slice(idx + 1, data.readableBytes()).toString(CharsetUtil.UTF_8);
//构建一个新的 LogEvent 对象,并且将它添加到(已经解码的消息的)列表中
LogEvent event = new LogEvent(datagramPacket.sender(),
System.currentTimeMillis(), filename, logMsg);
out.add(event);
}
}
LogEventHandler:
2、第二个 ChannelHandler 的工作是对第一个 ChannelHandler 所创建的 LogEvent 消息执行一些处理。在这个场景下,它只是简单地将它们写出到 System.out。在真实世界的应用程序中,可能需要聚合来源于不同日志文件的事件,或者将它们发布到数据库中。
下面我们开始写LogEventHandler的实现:
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
* Author: lhd
* Data: 2023/6/13
* Annotate: LogEventHandler
*/
public class LogEventHandler extends SimpleChannelInboundHandler<LogEvent> { //扩展 SimpleChannelInboundHandler 以处理 LogEvent 消息
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//当异常发生时,打印栈跟踪信息,并关闭对应的 Channe
cause.printStackTrace();
ctx.close();
}
@Override
public void channelRead0(ChannelHandlerContext ctx, LogEvent event) throws Exception {
//创建 StringBuilder,并且构建输出的字符串
StringBuilder builder = new StringBuilder();
builder.append(event.getReceivedTimestamp());
builder.append(" [");
builder.append(event.getSource().toString());
builder.append("] [");
builder.append(event.getLogfile());
builder.append("] : ");
builder.append(event.getMsg());
//打印 LogEvent 的数据
System.out.println(builder.toString());
}
}
LogEventHandler 将以一种简单易读的格式打印 LogEvent 消息,包括以下的各项:
- 发送方的 InetSocketAddress,其由 IP 地址和端口组成;
- 生成 LogEvent 消息的日志文件的绝对路径名;
- 实际上的日志消息,其代表日志文件中的一行。
按照我们之前讲过的内容,下面我们要做的是将LogEventDecoder 和LogEventHandler 安装到ChannelPipeline。
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;
import java.net.InetSocketAddress;
/**
* Author: lhd
* Data: 2023/6/13
* Annotate: 监视器
*/
public class LogEventMonitor {
private final EventLoopGroup group;
private final Bootstrap bootstrap;
public LogEventMonitor(InetSocketAddress address) {
group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(group)
//引导该 NioDatagramChannel
.channel(NioDatagramChannel.class)
//设置套接字选项 SO_BROADCAST
.option(ChannelOption.SO_BROADCAST, true)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
//将 LogEventDecoder 和 LogEventHandler 添加到 ChannelPipeline 中
pipeline.addLast(new LogEventDecoder());
pipeline.addLast(new LogEventHandler());
}
} )
.localAddress(address);
}
//绑定 Channel。注意,DatagramChannel 是无连接的
public Channel bind() {
return bootstrap.bind().syncUninterruptibly().channel();
}
public void stop() {
group.shutdownGracefully();
}
public static void main(String[] args) throws Exception {
if (args.length != 1) {
throw new IllegalArgumentException("Usage: LogEventMonitor <port>");
}
//构造一个新的LogEventMonitor
LogEventMonitor monitor = new LogEventMonitor(
new InetSocketAddress(Integer.parseInt(args[0])));
try {
Channel channel = monitor.bind();
System.out.println("LogEventMonitor running");
channel.closeFuture().sync();
} finally {
monitor.stop();
}
}
}
三、运行 LogEventBroadcaster 和 LogEventMonitor
编写完广播者和监视器后,我们在idea中分别启动它们,当我们看到控制台打印:”LogEventMonitor running“时,说明已经启动成功!