是什么
EventLoop (事件循环对象)本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理 Channel 上源源不断的 io 事件。
它的继承关系比较复杂
- 一条线是继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法
- 另一条线是继承自 netty 自己的 OrderedEventExecutor,
- 提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop
- 提供了 parent 方法来看看自己属于哪个 EventLoopGroup
事件循环组
EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)
- 继承自 netty 自己的 EventExecutorGroup
- 实现了 Iterable 接口提供遍历 EventLoop 的能力
- 另有 next 方法获取集合中下一个 EventLoop
1. public class Test { 2. public static void main(String[] args) { 3. DefaultEventLoopGroup group = new DefaultEventLoopGroup(2); 4. System.out.println(group.next()); 5. System.out.println(group.next()); 6. System.out.println(group.next()); 7. } 8. }
底层是通过去摸进行实现的
NioEventLoop 处理任务
NioEventLoop 不仅负责处理 I/O 事件,还要兼顾执行任务队列中的任务。任务队列遵循 FIFO 规则,可以保证任务执行的公平性。NioEventLoop 处理的任务类型基本可以分为三类。
普通任务:通过 NioEventLoop 的 execute() 方法向任务队列 taskQueue 中添加任务。例如 Netty 在写数据时会封装 WriteAndFlushTask 提交给 taskQueue。taskQueue 的实现类是多生产者单消费者队列 MpscChunkedArrayQueue,在多线程并发添加任务时,可以保证线程安全。
定时任务:通过调用 NioEventLoop 的 schedule() 方法向定时任务队列 scheduledTaskQueue 添加一个定时任务,用于周期性执行该任务。例如,心跳消息发送等。定时任务队列 scheduledTaskQueue 采用优先队列 PriorityQueue 实现。
尾部队列:tailTasks 相比于普通任务队列优先级较低,在每次执行完 taskQueue 中任务后会去获取尾部队列中任务执行。尾部任务并不常用,主要用于做一些收尾工作,例如统计事件循环的执行时间、监控信息上报等。
普通任务
1. public class Test { 2. public static void main(String[] args) { 3. NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(2); 4. nioEventLoopGroup.execute(()->{ 5. System.out.println(Thread.currentThread()); 6. System.out.println("test..."); 7. }); 8. } 9. }
传递一个run()接口即可
定时任务
1. public class Test { 2. public static void main(String[] args) { 3. NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(2); 4. nioEventLoopGroup.scheduleAtFixedRate(()->{ 5. System.out.println("test..."); 6. },0,1, TimeUnit.SECONDS); 7. } 8. }
IO事件
服务器端两个 nio worker 工人
1. new ServerBootstrap() 2. .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2)) 3. .channel(NioServerSocketChannel.class) 4. .childHandler(new ChannelInitializer<NioSocketChannel>() { 5. @Override 6. protected void initChannel(NioSocketChannel ch) { 7. ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { 8. @Override 9. public void channelRead(ChannelHandlerContext ctx, Object msg) { 10. ByteBuf byteBuf = msg instanceof ByteBuf ? ((ByteBuf) msg) : null; 11. if (byteBuf != null) { 12. byte[] buf = new byte[16]; 13. ByteBuf len = byteBuf.readBytes(buf, 0, byteBuf.readableBytes()); 14. log.debug(new String(buf)); 15. } 16. } 17. }); 18. } 19. }).bind(8080).sync();
客户端,启动三次,分别修改发送字符串为 zhangsan(第一次),lisi(第二次),wangwu(第三次)
1. public static void main(String[] args) throws InterruptedException { 2. Channel channel = new Bootstrap() 3. .group(new NioEventLoopGroup(1)) 4. .handler(new ChannelInitializer<NioSocketChannel>() { 5. @Override 6. protected void initChannel(NioSocketChannel ch) throws Exception { 7. System.out.println("init..."); 8. ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); 9. } 10. }) 11. .channel(NioSocketChannel.class).connect("localhost", 8080) 12. .sync() 13. .channel(); 14. 15. channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("wangwu".getBytes())); 16. Thread.sleep(2000); 17. channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("wangwu".getBytes()));
1. 22:03:34 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - zhangsan 2. 22:03:36 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - zhangsan 3. 22:05:36 [DEBUG] [nioEventLoopGroup-3-2] c.i.o.EventLoopTest - lisi 4. 22:05:38 [DEBUG] [nioEventLoopGroup-3-2] c.i.o.EventLoopTest - lisi 5. 22:06:09 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - wangwu 6. 22:06:11 [DEBUG] [nioEventLoopGroup-3-1] c.i.o.EventLoopTest - wangwu
可以看到两个工人轮流处理 channel,但工人与 channel 之间进行了绑定
1. DefaultEventLoopGroup normalWorkers = new DefaultEventLoopGroup(2); 2. new ServerBootstrap() 3. .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2)) 4. .channel(NioServerSocketChannel.class) 5. .childHandler(new ChannelInitializer<NioSocketChannel>() { 6. @Override 7. protected void initChannel(NioSocketChannel ch) { 8. ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); 9. ch.pipeline().addLast(normalWorkers,"myhandler", 10. new ChannelInboundHandlerAdapter() { 11. @Override 12. public void channelRead(ChannelHandlerContext ctx, Object msg) { 13. ByteBuf byteBuf = msg instanceof ByteBuf ? ((ByteBuf) msg) : null; 14. if (byteBuf != null) { 15. byte[] buf = new byte[16]; 16. ByteBuf len = byteBuf.readBytes(buf, 0, byteBuf.readableBytes()); 17. log.debug(new String(buf)); 18. } 19. } 20. }); 21. } 22. }).bind(8080).sync();
客户端代码不变,启动三次,分别修改发送字符串为 zhangsan(第一次),lisi(第二次),wangwu(第三次)
输出
1. 22:19:48 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x251562d5, L:/127.0.0.1:8080 - R:/127.0.0.1:52588] REGISTERED 2. 22:19:48 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x251562d5, L:/127.0.0.1:8080 - R:/127.0.0.1:52588] ACTIVE 3. 22:19:48 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x251562d5, L:/127.0.0.1:8080 - R:/127.0.0.1:52588] READ: 8B 4. +-------------------------------------------------+ 5. | 0 1 2 3 4 5 6 7 8 9 a b c d e f | 6. +--------+-------------------------------------------------+----------------+ 7. |00000000| 7a 68 61 6e 67 73 61 6e |zhangsan | 8. +--------+-------------------------------------------------+----------------+ 9. 22:19:48 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x251562d5, L:/127.0.0.1:8080 - R:/127.0.0.1:52588] READ COMPLETE 10. 22:19:48 [DEBUG] [defaultEventLoopGroup-2-1] c.i.o.EventLoopTest - zhangsan 11. 22:19:50 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x251562d5, L:/127.0.0.1:8080 - R:/127.0.0.1:52588] READ: 8B 12. +-------------------------------------------------+ 13. | 0 1 2 3 4 5 6 7 8 9 a b c d e f | 14. +--------+-------------------------------------------------+----------------+ 15. |00000000| 7a 68 61 6e 67 73 61 6e |zhangsan | 16. +--------+-------------------------------------------------+----------------+ 17. 22:19:50 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x251562d5, L:/127.0.0.1:8080 - R:/127.0.0.1:52588] READ COMPLETE 18. 22:19:50 [DEBUG] [defaultEventLoopGroup-2-1] c.i.o.EventLoopTest - zhangsan 19. 22:20:24 [DEBUG] [nioEventLoopGroup-4-2] i.n.h.l.LoggingHandler - [id: 0x94b2a840, L:/127.0.0.1:8080 - R:/127.0.0.1:52612] REGISTERED 20. 22:20:24 [DEBUG] [nioEventLoopGroup-4-2] i.n.h.l.LoggingHandler - [id: 0x94b2a840, L:/127.0.0.1:8080 - R:/127.0.0.1:52612] ACTIVE 21. 22:20:25 [DEBUG] [nioEventLoopGroup-4-2] i.n.h.l.LoggingHandler - [id: 0x94b2a840, L:/127.0.0.1:8080 - R:/127.0.0.1:52612] READ: 4B 22. +-------------------------------------------------+ 23. | 0 1 2 3 4 5 6 7 8 9 a b c d e f | 24. +--------+-------------------------------------------------+----------------+ 25. |00000000| 6c 69 73 69 |lisi | 26. +--------+-------------------------------------------------+----------------+ 27. 22:20:25 [DEBUG] [nioEventLoopGroup-4-2] i.n.h.l.LoggingHandler - [id: 0x94b2a840, L:/127.0.0.1:8080 - R:/127.0.0.1:52612] READ COMPLETE 28. 22:20:25 [DEBUG] [defaultEventLoopGroup-2-2] c.i.o.EventLoopTest - lisi 29. 22:20:27 [DEBUG] [nioEventLoopGroup-4-2] i.n.h.l.LoggingHandler - [id: 0x94b2a840, L:/127.0.0.1:8080 - R:/127.0.0.1:52612] READ: 4B 30. +-------------------------------------------------+ 31. | 0 1 2 3 4 5 6 7 8 9 a b c d e f | 32. +--------+-------------------------------------------------+----------------+ 33. |00000000| 6c 69 73 69 |lisi | 34. +--------+-------------------------------------------------+----------------+ 35. 22:20:27 [DEBUG] [nioEventLoopGroup-4-2] i.n.h.l.LoggingHandler - [id: 0x94b2a840, L:/127.0.0.1:8080 - R:/127.0.0.1:52612] READ COMPLETE 36. 22:20:27 [DEBUG] [defaultEventLoopGroup-2-2] c.i.o.EventLoopTest - lisi 37. 22:20:38 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x79a26af9, L:/127.0.0.1:8080 - R:/127.0.0.1:52625] REGISTERED 38. 22:20:38 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x79a26af9, L:/127.0.0.1:8080 - R:/127.0.0.1:52625] ACTIVE 39. 22:20:38 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x79a26af9, L:/127.0.0.1:8080 - R:/127.0.0.1:52625] READ: 6B 40. +-------------------------------------------------+ 41. | 0 1 2 3 4 5 6 7 8 9 a b c d e f | 42. +--------+-------------------------------------------------+----------------+ 43. |00000000| 77 61 6e 67 77 75 |wangwu | 44. +--------+-------------------------------------------------+----------------+ 45. 22:20:38 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x79a26af9, L:/127.0.0.1:8080 - R:/127.0.0.1:52625] READ COMPLETE 46. 22:20:38 [DEBUG] [defaultEventLoopGroup-2-1] c.i.o.EventLoopTest - wangwu 47. 22:20:40 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x79a26af9, L:/127.0.0.1:8080 - R:/127.0.0.1:52625] READ: 6B 48. +-------------------------------------------------+ 49. | 0 1 2 3 4 5 6 7 8 9 a b c d e f | 50. +--------+-------------------------------------------------+----------------+ 51. |00000000| 77 61 6e 67 77 75 |wangwu | 52. +--------+-------------------------------------------------+----------------+ 53. 22:20:40 [DEBUG] [nioEventLoopGroup-4-1] i.n.h.l.LoggingHandler - [id: 0x79a26af9, L:/127.0.0.1:8080 - R:/127.0.0.1:52625] READ COMPLETE 54. 22:20:40 [DEBUG] [defaultEventLoopGroup-2-1] c.i.o.EventLoopTest - wangwu
可以看到,nio 工人和 非 nio 工人也分别绑定了 channel(LoggingHandler 由 nio 工人执行,而我们自己的 handler 由非 nio 工人执行)