Netty(一) SpringBoot 整合长连接心跳机制(中)

简介: Netty 是一个高性能的 NIO 网络框架,本文基于 SpringBoot 以常见的心跳机制来认识 Netty。

这里有点需要注意


当有多个客户端连上来时,服务端需要区分开,不然响应消息就会发生混乱。


所以每当有个连接上来的时候,我们都将当前的 Channel 与连上的客户端 ID 进行关联(因此每个连上的客户端 ID 都必须唯一)。


这里采用了一个 Map 来保存这个关系,并且在断开连接时自动取消这个关联。


public class NettySocketHolder {
    private static final Map<Long, NioSocketChannel> MAP = new ConcurrentHashMap<>(16);
    public static void put(Long id, NioSocketChannel socketChannel) {
        MAP.put(id, socketChannel);
    }
    public static NioSocketChannel get(Long id) {
        return MAP.get(id);
    }
    public static Map<Long, NioSocketChannel> getMAP() {
        return MAP;
    }
    public static void remove(NioSocketChannel nioSocketChannel) {
        MAP.entrySet().stream().filter(entry -> entry.getValue() == nioSocketChannel).forEach(entry -> MAP.remove(entry.getKey()));
    }
}


启动引导程序:


Component
public class HeartBeatServer {
    private final static Logger LOGGER = LoggerFactory.getLogger(HeartBeatServer.class);
    private EventLoopGroup boss = new NioEventLoopGroup();
    private EventLoopGroup work = new NioEventLoopGroup();
    @Value("${netty.server.port}")
    private int nettyPort;
    /**
     * 启动 Netty
     *
     * @return
     * @throws InterruptedException
     */
    @PostConstruct
    public void start() throws InterruptedException {
        ServerBootstrap bootstrap = new ServerBootstrap()
                .group(boss, work)
                .channel(NioServerSocketChannel.class)
                .localAddress(new InetSocketAddress(nettyPort))
                //保持长连接
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childHandler(new HeartbeatInitializer());
        ChannelFuture future = bootstrap.bind().sync();
        if (future.isSuccess()) {
            LOGGER.info("启动 Netty 成功");
        }
    }
    /**
     * 销毁
     */
    @PreDestroy
    public void destroy() {
        boss.shutdownGracefully().syncUninterruptibly();
        work.shutdownGracefully().syncUninterruptibly();
        LOGGER.info("关闭 Netty 成功");
    }
}    
public class HeartbeatInitializer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ch.pipeline()
                //五秒没有收到消息 将IdleStateHandler 添加到 ChannelPipeline 中
                .addLast(new IdleStateHandler(5, 0, 0))
                .addLast(new HeartbeatDecoder())
                .addLast(new HeartBeatSimpleHandle());
    }
}


也是同样将IdleStateHandler 添加到 ChannelPipeline 中,也会有一个定时任务,每5秒校验一次是否有收到消息,否则就主动发送一次请求。



因为测试是有两个客户端连上所以有两个日志。


自定义协议


上文其实都看到了:服务端与客户端采用的是自定义的 POJO 进行通讯的。


所以需要在客户端进行编码,服务端进行解码,也都只需要各自实现一个编解码器即可。


CustomProtocol:


public class CustomProtocol implements Serializable{
    private static final long serialVersionUID = 4671171056588401542L;
    private long id ;
    private String content ;
    //省略 getter/setter
}


客户端的编码器:


public class HeartbeatEncode extends MessageToByteEncoder<CustomProtocol> {
    @Override
    protected void encode(ChannelHandlerContext ctx, CustomProtocol msg, ByteBuf out) throws Exception {
        out.writeLong(msg.getId()) ;
        out.writeBytes(msg.getContent().getBytes()) ;
    }
}


也就是说消息的前八个字节为 header,剩余的全是 content。


服务端的解码器:


public class HeartbeatDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        long id = in.readLong() ;
        byte[] bytes = new byte[in.readableBytes()] ;
        in.readBytes(bytes) ;
        String content = new String(bytes) ;
        CustomProtocol customProtocol = new CustomProtocol() ;
        customProtocol.setId(id);
        customProtocol.setContent(content) ;
        out.add(customProtocol) ;
    }
}


只需要按照刚才的规则进行解码即可。


实现原理


其实联想到 IdleStateHandler 的功能,自然也能想到它实现的原理:


应该会存在一个定时任务的线程去处理这些消息。


来看看它的源码:


首先是构造函数:


public IdleStateHandler(
            int readerIdleTimeSeconds,
            int writerIdleTimeSeconds,
            int allIdleTimeSeconds) {
        this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
             TimeUnit.SECONDS);
    }


其实就是初始化了几个数据:


  • readerIdleTimeSeconds:一段时间内没有数据读取


  • writerIdleTimeSeconds:一段时间内没有数据发送


  • allIdleTimeSeconds:以上两种满足其中一个即可


因为 IdleStateHandler 也是一种 ChannelHandler,所以会在 channelActive 中初始化任务:


@Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // This method will be invoked only if this handler was added
        // before channelActive() event is fired.  If a user adds this handler
        // after the channelActive() event, initialize() will be called by beforeAdd().
        initialize(ctx);
        super.channelActive(ctx);
    }
    private void initialize(ChannelHandlerContext ctx) {
        // Avoid the case where destroy() is called before scheduling timeouts.
        // See: https://github.com/netty/netty/issues/143
        switch (state) {
        case 1:
        case 2:
            return;
        }
        state = 1;
        initOutputChanged(ctx);
        lastReadTime = lastWriteTime = ticksInNanos();
        if (readerIdleTimeNanos > 0) {
            readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
                    readerIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
        if (writerIdleTimeNanos > 0) {
            writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
                    writerIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
        if (allIdleTimeNanos > 0) {
            allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
                    allIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
    }    


也就是会按照我们给定的时间初始化出定时任务。


相关文章
|
5月前
|
网络协议 Java
在SpringBoot项目中使用Netty实现远程调用
本文介绍了使用Netty解决网络连接性能问题的方法,重点讲解了Netty的NIO特性及其在SpringBoot中的应用。Netty作为高效的NIO框架,支持非阻塞IO,能通过单线程管理多个客户端连接,简化TCP/UDP套接字服务器开发。文章详细展示了Netty在SpringBoot中实现远程调用的过程,包括服务端与客户端代码实现、依赖配置及测试验证。通过示例代码,如`NettyServer`、`NettyClientUtil`等,清晰说明了Netty的工作原理和实际应用,解决了半包等问题,并提供了完整的测试结果。
680 3
|
开发框架 前端开发 网络协议
Spring Boot结合Netty和WebSocket,实现后台向前端实时推送信息
【10月更文挑战第18天】 在现代互联网应用中,实时通信变得越来越重要。WebSocket作为一种在单个TCP连接上进行全双工通信的协议,为客户端和服务器之间的实时数据传输提供了一种高效的解决方案。Netty作为一个高性能、事件驱动的NIO框架,它基于Java NIO实现了异步和事件驱动的网络应用程序。Spring Boot是一个基于Spring框架的微服务开发框架,它提供了许多开箱即用的功能和简化配置的机制。本文将详细介绍如何使用Spring Boot集成Netty和WebSocket,实现后台向前端推送信息的功能。
3106 1
|
11月前
|
NoSQL Java Redis
Spring Boot 自动配置机制:从原理到自定义
Spring Boot 的自动配置机制通过 `spring.factories` 文件和 `@EnableAutoConfiguration` 注解,根据类路径中的依赖和条件注解自动配置所需的 Bean,大大简化了开发过程。本文深入探讨了自动配置的原理、条件化配置、自定义自动配置以及实际应用案例,帮助开发者更好地理解和利用这一强大特性。
1867 15
|
NoSQL Java Redis
redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。
这篇文章介绍了Redis的基本命令,并展示了如何使用Netty框架直接与Redis服务器进行通信,包括设置Netty客户端、编写处理程序以及初始化Channel的完整示例代码。
330 1
redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。
|
架构师 Java 开发者
得物面试:Springboot自动装配机制是什么?如何控制一个bean 是否加载,使用什么注解?
在40岁老架构师尼恩的读者交流群中,近期多位读者成功获得了知名互联网企业的面试机会,如得物、阿里、滴滴等。然而,面对“Spring Boot自动装配机制”等核心面试题,部分读者因准备不足而未能顺利通过。为此,尼恩团队将系统化梳理和总结这一主题,帮助大家全面提升技术水平,让面试官“爱到不能自已”。
得物面试:Springboot自动装配机制是什么?如何控制一个bean 是否加载,使用什么注解?
|
Java Spring 监控
Spring Boot Actuator:守护你的应用心跳,让监控变得触手可及!
【8月更文挑战第31天】Spring Boot Actuator 是 Spring Boot 框架的核心模块之一,提供了生产就绪的特性,用于监控和管理 Spring Boot 应用程序。通过 Actuator,开发者可以轻松访问应用内部状态、执行健康检查、收集度量指标等。启用 Actuator 需在 `pom.xml` 中添加 `spring-boot-starter-actuator` 依赖,并通过配置文件调整端点暴露和安全性。Actuator 还支持与外部监控工具(如 Prometheus)集成,实现全面的应用性能监控。正确配置 Actuator 可显著提升应用的稳定性和安全性。
700 1
|
安全 Java UED
掌握SpringBoot单点登录精髓,单点登录是一种身份认证机制
【8月更文挑战第31天】单点登录(Single Sign-On,简称SSO)是一种身份认证机制,它允许用户只需在多个相互信任的应用系统中登录一次,即可访问所有系统,而无需重复输入用户名和密码。在微服务架构日益盛行的今天,SSO成为提升用户体验和系统安全性的重要手段。本文将详细介绍如何在SpringBoot中实现SSO,并附上示例代码。
334 0
|
消息中间件 Java Kafka
深入SpringBoot的心脏地带:掌握其核心机制的全方位指南
【8月更文挑战第29天】这段内容介绍了在分布式系统中起到异步通信与解耦作用的消息队列,并详细探讨了三种流行的消息队列产品:RabbitMQ、RocketMQ 和 Kafka。RabbitMQ 是一个基于 AMQP 协议的开源消息队列系统,支持多种消息模型,具有高可靠性及稳定性;RocketMQ 则是由阿里巴巴开源的高性能分布式消息队列,支持事务消息等多种特性;而 Kafka 是 LinkedIn 开源的分布式流处理平台,以其高吞吐量和良好的可扩展性著称。文中还提供了使用这三种消息队列产品的示例代码。总之,这三款产品各有优势,适用于不同场景。
100 0
|
存储 缓存 NoSQL
跟着源码学IM(十一):一套基于Netty的分布式高可用IM详细设计与实现(有源码)
本文将要分享的是如何从零实现一套基于Netty框架的分布式高可用IM系统,它将支持长连接网关管理、单聊、群聊、聊天记录查询、离线消息存储、消息推送、心跳、分布式唯一ID、红包、消息同步等功能,并且还支持集群部署。
13832 1
|
6月前
|
算法 Java 容器
Netty源码—4.客户端接入流程
本文主要介绍了关于Netty客户端连接接入问题整理、Reactor线程模型和服务端启动流程、Netty新连接接入的整体处理逻辑、新连接接入之检测新连接、新连接接入之创建NioSocketChannel、新连接接入之绑定NioEventLoop线程、新连接接入之注册Selector和注册读事件、注册Reactor线程总结、新连接接入总结