1 传统socket编程
1.1 实战
服务端:ServerBoot
/** * @author JavaEdge */ public class ServerBoot { private static final int PORT = 8000; public static void main(String[] args) { Server server = new Server(PORT); server.start(); } }
Server
/** * @author JavaEdge */ public class Server { private ServerSocket serverSocket; public Server(int port) { try { this.serverSocket = new ServerSocket(port); System.out.println("Server starts success,端口:" + port); } catch (IOException exception) { System.out.println("Server starts failed"); } } public void start() { new Thread(() -> doStart()).start(); } private void doStart() { while (true) { try { Socket client = serverSocket.accept(); new ClientHandler(client).start(); } catch (IOException e) { System.out.println("Server failure"); } } } }
ClientHandler
/** * @author JavaEdge */ public class ClientHandler { public static final int MAX_DATA_LEN = 1024; private final Socket socket; public ClientHandler(Socket socket) { this.socket = socket; } public void start() { System.out.println("新客户端接入"); new Thread(() -> doStart()).start(); } private void doStart() { try { InputStream inputStream = socket.getInputStream(); while (true) { byte[] data = new byte[MAX_DATA_LEN]; int len; while ((len = inputStream.read(data)) != -1) { String message = new String(data, 0, len); System.out.println("客户端传来消息: " + message); socket.getOutputStream().write(data); } } } catch (IOException e) { e.printStackTrace(); } } }
Client
/** * @author JavaEdge */ public class Client { private static final String HOST = "127.0.0.1"; private static final int PORT = 8000; private static final int SLEEP_TIME = 5000; public static void main(String[] args) throws IOException { final Socket socket = new Socket(HOST, PORT); new Thread(() -> { System.out.println("客户端启动成功!"); while (true) { try { String message = "hello world"; System.out.println("客户端发送数据: " + message); socket.getOutputStream().write(message.getBytes()); } catch (Exception e) { System.out.println("写数据出错!"); } sleep(); } }).start(); } private static void sleep() { try { Thread.sleep(SLEEP_TIME); } catch (InterruptedException e) { e.printStackTrace(); } } }
先后启动 ServerBoot、Client,输出
Server starts success,端口:8000 新客户端接入 客户端传来消息: hello worldhello world 客户端传来消息: hello world 客户端传来消息: hello world 客户端传来消息: hello world 客户端传来消息: hello world
客户端启动成功! 客户端发送数据: hello world 客户端发送数据: hello world 客户端发送数据: hello world
1.2 传统HTTP服务器原理
- 创建一个
ServerSocket - 监听并绑定一个端口一系列客户端来请求这个端口服务器使用Accept,获得一个来自客户端的Socket连接对象
- 启动一个新线程处理连接读Socket,
- 得到字节流解码协议
- 得到Http请求对象处理Http请求
- 得到一个结果
- 封装成一个HttpResponse对象编码协议
- 将结果序列化字节流写Socket,
- 将字节流发给客户端
- 继续循环步骤3
1.3 C/S 交互流程
2 Netty版socket编程
3 Netty核心组件
3.1 NioEventLoop
3.1.1 简介
① EventLoop
一个 EventLoop 就是一个 eventexecutor:
package io.netty.channel; import io.netty.util.concurrent.OrderedEventExecutor; /** * 一旦Channel注册了,将处理所有的 I/O 操作 * 一个 EventLoop 实例通常处理一个以上的Channel ,但这取决于执行的细节和内幕 */ public interface EventLoop extends OrderedEventExecutor, EventLoopGroup { @Override EventLoopGroup parent(); }
NioEventLoopGroup ,Netty 框架提供的一个基于 NIO 的实现。是一个处理 I/O 操作的多线程事件循环的组。Netty 为不同类型的传输提供各种 EventLoopGroup 实现。
示例代码中实现服务器端应用程序,将使用两个NioEventLoopGroup:
- boss,接受传入的连接。因为accept事件只需建立一次连接,连接是可以复用的,accept只接受一次
- work,在上司接受连接并登记到工作人员后,处理接受连接的流量。使用多少线程及如何映射到创建的通道,取决于 EventLoopGroup 实现,甚至可能通过构造函数配置
Netty的发动机:
Server端
Client端
while(true)就对应一个 run 方法。
NioEventLoop#run
@Override protected void run() { for (;;) { oldWakenUp = wakenUp.getAndSet(false); try { if (hasTasks()) { selectNow(); } else { select(); // 'wakenUp.compareAndSet(false, true)' is always evaluated // before calling 'selector.wakeup()' to reduce the wake-up // overhead. (Selector.wakeup() is an expensive operation.) // // However, there is a race condition in this approach. // The race condition is triggered when 'wakenUp' is set to // true too early. // // 'wakenUp' is set to true too early if: // 1) Selector is waken up between 'wakenUp.set(false)' and // 'selector.select(...)'. (BAD) // 2) Selector is waken up between 'selector.select(...)' and // 'if (wakenUp.get()) { ... }'. (OK) // // In the first case, 'wakenUp' is set to true and the // following 'selector.select(...)' will wake up immediately. // Until 'wakenUp' is set to false again in the next round, // 'wakenUp.compareAndSet(false, true)' will fail, and therefore // any attempt to wake up the Selector will fail, too, causing // the following 'selector.select(...)' call to block // unnecessarily. // // To fix this problem, we wake up the selector again if wakenUp // is true immediately after selector.select(...). // It is inefficient in that it wakes up the selector for both // the first case (BAD - wake-up required) and the second case // (OK - no wake-up required). if (wakenUp.get()) { selector.wakeup(); } } cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; if (ioRatio == 100) { processSelectedKeys(); runAllTasks(); } else { final long ioStartTime = System.nanoTime(); // 打断点 processSelectedKeys(); final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { break; } } } catch (Throwable t) { logger.warn("Unexpected exception in the selector loop.", t); // Prevent possible consecutive immediate failures that lead to // excessive CPU consumption. try { Thread.sleep(1000); } catch (InterruptedException e) { // Ignore. } } } }
netty有不同I/O编程模型实现。以NIO为例,对IO事件的处理是在NioEventLoop里,事件的注册:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { final EventLoop eventLoop; try { eventLoop = ch.eventLoop(); } catch (Throwable ignored) { // If the channel implementation throws an exception because there is no event loop, we ignore this // because we are only trying to determine if ch is registered to this event loop and thus has authority // to close ch. return; } // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is // still healthy and should not be closed. // See https://github.com/netty/netty/issues/5125 if (eventLoop == this) { // close the channel if the key is not valid anymore unsafe.close(unsafe.voidPromise()); } return; } try { int readyOps = k.readyOps(); // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise // the NIO JDK channel implementation may throw a NotYetConnectedException. if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } // 首先处理 OP_WRITE,因为我们可写一些排队的缓冲区,从而释放内存 if ((readyOps & SelectionKey.OP_WRITE) != 0) { // 调用 forceFlush,一旦无剩余可写内容,它也将清除 OP_WRITE // 可见,注册 OP_WRITE 事件,要执行的就是 flush 操作. ch.unsafe().forceFlush(); } // 还要校验 readOps 为0以解决可能的JDK bug,否则可能导致 spin loop // 处理读请求(断开连接)或接入连接 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
private void processSelectedKeys() { if (selectedKeys != null) { // 不用 JDK 的 selector.selectedKeys,性能更好(%1-2%),GC更少 processSelectedKeysOptimized(); } else { processSelectedKeysPlain(selector.selectedKeys()); } }
不同事件调用unsafe的不同方法,Netty对底层socket的操作都通过
unsafe
- NioMessageUnsafe
NioServerSocketChannel使用NioMessageUnsafe做socket操作 - NioByteUnsafe
NioSocketChannel使用NioByteUnsafe做socket操作
处理每个连接:
private void processSelectedKeysOptimized() { for (int i = 0; i < selectedKeys.size; ++i) { final SelectionKey k = selectedKeys.keys[i]; // 数组中的空节点允许通道关闭后对其GC // 在SelectedSelectionKeySet中使用单数组 // 动机:SelectedSelectionKeySet当前在内部使用2个数组,并且期望用户调用flip访问基础数组并切换活动数组。 // 但是,我们不能同时使用2个数组,如果在重置数组元素时格外小心,就可以摆脱使用单数组。 // 修改: 介绍包装了Selector的SelectedSelectionKeySetSelector并确保我们在选择之前重置基础的SelectedSelectionKeySet数据结构- // NioEventLoop#processSelectedKeysOptimized中的循环边界可以更精确地定义,因为我们知道基础数组的实际大小 selectedKeys.keys[i] = null; // attachment 就是 NioServerSocketChannel final Object a = k.attachment(); // 打断点!!! if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } if (needsToSelectAgain) { // null out entries in the array to allow to have it GC'ed once the Channel close // See https://github.com/netty/netty/issues/2363 selectedKeys.reset(i + 1); selectAgain(); i = -1; } } }
② EventExecutorGroup
负责:
- 经由其使用
next()方法,提供EventExecutor - 处理自己的生命周期,并允许在全局模式中关闭它们
③ EventExecutor
特殊的EventExecutorGroup,附带一些快捷方法,看是否有Thread在事件循环执行。
④ EventLoopGroup
public interface EventLoopGroup extends EventExecutorGroup
特殊的 EventExecutorGroup,允许注册 Channel,即事件循环期间可执行 channel 操作,得到处理,供以后选用。
/** * EventExecutorGroup实现的抽象基类,它同时处理多个线程的任务 */ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup { // 底层数组存储 private final EventExecutor[] children;
3.2 Channel
以服务端的NioMessageUnsafe为例来看下read()方法的实现,对应是否有新连接进来的情况
直接把底层的 channel 封装成 NioSocketChannel
3.3 ByteBuf
3.4 Pipeline
小样例中对应内容,实际非常复杂
netty 将其抽象成逻辑链,netty怎么把每个 pipeline 加入到客户端连接的呢?
AbstractChannel
protected DefaultChannelPipeline newChannelPipeline() { return new DefaultChannelPipeline(this); }
DefaultChannelPipeline
protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true); tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; }
3.5 ChannelHandler
// 主要入参的ChannelHandler @Override public final ChannelPipeline addAfter( EventExecutorGroup group, String baseName, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; final AbstractChannelHandlerContext ctx; synchronized (this) { checkMultiplicity(handler); name = filterName(name, handler); ctx = getContextOrDie(baseName); newCtx = newContext(group, name, handler); addAfter0(ctx, newCtx); // If the registered is false it means that the channel was not registered on an eventLoop yet. // In this case we remove the context from the pipeline and add a task that will call // ChannelHandler.handlerRemoved(...) once the channel is registered. if (!registered) { newCtx.setAddPending(); callHandlerCallbackLater(newCtx, true); return this; } EventExecutor executor = newCtx.executor(); if (!executor.inEventLoop()) { callHandlerAddedInEventLoop(newCtx, executor); return this; } } callHandlerAdded0(newCtx); return this; }