使用 Netty+SpringBoot 打造的 TCP 长连接通讯方案 上

本文涉及的产品
云原生内存数据库 Tair,内存型 2GB
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Redis 版,经济版 1GB 1个月
简介: 使用 Netty+SpringBoot 打造的 TCP 长连接通讯方案 上


项目背景

最近公司某物联网项目需要使用socket长连接进行消息通讯,捣鼓了一版代码上线,结果BUG不断,本猿寝食难安,于是求助度娘,数日未眠项目终于平稳运行了,本着开源共享的精神,本猿把项目代码提炼成了一个demo项目,尽量摒弃了其中丑陋的业务部分,希望与同学们共同学习进步。

基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

正文

一、项目架构

本项目使用了netty、redis以及springboot2.2.0

二、项目模块

本项目目录结构如下图:

netty-tcp-core是公共模块,主要是工具类。netty-tcp-server是netty服务端,服务端仅作测试使用,实际项目中我们只使用了客户端。netty-tcp-client是客户端,也是本文的重点。

三、业务流程

我们实际项目中使用RocketMQ作为消息队列,本项目由于是demo项目于是改为了BlockingQueue。数据流为:

生产者->消息队列->消费者(客户端)->tcp通道->服务端->tcp通道->客户端。

当消费者接收到某设备发送的消息后,将判断缓存中是否存在该设备与服务端的连接,如果存在并且通道活跃则使用该通道发送消息,如果不存在则创建通道并在通道激活后立即发送消息,当客户端收到来自服务端的消息时进行响应的业务处理。

四、代码详解

1.消息队列

由于本demo项目移除了消息中间件,于是需要自己创建一个本地队列模拟真实使用场景

package org.example.client;
import org.example.client.model.NettyMsgModel;
import java.util.concurrent.ArrayBlockingQueue;
/**
 * 本项目为演示使用本地队列 实际生产中应该使用消息中间件代替(rocketmq或rabbitmq)
 *
 * @author ReWind00
 * @date 2023/2/15 11:20
 */
public class QueueHolder {
    private static final ArrayBlockingQueue<NettyMsgModel> queue = new ArrayBlockingQueue<>(100);
    public static ArrayBlockingQueue<NettyMsgModel> get() {
        return queue;
    }
}

使用一个类保存队列的静态实例以便在任何类中都可以快速引用。接下来我们需要启动一个线程去监听队列中的消息,一但消息投递到队列中,我们就取出消息然后异步多线程处理该消息。

public class LoopThread implements Runnable {
    @Override
    public void run() {
        for (int i = 0; i < MAIN_THREAD_POOL_SIZE; i++) {
            executor.execute(() -> {
                while (true) {
                    //取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到
                    try {
                        NettyMsgModel nettyMsgModel = QueueHolder.get().take();
                        messageProcessor.process(nettyMsgModel);
                    } catch (InterruptedException e) {
                        log.error(e.getMessage(), e);
                    }
                }
            });
        }
    }
}

使用take方法会使该线程一直阻塞直到队列收到消息后进入下一次循环。

2.执行类

process方法来自于MessageProcessor类,该类为单例,但是会有多线程同时执行。

public void process(NettyMsgModel nettyMsgModel) {
    String imei = nettyMsgModel.getImei();
    try {
        synchronized (this) { //为避免收到同一台设备多条消息后重复创建客户端,必须加锁
            if (redisCache.hasKey(NETTY_QUEUE_LOCK + imei)) { //上一条消息处理中
                log.info("imei={}消息处理中,重新入列", imei);
                //放回队列重新等待消费 延迟x秒(实际项目中应该使用rocketmq或者rabbitmq实现延迟消费)
                new Timer().schedule(new TimerTask() {
                    @Override
                    public void run() {
                        QueueHolder.get().offer(nettyMsgModel);
                    }
                }, 2000);
                log.info("imei={}消息处理中,重新入列完成", imei);
                return;
            } else {
                //如果没有在连接中的直接加锁
                redisCache.setCacheObject(NETTY_QUEUE_LOCK + imei, "1", 120, TimeUnit.SECONDS);
            }
        }
        //缓存中存在则发送消息
        if (NettyClientHolder.get().containsKey(imei)) {
            NettyClient nettyClient = NettyClientHolder.get().get(imei);
            if (null != nettyClient.getChannelFuture() && nettyClient.getChannelFuture().channel().isActive()) { //通道活跃直接发送消息
                if (!nettyClient.getChannelFuture().channel().isWritable()) {
                    log.warn("警告,通道不可写,imei={},channelId={}", nettyClient.getImei(),
                            nettyClient.getChannelFuture().channel().id());
                }
                nettyClient.send(nettyMsgModel.getMsg());
            } else {
                log.info("client imei={},通道不活跃,主动关闭", nettyClient.getImei());
                nettyClient.close();
                //重新创建客户端发送
                this.createClientAndSend(nettyMsgModel);
            }
        } else {  //缓存中不存在则创建新的客户端
            this.createClientAndSend(nettyMsgModel);
        }
    } catch (Exception e) {
        log.error(e.getMessage(), e);
    } finally {
        //执行完后解锁
        redisCache.deleteObject(NETTY_QUEUE_LOCK + imei);
    }
}

其中imei是我们设备的唯一标识,我们可以用imei作为缓存的key来确认是否已创建过连接。由于我们消息的并发量可能会很大,所以存在当某设备的连接正在创建的过程中,另一个线程收到该设备消息也开始创建连接的情况,所以我们使用synchronized 代码块以及redis分布式锁来避免此情况的发生。当一条消息获得锁后,在锁释放前,后续消息将会被重新放回消息队列并延迟消费。

获取锁的线程会根据imei判断缓存是否存在连接,如果存在直接发送消息,如果不存在则进入创建客户端的方法。

private void createClientAndSend(NettyMsgModel nettyMsgModel) {
    log.info("创建客户端执行中imei={}", nettyMsgModel.getImei());
    //此处的DemoClientHandler可以根据自己的业务定义
    NettyClient nettyClient = SpringUtils.getBean(NettyClient.class, nettyMsgModel.getImei(), nettyMsgModel.getBizData(),
            this.createDefaultWorkGroup(this.workerThread), DemoClientHandler.class);
    executor.execute(nettyClient); //执行客户端初始化
    try {
        //利用锁等待客户端激活
        synchronized (nettyClient) {
            long c1 = System.currentTimeMillis();
            nettyClient.wait(5000); //最多阻塞5秒 5秒后客户端仍然未激活则自动解锁
            long c2 = System.currentTimeMillis();
            log.info("创建客户端wait耗时={}ms", c2 - c1);
        }
        if (null != nettyClient.getChannelFuture() && nettyClient.getChannelFuture().channel().isActive()) { //连接成功
            //存入缓存
            NettyClientHolder.get().put(nettyMsgModel.getImei(), nettyClient);
            //客户端激活后发送消息
            nettyClient.send(nettyMsgModel.getMsg());
        } else { //连接失败
            log.warn("客户端创建失败,imei={}", nettyMsgModel.getImei());
            nettyClient.close();
            //可以把消息重新入列处理
        }
    } catch (Exception e) {
        log.error("客户端初始化发送消息异常===>{}", e.getMessage(), e);
    }
}

当netty客户端实例创建后使用线程池执行初始化,由于是异步执行,我们此时立刻发送消息很可能客户端还没有完成连接,因此必须加锁等待。进入synchronized 代码块,使用wait方法等待客户端激活后解锁,参数5000为自动解锁的毫秒数,意思是如果客户端出现异常情况迟迟未能连接成功并激活通道、解锁,则最多5000毫秒后该锁自动解开。

这参数在实际使用时可以视情况调整,在并发量很大的情况下,5秒的阻塞可能会导致线程池耗尽,或内存溢出。待客户端创建成功并激活后则立即发送消息。

3.客户端

package org.example.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.example.client.handler.BaseClientHandler;
import org.example.core.util.SpringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * @author ReWind00
 * @date 2023/2/15 9:59
 */
@Slf4j
@Component
@Scope("prototype")
@Getter
@NoArgsConstructor
public class NettyClient implements Runnable {
    @Value("${netty.server.port}")
    private int port;
    @Value("${netty.server.host}")
    private String host;
    //客户端唯一标识
    private String imei;
    //自定义业务数据
    private Map<String, Object> bizData;
    private EventLoopGroup workGroup;
    private Class<BaseClientHandler> clientHandlerClass;
    private ChannelFuture channelFuture;
    public NettyClient(String imei, Map<String, Object> bizData, EventLoopGroup workGroup, Class<BaseClientHandler> clientHandlerClass) {
        this.imei = imei;
        this.bizData = bizData;
        this.workGroup = workGroup;
        this.clientHandlerClass = clientHandlerClass;
    }
    @Override
    public void run() {
        try {
            this.init();
            log.info("客户端启动imei={}", imei);
        } catch (Exception e) {
            log.error("客户端启动失败:{}", e.getMessage(), e);
        }
    }
    public void close() {
        if (null != this.channelFuture) {
            this.channelFuture.channel().close();
        }
        NettyClientHolder.get().remove(this.imei);
    }
    public void send(String message) {
        try {
            if (!this.channelFuture.channel().isActive()) {
                log.info("通道不活跃imei={}", this.imei);
                return;
            }
            if (!StringUtils.isEmpty(message)) {
                log.info("队列消息发送===>{}", message);
                this.channelFuture.channel().writeAndFlush(message);
            }
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }
    private void init() throws Exception {
        //将本实例传递到handler
        BaseClientHandler clientHandler = SpringUtils.getBean(clientHandlerClass, this);
        Bootstrap b = new Bootstrap();
        //2 通过辅助类去构造server/client
        b.group(workGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                .option(ChannelOption.SO_RCVBUF, 1024 * 32)
                .option(ChannelOption.SO_SNDBUF, 1024 * 32)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024 * 1024, Unpooled.copiedBuffer("\r\n".getBytes())));
                        ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));// String解码。
                        ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));// String解码。
//                        // 心跳设置
                        ch.pipeline().addLast(new IdleStateHandler(0, 0, 600, TimeUnit.SECONDS));
                        ch.pipeline().addLast(clientHandler);
                    }
                });
        this.connect(b);
    }
    private void connect(Bootstrap b) throws InterruptedException {
        long c1 = System.currentTimeMillis();
        final int maxRetries = 2; //重连2次
        final AtomicInteger count = new AtomicInteger();
        final AtomicBoolean flag = new AtomicBoolean(false);
        try {
            this.channelFuture = b.connect(host, port).addListener(
                    new ChannelFutureListener() {
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (!future.isSuccess()) {
                                if (count.incrementAndGet() > maxRetries) {
                                    log.warn("imei={}重连超过{}次", imei, maxRetries);
                                } else {
                                    log.info("imei={}重连第{}次", imei, count);
                                    b.connect(host, port).addListener(this);
                                }
                            } else {
                                log.info("imei={}连接成功,连接IP:{}连接端口:{}", imei, host, port);
                                flag.set(true);
                            }
                        }
                    }).sync(); //同步连接
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
        log.info("设备imei={},channelId={}连接耗时={}ms", imei, channelFuture.channel().id(), System.currentTimeMillis() - c1);
        if (flag.get()) {
            channelFuture.channel().closeFuture().sync(); //连接成功后将持续阻塞该线程
        }
    }
}
相关文章
|
1月前
|
小程序 数据可视化 Java
Java+后端Spring boot 开发的全套UWB定位方案,0.1米高精度定位系统源码
UWB定位系统由硬件定位设备、定位引擎和应用软件组成。该定位系统应用软件支持PC端和移动端访问,并提供位置实时显示、历史轨迹回放、人员考勤、电子围栏、行为分析、智能巡检等功能。定位精度高达10cm,同时具备高动态、高容量、低功耗的优点。应用场景包括:隧道、化工、工厂、煤矿、工地、电厂、养老、展馆、整车、机房、机场等。
54 8
|
27天前
|
缓存 NoSQL Java
案例 采用Springboot默认的缓存方案Simple在三层架构中完成一个手机验证码生成校验的程序
案例 采用Springboot默认的缓存方案Simple在三层架构中完成一个手机验证码生成校验的程序
65 5
|
27天前
|
缓存 监控 NoSQL
SpringBoot配置第三方专业缓存技术jetcache方法缓存方案
SpringBoot配置第三方专业缓存技术jetcache方法缓存方案
68 1
|
1月前
|
搜索推荐 前端开发 JavaScript
SpringBoot静态资源访问控制和封装集成方案
该文档描述了对基于SpringBoot的项目框架进行优化和整合的过程。原先采用前后端分离,后端兼做前端,但随着项目增多,升级维护变得复杂。因此,决定整合后台管理页面与后端代码,统一发布。设计上,框架包含后台管理资源,项目则配置具体业务页面,项目可通过覆盖框架资源实现个性化。关键步骤包括:自定义静态资源访问路径、解决图标与字体文件访问问题、设定自定义欢迎页面和页面图标,以及确保项目能正确访问框架静态资源。通过扫描jar包、解压和拷贝资源到项目目录,实现了框架静态资源的动态加载。此外,调整静态资源访问优先级,保证正确加载。最终实现支持jar和war包的项目结构优化。
68 4
|
27天前
|
JavaScript 前端开发 Java
vue使用axios与springboot通讯
vue使用axios与springboot通讯
22 0
|
27天前
|
存储 缓存 NoSQL
SpringBoot配置第三方专业缓存技术jetcache远程缓存方案和本地缓存方案
SpringBoot配置第三方专业缓存技术jetcache远程缓存方案和本地缓存方案
33 0
|
1月前
|
消息中间件 网络协议 Java
springboot+netty+kafka实现设备信息收集(完整demo复制可用)
springboot+netty+kafka实现设备信息收集(完整demo复制可用)
35 0
|
2月前
|
前端开发 JavaScript Java
Springboot+Netty+WebSocket搭建简单的消息通知
这样,你就建立了一个简单的消息通知系统,使用Spring Boot、Netty和WebSocket实现实时消息传递。你可以根据具体需求扩展和改进该系统。
89 1
|
2月前
|
Java Spring
Springboot整合Netty,自定义协议实现
以上就是在Spring Boot中整合Netty并实现自定义协议的基本步骤。你需要根据你的自定义协议的具体需求,来实现你的编码器、解码器和处理器。
40 0
|
10天前
|
JavaScript Java 测试技术
基于SpringBoot+Vue+uniapp的房屋租赁App的详细设计和实现(源码+lw+部署文档+讲解等)
基于SpringBoot+Vue+uniapp的房屋租赁App的详细设计和实现(源码+lw+部署文档+讲解等)
基于SpringBoot+Vue+uniapp的房屋租赁App的详细设计和实现(源码+lw+部署文档+讲解等)