开发者社区> 问答> 正文

netty 大批量发送数据的时候 出现channel关闭异常?报错

用netty建立的长链接,在长链接过多,大批量发送数据的时候。发现好多消息发送失败,并且链接断开很多。

报错的的日志:

    19:23:18.036 [nioEventLoopGroup-3-10] ERROR com.suning.live.service.MsgServiceImpl - chat msg  发送失败, msg is : {"content":"这是测试内容","fromCustNum":"test","fromHeadPic":"test","fromNickName":"test","level":1,"msgType":"text","session_id":"755069b917624224bdefd3a70c3d6c4b","toChatRoomId":"lyRoom","type":"Push2ChatRoom"}     cause: java.nio.channels.ClosedChannelException: null

代码部分:

final Set<String> custNums = redisProcess.smembers(RedisKeys.ROOM_ID_KEY_PREFIXX + roomId);
        LOGGER.info("准备发送chat msg给client, 接收消息的custNum为 :" + custNums);
        if (null == custNums || custNums.size() <= 0) {
            return;
        }
        taskExecutor.execute(new Runnable() {
            @Override
            public void run() {
                for (String custNum : custNums) {
                     sendMsgToCustNum(msg, custNum);
                }
            }
        });



         
private void sendMsgToCustNum(final String msg, String toCustNum) {
        try {
            Channel channel = channelStore.getChannel(toCustNum);
            if (null != channel && channel.isActive()) {
                channel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            LOGGER.error("chat msg  发送失败, msg is : {}     cause: ", msg, future.cause());
                        }
                    }
                });
            }
        } catch (Exception e) {
            LOGGER.error("chat msg 发送失败 wrapper, msg is : {}", e);
        }
    }




netty的链接关闭异常方法回调并没有进入。inactive channel的日志有打印
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        LOGGER.error("Unexpected exception from downstream,channel" + ctx.channel() + " will be closed, exception is "
                + cause);
        // 当有异常时,关闭channel
        ctx.close();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        LOGGER.info("Channel " + ctx.channel() + " inactive.");
        // 删除channelGroup中的已关闭channel
        channelClosed(ctx);
        super.channelInactive(ctx);
    }



请问导致cause: java.nio.channels.ClosedChannelException: null这个问题的原因是什么。跪求帮助















展开
收起
爱吃鱼的程序员 2020-06-08 20:17:23 2695 0
1 条回答
写回答
取消 提交回答
  • https://developer.aliyun.com/profile/5yerqm5bn5yqg?spm=a2c6h.12873639.0.0.6eae304abcjaIB

    我之前用socket推消息的时候,也会报socketclosed的啥的错误。但我有重发机制。也就没打关心。

    那个错误可能与你的网络带宽已用满有关。

    你好,请问这个问题你解决了吗?

    回复 @Super帅哥哥:我知道你的意思是判断客户端连接的合法性,这个我做了。但是这样不解决服务端并发问题。因为所有的客户端都是合法的。我现在困惑的是要切换服务器的时候,客户端的重连机制是3秒,到时候10万的客户端的话,每秒并发3万多上来,服务端是承受不了。这有办法解决吗?回复 @xiaoyaoweizi:socket对连接我感觉上是没有办法限制的,比如别人服务器正常开通了一个socket端口,我们一样可以用telnet连接测试,没法拒绝。但是我可以在他进行消息传输的时候进行判断是否是‘合法’链接,每个新连接进行建立的时候进行token校验,然后每次这个连接进行消息解码处理的时候判断下这个连接进行过token校验没。我个人任务回复 @Super帅哥哥:好的,我去测试一下,我还有一个问题,netty如何限流防止高并发呢?场景假设服务端连接10万台的客户端,当服务端崩溃重启后,10客户端大量连接上来,怎么在netty层进行socket限流呢?回复 @xiaoyaoweizi:具体原因可能有所不同,要看现场情况。排除的时候,你可以将超时处理关掉再压测,如果没问题,说明就是这个情况了。至于我的那个问题原因,是因为客户端发送超时,调度连接池因为处理不过来到延迟发送超,超过了服务端设置的READER_IDLE值回复 @Super帅哥哥:你的意思是压力测试时,客户端的心跳发送会超时,还是服务端处理心跳包时超时?我也是在做压力测试,跑了12万的客户端,每隔60s发一次心跳包和一次数据。跑了6万多时,大量报出这个错误。
    2020-06-08 20:17:42
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载