netty案例,netty4.1源码分析篇二《ServerBootstrap配置与绑定启动》

简介: 本章节继续介绍ServerBootstrap相关代码。

11.jpg

启动NettyServer的模版代码

private void bing(int port) {
    EventLoopGroup parentGroup = new NioEventLoopGroup(); 
    EventLoopGroup childGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap b = new ServerBootstrap();
        b.group(parentGroup, childGroup)
                .channel(NioServerSocketChannel.class)    //非阻塞模式
                .option(ChannelOption.SO_BACKLOG, 128)
                .childHandler(new MyChannelInitializer());
        ChannelFuture f = b.bind(port).sync();
        System.out.println("itstack-demo-netty server start done. {关注公众号:bugstack虫洞栈,获取源码}");
        f.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        childGroup.shutdownGracefully();
        parentGroup.shutdownGracefully();
    }
}

ServerBootstrap与Bootstrap

  • 它们都是继承于AbstractBootstrap,分别负责服务端与客户端;
  • ServerBootstrap,服务端用于接收客户端的连接并为接收连接的用户创建Channel通道
  • BootStrap,客户端不接收连接,并且是在父通道完成系列操作。

类继承结构图:

12.jpg

ServerBootstrap启动流程源码分析

1、 处理说明

  • 新建NioEventLoopGroup类型的bossGroup和group。bossGroup主要处理服务端接收客户端连接处理,group主要处理读写等I/O事件及任务等;
  • 创建ServerBootstrap,其主要对一些处理进行代理,如bind()等操作,其是其他类的一个简单门面;
  • channel()方法设置服务端的ServerSocketChannel实现类,本处实现类为NioServerSocketChannel。
  • option()方法设置Channel的相关选项,具体查看ChannelOption中的定义;
  • localAddress()设置服务端绑定的本地地址及端口;
  • handler()设置服务端的对应Channel的Handler;
  • childHandler()设置子连接的Channel的Handler;
  • bind()及sync()绑定本地地址并同步返回绑定结果;

2、 bing()调用流程

13.jpg

  • 调用ServerBootstrap.bind():应用调用ServerBootstrap的bind()操作;
  • 调用AbstractBootstrap.bind():调用doBind()对进行bind操作;
  • 调用AbstractBootstrap.initAndRegister():利用ChannelFactory.newChannel()实例化NioServerSocketChannel;
  • 调用ServerBootstrap.init():对NioServerSocketChannel进行初始化,主要操作如设置Channel相关的选项及属性、设置ChannelHandler为ServerBootstrapAcceptor等,ServerBootstrapAcceptor为inbound类型的ChannelHandler,其为ServerBootstrap的内部类,其主要实现ChannelRead()操作,将客户端的连接注册到EventLoopGroup的EventLoop中。
  • 调用NioEventLoop.register():将NioServerSocketChannel注册到bossGroup中。
  • 调用AbstractBootstrap.doBind0:将实际的bind操作以任务的形式添加到bossGroup的EventLoop中。
  • 调用NioServerSocketChannel.bind():在EventLoop中以任务的形式调用此方法进行实际的bind()操作。

源码方法分析

1、doBind()源码分析

AbstractBootstrap.java | AbstractBootstrap.doBind()

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }
    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();
                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

主要流程处理

  • 调用initAndRegister()初始化Channel并将其注册到bossGroup中的NioEventLoop中;
  • 若注册成功,则调用doBind0()进行实际的bind操作;
  • 若还未注册,则创建注册结果的监听器及doBind0()的异步结果,若Channel注册成功,则在结果监听器中进行doBind0()操作,并将bind()异步结果这种为成功;否则将在监听器中设置异步结果为失败;

2、 initAndRegister()源码分析

AbstractBootstrap.java | AbstractBootstrap.initAndRegister()

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        channel = channelFactory.newChannel();
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            channel.unsafe().closeForcibly();
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    // If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    //    added to the event loop's task queue for later execution.
    //    i.e. It's safe to attempt bind() or connect() now:
    //         because bind() or connect() will be executed *after* the scheduled registration task is executed
    //         because register(), bind(), and connect() are all bound to the same thread.
    return regFuture;
}

主要处理流程

  • 通过ChannelFactory新创建一个Channel;
  • 调用ServerBootstrap的init()方法对Channel进行初始化;

3、init()源码分析

AbstractBootstrap.java | AbstractBootstrap.init()

@Override
void init(Channel channel) throws Exception {
    final Map<ChannelOption<?>, Object> options = options0();
    synchronized (options) {
        setChannelOptions(channel, options, logger);
    }
    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }
    ChannelPipeline p = channel.pipeline();
    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
    }
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

主要处理流程

  • 如果设置了Channel选项,则调用setChannelOptions()对Channel进行选项设置;
  • 如果设置了属性,则将对应属性设置为Channel的属性;
  • 设置子Channel的选项及属性;
  • 初始化NioServerSocketChannel的ChannelHandler为ServerBootstrapAcceptor,ServerBootstrapAcceptor为inbound类型的ChannelHandler,其主要功能是将已经接受连接的子Channel注册到workerGroup的NioEventLoop中;

4、 doBind0()源码分析

AbstractBootstrap.java | AbstractBootstrap.doBind0()

private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {
        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

主要处理流程

  • 将NioServerSocketChannel.bind()操作封装为任务,并将任务提交给其对应的EventLoop进行处理;

5、 ServerBootstrapAcceptor源码分析

ServerBootstrapAcceptor为NioServerSocketChannel的 – ChannelHandler,其类型为Inbound类型;

ServerBootstrapAcceptor.java

private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
    private final EventLoopGroup childGroup;
    private final ChannelHandler childHandler;
    private final Entry<ChannelOption<?>, Object>[] childOptions;
    private final Entry<AttributeKey<?>, Object>[] childAttrs;
    private final Runnable enableAutoReadTask;
    ServerBootstrapAcceptor(
            final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
            Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
        this.childGroup = childGroup;
        this.childHandler = childHandler;
        this.childOptions = childOptions;
        this.childAttrs = childAttrs;
        // Task which is scheduled to re-enable auto-read.
        // It's important to create this Runnable before we try to submit it as otherwise the URLClassLoader may
        // not be able to load the class because of the file limit it already reached.
        //
        // See https://github.com/netty/netty/issues/1328
        enableAutoReadTask = new Runnable() {
            @Override
            public void run() {
                channel.config().setAutoRead(true);
            }
        };
    }
    @Override
    @SuppressWarnings("unchecked")
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        final Channel child = (Channel) msg;
        child.pipeline().addLast(childHandler);
        setChannelOptions(child, childOptions, logger);
        for (Entry<AttributeKey<?>, Object> e: childAttrs) {
            child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
        }
        try {
            childGroup.register(child).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        forceClose(child, future.cause());
                    }
                }
            });
        } catch (Throwable t) {
            forceClose(child, t);
        }
    }
    private static void forceClose(Channel child, Throwable t) {
        child.unsafe().closeForcibly();
        logger.warn("Failed to register an accepted channel: {}", child, t);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        final ChannelConfig config = ctx.channel().config();
        if (config.isAutoRead()) {
            // stop accept new connections for 1 second to allow the channel to recover
            // See https://github.com/netty/netty/issues/1328
            config.setAutoRead(false);
            ctx.channel().eventLoop().schedule(enableAutoReadTask, 1, TimeUnit.SECONDS);
        }
        // still let the exceptionCaught event flow through the pipeline to give the user
        // a chance to do something with it
        ctx.fireExceptionCaught(cause);
    }
}

ServerBootstrapAcceptor主要实现了以下方法:

  • channelRead():设置子连接的ChannelHandler、设置子连接的Channel选项,设置子连接的Channel属性,将子连接注册的child对应的EventLoop中(即workerGroup的EventLoop中);
  • exceptionCaught():若ServerSocketChannel在accept子连接时抛出异常,若ServerSocketChannel的autoRead为true,则设置其为false,即不允许自动接收客户端连接,并延迟1s后再设置其为true,使其允许自动接收客户端连接;
目录
相关文章
|
3月前
|
传感器 物联网 微服务
Netty的源码分析和业务场景
【8月更文挑战第2天】Netty 是一款高性能的异步事件驱动网络框架,其源码深邃且复杂。通过采用Reactor模式与主从多线程设计,Netty能高效处理网络事件。例如,`NioEventLoop`负责I/O事件及任务执行,内置线程循环机制。内存管理方面,Netty提供高效内存池与`ByteBuf`类来减少开销并优化内存操作。在业务场景上,Netty广泛应用于分布式系统、微服务架构中的高效通信,以及实时通信场景如在线游戏和直播中的大量并发连接处理,同时也在物联网领域发挥重要作用,确保设备与服务器间稳定快速的数据传输。
|
3月前
|
网络协议 大数据 Linux
Netty的源码分析和业务场景
通过深入分析 Netty 的源码和理解其在不同业务场景下的应用,开发者可以更好地利用这一强大的网络编程框架,构建高效、稳定且可扩展的网络应用。
222 1
|
6月前
|
前端开发 网络协议 Java
Netty | 工作流程图分析 & 核心组件说明 & 代码案例实践
Netty | 工作流程图分析 & 核心组件说明 & 代码案例实践
331 0
|
6月前
|
监控 网络协议 调度
Netty Review - 深入探讨Netty的心跳检测机制:原理、实战、IdleStateHandler源码分析
Netty Review - 深入探讨Netty的心跳检测机制:原理、实战、IdleStateHandler源码分析
411 0
|
编解码 前端开发 Java
源码分析Netty:核心组件及启动过程分析
本篇从实例出发,了解Netty核心组件的概念、作用及串联过程。从概念到设计原理,再到深入了解实现细节,从而能够清晰地掌握Netty的技术细节甚至存在的问题,才能最终更好地支持我们实际的各项业务。
350 0
|
网络协议
由浅入深Netty聊天室案例
由浅入深Netty聊天室案例
57 0
|
消息中间件 分布式计算 NoSQL
由浅入深Netty入门案例
由浅入深Netty入门案例
129 0
|
Rust Dubbo 网络协议
通过 HTTP/2 协议案例学习 Java & Netty 性能调优:工具、技巧与方法论
通过 HTTP/2 协议案例学习 Java & Netty 性能调优:工具、技巧与方法论
12654 10
Netty入门到超神系列-聊天室案例
对于服务端而言需要做如下事情 selector监听客户端的链接 如果有“读”事件,就从通道读取数据 把数据转发给其他所有的客户端,要过滤掉发消息过来的客户端不用转发 对于客户端而言需要做如下事情 selector监听服务端的“读”事件 如果有数据从通道中读取数据,打印到控制台 监听键盘输入,向服务端发送消息
112 0
|
Java API 调度
Netty组件EventLoopGroup和EventLoop源码分析
Netty组件EventLoopGroup和EventLoop源码分析
74 0