springboot+netty+kafka实现设备信息收集(完整demo复制可用)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: springboot+netty+kafka实现设备信息收集(完整demo复制可用)

前言

想象一下,你正在开发一款智能设备监控系统,需要实时收集设备的各种信息,但传统的HTTP请求方式无法满足实时性和效率的要求。在这个挑战中,Spring Boot、Netty和Kafka就像是你的得力助手,它们共同构建了一个高效稳定的信息收集系统。本文将带你进入这个充满创新和挑战的领域,探索如何利用这三种技术,实现设备信息的实时采集与处理。

技术点

  • springboot
  • netty
  • tcp
  • kafka

场景再现

一般情况设备会绑定一个ip和一个port来发送给tcp服务端消息,至于消息的格式,有的是16进制的,有的直接就是解析后的json,如图所示

项目搭建

maven依赖引入

<spring.boot.version>2.7.8</spring.boot.version>
<kafka.version>2.8.2</kafka.version>
<netty.version>4.1.73.Final</netty.version>
<lombok.version>1.18.24</lombok.version>
<!-- 版本号自己添加 -->
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
  <groupId>org.projectlombok</groupId>
  <artifactId>lombok</artifactId>
</dependency>
<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-all</artifactId>
</dependency>

application.yml文件

# 生产者配置-one
server:
  port: 18011
spring:
  application:
    name: produce-one
  kafka:
    bootstrap-servers: ubtone.local:9092,ubttwo.local:9092,ubtthree.local:9092
    producer:
      retries: 3 # 重试次数
      batch-size: 16384 # 3k
      buffer-memory: 33554432 # 32M
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: 1 # 1代表leader节点写入成功即认为写入成功

kafka生产者

kafkaProduceConfig配置
package fun.acowbo.config;
import lombok.Data;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
/**
 * @author xiaobo
 */
@Configuration
@ConfigurationProperties(prefix = "spring.kafka.producer")
@Data
public class KafkaProducerConfig<K,V> {
    // Kafka 服务器地址
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    // 序列化器配置
    private String keySerializer;
    private String valueSerializer;
    // 重试次数配置
    private String retries;
    // 批处理大小配置
    private String batchSize;
    // 缓冲区内存大小配置
    private String bufferMemory;
    // 确认配置
    private String acks;
    // 创建并配置生产者工厂
    @Bean
    public ProducerFactory<K,V> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
        configProps.put(ProducerConfig.RETRIES_CONFIG, retries);
        configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        configProps.put(ProducerConfig.ACKS_CONFIG, acks);
        return new DefaultKafkaProducerFactory<>(configProps);
    }
    // 创建并配置 KafkaTemplate
    @Bean
    public KafkaTemplate<K,V> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
这样配置的好处

编写这样一个配置类有几个好处:

  1. 集中管理配置:将 Kafka 生产者的配置信息集中在一个地方管理,使得配置更加清晰和易于维护。
  2. 解耦合:将配置信息从业务逻辑中分离出来,使得应用程序的其他部分不需要关心具体的 Kafka 配置细节,提高了代码的模块化程度和可重用性。
  3. 灵活性:通过使用 Spring 的 @ConfigurationProperties 注解,可以方便地从外部配置文件中加载配置信息,使得配置更加灵活,可以在不同的环境中使用不同的配置。
  4. 可测试性:将配置信息集中在一个类中,便于进行单元测试和集成测试,提高了代码的可测试性。

为什么使用 <K,V> 而不是确定的类型呢?

使用 <K,V> 作为泛型类型参数的好处在于增强了代码的通用性和灵活性。这样设计的主要考虑是为了让这个配置类适用于不同类型的键和值。因为在实际的业务场景中,Kafka 生产者可能需要发送不同类型的消息,例如字符串、整数、自定义对象等。通过使用泛型类型参数 <K,V>,使得这个配置类可以适用于不同类型的消息,同时保持了代码的简洁性和灵活性。

另外,通过使用泛型类型参数,还可以在编译时进行类型检查,避免了在运行时出现类型错误的可能性,提高了代码的安全性和稳定性。

netty创建TCP服务

Netty打造TCP服务端(解决粘包问题)

kafkaProduceService实现
package fun.acowbo.service;
import fun.acowbo.utils.BoCommonUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Random;
/**
 * @author todoitbo
 * @date 2024/3/14
 */
@Service
@Slf4j
public class ProduceOneService {
    @Resource
    private KafkaTemplate<String,String> kafkaTemplate;
    public static final String ERROR_FILE_PATH = "/Users/xiaobo/Downloads/error.log";
    public static final String SUCCESS_FILE_PATH = "/Users/xiaobo/Downloads/success.log";
    // 发送消息到 Kafka
    public void sendMessage(String topic, String message) {
        // 获取当前时间
        LocalDateTime dateTime = LocalDateTime.now();
        // 定义日期时间格式
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS");
        // 格式化日期时间
        String formattedDateTime = dateTime.format(formatter);
        // 生成5位随机数作为消息的键
        Random random = new Random();
        String key = formattedDateTime + String.format("%05d", random.nextInt(100000));
        // 创建 ProducerRecord 对象
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, message);
        // 发送消息并添加回调
        ListenableFuture<SendResult<String, String>> listenableFuture =  kafkaTemplate.send(record);
        // 添加回调函数
        listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                try {
                    // 将成功发送的消息写入成功日志文件
                    BoCommonUtil.writeFile(result.getRecordMetadata().toString(), SUCCESS_FILE_PATH);
                } catch (Exception e) {
                    log.error("write success file error:{}", e.getMessage());
                }
            }
            @Override
            public void onFailure(Throwable ex) {
                try {
                    // 将发送失败的消息写入错误日志文件
                    BoCommonUtil.writeFile(ex.getMessage(), ERROR_FILE_PATH);
                } catch (Exception e) {
                    log.error("write success file error:{}", e.getMessage());
                }
            }
        });
    }
}

解释

为什么使用 ListenableFutureCallback

  • 使用 ListenableFutureCallback 的主要好处是异步处理发送消息的结果。当消息发送成功或失败时,可以通过回调函数得知结果,并执行相应的逻辑,例如写入日志文件等。这样可以提高系统的可靠性和健壮性,同时不会阻塞当前线程。

在生产环境中的好处:

  • 在生产环境中,由于消息发送可能会受到网络延迟、Kafka 集群负载等因素的影响,因此异步处理发送结果非常重要。通过使用 ListenableFutureCallback,可以确保消息发送的结果能够被及时处理,并根据实际情况采取相应的措施,例如重试、记录错误日志等,从而保证系统的稳定性和可靠性。
相关文章
|
2月前
|
开发框架 前端开发 网络协议
Spring Boot结合Netty和WebSocket,实现后台向前端实时推送信息
【10月更文挑战第18天】 在现代互联网应用中,实时通信变得越来越重要。WebSocket作为一种在单个TCP连接上进行全双工通信的协议,为客户端和服务器之间的实时数据传输提供了一种高效的解决方案。Netty作为一个高性能、事件驱动的NIO框架,它基于Java NIO实现了异步和事件驱动的网络应用程序。Spring Boot是一个基于Spring框架的微服务开发框架,它提供了许多开箱即用的功能和简化配置的机制。本文将详细介绍如何使用Spring Boot集成Netty和WebSocket,实现后台向前端推送信息的功能。
522 1
|
24天前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
59 5
|
26天前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
39 1
|
1月前
|
消息中间件 缓存 Java
java nio,netty,kafka 中经常提到“零拷贝”到底是什么?
零拷贝技术 Zero-Copy 是指计算机执行操作时,可以直接从源(如文件或网络套接字)将数据传输到目标缓冲区, 而不需要 CPU 先将数据从某处内存复制到另一个特定区域,从而减少上下文切换以及 CPU 的拷贝时间。
java nio,netty,kafka 中经常提到“零拷贝”到底是什么?
|
2月前
|
NoSQL Java Redis
redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。
这篇文章介绍了Redis的基本命令,并展示了如何使用Netty框架直接与Redis服务器进行通信,包括设置Netty客户端、编写处理程序以及初始化Channel的完整示例代码。
71 1
redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。
|
2月前
|
消息中间件 Java 大数据
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
76 2
|
4月前
|
消息中间件 开发框架 Java
掌握这一招,Spring Boot与Kafka完美融合,顺序消费不再是难题,让你轻松应对业务挑战!
【8月更文挑战第29天】Spring Boot与Kafka集成广泛用于处理分布式消息队列。本文探讨了在Spring Boot中实现Kafka顺序消费的方法,包括使用单个Partition或消息Key确保消息路由到同一Partition,并设置Consumer并发数为1以保证顺序消费。通过示例代码展示了如何配置Kafka Producer和Consumer,并自定义Partitioner。为确保数据正确性,还建议在业务逻辑中增加顺序校验机制。
180 3
|
4月前
|
消息中间件 Java Kafka
|
4月前
|
消息中间件 Kafka Java
Spring 框架与 Kafka 联姻,竟引发软件世界的革命风暴!事件驱动架构震撼登场!
【8月更文挑战第31天】《Spring 框架与 Kafka 集成:实现事件驱动架构》介绍如何利用 Spring 框架的强大功能与 Kafka 分布式流平台结合,构建灵活且可扩展的事件驱动系统。通过添加 Spring Kafka 依赖并配置 Kafka 连接信息,可以轻松实现消息的生产和消费。文中详细展示了如何设置 `KafkaTemplate`、`ProducerFactory` 和 `ConsumerFactory`,并通过示例代码说明了生产者发送消息及消费者接收消息的具体实现。这一组合为构建高效可靠的分布式应用程序提供了有力支持。
120 0
|
2月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
103 1