引言
上篇文章中主要对Netty启动过程中,涉及的NioEventLoopGroup相关细节进行了详细介绍,本篇文章主要介绍启动过程中其他一些初始化步骤。
Channel的创建和初始化过程- 总结
一、Channel的创建和初始化过程
Channel是Netty对于网络实现层的抽象,可以对应于JDK中的NIO包实现,Netty服务端的Channel类型是 NioServerSocketChannel。下面来分析NioServerSocketChannel的创建和初始化。
1、端口绑定
启动过程中重要的操作为绑定端口,而NioServerSocketChannel的创建就是在此过程中进行的。ServerBootStrap的bind()方法
ChannelFuture f = b.bind(PORT).sync();
查看对应的源码,实际调用的是AbstractBootstrap类中的bind方法:
... public ChannelFuture bind(int inetPort) { return bind(new InetSocketAddress(inetPort)); } ... public ChannelFuture bind(SocketAddress localAddress) { validate(); return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress")); } ... private ChannelFuture doBind(final SocketAddress localAddress) { //初始化channel final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } //若注册成功,则进行bind操作 if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } }
2、创建NioServerSocketChannel
initAndRegister()中完成NioServerSocketChannel初始化以及注册的操作,当初始化以及注册完成后,进行实际的绑定操作,我们继续往下看:
final ChannelFuture initAndRegister() { Channel channel = null; try { //channel工厂创建新的channel channel = channelFactory.newChannel(); //初始化channel init(channel); } catch (Throwable t) { if (channel != null) { // channel can be null if newChannel crashed (eg SocketException("too many open files")) channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); } //注册channel ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } // If we are here and the promise is not failed, it's one of the following cases: // 1) If we attempted registration from the event loop, the registration has been completed at this point. // i.e. It's safe to attempt bind() or connect() now because the channel has been registered. // 2) If we attempted registration from the other thread, the registration request has been successfully // added to the event loop's task queue for later execution. // i.e. It's safe to attempt bind() or connect() now: // because bind() or connect() will be executed *after* the scheduled registration task is executed // because register(), bind(), and connect() are all bound to the same thread. return regFuture; }
创建channel,实际调用的是ReflectiveChannelFactory中的newChannel方法。
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> { private final Constructor<? extends T> constructor; public ReflectiveChannelFactory(Class<? extends T> clazz) { ObjectUtil.checkNotNull(clazz, "clazz"); try { this.constructor = clazz.getConstructor(); } catch (NoSuchMethodException e) { throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) + " does not have a public non-arg constructor", e); } } @Override public T newChannel() { try { return constructor.newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t); } } ... }
从上述代码中我们可以看出,clazz.getConstructor().newInstance()通过反射的方式创建了Channel。
Channel创建好之后,进行Channel的初始化操作,
@Override void init(Channel channel) { //设置channle选项 setChannelOptions(channel, options0().entrySet().toArray(newOptionArray(0)), logger); //设置channel属性 setAttributes(channel, attrs0().entrySet().toArray(newAttrArray(0))); ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0)); final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0)); p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
二、总结
本文主要讲述了NioServerSocketChannel创建过程以及源码分析,后面文章继续说明channel创建之后的操作,如等待连接等等。