自定义消息协议的实现逻辑
消息协议:这一次消息需要包含两个部分,即消息长度和消息内容本身。
自定义消息编码器︰消息编码器将客户端发送的消息转换成遵守消息协议的消息,即包含消息长度和消息内容的消息
自定义消息解码器∶消息解码器根据消息协议的消息长度,来获得指定长度的消息内容。
自定义编码器
自定义消息协议:
//自定义消息协议 public class MessageProtocal { //消息的长度 private int length; //消息的内容 private byte[] content; public int getLength() { return length; } public void setLength(int length) { this.length = length; } public byte[] getContent() { return content; } public void setContent(byte[] content) { this.content = content; } }
客户端基本代码
public class NettyClient { public static void main(String[] args) throws Exception { EventLoopGroup group = new NioEventLoopGroup(1); Bootstrap bootstrap = new Bootstrap(); //设置相关的参数 bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //添加处理器,分包编码器 pipeline.addLast(new MessageEncoder()); //添加具体的业务处理器 pipeline.addLast(new NettyMessageClientHandler()); } }); System.out.println("客户端启动了"); ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9090).sync(); channelFuture.channel().closeFuture().sync(); group.shutdownGracefully(); } }
客户端业务代码
public class NettyMessageClientHandler extends SimpleChannelInboundHandler<MessageProtocal> { //连接通道创建后要向服务端发送消息 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { for(int i=0;i<200;i++){ String msg = "西安科技大学"; //创建消息协议对象 MessageProtocal messageProtocal = new MessageProtocal(); messageProtocal.setLength(msg.getBytes(StandardCharsets.UTF_8).length); messageProtocal.setContent(msg.getBytes(StandardCharsets.UTF_8)); //发送协议对象,注意此时ctx只能发送Bytebuf数据,因此需要用编码器把它编码成Bytebuf数据 ctx.writeAndFlush(messageProtocal); } } @Override protected void channelRead0(ChannelHandlerContext ctx, MessageProtocal msg) throws Exception { } }
自定义编码器
public class MessageEncoder extends MessageToByteEncoder<MessageProtocal> { @Override protected void encode(ChannelHandlerContext ctx, MessageProtocal msg, ByteBuf out) throws Exception { out.writeInt(msg.getLength()); out.writeBytes(msg.getContent()); } }
服务端基本代码
public class NettyServer { public static void main(String[] args) throws Exception { EventLoopGroup boosGroup = new NioEventLoopGroup(1); EventLoopGroup workGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(boosGroup,workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //添加解码器 pipeline.addLast(new MessageDecoder()); pipeline.addLast(new NettyMessageServerHandler()); } }); System.out.println("Netty的服务端启动了"); ChannelFuture channelFuture = serverBootstrap.bind(9090).sync(); channelFuture.channel().closeFuture().sync(); boosGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } }
自定义解码器
//自定义解码器代码 public class MessageDecoder extends ByteToMessageDecoder { int length = 0; //ctx //in:客户端发送来的MessageProtocol编码后的ByteBuf数据 //out:out里的数据会被放行到下一个handler把解码出来的MessageProtocol放到out里面 @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { System.out.println("ByteBuf:"+in); //获得前面的4个字节的数据 == 描述实际内容的长度 if(in.readableBytes()>=4){ //ByteBuf里面可能有MessageProtocol数据 if(length==0){ length = in.readInt(); } //length = 15 if(in.readableBytes()<length){ //说明数据还没到齐,等待下一次调用decode System.out.println("当前数据量不够,继续等待"); return; } //可读数据量>=length ==> 意味着这一次的MessageProtocol的内容已经到齐了 //创建了一个指定length长度的字节数组 byte[] content = new byte[length]; //把ByteBuf里面的指定长度的数据读到content数组中 in.readBytes(content); //创建协议MessageProtocol对象赋值 MessageProtocal messageProtocal = new MessageProtocal(); messageProtocal.setLength(length); messageProtocal.setContent(content); out.add(messageProtocal); length=0; } } }
服务端业务处理代码
public class NettyMessageServerHandler extends SimpleChannelInboundHandler<MessageProtocal> { @Override protected void channelRead0(ChannelHandlerContext ctx, MessageProtocal msg) throws Exception { System.out.println("---服务器收到的数据---"); System.out.println("消息的长度:"+msg.getLength()); System.out.println("消息的内容:"+new String(msg.getContent(), StandardCharsets.UTF_8)); } }
运行结果:
心跳机制
在分布式系统中,心跳机制常常在注册中心组件中提及,比如Zookeeper、Eureka、Nacos等,通过维护客户端的心跳,来判断客户端是否正常在线。如果客户端达到超时次数等预设的条件时,服务端将释放客户端的连接资源。
试想一下,当我们一个用来写数据的通道,它虽然没有下线,但这个通道长时间都不写数据了,是不是我们可以利用心跳机制,关闭此类通道及其对应的客户端
实现客户端发送心跳包
客户端基本代码
public class NettyClient { public static void main(String[] args) throws Exception { EventLoopGroup group = new NioEventLoopGroup(1); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioServerSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //添加编解码器 pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(new NettyClientHandler()); } }); System.out.println("客户端启动了"); ChannelFuture channelFuture = bootstrap.connect("127.0.0.1",9090).sync(); //模拟向服务端发送心跳数据 String packet = "heartbeat packet"; Random random = new Random(); Channel channel = channelFuture.channel(); while (channel.isActive()){ //随机的事件来实现时间间隔等待 int num = random.nextInt(10); Thread.sleep(num*1000); channel.writeAndFlush(packet); } group.shutdownGracefully(); } }
客户端拦截器
public class NettyClientHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception { System.out.println("客户端收到的数据"+s); } }
IdleStateHandler类描述三种空闲状态
读空闲:在指定时间间隔内没有从Channel中读到数据,将会创建状态为READER_IDLE的IdleStateEvent对象。
写空闲︰在指定时间间隔内没有数据写入到Channel中,将会创建状态为WRITER_IDLE的ldleStateEvent对象。
读写空闲:在指定时间间隔内Channel中没有发生读写操作,将会创建状态为ALL_IDLE的ldleStateEvent对象。
服务端基本代码
public static void main(String[] args) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup,workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new StringEncoder()); pipeline.addLast(new StringDecoder()); //超时状态处理器会在服务端发现有超过3秒没有没有发生读操作的话会触发超时事件 //创建出IdleStateEvent对象,将该对象交给下一个Handler pipeline.addLast(new IdleStateHandler(3,0,0, TimeUnit.SECONDS)); //HeartbeatServerHandler必领重写userEventTriggered方法,用来做具体的超时的业务处理 pipeline.addLast(new HeartbeatServerHandler()); } }); System.out.println("Netty服务端启动了"); ChannelFuture channelFuture = serverBootstrap.bind(9090).sync(); channelFuture.channel().closeFuture().sync(); bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } }
服务端业务代码
public class HeartbeatServerHandler extends SimpleChannelInboundHandler<String> { int readIdleTimes = 0; @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception { System.out.println("服务端收到的心跳"+s); channelHandlerContext.writeAndFlush("服务端已经收到了心跳"); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { IdleStateEvent event = (IdleStateEvent)evt; switch (event.state()){ case READER_IDLE: readIdleTimes++; break; case WRITER_IDLE: System.out.println("写超时"); break; case ALL_IDLE: System.out.println("读写超时"); break; } if(readIdleTimes>3){ System.out.println("读超时超过三次,关闭连接"); ctx.writeAndFlush("超时关闭"); ctx.channel().close(); } } }