前言
想象一下,你正在开发一款智能设备监控系统,需要实时收集设备的各种信息,但传统的HTTP请求方式无法满足实时性和效率的要求。在这个挑战中,Spring Boot、Netty和Kafka就像是你的得力助手,它们共同构建了一个高效稳定的信息收集系统。本文将带你进入这个充满创新和挑战的领域,探索如何利用这三种技术,实现设备信息的实时采集与处理。
技术点
- springboot
- netty
- tcp
- kafka
场景再现
一般情况设备会绑定一个ip和一个port来发送给tcp服务端消息,至于消息的格式,有的是16进制的,有的直接就是解析后的json,如图所示
项目搭建
maven依赖引入
<spring.boot.version>2.7.8</spring.boot.version> <kafka.version>2.8.2</kafka.version> <netty.version>4.1.73.Final</netty.version> <lombok.version>1.18.24</lombok.version> <!-- 版本号自己添加 --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> </dependency>
application.yml
文件
# 生产者配置-one server: port: 18011 spring: application: name: produce-one kafka: bootstrap-servers: ubtone.local:9092,ubttwo.local:9092,ubtthree.local:9092 producer: retries: 3 # 重试次数 batch-size: 16384 # 3k buffer-memory: 33554432 # 32M key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer acks: 1 # 1代表leader节点写入成功即认为写入成功
kafka生产者
kafkaProduceConfig配置
package fun.acowbo.config; import lombok.Data; import org.apache.kafka.clients.producer.ProducerConfig; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import java.util.HashMap; import java.util.Map; /** * @author xiaobo */ @Configuration @ConfigurationProperties(prefix = "spring.kafka.producer") @Data public class KafkaProducerConfig<K,V> { // Kafka 服务器地址 @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; // 序列化器配置 private String keySerializer; private String valueSerializer; // 重试次数配置 private String retries; // 批处理大小配置 private String batchSize; // 缓冲区内存大小配置 private String bufferMemory; // 确认配置 private String acks; // 创建并配置生产者工厂 @Bean public ProducerFactory<K,V> producerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer); configProps.put(ProducerConfig.RETRIES_CONFIG, retries); configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); configProps.put(ProducerConfig.ACKS_CONFIG, acks); return new DefaultKafkaProducerFactory<>(configProps); } // 创建并配置 KafkaTemplate @Bean public KafkaTemplate<K,V> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } }
这样配置的好处
编写这样一个配置类有几个好处:
- 集中管理配置:将 Kafka 生产者的配置信息集中在一个地方管理,使得配置更加清晰和易于维护。
- 解耦合:将配置信息从业务逻辑中分离出来,使得应用程序的其他部分不需要关心具体的 Kafka 配置细节,提高了代码的模块化程度和可重用性。
- 灵活性:通过使用 Spring 的
@ConfigurationProperties
注解,可以方便地从外部配置文件中加载配置信息,使得配置更加灵活,可以在不同的环境中使用不同的配置。 - 可测试性:将配置信息集中在一个类中,便于进行单元测试和集成测试,提高了代码的可测试性。
为什么使用 <K,V>
而不是确定的类型呢?
使用 <K,V>
作为泛型类型参数的好处在于增强了代码的通用性和灵活性。这样设计的主要考虑是为了让这个配置类适用于不同类型的键和值。因为在实际的业务场景中,Kafka 生产者可能需要发送不同类型的消息,例如字符串、整数、自定义对象等。通过使用泛型类型参数 <K,V>
,使得这个配置类可以适用于不同类型的消息,同时保持了代码的简洁性和灵活性。
另外,通过使用泛型类型参数,还可以在编译时进行类型检查,避免了在运行时出现类型错误的可能性,提高了代码的安全性和稳定性。
netty创建TCP服务
kafkaProduceService实现
package fun.acowbo.service; import fun.acowbo.utils.BoCommonUtil; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Service; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; import javax.annotation.Resource; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.Random; /** * @author todoitbo * @date 2024/3/14 */ @Service @Slf4j public class ProduceOneService { @Resource private KafkaTemplate<String,String> kafkaTemplate; public static final String ERROR_FILE_PATH = "/Users/xiaobo/Downloads/error.log"; public static final String SUCCESS_FILE_PATH = "/Users/xiaobo/Downloads/success.log"; // 发送消息到 Kafka public void sendMessage(String topic, String message) { // 获取当前时间 LocalDateTime dateTime = LocalDateTime.now(); // 定义日期时间格式 DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS"); // 格式化日期时间 String formattedDateTime = dateTime.format(formatter); // 生成5位随机数作为消息的键 Random random = new Random(); String key = formattedDateTime + String.format("%05d", random.nextInt(100000)); // 创建 ProducerRecord 对象 ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, message); // 发送消息并添加回调 ListenableFuture<SendResult<String, String>> listenableFuture = kafkaTemplate.send(record); // 添加回调函数 listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, String>>() { @Override public void onSuccess(SendResult<String, String> result) { try { // 将成功发送的消息写入成功日志文件 BoCommonUtil.writeFile(result.getRecordMetadata().toString(), SUCCESS_FILE_PATH); } catch (Exception e) { log.error("write success file error:{}", e.getMessage()); } } @Override public void onFailure(Throwable ex) { try { // 将发送失败的消息写入错误日志文件 BoCommonUtil.writeFile(ex.getMessage(), ERROR_FILE_PATH); } catch (Exception e) { log.error("write success file error:{}", e.getMessage()); } } }); } }
解释:
为什么使用 ListenableFutureCallback
?
- 使用
ListenableFutureCallback
的主要好处是异步处理发送消息的结果。当消息发送成功或失败时,可以通过回调函数得知结果,并执行相应的逻辑,例如写入日志文件等。这样可以提高系统的可靠性和健壮性,同时不会阻塞当前线程。
在生产环境中的好处:
- 在生产环境中,由于消息发送可能会受到网络延迟、Kafka 集群负载等因素的影响,因此异步处理发送结果非常重要。通过使用
ListenableFutureCallback
,可以确保消息发送的结果能够被及时处理,并根据实际情况采取相应的措施,例如重试、记录错误日志等,从而保证系统的稳定性和可靠性。