Netty 源码
一、源码构成
二、Netty对三种I/O的支持
首先,我们看下这几种 I/O 的实现。
BIO(阻塞IO):食堂排队打饭模式,持队在窗口,打好才走
NIO(非阻塞IO):点单、等待被叫模式,等待被叫,好了自己去端
AIO(异步IO):包厢模式,点单后菜直接被端上桌。
BIO的实现:
// 如果是BIO的话 EventLoopGroup bossGroup = new OioEventLoopGroup(); EventLoopGroup workerGroup = new OioEventLoopGroup(); b.group(bossGroup, workerGroup).channel(OioServerSocketChannel.class)
其余实现也如上所示,我们 Linux 下的环境基本都是 NIO 的 I/O 实现
1、三种模式如何切换
我们可以看到,上面我们 Netty 实际支持三种 I/O 模式,那么这三种 I/O 是怎么样进行的切换呢?
方法的执行是在:
b.group(bossGroup, workerGroup) // 主要在下面这一行!!! .channel(NioServerSocketChannel.class)
我们观察一下,.channel 方法做了什么
public B channel(Class<? extends C> channelClass) { // 反射工厂的实现 return channelFactory(new ReflectiveChannelFactory<C>( ObjectUtil.checkNotNull(channelClass, "channelClass") )); } public ReflectiveChannelFactory(Class<? extends T> clazz) { // 校验当前传入的clazz是否为空 ObjectUtil.checkNotNull(clazz, "clazz"); try { // 通过反射获取对应类的无参构造器 this.constructor = clazz.getConstructor(); } } // 上面获取完无参构造器之后,这里会通过无参构造器调用 newInstance 得到一个类的实例 public T newChannel() { try { //反射创建channel return constructor.newInstance(); } }
简单描述一下,通过 泛型+反射+工厂 实现 IO 模式切换
三、Netty 如何支持 Reactor 模型
Reactor 模型通过注册监听事件的方式,将阻塞模式修改为非阻塞模式,大大的提升了效率。
真正去执行的是 EventLoop,而我们的 EventLoopGroup 则是多个 EventLoop 的集合
1. EventLoop 是线程嘛
首先,我们看一下 EventLoop 的继承关系:
可以明显的看出,我们的 EventLoop 最终继承 ScheduledExecutorService 方法,也就是我们经常经常使用的线程池中的定时线程。
2. 如何实现三种模式的 Reactor 模型
我们知道,一般 Reactor 模型,有三种:
2.1 单线程 Reactor 模式
EventLoopGroup bossGroup = new NioEventLoopGroup(1); //有参的构造方法,创建1个线程
使用上述代码即可创建单线程的 Reactor 模型。
但具体如何实现,我们去看下源码:
// nThreads 传递的入参 protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); }
在源码中明确表明了,如果当前你传入的入参为 **0(不传入参)**时,当前线程为 DEFAULT_EVENT_LOOP_THREADS,如果你传递了入参,则按照你入参的个数申请线程。
PS:DEFAULT_EVENT_LOOP_THREADS 的大小在源码中定义如下:
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
这里的 NettyRuntime.availableProcessors() 最终调用的方法为:
Runtime.getRuntime().availableProcessors())
有兴趣的读者可以自己去看下自己电脑的线程,博主的线程如下:
- IDEA的线程
- 鲁大师的面板
2.2 多线程 Reactor 模式
如果我们传递无入参的话,创建的就是多线程的 Reactor 模式
这种相对于单线程来说,其实提升并不是很大,简单而言就是多个线程的堆积。
比如
- 原来一个线程完成接受、读取、解码、计算、加码、发送整个流程
- 现在多个线程一起完成接受、读取、解码、计算、加码、发送整个流程
相当于复制了几份而已
2.3 主从 Reactor 模式
而我们的主从 Reactor 模式则打破了原有的架构,采用了一个新的架构进行数据的接受和发送
我们创建两个线程组,一个线程组负责接受客户端的消息,另外一个线程组负责读取、解码、计算、加码、发送整个流程。
这样,我们每个线程都有自己要做的事情并且由于接受客户端的消息很快,我们的 mainReactor 线程组会比原来接受更多的客户端消息
2.3.1 主从模式如何实现
Netty 中只需要指定两个 EventLoopGroup 即可,如下:
EventLoopGroup bossGroup = new NioEventLoopGroup(); //无参的构造方法: 线程组,多个线程 EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup)
我们的 bossGroup 就是负责接受数据的线程组,而我们的 workerGroup 就是负责处理数据的线程组
2.3.2 主从模式源码实现
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { // 这里会设置父线程组(接受数据的) super.group(parentGroup); ObjectUtil.checkNotNull(childGroup, "childGroup"); this.childGroup = childGroup; return this; } // 类的全路径:io.netty.bootstrap.AbstractBootstrap volatile EventLoopGroup group; public B group(EventLoopGroup group) { ObjectUtil.checkNotNull(group, "group"); this.group = group; return self(); }
经过上述操作,我们传入的 bossGroup 已经被赋值到 io.netty.bootstrap.AbstractBootstrap.group 了。
我们看上述的图片,可以看出来,这个 bossGroup 主要做的是接受客户端的数据连接,但口说无凭,源码中在哪实现了呢
主要在于 bind() 这个方法:
private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister();//这个方法比较重要 } final ChannelFuture initAndRegister() { // 在这里的config().group()拿出我们的 EventLoopGroup 进行注册 ChannelFuture regFuture = config().group().register(channel); return regFuture; }
这里的 EventLoopGroup 我们可以看到,他正是我们之前设置的 bossGroup,也就是 io.netty.bootstrap.AbstractBootstrap.group 这个变量
另外,我们的从模式解决读事件的方法,可以直接去
io.netty.bootstrap.ServerBootstrap.childGroup 这个变量去找
3. Netty 给 Channel 分配 NIOEventLoop 的规则
我们上面可以看到,一个 EventLoopGroup 包括多个 EventLoop,
那么我们处理数据,Netty 是如何分配这些 EventLoop 的呢?
在源码 io.netty.util.concurrent.MultithreadEventExecutorGroup.next() 方法中
@Override public EventExecutor next() { return chooser.next(); //chooser是一个选择器,一般的实现会使用策略模式 }
利用 chooser 选择器,选择不同的策略执行不同的 EventLoop
// 选择器的选择 public EventExecutorChooser newChooser(EventExecutor[] executors) { //根据待绑定的executor是否是2的幂次方,做出不同的选择 // 这里的二次方后面会讲到 if (isPowerOfTwo(executors.length)) { return new PowerOfTwoEventExecutorChooser(executors); } else { return new GenericEventExecutorChooser(executors); } }
我们点进去发现,这里的策略一共有两种
- GenericEventExecutorChooser:取模轮询
- PowerOfTwoEventExecutorChooser:幂等运算(数组长度必须是2的幂次方)
3.1 GenericEventExecutorChooser
这里其实没什么好说的,就简单的取模、
private static final class GenericEventExecutorChooser implements EventExecutorChooser { // 原子递增 private final AtomicInteger idx = new AtomicInteger(); private final EventExecutor[] executors; GenericEventExecutorChooser(EventExecutor[] executors) { this.executors = executors; } @Override public EventExecutor next() { //递增、取模,取正值,不然可能是负数,另外:有个非常小的缺点,当idx累加成最大值后,有短暂的不公平: //1,2,3,4,5,6,7,0,7 7 7 // 当我们的 idx.getAndIncrement() 到达最大值 Integer.MAX_VALUE 时,会不再增加,也就一直是 return executors[Math.abs(idx.getAndIncrement() % executors.length)]; } }
3.2 PowerOfTwoEventExecutorChooser
这里利用的是 & 的知识点
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser { private final AtomicInteger idx = new AtomicInteger(); private final EventExecutor[] executors; PowerOfTwoEventExecutorChooser(EventExecutor[] executors) { this.executors = executors; } @Override //executors总数必须是2的幂次方(2,4,8...等)才会用,&运算效率更高,同时当idx累加成最大值之后,相比较通用的方式(GenericEventExecutorChooser),更公平 public EventExecutor next() { return executors[idx.getAndIncrement() & executors.length - 1]; } }
读者如果阅读过 HashMap 的源码,在 put 的过程中,采用了 & 的方式去进行数组的确定。
public V put(K key, V value) { return putVal(hash(key), key, value, false, true); } final V putVal(int hash, K key, V value, boolean onlyIfAbsent, boolean evict) { Node<K,V>[] tab; Node<K,V> p; int n, i; if ((tab = table) == null || (n = tab.length) == 0){ n = (tab = resize()).length; } if ((p = tab[i = (n - 1) & hash]) == null){ tab[i] = newNode(hash, key, value, null) } }
其中,tab[i = (n - 1) & hash] 利用 (n - 1) & hash 将当前的数组均匀的分配到每个数组上,其实效果和我们上面的取模是一样的。
主要是性能上的提高
- 对于取模运算来说,我们计算机需要去进行不断的取余计算,如果数据过大,其实会耗费一些性能。
- 但对于 & 运算,直接操作二进制,这种效率较高
- 这里就不介绍为什么
(n - 1) & hash等价于hash % n了,有兴趣的读者可以翻一下以前介绍HashMap的文章
4. Netty 如何保证跨平台性
我们知道,对于不同的平台,Netty 具有不同的实现,如下:
那这种是如何实现的呢,毕竟我们使用的都是同一套代码
当我们创建 EventLoopGroup 时,我们会有这么一个实现:
public NioEventLoopGroup(int nThreads, Executor executor) { //默认selector,最终实现类似:https://github.com/frohoff/jdk8u-jdk/blob/master/src/macosx/classes/sun/nio/ch/DefaultSelectorProvider.java //basic flow: 1 java.nio.channels.spi.SelectorProvider 2 META-INF/services 3 default this(nThreads, executor, SelectorProvider.provider()); }
这里面的 SelectorProvider.provider() 就是我们所选择的实现
- 最终的实现是在我们下载的JDK里面,所以不同平台的JDK默认有不同的实现
public static SelectorProvider provider() { // 防止多个线程并发访问 synchronized (lock) { if (provider != null) return provider; return AccessController.doPrivileged( new PrivilegedAction<SelectorProvider>() { public SelectorProvider run() { // 这里是通过SPI去查询文件是否配置 // String cn = System.getProperty("java.nio.channels.spi.SelectorProvider"); if (loadProviderFromProperty()) return provider; // META-INF/services // ServiceLoader<SelectorProvider> sl = ServiceLoader.load(SelectorProvider.class, ClassLoader.getSystemClassLoader()); if (loadProviderAsService()) return provider; // 默认会用下面这个 provider = sun.nio.ch.DefaultSelectorProvider.create(); return provider; } }); } } // 类的路径:java.nio.channels.spi.SelectorProvider.DefaultSelectorProvider public static SelectorProvider create() { return new WindowsSelectorProvider(); } // 博主安装的是 Window 的JDK版本,如果是MAC的,下面的将会是这个样子 public static SelectorProvider create() { return new sun.nio.ch.KQueueSelectorProvider(); }
四、Netty 对粘包和半包的支持
1. 什么是粘包、半包
我们的客户端向服务端发送 ABC DED,但我们的服务端收到的消息却不一样,具体情况如下:
- 没有问题的情况下:服务端收到 ABC DEF
- 一次收到了:服务端收到 ABCDEF (TCP粘包)
- 多次收到:第一次 AB,第二次 CD,第三次 EF(TCP拆包)
我们这里实际测试一下,有的读者可能还是不太清楚:
首先,我们的服务端核心代码如下:
public class EchoServerHandler extends ChannelInboundHandlerAdapter { private AtomicInteger counter = new AtomicInteger(0); /*** 服务端读取到网络数据后的处理*/ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf in = (ByteBuf)msg; String request = in.toString(CharsetUtil.UTF_8); System.out.println("Server Accept["+request +"] and the counter is:"+counter.incrementAndGet()); String resp = "Hello,"+request+". Welcome to Netty World!" + System.getProperty("line.separator"); ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes())); } }
当服务端收到客户端的消息时,会执行 channelRead0 这个方法,我使用了一个 counter 原子变量来记录服务端当前收到的消息数量。
客户端核心代码:
/** * 类说明:粘包/半包问题展示 */ public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> { private AtomicInteger counter = new AtomicInteger(0); /*** 客户端读取到网络数据后的处理*/ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { System.out.println("client Accept["+msg.toString(CharsetUtil.UTF_8) +"] and the counter is:"+counter.incrementAndGet()); } /*** 客户端被通知channel活跃后,做事*/ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ByteBuf msg = null; String request = "ABC,DEF,GHI,JKL,MNO" + System.getProperty("line.separator"); //发送100次 for(int i=0;i<100;i++){ msg = Unpooled.buffer(request.length()); msg.writeBytes(request.getBytes()); ctx.writeAndFlush(msg); } } }
我们可以看到,我们的客户端会发送 100 条 ABC,DEF,GHI,JKL,MNO 的字符串
正常来说,我们的服务端会收到 100 条消息,我们运行一下看看实际情况
服务端的截图:
- 收到的第一条消息:
- 收到的第二条消息
这正是出现的粘包、半包问题
2. 为什么会出现这个问题?
当我们的客户端向服务端发送数据时,并不是立即发送
主要在于:socket会有缓存区,当缓存区达到一定的条件时,才会进行发送至服务端。
当然,本质还是在与:对于TCP来说,TCP是一个流式的协议,消息无边界
3. 如何避免粘包、半包
既然出现问题的原因在于消息无边界,那么只要我们找到当前客户端发送消息的边界就可以了。
3.1 短连接(不推荐)
当我们的客户端和服务端建立连接后,客户端发送一次消息就立即断开,这样我们的消息仅仅只发一次,服务端也只会收到一次。
这种一般不太实用,总不能我客户端每一次都要和服务端重新建立连接吧
3.2 固定长度(不推荐)
我们客户端和服务端发送消息规定一个长度,比如我现在想发送 ABCDEF 100 次,那么我规定的长度就是 6,一次6个字符为一次调用。
Netty 对长度的支持为:FixedLengthFrameDecoder,自动为你分割服务端当前收到的消息
但这种比较浪费空间,比如你当前的长度设置成 100,但最后一次消息分割只有 10,浪费掉了 90 的空间。
3.3 分隔符(推荐)
我们客户端和服务端发送消息规定一个换行符,比如我现在想发送 ABCDEF 100 次,那么我们每一次发送后面都加一个 换行符。
Netty 对换行符的支持为:LineBasedFrameDecoder,自动为你分割服务端当前收到的消息。
当然,如果你不想局限于 换行符,也可以使用自定义的:DelimiterBasedFrameDecoder
但这种的缺点在于有的分隔符需要进行转义,代码写起来较为复杂。
3.4 消息头和消息体(推荐)
参考我们的 HTTP 请求
分别以下几方面:
- 魔数:4 个字节的魔数
- 版本:当前的版本号
- 序列化方式:JDK或者JSON
- 指令类型:
- 请求序号:唯一性
- 对齐填充
- 获取内容的字节数组
- 正文长度
protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception { // 1. 4 个字节的魔数 out.writeBytes(new byte[]{1, 2, 3, 4}); // 2. 1 字节的版本 out.writeByte(1); // 3. 1 字节的序列化方式:0 JDK 1 JSON out.writeByte(0); // 4. 1 指令类型 out.writeByte(msg.getMessageType()); // 5. 4 请求序号 out.writeInt(msg.sequenceID); // 对齐充填充 out.writeByte(0xff); // 6. 获取内容的字节数组 ByteArrayOutputStream bos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(bos); oos.writeObject(msg); byte[] bytes = bos.toByteArray(); // 7. 正文长度 int len = bytes.length; out.writeInt(len); // 8. 写入内容 out.writeBytes(bytes); }














