Netty网络框架(一)https://developer.aliyun.com/article/1469519
(二) 应用篇
1、HTTP
1) 0.9版本
GET /index.html
服务端只能返回html格式,传输过程只能处理文字。
2) 1.0版本
支持任何格式的内容,包括图像、视频、二进制等等
引入了POST命令、HEAD命令
增加了请求头、状态码,以及权限、缓存等
GET / HTTP/1.0 User-Agent:Mozilla/1.0 Accept: */*
HTTP/1.0 200 OK Content-Type: text/plain Content-Encoding: gzip <html> <body> hello world </body> </html>
a、 Content-Type
服务端通知客户端,当前数据的格式
示例: text/html 、 image/png 、 application/pdf 、 video/mp4
前面是一级类型,后面是二级类型,用斜杠分隔; 还可以增加其他参数,如编码格式。
Content-Type: text/plain; charset=utf-8
b、Content-Encoding
表示数据压缩的方式,gzip、compress、deflate
对应客户端的字段为 Accept-Encoding,代表接收哪些压缩方式
c、缺点和问题
每个TCP连接只能发送一个请求,发送完毕连接关闭,使用成本很高,性能较差。
Connection: keep-alive - 非标准字段
3) 1.1版本
GET / HTTP/1.1 User-Agent: PostmanRuntime/7.24.1 Accept: */* Cache-Control: no-cache Postman-Token: 636ce8a6-7eab-451a-8638-4534a3578095 Host: cn.bing.com Accept-Encoding: gzip, deflate, br Connection: keep-alive HTTP/1.1 200 OK Cache-Control: private, max-age=0 Content-Length: 45786 Content-Type: text/html; charset=utf-8 Content-Encoding: br Vary: Accept-Encoding P3P: CP="NON UNI COM NAV STA LOC CURa DEVa PSAa PSDa OUR IND" Set-Cookie: SRCHD=AF=NOFORM; domain=.bing.com; expires=Wed, 22-Jun-2022 07:03:23 GMT; path=/ Set-Cookie: SRCHUID=V=2&GUID=5C28FF778A2C4A00B32F5408147038BF&dmnchg=1; domain=.bing.com; expires=Wed, 22-Jun-2022 07:03:23 GMT; path=/ Set-Cookie: SRCHUSR=DOB=20200622; domain=.bing.com; expires=Wed, 22-Jun-2022 07:03:23 GMT; path=/ Set-Cookie: _SS=SID=23AE726877396796143D7C99761766C7; domain=.bing.com; path=/ Set-Cookie: _EDGE_S=F=1&SID=23AE726877396796143D7C99761766C7; path=/; httponly; domain=bing.com Set-Cookie: _EDGE_V=1; path=/; httponly; expires=Sat, 17-Jul-2021 07:03:23 GMT; domain=bing.com Set-Cookie: MUID=1CB3B15BCBD2637E2C01BFAACAFC6214; samesite=none; path=/; secure; expires=Sat, 17-Jul-2021 07:03:23 GMT; domain=bing.com Set-Cookie: MUIDB=1CB3B15BCBD2637E2C01BFAACAFC6214; path=/; httponly; expires=Sat, 17-Jul-2021 07:03:23 GMT X-MSEdge-Ref: Ref A: 71D6F4B3BA9C448EB453341AC182C7BC Ref B: BJ1EDGE0311 Ref C: 2020-06-22T07:03:23Z Date: Mon, 22 Jun 2020 07:03:23 GMT
a、持久连接,含义为默认不关闭tcp连接,可以被多个请求复用。大多时候,浏览器对同一个域名,允许同时建立6个连接。
b、管道机制,支持客户端发送多个请求,管理请求的顺序的。服务器还是按照接受请求的顺序,返回对应的响应结果。
c、Content-Length, 用来区分数据包的重要字段
d、支持PUT、DELETE、PATCH等命令
缺点和问题
当部分请求耗时较长时,仍会阻塞后续请求的处理速度,这种现象叫做“队头阻塞”/"线头阻塞"。
4) 2.0版本
解决队头阻塞的问题,使用的是多路复用的方式。
说了这么多 上代码来操作一下吧!!
我们的编写思路是这样的
- 编写初始化服务端
- 编写自定义初始化器 和 自定义处理器
- 启动postman 查看我们设置的 http 的响应结果
我们这里有三个类
- HttpServer 初始化服务端
- MyHttpHandler 自定义处理器
- MyHttpInitializer 自定义初始化
首先是 server
我们需要在初始化服务端的时候 设置主从线程模型(Netty中常用)
设置 启动参数 和阻塞队列的长度等设置
设置 初始化
public class HttpServer { public static void main(String[] args) { //可以自定义线程的数量 EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 默认创建的线程数量 = CPU 处理器数量 *2 EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler()) //当前连接被阻塞的时候,BACKLOG代表的事 阻塞队列的长度 .option(ChannelOption.SO_BACKLOG, 128) //设置连接保持为活动状态 .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new MyHttpInitializer()); try { //设置为异步启动 异步 关闭 ChannelFuture future = serverBootstrap.bind(9988).sync(); future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { //netty的优雅关闭 指 等一切执行完毕之后 慢慢的关闭 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
编写 初始化
继承ChannelInitializer泛型为Channel,用来进行设置出站解码器和入站编码器
使用 codec netty封装好的解码器,这样我们就不用每次定义 解码和编码
public class MyHttpInitializer extends ChannelInitializer<Channel> { @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); //先对应请求解码,后对响应解码 //pipeline.addLast("decoder", new HttpRequestDecoder()); //pipeline.addLast("encoder", new HttpRequestEncoder()); //当然每次我们都要解码编码很麻烦,netty也有为我们提供对应的解决方案,建议直接使用这个 不会出错 pipeline.addLast("codec", new HttpServerCodec()); //压缩数据 pipeline.addLast("compressor", new HttpContentCompressor()); //聚合完整的信息 参数代表可以处理的最大值 此时的是 512 kb pipeline.addLast("aggregator", new HttpObjectAggregator(512 * 1024)); pipeline.addLast(new MyHttpHandler()); } }
有了初始化,我们还需要一个做事的 那就是 处理器 Handler
netty帮我们封装了返回完整http响应的类 DefaultFullHttpResponse
我们只需要在读取的时候 设置协议,状态码和响应信息,
配置响应头的类型和长度 就可以完成对请求的响应
/* * 泛型需要设置为 FullHttpRequest * 筛选 message 为此类型的消息才处理 * */ public class MyHttpHandler extends SimpleChannelInboundHandler<FullHttpRequest> { @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest fullHttpRequest) throws Exception { //DefaultFullHttpResponse 是一个默认的完整的http响应 DefaultFullHttpResponse response = new DefaultFullHttpResponse( HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer("hello http netty demo".getBytes()) ); // 我们还需要设置响应头 // 设置请求 响应头字段 可以使用 HttpHeaderNmaes // 设置字段时 可以使用 HttpHeaders headers = response.headers(); headers.add(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.TEXT_PLAIN + ";charset=UTF-8"); headers.add(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes()); ctx.write(response); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } }
启动结果
postman 差看到的请求响应 参数
可以看到我们设置的 http1.1 协议
类型 text/plain 长度 47
响应给客户端的 内容 hello http netty demo
小结
- 了解 http 各个版本的解决了什么问题,优缺点,优劣性
- 手动编写一个服务端的响应,用postman 查看响应头我们设置的内容
- 体验到 netty强大的封装带给我们的便利性
2、WebSocket
websocket是由浏览器发起的
协议标识符 http://127.0.0.1:8080 ws://127.0.0.1:7777
GET ws://127.0.0.1:7777 HTTP/1.1 Host: 127.0.0.1 Upgrade: websocket # 升级为ws Connection: Upgrade # 此链接需要升级 Sec-WebSocket-key: client-random-string ... # 标识加密相关信息
HTTP/1.1 101 Upgrade: websocket Connection: Upgrade
响应码 101 代表本次协议需要更改为websocket
连接建立后,支持文本信息及二进制信息。
Websocket实现的原理:
通过http协议进行连接的建立(握手和回答),建立连接后不再使用http,而tcp自身是支持双向通信的,所以能达到“全双工”的效果。
通信使用的单位叫帧 frame
客户端:发送时将消息切割成多个帧
服务端:接收时,将关联的帧重新组装
【客户端】
var ws = new WebSocket("ws://127.0.0.1:7777/hello"); ws.onopen = function(ev){ ws.send("hello"); //建立连接后发送数据 }
设计一个样式
左右两个各有一个文本框,中间放一个发送按钮。
左侧文本框用来发送数据,右侧文本框用来显示数据。
Websocket 应用demo
服务端代码
public class WebSocketServer { public static void main(String[] args) { //可以自定义线程的数量 EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 默认创建的线程数量 = CPU 处理器数量 *2 EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler()) //当前连接被阻塞的时候,BACKLOG代表的事 阻塞队列的长度 .option(ChannelOption.SO_BACKLOG, 128) //设置连接保持为活动状态 .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new WebSocketInitialzer()); try { ChannelFuture future = serverBootstrap.bind(7777).sync(); future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
服务端初始化器
public class WebSocketInitialzer extends ChannelInitializer<Channel> { @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //增加编解码器 的另一种方式 pipeline.addLast(new HttpServerCodec()); // 块方式写的处理器 适合处理大数据 pipeline.addLast(new ChunkedWriteHandler()); //聚合 pipeline.addLast(new HttpObjectAggregator(512 * 1024)); /* * 这个时候 我们需要声明我们使用的是 websocket 协议 * netty为websocket也准备了对应处理器 设置的是访问路径 * 这个时候我们只需要访问 ws://127.0.0.1:7777/hello 就可以了 * 这个handler是将http协议升级为websocket 并且使用 101 作为响应码 * */ pipeline.addLast(new WebSocketServerProtocolHandler("/hello")); pipeline.addLast(new WebSocketHandler()); } }
服务端处理器
通信使用的单位叫帧 frame
客户端:发送时将消息切割成多个帧
服务端:接收时,将关联的帧重新组装
/* * 泛型 代表的是处理数据的单位 * TextWebSocketFrame : 文本信息帧 * */ public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception { //可以直接调用text 拿到文本信息帧中的信息 System.out.println("msg:" + textWebSocketFrame.text()); Channel channel = ctx.channel(); //我们可以使用新建一个对象 将服务端需要返回的信息放入其中 返回即可 TextWebSocketFrame resp = new TextWebSocketFrame("hello client from websocket server"); channel.writeAndFlush(resp); } }
websocket前端编写
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <script> var socket; // 判断当前浏览器是否支持websocket if (!window.WebSocket) { alert("不支持websocket") } else { <!-- 创建websocket 连接对象--> socket = new WebSocket("ws://127.0.0.1:7777/hello"); //设置开始连接的方法 socket.onopen = function (ev) { var tmp = document.getElementById("respText"); tmp.value = "连接已经开启"; } //设置关闭连接的方法 socket.onclose = function (ev) { var tmp = document.getElementById("respText"); tmp.value = tmp.value + "\n" + "连接已经关闭"; } //设置接收数据的方法 socket.onmessage = function (ev) { var tmp = document.getElementById("respText"); tmp.value = tmp.value + "\n" + ev.data; } } function send(message) { if (!window.socket) { return } /* * 判断socket的状态 * connecting 正在连接 closing 正在关闭 * closed 已经关闭或打开连接失败 * open 连接可以 已经正常通信 * */ if (socket.readyState == WebSocket.OPEN) { socket.send(message); } else { alert("连接未开启"); } } </script> <!--防止表单自动提交--> <form onsubmit="return false"> <textarea name="message" style="height: 400px;width: 400px"></textarea> <input type="button" value="发送" onclick="send(this.form.message.value)"> <textarea id="respText" style="height: 400px;width: 400px"></textarea> </form> </body> </html>
【客户端】
var ws = new WebSocket("ws://127.0.0.1:7777/hello"); ws.onopen = function(ev){ ws.send("hello"); //建立连接后发送数据 }
设计一个样式
左右两个各有一个文本框,中间放一个发送按钮。
左侧文本框用来发送数据,右侧文本框用来显示数据。
演示效果
启动服务发送消息
小结
- websocket 一般用于做可复用连接,http一般做短链接
- websocket解决了http连接只能客户端发起的
(三)原理篇
1、ByteBuf
NIO中ByteBuffer的缺点:
A 长度固定,无法动态的扩容和缩容,缺乏灵活性
B 使用一个position记录读写的索引位置,在读写模式切换时需手动调用flip方法,增加了使用的复杂度。
C 功能有限,使用过程中往往需要自行封装
1)分类
按照内存的位置,分为堆内存缓冲区 heap buffer、直接内存缓冲区direct buffer、复合内存缓冲区composite buffer。
A heap buffer
将数据存储到JVM的堆空间中,实际使用字节数组byte[]来存放。
优点:数据可以快速的创建和释放,并且能够直接访问内部数组
缺点:在读写数据时,需要将数据复制到直接缓冲区 再进行网络传输。
B direct buffer
不在堆中,而是使用了操作系统的本地内存。
优点:在使用Socket进行数据传输过程中,减少一次拷贝,性能更高。
缺点:释放和分配的空间更昂贵,使用时需要更谨慎。
C composite buffer
将两个或多个不同内存的缓冲区合并
优点:可以统一进行操作
应用场景:在通信线程使用缓冲区时,往往使用direct buffer,而业务消息使用缓冲区时,往往使用heap buffer,在解决http包,请求头+请求体特性不同而选择不同位置存储时,可以将两者拼接使用
D 池化的概念
对于内存空间分配和释放的复杂度和效率,netty通过内存池的方式来解决。
内存池,可以循环利用ByteBuf,提高使用率。但是管理和维护较复杂。
Unpooled正是非池化缓冲区的工具类。
主要区别在于,池化的内存由netty管理,非池化的内存由GC回收。
E 回收方式
回收方式为引用计数,具体规则为,通过记录被引用的次数,判断当前对象是否还会被使用。
当对象被调用时,引用计为+1,当对象被释放时,引用计为-1,当引用次数为0时,对象可以回收。
弊端:可能引发内存泄漏。
当对象不可达,JVM会通过GC回收掉,但此时引用计数可能不为0,对象无法归还内存池,会导致内存泄漏。netty只能通过对内存缓冲区进行采样,来检查。
2)工作原理
和ByteBuffer不同在于,增加了一个指针,通过两个指针记录读模式和写模式时的索引位置,读指针叫做readerIndex,写指针叫做writerIndex。
A 读写分离
当执行clear()方法时,索引位置清空回初始位置,但数据保持不变。
mark和reset方法在ByteBuf中同样适用,如markReaderIndex和resetReaderIndex。
B 深浅拷贝
浅拷贝,拷贝的是对对象的引用,并没有创建新对象,新对象和原对象之间互相影响。
深拷贝,拷贝的是整个对象,和原对象之间完全独立。
duplicate和slice方法,达成全部浅拷贝和部分浅拷贝。
copy,部分深拷贝,部分代表的是可读空间。
3)扩容机制
A ByteBuffer的存储
ByteBuffer在put数据时,会校验剩余空间是否不足,如果不足,会抛出异常。
ByteBuffer buffer = ByteBuffer.allocate(8); buffer.put("yu".getBytes()); ---------------------------------------------------- public final ByteBuffer put(byte[] src) { return put(src, 0, src.length); } // 额外接收偏移量(存储数据的起始位置) 和数据长度 public ByteBuffer put(byte[] src, int offset, int length) { // 校验参数的有效性 checkBounds(offset, length, src.length); // 如果要存储数据的长度 > 剩余可用空间 抛出buffer越界的异常 if (length > remaining()) throw new BufferOverflowException(); // 如果剩余空间足够 计算存储的结束位置 = 偏移量 + 数据长度 int end = offset + length; for (int i = offset; i < end; i++) this.put(src[i]); return this; }
如果要手动对ByteBuffer扩容,可以在put之前,先校验剩余空间是否足够,如果不足够,创建一个新的ByteBuffer,新的容量确保足够,旧的buffer数据拷贝到新的buffer中,然后继续存储数据。
B ByteBuf的存储和扩容
当写数据时,先判断是否需要扩容,如果当前空间较小(<4M),以64作为基数倍增(10 -> 64 -> 128 -> 256), 如果当前空间较大(>4M), 每次扩容都增加4M,这种方式叫做"步进式"。
查看源码,以AbstractByteBuf子类为依据查看,最重要的子类之一,ByteBuf的公共属性和功能都在此中实现。 ByteBuf buf = Unpooled.buffer(10); System.out.println("capacity: " + buf.capacity()); for (int i = 0; i < 11; i++) { buf.writeByte(i); } ---------------------------------------------------- [ByteBuf类] public abstract ByteBuf writeByte(int value); 按住Ctrl+Alt快捷键 [AbstractByteBuf子类] ---------------------------------------------------- @Override public ByteBuf writeByte(int value) { // 确保可写空间足够 ensureWritable0(1); // 写入数据 _setByte(writerIndex++, value); return this; } // 参数为 最小写入数据的大小 final void ensureWritable0(int minWritableBytes) { final int writerIndex = writerIndex(); // 目标容量 = 当前写操作索引 + 最小写入数据大小 final int targetCapacity = writerIndex + minWritableBytes; // 容量足够 不需扩容 if (targetCapacity <= capacity()) { ensureAccessible(); return; } // 容量不足时 如果目标容量 超出最大容量 抛出异常 if (checkBounds && targetCapacity > maxCapacity) { ensureAccessible(); throw new IndexOutOfBoundsException(String.format( "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s", writerIndex, minWritableBytes, maxCapacity, this)); } // 扩容逻辑 // 获取可写空间大小 final int fastWritable = maxFastWritableBytes(); // 如果 可写空间 >= 所需空间 新的容量=写操作索引+可写空间大小 // 如果 可写空间 < 所需空间 计算要扩容的新容量大小 calculateNewCapacity方法 int newCapacity = fastWritable >= minWritableBytes ? writerIndex + fastWritable : alloc().calculateNewCapacity(targetCapacity, maxCapacity); // Adjust to the new capacity. // 计算完成后 生成新的ByteBuffer capacity(newCapacity); } // 获取可写空间大小 public int maxFastWritableBytes() { return writableBytes(); } [AbstractByteBufAllocator子类] ---------------------------------------------------- // 计算要扩容的新容量大小 @Override public int calculateNewCapacity(int minNewCapacity, int maxCapacity) { // 校验参数有效性 checkPositiveOrZero(minNewCapacity, "minNewCapacity"); if (minNewCapacity > maxCapacity) { throw new IllegalArgumentException(String.format( "minNewCapacity: %d (expected: not greater than maxCapacity(%d)", minNewCapacity, maxCapacity)); } // 扩容方式的分界点 以4M大小为界 final int threshold = CALCULATE_THRESHOLD; // 4 MiB page if (minNewCapacity == threshold) { return threshold; } // If over threshold, do not double but just increase by threshold.、 // 如果所需容量大于4M 按照步进的方式扩容 // 举例: 比如 minNewCapacity = 5M if (minNewCapacity > threshold) { // newCapacity = 5 / 4 * 4 = 4M 确保是4的倍数 int newCapacity = minNewCapacity / threshold * threshold; if (newCapacity > maxCapacity - threshold) { newCapacity = maxCapacity; } else { // newCapacity = 4 + 4 = 8M; newCapacity += threshold; } return newCapacity; } // Not over threshold. Double up to 4 MiB, starting from 64. // 如果所需容量大于4M 按照64的倍数扩容 找到最接近所需容量的64的倍数 int newCapacity = 64; while (newCapacity < minNewCapacity) { newCapacity <<= 1; } // 保障在最大可接受容量范围内 return Math.min(newCapacity, maxCapacity); }
4)优势
A 池化的方式提高内存使用率
B 提出了复合型缓冲区的整合方案
C 增加了索引,使读写分离,使用更便捷
D 解决了ByteBuffer长度固定的问题,增加了扩容机制
E 用引用计数的方式进行对象回收
2、Channel
1)Channel
channel是通讯的载体,对应通讯的一端,在BIO中对应Socket,NIO中对应SocketChannel,Netty中对应NioSocketChannel,ServerSocket同理。
channelhandler是通道的处理器,一个channel往往有多个handler
channelpipeline是handler的容器,装载并管理handler的顺序(本质是双向链表)
如图,channel创建时,会对应创建一个channelpipeline,pipeline首先会记录一个头部的处理器handler,当pipeline进行分发时,先分发给头部,然后依次执行,执行handler全部执行完成。
同时,channel创建后,会注册进EventLoop之中,EventLoop会监听事件的发生。不同的事件调用handler不同的处理方法,让流程运转起来。
channel生命周期,对应四种状态,分别为:
A) ChannelUnregistered 已创建但还未被注册到监听器中
B) ChannelRegistered 已注册到监听器EventLoop中
C) ChannelActive 连接完成处于活跃状态,此时可以接收和发送数据
D) ChannelInactive 非活跃状态,代表连接未建立或者已断开
channelhandler生命周期,对应三种状态,分别为:
A) handlerAdded 把handler添加到pipeline之中
B) handlerRemoved 从pipeline中移除
C) exceptionCaught 在处理过程中有错误产生
创建channel源码分析
以服务端启动为例 ChannelFuture future = serverBootstrap.bind(8888).sync(); 参数设置 serverBootstrap.channel(NioServerSocketChannel.class) 【AbstractBootstrap】 启动对象的父类 ------------------------------------------------------------------ public ChannelFuture bind(int inetPort) { return bind(new InetSocketAddress(inetPort)); } public ChannelFuture bind(SocketAddress localAddress) { validate(); return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress")); } private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); ....... } final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory.newChannel(); init(channel); } ....... } 【ReflectiveChannelFactory】 工厂实现类 ------------------------------------------------------------------ public T newChannel() { try { return constructor.newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t); } }
在启动对象调用bind()或connect()方法时,会创建channel
本质上通过反射,使用工厂的反射实现类创建对应的实例,此时实例对象的类型是通过channel参数来设置的
2)ChannelHandler
类层次关系图
入站和出站:
从服务端的角度,数据从客户端发送到服务端,称之为入站,当数据处理完成返回给客户端,称之为出站。是相对的概念。
从客户端的角度,数据从服务端发送给客户端,称之为入站,当数据返回给服务端,称之为出站。
不论是入站还是出站,handler从一端开始,到另一端结束,以责任链的模式依次执行。
责任链模式——"击鼓传花",当请求被不同的接收者处理时,每个接收者都包含对下一个接收者的引用,一个接收者处理完成后,将依次向下传递。
适配器模式——出国时要使用的电源转换器(美国/日本110V 中国220V电压),作为两个不兼容的接口之间的桥梁,将类的接口转换为需要的另外一种接口。
ChannelDuplexHandler是除了入站和出站handler之外的,另一个常用子类。
它同时实现了ChannelInboundHandler和ChannelOutboundHandler接口,如果需要既处理入站事件又处理出站事件,可以继承此类。
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO)) ------------------------------------------------------------------ public class LoggingHandler extends ChannelDuplexHandler{} ------------------------------------------------------------------ public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implements ChannelOutboundHandler {} ------------------------------------------------------------------ public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {}
ChannelHandlerAdapter
提供了额外的isSharable()方法,用来判断handler是否可以被共享到多个pipeline之中。默认情况不共享,如果需要共享,在继承了适配器的handler上,增加注解@Sharable
@Sharable public class LoggingHandler extends ChannelDuplexHandler {}
ChannelInboundHandler
最重要的方法是channelRead(),在使用时,需要显式的释放ByteBuf相关的内存。使用ReferenceCountUtil是引用计数的工具类。
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // netty中的缓冲区 叫做ByteBuf -- 对ByteBuffer的封装 ByteBuf buf = (ByteBuf) msg; // 释放ByteBuf内存 ReferenceCountUtil.release(msg); }
为了减少对资源内存的管理,使用SimpleChannelInboundHandler,使用其channelRead0()方法,可以自动释放资源,使用更便利。
SimpleChannelInboundHandler源码 ------------------------------------------------ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { boolean release = true; try { if (acceptInboundMessage(msg)) { @SuppressWarnings("unchecked") I imsg = (I) msg; channelRead0(ctx, imsg); } else { release = false; ctx.fireChannelRead(msg); } } finally { if (autoRelease && release) { ReferenceCountUtil.release(msg); } } }
3)ChannelPipeline
pipeline中维护入站和出站链路,两条链路的执行顺序。
handler只负责处理自身的业务逻辑,对通道而言,它是无状态的。通道的信息会保存到handlerContext处理器上下文中,它是连接pipeline和handler之间的中间角色。
pipeline管理的是由handlerContext包裹的handler,也就是说,当添加handler时,先将其转为handlerContext,然后添加到pipeline的双向链表中。头结点叫做HeadContext,尾节点叫做TailContext。
ch.pipeline().addLast(new NettyServerHandler()); [DefaultChannelPipeline] ---------------------------------------------------------------- public final ChannelPipeline addLast(ChannelHandler... handlers) { return addLast(null, handlers); } public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) { ObjectUtil.checkNotNull(handlers, "handlers"); for (ChannelHandler h: handlers) { if (h == null) { break; } addLast(executor, null, h); } return this; } // 关键逻辑 public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { // 检查当前handler是否支持共享,如果不支持,又被添加到其他pipeline中,会报错 checkMultiplicity(handler); // 将handler封装为context newCtx = newContext(group, filterName(name, handler), handler); // 将context添加到链表尾部 addLast0(newCtx); // If the registered is false it means that the channel was not registered on an eventLoop yet. // In this case we add the context to the pipeline and add a task that will call // ChannelHandler.handlerAdded(...) once the channel is registered. // 判断当前通道的注册状态,如果是未注册,执行此逻辑 if (!registered) { // 添加一个任务,当通道被注册后,能够回调handlerAdded方法 newCtx.setAddPending(); callHandlerCallbackLater(newCtx, true); return this; } // 如果已被注册 执行调用handlerAdded方法 EventExecutor executor = newCtx.executor(); if (!executor.inEventLoop()) { callHandlerAddedInEventLoop(newCtx, executor); return this; } } callHandlerAdded0(newCtx); return this; } private static void checkMultiplicity(ChannelHandler handler) { if (handler instanceof ChannelHandlerAdapter) { ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler; if (!h.isSharable() && h.added) { throw new ChannelPipelineException( h.getClass().getName() + " is not a @Sharable handler, so can't be added or removed multiple times."); } h.added = true; } } private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) { return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler); } // 尾节点会提前声明并创建 final AbstractChannelHandlerContext tail; // prev -> tail 在其中插入newctx // prev -> newctx -> tail 放到倒数第二个位置中 tail节点是保持不变的 // 依次更改 新节点的前后指针 以及prev节点的后置指针和tail节点的前置指针 private void addLast0(AbstractChannelHandlerContext newCtx) { AbstractChannelHandlerContext prev = tail.prev; newCtx.prev = prev; newCtx.next = tail; prev.next = newCtx; tail.prev = newCtx; } // 构造器中已经提前创建了头尾节点 protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true); tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; }
使用pipeline模式的优点:
A) 解耦,让处理器逻辑独立,可以被多个channel共享
B) channel相关信息,交给context维护
C) 具有极大的灵活性,使用处理器可以方便的添加或删除,或者更改它们的顺序
Netty网络框架(三)https://developer.aliyun.com/article/1469522