开发者社区> 问答> 正文

Netty 长连接不读取报文超时断开问题

请问netty服务端长连接运行一段时间经常不读取报文,超时断开,设备主动重连后又正常,这是什么原因

展开
收起
深圳科华 2020-10-13 17:02:49 1236 0
1 条回答
写回答
取消 提交回答
  • 导入Msgpack compile 'org.msgpack:msgpack:0.6.12' 1 创建对象,命名DeviceValue(注意:需要在对象头上注入@Message) package com.zmm.netty4msgpacktest.domain;

    import org.msgpack.annotation.Message;

    @Message public class DeviceValue {

    private int type;
    
    private int seatId;
    
    private int speed;
    
    private int angle;
    
    public int getType() {
        return type;
    }
    
    public DeviceValue() {
    }
    
    public void setType(int type) {
        this.type = type;
    }
    
    public int getSeatId() {
        return seatId;
    }
    
    public void setSeatId(int seatId) {
        this.seatId = seatId;
    }
    
    public int getSpeed() {
        return speed;
    }
    
    public void setSpeed(int speed) {
        this.speed = speed;
    }
    
    public int getAngle() {
        return angle;
    }
    
    public void setAngle(int angle) {
        this.angle = angle;
    }
    
    
    @Override
    public String toString() {
        return "DeviceValue{" +
                "type=" + type +
                ", seatId=" + seatId +
                ", speed=" + speed +
                ", angle=" + angle +
                '}';
    }
    

    } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 创建type标记,命名TypeData(目的:通过DeviceValue中的不同type来区分数据类型) package com.zmm.netty4msgpacktest.domain;

    public interface TypeData {

    //模式
    byte PING = 1;
    
    byte PONG = 2;
    
    byte CUSTOME = 3;
    
    //*******************************
    byte PING_SEAT = 100;
    
    byte PONG_SEAT = 101;
    
    byte SERVER_RESPONSE = 102;
    
    byte SERVER_RESISTANT = 103;
    

    } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 创建msgpack编码器(Msgpack的具体用法可以去查一下,网上很多,其传输效率真的很高,很适合大量数据传递的情况) package com.zmm.netty4msgpacktest.code;

    import org.msgpack.MessagePack;

    import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder;

    public class MsgPackEncode extends MessageToByteEncoder {

    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
        MessagePack msgPack = new MessagePack();
        byte[] raw = msgPack.write(msg);
        out.writeBytes(raw);
    }
    

    } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 创建msgpack解码器 package com.zmm.netty4msgpacktest.code;

    import com.zmm.netty4msgpacktest.domain.DeviceValue;

    import org.msgpack.MessagePack;

    import java.util.List;

    import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageDecoder;

    public class MsgPackDecode extends MessageToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        final byte[] array;
        final int length=msg.readableBytes();
        array=new byte[length];
        msg.getBytes(msg.readerIndex(), array,0,length);
        MessagePack msgpack=new MessagePack();
        out.add(msgpack.read(array, DeviceValue.class));
    }
    

    } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 创建相同Server端和Client端共有的消息处理类(通过传递的type,执行不同的逻辑) package com.zmm.netty4msgpacktest.common;

    import com.zmm.netty4msgpacktest.domain.DeviceValue; import com.zmm.netty4msgpacktest.domain.TypeData;

    import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleStateEvent;

    public abstract class CustomHeartbeatHandler extends ChannelInboundHandlerAdapter {

    protected String name;
    private int heartbeatCount = 0;
    
    public CustomHeartbeatHandler(String name) {
        this.name = name;
    }
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        DeviceValue deviceValue = (DeviceValue) msg;
        int type = deviceValue.getType();
        System.out.println("CustomHeartbeatHandler type="+type);
    
        switch (type){
            case 1:
                sendPongMsg(ctx);
                break;
    
            case 2:
                System.out.println(name + " get pong msg from " + ctx.channel().remoteAddress());
    
                break;
    
            case 3:
                handleData(ctx, msg);
    
                break;
        }
    
    }
    
    protected void sendPingMsg(ChannelHandlerContext context) {
        DeviceValue deviceValue = new DeviceValue();
        deviceValue.setType(TypeData.PING);
        deviceValue.setSpeed(0);
        deviceValue.setAngle(15);
        deviceValue.setSeatId(TypeData.PING_SEAT);
        context.channel().writeAndFlush(deviceValue);
        heartbeatCount++;
        System.out.println(name + " sent ping msg to " + context.channel().remoteAddress() + ", count: " + heartbeatCount);
    }
    
    private void sendPongMsg(ChannelHandlerContext context) {
        DeviceValue deviceValue = new DeviceValue();
        deviceValue.setType(TypeData.PONG);
        deviceValue.setSpeed(0);
        deviceValue.setAngle(15);
        deviceValue.setSeatId(TypeData.PONG_SEAT);
        context.channel().writeAndFlush(deviceValue);
        heartbeatCount++;
        System.out.println(name + " sent pong msg to " + context.channel().remoteAddress() + ", count: " + heartbeatCount);
    }
    
    protected abstract void handleData(ChannelHandlerContext channelHandlerContext, Object msg);
    
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        // IdleStateHandler 所产生的 IdleStateEvent 的处理逻辑.
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            switch (e.state()) {
                case READER_IDLE:
                    handleReaderIdle(ctx);
                    break;
                case WRITER_IDLE:
                    handleWriterIdle(ctx);
                    break;
                case ALL_IDLE:
                    handleAllIdle(ctx);
                    break;
                default:
                    break;
            }
        }
    }
    
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.err.println("---" + ctx.channel().remoteAddress() + " is active---");
    }
    
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.err.println("---" + ctx.channel().remoteAddress() + " is inactive---");
    }
    
    protected void handleReaderIdle(ChannelHandlerContext ctx) {
        System.err.println("---READER_IDLE---");
    }
    
    protected void handleWriterIdle(ChannelHandlerContext ctx) {
        System.err.println("---WRITER_IDLE---");
    }
    
    protected void handleAllIdle(ChannelHandlerContext ctx) {
        System.err.println("---ALL_IDLE---");
    }
    

    } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 创建Client端消息处理类(因为在Client里面每隔2s发送一次数据给服务器,这里接收后就不发送了) package com.zmm.netty4msgpacktest.client;

    import com.zmm.netty4msgpacktest.common.CustomHeartbeatHandler; import com.zmm.netty4msgpacktest.domain.DeviceValue; import com.zmm.netty4msgpacktest.domain.TypeData;

    import io.netty.channel.ChannelHandlerContext;

    public class ClientHandler extends CustomHeartbeatHandler { private Client client; public ClientHandler(Client client) { super("client"); this.client = client; }

    @Override
    protected void handleData(ChannelHandlerContext channelHandlerContext, Object msg) {
        DeviceValue deviceValue = (DeviceValue) msg;
        System.out.println("client 接收数据:"+deviceValue.toString());
    

    // DeviceValue s = new DeviceValue(); // s.setType(TypeData.CUSTOME); // s.setSpeed(0); // s.setAngle(15); // s.setSeatId(TypeData.SERVER_RESPONSE); // channelHandlerContext.writeAndFlush(s); }

    @Override
    protected void handleAllIdle(ChannelHandlerContext ctx) {
        super.handleAllIdle(ctx);
        sendPingMsg(ctx);
    }
    
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        client.doConnect();
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println(name + " exception"+cause.toString());
    
    }
    

    } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 创建Server端消息处理类(服务端收到数据后,返回固定数据给客户端) package com.zmm.netty4msgpacktest.server;

    import com.zmm.netty4msgpacktest.common.CustomHeartbeatHandler; import com.zmm.netty4msgpacktest.domain.DeviceValue; import com.zmm.netty4msgpacktest.domain.TypeData;

    import io.netty.channel.ChannelHandlerContext;

    public class ServerHandler extends CustomHeartbeatHandler {

    public ServerHandler() {
        super("server");
    }
    
    @Override
    protected void handleData(ChannelHandlerContext channelHandlerContext, Object msg) {
        DeviceValue deviceValue = (DeviceValue) msg;
        System.out.println("server 接收数据:"+deviceValue.toString());
    
        DeviceValue s = new DeviceValue();
        s.setType(TypeData.CUSTOME);
        s.setSpeed(0);
        s.setAngle(15);
        s.setSeatId(TypeData.SERVER_RESPONSE);
        channelHandlerContext.writeAndFlush(s);
        System.out.println("server 发送数据:"+s.toString());
    }
    
    @Override
    protected void handleReaderIdle(ChannelHandlerContext ctx) {
        super.handleReaderIdle(ctx);
        System.err.println("---client " + ctx.channel().remoteAddress().toString() + " reader timeout, close it---");
        ctx.close();
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println(name+" exception"+cause.toString());
    }
    

    } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 创建Client(这里执行了多个程序,包括连接服务器、重连、定时发送等) package com.zmm.netty4msgpacktest.client;

    import com.zmm.netty4msgpacktest.code.MsgPackDecode; import com.zmm.netty4msgpacktest.code.MsgPackEncode; import com.zmm.netty4msgpacktest.domain.DeviceValue; import com.zmm.netty4msgpacktest.domain.TypeData;

    import java.util.Random; import java.util.concurrent.TimeUnit;

    import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.timeout.IdleStateHandler;

    public class Client { private NioEventLoopGroup workGroup = new NioEventLoopGroup(4); private Channel channel; private Bootstrap bootstrap;

    public static void main(String[] args) throws Exception {
        Client client = new Client();
        client.start();
        client.sendData();
    }
    
    public void start() {
        try {
            bootstrap = new Bootstrap();
            bootstrap
                    .group(workGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline p = socketChannel.pipeline();
                            p.addLast(new IdleStateHandler(0, 0, 5));
    

    // p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0)); p.addLast(new MsgPackDecode()); p.addLast(new MsgPackEncode()); p.addLast(new ClientHandler(Client.this)); } }); doConnect();

        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
    
    /**
     * 重连机制,每隔2s重新连接一次服务器
     */
    protected void doConnect() {
        if (channel != null && channel.isActive()) {
            return;
        }
    
        ChannelFuture future = bootstrap.connect("127.0.0.1", 12345);
    
        future.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture futureListener) throws Exception {
                if (futureListener.isSuccess()) {
                    channel = futureListener.channel();
                    System.out.println("Connect to server successfully!");
                } else {
                    System.out.println("Failed to connect to server, try connect after 2s");
    
                    futureListener.channel().eventLoop().schedule(new Runnable() {
                        @Override
                        public void run() {
                            doConnect();
                        }
                    }, 2, TimeUnit.SECONDS);
                }
            }
        });
    }
    
    /**
     * 发送数据 每隔2秒发送一次
     * @throws Exception
     */
    public void sendData() throws Exception {
        Random random = new Random(System.currentTimeMillis());
        for (int i = 0; i < 10000; i++) {
            if (channel != null && channel.isActive()) {
    
                DeviceValue deviceValue = new DeviceValue();
                deviceValue.setType(TypeData.CUSTOME);
                deviceValue.setAngle(i%15);
                deviceValue.setSeatId(i%30);
                deviceValue.setSpeed(i%120);
    
                System.out.println("client 发送数据:"+deviceValue.toString());
    
                channel.writeAndFlush(deviceValue);
            }
    

    60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 创建Server(Server开启一次就够了,不需要重复开启,若是服务器断开,Server和Client不会报异常,Client会继续不断的搜索服务器,直到搜索到为止) package com.zmm.netty4msgpacktest.server;

    import com.zmm.netty4msgpacktest.code.MsgPackDecode; import com.zmm.netty4msgpacktest.code.MsgPackEncode;

    import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.timeout.IdleStateHandler;

    public class Server { public static void main(String[] args) { NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); NioEventLoopGroup workGroup = new NioEventLoopGroup(4); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap .group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer () { protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline p = socketChannel.pipeline(); p.addLast(new IdleStateHandler(10, 0, 0)); // p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0)); p.addLast(new MsgPackDecode()); p.addLast(new MsgPackEncode()); p.addLast(new ServerHandler()); } });

            Channel ch = bootstrap.bind(12345).sync().channel();
    
            System.out.println("------Server Start------");
    
            ch.closeFuture().sync();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
    

    } 运行 结果 Server端日志: 这里写图片描述

    Client端日志: 这里写图片描述

    2021-02-15 00:47:04
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
探索连接的最后十秒”落时”的网关 立即下载
低代码开发师(初级)实战教程 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载