Netty源码
相信学过 Netty
的小伙伴都应该熟悉 Java
的 NIO
,在 Java 中创建服务端和客户端的代码如下所示:
服务端
// 1. 创建一个 selector 对象 Selector selector = Selector.open(); ServerSocketChannel ssc = ServerSocketChannel.open(); // 创建FD-1 ssc.configureBlocking(false); // 非阻塞模式 // 2. 建立 selector 与 channel 的联系(注册) // SelectionKey:事件发生时,通过这个可以知道事件和那个channel的事件 // 这个key,只关注 accept 事件 SelectionKey sscKey = ssc.register(selector, SelectionKey.OP_ACCEPT, null); // 3. 注册端口号 ssc.bind(new InetSocketAddress(8080));
客户端
// 建立客户端的channel SocketChannel channel = SocketChannel.open(); // 连接服务端的IP和端口 channel.connect(new InetSocketAddress("localhost", 8080)); // 发送消息 channel.write(Charset.defaultCharset().encode("hello"));
我们的 Netty
正是在 Java NIO
做的一层封装
既然是封装,Netty
的源码中必然存在以上 服务端
和 客户端
的代码
一、Netty 服务端
1. Netty 服务端启动代码
public class TestNettyServer { public static void main(String[] args) { // 1. 服务器端的启动器,负责组装 netty 主键,启动服务器 new ServerBootstrap() // 2. BossEventLoop、WorkerEventLoop(selector、thread) .group(new NioEventLoopGroup()) // 3. 选择服务器的IO模式 .channel(NioServerSocketChannel.class) // 4. boss 负责处理连接 worker 负责处理读写 .childHandler( // 5. channel 代表和客户端进行数据读写的通道 Initializer 初始化,负责添加别的 handler new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { // 6. 添加具体的 handler ch.pipeline().addLast(new LoggingHandler()); } }) // 7. 绑定监听端口 .bind(8080); } }
Netty
中获取选择器Selector
,Netty
中使用NioEventloopGroup
中的NioEventloop
封装了线程和选择器- 创建
NioServerSocketChannel
,该Channel
作为附件添加到ServerSocketChannel
中
// 创建 NioServerSocketChannel,同时会初始化它关联的 handler,以及为原生 ssc 存储 config NioServerSocketChannel attachment = new NioServerSocketChannel(); //注册(仅关联 selector 和 NioServerSocketChannel),未关注事件 SelectionKey selectionKey = serverSocketChannel.register(selector, 0, attachment);
- 绑定端口
要剖析启动代码,我们直接从 bind
入手
2. Bind
选择器 Selector
的创建是在 NioEventloopGroup
中完成的。
而 NioServerSocketChannel
与 ServerSocketChannel
的创建,ServerSocketChannel
注册到 Selector
上以及绑定的操作都由 bind
完成。
所以,我们的启动入口:io.netty.bootstrap.AbstractBootstrap.bind
public ChannelFuture bind(SocketAddress localAddress) { this.validate(); return this.doBind((SocketAddress)ObjectUtil.checkNotNull(localAddress, "localAddress")); }
3. doBind
private ChannelFuture doBind(final SocketAddress localAddress) { // 负责NioServerSocketChannel和ServerSocketChannel的创建 // ServerSocketChannel的注册工作 // init由main线程完成,regisetr由NIO线程完成 final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } // 因为register操作是异步的 // 所以要判断主线程执行到这里时,register操作是否已经执行完毕 if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); // 执行doBind0绑定操作 doBind0(regFuture, channel, localAddress, promise); return promise; } else { // Registration future is almost always fulfilled already, but just in case it's not. // 如果register操作还没执行完,就会到这个分支中来 final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); // 添加监听器,NIO线程异步进行doBind0操作 regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered(); // 执行doBind0绑定操作 doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } }
3.1 init
Channel channel = null; try { // 通过 channelFactory 创建 NIOServerSocketChannel channel = this.channelFactory.newChannel(); this.init(channel); } catch (Throwable var3) { if (channel != null) { channel.unsafe().closeForcibly(); return (new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE)).setFailure(var3); } return (new DefaultChannelPromise(new FailedChannel(),GlobalEventExecutor.INSTANCE)).setFailure(var3); }
3.1.1 创建 NIOServerSocketChannel
我们看一下通过这个 channelFactory
怎么创建的 NIOServerSocketChannel
,并如何实现 ServerSocketChannel.open()
public T newChannel() { try { return (Channel)this.constructor.newInstance(); } }
很明显,利用的反射的原理创建的 Channel
通过 DEBUG 我们可以得到当前 Channel
的地址:io.netty.channel.socket.nio.NioServerSocketChannel
我们追进去看看,构造方法:
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider(); // 构造方法 public NioServerSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); } private static java.nio.channels.ServerSocketChannel newSocket(SelectorProvider provider) { return provider.openServerSocketChannel(); }
所以,最终调用的是:SelectorProvider.provider().openServerSocketChannel()
而这个调用,正好和我们 ServerSocketChannel ssc = ServerSocketChannel.open();
的调用链路一模一样,我用图展示一下:
通过上图,我们可以明显的看到,Netty
在初始化的时候,实际上调用 NioServerSocketChannel
的构造方法,通过其实现了 ServerSocketChannel.open()
,新建 FD
到这里,我们的初始化 NioServerSocketChannel
告一段落,下面看一下 Register
操作
3.1.2 添加 NIOServerSocketChannel 初始化 Handler
我们继续往下追,可以看到有一个 this.init(channel);
方法,这个方法添加了一个 Handler
p.addLast(new ChannelHandler[]{new ChannelInitializer<Channel>() { public void initChannel(final Channel ch) { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = ServerBootstrap.this.config.handler(); if (handler != null) { pipeline.addLast(new ChannelHandler[]{handler}); } ch.eventLoop().execute(new Runnable() { public void run() { pipeline.addLast(new ChannelHandler[]{new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)}); } }); } }});
们暂时不解释这里的作用,我们只需要记住这里添加了一个 Handler
,后面会进行调用(后面调用的时候会讲)。
3.2 Register
我们先总览一下 Register
的源码部分:
ChannelFuture regFuture = this.config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture;
public final void register(EventLoop eventLoop, final ChannelPromise promise) { // 获取当前的eventLoop AbstractChannel.this.eventLoop = eventLoop; // 此处完成了由 主线程 到 NIO线程 的切换 // eventLoop.inEventLoop()用于判断当前线程是否为NIO线程 if (eventLoop.inEventLoop()) { register0(promise); } else { // 向NIO线程中添加任务 eventLoop.execute(new Runnable() { @Override public void run() { // 该方法中会执行doRegister // 执行真正的注册操作 register0(promise); } }); } }
3.2.1 register0
private void register0(ChannelPromise promise) { try { // 执行真正的注册逻辑 doRegister(); neverRegistered = false; registered = true; // 调用init中的initChannel方法 pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); } }
我们的注册整体上主要分为 2 部分
- 执行真正的注册逻辑,也就是:
ssc.register(selector, SelectionKey.OP_ACCEPT, attach);
- 让我们之前写的
Handler
执行doRegister - eventLoop().unwrappedSelector():我们的 Selector 存储的位置
- this:将当前的 NIOServerSocketChannel 作为附件,便于之后的获取
protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { // 注册 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } } }
pipeline.invokeHandlerAddedIfNeeded:此方法会调用我们一开始定义的 Handler
p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } // 添加新任务,任务负责添加 handler // 该handler负责发生Accepet事件后建立连接 ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } });
Register
主要实现了 ssc.register(selector, SelectionKey.OP_ACCEPT, attach);
的注册功能,并在创建时进行了线程切换,从 main线程
到 NIO线程
到这里,我们的 Register
也告一段落
3.3 doBind0
我们上面讲到了 safeSetSuccess(promise)
,向我们的 promise
设置成功的结果,并通过下面的 doBind0(regFuture, channel, localAddress, promise);
进行调用
final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } // if (regFuture.isDone()) { ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) {. promise.setFailure(cause); } else { promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; }
底层实现如下:
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { assertEventLoop(); boolean wasActive = isActive(); try { // 注册端口 doBind(localAddress); } // 前面一系列操作后,我们的 NIOServerSocketChannel 是否可用 if (!wasActive && isActive()) { invokeLater(new Runnable() { @Override public void run() { pipeline.fireChannelActive(); } }); } safeSetSuccess(promise); }
注册端口的代码
@SuppressJava6Requirement(reason = "Usage guarded by java version check") @Override protected void doBind(SocketAddress localAddress) throws Exception { // 根据当前 JDK 的版本是否大于 7 if (PlatformDependent.javaVersion() >= 7) { // 调用ServerSocketChannel的bind方法,绑定端口 // javaChannel() = ServerSocketChannel javaChannel().bind(localAddress, config.getBacklog()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); } }
到这里,我们已经完成了端口的注册,距离我们服务器的启动只差绑定 SelectionKey.OP_ACCEPT
我们后续看一下:pipeline.fireChannelActive();
pipeline.fireChannelActive:触发所有该 pipeline 上的事件
我们 Netty
的 Handler
信息如下:
除了我们自定义的 Handler
,我们需要查看一下 Head
的代码
最终实现代码如下:
@Override protected void doBeginRead() throws Exception { final SelectionKey selectionKey = this.selectionKey; if (!selectionKey.isValid()) { return; } readPending = true; final int interestOps = selectionKey.interestOps(); // readInterestOp = 1 << 4 = 16 if ((interestOps & readInterestOp) == 0) { selectionKey.interestOps(interestOps | readInterestOp); } }
我们对比着 SelectionKey
的描述代码看一下:
public static final int OP_READ = 1 << 0; public static final int OP_WRITE = 1 << 2; public static final int OP_CONNECT = 1 << 3; public static final int OP_ACCEPT = 1 << 4;
可以看到,上述代码添加的事件正是 OP_ACCEPT
事件。
至此,我们的 Netty
服务端就正式启动了
4. 总结
Netty 服务端的启动框架基本封装了 Java NIO 启动的部分源码。
剩余的源码将在以后的系列中持续更新,喜欢的小伙伴不妨点个关注~