netty和nio

简介: netty是一个nio客户机-服务器框架,它简化了tcp和udp网络编程,相对于java传统nio,netty还屏蔽了操作系统的差异性,并且兼顾了性能。Channelchannel封装了对socket的原子操作,实质是对socket的封装和扩展。

netty是一个nio客户机-服务器框架,它简化了tcp和udp网络编程,相对于java传统nio,netty还屏蔽了操作系统的差异性,并且兼顾了性能。

Channel

channel封装了对socket的原子操作,实质是对socket的封装和扩展。

netty框架自己定义的通道接口,是对java nio channel的封装和扩展,客户端NIO套接字通道是NioSocketChannel,提供的服务器端NIO套接字通道是NioServerSocketChannel。

NioSocketChannel内部管理了一个Java NIO的SocketChanne实例,用来创建SocketChannel实例和设置该实例的属性,并调用Connect 方法向服务端发起 TCP 链接等。

NioServerSocketChannel内部管理了Java NIO的ServerSocketChannel实例,用来创建ServerSocketChannel实例和设置该实例属性,并调用实例的bind方法在指定端口监听客户端的链接。

EventLoopGroup

netty使用了主从多线程的Reactor模型。在netty中每个EventLoopGroup本身是一个线程池,其中包含了自定义个数的 NioEventLoop,每个NioEventLoop是一个线程,并且每个NioEventLoop都会关联一个selector选择器。

客户端一般只用一个EventLoopGroup来处理网络IO操作,服务器端一般使用两个,boss-group用来接收客户端发来的TCP链接请求,worker-group用来具体处理网络请求。

当Channel是客户端通道NioSocketChannel时,会注册NioSocketChannel管理的SocketChannel实例到自己关联的NioEventLoop的selector选择器上,然后NioEventLoop对应的线程会通过select命令监控感兴趣的网络读写事件。

当 Channel 是服务端通道NioServerSocketChannel时,NioServerSocketChannel本身会被注册到boss EventLoopGroup里面的某一个NioEventLoop管理的selector选择器,而完成三次握手的链接套接字是被注册到了worker EventLoopGroup里面的某一个NioEventLoop管理的selector选择器上。

多个Channel可以注册到同一个NioEventLoop管理的selector选择器上,这时NioEventLoop对应的单个线程就可以处理多个Channel的就绪事件;但是每个Channel只能注册到一个固定的NioEventLoop管理的selector上。

ChannelPipeline

ChannelPipeline持有一个ChannelHandler的双向链结构。每个Channel都有属于自己的ChannelPipeline,对从Channel 中读取或者要写入 Channel 中的数据进行依次处理。

多ChannelPipeline里面可以复用一个ChannelHandler。

Netty 客户端底层与 Java NIO 对应关系

NioSocketChannel是对Java NIO的SocketChannel的封装,从NioSocketChannel的构造函数可以看出:

public class NioSocketChannel extends AbstractNioByteChannel implements SocketChannel {
    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

    public NioSocketChannel() {
        this(DEFAULT_SELECTOR_PROVIDER);
    }

    public NioSocketChannel(SelectorProvider provider) {
        this(newSocket(provider));
    }
    
    private static java.nio.channels.SocketChannel newSocket(SelectorProvider provider) {
        try {
            return provider.openSocketChannel();
        } catch (IOException var2) {
            throw new ChannelException("Failed to open a socket.", var2);
        }
    }
}

SocketChannel的父类是AbstractNioChannel,在AbstractNioChannel中定义了java nio的SocketChannel类型的成员,并将其模式设置为非阻塞:

public abstract class AbstractNioChannel extends AbstractChannel {
    private final SelectableChannel ch;
    
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        ...
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        ch.configureBlocking(false);
    }
}

所以说NioSocketChannel内部是对java nio的SocketChannel的封装扩展,创建NioSocketChannel实例对象时候相当于执行了Java NIO中:

SocketChannel socketChannel = SocketChannel.open();

NioSocketChannel实例的创建和注册是在Bootstrap的connect阶段。Bootstrap的connect代码:

public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
    ...
    public ChannelFuture connect(InetAddress inetHost, int inetPort) {
        return this.connect(new InetSocketAddress(inetHost, inetPort));
    }
 
    public ChannelFuture connect(SocketAddress remoteAddress) {
        ...
        return this.doResolveAndConnect(remoteAddress, this.config.localAddress());
    }
 
    private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
        ChannelFuture regFuture = this.initAndRegister();
        ...
    }
    
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = this.channelFactory.newChannel();
            this.init(channel);
        } catch (Throwable var3) {}

        ChannelFuture regFuture = this.config().group().register(channel);
        ...
    }

channelFactory.newChannel()创建了NIOSocketChannel实例。

继续跟踪this.config().group().register(channel)到:

public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
    ...
    public ChannelFuture register(Channel channel) {
        return this.register((ChannelPromise)(new DefaultChannelPromise(channel, this)));
    }
    
    public ChannelFuture register(ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }

最终会跟踪到AbstractNioChannel的doRegister():

public abstract class AbstractNioChannel extends AbstractChannel {
    ...
    protected void doRegister() throws Exception {
        boolean selected = false;

        while(true) {
            try {
                this.selectionKey = this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException var3) {
                if (selected) {
                    throw var3;
                }

                this.eventLoop().selectNow();
                selected = true;
            }
        }
    }
}    

其中:

this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this)

将NioSocketChannel实例注册到了当前NioEventLoop的选择器中。

从选择器获取就绪的事件是在该客户端套接关联的NioEventLoop里面的做的,每个NioEventLoop里面有一个线程用来循环从选择器里面获取就绪的事件:

    protected void run() {
        for (;;) {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));

                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                }
                ...
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {}
        }
    }

    private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;

        try {
            while(true) {
                ...
                int selectedKeys = selector.select(timeoutMillis);
                ++selectCnt;
                ...
        } catch (CancelledKeyException var13) {}

    }

从选择器选取就绪的事件后,会最终调用processSelectedKey具体处理每个事件:

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        ...
        try {
            int readyOps = k.readyOps();
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }

            // Process OP_WRITE
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }

            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {}
    }

netty和reactor模型

netty实现单线程reactor:

private EventLoopGroup group = new NioEventLoopGroup(1);
ServerBootstrap bootstrap = new ServerBootstrap()
    .group(group)
    .childHandler(new HeartbeatInitializer());

netty实现多线程reactor:

// 默认线程数为CPU核心数 * 2,*2是因为考虑到超线程CPU包括两个逻辑线程
private EventLoopGroup group = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap()
    .group(boss)
    .childHandler(new HeartbeatInitializer());

netty实现主从多线程reactor:

private EventLoopGroup boss = new NioEventLoopGroup();
private EventLoopGroup work = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap()
    .group(boss, work)
    .childHandler(new HeartbeatInitializer());
目录
相关文章
|
4月前
|
设计模式
Lettuce的特性和内部实现问题之Netty NIO的性能优于BIO的问题如何解决
Lettuce的特性和内部实现问题之Netty NIO的性能优于BIO的问题如何解决
|
1月前
|
消息中间件 缓存 Java
java nio,netty,kafka 中经常提到“零拷贝”到底是什么?
零拷贝技术 Zero-Copy 是指计算机执行操作时,可以直接从源(如文件或网络套接字)将数据传输到目标缓冲区, 而不需要 CPU 先将数据从某处内存复制到另一个特定区域,从而减少上下文切换以及 CPU 的拷贝时间。
java nio,netty,kafka 中经常提到“零拷贝”到底是什么?
|
3月前
|
Java
Netty BIO/NIO/AIO介绍
Netty BIO/NIO/AIO介绍
|
4月前
|
网络协议 C# 开发者
WPF与Socket编程的完美邂逅:打造流畅网络通信体验——从客户端到服务器端,手把手教你实现基于Socket的实时数据交换
【8月更文挑战第31天】网络通信在现代应用中至关重要,Socket编程作为其实现基础,即便在主要用于桌面应用的Windows Presentation Foundation(WPF)中也发挥着重要作用。本文通过最佳实践,详细介绍如何在WPF应用中利用Socket实现网络通信,包括创建WPF项目、设计用户界面、实现Socket通信逻辑及搭建简单服务器端的全过程。具体步骤涵盖从UI设计到前后端交互的各个环节,并附有详尽示例代码,助力WPF开发者掌握这一关键技术,拓展应用程序的功能与实用性。
150 0
|
4月前
|
存储 网络协议 Java
【Netty 神奇之旅】Java NIO 基础全解析:从零开始玩转高效网络编程!
【8月更文挑战第24天】本文介绍了Java NIO,一种非阻塞I/O模型,极大提升了Java应用程序在网络通信中的性能。核心组件包括Buffer、Channel、Selector和SocketChannel。通过示例代码展示了如何使用Java NIO进行服务器与客户端通信。此外,还介绍了基于Java NIO的高性能网络框架Netty,以及如何用Netty构建TCP服务器和客户端。熟悉这些技术和概念对于开发高并发网络应用至关重要。
88 0
|
7月前
|
Java 应用服务中间件 API
从零手写实现 tomcat-06-servlet bio/thread/nio/netty 池化处理
该文介绍了逐步改进的网络服务器实现,从最初的 BIO 基础版到使用线程池的 BIO+Thread,再到 NIO 版本和 NIO+Thread,最后展示了一个使用 Netty 框架的简洁实现。文章旨在说明如何解决阻塞问题,并对比不同模型的优劣,最终推荐使用 Netty 以简化 NIO 编程。
|
7月前
|
编解码 网络协议 Java
用Java的BIO和NIO、Netty实现HTTP服务器(一) BIO与绪论
用Java的BIO和NIO、Netty实现HTTP服务器(一) BIO与绪论
|
7月前
|
移动开发 编解码 网络协议
用Java的BIO和NIO、Netty来实现HTTP服务器(三) 用Netty实现
用Java的BIO和NIO、Netty来实现HTTP服务器(三) 用Netty实现
|
7月前
|
设计模式 网络协议 Java
Java NIO 网络编程 | Netty前期知识(二)
Java NIO 网络编程 | Netty前期知识(二)
121 0
|
7月前
|
编解码 网络协议
Netty基础篇:NIO中缓冲区设置太小
Netty基础篇:NIO中缓冲区设置太小