SpringBoot整合Kafka(SASL认证配置、处理毒丸消息)

简介: SpringBoot整合Kafka(SASL认证配置、处理毒丸消息)

引入依赖

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>2.2.6.RELEASE</version>
</dependency>

添加配置

server:  port: 5896spring:  kafka:    bootstrap-servers: 192.168.16.23:9092# SASL认证,如果kafka有增加SASL认证需要的相关配置#    properties:#      security:#        protocol: SASL_PLAINTEXT#      sasl:#        mechanism: PLAIN#        jaas:#          config: 'org.apache.kafka.common.security.scram.ScramLoginModule required username="xxx" password="xxx";'# 生产者配置    producer:      retries: 0 # 重试次数      acks: 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)      batch-size: 16384 # 一次最多发送数据量      buffer-memory: 33554432 # 生产端缓冲区大小# 序列化类      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
# 消费者配置    consumer:      group-id: test
# 设置手动提交offset      enable-auto-commit: false# smallest和largest才有效,如果smallest重新0开始读取,如果是largest从logfile的offset读取。一般情况下我们都是设置smallest      auto-offset-reset: latest
#     异常处理,处理毒丸(Poison Pill)消息      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:        spring.json.trusted.packages: '*'#       序列化、反序列化必须一致,否则会出现无法序列化        spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
        spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
# 监听    listener:# record:当每一条记录被消费者监听器(ListenerConsumer)处理之后提交# batch:当每一批poll()的数据被ListenerConsumer处理之后提交# time:当每一批poll()的数据被ListenerConsumer处理之后,距离上次提交时间大于TIME时提交# count:当每一批poll()的数据被ListenerConsumer处理之后,被处理record数量大于等于COUNT时提交# count_time:TIME或COUNT中有一个条件满足时提交# manual:当每一批poll()的数据被ListenerConsumer处理之后, 手动调用Acknowledgment.acknowledge()后提交# manual_immediate:手动调用Acknowledgment.acknowledge()后立即提交,一般推荐使用这种      ack-mode: manual_immediate


简单消息发送

packagecom.example.demokafka.service;
importorg.springframework.kafka.core.KafkaTemplate;
importorg.springframework.stereotype.Service;
importjavax.annotation.Resource;
/*** @Author: CQL* @Date: 2023/8/29 17:41*/@ServicepublicclassMessageServiceKafka {
@ResourceprivateKafkaTemplate<String,String>kafkaTemplate;
publicvoidsendMessage(Stringmsg) {
System.out.println("待发送的信息已纳入处理队列(kafka),msg:"+msg);
//使用send方法发送消息,需要传入topic名称kafkaTemplate.send("test", msg);
    }
}


简单消息接收

packagecom.example.demokafka.listener;
importcom.alibaba.fastjson.JSONObject;
importlombok.extern.slf4j.Slf4j;
importorg.apache.kafka.clients.consumer.ConsumerRecord;
importorg.springframework.kafka.annotation.KafkaListener;
importorg.springframework.kafka.support.Acknowledgment;
importorg.springframework.stereotype.Component;
/*** @Author: CQL* @Date: 2023/8/29 17:43*/@Slf4j@ComponentpublicclassMessageListener {
@KafkaListener(topics="test", groupId="test")
publicvoidkafkaListener(ConsumerRecord<?, ?>item, Acknowledgmentack) {
System.out.printf("topic主题=%s, offset偏移量=%d,partition分区=%s, 内容=%s \n", item.topic(), item.offset(), item.partition(), item.value());
log.info("topic主题={}, offset偏移量={},partition分区={}, 内容={}", item.topic(), item.offset(), item.partition(), item.value());
// 设置手动提交需要确认ack.acknowledge();
    }
}
目录
相关文章
|
1天前
|
缓存 Java 数据库
springboot数据库及缓存常用依赖及配置
springboot数据库及缓存常用依赖及配置
29 9
|
2天前
|
存储 缓存 NoSQL
SpringBoot配置第三方专业缓存框架j2cache
SpringBoot配置第三方专业缓存框架j2cache
13 5
|
2天前
|
存储 缓存 NoSQL
SpringBoot配置第三方专业缓存技术Redis
SpringBoot配置第三方专业缓存技术Redis
9 4
|
2天前
|
Java 应用服务中间件 Maven
浅谈后端整合Springboot框架后操作基础配置
浅谈后端整合Springboot框架后操作基础配置
9 3
|
6天前
|
消息中间件 Java Kafka
集成Kafka到Spring Boot项目中的步骤和配置
集成Kafka到Spring Boot项目中的步骤和配置
33 7
|
2天前
|
缓存 监控 NoSQL
SpringBoot配置第三方专业缓存技术jetcache方法缓存方案
SpringBoot配置第三方专业缓存技术jetcache方法缓存方案
7 1
|
2天前
|
存储 缓存 监控
SpringBoot配置第三方专业缓存技术Ehcache
SpringBoot配置第三方专业缓存技术Ehcache
8 1
|
6天前
|
druid Java 关系型数据库
在Spring Boot中集成Druid实现多数据源有两种常用的方式:使用Spring Boot的自动配置和手动配置。
在Spring Boot中集成Druid实现多数据源有两种常用的方式:使用Spring Boot的自动配置和手动配置。
58 5
|
7天前
|
消息中间件 Java Kafka
SpringBoot实用开发篇第六章(整合第三方技术,ActiveMQ,RabbitMQ,RocketMQ,Kafka)
SpringBoot实用开发篇第六章(整合第三方技术,ActiveMQ,RabbitMQ,RocketMQ,Kafka)
|
1天前
|
关系型数据库 数据库 数据安全/隐私保护
springboot+dynamic-datasource多数据源配置动态切换
springboot+dynamic-datasource多数据源配置动态切换
8 0

热门文章

最新文章