一、Netty服务端开发
在开始使用 Netty 开发 TimeServer 之前,先回顾一下使用 NIO 进行服务端开发的步骤。
(1)创建ServerSocketChannel,配置它为非阻塞模式;
(2)绑定监听,配置TCP 参数,例如 backlog 大小;
(3)创建一个独立的I/O线程,用于轮询多路复用器 Selector;
(4)创建 Selector,将之前创建的 ServerSocketChannel 注册到 Selector 上,监听SelectionKey.ACCEPT;
(5)启动I/0线程,在循环体中执行 Selectorselect0)方法,轮询就绪的 Channel;
(6)当轮询到了处于就绪状态的 Channel 时,需要对其进行判断,如果是OP ACCEPT状态,说明是新的客户端接入,则调用 ServerSocketChannel.accept()方法接受新的客户端:(7)设置新接入的客户端链路 SocketChannel 为非阻塞模式,配置其他的一些TCP 参数(8)将SocketChannel注册到 Selector,监听 OP READ 操作位;
(9)如果轮询的Channel为OP READ,则说明 SocketChannel 中有新的就绪的数据包需要读取,则构造ByteBuffer 对象,读取数据包;
(10)如果轮询的Channel为OP WRITE,说明还有数据没有发送完成,需要继续发送。
import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.timeout.IdleStateHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; public class nettyServer { Logger logger = LoggerFactory.getLogger(nettyServer.class); @Value("${netty.port}") int port; @PostConstruct public void bind() { EventLoopGroup bossrGroup = new NioEventLoopGroup();//接收客户端传过来的请求 EventLoopGroup wokerGroup = new NioEventLoopGroup();//接收到请求后将后续操作 try { ServerBootstrap b = new ServerBootstrap(); b.group(bossrGroup, wokerGroup).channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) //.childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new serverHandlerAdapter()); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new HeartbeatHandler()); ch.pipeline().addLast(new IdleStateHandler(10, 1, 1)); } }); ChannelFuture f = b.bind(port).sync(); } catch (Exception e) { } }
- NioEventLoopGroup是一个处理I/O操作的多线程事件循环。Netty为不同类型的传输提供了各种EventLoopGroup实现。在本例中,我们实现了一个服务器端应用程序,因此将使用两个NioEventLoopGroup。第一个通常被称为“boss”,接受传入连接。第二个通常称为“worker”,在boss接受连接并将接受的连接注册给worker后,它处理接受的连接的流量。使用多少线程以及如何将它们映射到创建的通道取决于EventLoopGroup实现,甚至可以通过构造函数进行配置。
- ServerBootstrap是一个设置服务器的助手类。您可以直接使用频道设置服务器。然而,请注意,这是一个乏味的过程,在大多数情况下,您不需要这样做。
- 这里,我们指定使用NioServerSocketChannel类,该类用于实例化一个新的Channel以接受传入连接。
- 此处指定的处理程序将始终由新接受的Channel评估。ChannelInitializer是一个特殊的处理程序,用于帮助用户配置新的Channel。您很可能希望通过添加一些处理程序(如DiscardServerHandler)来配置新频道的ChannelPipeline,以实现网络应用程序。随着应用程序变得复杂,您很可能会向管道中添加更多的处理程序,并最终将这个匿名类提取到顶级类中。
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class serverHandlerAdapter extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf in = (ByteBuf) msg; try { while (in.isReadable()) { // (1) System.out.print((char) in.readByte()); System.out.flush(); } } finally { ReferenceCountUtil.release(msg); // (2) } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { super.channelReadComplete(ctx); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); }
- serverHandlerAdapter扩展 ChannelHandlerAdapter,它是 ChannelHandler 的实现。通道处理程序提供了可以重写的各种事件处理程序方法。目前,扩展 ChannelHandlerAdapter 就足够了,而不是自己实现处理程序接口。
- 我们在此处重写channelRead()事件处理程序方法。每当从客户端接收到新数据时,都会使用收到的消息调用此方法。在此示例中,收到的消息的类型为 ByteBuf。
- 若要实现协议,处理程序必须忽略收到的消息。ByteBuf 是一个引用计数的对象,必须通过该方法显式释放。请记住,处理程序负责释放传递给处理程序的任何引用计数对象。通常,处理程序方法的实现方式如下:DISCARDrelease()channelRead()
二、Netty客户端开发
import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class nettyClient { clientHandlerAdapter clientHandlerAdapter = new clientHandlerAdapter(); public void conect(String ip, int port) { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .remoteAddress("", port) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(clientHandlerAdapter); } }); b.bind(ip, port).sync(); } catch (Exception e) { } } public boolean sendFile() { return clientHandlerAdapter.sendFile(); } }
我们从 connect 方法讲起,在第 13 行首先创建客户端处理 I/0 读写的 NioEventLoopGroup 线程组,然后继续创建客户端辅助启动类 Bootstrap,随后需要对其进行配置。与服务端不同的是,它的 Channel 需要设置为 NioSocketChannel,然后为其添加 handler,此处为了简单直接创建匿名内部类,实现 initChannel 方法,其作用是当创建 NioSocketChannel成功之后,在初始化它的时候将它的 ChannelHandler 设置到 ChannelPipeline 中,用于处理网络I/O事件。
客户端启动辅助类设置完成之后,调用 connect 方法发起异步连接,然后调用同步方法等待连接成功。
最后,当客户端连接关闭之后,客户端主函数退出,在退出之前,释放 NIO 线程组的资源。
下面我们继续看下TimeClientHandler 的代码如何实现
import com.entity.Message; import com.google.gson.Gson; import com.google.gson.JsonObject; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import org.springframework.beans.factory.annotation.Autowired; import java.nio.charset.Charset; public class clientHandlerAdapter extends ChannelInboundHandlerAdapter { @Autowired Gson gson; ChannelHandlerContext ctx; public clientHandlerAdapter() { super(); } @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { super.channelRegistered(ctx); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { super.channelUnregistered(ctx); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); this.ctx = ctx; } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String jsonObject = gson.toJson(msg); Message message = gson.fromJson(jsonObject, Message.class); String type = message.getMsgType(); if (type != null) { ByteBuf out = getByteBuf(ctx); ctx.channel().writeAndFlush(out); } super.channelRead(ctx, msg); } private ByteBuf getByteBuf(ChannelHandlerContext ctx) { byte[] bytes = "我是发送给客户端的数据:请重启冰箱!".getBytes(Charset.forName("utf-8")); ByteBuf buffer = ctx.alloc().buffer(); buffer.writeBytes(bytes); return buffer; } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { super.channelReadComplete(ctx); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { super.userEventTriggered(ctx, evt); } @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { super.channelWritabilityChanged(ctx); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); } public boolean sendFile() { byte[] bytes=new byte[2048]; ctx.writeAndFlush(bytes); return true; } }
三、处理基于流的传输
在基于流的传输(如 TCP/IP)中,接收的数据存储在套接字接收缓冲区中。遗憾的是,基于流的传输的缓冲区不是数据包队列,而是字节队列。这意味着,即使您将两条消息作为两个独立的数据包发送,操作系统也不会将它们视为两条消息,而只是一堆字节。因此,无法保证您阅读的内容正是您的远程对等方所写的内容
1、示例一:
现在让我们回到TIME客户端示例。我们这里也有同样的问题。32位整数是非常少量的数据,不太可能经常被分割。然而,问题是它可能是碎片化的,并且随着流量的增加,碎片化的可能性会增加。
简单的解决方案是创建一个内部累积缓冲区,并等待所有4个字节被接收到内部缓冲区。以下是修正了问题的TimeClientHandler实现:
package io.netty.example.time; import java.util.Date; public class TimeClientHandler extends ChannelHandlerAdapter { private ByteBuf buf; @Override public void handlerAdded(ChannelHandlerContext ctx) { buf = ctx.alloc().buffer(4); // (1) } @Override public void handlerRemoved(ChannelHandlerContext ctx) { buf.release(); // (1) buf = null; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf m = (ByteBuf) msg; buf.writeBytes(m); // (2) m.release(); if (buf.readableBytes() >= 4) { // (3) long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L; System.out.println(new Date(currentTimeMillis)); ctx.close(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
- ChannelHandler有两个生命周期侦听器方法:handlerAdded()和handlerRemoved()。您可以执行任意(取消)初始化任务,只要它不会长时间阻塞。
- 首先,所有接收到的数据都应该累积到buf中。
- 然后,处理程序必须检查buf是否有足够的数据(在本例中为4个字节),然后继续执行实际的业务逻辑。否则,当更多数据到达时,Netty将再次调用channelRead()方法,最终将累积所有4个字节。