Netty自定义消息协议的实现逻辑处理粘包拆包、心跳机制

简介: Netty自定义消息协议的实现逻辑处理粘包拆包、心跳机制


自定义消息协议的实现逻辑


消息协议:这一次消息需要包含两个部分,即消息长度和消息内容本身。
自定义消息编码器︰消息编码器将客户端发送的消息转换成遵守消息协议的消息,即包含消息长度和消息内容的消息
自定义消息解码器∶消息解码器根据消息协议的消息长度,来获得指定长度的消息内容。

自定义编码器

自定义消息协议:

//自定义消息协议
public class MessageProtocal {
    //消息的长度
    private int length;
    //消息的内容
    private byte[] content;
    public int getLength() {
        return length;
    }
    public void setLength(int length) {
        this.length = length;
    }
    public byte[] getContent() {
        return content;
    }
    public void setContent(byte[] content) {
        this.content = content;
    }
}

客户端基本代码

public class NettyClient {
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup(1);
        Bootstrap bootstrap = new Bootstrap();
        //设置相关的参数
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        //添加处理器,分包编码器
                        pipeline.addLast(new MessageEncoder());
                        //添加具体的业务处理器
                        pipeline.addLast(new NettyMessageClientHandler());
                    }
                });
        System.out.println("客户端启动了");
        ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9090).sync();
        channelFuture.channel().closeFuture().sync();
        group.shutdownGracefully();
    }
}

客户端业务代码

public class NettyMessageClientHandler extends SimpleChannelInboundHandler<MessageProtocal> {
    //连接通道创建后要向服务端发送消息
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for(int i=0;i<200;i++){
            String msg = "西安科技大学";
            //创建消息协议对象
            MessageProtocal messageProtocal = new MessageProtocal();
            messageProtocal.setLength(msg.getBytes(StandardCharsets.UTF_8).length);
            messageProtocal.setContent(msg.getBytes(StandardCharsets.UTF_8));
            //发送协议对象,注意此时ctx只能发送Bytebuf数据,因此需要用编码器把它编码成Bytebuf数据
            ctx.writeAndFlush(messageProtocal);
        }
    }
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageProtocal msg) throws Exception {
    }
}

自定义编码器

public class MessageEncoder extends MessageToByteEncoder<MessageProtocal> {
    @Override
    protected void encode(ChannelHandlerContext ctx, MessageProtocal msg, ByteBuf out) throws Exception {
        out.writeInt(msg.getLength());
        out.writeBytes(msg.getContent());
    }
}

服务端基本代码

public class NettyServer {
    public static void main(String[] args) throws Exception {
        EventLoopGroup boosGroup = new NioEventLoopGroup(1);
        EventLoopGroup workGroup = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(boosGroup,workGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        //添加解码器
                        pipeline.addLast(new MessageDecoder());
                        pipeline.addLast(new NettyMessageServerHandler());
                    }
                });
        System.out.println("Netty的服务端启动了");
        ChannelFuture channelFuture = serverBootstrap.bind(9090).sync();
        channelFuture.channel().closeFuture().sync();
        boosGroup.shutdownGracefully();
        workGroup.shutdownGracefully();
    }
}

自定义解码器

//自定义解码器代码
public class MessageDecoder extends ByteToMessageDecoder {
    int length = 0;
    //ctx
    //in:客户端发送来的MessageProtocol编码后的ByteBuf数据
    //out:out里的数据会被放行到下一个handler把解码出来的MessageProtocol放到out里面
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println("ByteBuf:"+in);
        //获得前面的4个字节的数据 == 描述实际内容的长度
        if(in.readableBytes()>=4){
            //ByteBuf里面可能有MessageProtocol数据
            if(length==0){
                length = in.readInt();
            }
            //length = 15
            if(in.readableBytes()<length){
                //说明数据还没到齐,等待下一次调用decode
                System.out.println("当前数据量不够,继续等待");
                return;
            }
            //可读数据量>=length ==> 意味着这一次的MessageProtocol的内容已经到齐了
            //创建了一个指定length长度的字节数组
            byte[] content = new byte[length];
            //把ByteBuf里面的指定长度的数据读到content数组中
            in.readBytes(content);
            //创建协议MessageProtocol对象赋值
            MessageProtocal messageProtocal = new MessageProtocal();
            messageProtocal.setLength(length);
            messageProtocal.setContent(content);
            out.add(messageProtocal);
            length=0;
        }
    }
}

服务端业务处理代码

public class NettyMessageServerHandler extends SimpleChannelInboundHandler<MessageProtocal> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageProtocal msg) throws Exception {
        System.out.println("---服务器收到的数据---");
        System.out.println("消息的长度:"+msg.getLength());
        System.out.println("消息的内容:"+new String(msg.getContent(), StandardCharsets.UTF_8));
    }
}

运行结果:




心跳机制

在分布式系统中,心跳机制常常在注册中心组件中提及,比如Zookeeper、Eureka、Nacos等,通过维护客户端的心跳,来判断客户端是否正常在线。如果客户端达到超时次数等预设的条件时,服务端将释放客户端的连接资源。
试想一下,当我们一个用来写数据的通道,它虽然没有下线,但这个通道长时间都不写数据了,是不是我们可以利用心跳机制,关闭此类通道及其对应的客户端



实现客户端发送心跳包

客户端基本代码

public class NettyClient {
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup(1);
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioServerSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        //添加编解码器
                        pipeline.addLast(new StringDecoder());
                        pipeline.addLast(new StringEncoder());
                        pipeline.addLast(new NettyClientHandler());
                    }
                });
        System.out.println("客户端启动了");
        ChannelFuture channelFuture = bootstrap.connect("127.0.0.1",9090).sync();
        //模拟向服务端发送心跳数据
        String packet = "heartbeat packet";
        Random random = new Random();
        Channel channel = channelFuture.channel();
        while (channel.isActive()){
            //随机的事件来实现时间间隔等待
            int num = random.nextInt(10);
            Thread.sleep(num*1000);
            channel.writeAndFlush(packet);
        }
        group.shutdownGracefully();
    }
}

客户端拦截器

public class NettyClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        System.out.println("客户端收到的数据"+s);
    }
}


IdleStateHandler类描述三种空闲状态
读空闲:在指定时间间隔内没有从Channel中读到数据,将会创建状态为READER_IDLE的IdleStateEvent对象。
写空闲︰在指定时间间隔内没有数据写入到Channel中,将会创建状态为WRITER_IDLE的ldleStateEvent对象。
读写空闲:在指定时间间隔内Channel中没有发生读写操作,将会创建状态为ALL_IDLE的ldleStateEvent对象。
服务端基本代码

public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workGroup = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup,workGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new StringEncoder());
                        pipeline.addLast(new StringDecoder());
                        //超时状态处理器会在服务端发现有超过3秒没有没有发生读操作的话会触发超时事件
                        //创建出IdleStateEvent对象,将该对象交给下一个Handler
                        pipeline.addLast(new IdleStateHandler(3,0,0, TimeUnit.SECONDS));
                        //HeartbeatServerHandler必领重写userEventTriggered方法,用来做具体的超时的业务处理
                        pipeline.addLast(new HeartbeatServerHandler());
                    }
                });
        System.out.println("Netty服务端启动了");
        ChannelFuture channelFuture = serverBootstrap.bind(9090).sync();
        channelFuture.channel().closeFuture().sync();
        bossGroup.shutdownGracefully();
        workGroup.shutdownGracefully();
    }
}

服务端业务代码

public class HeartbeatServerHandler extends SimpleChannelInboundHandler<String> {
    int readIdleTimes = 0;
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        System.out.println("服务端收到的心跳"+s);
        channelHandlerContext.writeAndFlush("服务端已经收到了心跳");
    }
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        IdleStateEvent event = (IdleStateEvent)evt;
        switch (event.state()){
            case READER_IDLE:
                readIdleTimes++;
                break;
            case WRITER_IDLE:
                System.out.println("写超时");
                break;
            case ALL_IDLE:
                System.out.println("读写超时");
                break;
        }
        if(readIdleTimes>3){
            System.out.println("读超时超过三次,关闭连接");
            ctx.writeAndFlush("超时关闭");
            ctx.channel().close();
        }
    }
}
目录
相关文章
|
5月前
|
编解码 网络协议 开发者
Netty运行原理问题之NettyTCP的粘包和拆包的问题如何解决
Netty运行原理问题之NettyTCP的粘包和拆包的问题如何解决
|
5月前
|
移动开发 网络协议 算法
(十)Netty进阶篇:漫谈网络粘包、半包问题、解码器与长连接、心跳机制实战
在前面关于《Netty入门篇》的文章中,咱们已经初步对Netty这个著名的网络框架有了认知,本章的目的则是承接上文,再对Netty中的一些进阶知识进行阐述,毕竟前面的内容中,仅阐述了一些Netty的核心组件,想要真正掌握Netty框架,对于它我们应该具备更为全面的认知。
266 2
|
7月前
|
网络协议
netty粘包问题分析
netty粘包问题分析
51 0
|
7月前
|
Java
Netty传输object并解决粘包拆包问题
Netty传输object并解决粘包拆包问题
67 0
|
存储 缓存 NoSQL
跟着源码学IM(十一):一套基于Netty的分布式高可用IM详细设计与实现(有源码)
本文将要分享的是如何从零实现一套基于Netty框架的分布式高可用IM系统,它将支持长连接网关管理、单聊、群聊、聊天记录查询、离线消息存储、消息推送、心跳、分布式唯一ID、红包、消息同步等功能,并且还支持集群部署。
13538 1
|
8月前
|
消息中间件 Oracle Dubbo
Netty 源码共读(一)如何阅读JDK下sun包的源码
Netty 源码共读(一)如何阅读JDK下sun包的源码
155 1
|
NoSQL Java Redis
跟着源码学IM(十二):基于Netty打造一款高性能的IM即时通讯程序
关于Netty网络框架的内容,前面已经讲了两个章节,但总归来说难以真正掌握,毕竟只是对其中一个个组件进行讲解,很难让诸位将其串起来形成一条线,所以本章中则会结合实战案例,对Netty进行更深层次的学习与掌握,实战案例也并不难,一个非常朴素的IM聊天程序。 原本打算做个多人斗地主练习程序,但那需要织入过多的业务逻辑,因此一方面会带来不必要的理解难度,让案例更为复杂化,另一方面代码量也会偏多,所以最终依旧选择实现基本的IM聊天程序,既简单,又能加深对Netty的理解。
181 1
|
8月前
|
编解码 前端开发 网络协议
Netty Review - ObjectEncoder对象和ObjectDecoder对象解码器的使用与源码解读
Netty Review - ObjectEncoder对象和ObjectDecoder对象解码器的使用与源码解读
176 0
|
8月前
|
编解码 安全 前端开发
Netty Review - StringEncoder字符串编码器和StringDecoder 解码器的使用与源码解读
Netty Review - StringEncoder字符串编码器和StringDecoder 解码器的使用与源码解读
273 0
|
分布式计算 网络协议 前端开发
【Netty底层数据交互源码】
【Netty底层数据交互源码】