1. 小叙
Netty, 一个神奇的网络应用程序框架!
异步 和 事件驱动 是它的精髓,整个框架都是围绕这两个主题展开。
Netty 最初由JBoss公司开发,现在由Netty项目社区开发和维护,源代码放在 Github Netty仓库。最新版是5.0,不过已被官方废弃,具体原因可以从官方 Netty Issues 4466 查看,这里不再赘述。官方推荐版本是4.x,而本指南使用的是4.1.65版本。闲话不多说,下面开始进入正题。
2. 背景
相较于编写传统的BIO阻塞式网络程序,NIO的性能更佳。早在JDK1.4,Java就已经提供了NIO API,java.nio 包提供了一套完整的机制用于NIO编程。下图展示了两者的区别:
既然JDK可以实现,那我们为什么要使用Netty呢?原因有以下几点:
- Netty基于NIO类库,封装了代码复杂性,可以进行快速开发;
- 关注点分离,解耦业务与网络逻辑;
- 非阻塞网络调用和异步IO使Netty可以支持百万级并发(可以自行测试)。
"抑能知其然,未知其所以然者也。",接下来我们通过拆解Netty内部相关组件来具体讲解。
3. 组件
3.1 Channel, EventLoop and ChannelFuture
- Channel——Sockets
- EventLoop——Control flow, multithreading, concurrency
- ChannelFuture——Asynchronous notification
3.1.1 Channel
基本I/O操作(bind()、connect()、read()和write())依赖于底层网络传输提供的原语。在基于java的网络中,基本构造是类Socket。Channel接口提供了一个API,降低了直接使用Sockets的复杂性。
Channel的生命周期如下所示:
当这些状态发生变化时,将生成相应的事件,如下所示:
3.1.2 EventLoop
EventLoop定义了Netty的核心抽象,用于处理在连接生命周期内发生的事件。
下面说明一下Channels, EventLoops, Threads 和 EventLoopGroups 之间的关系:
- EvenLoopGroup 与 EventLoop 一对多;
- EventLoop 与 Thread 一对一;
- EventLoop 与 Channel 一对多;
- EventLoop 处理的所有I/O事件都在其专用的单个线程上处理。
注意,在这种设计中,给定通道的I/O由同一个线程执行,实际上消除了同步的需要。
3.1.3 ChannelFuture
Netty中的所有I/O操作都是异步的。 由于异步操作不会立即返回结果。为此目的,Netty提供了ChannelFuture,它的 addListener() 方法注册了一个ChannelFutureListener,当异步操作完成时(无论是否成功)通知它。这就是我们常说的基于事件驱动的异步操作的回调函数。
ChannelFuture: 可以将ChannelFuture看作是将来要执行的操作结果的占位符。
3.2 ChannelHandler and ChannelPipeline
3.2.1 ChannelHandler
ChannelHandler 用于处理出站数据和入站数据,主要使用了责任链模式。例如:服务器接收入站数据,由若干个ChannelHandler进行处理。每个ChannelHandler负责其中一个环节,环环相扣,其中一个处理完就会将数据传递到下一个进行处理。
因为一个ChannelHandler可以属于多个ChannelPipeline,所以它可以绑定到多个ChannelHandlerContext实例。可重用的ChannelHandler必须使用@shareable注释;否则,尝试将其添加到多个ChannelPipeline将触发异常。显然,为了安全地使用多个并发通道(即连接),这样的ChannelHandler必须是线程安全的。
// 线程安全的共享处理器 @ChannelHandler.Sharable public class SharableHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { System.out.println("Channel read message: " + msg); ctx.fireChannelRead(msg); } } // 非线程安全的共享处理器,并发情况下会产生异常,可以通过同步channelRead()方法解决 @ChannelHandler.Sharable class SharableHandler extends ChannelInboundHandlerAdapter { private int count; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { count++; System.out.println("channelRead(...) called the " + count + " time"); ctx.fireChannelRead(msg); } }
3.2.2 ChannelPipeline
ChannelPipeline 为 ChannelHandlers 链提供了一个容器,并定义了用于传播入站和出站事件流的API。当创建Channel时,它会自动分配自己的ChannelPipeline。ChannelHandlers按如下方式安装在ChannelPipeline中。
- 在ServerBootstrap中实现ChannelInitializer接口;
- 当ChannelInitializer.initChannel()被调用时,ChannelInitializer将所有的ChannelHandlers安装到管道中;
- 最后,ChannelInitializer会将自己从管道中移除。
3.2.3 ChannelHandlerContext
ChannelHandlerContext 表示 ChannelHandler 和 ChannelPipeline 之间的绑定关系,类比于关系数据库中的中间表,尽管这个对象可以用于获取底层Channel,但它主要用于写出站数据。
在Netty中有两种发送信息的方式。可以直接写入Channel或写入与ChannelHandler关联的ChannelHandlerContext对象,前者使消息从ChannelPipeline的尾部开始,后者使消息从ChannelPipeline的下一个处理器开始。ChannelHandlerContext方法涉及的事件流比其他类上相同名称的方法更短。应该尽可能利用这一点来提供最大的性能。
下图展示了 ChannelHandlerContext, Channel, 和 ChannelPipeline 之间的关联关系。
下图展示了通过ChannelHandlerContext, Channel, 和 ChannelPipeline 进行事件传递的区别。
3.2.4 Encoders and decoders
当使用Netty发送或接收消息时,就会发生数据转换。入站消息将被解码; 即从字节转换为另一种格式,通常是Java对象。如果消息是出站的,反之,它会被编码为字节。这两种转换的原因很简单:网络数据总是一系列字节。
3.3 Bootstrapping
Netty的引导类为应用程序的网络层提供了配置容器。
主要有以下两种引导类:
- Bootstrap——连接远程主机和端口
- ServerBootstrap——绑定本地端口
Bootstrap只需要一个EventLoopGroup,而ServerBootstrap需要两个EventLoopGroup。为什么呢?
ServerBootstrap需要两组不同的通道。第一组包含单个ServerChannel,表示服务器的侦听套接字,绑定到本地端口。第二组包含若干个处理客户端连接而创建的所有通道。请看下图所示:
3.4 ByteBuf
ByteBuf——Netty的数据容器。相对于 Java NIO ByteBuffer 来说,具有以下优点:
- 它可以扩展到用户定义的缓冲区类型;
- 通过内置的复合缓冲区类型实现了透明的零拷贝;
- 容量是按需扩展的(与JDK StringBuilder一样);
- 读取模式和写入模式之间的切换不需要调用ByteBuffer的flip()方法;
- 读写使用不同的索引;
- 支持方法链式调用;
- 支持引用计数;
- 存储池支持。
3.4.1 工作机制
ByteBuf维护两个不同的索引:一个用于读取,一个用于写入。当从ByteBuf中读取数据时,它的readerIndex将按读取的字节数递增。类似地,当写入一个ByteBuf时,它的writerIndex也会递增。以read或write开头的ByteBuf方法会增加相应的索引,而以set和get开头的操作则不会。存储机制如图所示:
3.4.2 ByteBuf使用模式
- HEAP BUFFERS
最常用的ByteBuf模式,将数据存储在JVM的堆空间中。这种模式在不使用池的情况下提供了快速的分配和回收。
- DIRECT BUFFERS
直接缓冲区是另一种ByteBuf模式,将数据存储在直接缓冲区中——即堆外内存,这样就避免了每次进行I/O操作之前,都需要将缓冲区数据复制到直接缓冲区,这也是零拷贝的一种体现。直接缓冲区的主要缺点是,与基于堆的缓冲区相比,它们的分配和释放成本更高。
- COMPOSITE BUFFERS
复合缓冲区是多个ByteBufs的聚合视图,它保持多个ByteBuf的引用,将物理上的多个Buffer组合成了一个逻辑上完整的CompositeByteBuf,而无需额外的复制,这就是它的零拷贝特性。并且可以根据需要添加和删除ByteBuf实例,而JDK的ByteBuffer却无法实现。
// Java NIO 实现复合缓冲区 ByteBuffer[] message = new ByteBuffer[] { header, body }; ByteBuffer message2 = ByteBuffer.allocate(header.remaining() + body.remaining()); message2.put(header); message2.put(body); message2.flip(); // Netty 实现复合缓冲区 CompositeByteBuf messageBuf = Unpooled.compositeBuffer(); ByteBuf headerBuf = ...; ByteBuf bodyBuf = ...; messageBuf.addComponents(headerBuf, bodyBuf); ... messageBuf.removeComponent(0); for (ByteBuf buf : messageBuf) { System.out.println(buf.toString()); }
3.4.3 派生buffers
派生缓冲区提供一个ByteBuf视图。可以通过调用以下方法之一来创建一个现有缓冲区的视图:
- duplicate()
- slice()
- slice(int, int)
- retaindeDuplicate()
- retaindeSlice()
- retaindeSlice(int, int)
- readRetainedSlice(int)
派生缓冲区将有一个独立的readerIndex、writerIndex和标记索引,而它与NIO缓冲区一样共享内部数据,但同时也要小心,因为派生缓冲区与原生缓冲区的数据是共享的,如果修改派生缓冲区的数据,那么原生缓冲区的数据也会改变。如果需要一个现有缓冲区的全新副本,请调用copy()方法。
注意,duplicate(), slice(), slice(int, int)和readSlice(int)不会对返回的派生缓冲区调用retain(),因此它的引用计数不会增加。如果你需要创建一个增加引用计数的派生缓冲区,考虑使用retainedDuplicate(), retainedSlice(), retainedSlice(int, int)和readRetainedSlice(int),这可能会返回一个产生更少垃圾的缓冲区实现。
3.4.4 ByteBuf分配
- Pooling buffers
为了减少分配和释放内存的开销,Netty使用 ByteBufAllocator 接口实现了池,该接口可用于分配我们所描述的任何ByteBuf模式的实例。可以从Channel(每个Channel可以有一个不同的实例)或通过绑定到ChannelHandler的ChannelHandlerContext获得对ByteBufAllocator的引用。Netty提供了ByteBufAllocator的两种实现: PooledByteBufAllocator 和 UnpooledByteBufAllocator 。前者使用ByteBuf实例来提高性能并最小化内存碎片。后一种实现并不共享ByteBuf实例,而是在每次调用它时返回一个新的实例。
- Unpooled buffers
通过分配新的空间或包装或复制现有字节数组、字节缓冲区和字符串,创建一个新的ByteBuf。
4. 内存
4.1 引用计数
引用计数是一种优化内存使用和性能的技术,这与JVM在回收对象之前,用来判断对象是否存活的引用计数法在理论上是一样的。它通过在对象不再被其他对象引用时释放对象所持有的资源来实现。Netty在版本4中为ByteBuf和ByteBufHolder引入了引用计数,两者都实现了ReferenceCounted接口。
4.2 资源管理
当通过调用ChannelInboundHandler.channelRead()或ChannelOutboundHandler.write(),需要确保没有资源泄漏。Netty使用引用计数来处理池化的ByteBufs。因此,在使用完ByteBuf之后调整引用计数是很重要的。
我们通过设置Java系统属性来开启泄漏检测:java -Dio.netty.leakDetectionLevel=ADVANCED。
有以下四种检测级别,默认检测级别是SIMPLE。
对于入站消息,我们需要在已实现的最后一个ChannelInboundHandler释放消息;而对于出站消息,如果你处理一个write()操作并且丢弃消息,那么你就要负责释放消息。另一种防止资源泄露的方式是使用SimpleChannelInboundHandler来处理入站消息,它将在channelRead0()使用消息后自动释放消息。
4. 代码
- 客户端代码
public class EchoClient { private final String host; private final int port; public EchoClient(String host, int port) { this.host = host; this.port = port; } public static void main(String[] args) throws Exception { if (args.length != 2) { System.err.println("Usage: " + EchoClient.class.getSimpleName() + " <host> <port>"); return; } final String host = args[0]; final int port = Integer.parseInt(args[1]); new EchoClient(host, port).start(); } public void start() throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .remoteAddress(new InetSocketAddress(host, port)) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { ch.pipeline().addLast(new EchoClientHandler()); } }); ChannelFuture f = b.connect().sync(); f.channel().closeFuture().sync(); } finally { group.shutdownGracefully().sync(); } } } @ChannelHandler.Sharable public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> { @Override public void channelActive(ChannelHandlerContext ctx) { ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8)); } @Override public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) { System.out.println("Client received: " + in.toString(CharsetUtil.UTF_8)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
- 服务器代码
public class EchoServer { private final int port; public EchoServer(int port) { this.port = port; } public static void main(String[] args) throws Exception { if (args.length != 1) { System.err.println("Usage: " + EchoServer.class.getSimpleName() + " <port>"); return; } int port = Integer.parseInt(args[0]); new EchoServer(port).start(); } public void start() throws Exception { final EchoServerHandler serverHandler = new EchoServerHandler(); EventLoopGroup group = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(group) .channel(NioServerSocketChannel.class) .localAddress(new InetSocketAddress(port)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { ch.pipeline().addLast(serverHandler); } }); ChannelFuture f = b.bind().sync(); System.out.println(EchoServer.class.getName() + " started and listening for connections on " + f.channel().localAddress()); f.channel().closeFuture().sync(); } finally { group.shutdownGracefully().sync(); } } } @ChannelHandler.Sharable public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf in = (ByteBuf) msg; System.out.println("Server received: " + in.toString(CharsetUtil.UTF_8)); ctx.write(in); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
6. 参考
Netty in Action by Norman Maurer, Marvin Wolfthal,2015. The Netty Project .
7. 总结
- 本指南旨在提供入门级教程,梳理了Netty的关键组件,其它细节等待你的发掘;
- 软件开发没有银弹,唯有勤加练习,敢于试错,多学多用,才能融会贯通。
版本 |
日期 |
作者 |
修订原因 |
V1.0.0 |
2021-08-15 |
kunlong-luo |
初始版本 |
V1.0.1 |
2021-09-30 |
kunlong-luo |
增加组件图 |