SpringBoot整合Kafka(SASL认证配置、处理毒丸消息)

简介: SpringBoot整合Kafka(SASL认证配置、处理毒丸消息)

引入依赖

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>2.2.6.RELEASE</version>
</dependency>

添加配置

server:  port: 5896spring:  kafka:    bootstrap-servers: 192.168.16.23:9092# SASL认证,如果kafka有增加SASL认证需要的相关配置#    properties:#      security:#        protocol: SASL_PLAINTEXT#      sasl:#        mechanism: PLAIN#        jaas:#          config: 'org.apache.kafka.common.security.scram.ScramLoginModule required username="xxx" password="xxx";'# 生产者配置    producer:      retries: 0 # 重试次数      acks: 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)      batch-size: 16384 # 一次最多发送数据量      buffer-memory: 33554432 # 生产端缓冲区大小# 序列化类      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
# 消费者配置    consumer:      group-id: test
# 设置手动提交offset      enable-auto-commit: false# smallest和largest才有效,如果smallest重新0开始读取,如果是largest从logfile的offset读取。一般情况下我们都是设置smallest      auto-offset-reset: latest
#     异常处理,处理毒丸(Poison Pill)消息      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:        spring.json.trusted.packages: '*'#       序列化、反序列化必须一致,否则会出现无法序列化        spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
        spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
# 监听    listener:# record:当每一条记录被消费者监听器(ListenerConsumer)处理之后提交# batch:当每一批poll()的数据被ListenerConsumer处理之后提交# time:当每一批poll()的数据被ListenerConsumer处理之后,距离上次提交时间大于TIME时提交# count:当每一批poll()的数据被ListenerConsumer处理之后,被处理record数量大于等于COUNT时提交# count_time:TIME或COUNT中有一个条件满足时提交# manual:当每一批poll()的数据被ListenerConsumer处理之后, 手动调用Acknowledgment.acknowledge()后提交# manual_immediate:手动调用Acknowledgment.acknowledge()后立即提交,一般推荐使用这种      ack-mode: manual_immediate


简单消息发送

packagecom.example.demokafka.service;
importorg.springframework.kafka.core.KafkaTemplate;
importorg.springframework.stereotype.Service;
importjavax.annotation.Resource;
/*** @Author: CQL* @Date: 2023/8/29 17:41*/@ServicepublicclassMessageServiceKafka {
@ResourceprivateKafkaTemplate<String,String>kafkaTemplate;
publicvoidsendMessage(Stringmsg) {
System.out.println("待发送的信息已纳入处理队列(kafka),msg:"+msg);
//使用send方法发送消息,需要传入topic名称kafkaTemplate.send("test", msg);
    }
}


简单消息接收

packagecom.example.demokafka.listener;
importcom.alibaba.fastjson.JSONObject;
importlombok.extern.slf4j.Slf4j;
importorg.apache.kafka.clients.consumer.ConsumerRecord;
importorg.springframework.kafka.annotation.KafkaListener;
importorg.springframework.kafka.support.Acknowledgment;
importorg.springframework.stereotype.Component;
/*** @Author: CQL* @Date: 2023/8/29 17:43*/@Slf4j@ComponentpublicclassMessageListener {
@KafkaListener(topics="test", groupId="test")
publicvoidkafkaListener(ConsumerRecord<?, ?>item, Acknowledgmentack) {
System.out.printf("topic主题=%s, offset偏移量=%d,partition分区=%s, 内容=%s \n", item.topic(), item.offset(), item.partition(), item.value());
log.info("topic主题={}, offset偏移量={},partition分区={}, 内容={}", item.topic(), item.offset(), item.partition(), item.value());
// 设置手动提交需要确认ack.acknowledge();
    }
}
目录
相关文章
|
1月前
|
Java 开发者 微服务
手写模拟Spring Boot自动配置功能
【11月更文挑战第19天】随着微服务架构的兴起,Spring Boot作为一种快速开发框架,因其简化了Spring应用的初始搭建和开发过程,受到了广大开发者的青睐。自动配置作为Spring Boot的核心特性之一,大大减少了手动配置的工作量,提高了开发效率。
50 0
|
2月前
|
Java API 数据库
构建RESTful API已经成为现代Web开发的标准做法之一。Spring Boot框架因其简洁的配置、快速的启动特性及丰富的功能集而备受开发者青睐。
【10月更文挑战第11天】本文介绍如何使用Spring Boot构建在线图书管理系统的RESTful API。通过创建Spring Boot项目,定义`Book`实体类、`BookRepository`接口和`BookService`服务类,最后实现`BookController`控制器来处理HTTP请求,展示了从基础环境搭建到API测试的完整过程。
58 4
|
2月前
|
Java API 数据库
Spring Boot框架因其简洁的配置、快速的启动特性及丰富的功能集而备受开发者青睐
本文通过在线图书管理系统案例,详细介绍如何使用Spring Boot构建RESTful API。从项目基础环境搭建、实体类与数据访问层定义,到业务逻辑实现和控制器编写,逐步展示了Spring Boot的简洁配置和强大功能。最后,通过Postman测试API,并介绍了如何添加安全性和异常处理,确保API的稳定性和安全性。
44 0
|
4天前
|
NoSQL Java Redis
Spring Boot 自动配置机制:从原理到自定义
Spring Boot 的自动配置机制通过 `spring.factories` 文件和 `@EnableAutoConfiguration` 注解,根据类路径中的依赖和条件注解自动配置所需的 Bean,大大简化了开发过程。本文深入探讨了自动配置的原理、条件化配置、自定义自动配置以及实际应用案例,帮助开发者更好地理解和利用这一强大特性。
42 14
|
27天前
|
缓存 IDE Java
SpringBoot入门(7)- 配置热部署devtools工具
SpringBoot入门(7)- 配置热部署devtools工具
42 1
SpringBoot入门(7)- 配置热部署devtools工具
|
1月前
|
缓存 IDE Java
SpringBoot入门(7)- 配置热部署devtools工具
SpringBoot入门(7)- 配置热部署devtools工具
43 2
 SpringBoot入门(7)- 配置热部署devtools工具
|
21天前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
53 5
|
26天前
|
JSON 安全 算法
Spring Boot 应用如何实现 JWT 认证?
Spring Boot 应用如何实现 JWT 认证?
59 8
|
24天前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
36 1
|
29天前
|
存储 前端开发 JavaScript
springboot中路径默认配置与重定向/转发所存在的域对象
Spring Boot 提供了简便的路径默认配置和强大的重定向/转发机制,通过合理使用这些功能,可以实现灵活的请求处理和数据传递。理解并掌握不同域对象的生命周期和使用场景,是构建高效、健壮 Web 应用的关键。通过上述详细介绍和示例,相信读者能够更好地应用这些知识,优化自己的 Spring Boot 应用。
30 3
下一篇
DataWorks