channel 的主要作用
- close() 可以用来关闭 channel
- closeFuture() 用来处理 channel 的关闭
- sync 方法作用是同步等待 channel 关闭
- 而 addListener 方法是异步等待 channel 关闭
- pipeline() 方法添加处理器
- write() 方法将数据写入
- writeAndFlush() 方法将数据写入并刷出
ChannelFuture
下面是一段客户端的代码
1. new Bootstrap() 2. .group(new NioEventLoopGroup()) 3. .channel(NioSocketChannel.class) 4. .handler(new ChannelInitializer<Channel>() { 5. @Override 6. protected void initChannel(Channel ch) { 7. ch.pipeline().addLast(new StringEncoder()); 8. } 9. }) 10. .connect("127.0.0.1", 8080) 11. .sync() 12. .channel() 13. .writeAndFlush(new Date() + ": hello world!");
现在把它拆开来看
1. ChannelFuture channelFuture = new Bootstrap() 2. .group(new NioEventLoopGroup()) 3. .channel(NioSocketChannel.class) 4. .handler(new ChannelInitializer<Channel>() { 5. @Override 6. protected void initChannel(Channel ch) { 7. ch.pipeline().addLast(new StringEncoder()); 8. } 9. }) 10. .connect("127.0.0.1", 8080); // 1 11. 12. channelFuture.sync().channel().writeAndFlush(new Date() + ": hello world!");
1 处返回的是 ChannelFuture 对象,它的作用是利用 channel() 方法来获取 Channel 对象
注意 connect 方法是异步的,意味着不等连接建立,方法执行就返回了。因此 channelFuture 对象中不能【立刻】获得到正确的 Channel 对象
实验如下:
1. ChannelFuture channelFuture = new Bootstrap() 2. .group(new NioEventLoopGroup()) 3. .channel(NioSocketChannel.class) 4. .handler(new ChannelInitializer<Channel>() { 5. @Override 6. protected void initChannel(Channel ch) { 7. ch.pipeline().addLast(new StringEncoder()); 8. } 9. }) 10. .connect("127.0.0.1", 8080); 11. 12. System.out.println(channelFuture.channel()); // 1 13. channelFuture.sync(); // 2 14. System.out.println(channelFuture.channel()); // 3
- 执行到 1 时,连接未建立,打印
[id: 0x2e1884dd]
- 执行到 2 时,sync 方法是同步等待连接建立完成
- 执行到 3 时,连接肯定建立了,打印
[id: 0x2e1884dd, L:/127.0.0.1:57191 - R:/127.0.0.1:8080]
除了用 sync 方法可以让异步操作同步以外,还可以使用回调的方式:
1. ChannelFuture channelFuture = new Bootstrap() 2. .group(new NioEventLoopGroup()) 3. .channel(NioSocketChannel.class) 4. .handler(new ChannelInitializer<Channel>() { 5. @Override 6. protected void initChannel(Channel ch) { 7. ch.pipeline().addLast(new StringEncoder()); 8. } 9. }) 10. .connect("127.0.0.1", 8080); 11. System.out.println(channelFuture.channel()); // 1 12. channelFuture.addListener((ChannelFutureListener) future -> { 13. System.out.println(future.channel()); // 2 14. });
- 执行到 1 时,连接未建立,打印
[id: 0x749124ba]
- ChannelFutureListener 会在连接建立时被调用(其中 operationComplete 方法),因此执行到 2 时,连接肯定建立了,打印
[id: 0x749124ba, L:/127.0.0.1:57351 - R:/127.0.0.1:8080]
CloseFuture
关闭是由另外一个线程来进行处理的,跟上面的原理一样
这里可以看出CLOSE之前就执行了关闭操作
正确关闭的第一种方式:同步关闭处理
1. ChannelFuture closeFuture = channel.closeFuture(); 2. closeFuture.sync(); 3. log.debug("处理关闭以后的操作");
正确关闭的第二种方式:异步关闭处理
1. closeFuture.addListener(new ChannelFutureListener() { 2. public void operationComplete(ChannelFuture future) throws Exception { 3. log.debug("处理关闭以后的操作"); 4. group.shutdownGracefully(); 5. } 6. });
上述的测试代码
1. @Slf4j 2. public class CloseFutureClient { 3. public static void main(String[] args) throws Exception{ 4. NioEventLoopGroup group = new NioEventLoopGroup(); 5. ChannelFuture channelFuture = new Bootstrap() 6. .group(group) 7. .channel(NioSocketChannel.class) 8. .handler(new ChannelInitializer<NioSocketChannel>() { 9. protected void initChannel(NioSocketChannel ch) throws Exception { 10. ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); 11. ch.pipeline().addLast(new StringEncoder()); 12. } 13. }).connect(new InetSocketAddress("localhost", 8080)); 14. Channel channel = channelFuture.sync().channel(); 15. log.debug("{}",channel); 16. new Thread(()->{ 17. Scanner scanner = new Scanner(System.in); 18. while (true){ 19. String line = scanner.nextLine(); 20. if("q".equals(line)){ 21. 22. channel.close();//close异步操作 23. log.debug("处理关闭之后的操作");//不能在这里善后 错误的 24. break; 25. } 26. } 27. },"input").start(); 28. 29. //获取CloseFuture对象 30. //1)同步关闭处理 31. //2)异步关闭处理 32. /* ChannelFuture closeFuture = channel.closeFuture(); 33. closeFuture.sync(); 34. log.debug("处理关闭以后的操作"); */ 35. 36. ChannelFuture closeFuture = channel.closeFuture(); 37. closeFuture.addListener(new ChannelFutureListener() { 38. public void operationComplete(ChannelFuture future) throws Exception { 39. log.debug("处理关闭以后的操作"); 40. group.shutdownGracefully(); 41. } 42. }); 43. 44. } 45. }
为什么要异步关闭
为什么不在一个线程中去执行建立连接、去执行关闭 channel,那样不是也可以吗?非要用这么复杂的异步方式:比如一个线程发起建立连接,另一个线程去真正建立连接。
还有人会笼统地回答,因为 netty 异步方式用了多线程、多线程就效率高。其实这些认识都比较片面,多线程和异步所提升的效率并不是所认为的
思考下面的场景,4 个医生给人看病,每个病人花费 20 分钟,而且医生看病的过程中是以病人为单位的,一个病人看完了,才能看下一个病人。假设病人源源不断地来,可以计算一下 4 个医生一天工作 8 小时,处理的病人总数是:4 * 8 * 3 = 96
经研究发现,看病可以细分为四个步骤,经拆分后每个步骤需要 5 分钟,如下
因此可以做如下优化,只有一开始,医生 2、3、4 分别要等待 5、10、15 分钟才能执行工作,但只要后续病人源源不断地来,他们就能够满负荷工作,并且处理病人的能力提高到了 4 * 8 * 12
效率几乎是原来的四倍,(一个小时内接待病人的效率大大提高,四个医生不是同时工作,但都要干满8小时才下班)
要点
- 单线程没法异步提高效率,必须配合多线程、多核 cpu 才能发挥异步的优势,单线程不能同时处理多个任务,因此无法实现异步处理提高效率。异步处理需要利用多线程或多进程机制来实现,以便同时处理多个任务。多线程或多进程能够允许程序同时执行多个操作,从而提高效率。
- 异步并没有缩短响应时间,反而有所增加,异步可以提高系统的吞吐量,即能够同时处理多个请
- 合理进行任务拆分,也是利用异步的关键