【Netty 从成神到升仙系列 三】Netty 凭什么成为国内最流行的网络通信框架?

简介: 【Netty 从成神到升仙系列 三】Netty 凭什么成为国内最流行的网络通信框架?

Netty 源码

一、源码构成

二、Netty对三种I/O的支持

首先,我们看下这几种 I/O 的实现。

BIO(阻塞IO):食堂排队打饭模式,持队在窗口,打好才走

NIO(非阻塞IO):点单、等待被叫模式,等待被叫,好了自己去端

AIO(异步IO):包厢模式,点单后菜直接被端上桌。

BIO的实现:

// 如果是BIO的话
EventLoopGroup bossGroup = new OioEventLoopGroup();
EventLoopGroup workerGroup = new OioEventLoopGroup();
b.group(bossGroup, workerGroup).channel(OioServerSocketChannel.class)

其余实现也如上所示,我们 Linux 下的环境基本都是 NIO 的 I/O 实现

1、三种模式如何切换

我们可以看到,上面我们 Netty 实际支持三种 I/O 模式,那么这三种 I/O 是怎么样进行的切换呢?

方法的执行是在:

b.group(bossGroup, workerGroup)
// 主要在下面这一行!!!
.channel(NioServerSocketChannel.class)

我们观察一下,.channel 方法做了什么

public B channel(Class<? extends C> channelClass) {
        // 反射工厂的实现
        return channelFactory(new ReflectiveChannelFactory<C>(  
                ObjectUtil.checkNotNull(channelClass, "channelClass")
        ));
    }
public ReflectiveChannelFactory(Class<? extends T> clazz) {
        // 校验当前传入的clazz是否为空
        ObjectUtil.checkNotNull(clazz, "clazz");
        try {
            // 通过反射获取对应类的无参构造器
            this.constructor = clazz.getConstructor();
        }
    }
// 上面获取完无参构造器之后,这里会通过无参构造器调用 newInstance 得到一个类的实例
public T newChannel() {
    try {
        //反射创建channel
        return constructor.newInstance();
    } 
}

简单描述一下,通过 泛型+反射+工厂 实现 IO 模式切换

三、Netty 如何支持 Reactor 模型

Reactor 模型通过注册监听事件的方式,将阻塞模式修改为非阻塞模式,大大的提升了效率。


3068df931e964db283e2d4718113e0da.png



de5ae321671c41eaaa85dbcd49770fc1.png

真正去执行的是 EventLoop,而我们的 EventLoopGroup 则是多个 EventLoop 的集合

1. EventLoop 是线程嘛

首先,我们看一下 EventLoop 的继承关系:



可以明显的看出,我们的 EventLoop 最终继承 ScheduledExecutorService 方法,也就是我们经常经常使用的线程池中的定时线程。

2. 如何实现三种模式的 Reactor 模型

我们知道,一般 Reactor 模型,有三种:

2.1 单线程 Reactor 模式


EventLoopGroup bossGroup = new NioEventLoopGroup(1); //有参的构造方法,创建1个线程

使用上述代码即可创建单线程的 Reactor 模型。

但具体如何实现,我们去看下源码:

// nThreads 传递的入参
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }

在源码中明确表明了,如果当前你传入的入参为 **0(不传入参)**时,当前线程为 DEFAULT_EVENT_LOOP_THREADS,如果你传递了入参,则按照你入参的个数申请线程。

PS:DEFAULT_EVENT_LOOP_THREADS 的大小在源码中定义如下:

DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

这里的 NettyRuntime.availableProcessors() 最终调用的方法为:

Runtime.getRuntime().availableProcessors())

有兴趣的读者可以自己去看下自己电脑的线程,博主的线程如下:

  • IDEA的线程

  • 鲁大师的面板

2.2 多线程 Reactor 模式

如果我们传递无入参的话,创建的就是多线程的 Reactor 模式

这种相对于单线程来说,其实提升并不是很大,简单而言就是多个线程的堆积。


比如

  • 原来一个线程完成接受、读取、解码、计算、加码、发送整个流程
  • 现在多个线程一起完成接受、读取、解码、计算、加码、发送整个流程

相当于复制了几份而已

2.3 主从 Reactor 模式


而我们的主从 Reactor 模式则打破了原有的架构,采用了一个新的架构进行数据的接受和发送


我们创建两个线程组,一个线程组负责接受客户端的消息,另外一个线程组负责读取、解码、计算、加码、发送整个流程。


这样,我们每个线程都有自己要做的事情并且由于接受客户端的消息很快,我们的 mainReactor 线程组会比原来接受更多的客户端消息

2.3.1 主从模式如何实现

Netty 中只需要指定两个 EventLoopGroup 即可,如下:

EventLoopGroup bossGroup = new NioEventLoopGroup();  //无参的构造方法: 线程组,多个线程
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)

我们的 bossGroup 就是负责接受数据的线程组,而我们的 workerGroup 就是负责处理数据的线程组

2.3.2 主从模式源码实现
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    // 这里会设置父线程组(接受数据的)
    super.group(parentGroup);
    ObjectUtil.checkNotNull(childGroup, "childGroup");
    this.childGroup = childGroup;
    return this;
}
// 类的全路径:io.netty.bootstrap.AbstractBootstrap
volatile EventLoopGroup group;
public B group(EventLoopGroup group) {
    ObjectUtil.checkNotNull(group, "group");
    this.group = group;
    return self();
}

经过上述操作,我们传入的 bossGroup 已经被赋值到 io.netty.bootstrap.AbstractBootstrap.group 了。

我们看上述的图片,可以看出来,这个 bossGroup 主要做的是接受客户端的数据连接,但口说无凭,源码中在哪实现了呢

主要在于 bind() 这个方法:

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();//这个方法比较重要
}
final ChannelFuture initAndRegister() {
    // 在这里的config().group()拿出我们的 EventLoopGroup 进行注册
    ChannelFuture regFuture = config().group().register(channel);
    return regFuture;
}

这里的 EventLoopGroup 我们可以看到,他正是我们之前设置的 bossGroup,也就是 io.netty.bootstrap.AbstractBootstrap.group 这个变量

另外,我们的从模式解决读事件的方法,可以直接去

io.netty.bootstrap.ServerBootstrap.childGroup 这个变量去找

3. Netty 给 Channel 分配 NIOEventLoop 的规则

我们上面可以看到,一个 EventLoopGroup 包括多个 EventLoop

那么我们处理数据,Netty 是如何分配这些 EventLoop 的呢?

在源码 io.netty.util.concurrent.MultithreadEventExecutorGroup.next() 方法中

@Override
public EventExecutor next() {
    return chooser.next(); //chooser是一个选择器,一般的实现会使用策略模式
}

利用 chooser 选择器,选择不同的策略执行不同的 EventLoop

// 选择器的选择
public EventExecutorChooser newChooser(EventExecutor[] executors) {
    //根据待绑定的executor是否是2的幂次方,做出不同的选择
    // 这里的二次方后面会讲到
    if (isPowerOfTwo(executors.length)) {
        return new PowerOfTwoEventExecutorChooser(executors);
    } else {
        return new GenericEventExecutorChooser(executors);
    }
}

我们点进去发现,这里的策略一共有两种

  • GenericEventExecutorChooser:取模轮询
  • PowerOfTwoEventExecutorChooser:幂等运算(数组长度必须是2的幂次方

3.1 GenericEventExecutorChooser

这里其实没什么好说的,就简单的取模、

private static final class GenericEventExecutorChooser implements EventExecutorChooser {
    // 原子递增
    private final AtomicInteger idx = new AtomicInteger();
    private final EventExecutor[] executors;
    GenericEventExecutorChooser(EventExecutor[] executors) {
        this.executors = executors;
    }
    @Override
    public EventExecutor next() {
        //递增、取模,取正值,不然可能是负数,另外:有个非常小的缺点,当idx累加成最大值后,有短暂的不公平:
        //1,2,3,4,5,6,7,0,7 7 7 
        // 当我们的 idx.getAndIncrement() 到达最大值 Integer.MAX_VALUE 时,会不再增加,也就一直是
        return executors[Math.abs(idx.getAndIncrement() % executors.length)];
    }
}

3.2 PowerOfTwoEventExecutorChooser

这里利用的是 & 的知识点

private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
    private final AtomicInteger idx = new AtomicInteger();
    private final EventExecutor[] executors;
    PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
        this.executors = executors;
    }
    @Override
    //executors总数必须是2的幂次方(2,4,8...等)才会用,&运算效率更高,同时当idx累加成最大值之后,相比较通用的方式(GenericEventExecutorChooser),更公平
    public EventExecutor next() {
        return executors[idx.getAndIncrement() & executors.length - 1];
    }
}

读者如果阅读过 HashMap 的源码,在 put 的过程中,采用了 & 的方式去进行数组的确定。

public V put(K key, V value) {
    return putVal(hash(key), key, value, false, true);
}
final V putVal(int hash, K key, V value, boolean onlyIfAbsent, boolean evict) {
    Node<K,V>[] tab; Node<K,V> p; int n, i;
    if ((tab = table) == null || (n = tab.length) == 0){
        n = (tab = resize()).length;
    }
    if ((p = tab[i = (n - 1) & hash]) == null){
        tab[i] = newNode(hash, key, value, null)
    }
}

其中,tab[i = (n - 1) & hash] 利用 (n - 1) & hash 将当前的数组均匀的分配到每个数组上,其实效果和我们上面的取模是一样的。

主要是性能上的提高

  • 对于取模运算来说,我们计算机需要去进行不断的取余计算,如果数据过大,其实会耗费一些性能。
  • 但对于 & 运算,直接操作二进制,这种效率较高
  • 这里就不介绍为什么 (n - 1) & hash 等价于 hash % n 了,有兴趣的读者可以翻一下以前介绍 HashMap 的文章

4. Netty 如何保证跨平台性

我们知道,对于不同的平台,Netty 具有不同的实现,如下:

2843ae6f9ebe43549233b8ead5ff73a9.png

那这种是如何实现的呢,毕竟我们使用的都是同一套代码

当我们创建 EventLoopGroup 时,我们会有这么一个实现:

public NioEventLoopGroup(int nThreads, Executor executor) {
    //默认selector,最终实现类似:https://github.com/frohoff/jdk8u-jdk/blob/master/src/macosx/classes/sun/nio/ch/DefaultSelectorProvider.java
    //basic flow: 1 java.nio.channels.spi.SelectorProvider 2 META-INF/services 3 default
    this(nThreads, executor, SelectorProvider.provider());
}

这里面的 SelectorProvider.provider() 就是我们所选择的实现

  • 最终的实现是在我们下载的JDK里面,所以不同平台的JDK默认有不同的实现
public static SelectorProvider provider() {
    // 防止多个线程并发访问
    synchronized (lock) {
        if (provider != null)
            return provider;
        return AccessController.doPrivileged(
            new PrivilegedAction<SelectorProvider>() {
                public SelectorProvider run() {
                       // 这里是通过SPI去查询文件是否配置
                       // String cn = System.getProperty("java.nio.channels.spi.SelectorProvider");
                        if (loadProviderFromProperty())
                            return provider;
                      // META-INF/services
                    // ServiceLoader<SelectorProvider> sl =
            ServiceLoader.load(SelectorProvider.class,
                               ClassLoader.getSystemClassLoader());
                        if (loadProviderAsService())
                            return provider;
                      // 默认会用下面这个
                        provider = sun.nio.ch.DefaultSelectorProvider.create();
                        return provider;
                    }
                });
    }
}
// 类的路径:java.nio.channels.spi.SelectorProvider.DefaultSelectorProvider
public static SelectorProvider create() {
    return new WindowsSelectorProvider();
}
// 博主安装的是 Window 的JDK版本,如果是MAC的,下面的将会是这个样子
public static SelectorProvider create() {
    return new sun.nio.ch.KQueueSelectorProvider();
}

四、Netty 对粘包和半包的支持

1. 什么是粘包、半包

我们的客户端向服务端发送 ABC DED,但我们的服务端收到的消息却不一样,具体情况如下:

  • 没有问题的情况下:服务端收到 ABC DEF
  • 一次收到了:服务端收到 ABCDEF (TCP粘包)
  • 多次收到:第一次 AB,第二次 CD,第三次 EF(TCP拆包)

我们这里实际测试一下,有的读者可能还是不太清楚:

首先,我们的服务端核心代码如下:

public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    private AtomicInteger counter = new AtomicInteger(0);
    /*** 服务端读取到网络数据后的处理*/
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf in = (ByteBuf)msg;
        String request = in.toString(CharsetUtil.UTF_8);
        System.out.println("Server Accept["+request
                +"] and the counter is:"+counter.incrementAndGet());
        String resp = "Hello,"+request+". Welcome to Netty World!"
                + System.getProperty("line.separator");
        ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes()));
    }
}

当服务端收到客户端的消息时,会执行 channelRead0 这个方法,我使用了一个 counter 原子变量来记录服务端当前收到的消息数量。

客户端核心代码:

/**
 * 类说明:粘包/半包问题展示
 */
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    private AtomicInteger counter = new AtomicInteger(0);
    /*** 客户端读取到网络数据后的处理*/
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println("client Accept["+msg.toString(CharsetUtil.UTF_8)
                +"] and the counter is:"+counter.incrementAndGet());
    }
    /*** 客户端被通知channel活跃后,做事*/
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf msg = null;
        String request = "ABC,DEF,GHI,JKL,MNO"
                + System.getProperty("line.separator");
        //发送100次
        for(int i=0;i<100;i++){
            msg = Unpooled.buffer(request.length());
            msg.writeBytes(request.getBytes());
            ctx.writeAndFlush(msg);
        }
    }
}

我们可以看到,我们的客户端会发送 100ABC,DEF,GHI,JKL,MNO 的字符串

正常来说,我们的服务端会收到 100 条消息,我们运行一下看看实际情况

服务端的截图:

  • 收到的第一条消息:
  • 收到的第二条消息

这正是出现的粘包、半包问题

2. 为什么会出现这个问题?

当我们的客户端向服务端发送数据时,并不是立即发送

主要在于:socket会有缓存区,当缓存区达到一定的条件时,才会进行发送至服务端。

当然,本质还是在与:对于TCP来说,TCP是一个流式的协议,消息无边界

3. 如何避免粘包、半包

既然出现问题的原因在于消息无边界,那么只要我们找到当前客户端发送消息的边界就可以了。

3.1 短连接(不推荐)

当我们的客户端和服务端建立连接后,客户端发送一次消息就立即断开,这样我们的消息仅仅只发一次,服务端也只会收到一次。

这种一般不太实用,总不能我客户端每一次都要和服务端重新建立连接吧

3.2 固定长度(不推荐)

我们客户端和服务端发送消息规定一个长度,比如我现在想发送 ABCDEF 100 次,那么我规定的长度就是 6,一次6个字符为一次调用。

Netty 对长度的支持为:FixedLengthFrameDecoder,自动为你分割服务端当前收到的消息

但这种比较浪费空间,比如你当前的长度设置成 100,但最后一次消息分割只有 10,浪费掉了 90 的空间。

3.3 分隔符(推荐)

我们客户端和服务端发送消息规定一个换行符,比如我现在想发送 ABCDEF 100 次,那么我们每一次发送后面都加一个 换行符

Netty 对换行符的支持为:LineBasedFrameDecoder,自动为你分割服务端当前收到的消息。

当然,如果你不想局限于 换行符,也可以使用自定义的:DelimiterBasedFrameDecoder

但这种的缺点在于有的分隔符需要进行转义,代码写起来较为复杂。

3.4 消息头和消息体(推荐)

参考我们的 HTTP 请求

分别以下几方面:

  • 魔数:4 个字节的魔数
  • 版本:当前的版本号
  • 序列化方式:JDK或者JSON
  • 指令类型:
  • 请求序号:唯一性
  • 对齐填充
  • 获取内容的字节数组
  • 正文长度
protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
        // 1. 4 个字节的魔数
        out.writeBytes(new byte[]{1, 2, 3, 4});
        // 2. 1 字节的版本
        out.writeByte(1);
        // 3. 1 字节的序列化方式:0 JDK  1 JSON
        out.writeByte(0);
        // 4. 1 指令类型
        out.writeByte(msg.getMessageType());
        // 5. 4 请求序号
        out.writeInt(msg.sequenceID);
        // 对齐充填充
        out.writeByte(0xff);
        // 6. 获取内容的字节数组
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(msg);
        byte[] bytes = bos.toByteArray();
        // 7. 正文长度
        int len = bytes.length;
        out.writeInt(len);
        // 8. 写入内容
        out.writeBytes(bytes);
    }


相关文章
|
7月前
|
机器学习/深度学习 算法 量子技术
GQNN框架:让Python开发者轻松构建量子神经网络
为降低量子神经网络的研发门槛并提升其实用性,本文介绍一个名为GQNN(Generalized Quantum Neural Network)的Python开发框架。
177 4
GQNN框架:让Python开发者轻松构建量子神经网络
|
5月前
|
机器学习/深度学习 算法 PyTorch
【Pytorch框架搭建神经网络】基于DQN算法、优先级采样的DQN算法、DQN + 人工势场的避障控制研究(Python代码实现)
【Pytorch框架搭建神经网络】基于DQN算法、优先级采样的DQN算法、DQN + 人工势场的避障控制研究(Python代码实现)
161 1
|
5月前
|
监控 前端开发 安全
Netty 高性能网络编程框架技术详解与实践指南
本文档全面介绍 Netty 高性能网络编程框架的核心概念、架构设计和实践应用。作为 Java 领域最优秀的 NIO 框架之一,Netty 提供了异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。本文将深入探讨其 Reactor 模型、ChannelPipeline、编解码器、内存管理等核心机制,帮助开发者构建高性能的网络应用系统。
410 0
|
5月前
|
机器学习/深度学习 算法 PyTorch
【DQN实现避障控制】使用Pytorch框架搭建神经网络,基于DQN算法、优先级采样的DQN算法、DQN + 人工势场实现避障控制研究(Matlab、Python实现)
【DQN实现避障控制】使用Pytorch框架搭建神经网络,基于DQN算法、优先级采样的DQN算法、DQN + 人工势场实现避障控制研究(Matlab、Python实现)
251 0
|
8月前
|
机器学习/深度学习 API TensorFlow
BayesFlow:基于神经网络的摊销贝叶斯推断框架
BayesFlow 是一个基于 Python 的开源框架,利用摊销神经网络加速贝叶斯推断,解决传统方法计算复杂度高的问题。它通过训练神经网络学习从数据到参数的映射,实现毫秒级实时推断。核心组件包括摘要网络、后验网络和似然网络,支持摊销后验估计、模型比较及错误检测等功能。适用于流行病学、神经科学、地震学等领域,为仿真驱动的科研与工程提供高效解决方案。其模块化设计兼顾易用性与灵活性,推动贝叶斯推断从理论走向实践。
275 7
BayesFlow:基于神经网络的摊销贝叶斯推断框架
|
9月前
|
弹性计算 网络协议 Java
Netty基础—2.网络编程基础二
本文介绍了网络编程的基本概念和三种主要模式:BIO(阻塞IO)、AIO(异步IO)和NIO(非阻塞IO)。BIO模型通过为每个客户端连接创建一个线程来处理请求,适合客户端较少的情况,但在高并发下性能较差。AIO模型通过异步IO操作,允许操作系统处理IO,适合高并发场景,但编码复杂且Linux支持有限。NIO模型通过Selector实现多路复用,适合高并发且性能要求高的场景。文章还详细介绍了NIO中的Buffer、Selector、Channel等核心组件,并提供了NIO的实战开发流程和代码示例。
|
9月前
|
监控 网络协议 Java
Netty基础—1.网络编程基础一
本文详细介绍了网络通信的基础知识,涵盖OSI七层模型、TCP/IP协议族及其实现细节。首先解释了OSI模型各层功能,如物理层负责数据通路建立与传输,数据链路层提供无差错传输等。接着探讨了TCP/IP协议,包括TCP和UDP的特点、三次握手与四次挥手过程,以及如何通过确认应答和序列号确保数据可靠性。还分析了HTTP请求的传输流程和报文结构,并讨论了短连接与长连接概念。 此外,解析了Linux下的IO模型,包括阻塞IO、非阻塞IO、IO复用(select/poll/epoll)、信号驱动IO和异步IO的特点与区别,强调了epoll在高并发场景下的优势及其水平触发和边缘触发两种工作模式。
|
9月前
|
网络协议 算法 Java
Netty基础—3.基础网络协议
本文详细梳理了计算机网络的基础知识,涵盖从物理层到应用层的各层协议及其功能。内容包括七层模型与四层模型对比、IP地址与子网划分、TCP三次握手及四次挥手过程、Socket编程原理、HTTP/HTTPS协议的工作机制等。同时深入探讨了Linux IO模型(阻塞、非阻塞、IO多路复用)及其应用场景,并分析了select、poll、epoll的区别。此外,还涉及Java IO读写的底层流程及同步异步、阻塞非阻塞的概念。这些知识点为理解网络通信和高性能服务器开发提供了全面的理论支持。
|
11月前
|
监控 安全 Cloud Native
企业网络架构安全持续增强框架
企业网络架构安全评估与防护体系构建需采用分层防御、动态适应、主动治理的方法。通过系统化的实施框架,涵盖分层安全架构(核心、基础、边界、终端、治理层)和动态安全能力集成(持续监控、自动化响应、自适应防护)。关键步骤包括系统性风险评估、零信任网络重构、纵深防御技术选型及云原生安全集成。最终形成韧性安全架构,实现从被动防御到主动免疫的转变,确保安全投入与业务创新的平衡。
|
机器学习/深度学习 算法 PyTorch
基于图神经网络的大语言模型检索增强生成框架研究:面向知识图谱推理的优化与扩展
本文探讨了图神经网络(GNN)与大型语言模型(LLM)结合在知识图谱问答中的应用。研究首先基于G-Retriever构建了探索性模型,然后深入分析了GNN-RAG架构,通过敏感性研究和架构改进,显著提升了模型的推理能力和答案质量。实验结果表明,改进后的模型在多个评估指标上取得了显著提升,特别是在精确率和召回率方面。最后,文章提出了反思机制和教师网络的概念,进一步增强了模型的推理能力。
779 4
基于图神经网络的大语言模型检索增强生成框架研究:面向知识图谱推理的优化与扩展

热门文章

最新文章