一、网络编程的本质与IO模型基石
网络编程的核心是实现跨主机的进程间通信,而IO模型则是决定通信性能、并发能力的核心底层逻辑。在深入Netty之前,必须先彻底搞懂IO模型的核心分类与本质区别,这是所有网络编程的根基。
1.1 核心概念的权威界定
基于UNIX网络编程的标准定义,先明确两个极易混淆的核心维度:
- 阻塞/非阻塞:描述线程等待IO事件就绪的状态。阻塞模式下,线程在IO事件就绪前会被挂起,无法执行其他任务;非阻塞模式下,线程会立即返回,无需等待事件就绪,可轮询检查事件状态。
- 同步/异步:描述数据拷贝的发起主体。同步模式下,用户线程主动发起数据从内核态到用户态的拷贝,全程阻塞线程;异步模式下,由操作系统完成数据拷贝后,主动通知用户线程,全程无需用户线程参与。
1.2 四大IO模型的核心对比与通俗解读
| IO模型 | 核心特点 | 适用场景 | 核心痛点 |
| 同步阻塞IO(BIO) | 一连接一线程,阻塞等待连接与数据读写 | 连接数少、架构简单的场景 | 线程资源消耗大,并发上限极低,高并发下性能雪崩 |
| 同步非阻塞IO(NIO) | 单线程管理多连接,非阻塞轮询事件就绪状态 | 高并发、短连接场景 | 轮询消耗CPU,API复杂,开发门槛极高 |
| IO多路复用 | 单线程通过Selector监听多个连接的事件,仅事件就绪时才通知线程处理 | 高并发、长连接场景,是当前主流架构的底层基石 | 原生API复杂,需处理大量底层细节,存在已知底层BUG |
| 异步非阻塞IO(AIO) | 操作系统完成IO全流程后通知用户线程,全程无阻塞 | 连接数多、读写操作耗时的场景 | Linux系统下实现不完善,实际生产应用极少 |
通俗类比:把IO操作比作餐厅点餐
- BIO:你点单后一直站在柜台前等待,直到餐品做好,期间无法做任何事
- NIO:你点单后拿到取餐号,回到座位每隔一段时间去柜台看餐品是否做好
- IO多路复用:餐厅有叫号屏,所有取餐号的就绪状态统一展示,餐品做好后叫号通知你,你只需等待通知即可
- AIO:你点单后直接坐好,餐品做好后服务员直接送到你的座位上
1.3 原生Java NIO的核心痛点(为什么必须用Netty)
Java从JDK1.4开始提供NIO API,但原生NIO存在无法忽视的致命问题,这也是Netty成为Java网络编程事实标准的核心原因:
- Epoll空轮询BUG:Linux系统下,Selector会被意外唤醒,导致线程空轮询,CPU占用率飙升至100%,该问题在JDK中始终未被彻底修复
- API开发门槛极高:需手动处理Channel、Selector、ByteBuffer等核心组件,代码冗余度高,异常场景处理复杂,稍有不慎就会出现BUG
- 半包粘包问题无原生解决方案:TCP是面向流的无边界协议,原生NIO未提供编解码框架,需开发者手动处理半包粘包,极易出错
- 缺少高级特性封装:无内存池、零拷贝、心跳检测、断线重连、流量控制等生产级特性,需开发者手动实现
- 可靠性与稳定性不足:需手动处理断连、重连、网络波动等异常场景,容错能力弱
二、Netty核心定位与架构设计
Netty是一款基于Java NIO封装的高性能、异步事件驱动的网络编程框架,屏蔽了原生NIO的底层复杂度,提供了极简易用的API,同时具备极高的性能、稳定性和可扩展性,是Dubbo、RocketMQ、Elasticsearch、网关等中间件的核心通信层底座。
2.1 Netty的核心优势
- 性能极致:基于池化内存管理、零拷贝技术、Reactor线程模型,性能远超原生NIO,处于行业顶尖水平
- API友好:高度封装的API,屏蔽了底层网络细节,开发者只需关注业务逻辑,开发效率提升数倍
- 稳定性极强:彻底修复了原生NIO的Epoll空轮询BUG,完善的异常处理机制,经过了海量生产环境的验证
- 功能全面:内置了丰富的编解码器、心跳检测、流量控制、SSL/TLS加密等生产级特性,支持多种主流协议
- 可扩展性强:基于责任链模式的Pipeline设计,可灵活定制和扩展业务逻辑,无侵入式开发
2.2 Netty的核心架构:主从Reactor多线程模型
Netty的核心架构基于Doug Lea在《Scalable IO in Java》中提出的Reactor模式实现,默认采用主从Reactor多线程模型,这是支撑Netty高并发、高性能的核心架构。 主从Reactor模型的核心分工:
- 主Reactor(BossGroup):负责监听客户端的连接请求,完成TCP三次握手建立连接后,将生成的Channel注册到从Reactor上,本身不处理业务逻辑
- 从Reactor(WorkerGroup):负责管理已注册的Channel,监听所有读写事件,事件就绪后,将事件分发到Pipeline责任链中,由对应的Handler完成业务处理
- 业务线程池:Handler中若存在耗时操作,必须提交到独立的业务线程池执行,避免阻塞Reactor线程
三、Netty核心组件全解析
Netty的所有功能都基于核心组件实现,彻底理解每个组件的作用、底层逻辑与使用规范,是掌握Netty的核心前提。
3.1 启动引导类:Bootstrap与ServerBootstrap
这两个类是Netty服务端与客户端的启动入口,负责整个Netty程序的初始化、配置与启动,是程序的总入口。
- ServerBootstrap:服务端启动引导类,用于绑定服务端口,启动服务端程序
- Bootstrap:客户端启动引导类,用于连接服务端,启动客户端程序
核心配置规范:
- 服务端必须配置两个EventLoopGroup(BossGroup+WorkerGroup),客户端只需配置一个
- 必须指定Channel类型,服务端对应NioServerSocketChannel,客户端对应NioSocketChannel
- 必须通过ChannelInitializer配置Pipeline责任链,完成Handler的注册
- 可通过ChannelOption配置TCP底层参数,如SO_BACKLOG、TCP_NODELAY、SO_KEEPALIVE等
3.2 线程与事件循环:EventLoop与EventLoopGroup
这是Netty线程模型的核心组件,负责管理Reactor线程,处理所有IO事件与任务。
- EventLoopGroup:本质是线程池,管理多个EventLoop,负责EventLoop的生命周期管理与任务分配,服务端的BossGroup和WorkerGroup都是EventLoopGroup的实现类
- EventLoop:本质是单线程执行器,绑定了一个固定的线程,内部维护了一个Selector,负责处理多个注册到自身的Channel的所有IO事件,生命周期内线程不会更换,彻底避免了线程安全问题
核心特性与规范:
- 单个EventLoop可绑定多个Channel,单个Channel只会绑定到一个EventLoop上,全程不会更换,彻底解决并发安全问题
- EventLoop除了处理IO事件,还可处理定时任务、普通任务,是Netty的任务执行核心
- 默认NioEventLoopGroup的线程数为
CPU核心数 * 2,可根据业务场景手动调整 - 绝对禁止在EventLoop中执行耗时操作,否则会阻塞线程,导致该EventLoop绑定的所有Channel的事件无法处理,引发性能雪崩
3.3 通信载体:Channel
Channel是Netty对网络通信套接字的抽象,对应Java NIO的Channel,是网络读写操作的核心载体,代表了一个客户端与服务端的连接。
核心特性:
- 所有IO操作都是异步的,调用后会立即返回ChannelFuture,可通过监听器获取操作结果
- 提供了网络连接的状态管理、读写操作、配置管理等核心能力
- 每个Channel都绑定了唯一的ChannelPipeline,负责事件的处理与流转
- 常用实现类:NioServerSocketChannel(服务端接收连接的Channel)、NioSocketChannel(客户端与服务端通信的Channel)
3.4 责任链核心:ChannelPipeline与ChannelHandlerContext
Netty基于责任链模式实现了事件的流转与处理,核心就是ChannelPipeline、ChannelHandler与ChannelHandlerContext。
- ChannelPipeline:每个Channel对应唯一的ChannelPipeline,本质是双向链表,维护了ChannelHandler的有序列表,负责入站与出站事件的有序流转
- ChannelHandlerContext:每个ChannelHandler对应唯一的ChannelHandlerContext,负责Handler与Pipeline之间的交互,保存了Handler的上下文信息,提供了事件流转的核心方法
核心事件流转规则(90%开发者都会踩坑的点):
- 入站事件:从网络底层流向用户业务层,执行顺序为Pipeline中Handler的添加顺序从头到尾,只能通过
fireChannelRead()方法传递给下一个入站Handler - 出站事件:从用户业务层流向网络底层,执行顺序为Pipeline中Handler的添加顺序从尾到头,只能通过
writeAndFlush()等方法传递给下一个出站Handler - 入站事件与出站事件的流转互不干扰,入站Handler只能处理入站事件,出站Handler只能处理出站事件
3.5 业务处理器:ChannelHandler
ChannelHandler是开发者实现业务逻辑的核心入口,所有业务逻辑、编解码、异常处理都通过ChannelHandler实现,是Netty中最常用的组件。
核心分类:
- ChannelInboundHandler:入站事件处理器,处理网络底层传入的事件,如连接建立、数据读取、空闲事件、异常通知等
- ChannelOutboundHandler:出站事件处理器,处理用户向网络底层发送的事件,如数据写入、连接关闭、端口绑定等
Netty提供了两个默认的适配器类,简化开发:ChannelInboundHandlerAdapter与ChannelOutboundHandlerAdapter,开发者只需继承适配器类,重写对应的事件处理方法即可。
核心使用规范:
- @Sharable注解的使用:只有无状态的Handler才能添加该注解,添加后可被多个ChannelPipeline共享,避免重复创建对象;有状态的Handler绝对不能添加,否则会出现线程安全问题
- 入站数据处理完成后,必须调用
fireChannelRead()方法将事件传递给下一个Handler,否则事件会中断流转 - 必须重写
exceptionCaught()方法,处理异常事件,关闭资源,否则异常会被吞掉,无法定位问题 - 耗时操作必须提交到独立的业务线程池执行,绝对不能在Handler的事件方法中直接执行
3.6 数据容器:ByteBuf
ByteBuf是Netty对字节数据的容器抽象,替代了Java NIO的ByteBuffer,解决了原生ByteBuffer的所有痛点,是Netty高性能的核心组件之一。
3.6.1 ByteBuf的核心优势
对比原生ByteBuffer,ByteBuf具备颠覆性的优势:
- 双索引设计:维护了读索引(readerIndex)与写索引(writerIndex),无需调用flip()方法切换读写模式,彻底避免了原生ByteBuffer的操作失误
- 内存池化支持:提供了池化的ByteBuf实现,复用内存对象,减少内存分配与回收的开销,降低GC压力
- 灵活的内存类型:支持堆内内存、堆外内存、复合内存三种类型,可根据业务场景灵活选择
- API更友好:提供了丰富的读写方法,支持链式调用,操作更便捷
- 自动扩容:写入数据时,若容量不足会自动扩容,无需开发者手动计算与管理
3.6.2 ByteBuf的核心分类
| 分类维度 | 类型 | 核心特点 | 适用场景 |
| 内存管理 | 池化PooledByteBuf | 从内存池中获取内存,复用对象,性能极高 | 生产环境默认使用,高频读写场景 |
| 内存管理 | 非池化UnpooledByteBuf | 每次创建都分配新的内存,性能较低 | 低频读写、简单测试场景 |
| 内存类型 | 堆内HeapByteBuf | 内存分配在JVM堆中,受GC管理,无需手动释放 | 简单数据处理,无高频读写场景 |
| 内存类型 | 堆外DirectByteBuf | 内存分配在操作系统的堆外内存中,不受GC管理,需手动释放 | 网络IO传输,零拷贝场景,生产环境首选 |
| 内存类型 | 复合CompositeByteBuf | 将多个ByteBuf合并为一个逻辑上的ByteBuf,无需物理内存拷贝 | 多数据包合并场景,零拷贝优化 |
3.6.3 ByteBuf的核心使用规范
- 内存释放规则:堆外内存的ByteBuf必须手动释放,否则会造成内存泄漏;Netty默认会在Pipeline中完成释放,若ByteBuf没有传递到下一个Handler,必须手动调用release()方法释放
- 引用计数机制:ByteBuf基于引用计数管理内存,每次retain()引用计数+1,每次release()引用计数-1,计数为0时内存被释放
- 生产环境优先使用池化堆外内存:
PooledByteBufAllocator.DEFAULT.directBuffer(),性能最优,零拷贝支持最好 - 避免频繁创建非池化的ByteBuf,否则会造成频繁GC,影响性能
3.7 异步结果处理:ChannelFuture与Promise
Netty的所有IO操作都是异步的,调用后会立即返回,无法立即获取操作结果,Netty通过ChannelFuture与Promise实现异步结果的处理。
- ChannelFuture:继承了JDK的Future接口,代表异步IO操作的结果,可通过isDone()判断操作是否完成,get()获取操作结果,addListener()添加监听器,操作完成后触发回调
- Promise:继承了ChannelFuture,是可写的ChannelFuture,提供了setSuccess()、setFailure()方法,手动设置异步操作的结果,是Netty内部异步编程的核心接口
核心使用规范:
- 优先使用监听器模式处理异步结果,避免调用get()方法阻塞线程
- 连接、绑定、关闭等异步操作,必须添加监听器,判断操作是否成功,处理异常场景
- 绝对不能在EventLoop线程中调用get()的阻塞方法,否则会阻塞线程,引发性能问题
四、Netty核心高级特性底层原理
4.1 零拷贝技术
零拷贝是Netty高性能的核心特性之一,核心目标是减少数据在用户态与内核态之间的拷贝次数,减少CPU的上下文切换开销,提升数据传输效率。
Netty的零拷贝分为两个维度:操作系统级零拷贝与用户态零拷贝。
- 操作系统级零拷贝:基于Linux的sendFile系统调用实现,通过FileRegion传输文件时,数据直接从内核缓冲区传输到网卡,无需经过用户态,减少了2次内存拷贝与2次上下文切换,文件传输性能提升数倍
- 用户态零拷贝:
- 堆外内存DirectByteBuf:网络IO传输时,无需将堆内内存数据拷贝到堆外内存,直接使用堆外内存传输,减少了一次内存拷贝
- CompositeByteBuf:将多个ByteBuf合并为一个逻辑上的ByteBuf,无需物理内存拷贝,避免了数据合并时的内存复制
- 切片操作slice():将一个ByteBuf切分为多个共享底层内存的ByteBuf,无需内存拷贝,实现数据的零拷贝拆分
- wrap()操作:将字节数组、ByteBuffer包装为ByteBuf,共享底层内存,无需拷贝
4.2 内存池化管理
Netty的内存池是其高性能的核心支撑,解决了频繁内存分配与回收带来的性能开销、GC压力、内存碎片问题。
核心实现原理:
- Netty基于jemalloc内存分配算法实现了内存池,将内存划分为不同规格的块,按需分配,避免内存碎片
- 内存池分为Arena、Chunk、Page、SubPage四个层级,层级化管理内存,提升分配效率
- 池化的ByteBuf会被复用,分配时从内存池中获取,释放时归还到内存池,无需频繁创建与销毁对象,大幅降低GC压力
- Netty4.x版本默认开启池化内存管理,生产环境无需手动修改,即可获得最优的内存性能
4.3 编解码框架与半包粘包解决方案
TCP是面向流的协议,数据以字节流的形式传输,没有消息边界,这就会导致半包粘包问题,是网络编程中必须解决的核心问题。
4.3.1 半包粘包的核心成因
- 应用程序写入的字节大小超过了TCP发送缓冲区的大小
- 进行MSS大小的TCP分片,数据包长度超过了MTU最大传输单元
- Nagle算法导致的TCP数据包合并
- 接收方读取不及时,缓冲区堆积了多个数据包
4.3.2 行业标准解决方案
Netty提供了成熟的编解码器,彻底解决半包粘包问题,行业主流的解决方案是消息头+长度字段的固定格式协议,对应Netty提供的LengthFieldBasedFrameDecoder解码器,这是生产环境首选的解决方案。
LengthFieldBasedFrameDecoder核心参数说明:
- maxFrameLength:最大帧长度,超过该长度的数据包会被丢弃,避免内存溢出
- lengthFieldOffset:长度字段在数据包中的偏移量
- lengthFieldLength:长度字段的字节长度,通常为4字节(int类型)
- lengthAdjustment:长度字段的调整值,用于修正数据包长度的计算
- initialBytesToStrip:解码后需要跳过的字节数,通常用于跳过魔数、版本号、长度字段等头部信息
4.4 心跳与空闲检测机制
长连接场景下,必须通过心跳机制检测连接的可用性,及时处理断连、网络波动等异常场景,Netty提供了IdleStateHandler空闲检测处理器,可极简实现心跳机制。
IdleStateHandler核心参数:
- readerIdleTime:读空闲时间,指定时间内没有收到对方的数据包,触发读空闲事件
- writerIdleTime:写空闲时间,指定时间内没有向对方发送数据包,触发写空闲事件
- allIdleTime:全空闲时间,指定时间内没有读写操作,触发全空闲事件
- unit:时间单位,通常为秒
核心实现逻辑:
- 将IdleStateHandler添加到Pipeline的最前端,作为第一个入站Handler
- 自定义心跳处理器,继承ChannelInboundHandlerAdapter,重写userEventTriggered()方法,处理IdleStateEvent空闲事件
- 读空闲事件触发时,发送心跳包给对方,多次未收到响应则关闭连接
- 写空闲事件触发时,主动发送心跳包,保持连接活性
五、实战案例
本案例基于JDK17、Netty最新稳定版实现,包含自定义协议、半包粘包处理、心跳检测、服务端与客户端完整实现。
5.1 环境依赖配置(pom.xml)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.4</version>
<relativePath/>
</parent>
<groupId>com.jam</groupId>
<artifactId>netty-demo</artifactId>
<version>1.0.0</version>
<name>netty-demo</name>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<netty.version>4.1.112.Final</netty.version>
<lombok.version>1.18.30</lombok.version>
<fastjson2.version>2.0.52</fastjson2.version>
<guava.version>32.1.3-jre</guava.version>
<mybatis.plus.version>3.5.6</mybatis.plus.version>
<mysql.version>8.0.36</mysql.version>
<springdoc.version>2.5.0</springdoc.version>
</properties>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatis.plus.version}</version>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>${mysql.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
<version>${springdoc.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>${fastjson2.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.12.1</version>
<configuration>
<source>17</source>
<target>17</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>
5.2 自定义协议定义
采用行业标准的固定格式协议,彻底解决半包粘包问题,协议格式如下:
| 字段名 | 长度(字节) | 说明 |
| 魔数 | 4 | 固定值0x12345678,用于校验数据包合法性 |
| 版本号 | 1 | 协议版本号,当前为1 |
| 指令类型 | 1 | 0-业务数据,1-心跳请求,2-心跳响应 |
| 数据长度 | 4 | 消息体数据的字节长度 |
| 消息体 | 不固定 | 业务数据,JSON格式 |
package com.jam.demo.protocol;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* 自定义通信协议实体
* @author ken
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class CustomProtocol {
/**
* 魔数,固定值,用于校验数据包合法性
*/
public static final int MAGIC_NUMBER = 0x12345678;
/**
* 协议版本号
*/
public static final byte VERSION = 1;
/**
* 指令类型-业务数据
*/
public static final byte COMMAND_TYPE_DATA = 0;
/**
* 指令类型-心跳请求
*/
public static final byte COMMAND_TYPE_HEARTBEAT_REQUEST = 1;
/**
* 指令类型-心跳响应
*/
public static final byte COMMAND_TYPE_HEARTBEAT_RESPONSE = 2;
/**
* 协议头部固定长度
*/
public static final int HEADER_LENGTH = 10;
/**
* 魔数
*/
private int magicNumber;
/**
* 版本号
*/
private byte version;
/**
* 指令类型
*/
private byte commandType;
/**
* 数据长度
*/
private int dataLength;
/**
* 消息体数据
*/
private byte[] data;
}
5.3 编解码器实现
5.3.1 协议编码器
package com.jam.demo.codec;
import com.alibaba.fastjson2.JSON;
import com.jam.demo.protocol.CustomProtocol;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import lombok.extern.slf4j.Slf4j;
/**
* 自定义协议编码器
* @author ken
*/
@Slf4j
public class CustomProtocolEncoder extends MessageToByteEncoder<CustomProtocol> {
@Override
protected void encode(ChannelHandlerContext ctx, CustomProtocol msg, ByteBuf out) {
out.writeInt(msg.getMagicNumber());
out.writeByte(msg.getVersion());
out.writeByte(msg.getCommandType());
out.writeInt(msg.getDataLength());
if (msg.getDataLength() > 0) {
out.writeBytes(msg.getData());
}
log.debug("协议编码完成,指令类型:{},数据长度:{}", msg.getCommandType(), msg.getDataLength());
}
}
5.3.2 协议解码器
package com.jam.demo.codec;
import com.jam.demo.protocol.CustomProtocol;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import lombok.extern.slf4j.Slf4j;
/**
* 自定义协议解码器
* 基于LengthFieldBasedFrameDecoder实现,彻底解决半包粘包问题
* @author ken
*/
@Slf4j
public class CustomProtocolDecoder extends LengthFieldBasedFrameDecoder {
/**
* 解码器构造函数
* maxFrameLength:最大帧长度,这里设置为10MB
* lengthFieldOffset:长度字段偏移量,魔数4+版本1+指令1=6字节
* lengthFieldLength:长度字段长度,4字节
* lengthAdjustment:长度调整值,0
* initialBytesToStrip:跳过的字节数,0,完整保留头部信息
*/
public CustomProtocolDecoder() {
super(10 * 1024 * 1024, 6, 4, 0, 0);
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf frame = (ByteBuf) super.decode(ctx, in);
if (frame == null) {
return null;
}
try {
int magicNumber = frame.readInt();
if (magicNumber != CustomProtocol.MAGIC_NUMBER) {
log.error("非法数据包,魔数校验失败,远程地址:{}", ctx.channel().remoteAddress());
ctx.close();
return null;
}
CustomProtocol protocol = new CustomProtocol();
protocol.setMagicNumber(magicNumber);
protocol.setVersion(frame.readByte());
protocol.setCommandType(frame.readByte());
protocol.setDataLength(frame.readInt());
if (protocol.getDataLength() > 0) {
byte[] data = new byte[protocol.getDataLength()];
frame.readBytes(data);
protocol.setData(data);
}
log.debug("协议解码完成,指令类型:{},数据长度:{}", protocol.getCommandType(), protocol.getDataLength());
return protocol;
} finally {
frame.release();
}
}
}
5.4 心跳处理器实现
5.4.1 服务端心跳处理器
package com.jam.demo.handler;
import com.jam.demo.protocol.CustomProtocol;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
/**
* 服务端心跳处理器
* @author ken
*/
@Slf4j
@ChannelHandler.Sharable
public class ServerHeartbeatHandler extends ChannelInboundHandlerAdapter {
/**
* 最大空闲次数,超过该次数关闭连接
*/
private static final int MAX_IDLE_COUNT = 3;
/**
* 当前空闲次数
*/
private int idleCount = 0;
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent event) {
if (event.state() == IdleState.READER_IDLE) {
idleCount++;
log.warn("读空闲事件触发,远程地址:{},当前空闲次数:{}", ctx.channel().remoteAddress(), idleCount);
if (idleCount >= MAX_IDLE_COUNT) {
log.error("连续{}次读空闲,关闭连接,远程地址:{}", MAX_IDLE_COUNT, ctx.channel().remoteAddress());
ctx.close();
}
}
} else {
super.userEventTriggered(ctx, evt);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
idleCount = 0;
CustomProtocol protocol = (CustomProtocol) msg;
if (protocol.getCommandType() == CustomProtocol.COMMAND_TYPE_HEARTBEAT_REQUEST) {
log.debug("收到客户端心跳请求,远程地址:{}", ctx.channel().remoteAddress());
CustomProtocol response = new CustomProtocol();
response.setMagicNumber(CustomProtocol.MAGIC_NUMBER);
response.setVersion(CustomProtocol.VERSION);
response.setCommandType(CustomProtocol.COMMAND_TYPE_HEARTBEAT_RESPONSE);
response.setDataLength(0);
ctx.writeAndFlush(response);
return;
}
ctx.fireChannelRead(msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("服务端心跳处理器异常,远程地址:{}", ctx.channel().remoteAddress(), cause);
ctx.close();
}
}
5.4.2 客户端心跳处理器
package com.jam.demo.handler;
import com.jam.demo.protocol.CustomProtocol;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
/**
* 客户端心跳处理器
* @author ken
*/
@Slf4j
public class ClientHeartbeatHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent event) {
if (event.state() == IdleState.WRITER_IDLE) {
log.debug("写空闲事件触发,发送心跳请求到服务端");
CustomProtocol request = new CustomProtocol();
request.setMagicNumber(CustomProtocol.MAGIC_NUMBER);
request.setVersion(CustomProtocol.VERSION);
request.setCommandType(CustomProtocol.COMMAND_TYPE_HEARTBEAT_REQUEST);
request.setDataLength(0);
ctx.writeAndFlush(request);
}
} else {
super.userEventTriggered(ctx, evt);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
CustomProtocol protocol = (CustomProtocol) msg;
if (protocol.getCommandType() == CustomProtocol.COMMAND_TYPE_HEARTBEAT_RESPONSE) {
log.debug("收到服务端心跳响应");
return;
}
ctx.fireChannelRead(msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("客户端心跳处理器异常", cause);
ctx.close();
}
}
5.5 业务处理器实现
5.5.1 服务端业务处理器
package com.jam.demo.handler;
import com.alibaba.fastjson2.JSON;
import com.jam.demo.protocol.CustomProtocol;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;
import java.nio.charset.StandardCharsets;
import java.util.Map;
/**
* 服务端业务处理器
* @author ken
*/
@Slf4j
@ChannelHandler.Sharable
public class ServerBusinessHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
log.info("客户端连接成功,远程地址:{}", ctx.channel().remoteAddress());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
log.info("客户端连接断开,远程地址:{}", ctx.channel().remoteAddress());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
CustomProtocol protocol = (CustomProtocol) msg;
if (protocol.getCommandType() != CustomProtocol.COMMAND_TYPE_DATA) {
log.warn("非业务数据指令,忽略处理,指令类型:{}", protocol.getCommandType());
return;
}
String data = new String(protocol.getData(), StandardCharsets.UTF_8);
log.info("收到客户端业务数据,远程地址:{},数据内容:{}", ctx.channel().remoteAddress(), data);
if (!StringUtils.hasText(data)) {
log.warn("业务数据为空,忽略处理");
return;
}
Map<String, Object> requestMap = JSON.parseObject(data);
Map<String, Object> responseMap = Map.of("code", 200, "msg", "处理成功", "data", requestMap);
byte[] responseData = JSON.toJSONString(responseMap).getBytes(StandardCharsets.UTF_8);
CustomProtocol response = new CustomProtocol();
response.setMagicNumber(CustomProtocol.MAGIC_NUMBER);
response.setVersion(CustomProtocol.VERSION);
response.setCommandType(CustomProtocol.COMMAND_TYPE_DATA);
response.setDataLength(responseData.length);
response.setData(responseData);
ctx.writeAndFlush(response);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("服务端业务处理器异常,远程地址:{}", ctx.channel().remoteAddress(), cause);
ctx.close();
}
}
5.5.2 客户端业务处理器
package com.jam.demo.handler;
import com.alibaba.fastjson2.JSON;
import com.jam.demo.protocol.CustomProtocol;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
/**
* 客户端业务处理器
* @author ken
*/
@Slf4j
public class ClientBusinessHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
log.info("成功连接到服务端,服务端地址:{}", ctx.channel().remoteAddress());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
log.info("与服务端的连接断开");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
CustomProtocol protocol = (CustomProtocol) msg;
if (protocol.getCommandType() != CustomProtocol.COMMAND_TYPE_DATA) {
log.warn("非业务数据指令,忽略处理,指令类型:{}", protocol.getCommandType());
return;
}
String data = new String(protocol.getData(), StandardCharsets.UTF_8);
log.info("收到服务端响应数据:{}", data);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("客户端业务处理器异常", cause);
ctx.close();
}
}
5.6 服务端与客户端启动实现
5.6.1 Netty服务端启动类
package com.jam.demo.server;
import com.jam.demo.codec.CustomProtocolDecoder;
import com.jam.demo.codec.CustomProtocolEncoder;
import com.jam.demo.handler.ServerBusinessHandler;
import com.jam.demo.handler.ServerHeartbeatHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* Netty服务端启动类
* @author ken
*/
@Slf4j
@Component
public class NettyServer {
/**
* 服务端监听端口
*/
private static final int PORT = 9000;
/**
* 读空闲时间,单位秒
*/
private static final int READER_IDLE_TIME = 10;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
/**
* 启动Netty服务端
*/
public void start() {
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast(new IdleStateHandler(READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS))
.addLast(new CustomProtocolDecoder())
.addLast(new CustomProtocolEncoder())
.addLast(new ServerHeartbeatHandler())
.addLast(new ServerBusinessHandler());
}
});
ChannelFuture future = bootstrap.bind(PORT).sync();
log.info("Netty服务端启动成功,监听端口:{}", PORT);
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("Netty服务端启动异常", e);
Thread.currentThread().interrupt();
} finally {
shutdown();
}
}
/**
* 关闭Netty服务端,释放资源
*/
public void shutdown() {
if (bossGroup != null && !bossGroup.isShutdown()) {
bossGroup.shutdownGracefully();
}
if (workerGroup != null && !workerGroup.isShutdown()) {
workerGroup.shutdownGracefully();
}
log.info("Netty服务端已关闭,资源释放完成");
}
}
5.6.2 Netty客户端启动类
package com.jam.demo.client;
import com.alibaba.fastjson2.JSON;
import com.jam.demo.codec.CustomProtocolDecoder;
import com.jam.demo.codec.CustomProtocolEncoder;
import com.jam.demo.handler.ClientBusinessHandler;
import com.jam.demo.handler.ClientHeartbeatHandler;
import com.jam.demo.protocol.CustomProtocol;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* Netty客户端启动类
* @author ken
*/
@Slf4j
@Component
public class NettyClient {
/**
* 服务端地址
*/
private static final String HOST = "127.0.0.1";
/**
* 服务端端口
*/
private static final int PORT = 9000;
/**
* 写空闲时间,单位秒
*/
private static final int WRITER_IDLE_TIME = 5;
/**
* 重连间隔时间,单位秒
*/
private static final int RECONNECT_INTERVAL = 3;
private EventLoopGroup workerGroup;
private Channel channel;
private Bootstrap bootstrap;
/**
* 初始化客户端
*/
public void init() {
workerGroup = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast(new IdleStateHandler(0, WRITER_IDLE_TIME, 0, TimeUnit.SECONDS))
.addLast(new CustomProtocolDecoder())
.addLast(new CustomProtocolEncoder())
.addLast(new ClientHeartbeatHandler())
.addLast(new ClientBusinessHandler());
}
});
}
/**
* 连接服务端
*/
public void connect() {
if (bootstrap == null) {
init();
}
try {
ChannelFuture future = bootstrap.connect(HOST, PORT).sync();
if (future.isSuccess()) {
channel = future.channel();
log.info("Netty客户端连接服务端成功,服务端地址:{}:{}", HOST, PORT);
}
future.channel().closeFuture().addListener(f -> {
log.warn("与服务端的连接断开,{}秒后尝试重连", RECONNECT_INTERVAL);
workerGroup.schedule(this::connect, RECONNECT_INTERVAL, TimeUnit.SECONDS);
});
} catch (InterruptedException e) {
log.error("Netty客户端连接服务端异常", e);
Thread.currentThread().interrupt();
}
}
/**
* 发送业务数据到服务端
* @param data 待发送的业务数据
*/
public void sendData(Map<String, Object> data) {
if (channel == null || !channel.isActive()) {
log.error("与服务端的连接未就绪,无法发送数据");
return;
}
String jsonData = JSON.toJSONString(data);
byte[] dataBytes = jsonData.getBytes(StandardCharsets.UTF_8);
CustomProtocol protocol = new CustomProtocol();
protocol.setMagicNumber(CustomProtocol.MAGIC_NUMBER);
protocol.setVersion(CustomProtocol.VERSION);
protocol.setCommandType(CustomProtocol.COMMAND_TYPE_DATA);
protocol.setDataLength(dataBytes.length);
protocol.setData(dataBytes);
channel.writeAndFlush(protocol).addListener(future -> {
if (future.isSuccess()) {
log.debug("数据发送成功,数据内容:{}", jsonData);
} else {
log.error("数据发送失败", future.cause());
}
});
}
/**
* 关闭客户端,释放资源
*/
public void shutdown() {
if (channel != null) {
channel.close();
}
if (workerGroup != null && !workerGroup.isShutdown()) {
workerGroup.shutdownGracefully();
}
log.info("Netty客户端已关闭,资源释放完成");
}
}
5.7 Spring Boot整合与启动入口
5.7.1 Spring Boot启动类
package com.jam.demo;
import com.jam.demo.server.NettyServer;
import io.swagger.v3.oas.annotations.OpenAPIDefinition;
import io.swagger.v3.oas.annotations.info.Info;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
/**
* Netty Demo项目启动类
* @author ken
*/
@Slf4j
@SpringBootApplication
@OpenAPIDefinition(info = @Info(title = "Netty Demo API", version = "1.0.0", description = "Netty实战项目API文档"))
public class NettyDemoApplication {
public static void main(String[] args) {
SpringApplication.run(NettyDemoApplication.class, args);
log.info("Netty Demo项目启动成功");
}
/**
* 启动Netty服务端
*/
@Bean
public CommandLineRunner startNettyServer(NettyServer nettyServer) {
return args -> new Thread(nettyServer::start, "netty-server-thread").start();
}
}
5.7.2 测试接口Controller
package com.jam.demo.controller;
import com.jam.demo.client.NettyClient;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Map;
/**
* Netty测试接口
* @author ken
*/
@Slf4j
@RestController
@RequestMapping("/netty")
@RequiredArgsConstructor
@Tag(name = "Netty测试接口", description = "Netty客户端数据发送测试接口")
public class NettyTestController {
private final NettyClient nettyClient;
/**
* 发送数据到Netty服务端
*/
@PostMapping("/send")
@Operation(summary = "发送数据", description = "通过Netty客户端发送数据到服务端")
public Map<String, Object> sendData(@RequestBody Map<String, Object> data) {
nettyClient.sendData(data);
return Map.of("code", 200, "msg", "数据发送成功");
}
/**
* 初始化并连接Netty客户端
*/
@PostMapping("/connect")
@Operation(summary = "连接服务端", description = "初始化Netty客户端并连接服务端")
public Map<String, Object> connect() {
nettyClient.init();
new Thread(nettyClient::connect, "netty-client-thread").start();
return Map.of("code", 200, "msg", "客户端连接请求已发起");
}
/**
* 关闭Netty客户端
*/
@PostMapping("/shutdown")
@Operation(summary = "关闭客户端", description = "关闭Netty客户端并释放资源")
public Map<String, Object> shutdown() {
nettyClient.shutdown();
return Map.of("code", 200, "msg", "客户端已关闭,资源释放完成");
}
}
5.8 MyBatis Plus心跳日志持久化
5.8.1 MySQL表结构
CREATE TABLE `netty_heartbeat_log` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`client_address` varchar(128) NOT NULL COMMENT '客户端地址',
`heartbeat_type` tinyint NOT NULL COMMENT '心跳类型:1-请求,2-响应',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
PRIMARY KEY (`id`),
KEY `idx_client_address` (`client_address`),
KEY `idx_create_time` (`create_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='Netty心跳日志表';
5.8.2 实体类
package com.jam.demo.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.time.LocalDateTime;
/**
* Netty心跳日志实体
* @author ken
*/
@Data
@TableName("netty_heartbeat_log")
public class HeartbeatLog {
@TableId(type = IdType.AUTO)
private Long id;
private String clientAddress;
private Integer heartbeatType;
private LocalDateTime createTime;
}
六、Netty最佳实践与高频踩坑避坑指南
6.1 核心最佳实践
- 线程模型规范:绝对禁止在EventLoop线程中执行耗时操作,所有耗时业务必须提交到独立的业务线程池执行,避免阻塞Reactor线程
- 内存管理规范:生产环境优先使用池化堆外内存ByteBuf,非池化内存仅用于低频场景;堆外内存必须手动释放,避免内存泄漏
- Handler使用规范:无状态Handler添加@Sharable注解,全局单例复用;有状态Handler每次连接都创建新实例,避免线程安全问题
- Pipeline配置规范:入站Handler的添加顺序为:空闲检测->解码->业务处理;出站Handler的添加顺序为:业务处理->编码,顺序错误会导致解码失败
- 异常处理规范:所有Handler必须重写exceptionCaught()方法,处理异常并关闭资源,避免异常被吞掉无法定位
- 异步操作规范:所有异步IO操作必须添加监听器,处理成功与失败的场景,避免操作失败无感知
- 心跳机制规范:长连接场景必须配置心跳检测,客户端写空闲发送心跳,服务端读空闲检测连接可用性,及时关闭无效连接
6.2 高频踩坑避坑指南
- 内存泄漏问题
- 坑点:ByteBuf未手动释放,尤其是堆外内存,导致内存泄漏,服务内存持续上涨最终OOM
- 避坑:开启Netty的内存泄漏检测,设置JVM参数
-Dio.netty.leakDetection.level=PARANOID;ByteBuf若未传递到下一个Handler,必须手动release();使用try-finally结构确保释放
- EventLoop线程阻塞
- 坑点:在Handler中执行数据库查询、远程调用等耗时操作,阻塞EventLoop线程,导致其他连接的事件无法处理,吞吐量暴跌
- 避坑:耗时操作必须提交到独立的业务线程池执行,Handler中仅做数据编解码与事件分发
- 半包粘包问题
- 坑点:未使用Netty提供的解码器,手动处理半包粘包,逻辑错误导致数据解析失败
- 避坑:优先使用
LengthFieldBasedFrameDecoder解码器,基于固定协议格式处理,不要手动实现
- Handler线程安全问题
- 坑点:给有状态的Handler添加@Sharable注解,被多个Channel共享,导致并发数据错乱
- 避坑:只有无状态的Handler才能添加@Sharable注解,有状态的Handler每次连接都创建新实例
- 事件流转中断
- 坑点:入站Handler处理完成后,未调用fireChannelRead()方法,导致事件中断,后续Handler无法处理
- 避坑:入站数据处理完成后,必须调用fireChannelRead()方法将事件传递给下一个Handler
- TCP参数配置错误
- 坑点:未开启TCP_NODELAY,导致Nagle算法合并数据包,消息发送延迟升高;SO_BACKLOG配置过小,高并发下连接被拒绝
- 避坑:生产环境必须开启TCP_NODELAY,禁用Nagle算法;SO_BACKLOG配置为1024以上,根据并发量调整
- 异常未处理
- 坑点:未重写exceptionCaught()方法,异常被Netty默认处理,吞掉异常信息,无法定位问题
- 避坑:所有Handler必须重写exceptionCaught()方法,打印异常日志,关闭无效连接
七、总结
Netty作为Java网络编程的事实标准,屏蔽了原生NIO的底层复杂度与BUG,提供了高性能、高可靠、易扩展的网络编程能力,是分布式中间件、网关、游戏服务器、即时通信等系统的核心技术底座。
本文从IO模型的底层原理出发,彻底讲透了Netty的核心架构、核心组件、高级特性,提供了生产级可运行的完整实战案例,同时总结了最佳实践与高频踩坑指南,帮助开发者彻底掌握Netty,从入门到生产落地。
掌握Netty的核心,本质是掌握Reactor线程模型、责任链模式、内存管理、零拷贝这些底层核心思想,这些思想不仅适用于Netty,更是高性能分布式系统设计的核心基石。