使用 Netty+SpringBoot 打造的 TCP 长连接通讯方案 下

简介: 使用 Netty+SpringBoot 打造的 TCP 长连接通讯方案 下

netty客户端为多实例,每个实例绑定一个线程,持续阻塞到客户端关闭为止,每个客户端中可以保存自己的业务数据,以便在后续与服务端交互时处理业务使用。客户端执行连接时,给了2次重试的机会,如果3次都没连接成功则放弃。后续可以选择将该消息重新入列消费。我们实际项目中,此处还应该预先给服务端发送一条登录消息,待服务端确认后才能执行后续通讯,这需要视实际情况进行调整。

另一个需要注意的点是EventLoopGroup是从构造函数传入的,而不是在客户端中创建的,因为当客户端数量非常多时,每个客户端都创建自己的线程组会极大的消耗服务器资源,因此我们在实际使用中是按业务去创建统一的线程组给该业务下的所有客户端共同使用的,线程组的大小需要根据业务需求灵活配置。

在init方法中,我们给客户端加上了一个handler来处理与服务端的交互,下面来看一下具体实现。

package org.example.client.handler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import org.example.client.NettyClient;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
 * @author ReWind00
 * @date 2023/2/15 10:09
 */
@Slf4j
@Component
@Scope("prototype")
public class DemoClientHandler extends BaseClientHandler {
    private final String imei;
    private final Map<String, Object> bizData;
    private final NettyClient nettyClient;
    private int allIdleCounter = 0;
    private static final int MAX_IDLE_TIMES = 3;
    public DemoClientHandler(NettyClient nettyClient) {
        this.nettyClient = nettyClient;
        this.imei = nettyClient.getImei();
        this.bizData = nettyClient.getBizData();
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("客户端imei={},通道激活成功", this.imei);
        synchronized (this.nettyClient) { //当通道激活后解锁队列线程,然后再发送消息
            this.nettyClient.notify();
        }
    }
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.warn("客户端imei={},通道断开连接", this.imei);
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("客户端imei={},收到消息:  {}", this.imei, msg);
        //处理业务...
        if ("shutdown".equals(msg)) {
            this.nettyClient.close();
        }
    }
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            boolean flag = false;
            if (e.state() == IdleState.ALL_IDLE) {
                this.allIdleCounter++;
                log.info("客户端imei={}触发闲读或写第{}次", this.imei, this.allIdleCounter);
                if (this.allIdleCounter >= MAX_IDLE_TIMES) {
                    flag = true;
                }
            }
            if (flag) {
                log.warn("读写超时达到{}次,主动断开连接", MAX_IDLE_TIMES);
                ctx.channel().close();
            }
        }
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("客户端imei={},连接异常{}", imei, cause.getMessage(), cause);
    }
}

DemoClientHandler也是多实例bean,每个实例持有自己的NettyClient引用,以便在后续处理具体业务。在channelActive方法中,我们可以看到执行了客户端实例的notify方法,此处就是在客户端创建成功并且通道激活后解除wait锁的地方。channelRead方法就是我们处理服务端发送过来的消息的方法,我们的具体业务应该在该方法执行,当然不建议长时间阻塞客户端的工作线程,可以考虑异步处理。

最后我们看一下客户端缓存类。

package org.example.client;
import java.util.concurrent.ConcurrentHashMap;
/**
 * @author ReWind00
 * @date 2023/2/15 11:01
 */
public class NettyClientHolder {
    private static final ConcurrentHashMap<String, NettyClient> clientMap = new ConcurrentHashMap<>();
    public static ConcurrentHashMap<String, NettyClient> get() {
        return clientMap;
    }
}

由于netty的通道无法序列化,因此不能存入redis,只能缓存在本地内存中,其本质就是一个ConcurrentHashMap。

五、测试

package org.example.client.controller;
import org.example.client.QueueHolder;
import org.example.client.model.NettyMsgModel;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
 * @author ReWind00
 * @date 2023/2/15 13:48
 */
@RestController
@RequestMapping("/demo")
public class DemoController {
    /**
     * 间隔发送两条消息
     */
    @GetMapping("testOne")
    public void testOne() {
        QueueHolder.get().offer(NettyMsgModel.create("87654321", "Hello World!"));
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        QueueHolder.get().offer(NettyMsgModel.create("87654321", "Hello World Too!"));
    }
    /**
     * 任意发送消息
     *
     * @param imei
     * @param msg
     */
    @GetMapping("testTwo")
    public void testTwo(@RequestParam String imei, @RequestParam String msg) {
        QueueHolder.get().offer(NettyMsgModel.create(imei, msg));
    }
    /**
     * 连续发送两条消息 第二条由于redis锁将会重新放回队列延迟消费
     */
    @GetMapping("testThree")
    public void testThree() {
        QueueHolder.get().offer(NettyMsgModel.create("12345678", "Hello World!"));
        QueueHolder.get().offer(NettyMsgModel.create("12345678", "Hello World Too!"));
    }
}

测试接口代码如上,调用testOne,日志如下:

image.png

可以看到第一条消息触发了客户端创建流程,创建后发送了消息,而5秒后的第二条消息直接通过已有通道发送了。

测试接口代码如上,调用testTwo,日志如下:

image.png

发送shutdown可以主动断开已有连接。

测试接口代码如上,调用testThree,日志如下:

image.png

可以看到第二条消息重新入列并被延迟消费了。

六、源码

https://gitee.com/jaster/netty-tcp-demo

基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

后记

本demo项目仅作学习交流使用,如果要应用到生产环境还有些许不足,有问题的同学可以留言交流。



相关文章
|
3月前
|
开发框架 前端开发 网络协议
Spring Boot结合Netty和WebSocket,实现后台向前端实时推送信息
【10月更文挑战第18天】 在现代互联网应用中,实时通信变得越来越重要。WebSocket作为一种在单个TCP连接上进行全双工通信的协议,为客户端和服务器之间的实时数据传输提供了一种高效的解决方案。Netty作为一个高性能、事件驱动的NIO框架,它基于Java NIO实现了异步和事件驱动的网络应用程序。Spring Boot是一个基于Spring框架的微服务开发框架,它提供了许多开箱即用的功能和简化配置的机制。本文将详细介绍如何使用Spring Boot集成Netty和WebSocket,实现后台向前端推送信息的功能。
646 1
|
8天前
|
决策智能 数据库 开发者
使用Qwen2.5+SpringBoot+SpringAI+SpringWebFlux的基于意图识别的多智能体架构方案
本项目旨在解决智能体的“超级入口”问题,通过开发基于意图识别的多智能体框架,实现用户通过单一交互入口使用所有智能体。项目依托阿里开源的Qwen2.5大模型,利用其强大的FunctionCall能力,精准识别用户意图并调用相应智能体。 核心功能包括: - 意图识别:基于Qwen2.5的大模型方法调用能力,准确识别用户意图。 - 业务调用中心:解耦框架与业务逻辑,集中处理业务方法调用,提升系统灵活性。 - 会话管理:支持连续对话,保存用户会话历史,确保上下文连贯性。 - 流式返回:支持打字机效果的流式返回,增强用户体验。 感谢Qwen2.5系列大模型的支持,使项目得以顺利实施。
167 7
使用Qwen2.5+SpringBoot+SpringAI+SpringWebFlux的基于意图识别的多智能体架构方案
|
30天前
|
缓存 NoSQL Java
Spring Boot中的分布式缓存方案
Spring Boot提供了简便的方式来集成和使用分布式缓存。通过Redis和Memcached等缓存方案,可以显著提升应用的性能和扩展性。合理配置和优化缓存策略,可以有效避免常见的缓存问题,保证系统的稳定性和高效运行。
47 3
|
3月前
|
NoSQL Java Redis
redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。
这篇文章介绍了Redis的基本命令,并展示了如何使用Netty框架直接与Redis服务器进行通信,包括设置Netty客户端、编写处理程序以及初始化Channel的完整示例代码。
77 1
redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。
|
3月前
|
easyexcel Java UED
SpringBoot中大量数据导出方案:使用EasyExcel并行导出多个excel文件并压缩zip后下载
在SpringBoot环境中,为了优化大量数据的Excel导出体验,可采用异步方式处理。具体做法是将数据拆分后利用`CompletableFuture`与`ThreadPoolTaskExecutor`并行导出,并使用EasyExcel生成多个Excel文件,最终将其压缩成ZIP文件供下载。此方案提升了导出效率,改善了用户体验。代码示例展示了如何实现这一过程,包括多线程处理、模板导出及资源清理等关键步骤。
|
3月前
|
存储 NoSQL Java
Spring Boot项目中使用Redis实现接口幂等性的方案
通过上述方法,可以有效地在Spring Boot项目中利用Redis实现接口幂等性,既保证了接口操作的安全性,又提高了系统的可靠性。
67 0
|
5月前
|
前端开发 Java Spring
springboot 整合 netty框架, 实现 心跳检测,自动重连
springboot 整合 netty框架, 实现 心跳检测,自动重连
|
5月前
|
移动开发 网络协议 算法
(十)Netty进阶篇:漫谈网络粘包、半包问题、解码器与长连接、心跳机制实战
在前面关于《Netty入门篇》的文章中,咱们已经初步对Netty这个著名的网络框架有了认知,本章的目的则是承接上文,再对Netty中的一些进阶知识进行阐述,毕竟前面的内容中,仅阐述了一些Netty的核心组件,想要真正掌握Netty框架,对于它我们应该具备更为全面的认知。
259 2
|
6月前
|
存储 运维 监控
在Spring Boot中集成分布式日志收集方案
在Spring Boot中集成分布式日志收集方案
|
7月前
|
缓存 NoSQL Java
Spring Boot中的分布式缓存方案
Spring Boot中的分布式缓存方案