Netty网络框架(二)

简介: Netty网络框架

Netty网络框架(一)https://developer.aliyun.com/article/1469519


(二) 应用篇

1、HTTP

1)  0.9版本

GET /index.html

服务端只能返回html格式,传输过程只能处理文字。

2)  1.0版本

支持任何格式的内容,包括图像、视频、二进制等等

引入了POST命令、HEAD命令

增加了请求头、状态码,以及权限、缓存等

GET / HTTP/1.0
User-Agent:Mozilla/1.0
Accept: */*
HTTP/1.0 200 OK
Content-Type: text/plain
Content-Encoding: gzip
<html>
  <body> hello world </body>
</html>

a、 Content-Type

服务端通知客户端,当前数据的格式

示例: text/html 、 image/png 、 application/pdf 、 video/mp4

前面是一级类型,后面是二级类型,用斜杠分隔; 还可以增加其他参数,如编码格式。

Content-Type: text/plain; charset=utf-8

b、Content-Encoding

表示数据压缩的方式,gzip、compress、deflate

对应客户端的字段为  Accept-Encoding,代表接收哪些压缩方式

c、缺点和问题

每个TCP连接只能发送一个请求,发送完毕连接关闭,使用成本很高,性能较差。

Connection: keep-alive   - 非标准字段

3)  1.1版本

GET / HTTP/1.1
User-Agent: PostmanRuntime/7.24.1
Accept: */*
Cache-Control: no-cache
Postman-Token: 636ce8a6-7eab-451a-8638-4534a3578095
Host: cn.bing.com
Accept-Encoding: gzip, deflate, br
Connection: keep-alive
HTTP/1.1 200 OK
Cache-Control: private, max-age=0
Content-Length: 45786
Content-Type: text/html; charset=utf-8
Content-Encoding: br
Vary: Accept-Encoding
P3P: CP="NON UNI COM NAV STA LOC CURa DEVa PSAa PSDa OUR IND"
Set-Cookie: SRCHD=AF=NOFORM; domain=.bing.com; expires=Wed, 22-Jun-2022 07:03:23 GMT; path=/
Set-Cookie: SRCHUID=V=2&GUID=5C28FF778A2C4A00B32F5408147038BF&dmnchg=1; domain=.bing.com; expires=Wed, 22-Jun-2022 07:03:23 GMT; path=/
Set-Cookie: SRCHUSR=DOB=20200622; domain=.bing.com; expires=Wed, 22-Jun-2022 07:03:23 GMT; path=/
Set-Cookie: _SS=SID=23AE726877396796143D7C99761766C7; domain=.bing.com; path=/
Set-Cookie: _EDGE_S=F=1&SID=23AE726877396796143D7C99761766C7; path=/; httponly; domain=bing.com
Set-Cookie: _EDGE_V=1; path=/; httponly; expires=Sat, 17-Jul-2021 07:03:23 GMT; domain=bing.com
Set-Cookie: MUID=1CB3B15BCBD2637E2C01BFAACAFC6214; samesite=none; path=/; secure; expires=Sat, 17-Jul-2021 07:03:23 GMT; domain=bing.com
Set-Cookie: MUIDB=1CB3B15BCBD2637E2C01BFAACAFC6214; path=/; httponly; expires=Sat, 17-Jul-2021 07:03:23 GMT
X-MSEdge-Ref: Ref A: 71D6F4B3BA9C448EB453341AC182C7BC Ref B: BJ1EDGE0311 Ref C: 2020-06-22T07:03:23Z
Date: Mon, 22 Jun 2020 07:03:23 GMT

a、持久连接,含义为默认不关闭tcp连接,可以被多个请求复用。大多时候,浏览器对同一个域名,允许同时建立6个连接。

b、管道机制,支持客户端发送多个请求,管理请求的顺序的。服务器还是按照接受请求的顺序,返回对应的响应结果。

c、Content-Length,  用来区分数据包的重要字段

d、支持PUT、DELETE、PATCH等命令

缺点和问题

当部分请求耗时较长时,仍会阻塞后续请求的处理速度,这种现象叫做“队头阻塞”/"线头阻塞"。

4)  2.0版本

解决队头阻塞的问题,使用的是多路复用的方式。

说了这么多 上代码来操作一下吧!!

我们的编写思路是这样的

  • 编写初始化服务端
  • 编写自定义初始化器 和 自定义处理器
  • 启动postman 查看我们设置的 http 的响应结果

我们这里有三个类

  • HttpServer   初始化服务端
  • MyHttpHandler   自定义处理器
  • MyHttpInitializer 自定义初始化

首先是 server

我们需要在初始化服务端的时候 设置主从线程模型(Netty中常用)
设置 启动参数 和阻塞队列的长度等设置
设置 初始化

public class HttpServer {
    public static void main(String[] args) {
        //可以自定义线程的数量
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // 默认创建的线程数量 = CPU 处理器数量 *2
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .handler(new LoggingHandler())
                //当前连接被阻塞的时候,BACKLOG代表的事 阻塞队列的长度
                .option(ChannelOption.SO_BACKLOG, 128)
                //设置连接保持为活动状态
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childHandler(new MyHttpInitializer());
        try {
        //设置为异步启动 异步 关闭
            ChannelFuture future = serverBootstrap.bind(9988).sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
          //netty的优雅关闭 指 等一切执行完毕之后 慢慢的关闭
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

编写 初始化

继承ChannelInitializer泛型为Channel,用来进行设置出站解码器和入站编码器
使用 codec netty封装好的解码器,这样我们就不用每次定义 解码和编码

public class MyHttpInitializer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel channel) throws Exception {
        ChannelPipeline pipeline = channel.pipeline();
        //先对应请求解码,后对响应解码
        //pipeline.addLast("decoder", new HttpRequestDecoder());
        //pipeline.addLast("encoder", new HttpRequestEncoder());
        //当然每次我们都要解码编码很麻烦,netty也有为我们提供对应的解决方案,建议直接使用这个 不会出错
        pipeline.addLast("codec", new HttpServerCodec());
        //压缩数据
        pipeline.addLast("compressor", new HttpContentCompressor());
        //聚合完整的信息 参数代表可以处理的最大值 此时的是 512 kb
        pipeline.addLast("aggregator", new HttpObjectAggregator(512 * 1024));
        pipeline.addLast(new MyHttpHandler());
    }
}

有了初始化,我们还需要一个做事的 那就是 处理器 Handler

netty帮我们封装了返回完整http响应的类 DefaultFullHttpResponse
我们只需要在读取的时候 设置协议,状态码和响应信息,
配置响应头的类型和长度 就可以完成对请求的响应

/*
 * 泛型需要设置为 FullHttpRequest
 * 筛选 message 为此类型的消息才处理
 * */
public class MyHttpHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest fullHttpRequest) throws Exception {
        //DefaultFullHttpResponse 是一个默认的完整的http响应
        DefaultFullHttpResponse response = new DefaultFullHttpResponse(
                HttpVersion.HTTP_1_1, HttpResponseStatus.OK,
                Unpooled.wrappedBuffer("hello http netty demo".getBytes())
        );
        //    我们还需要设置响应头
        // 设置请求 响应头字段 可以使用 HttpHeaderNmaes
        // 设置字段时 可以使用
        HttpHeaders headers = response.headers();
        headers.add(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.TEXT_PLAIN + ";charset=UTF-8");
        headers.add(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
        ctx.write(response);
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
}

启动结果

postman  差看到的请求响应 参数
可以看到我们设置的 http1.1 协议
类型 text/plain  长度 47
响应给客户端的 内容 hello http netty demo

小结

  1. 了解 http 各个版本的解决了什么问题,优缺点,优劣性
  2. 手动编写一个服务端的响应,用postman 查看响应头我们设置的内容
  3. 体验到 netty强大的封装带给我们的便利性

2、WebSocket

websocket是由浏览器发起的

协议标识符 http://127.0.0.1:8080    ws://127.0.0.1:7777

GET ws://127.0.0.1:7777 HTTP/1.1
Host: 127.0.0.1
Upgrade: websocket    # 升级为ws   
Connection: Upgrade   # 此链接需要升级
Sec-WebSocket-key: client-random-string ...  # 标识加密相关信息
HTTP/1.1 101
Upgrade: websocket
Connection: Upgrade

响应码 101   代表本次协议需要更改为websocket

连接建立后,支持文本信息及二进制信息。

Websocket实现的原理:
通过http协议进行连接的建立(握手和回答),建立连接后不再使用http,而tcp自身是支持双向通信的,所以能达到“全双工”的效果。

通信使用的单位叫帧  frame
客户端:发送时将消息切割成多个帧
服务端:接收时,将关联的帧重新组装

【客户端】

var ws = new WebSocket("ws://127.0.0.1:7777/hello");
ws.onopen = function(ev){
     ws.send("hello"); //建立连接后发送数据
}

设计一个样式
左右两个各有一个文本框,中间放一个发送按钮。
左侧文本框用来发送数据,右侧文本框用来显示数据。

Websocket 应用demo

服务端代码

public class WebSocketServer {
    public static void main(String[] args) {
        //可以自定义线程的数量
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // 默认创建的线程数量 = CPU 处理器数量 *2
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .handler(new LoggingHandler())
                //当前连接被阻塞的时候,BACKLOG代表的事 阻塞队列的长度
                .option(ChannelOption.SO_BACKLOG, 128)
                //设置连接保持为活动状态
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childHandler(new WebSocketInitialzer());
        try {
            ChannelFuture future = serverBootstrap.bind(7777).sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

服务端初始化器

public class WebSocketInitialzer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        //增加编解码器 的另一种方式
        pipeline.addLast(new HttpServerCodec());
        //    块方式写的处理器 适合处理大数据
        pipeline.addLast(new ChunkedWriteHandler());
        //聚合
        pipeline.addLast(new HttpObjectAggregator(512 * 1024));
        /*
         * 这个时候 我们需要声明我们使用的是 websocket 协议
         * netty为websocket也准备了对应处理器  设置的是访问路径
         * 这个时候我们只需要访问 ws://127.0.0.1:7777/hello 就可以了
         * 这个handler是将http协议升级为websocket  并且使用 101 作为响应码
         * */
        pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));
        pipeline.addLast(new WebSocketHandler());
    }
}

服务端处理器

通信使用的单位叫帧  frame
客户端:发送时将消息切割成多个帧
服务端:接收时,将关联的帧重新组装

/*
 * 泛型 代表的是处理数据的单位
 * TextWebSocketFrame : 文本信息帧
 * */
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception {
        //可以直接调用text 拿到文本信息帧中的信息
        System.out.println("msg:" + textWebSocketFrame.text());
        Channel channel = ctx.channel();
        //我们可以使用新建一个对象 将服务端需要返回的信息放入其中 返回即可
        TextWebSocketFrame resp = new TextWebSocketFrame("hello client from websocket server");
        channel.writeAndFlush(resp);
    }
}

websocket前端编写

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<script>
    var socket;
    //    判断当前浏览器是否支持websocket
    if (!window.WebSocket) {
        alert("不支持websocket")
    } else {
        <!-- 创建websocket 连接对象-->
        socket = new WebSocket("ws://127.0.0.1:7777/hello");
        //设置开始连接的方法
        socket.onopen = function (ev) {
            var tmp = document.getElementById("respText");
            tmp.value = "连接已经开启";
        }
        //设置关闭连接的方法
        socket.onclose = function (ev) {
            var tmp = document.getElementById("respText");
            tmp.value = tmp.value + "\n" + "连接已经关闭";
        }
        //设置接收数据的方法
        socket.onmessage = function (ev) {
            var tmp = document.getElementById("respText");
            tmp.value = tmp.value + "\n" + ev.data;
        }
    }
    function send(message) {
        if (!window.socket) {
            return
        }
        /*
        * 判断socket的状态
        * connecting 正在连接 closing 正在关闭
        * closed 已经关闭或打开连接失败
        * open 连接可以 已经正常通信
        * */
        if (socket.readyState == WebSocket.OPEN) {
            socket.send(message);
        } else {
            alert("连接未开启");
        }
    }
</script>
<!--防止表单自动提交-->
<form onsubmit="return false">
    <textarea name="message" style="height: 400px;width: 400px"></textarea>
    <input type="button" value="发送" onclick="send(this.form.message.value)">
    <textarea id="respText" style="height: 400px;width: 400px"></textarea>
</form>
</body>
</html>

【客户端】

var ws = new WebSocket("ws://127.0.0.1:7777/hello");
ws.onopen = function(ev){
     ws.send("hello"); //建立连接后发送数据
}

设计一个样式
左右两个各有一个文本框,中间放一个发送按钮。
左侧文本框用来发送数据,右侧文本框用来显示数据。

演示效果

启动服务发送消息

小结

  1. websocket 一般用于做可复用连接,http一般做短链接
  2. websocket解决了http连接只能客户端发起的

(三)原理篇

1、ByteBuf

NIO中ByteBuffer的缺点:

A 长度固定,无法动态的扩容和缩容,缺乏灵活性
B 使用一个position记录读写的索引位置,在读写模式切换时需手动调用flip方法,增加了使用的复杂度。
C  功能有限,使用过程中往往需要自行封装

1)分类

按照内存的位置,分为堆内存缓冲区 heap buffer、直接内存缓冲区direct buffer、复合内存缓冲区composite buffer。

A heap buffer

将数据存储到JVM的堆空间中,实际使用字节数组byte[]来存放。
优点:数据可以快速的创建和释放,并且能够直接访问内部数组
缺点:在读写数据时,需要将数据复制到直接缓冲区  再进行网络传输。

B direct buffer

不在堆中,而是使用了操作系统的本地内存。
优点:在使用Socket进行数据传输过程中,减少一次拷贝,性能更高。
缺点:释放和分配的空间更昂贵,使用时需要更谨慎。

C composite buffer

将两个或多个不同内存的缓冲区合并
优点:可以统一进行操作

应用场景:在通信线程使用缓冲区时,往往使用direct buffer,而业务消息使用缓冲区时,往往使用heap buffer,在解决http包,请求头+请求体特性不同而选择不同位置存储时,可以将两者拼接使用

D 池化的概念

对于内存空间分配和释放的复杂度和效率,netty通过内存池的方式来解决。
内存池,可以循环利用ByteBuf,提高使用率。但是管理和维护较复杂。

Unpooled正是非池化缓冲区的工具类。

主要区别在于,池化的内存由netty管理,非池化的内存由GC回收。

E  回收方式

回收方式为引用计数,具体规则为,通过记录被引用的次数,判断当前对象是否还会被使用。
当对象被调用时,引用计为+1,当对象被释放时,引用计为-1,当引用次数为0时,对象可以回收。

弊端:可能引发内存泄漏。
当对象不可达,JVM会通过GC回收掉,但此时引用计数可能不为0,对象无法归还内存池,会导致内存泄漏。netty只能通过对内存缓冲区进行采样,来检查。

2)工作原理

和ByteBuffer不同在于,增加了一个指针,通过两个指针记录读模式和写模式时的索引位置,读指针叫做readerIndex,写指针叫做writerIndex。

A 读写分离

当执行clear()方法时,索引位置清空回初始位置,但数据保持不变。

mark和reset方法在ByteBuf中同样适用,如markReaderIndex和resetReaderIndex。

B  深浅拷贝

浅拷贝,拷贝的是对对象的引用,并没有创建新对象,新对象和原对象之间互相影响。
深拷贝,拷贝的是整个对象,和原对象之间完全独立。

duplicate和slice方法,达成全部浅拷贝和部分浅拷贝。
copy,部分深拷贝,部分代表的是可读空间。

3)扩容机制

A ByteBuffer的存储

ByteBuffer在put数据时,会校验剩余空间是否不足,如果不足,会抛出异常。

ByteBuffer buffer = ByteBuffer.allocate(8);
buffer.put("yu".getBytes());
----------------------------------------------------
    public final ByteBuffer put(byte[] src) {
        return put(src, 0, src.length);
    }
    
    // 额外接收偏移量(存储数据的起始位置)  和数据长度
    public ByteBuffer put(byte[] src, int offset, int length) {
        // 校验参数的有效性
        checkBounds(offset, length, src.length);
        // 如果要存储数据的长度 > 剩余可用空间  抛出buffer越界的异常
        if (length > remaining())
            throw new BufferOverflowException();
        // 如果剩余空间足够  计算存储的结束位置 = 偏移量 + 数据长度    
        int end = offset + length;
        for (int i = offset; i < end; i++)
            this.put(src[i]);
        return this;
    }

如果要手动对ByteBuffer扩容,可以在put之前,先校验剩余空间是否足够,如果不足够,创建一个新的ByteBuffer,新的容量确保足够,旧的buffer数据拷贝到新的buffer中,然后继续存储数据。

B ByteBuf的存储和扩容

当写数据时,先判断是否需要扩容,如果当前空间较小(<4M),以64作为基数倍增(10 -> 64 -> 128 -> 256), 如果当前空间较大(>4M), 每次扩容都增加4M,这种方式叫做"步进式"。

查看源码,以AbstractByteBuf子类为依据查看,最重要的子类之一,ByteBuf的公共属性和功能都在此中实现。
ByteBuf buf = Unpooled.buffer(10);
System.out.println("capacity: " + buf.capacity());
for (int i = 0; i < 11; i++) {
    buf.writeByte(i);
}
----------------------------------------------------   
[ByteBuf类]
public abstract ByteBuf writeByte(int value);
按住Ctrl+Alt快捷键
[AbstractByteBuf子类]
---------------------------------------------------- 
    @Override
    public ByteBuf writeByte(int value) {
        // 确保可写空间足够
        ensureWritable0(1);
        // 写入数据
        _setByte(writerIndex++, value);
        return this;
    }
    
    // 参数为 最小写入数据的大小
    final void ensureWritable0(int minWritableBytes) {
        final int writerIndex = writerIndex();
        // 目标容量 = 当前写操作索引 + 最小写入数据大小
        final int targetCapacity = writerIndex + minWritableBytes;
        // 容量足够  不需扩容
        if (targetCapacity <= capacity()) {
            ensureAccessible();
            return;
        }
        // 容量不足时 如果目标容量 超出最大容量  抛出异常
        if (checkBounds && targetCapacity > maxCapacity) {
            ensureAccessible();
            throw new IndexOutOfBoundsException(String.format(
                    "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
                    writerIndex, minWritableBytes, maxCapacity, this));
        }
    
    // 扩容逻辑
        // 获取可写空间大小
        final int fastWritable = maxFastWritableBytes();
        // 如果 可写空间 >= 所需空间   新的容量=写操作索引+可写空间大小 
        // 如果 可写空间 < 所需空间   计算要扩容的新容量大小 calculateNewCapacity方法
        int newCapacity = fastWritable >= minWritableBytes ? writerIndex + fastWritable
                : alloc().calculateNewCapacity(targetCapacity, maxCapacity);
        // Adjust to the new capacity.
        
        // 计算完成后 生成新的ByteBuffer
        capacity(newCapacity);
    }
    
    // 获取可写空间大小
    public int maxFastWritableBytes() {
        return writableBytes();
    }
    
[AbstractByteBufAllocator子类]
---------------------------------------------------- 
    // 计算要扩容的新容量大小
    @Override
    public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
        // 校验参数有效性
        checkPositiveOrZero(minNewCapacity, "minNewCapacity");
        if (minNewCapacity > maxCapacity) {
            throw new IllegalArgumentException(String.format(
                    "minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
                    minNewCapacity, maxCapacity));
        }
        // 扩容方式的分界点 以4M大小为界
        final int threshold = CALCULATE_THRESHOLD; // 4 MiB page
        if (minNewCapacity == threshold) {
            return threshold;
        }
        // If over threshold, do not double but just increase by threshold.、
        // 如果所需容量大于4M  按照步进的方式扩容 
        //   举例: 比如 minNewCapacity = 5M  
        if (minNewCapacity > threshold) {
            // newCapacity = 5 / 4 * 4 = 4M  确保是4的倍数
            int newCapacity = minNewCapacity / threshold * threshold;
            if (newCapacity > maxCapacity - threshold) {
                newCapacity = maxCapacity;
            } else {
                // newCapacity = 4 + 4 = 8M;
                newCapacity += threshold;
            }
            return newCapacity;
        }
        // Not over threshold. Double up to 4 MiB, starting from 64.
        // 如果所需容量大于4M  按照64的倍数扩容  找到最接近所需容量的64的倍数
        int newCapacity = 64;
        while (newCapacity < minNewCapacity) {
            newCapacity <<= 1;
        }
        
        // 保障在最大可接受容量范围内
        return Math.min(newCapacity, maxCapacity);
    }

4)优势

A 池化的方式提高内存使用率

B 提出了复合型缓冲区的整合方案

C 增加了索引,使读写分离,使用更便捷

D 解决了ByteBuffer长度固定的问题,增加了扩容机制

E  用引用计数的方式进行对象回收

2、Channel

1)Channel

channel是通讯的载体,对应通讯的一端,在BIO中对应Socket,NIO中对应SocketChannel,Netty中对应NioSocketChannel,ServerSocket同理。
channelhandler是通道的处理器,一个channel往往有多个handler
channelpipeline是handler的容器,装载并管理handler的顺序(本质是双向链表)

如图,channel创建时,会对应创建一个channelpipeline,pipeline首先会记录一个头部的处理器handler,当pipeline进行分发时,先分发给头部,然后依次执行,执行handler全部执行完成。

同时,channel创建后,会注册进EventLoop之中,EventLoop会监听事件的发生。不同的事件调用handler不同的处理方法,让流程运转起来。

channel生命周期,对应四种状态,分别为:
A)  ChannelUnregistered   已创建但还未被注册到监听器中
B)  ChannelRegistered   已注册到监听器EventLoop中
C)  ChannelActive   连接完成处于活跃状态,此时可以接收和发送数据
D)  ChannelInactive   非活跃状态,代表连接未建立或者已断开

channelhandler生命周期,对应三种状态,分别为:
A)  handlerAdded   把handler添加到pipeline之中
B)  handlerRemoved   从pipeline中移除
C)  exceptionCaught   在处理过程中有错误产生

创建channel源码分析

以服务端启动为例
ChannelFuture future = serverBootstrap.bind(8888).sync();
参数设置
serverBootstrap.channel(NioServerSocketChannel.class)
【AbstractBootstrap】 启动对象的父类
------------------------------------------------------------------
    public ChannelFuture bind(int inetPort) {
        return bind(new InetSocketAddress(inetPort));
    }
    
    public ChannelFuture bind(SocketAddress localAddress) {
        validate();
        return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
    }
    
    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        .......
    }
    
    
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } 
        .......
    }
   
   
 【ReflectiveChannelFactory】  工厂实现类
 ------------------------------------------------------------------   
    public T newChannel() {
        try {
            return constructor.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
        }
    }

在启动对象调用bind()或connect()方法时,会创建channel
本质上通过反射,使用工厂的反射实现类创建对应的实例,此时实例对象的类型是通过channel参数来设置的

2)ChannelHandler

类层次关系图

入站和出站:
从服务端的角度,数据从客户端发送到服务端,称之为入站,当数据处理完成返回给客户端,称之为出站。是相对的概念。

从客户端的角度,数据从服务端发送给客户端,称之为入站,当数据返回给服务端,称之为出站。

不论是入站还是出站,handler从一端开始,到另一端结束,以责任链的模式依次执行。

责任链模式——"击鼓传花",当请求被不同的接收者处理时,每个接收者都包含对下一个接收者的引用,一个接收者处理完成后,将依次向下传递。

适配器模式——出国时要使用的电源转换器(美国/日本110V   中国220V电压),作为两个不兼容的接口之间的桥梁,将类的接口转换为需要的另外一种接口。

ChannelDuplexHandler是除了入站和出站handler之外的,另一个常用子类。
它同时实现了ChannelInboundHandler和ChannelOutboundHandler接口,如果需要既处理入站事件又处理出站事件,可以继承此类。

serverBootstrap.handler(new LoggingHandler(LogLevel.INFO))
 
 ------------------------------------------------------------------
 public class LoggingHandler extends ChannelDuplexHandler{}
 
 ------------------------------------------------------------------
 public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implements ChannelOutboundHandler {}
 
  ------------------------------------------------------------------
  public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {}

ChannelHandlerAdapter
提供了额外的isSharable()方法,用来判断handler是否可以被共享到多个pipeline之中。默认情况不共享,如果需要共享,在继承了适配器的handler上,增加注解@Sharable

@Sharable
public class LoggingHandler extends ChannelDuplexHandler {}

ChannelInboundHandler
最重要的方法是channelRead(),在使用时,需要显式的释放ByteBuf相关的内存。使用ReferenceCountUtil是引用计数的工具类。

@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // netty中的缓冲区  叫做ByteBuf  -- 对ByteBuffer的封装
        ByteBuf buf = (ByteBuf) msg;
        // 释放ByteBuf内存
        ReferenceCountUtil.release(msg);
    }

为了减少对资源内存的管理,使用SimpleChannelInboundHandler,使用其channelRead0()方法,可以自动释放资源,使用更便利。

SimpleChannelInboundHandler源码
------------------------------------------------
@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        boolean release = true;
        try {
            if (acceptInboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I imsg = (I) msg;
                channelRead0(ctx, imsg);
            } else {
                release = false;
                ctx.fireChannelRead(msg);
            }
        } finally {
            if (autoRelease && release) {
                ReferenceCountUtil.release(msg);
            }
        }
    }

3)ChannelPipeline

pipeline中维护入站和出站链路,两条链路的执行顺序。

handler只负责处理自身的业务逻辑,对通道而言,它是无状态的。通道的信息会保存到handlerContext处理器上下文中,它是连接pipeline和handler之间的中间角色。

pipeline管理的是由handlerContext包裹的handler,也就是说,当添加handler时,先将其转为handlerContext,然后添加到pipeline的双向链表中。头结点叫做HeadContext,尾节点叫做TailContext。

ch.pipeline().addLast(new NettyServerHandler());
 
 [DefaultChannelPipeline]
 ----------------------------------------------------------------
    public final ChannelPipeline addLast(ChannelHandler... handlers) {
        return addLast(null, handlers);
    }
    
    public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
        ObjectUtil.checkNotNull(handlers, "handlers");
        for (ChannelHandler h: handlers) {
            if (h == null) {
                break;
            }
            addLast(executor, null, h);
        }
        return this;
    }
    
    
    // 关键逻辑
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            // 检查当前handler是否支持共享,如果不支持,又被添加到其他pipeline中,会报错
            checkMultiplicity(handler);
        // 将handler封装为context
            newCtx = newContext(group, filterName(name, handler), handler);
            // 将context添加到链表尾部
            addLast0(newCtx);
            // If the registered is false it means that the channel was not registered on an eventLoop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            // 判断当前通道的注册状态,如果是未注册,执行此逻辑
            if (!registered) {
                // 添加一个任务,当通道被注册后,能够回调handlerAdded方法
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }
            // 如果已被注册  执行调用handlerAdded方法
            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                callHandlerAddedInEventLoop(newCtx, executor);
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }
    
    
    
    private static void checkMultiplicity(ChannelHandler handler) {
        if (handler instanceof ChannelHandlerAdapter) {
            ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
            if (!h.isSharable() && h.added) {
                throw new ChannelPipelineException(
                        h.getClass().getName() +
                        " is not a @Sharable handler, so can't be added or removed multiple times.");
            }
            h.added = true;
        }
    }
    
    
    private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
    }
    
    // 尾节点会提前声明并创建
    final AbstractChannelHandlerContext tail;
    
    //  prev -> tail   在其中插入newctx
    //  prev -> newctx -> tail   放到倒数第二个位置中  tail节点是保持不变的
    //  依次更改 新节点的前后指针   以及prev节点的后置指针和tail节点的前置指针
    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }
    
    // 构造器中已经提前创建了头尾节点
    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);
        tail = new TailContext(this);
        head = new HeadContext(this);
        head.next = tail;
        tail.prev = head;
    }

使用pipeline模式的优点:
A)  解耦,让处理器逻辑独立,可以被多个channel共享
B)  channel相关信息,交给context维护
C)  具有极大的灵活性,使用处理器可以方便的添加或删除,或者更改它们的顺序


Netty网络框架(三)https://developer.aliyun.com/article/1469522

目录
相关文章
|
1月前
|
XML 网络协议 前端开发
Netty网络框架(三)
Netty网络框架
28 1
|
存储 设计模式 网络协议
Netty网络框架(一)
Netty网络框架
36 1
|
3月前
|
前端开发 Java 网络安全
【Netty 网络通信】Netty 核心组件
【1月更文挑战第9天】【Netty 网络通信】Netty 核心组件
|
10月前
|
编解码 弹性计算 缓存
Netty源码和Reactor模型
Netty源码和Reactor模型
67 0
|
Java
netty学习(二)
Java BIO 编程
95 0
netty学习(二)
|
弹性计算 缓存 网络协议
netty学习(三)
Java NIO 编程
83 0
|
分布式计算 Dubbo 网络协议
netty学习(一)
netty的介绍和应用场景
135 0
netty学习(一)
|
前端开发 网络协议 Java
Netty(一)Netty核心功能与线程模型1
Netty(一)Netty核心功能与线程模型
120 0
Netty(一)Netty核心功能与线程模型2
Netty(一)Netty核心功能与线程模型
50 0
|
编解码 弹性计算 网络协议
【Netty】Netty高性能原理剖析
我们在实际项目中必然会遇到网络间的通信,也就是RPC,大家肯定都用过Dubbo,那么你对Dubbo底层---Netty了解多少呢?对于它为什么性能如此之高又了解多少呢?这篇文章就简单的介绍下Netty高性能原理。
157 0
【Netty】Netty高性能原理剖析