前言
又是偷懒的一个周末, 周一了赶紧更一更, 但是万万没想到一写就是三小时....
本篇主要讲解了EventLoop
的基本概念及其实现类的基本使用, 并通过打断点的形式带领大家进行了一次源码阅读, 提出几个问题并进行相应的解决
事件循环对象: EventLoop
EventLoop
本质是一个单线程执行器(同时维护了一个Selector
), 里面有run
方法处理Channel
上源源不断的io事件
EventLoopGroup
EventLoopGroup基本概念
一组EventLoop
就是EventLoopGroup
, Channel
一般会调用EventLoopGroup
的register
方法来绑定其中一个EventLoop
, 后续这个Channel
上的IO事件都由此EventLoop
来处理(保证了IO事件的安全)
如果你看过我之前写的Netty服务端启动流程分析可能会有种熟悉的感觉
我们在配置类MyServer
中使用NioEvenLoopGroup
进行过相关的配置
同时, 我们在该篇文章中对register
方法进行过相应的分析, 接下来我们简单说一下EventLoopGroup
接口的两个常用实现类NioEventLoopGroup
NioEventLoopGroup简单使用
能够实现 IO事件, 普通任务, 定时任务
DefaultEventLoopGroup
也是常用的EventLoopGroup
实现类, 相比于NioEventLoopGroup
它不能实现 IO事件
继承关系图
默认线程数
通过我们的启动类配置截图可以看到, 我们并没有对NioEventLoopGroup
的线程数进行配置, 那怎么确定我们的EventLoopGroup
线程的数量多少呢, 我们点金构造方法看一下
可以看到, 最后我们是执行了父类MultithreadEventLoopGroup
的构造方法
我们倒着看, 如果当前设置的线程数nThreads
为0
, 则调用静态变量DEFAULT_EVENT_LOOP_THREADS
的值当做线程数, 在初始化DEFAULT_EVENT_LOOP_THREADS
的时候, 我们调用了方法查看了当前CPU
的线程数, 然后默认是配置文件里设置的Netty设置的线程数或者CPU
核心数的double
NioEventLoopGroup.next()方法
NioEventLoopGroup.next()方法是用来获取下一个线程的方法, 通过下述案例可以看到一直是两个线程来轮询执行
普通任务
我们可以通过execute()
方法或者submit()
方法来异步的完成普通任务
定时任务
我们可以通过scheduleAtFixedRate()
方法来完成定时任务, 第一个参数是执行的任务, 第二个参数是延迟时间, 第三个参数是间隔多少时间执行一次, 第四个参数是时间单位
IO事件
我们先创建一个启动类, 新建一个NioEventLoopGroup
, 同时关心读事件, 并绑定8080
端口
public static void main(String[] args) { new ServerBootstrap() .group(new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { // 连接调用后调用 ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){ // 关心读事件 @Override // ByteBuf 类型 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf byteBuf = (ByteBuf) msg; log.info(byteBuf.toString(Charset.defaultCharset())); } }); } }) .bind(8080); } 复制代码
在搞两个网络连接助手, 去连接我们的Netty
服务端, 分别以2
秒和3
秒/次的速度循环向我们的服务器发送消息
最后我们可以看到, 每一个EventLoop
都只与一个Channel
进行绑定, Channel
发送的请求都会由与其绑定的EventLoop
进行处理
当然, 这是指只有两个连接的情况下, 如果连接数大于EventLoopGroup数量, 是会对其进行复用的
boss 和 worker
针对上面的代码, 我们还可以对EventLoopGroup
进行分工处理
我们查看ServerBootstrap.group()
方法, 可以看到这个方法可以传两个参数, 这两个参数传的NioEventLoopGroup
分别是
boss
: 只负责ServerSocketChannel
上的accept
事件worker
: 只负责socketChannel
上的读写
这也是我们之前MyServer
启动类上进行过的配置
NioEventLoop源码分析
到这里, 相信大家对EventLoop
的实现类也有了一定的了解, 我们正式开始源码活动第一周Netty任务五
, 开始讲解NioEventLoop
的源码
继承实现图
NioEventLoop的重要组成:
- selector
网络异常,图片无法展示|
- 线程
网络异常,图片无法展示|
- 任务队列
网络异常,图片无法展示|
- 处理定时任务的任务队列
网络异常,图片无法展示|
selector什么时候被创建
在NioEventLoop
的构造方法中执行了一个openSelector()
方法
使用provider.openSelector()
方法创建一个selector
对象
至于我们什么时候调用了NioEventLoop
的构造方法, 这里简单讲一下, 里面设计了很多次的方法重写和多态, 建议使用debug
打断点的形式去查看
调用顺序如下:
MyServer
启动类执行NioEventLoopGroup
的构造方法- 调用父类
MultithreadEventLoopGroup
的构造方法
- 执行children[i] = newChild(executor, args);
- 调用
NioEventLoopGroup.newChild()
方法 - 调用
NioEventLoop
构造方法
- 执行
openSelector()
方法
- 执行
NioEventLoop.openSelector()
构造方法
在 NioEventLoop 中为什么有两个 selector 成员变量
还记得我们说过NioEventLoop
的重要组成selector
有两个吗
看回我们的openSelector()
方法, 可以看到这个方法执行之后返回的是unwrappedSelector
, 所以unwrappedSelector
才是真正的NIO
底层的Selector
Selector
内部有一个
keys
集合, 将来我们发生的事件要去集合中获取相关的信息, 这个集合的类型是
Set
结构, 所以
Netty
团队对其进行了改良, 将
Set
结构更改为了
数组结构
还是我们的openSelector()
方法, 我将重点关注的几个方法用红框圈出来了, 他们的作用分别是:
- 获取真正
Selector
的实现类 - 拿到两个私有成员变量
keys
- 这俩 keys 的底层都是 Set 接口
- 使用反射工具类将其设置为可以赋值, 可以修改
- 调用
Field
把Selector
替换为Netty
自己的SelectedKeySet
SelectedKeySet
内部是数组结构网络异常,图片无法展示|
所以Netty
中的两个Selector
的定义分别是:
- selector: Netty自定义的
- umwrappedSelector: 原始的 Selector
定义两个 Selector 的意义: 为了在遍历selectedKeys
的遍历速度更快, Set -> 数组
nio 线程在什么时候启动
我们准备一个测试代码, 然后通过debug
的方式去启动, 进入execute()
方法看一下内部的构造
public static void main(String[] args) { EventLoop eventExecutors = new NioEventLoopGroup().next(); eventExecutors.execute(() -> { System.out.println("ning xuan"); }); } 复制代码
最后会进入到SingleThreadEventExecutor.execute()
方法中, 这个方法的流程如下:
- 判断当前线程是否为 NIO 线程
- 判断方法为 this.inEventLoop();
- 因为我们是首次执行, 不是 NIO 线程, 所以 inEventLoop == false
- 向任务队列
taskQueue
中添加任务
- addTask(task)
- 执行
startThread()
方法 - 最后一个判断: 有任务需要被执行的时候, 唤醒阻塞的 NIO 线程
OK, 我们进入到startThread()
方法
- if判断
- 在线程未启动的时候, state == 1
- 尝试将状态 1 改为 2 , 下次执行该方法时, 就不会继续执行 if 判断内的程序
doStartThread()
方法, 真正创建 NIO 线程并执行任务
考虑到doStartThread()
方法太长了, 所以我只截取了重要步骤
- 创建 NIO 线程并执行任务
- 获取到当前 NIO 线程, thread
- 执行线程
SingleThreadEventExecutor.this.run();
小总结: 首次调用的时候会启动 nio 线程, 并且只会启动一次
任务五
那么Netty
是怎么创建连接的已经讲完了, 打断点的方式和我这种方式都是殊途同归一样的, 最后都会在doStartThread()
方法中执行SingleThreadEventExecutor.this.run();
命令
我们先使用连接工具进行客户端和服务端的连接
接下来我们进入NioEventLoop.run()
方法, 这个方法的作用就是: 死循环去找任务, 执行任务, 重要方法如下:
- strategy = select(curDeadlineNanos);
- 调用 select() 从操作系统中轮询到网络 IO事件
- processSelectedKeys();
- 处理 select 轮询出来的 IO事件, 我们主要看这个
由于这个方法巨长无比,所以只截取一部分
我们本篇讲一下 processSelectedKeys();
方法, 至于select()
下次一定
接下来, 我们进入了processSelectedKeys()
方法
还记得我们之前说过的 selectedKeys 吗, 这个在之前我们进行过初始化, 它的底层是数组结构, 所以哪怕我们没有连接轮询的时候它也不应该为null
接下来, 我们进入到processSelectedKeysOptimized()
方法当中, 在这个方法当中可以看到, Netty
团队先是进行了一次 GC 回收, 然后进入我们当前任务的最终方法processSelectedKey()
null out entry in the array to allow to have it GC'ed once the Channel close 空出数组中的条目,以允许在通道关闭时对其进行垃圾回收
我们主要看 try 部分, 在这个部分可以看到, 他就是在判断时间的类型, 然后执行不同的策略
一共有四种事件, 分别是
- OP_READ: 读事件
- OP_WRITE: 写事件
- OP_CONNECT: 连接事件
- OP_ACCEPT: 接收连接事件
在前面, 讲解 NioEventLoopGroup 的时候, 我们有说过 boss 主要负责 accept 事件, worker 负责读写事件
直接进入到read()
方法中, 看到会遍历执行fireChannelRead()
方法, 这个是读回调的方法
当我们继续执行之后会发现控制台会有相应的输出channelRead
总结: 所以, 我们执行到这里, 一共进行了以下几个步骤: 阻塞 select(select()方法, 下次一定), 方法
processSelectorKeys()
来实现对 IO事件 的响应, 处理, 传播 read 方法就是Netty
的传播方法, 也是回调方法