Netty网络框架(二)https://developer.aliyun.com/article/1469520
3、EventLoop
EventLoop事件循环,监听IO事件,内部封装了线程
EventLoopGroup事件循环组,是对EventLoop的管理,封装了线程池。
当新建channel时,group会为其分配一个EventLoop,封装了nio中的Selector,监听通道中的所有事件,一个通道的生命周期内,所有操作都由相同的EventLoop所封装的线程处理。
同时,多个通道可以由一个EventLoop处理,是多对一的关系
1)EventLoopGroup
类层级结构
(通过选中NioEventLoopGroup源码 - 右键 - 选中Diagrams - 选中show diagram 展示出来)
A 初始化流程
EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); [NioEventLoopGroup] ------------------------------------------------------------------------- public NioEventLoopGroup() { this(0); } public NioEventLoopGroup(int nThreads) { this(nThreads, (Executor) null); } public NioEventLoopGroup(int nThreads, Executor executor) { this(nThreads, executor, SelectorProvider.provider()); } // 逐步增加参数 public NioEventLoopGroup( int nThreads, Executor executor, final SelectorProvider selectorProvider) { this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE); } // 调用父类的构造器 public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) { super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()); } [MultithreadEventLoopGroup] ------------------------------------------------------------------------- protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); } // 初始化线程数量的逻辑 线程数 = cpu核数 * 2 private static final int DEFAULT_EVENT_LOOP_THREADS; static { DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)); if (logger.isDebugEnabled()) { logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS); } } [MultithreadEventExecutorGroup] ------------------------------------------------------------------------- protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) { this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args); } // 核心逻辑 protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); } if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } children = new EventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) { boolean success = false; try { // 根据线程数量 创建EventLoop的逻辑 // newChild()的具体实现在NioEventLoopGroup类中 children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { if (!success) { for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } } chooser = chooserFactory.newChooser(children); final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } }; for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet); } [NioEventLoopGroup] ------------------------------------------------------------------------- @Override protected EventLoop newChild(Executor executor, Object... args) throws Exception { EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null; return new NioEventLoop(this, executor, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory); }
2)EventLoop
重要属性为 Selector 及其父类的父类中的 Thread
Selector用于在channel创建之后,注册其中监听后续的I/O事件
Thread用于进行轮询,在channel注册之后启动线程
A 注册channel
ChannelFuture future = serverBootstrap.bind(8888).sync(); [AbstractBootstrap] -------------------------------------------------------------------------- public ChannelFuture bind(int inetPort) { return bind(new InetSocketAddress(inetPort)); } public ChannelFuture bind(SocketAddress localAddress) { validate(); return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress")); } private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); ..... } final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory.newChannel(); init(channel); } catch (Throwable t) { if (channel != null) { channel.unsafe().closeForcibly(); return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
注册的源码调用链路
ChannelFuture regFuture = config().group().register(channel); [MultithreadEventLoopGroup] ------------------------------------------------------------------------------ public ChannelFuture register(Channel channel) { return next().register(channel); } [SingleThreadEventLoop] ------------------------------------------------------------------------------ public ChannelFuture register(Channel channel) { return register(new DefaultChannelPromise(channel, this)); } public ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); promise.channel().unsafe().register(this, promise); return promise; } [AbstractChannel] ------------------------------------------------------------------------------ // 核心逻辑是 调用了 register0()方法 public final void register(EventLoop eventLoop, final ChannelPromise promise) { ObjectUtil.checkNotNull(eventLoop, "eventLoop"); if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } // channel中存在EventLoop类型的属性 // 通道初始化时,会将指定的EventLoop与channel进行关联 AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } } // 核心逻辑是调用了doRegister()方法 private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false; registered = true; // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the // user may already fire events through the pipeline in the ChannelFutureListener. pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { // This channel was registered before and autoRead() is set. This means we need to begin read // again so that we process inbound data. // // See https://github.com/netty/netty/issues/4805 beginRead(); } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } [AbstractNioChannel] ---------------------------------------------------------------------------- // 核心注册逻辑 protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { // 将通道注册进Selector中 此时感兴趣的事件是0 // 并且将当前对象作为附加对象存入其中 等价selectionKey.attach()方法 // 使用对象时 再通过selectionkey.attachment()方法取出对象 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } }
B 轮询事件的状态
读取源码的思路:找到入口,找到核心逻辑。
AbstractChannel中register()方法,对eventLoop.execute()的调用,就是启动线程进行轮询的入口。
[SingleThreadEventExecutor] ----------------------------------------------------------------------------- public void execute(Runnable task) { ObjectUtil.checkNotNull(task, "task"); execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task)); } private final Queue<Runnable> taskQueue; // 当前类维护了一个任务队列 private void execute(Runnable task, boolean immediate) { boolean inEventLoop = inEventLoop(); addTask(task); if (!inEventLoop) { // 启动线程 startThread(); if (isShutdown()) { boolean reject = false; try { if (removeTask(task)) { reject = true; } } catch (UnsupportedOperationException e) { // The task queue does not support removal so the best thing we can do is to just move on and // hope we will be able to pick-up the task before its completely terminated. // In worst case we will log on termination. } if (reject) { reject(); } } } if (!addTaskWakesUp && immediate) { wakeup(inEventLoop); } } private void startThread() { if (state == ST_NOT_STARTED) { if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { boolean success = false; try { doStartThread(); success = true; } finally { if (!success) { STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED); } } } } } private void doStartThread() { assert thread == null; executor.execute(new Runnable() { @Override public void run() { thread = Thread.currentThread(); if (interrupted) { thread.interrupt(); } boolean success = false; updateLastExecutionTime(); try { // 真正的调用逻辑 SingleThreadEventExecutor.this.run(); success = true; } ...... } } } [NioEventLoop] ----------------------------------------------------------------------------- protected void run() { int selectCnt = 0; for (;;) { try { int strategy; try { strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()); switch (strategy) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.BUSY_WAIT: // fall-through to SELECT since the busy-wait is not supported with NIO case SelectStrategy.SELECT: long curDeadlineNanos = nextScheduledTaskDeadlineNanos(); if (curDeadlineNanos == -1L) { curDeadlineNanos = NONE; // nothing on the calendar } nextWakeupNanos.set(curDeadlineNanos); try { // 判断队列中是否存在任务 if (!hasTasks()) { // 如果不存在 调用select()进行获取 strategy = select(curDeadlineNanos); } } finally { // This update is just to help block unnecessary selector wakeups // so use of lazySet is ok (no race condition) nextWakeupNanos.lazySet(AWAKE); } // fall through default: } } catch (IOException e) { // If we receive an IOException here its because the Selector is messed up. Let's rebuild // the selector and retry. https://github.com/netty/netty/issues/8566 rebuildSelector0(); selectCnt = 0; handleLoopException(e); continue; } selectCnt++; cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; boolean ranTasks; if (ioRatio == 100) { try { if (strategy > 0) { // 处理任务的核心逻辑 processSelectedKeys(); } } finally { // Ensure we always run tasks. ranTasks = runAllTasks(); } } else if (strategy > 0) { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { // Ensure we always run tasks. final long ioTime = System.nanoTime() - ioStartTime; ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } else { ranTasks = runAllTasks(0); // This will run the minimum number of tasks } if (ranTasks || strategy > 0) { if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.", selectCnt - 1, selector); } selectCnt = 0; } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case) selectCnt = 0; } } catch (CancelledKeyException e) { // Harmless exception - log anyway if (logger.isDebugEnabled()) { logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?", selector, e); } } catch (Throwable t) { handleLoopException(t); } // Always handle shutdown even if the loop processing threw an exception. try { if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return; } } } catch (Throwable t) { handleLoopException(t); } } }
processSelectedKeys()是处理任务的核心逻辑,来自于NioEventLoop的run()方法调用
// 处理事件集合 private void processSelectedKeys() { // 如果selectedKeys(事件集合)没有值,重新获取,如果有值,直接处理 if (selectedKeys != null) { processSelectedKeysOptimized(); } else { processSelectedKeysPlain(selector.selectedKeys()); } } private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) { // check if the set is empty and if so just return to not create garbage by // creating a new Iterator every time even if there is nothing to process. // See https://github.com/netty/netty/issues/597 if (selectedKeys.isEmpty()) { return; } Iterator<SelectionKey> i = selectedKeys.iterator(); for (;;) { final SelectionKey k = i.next(); // 这是注册时,存储的附加对象,即为通道对象channel final Object a = k.attachment(); i.remove(); if (a instanceof AbstractNioChannel) { // 处理具体事件 processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } if (!i.hasNext()) { break; } if (needsToSelectAgain) { selectAgain(); selectedKeys = selector.selectedKeys(); // Create the iterator again to avoid ConcurrentModificationException if (selectedKeys.isEmpty()) { break; } else { i = selectedKeys.iterator(); } } } } // 处理具体事件 // 判断事件类型,调用对应的逻辑处理 private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { final EventLoop eventLoop; try { eventLoop = ch.eventLoop(); } catch (Throwable ignored) { // If the channel implementation throws an exception because there is no event loop, we ignore this // because we are only trying to determine if ch is registered to this event loop and thus has authority // to close ch. return; } // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is // still healthy and should not be closed. // See https://github.com/netty/netty/issues/5125 if (eventLoop == this) { // close the channel if the key is not valid anymore unsafe.close(unsafe.voidPromise()); } return; } try { int readyOps = k.readyOps(); // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise // the NIO JDK channel implementation may throw a NotYetConnectedException. if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } // Process OP_WRITE first as we may be able to write some queued buffers and so free memory. if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); } // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead // to a spin loop if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
其中读事件的处理
unsafe.read(); [AbstractNioByteChannel] ---------------------------------------------------------------------------- public final void read() { final ChannelConfig config = config(); if (shouldBreakReadReady(config)) { clearReadPending(); return; } final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false; try { do { byteBuf = allocHandle.allocate(allocator); allocHandle.lastBytesRead(doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() <= 0) { // nothing was read. release the buffer. byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; if (close) { // There is nothing left to read as we received an EOF. readPending = false; } break; } allocHandle.incMessagesRead(1); readPending = false; pipeline.fireChannelRead(byteBuf); byteBuf = null; } while (allocHandle.continueReading()); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (close) { closeOnRead(pipeline); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { // Check if there is a readPending which was not processed yet. // This could be for two reasons: // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // // See https://github.com/netty/netty/issues/2254 if (!readPending && !config.isAutoRead()) { removeReadOp(); } } }
4、Bootstrap
引导,对应用程序进行配置,并让他运行起来的过程。
1)配置
必选参数:group 、 channel、 handler(服务端 -- childHandler)
group(): 指定一个到两个reactor
channel():指定channel工厂,反射的方式创建channel使用
handler():指定reactor的处理器,其中childHandler指定的是,服务端所接收到的客户端channel使用的处理器,而服务端的主reactor(bossGroup),已经默认添加了acceptor处理器,所以可以不指定。
option():指定TCP相关的参数,以及netty自定义的参数
配置参数的过程,称之为初始化。
2)运行
// 启动并设置端口号 但需要使用sync异步启动 try { ChannelFuture future = serverBootstrap.bind(8888).sync(); // 将关闭通道的方式 也设置为异步的 // 阻塞finally中的代码执行 future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 优雅关闭 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); }
bind(),将服务端的channel绑定到端口号,然后接收客户端的连接,让整个netty运行起来。
sync(),因为绑定事件是异步的,所以使用sync同步等待结果,换句话说,bind只触发了绑定端口的事件,需要使用sync等待事件执行的结果。
future.channel().closeFuture().sync(),含义为,当通道被关闭时,才执行后续的操作,sync使当前线程执行到此处阻塞,以确保不执行后续的shutdown方法。
3)源码解析
A 类声明
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
嵌套的泛型使用,可以达到,子类中返回子类本身的效果,具体如下:
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
B init方法
工作内容
a、设置channel相关的选项参数
b、设置channel的属性键值对
c、添加对channel的IO事件处理器 (Acceptor角色)
void init(Channel channel) { // 设置channel相关的选项参数 setChannelOptions(channel, newOptionsArray(), logger); // 设置channel的属性键值对 setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY)); ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY); } final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY); p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } // 添加对channel的IO事件处理器 (Acceptor角色) ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
Acceptor分析
功能:将主reactor接收到的客户端通道,传递给从reactor
// ServerBootstrapAcceptor是ServerBootstrap的静态内部类 // netty将acceptor看作一个处理器,并且是入站处理器 private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter { // 具体的逻辑封装到channelRead()方法中 // 对客户端通道进行配置 , 然后注册到从Reactor中 public void channelRead(ChannelHandlerContext ctx, Object msg) { // 此时msg对应 服务端接收到的客户端通道 final Channel child = (Channel) msg; // 设置处理链路 child.pipeline().addLast(childHandler); // 设置通道的配置项和参数 setChannelOptions(child, childOptions, logger); setAttributes(child, childAttrs); try { // childGroup 是从reactor的资源池 调用注册方法 注册客户端通道child childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { // 增加监听 获取注册的异步结果 // 如果注册失败 或者抛出异常 都会关闭channel if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } } // 如果处理客户端连接失败了,暂停一秒,然后继续接受 // 为保障服务端能够尽可能多的处理客户端的连接 不受某一次处理失败的结果影响 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { final ChannelConfig config = ctx.channel().config(); if (config.isAutoRead()) { // stop accept new connections for 1 second to allow the channel to recover // See https://github.com/netty/netty/issues/1328 config.setAutoRead(false); ctx.channel().eventLoop().schedule(enableAutoReadTask, 1, TimeUnit.SECONDS); } // still let the exceptionCaught event flow through the pipeline to give the user // a chance to do something with it ctx.fireExceptionCaught(cause); }
4) Future和Promise
Future代表的是,一个还未完成的异步任务的执行结果。可以通过addListener方法,监听执行结果后进行相应的处理,此时它的状态可以分为未完成、已完成(成功、失败、主动取消)等。
对Future而言,状态只能读取,无法更改,又出现了Promise,但是Promise只能更改一次。
参照生活中的定额发票(Future)和机打发票(Promise)。
(四)拓展篇
1、心跳检测
检测逻辑:
1) 服务端启动,客户端建立连接,连接的目的是互相发送消息。
2) 如果客户端在工作,服务端一定能收到数据,如果客户端空闲,服务端会出现资源浪费。
3) 服务端需要一种检测机制,验证客户端的活跃状态,不活跃则关闭。
需求设计:
1) 客户端向服务端发送 “I am alive” , sleep一个随机时间,模拟空闲状态
2) 服务端收到消息后,返回“over”, 客户端有空闲,记录空闲次数
3) 设定阈值,达到阈值时主动关闭连接
IdleStateHandler , 是netty提供的处理器
1)超过多长时间没有读 readerIdleTime
- 超过多长时间没有写 writerIdleTime
- 超过多长时间没有读和写 allIdleTime
底层实现检测的是 IdleStateEvent事件,通过管道传递给下一个handler处理,处理方法是userEventTriggered。
其中IdleStateEvent事件,分为READER_IDLE、WRITER_IDLE、ALL_IDLE三大类
2、TCP粘包拆包
TCP是基于流的。
当客户端发送多个包,服务端只收到一个包,此时发生了粘包。
当客户端发送多个包,服务端也接收到多个包,但是包是不完整,多了或少了数据,此时发生了拆包。
UDP会发生拆包和粘包吗?
不会,UDP是基于报文的,在UDP的首部使用16bit存储报文的长度,因此不会发生。
TCP发生粘包和拆包的本质原因:
要发送的数据先经过TCP的缓冲区,还限制了最大报文长度。
A 如果要发送的数据 > TCP剩余的缓冲区大小,发生拆包
B 如果要发送的数据 > 最大报文长度,发生拆包
C 如果要发送的数据 << TCP剩余的缓冲区大小,发生粘包
D 接收数据的应用层,没有及时读取缓冲区数据,也会发生粘包
解决办法:
A 设置出消息的长度
B 设置出消息的边界——分隔符
Netty提供的解码器,两类
A 基于长度的解码器,在包头部设置出数据的长度。(类似于UDP的头部处理)
LengthFieldBasedFrameDecoder 自定义长度的处理方式
FixedLengthFrameDecoder 固定长度的处理方式
B 基于分隔符的解码器
DelimiterBasedFrameDecoder 自定义分隔符的处理方式
LineBasedFrameDecoder 行尾("\n"或"\r\n")分隔符的处理方式
【Demo逻辑】
需求:客户端循环100次向服务端请求时间
1)第一种方式,传输的过程数据单位是字节流ByteBuf,需要自行处理分隔符以及数据的长度,此时会出现粘包和拆包的问题
2)第二种方式,使用LineBasedFrameDecoder,配合StringDecoder使用,传输的数据单位变成字符串,可以直接处理,保证业务逻辑上的包和真正传输的包是一致的
3、序列化框架——protobuf
protobuf = protocol buffers
类似于xml的生成和解析,但效率更高,生成的是字节码,可读性稍差。
【Demo逻辑】
1)安装idea插件,protobuf support
如果安装之后,创建*.proto文件没有使用插件,手动设置关联关系
settings -> file types -> 找到protobuf -> 增加正则表达式
2)引入maven依赖和插件
<properties> <os.detected.classifier>windows-x86_64</os.detected.classifier> </properties> <build> <plugins> <plugin> <groupId>org.xolstice.maven.plugins</groupId> <artifactId>protobuf-maven-plugin</artifactId> <version>0.5.0</version> <configuration> <protocArtifact> com.google.protobuf:protoc:3.1.0:exe:${os.detected.classifier} </protocArtifact> <pluginId>grpc-java</pluginId> </configuration> <executions> <execution> <goals> <goal>compile</goal> <goal>compile-custom</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
3)在右侧maven project中可以找到相应的插件 (没有的话刷新)
4)在和java平级的目录下,创建proto文件夹,然后创建person.proto文件
5)person.proto
// 声明包名称的空间 syntax="proto3"; // 具体的类生成目录 option java_package="com.duing"; // 具体的类名 option java_outer_classname="PersonModel"; // 类结构 message Person{ int32 id = 1; // 此处的1代表顺序 string name = 2; }
- 使用插件进行编译,将编译生成的代码拷贝到需要的目录下
7)编写测例进行序列化和反序列化操作