SpringBoot整合Kafka(SASL认证配置、处理毒丸消息)

简介: SpringBoot整合Kafka(SASL认证配置、处理毒丸消息)

引入依赖

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>2.2.6.RELEASE</version>
</dependency>

添加配置

server:  port: 5896spring:  kafka:    bootstrap-servers: 192.168.16.23:9092# SASL认证,如果kafka有增加SASL认证需要的相关配置#    properties:#      security:#        protocol: SASL_PLAINTEXT#      sasl:#        mechanism: PLAIN#        jaas:#          config: 'org.apache.kafka.common.security.scram.ScramLoginModule required username="xxx" password="xxx";'# 生产者配置    producer:      retries: 0 # 重试次数      acks: 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)      batch-size: 16384 # 一次最多发送数据量      buffer-memory: 33554432 # 生产端缓冲区大小# 序列化类      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
# 消费者配置    consumer:      group-id: test
# 设置手动提交offset      enable-auto-commit: false# smallest和largest才有效,如果smallest重新0开始读取,如果是largest从logfile的offset读取。一般情况下我们都是设置smallest      auto-offset-reset: latest
#     异常处理,处理毒丸(Poison Pill)消息      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:        spring.json.trusted.packages: '*'#       序列化、反序列化必须一致,否则会出现无法序列化        spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
        spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
# 监听    listener:# record:当每一条记录被消费者监听器(ListenerConsumer)处理之后提交# batch:当每一批poll()的数据被ListenerConsumer处理之后提交# time:当每一批poll()的数据被ListenerConsumer处理之后,距离上次提交时间大于TIME时提交# count:当每一批poll()的数据被ListenerConsumer处理之后,被处理record数量大于等于COUNT时提交# count_time:TIME或COUNT中有一个条件满足时提交# manual:当每一批poll()的数据被ListenerConsumer处理之后, 手动调用Acknowledgment.acknowledge()后提交# manual_immediate:手动调用Acknowledgment.acknowledge()后立即提交,一般推荐使用这种      ack-mode: manual_immediate


简单消息发送

packagecom.example.demokafka.service;
importorg.springframework.kafka.core.KafkaTemplate;
importorg.springframework.stereotype.Service;
importjavax.annotation.Resource;
/*** @Author: CQL* @Date: 2023/8/29 17:41*/@ServicepublicclassMessageServiceKafka {
@ResourceprivateKafkaTemplate<String,String>kafkaTemplate;
publicvoidsendMessage(Stringmsg) {
System.out.println("待发送的信息已纳入处理队列(kafka),msg:"+msg);
//使用send方法发送消息,需要传入topic名称kafkaTemplate.send("test", msg);
    }
}


简单消息接收

packagecom.example.demokafka.listener;
importcom.alibaba.fastjson.JSONObject;
importlombok.extern.slf4j.Slf4j;
importorg.apache.kafka.clients.consumer.ConsumerRecord;
importorg.springframework.kafka.annotation.KafkaListener;
importorg.springframework.kafka.support.Acknowledgment;
importorg.springframework.stereotype.Component;
/*** @Author: CQL* @Date: 2023/8/29 17:43*/@Slf4j@ComponentpublicclassMessageListener {
@KafkaListener(topics="test", groupId="test")
publicvoidkafkaListener(ConsumerRecord<?, ?>item, Acknowledgmentack) {
System.out.printf("topic主题=%s, offset偏移量=%d,partition分区=%s, 内容=%s \n", item.topic(), item.offset(), item.partition(), item.value());
log.info("topic主题={}, offset偏移量={},partition分区={}, 内容={}", item.topic(), item.offset(), item.partition(), item.value());
// 设置手动提交需要确认ack.acknowledge();
    }
}
目录
相关文章
|
3月前
|
Java 开发者 微服务
手写模拟Spring Boot自动配置功能
【11月更文挑战第19天】随着微服务架构的兴起,Spring Boot作为一种快速开发框架,因其简化了Spring应用的初始搭建和开发过程,受到了广大开发者的青睐。自动配置作为Spring Boot的核心特性之一,大大减少了手动配置的工作量,提高了开发效率。
79 0
|
1月前
|
JavaScript Java 程序员
SpringBoot自动配置及自定义Starter
Java程序员依赖Spring框架简化开发,但复杂的配置文件增加了负担。SpringBoot以“约定大于配置”理念简化了这一过程,通过引入各种Starter并加载默认配置,几乎做到开箱即用。
116 10
SpringBoot自动配置及自定义Starter
|
1月前
|
XML JavaScript Java
SpringBoot集成Shiro权限+Jwt认证
本文主要描述如何快速基于SpringBoot 2.5.X版本集成Shiro+JWT框架,让大家快速实现无状态登陆和接口权限认证主体框架,具体业务细节未实现,大家按照实际项目补充。
82 11
|
2月前
|
Java Maven Spring
SpringBoot配置跨模块扫描问题解决方案
在分布式项目中,使用Maven进行多模块开发时,某些模块(如xxx-common)没有启动类。如何将这些模块中的类注册为Spring管理的Bean对象?本文通过案例分析,介绍了两种解决方案:常规方案是通过`@SpringBootApplication(scanBasePackages)`指定扫描路径;推荐方案是保持各模块包结构一致(如com.xxx),利用SpringBoot默认扫描规则自动识别其他模块中的组件,简化配置。
SpringBoot配置跨模块扫描问题解决方案
|
2月前
|
NoSQL Java Redis
Spring Boot 自动配置机制:从原理到自定义
Spring Boot 的自动配置机制通过 `spring.factories` 文件和 `@EnableAutoConfiguration` 注解,根据类路径中的依赖和条件注解自动配置所需的 Bean,大大简化了开发过程。本文深入探讨了自动配置的原理、条件化配置、自定义自动配置以及实际应用案例,帮助开发者更好地理解和利用这一强大特性。
153 14
|
3月前
|
缓存 IDE Java
SpringBoot入门(7)- 配置热部署devtools工具
SpringBoot入门(7)- 配置热部署devtools工具
151 1
SpringBoot入门(7)- 配置热部署devtools工具
|
3月前
|
缓存 IDE Java
SpringBoot入门(7)- 配置热部署devtools工具
SpringBoot入门(7)- 配置热部署devtools工具
66 2
 SpringBoot入门(7)- 配置热部署devtools工具
|
3月前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
109 5
|
3月前
|
JSON 安全 算法
Spring Boot 应用如何实现 JWT 认证?
Spring Boot 应用如何实现 JWT 认证?
108 8
|
3月前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
86 1

热门文章

最新文章