3.4 例4:同步处理任务失败
同步处理任务失败 - await
DefaultEventLoop eventExecutors = new DefaultEventLoop(); DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors); eventExecutors.execute(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } RuntimeException e = new RuntimeException("error..."); log.debug("set failure, {}", e.toString()); promise.setFailure(e); }); log.debug("start..."); log.debug("{}", promise.getNow()); promise.await(); // 与 sync 和 get 区别在于,不会抛异常 log.debug("result {}", (promise.isSuccess() ? promise.getNow() : promise.cause()).toString());
输出
12:18:53 [DEBUG] [main] c.i.o.DefaultPromiseTest2 - start... 12:18:53 [DEBUG] [main] c.i.o.DefaultPromiseTest2 - null 12:18:54 [DEBUG] [defaultEventLoop-1-1] c.i.o.DefaultPromiseTest2 - set failure, java.lang.RuntimeException: error... 12:18:54 [DEBUG] [main] c.i.o.DefaultPromiseTest2 - result java.lang.RuntimeException: error...
3.5 例5:异步处理任务失败
异步处理任务失败
DefaultEventLoop eventExecutors = new DefaultEventLoop(); DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors); promise.addListener(future -> { log.debug("result {}", (promise.isSuccess() ? promise.getNow() : promise.cause()).toString()); }); eventExecutors.execute(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } RuntimeException e = new RuntimeException("error..."); log.debug("set failure, {}", e.toString()); promise.setFailure(e); }); log.debug("start...");
输出
12:04:57 [DEBUG] [main] c.i.o.DefaultPromiseTest2 - start... 12:04:58 [DEBUG] [defaultEventLoop-1-1] c.i.o.DefaultPromiseTest2 - set failure, java.lang.RuntimeException: error... 12:04:58 [DEBUG] [defaultEventLoop-1-1] c.i.o.DefaultPromiseTest2 - result java.lang.RuntimeException: error...
3.6 例6:死锁检查
await 死锁检查
DefaultEventLoop eventExecutors = new DefaultEventLoop(); DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors); eventExecutors.submit(()->{ System.out.println("1"); try { promise.await(); // 注意不能仅捕获 InterruptedException 异常 // 否则 死锁检查抛出的 BlockingOperationException 会继续向上传播 // 而提交的任务会被包装为 PromiseTask,它的 run 方法中会 catch 所有异常然后设置为 Promise 的失败结果而不会抛出 } catch (Exception e) { e.printStackTrace(); } System.out.println("2"); }); eventExecutors.submit(()->{ System.out.println("3"); try { promise.await(); } catch (Exception e) { e.printStackTrace(); } System.out.println("4"); });
输出
1 2 3 4 io.netty.util.concurrent.BlockingOperationException: DefaultPromise@47499c2a(incomplete) at io.netty.util.concurrent.DefaultPromise.checkDeadLock(DefaultPromise.java:384) at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:212) at com.itcast.oio.DefaultPromiseTest.lambda$main$0(DefaultPromiseTest.java:27) at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38) at io.netty.util.concurrent.PromiseTask.run(PromiseTask.java:73) at io.netty.channel.DefaultEventLoop.run(DefaultEventLoop.java:54) at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(Thread.java:745) io.netty.util.concurrent.BlockingOperationException: DefaultPromise@47499c2a(incomplete) at io.netty.util.concurrent.DefaultPromise.checkDeadLock(DefaultPromise.java:384) at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:212) at com.itcast.oio.DefaultPromiseTest.lambda$main$1(DefaultPromiseTest.java:36) at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38) at io.netty.util.concurrent.PromiseTask.run(PromiseTask.java:73) at io.netty.channel.DefaultEventLoop.run(DefaultEventLoop.java:54) at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(Thread.java:745)
4 Handler & Pipeline
ChannelHandler 用来处理 Channel 上的各种事件,分为入站、出站两种。所有 ChannelHandler 被连成一串,就是 Pipeline
- 入站处理器通常是 ChannelInboundHandlerAdapter 的子类,主要用来读取客户端数据,写回结果
- 出站处理器通常是 ChannelOutboundHandlerAdapter 的子类,主要对写回结果进行加工
个比喻,每个 Channel 是一个产品的加工车间,Pipeline 是车间中的流水线,ChannelHandler 就是流水线上的各道工序,而后面要讲的 ByteBuf 是原材料,经过很多工序的加工:先经过一道道入站工序,再经过一道道出站工序最终变成产品
先搞清楚顺序,服务端
new ServerBootstrap() .group(new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { protected void initChannel(NioSocketChannel ch) { ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { System.out.println(1); ctx.fireChannelRead(msg); // 1 } }); ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { System.out.println(2); ctx.fireChannelRead(msg); // 2 } }); ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { System.out.println(3); ctx.channel().write(msg); // 3 } }); ch.pipeline().addLast(new ChannelOutboundHandlerAdapter(){ @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { System.out.println(4); ctx.write(msg, promise); // 4 } }); ch.pipeline().addLast(new ChannelOutboundHandlerAdapter(){ @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { System.out.println(5); ctx.write(msg, promise); // 5 } }); ch.pipeline().addLast(new ChannelOutboundHandlerAdapter(){ @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { System.out.println(6); ctx.write(msg, promise); // 6 } }); } }) .bind(8080);
客户端
new Bootstrap() .group(new NioEventLoopGroup()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) { ch.pipeline().addLast(new StringEncoder()); } }) .connect("127.0.0.1", 8080) .addListener((ChannelFutureListener) future -> { future.channel().writeAndFlush("hello,world"); });
服务器端打印:
1 2 3 6 5 4
可以看到,ChannelInboundHandlerAdapter 是按照 addLast 的顺序执行的,而 ChannelOutboundHandlerAdapter 是按照 addLast 的逆序执行的。ChannelPipeline 的实现是一个 ChannelHandlerContext(包装了 ChannelHandler) 组成的双向链表
- 入站处理器中,ctx.fireChannelRead(msg) 是调用下一个入站处理器
- 如果注释掉 1 处代码,则仅会打印 1
- 如果注释掉 2 处代码,则仅会打印 1 2
- 3 处的 ctx.channel().write(msg) 会从尾部开始触发后续出站处理器的执行
- 如果注释掉 3 处代码,则仅会打印 1 2 3
- 类似的,出站处理器中,ctx.write(msg, promise) 的调用也会触发上一个出站处理器
- 如果注释掉 6 处代码,则仅会打印 1 2 3 6
- ctx.channel().write(msg) vs ctx.write(msg)
- 都是触发出站处理器的执行
- ctx.channel().write(msg) 从尾部开始查找出站处理器
- ctx.write(msg) 是从当前节点找上一个出站处理器
- 3 处的 ctx.channel().write(msg) 如果改为 ctx.write(msg) 仅会打印 1 2 3,因为节点3 之前没有其它出站处理器了
- 6 处的 ctx.write(msg, promise) 如果改为 ctx.channel().write(msg) 会打印 1 2 3 6 6 6… 因为 ctx.channel().write() 是从尾部开始查找,结果又是节点6 自己
图1 - 服务端 pipeline 触发的原始流程,图中数字代表了处理步骤的先后次序
5 ByteBuf
是对字节数据的封装
5.1 创建
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10); log(buffer);
上面代码创建了一个默认的 ByteBuf(池化基于直接内存的 ByteBuf),初始容量是 10
输出
read index:0 write index:0 capacity:10
其中 log 方法参考如下
private static void log(ByteBuf buffer) { int length = buffer.readableBytes(); int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4; StringBuilder buf = new StringBuilder(rows * 80 * 2) .append("read index:").append(buffer.readerIndex()) .append(" write index:").append(buffer.writerIndex()) .append(" capacity:").append(buffer.capacity()) .append(NEWLINE); appendPrettyHexDump(buf, buffer); System.out.println(buf.toString()); }
5.2 直接内存 vs 堆内存
可以使用下面的代码来创建池化基于堆的 ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(10);
也可以使用下面的代码来创建池化基于直接内存的 ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(10);
- 直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起用
- 直接内存对 GC 压力小,因为这部分内存不受 JVM 垃圾回收的管理,但也要注意及时主动释放
5.3 池化 vs 非池化
池化的最大意义在于可以重用 ByteBuf,优点有
- 没有池化,则每次都得创建新的 ByteBuf 实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加 GC 压力
- 有了池化,则可以重用池中 ByteBuf 实例,并且采用了与 jemalloc 类似的内存分配算法提升分配效率
- 高并发时,池化功能更节约内存,减少内存溢出的可能
池化功能是否开启,可以通过下面的系统环境变量来设置
-Dio.netty.allocator.type={unpooled|pooled}
- 4.1 以后,非 Android 平台默认启用池化实现,Android 平台启用非池化实现
- 4.1 之前,池化功能还不成熟,默认是非池化实现
5.4 组成
ByteBuf 由四部分组成
最开始读写指针都在 0 位置
5.5 写入
方法列表,省略一些不重要的方法
方法签名 | 含义 | 备注 |
writeBoolean(boolean value) | 写入 boolean 值 | 用一字节 01|00 代表 true|false |
writeByte(int value) | 写入 byte 值 | |
writeShort(int value) | 写入 short 值 |
writeInt(int value) | 写入 int 值 | Big Endian,即 0x250,写入后 00 00 02 50 |
writeIntLE(int value) | 写入 int 值 | Little Endian,即 0x250,写入后 50 02 00 00 |
writeLong(long value) | 写入 long 值 | |
writeChar(int value) | 写入 char 值 | |
writeFloat(float value) | 写入 float 值 |
writeDouble(double value) | 写入 double 值 | |
writeBytes(ByteBuf src) | 写入 netty 的 ByteBuf | |
writeBytes(byte[] src) | 写入 byte[] |
writeBytes(ByteBuffer src) | 写入 nio 的 ByteBuffer | |
int writeCharSequence(CharSequence sequence, Charset charset) | 写入字符串 |
注意
- 这些方法的未指明返回值的,其返回值都是 ByteBuf,意味着可以链式调用
- 网络传输,默认习惯是 Big Endian
先写入 4 个字节
buffer.writeBytes(new byte[]{1, 2, 3, 4}); log(buffer);
结果是
read index:0 write index:4 capacity:10 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 01 02 03 04 |.... | +--------+-------------------------------------------------+----------------+
再写入一个 int 整数,也是 4 个字节
buffer.writeInt(5); log(buffer);
结果是
read index:0 write index:8 capacity:10 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 01 02 03 04 00 00 00 05 |........ | +--------+-------------------------------------------------+----------------+
还有一类方法是 set 开头的一系列方法,也可以写入数据,但不会改变写指针位置