准备
1、NettyServer
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
// 1、创建bossGroup线程组:处理网络连接事件。默认线程数:2*处理器线程数
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 2、创建workGroup线程组:处理网络read/write事件。 默认线程数:2*处理器线程数
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
// 3、创建服务端启动助手
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 4、服务端启动助手,设置线程组
serverBootstrap.group(bossGroup,workerGroup)
// 5、设置服务端Channel实现类
.channel(NioServerSocketChannel.class)
// 6、设置bossGroup线程队列中等待连接个数
.option(ChannelOption.SO_BACKLOG,128)
// 7、设置workerGroup中线程活跃状态
.childOption(ChannelOption.SO_KEEPALIVE,true)
// 使用channelInitializer 可以配置多个handler
.childHandler(new ChannelInitializer<SocketChannel>() {// 8、设置一个通道初始化对象
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 9、向pipeline中添加自定义的channelHandler, 处理socketChannel传送的数据
ch.pipeline().addLast(new NettyServerHandler());
}
});
// 10、服务端启动并绑定端口
ChannelFuture future = serverBootstrap.bind(9999).sync();
// 给服务器启动绑定结果,对结果进行监听,触发回调
future.addListener((ChannelFuture channelFuture)-> {
if(channelFuture.isSuccess()){
System.out.println("服务器启动成功");
}else {
System.out.println("服务器启动失败");
}
});
// 11、关闭监听通道和连接池,将异步改同步
future.channel().closeFuture().sync();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
2、NettyServerHandler
/**
* 自定义的channelHandler处理器
*
* 事件触发,触发相应函数
*/
public class NettyServerHandler implements ChannelInboundHandler {
/**
* 通道读取事件
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuffer = (ByteBuf)msg;
System.out.println("客户端:"+byteBuffer.toString(CharsetUtil.UTF_8));
}
/**
* 通道数据读取完毕事件
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
TimeUnit.SECONDS.sleep(2);
ctx.writeAndFlush(Unpooled.copiedBuffer("叫我靓仔!!!".getBytes()));
}
/**
* 发生异常捕获事件
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
/**
* 通道就绪事件
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
}
}
3、NettyClient
/**
* nettyClient
*/
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
// 1、创建线程组
NioEventLoopGroup group = new NioEventLoopGroup();
// 2、创建客户端启动助手bootstrap
Bootstrap bootstrap = new Bootstrap();
// 3、配置线程组
bootstrap.group(group)
// 4、定义socketChannel的实现类
.channel(NioSocketChannel.class)
// 5、定义channelHandler, 处理socketChannel的数据
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//6、向pipeline中添加自定义业务处理handler
ch.pipeline().addLast(new NettyClientHandler());
}
});
// 7、启动客户端, 等待连接服务端, 同时将异步改为同步
ChannelFuture future = bootstrap.connect(new InetSocketAddress(9999)).sync();
// 8、关闭通道和关闭连接池
future.channel().closeFuture().sync();
group.shutdownGracefully();
}
}
4、NettyClientHandler
/**
* 自定义的channelHandler处理器
* <p>
* 事件触发,触发相应函数
*/
public class NettyClientHandler implements ChannelInboundHandler {
/**
* 通道读取事件
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("服务端:" +
byteBuf.toString(CharsetUtil.UTF_8));
}
/**
* 通道数据读取完毕事件
*
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("不行,不行啊!!!".getBytes()));
}
/**
* 发生异常捕获事件
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
/**
* 通道就绪事件
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("你好哇 小客客!!!".getBytes()));
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
}
}
Netty启动流程
1、绑定端口,封装程SocketAddress
**2、创建初始化Channel,将NioServerSocketChannel绑定到Boss
NioEventLoopGroup中的EventLoop中的Selector上,指定Selector监听事件为accept**
2.1 反射创建NioServerSocketChannel
给ServerSocketChannel绑定id(唯一标识),NioMessageUnsafe(channel数据读写类),ChannelPipeline(channel业务处理管道,可以设置许多ChannelHandler进行编解码,业务处理)
1、感兴趣的事件是 0 指客户端建立连接
2、ServerSocketChannel开启非堵塞模式,阻塞模式会怎么样?试想下,如果服务器通道阻塞,一个客户端SocketChannel与ServerSocketChannel建立连接,进行相关操作,其它的客户端怎么办?通道被占用了啊,当然这是我的理解
2.2 初始化NioServerSocketChannel
@Override
void init(Channel channel) {
setChannelOptions(channel, newOptionsArray(), logger);
setAttributes(channel, newAttributesArray());
ChannelPipeline p = channel.pipeline();
// 获取workerGroup
final EventLoopGroup currentChildGroup = childGroup;
/**
* ServerBootStrap通过childHandler添加的childHandler,最终会在下面的ServerBootstrapAcceptor中添加进pipeline
*/
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);
/**
* 向pipeline中添加一个channelHandler
* ChannelInitializer对象也是一个ChannelHandler
* TODO 这里向pipeline里添加了一个ChannelHandler,会在哪里被触发呢?
* 答:这里的channelHandler对象会在DefaultChannelPipeline#callHandlerAddedForAllHandlers方法里执行
* 需要注意的是,这里是在ChannelInitializer抽象类的handlerAdded里调用了initChannel的抽象方法,从而调进
* 此处的匿名对象类方法里。
*
*/
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
// TODO 这里是ServerBootStrap中设置的handler
ChannelHandler handler = config.handler();
/**
* 将ServerBootStrap在初始化的时候添加的ChannelHandler在此处真正的添加进pipeline中(在ServerTest中ServerBootStrap#childHandler添加的)
*/
if (handler != null) {
pipeline.addLast(handler);
}
/**
* 拿出NioServerSocketChannel绑定的NioEventLoop来执行以下线程
*
* TODO 这里addLast进来的ServerBootstrapAccetor是用于workerGroup注册客户端用的!
* TODO 这里的pipeline扮演这很重要的作用。
*/
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
ServerSocketChannel初始化本质上就是为绑定的ChannelPipeline(链表)设置ChannelInitializer(ChannelHandler)。这个ChannelInitializer 主要作用:
1、为ServerSocketChannel, 设置 Channelhandler(自己配置的)
2、通过NioServerSocketChannel绑定的NioEventLoop来执行以下线程任务
ServerBootstrapAccetor是用于workerGroup注册客户端用的! 这个功能后面debug展示
2.3、ServerSocketChannel注册到NioEventLoop中的Selector中
就是从BossEventLoopGroup中拿一个NioEventLoop出来,将ServerSocketChannel注册到NioEventLoop上
获取到SocketChannel在创建的时候创建的NioMessageUnsafe类,进行注册
通过NioEvetLoop 进行 一个通道注册任务,执行任务
private void execute(Runnable task, boolean immediate) {
// 判断当前线程是在NioEventLoop线程内,还是在外部线程
boolean inEventLoop = inEventLoop();
/**
* 这里添加的Task是一个注册NioServerSocketChannel的任务!
* 是AbstractChannel$AbstractUnsafe的一个匿名Runnable类
*/
addTask(task);
// 如果是外部线程调用的
if (!inEventLoop) {
/**
* 这里启动一个线程,就是NioEventLoop!!启动的是NioEventLoop
*/
startThread();
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// The task queue does not support removal so the best thing we can do is to just move on and
// hope we will be able to pick-up the task before its completely terminated.
// In worst case we will log on termination.
}
if (reject) {
reject();
}
}
}
if (!addTaskWakesUp && immediate) {
*/
wakeup(inEventLoop);
}
}
/**
* 核心工作就是启动NioEventLoop线程
*/
private void doStartThread() {
assert thread == null;
/**
* 1. 这里executor是ThreadExecutorMap类中的一个匿名Executor内部类,是Executor apply()这个方法返回的实例对象
* 2. 这里传入的execute()里面的匿名内部类是SingleThreadEventExecutor中的,所以
* 是SingleThreadEventExecutor$对象
* 3. 执行的runnable是FastThreadLocalRunnable对象
*/
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
// 核心,就是启动NioEventLoop!!!
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
for (;;) {
int oldState = state;
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}
// Check if confirmShutdown() was called at the end of the loop.
if (success && gracefulShutdownStartTime == 0) {
if (logger.isErrorEnabled()) {
logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
"be called before run() implementation terminates.");
}
}
try {
// Run all remaining tasks and shutdown hooks. At this point the event loop
// is in ST_SHUTTING_DOWN state still accepting tasks which is needed for
// graceful shutdown with quietPeriod.
for (;;) {
if (confirmShutdown()) {
break;
}
}
// Now we want to make sure no more tasks can be added from this point. This is
// achieved by switching the state. Any new tasks beyond this point will be rejected.
for (;;) {
int oldState = state;
if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) {
break;
}
}
// We have the final set of tasks in the queue now, no more can be added, run all remaining.
// No need to loop here, this is the final pass.
confirmShutdown();
} finally {
try {
cleanup();
} finally {
// Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
// the future. The user may block on the future and once it unblocks the JVM may terminate
// and start unloading classes.
// See https://github.com/netty/netty/issues/6596.
FastThreadLocal.removeAll();
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
threadLock.countDown();
int numUserTasks = drainTasks();
if (numUserTasks > 0 && logger.isWarnEnabled()) {
logger.warn("An event executor terminated with " +
"non-empty task queue (" + numUserTasks + ')');
}
terminationFuture.setSuccess(null);
}
}
}
}
});
}
1、判断当前线程是NioEventLoop内部线程还是外部线程(由于是主线程启动,所以是外部线程)
2、将通道注册任务放入NioEventLoop中任务队列中
3、包装任务,通过NioEventLoop,主要是为了从NioEventLoop的run方法开始执行
3、NioEventLoop通过ThreadFacoty创建线程,线程开始执行NioEventLoop run方法
/**
* NIOEventLoop执行核心
*/
@Override
protected void run() {
int selectCnt = 0; // 阻塞选择次数
// 从NioEventLoop中的 taskQueue中 判断是否存在事件
for (;;) { // 轮训注册到selector的IO事件 为什么for(;;)比while(1)好?因为for(;;)底层的指令更少,效率更高
try {
int strategy; // strategy = 0 default
try {
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()); // 获取策略。如果有任务则使用非阻塞方式
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT: // select事件执行
long curDeadlineNanos = nextScheduledTaskDeadlineNanos(); // 当前截止时间
if (curDeadlineNanos == -1L) { // 表明没有定时任务
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) { // 如果没有任务,则select阻塞等待任务 任务存放在SingleThreadEventLoop
// TODO 测试
System.err.println("[CurrentThread = " + Thread.currentThread().getName() + "]I'm selecting... waiting for selectKey or tasks!");
strategy = select(curDeadlineNanos);
}
} finally {
// This update is just to help block unnecessary selector wakeups
// so use of lazySet is ok (no race condition)
// 标记未唤醒状态
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0();
selectCnt = 0;
handleLoopException(e);
continue;
}
System.err.println("[CurrentThread = " + Thread.currentThread().getName() + "] select() 调用完了,此时已经有事件进来了?");
selectCnt++; // 选择次数+1
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio; // 这里的ioRatio默认是50
boolean ranTasks;
if (ioRatio == 100) {
try {
if (strategy > 0) {
processSelectedKeys(); // 处理选择key,处理io相关的逻辑
}
} finally {
ranTasks = runAllTasks(); // 处理外部线程扔到taskQueue里的任务,这里的taskQueue是一个mpscQueue
}
} else if (strategy > 0) {
final long ioStartTime = System.nanoTime(); // 计算处理选择key的时间
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
/**
* 在Netty中,有两种任务,普通任务和定时任务。在执行任务的时候,会把定时任务队列里的task扔进普通任务队列里,
* 这里的普通任务队列就是mpscQueue,接着就挨个执行mpscQueue里的任务。
*
* 任务:普通任务 、定时任务
* 队列:普通任务队列mpscQueue 、 定时任务队列
*
*/
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}
if (ranTasks || strategy > 0) {
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
selectCnt = 0;
} else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case) 解决空轮训Bug,重置selectCnt,重新生成selector
selectCnt = 0;
}
} catch (CancelledKeyException e) {
// Harmless exception - log anyway
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
} catch (Error e) {
throw (Error) e;
} catch (Throwable t) {
handleLoopException(t);
} finally {
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Error e) {
throw (Error) e;
} catch (Throwable t) {
handleLoopException(t);
}
}
}
}
就是NioEventLoop中的线程进行死循环处理事件,通过事件进行驱动处理
1、判断NioEventLoop中的任务队列中是否存在任务 (存在通道注册任务哦!!),从而定义处理策略
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()); // 获取策略。如果有任务则使用非阻塞方式
有任务,那么 通过 selector 获取select 值,selector如果获取这个值呢?没看懂
2、拿到的select值为0,好像是没有SelectedKey,那么执行runTask,从任务队列中获取 之前的通道注册事件
protected boolean runAllTasks(long timeoutNanos) {
// 从定时任务队列中把任务聚合到普通队列里
fetchFromScheduledTaskQueue();
// 从普通任务队列里拿任务
Runnable task = pollTask();
if (task == null) {
afterRunningAllTasks();
return false;
}
final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
// 真正执行了任务
safeExecute(task);
taskTimes++;
runTasks ++;
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
// 当累积到64个任务的时候,这里判断是因为任务的执行是比较耗时的
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
// 如果当前时间 >= 截止时间,即已经超过了截止时间了
if (lastExecutionTime >= deadline) {
break;
}
}
// 将任务从任务队列中弹出
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
任务真正被执行了哦!
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
/**
* 调用selectableChannel的register方法,用于在给定的NioEventLoop上的selector上注册这个通道channel,并返回一个选择键
*
* OP_READ = 1 << 0 读操作位
* OP_WRITE = 1 << 2 写操作位
* OP_CONNECT = 1 << 3 客户端连接到服务端操作位
* OP_ACCEPT = 1 << 4 服务端接受客户端链接操作位
*
* 此处调用了jdk nio的selectableChannel的register方法,传入的操作位是0,表明对任何事件都不感兴趣,仅仅是完成注册操作。
*
* 向selector注册channel成功后,会返回一个selectionKey,后续可以拿着这个selectionKey获取到channel。
*
* javaChannel()拿到的是:AbstractSelectableChannel,在其register方法里,会调用addKey,给selectionKey添加默认数组大小3
* 并最终调用jdk底层register
*
*/
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
/**
* 由于未调用select#select(),因此可能仍然在缓存,而未删除但是已经取消了的selectionKey,强制调用selectNow()
* 将selectionKey从Selector上删除
*/
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
拿到ServerSocketChannel,注册到NioEventLoop中的Selector中 ,注册成功返回一个SelectionKey 。 对应代码 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
这个0
表示通道注册
@Override
protected final SelectionKey register(AbstractSelectableChannel ch,
int ops,
Object attachment)
{
if (!(ch instanceof SelChImpl))
throw new IllegalSelectorException();
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
k.attach(attachment);
// register (if needed) before adding to key set
implRegister(k);
// add to the selector's key set, removing it immediately if the selector
// is closed. The key is not in the channel's key set at this point but
// it may be observed by a thread iterating over the selector's key set.
keys.add(k);
try {
k.interestOps(ops);
} catch (ClosedSelectorException e) {
assert ch.keyFor(this) == null;
keys.remove(k);
k.cancel();
throw e;
}
return k;
}
1、将ServerSocketChannel,Selector 封装成 SelectionKeyImpl(所谓的通道注册凭证)
2、将ski (SelectionKeyImpl)放入 selector newKeys 队列
3、将ski 放入 selector的 SelectionKey集合中
4、设置NioEvenLoop中当前这个Selector 感兴趣的事件/操作
至此成功将ServerSocketChannel 放入 Selector中,并设置Selector所感兴趣的事件 0
2.4、调用ServerSocketChannel中的pipeLine,进行AddHandler
这里又会走到NioEventLoop中的run方法中,然后执行上图任务,整个流程走完
问题:
1、Selector如何进行Select
在NioEventLoop,ServerSocketChannel创建完后,会将channel注册到Selector中并设置通道感兴趣的事件,这里是注册通道事件。NioEventLoop会开启一个线程死循环,run方法内部 Selector会通过 select()/selectNow() 以阻塞非阻塞方式等待感兴趣的事件。具体需要看Selector 选择的源码
如果没有任务,select会阻塞
如果有任务,以非阻塞的方式执行
如果通道有IO事件,那么进行处理,内部会对感兴趣的客户端建立连接事件/通道注册 进行处理