- 正确的消息发送速率性能统计策略:
正确的消息发送速率性能统计方法如下:
(1)调用writeAndFlush方法之后获取ChannelFuture。
(2)新增消息发送ChannelFutureListener并注册到ChannelFuture中,监听消息发送结果,如果消息写入SocketChannel成功,则Netty会回调ChannelFutureListener的operationComplete方法。
(3)在消息发送ChannelFutureListener的operationComplete方法中进行性能统计。
正确的性能统计代码示例如下:
public voidchannelRead(ChannelHandlerContext ctx, Object msg) {
int sendBytes =((ByteBuf)msg).readableBytes();
ChannelFuture writeFuture =ctx.write(msg);
writeFuture.addListener((f) ->
{
totalSendBytes.getAndAdd(sendBytes);
});
ctx.flush();
}
对Netty消息发送相关源码进行分析,当发送的字节数大于0时,进行ByteBuf的清理工作,代码如下:
protected voiddoWrite(ChannelOutboundBuffer in) throws Exception {
//代码省略...
if (localWrittenBytes <= 0) {
incompleteWrite(true);
return;
}
adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes,maxBytesPerGatheringWrite);
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
//代码省略...
}
接着分析ChannelOutboundBuffer的removeBytes(long writtenBytes)方法,将发送的字节数与当前ByteBuf可读的字节数进行对比,判断当前的ByteBuf是否完成发送,如果完成则调用remove()清理它,否则只更新下发送进度,相关代码如下:
protected voiddoWrite(ChannelOutboundBuffer in) throws Exception {
//代码省略...
if (readableBytes <=writtenBytes) {
if (writtenBytes != 0) {
progress(readableBytes);
writtenBytes -=readableBytes;
}
remove();
} else {
if (writtenBytes != 0) {
buf.readerIndex(readerIndex+ (int) writtenBytes);
progress(writtenBytes);
}
break;
}
//代码省略...
}
当调用remove()方法时,最终会调用消息发送ChannelPromise的trySuccess方法,通知监听Listener消息已经完成发送,相关代码如下所示:
public booleantrySuccess(V result) {
//代码省略...
if (setSuccess0(result)) {
notifyListeners();
return true;
}
return false;
}
//代码省略...
}
经过以上分析可以看出,调用write/writeAndFlush方法本身并不代表消息已经发送完成,只有监听write/writeAndFlush的操作结果,在异步回调监听中计数,结果才更精确。
需要注意的是,异步回调通知由Netty的NioEventLoop线程执行,即便异步回调代码写在业务线程中,也是由Netty的I/O线程来执行累加计数的,因此这块儿需要考虑多线程并发安全问题,调用堆栈示例如下:
图4 消息发送结果异步回调通知执行线程
如果消息报文比较大,或者一次批量发送的消息比较多,可能会出现“写半包”问题,即一个消息无法在一次write操作中全部完成发送,可能只发送了一半,针对此类场景,可以创建GenericProgressiveFutureListener用于实时监听消息发送进度,做更精准的统计,相关代码如下所示:
privatestatic void notifyProgressiveListeners0(
ProgressiveFuture<?> future,GenericProgressiveFutureListener<?>[] listeners, long progress,long total) {
for(GenericProgressiveFutureListener<?> l: listeners) {
if (l == null) {
break;
}
notifyProgressiveListener0(future,l, progress, total);
}
}
问题定位出来之后,按照正确的做法对Netty性能统计代码进行了修正,上线之后,结合调用链日志,很快定位出了业务高峰期偶现的部分服务时延毛刺较大问题,优化业务线程池参数配置之后问题得到解决。
- 常见的消息发送性能统计误区:
在实际业务中比较常见的性能统计误区如下:
(1)调用write/ writeAndFlush方法之后就开始统计发送速率。
(2)消息编码时进行性能统计:编码之后,获取out可读的字节数,然后做累加。编码完成并不代表消息被写入到SocketChannel中,因此性能统计也不准确。
- Netty关键性能指标采集:
除了消息发送速率,还有其它一些重要的指标需要采集和监控,无论是在调用链详情中展示,还是统一由运维采集、汇总和展示,这些性能指标对于故障的定界和定位帮助都很大。
- Netty I/O线程池性能指标:
Netty I/O线程池除了负责网络I/O消息的读写,还需要同时处理普通任务和定时任务,因此消息队列积压的任务个数是衡量Netty I/O线程池工作负载的重要指标。由于Netty NIO线程池采用的是一个线程池/组包含多个单线程线程池的机制,因此不需要像原生的JDK线程池那样统计工作线程数、最大线程数等。相关代码如下所示:
publicvoid channelActive(ChannelHandlerContext ctx) throws Exception {
kpiExecutorService.scheduleAtFixedRate(()->
{
Iterator<EventExecutor>executorGroups = ctx.executor().parent().iterator();
while (executorGroups.hasNext())
{
SingleThreadEventExecutorexecutor = (SingleThreadEventExecutor)executorGroups.next();
int size = executor.pendingTasks();
if (executor == ctx.executor())
System.out.println(ctx.channel() + "--> " + executor +" pending size in queue is : --> " + size);
else
System.out.println(executor+ " pending size in queue is : --> " + size);
}
},0,1000, TimeUnit.MILLISECONDS);
}
}
运行结果如下所示:
图5 Netty I/O线程池性能统计KPI数据
- Netty发送队列积压消息数:
Netty消息发送队列积压数可以反映网络速度、通信对端的读取速度、以及自身的发送速度等,因此对于服务调用时延的精细化分析对于问题定位非常有帮助,它的采集方式代码示例如下:
publicvoid channelActive(ChannelHandlerContext ctx) throws Exception {
writeQueKpiExecutorService.scheduleAtFixedRate(()->
{
long pendingSize =((NioSocketChannel)ctx.channel()).unsafe().outboundBuffer().totalPendingWriteBytes();
System.out.println(ctx.channel() +"--> " + " ChannelOutboundBuffer's totalPendingWriteBytes is: "
+ pendingSize + "bytes");
},0,1000, TimeUnit.MILLISECONDS);
}
执行结果如下:
图6 Netty Channel对应的消息发送队列性能KPI数据
由于totalPendingSize是volatile的,因此统计线程即便不是Netty的I/O线程,也能够正确的读取其最新值。
- Netty消息读取速率性能统计:
针对某个Channel的消息读取速率性能统计,可以在解码ChannelHandler之前添加一个性能统计ChannelHandler,用来对读取速率进行计数,相关代码示例如下(ServiceTraceProfileServerHandler类):
public voidchannelActive(ChannelHandlerContext ctx) throws Exception {
kpiExecutorService.scheduleAtFixedRate(()->
{
int readRates =totalReadBytes.getAndSet(0);
System.out.println(ctx.channel() +"--> read rates " + readRates);
},0,1000, TimeUnit.MILLISECONDS);
ctx.fireChannelActive();
}
public void channelRead(ChannelHandlerContextctx, Object msg) {
int readableBytes =((ByteBuf)msg).readableBytes();
totalReadBytes.getAndAdd(readableBytes);
ctx.fireChannelRead(msg);
}
运行结果如下所示:
图7 NettyChannel 消息读取速率性能统计
三、总结:
本文选自《Netty进阶之路:跟着案例学Netty》一书,由电子工业出版社出版,李林锋著。
当我们需要对服务调用时延进行精细化分析时,需要把Netty通信框架底层的处理耗时数据也采集走并进行分析,由于Netty的I/O操作都是异步的,因此不能像传统同步调用那样的思路去做性能数据统计,需要注册性能统计监听器,在异步回调中完成计数。另外,Netty的I/O线程池、消息发送队列等实现比较特殊,与传统的Tomcat等框架实现策略不同,因此对于Netty的关键性能KPI数据采集不能照搬JDK和Tomcat的做法。