springboot中kafka配置信息记录

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
简介: springboot中kafka配置信息记录

配置参考



<dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>2.3.8.RELEASE</version>
     </dependency>
复制代码


#-----------------kafka----------------------------
spring.kafka.producer.bootstrap-servers=172.29.43.121:9092,172.29.43.121:9093,172.29.43.121:9094
#----------------生产者配置------------------------
# 重试次数,如果设置可能改变消息顺序。例如消息1重试2成功
spring.kafka.producer.retries=0
#可选0,1,-1 默认1
#0:只要发送过去就返回ack 吞吐量高,可以容忍一些数据丢失
#1:当有个一个副本(leader的)写成功就算成功,如果leader挂了其他broker没有同步则可能出现消息丢失
#-1:保证isr写入数不小于min.insync.replicas(在server.properties 或者创建主题的时候设置)才算成功
spring.kafka.producer.acks=1
# 批量大小 16K=16*1024
spring.kafka.producer.batch-size=16384
# 提交延时
# 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
# linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
spring.kafka.producer.properties.linger.ms=100
# 生产端缓冲区大小 32M=1024*1024*32
spring.kafka.producer.buffer-memory = 33554432
# Kafka提供的序列化和反序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#请求最大字节数,防止数据过大 1M=1024*1024
spring.kafka.producer.properties.max.request.size=1048576
# 自定义分区器 默认org.apache.kafka.clients.producer.internals.DefaultPartitioner
#spring.kafka.producer.properties.partitioner.class=org.apache.kafka.clients.producer.internals.DefaultPartitioner
#----------------初始化消费者配置------------------------
spring.kafka.consumer.bootstrap-servers=172.29.43.121:9092,172.29.43.121:9093,172.29.43.121:9094
# 默认的消费组ID
#spring.kafka.consumer.properties.group.id=defaultConsumerGroup
# 是否自动提交offset
spring.kafka.consumer.enable-auto-commit=false
# 提交offset延时(接收到消息后多久提交offset)
spring.kafka.consumer.auto.commit.interval.ms=1000
# 当kafka中没有初始offset或offset超出范围时将自动重置offset
# earliest:消费当前所有的数据(重置为分区中最小的offset);
# latest:重置为分区中最新的offset(消费分区中新产生的数据);
# none:只要有一个分区不存在已提交的offset,就抛出异常;
spring.kafka.consumer.auto-offset-reset=latest
# 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
spring.kafka.consumer.properties.session.timeout.ms=10000
# 客户端将等待请求的响应的最大时间,如果在这个时间内没有收到响应,客户端将重发请求;超过重试次数将抛异常
spring.kafka.consumer.properties.request.timeout.ms=30000
# 消费端监听的topic不存在时,项目启动会报错(关掉)
spring.kafka.listener.missing-topics-fatal=false
# 批量消费每次最多消费多少条消息
spring.kafka.consumer.max-poll-records=50
# Kafka提供的序列化和反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 设置批量消费
#spring.kafka.listener.type=batch
#manual: 表示手动提交,但是测试下来发现是批量提交
#manual_immediate: 表示手动提交,当调用 Acknowledgment#acknowledge之后立马提交。
#spring.kafka.listener.ack-mode=manual
spring.kafka.listener.ack-mode=manual_immediate
# 经测试也是批量提交的ack , 当消费完 spring.kafka.consumer.max-poll-records 这么多的数据时候,提交
spring.kafka.listener.poll-timeout=50S
复制代码


demo


@Service
public class KafkaService {
  private Logger logger = LoggerFactory.getLogger(KafkaService.class);
  @Resource
    private KafkaTemplate<Object, Object> template;
  public void sendInfo(String topic,String data) {
    template.send(topic, data);
    logger.info("{}:{}",topic,data);
  }
  public void sendProducerInfo(String topic,String data) {
    ProducerRecord<Object, Object> pr = new ProducerRecord<>(topic, data);
    template.send(pr);
    logger.info("{}:{}",topic,data);
  }
  @KafkaListener(id = "webGroup", topics = "topic_input")
    public void listen(String input) {
        logger.info("input value: {}" , input);
    }
  @KafkaListener(id = "webGroup1", topics = "hello")
  public void onMessage(ConsumerRecord<Object, Object> record, Acknowledgment ack,
      @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    System.out.println("单条消费消息:" + record.topic() + "----" + record.partition() + "----" + record.value());
    ack.acknowledge();
  }
  @KafkaListener(id = "webGroup2", topics = "hello")
  public void onMessageButch(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
    for(ConsumerRecord<?, ?> record:records) {
        System.out.println("批量消费消息:" + record.topic() + "----" + record.partition() + "----" + record.value());
    }
    ack.acknowledge();
  }
}


相关文章
|
6天前
|
SQL 监控 Java
在IDEA 、springboot中使用切面aop实现日志信息的记录到数据库
这篇文章介绍了如何在IDEA和Spring Boot中使用AOP技术实现日志信息的记录到数据库的详细步骤和代码示例。
在IDEA 、springboot中使用切面aop实现日志信息的记录到数据库
|
4天前
|
前端开发 数据库
SpringBoot+Vue实现商品不能重复加入购物车、购物车中展示商品的信息、删除商品重点提示等操作。如何点击图片实现图片放大
这篇文章介绍了如何在SpringBoot+Vue框架下实现购物车功能,包括防止商品重复加入、展示商品信息、删除商品时的提示,以及点击图片放大的前端实现。
SpringBoot+Vue实现商品不能重复加入购物车、购物车中展示商品的信息、删除商品重点提示等操作。如何点击图片实现图片放大
|
9天前
|
消息中间件 Java 大数据
"深入理解Kafka单线程Consumer:核心参数配置、Java实现与实战指南"
【8月更文挑战第10天】在大数据领域,Apache Kafka以高吞吐和可扩展性成为主流数据流处理平台。Kafka的单线程Consumer因其实现简单且易于管理而在多种场景中受到欢迎。本文解析单线程Consumer的工作机制,强调其在错误处理和状态管理方面的优势,并通过详细参数说明及示例代码展示如何有效地使用KafkaConsumer类。了解这些内容将帮助开发者优化实时数据处理系统的性能与可靠性。
37 7
|
4天前
|
JSON JavaScript 前端开发
基于SpringBoot + Vue实现单个文件上传(带上Token和其它表单信息)的前后端完整过程
本文介绍了在SpringBoot + Vue项目中实现单个文件上传的同时携带Token和其它表单信息的前后端完整流程,包括后端SpringBoot的文件上传处理和前端Vue使用FormData进行表单数据和文件的上传。
19 0
基于SpringBoot + Vue实现单个文件上传(带上Token和其它表单信息)的前后端完整过程
|
5天前
|
数据库
elementUi使用dialog的进行信息的添加、删除表格数据时进行信息提示。删除或者添加成功的信息提示(SpringBoot+Vue+MybatisPlus)
这篇文章介绍了如何在基于SpringBoot+Vue+MybatisPlus的项目中使用elementUI的dialog组件进行用户信息的添加和删除操作,包括弹窗表单的设置、信息提交、数据库操作以及删除前的信息提示和确认。
elementUi使用dialog的进行信息的添加、删除表格数据时进行信息提示。删除或者添加成功的信息提示(SpringBoot+Vue+MybatisPlus)
|
5天前
|
XML Java Maven
logback在springBoot项目中的使用 springboot中使用日志进行持久化保存日志信息
这篇文章详细介绍了如何在Spring Boot项目中使用logback进行日志记录,包括Maven依赖配置、logback配置文件的编写,以及实现的日志持久化和控制台输出效果。
logback在springBoot项目中的使用 springboot中使用日志进行持久化保存日志信息
|
7天前
|
消息中间件 Java Kafka
|
14天前
|
消息中间件 安全 Java
Spring Boot 基于 SCRAM 认证集成 Kafka 的详解
【8月更文挑战第4天】本文详解Spring Boot结合SCRAM认证集成Kafka的过程。SCRAM为Kafka提供安全身份验证。首先确认Kafka服务已启用SCRAM,并准备认证凭据。接着,在`pom.xml`添加`spring-kafka`依赖,并在`application.properties`中配置Kafka属性,包括SASL_SSL协议与SCRAM-SHA-256机制。创建生产者与消费者类以实现消息的发送与接收功能。最后,通过实际消息传递测试集成效果与认证机制的有效性。
|
23天前
|
消息中间件 Java Kafka
spring boot 整合kafka
spring boot 整合kafka
23 8
|
5天前
|
前端开发 JavaScript Java
解决springboot+vue+mybatis中,将后台数据分页显示在前台,并且根据页码自动跳转对应页码信息
该博客文章讲述了如何在Spring Boot + Vue + MyBatis的项目中实现后台数据的分页查询,并在前端进行显示和页码跳转,包括后端的分页查询实现、前端与后端的交互以及使用Element UI进行分页展示的方法。