springboot中kafka配置信息记录

简介: springboot中kafka配置信息记录

配置参考



<dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>2.3.8.RELEASE</version>
     </dependency>
复制代码


#-----------------kafka----------------------------
spring.kafka.producer.bootstrap-servers=172.29.43.121:9092,172.29.43.121:9093,172.29.43.121:9094
#----------------生产者配置------------------------
# 重试次数,如果设置可能改变消息顺序。例如消息1重试2成功
spring.kafka.producer.retries=0
#可选0,1,-1 默认1
#0:只要发送过去就返回ack 吞吐量高,可以容忍一些数据丢失
#1:当有个一个副本(leader的)写成功就算成功,如果leader挂了其他broker没有同步则可能出现消息丢失
#-1:保证isr写入数不小于min.insync.replicas(在server.properties 或者创建主题的时候设置)才算成功
spring.kafka.producer.acks=1
# 批量大小 16K=16*1024
spring.kafka.producer.batch-size=16384
# 提交延时
# 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
# linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
spring.kafka.producer.properties.linger.ms=100
# 生产端缓冲区大小 32M=1024*1024*32
spring.kafka.producer.buffer-memory = 33554432
# Kafka提供的序列化和反序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#请求最大字节数,防止数据过大 1M=1024*1024
spring.kafka.producer.properties.max.request.size=1048576
# 自定义分区器 默认org.apache.kafka.clients.producer.internals.DefaultPartitioner
#spring.kafka.producer.properties.partitioner.class=org.apache.kafka.clients.producer.internals.DefaultPartitioner
#----------------初始化消费者配置------------------------
spring.kafka.consumer.bootstrap-servers=172.29.43.121:9092,172.29.43.121:9093,172.29.43.121:9094
# 默认的消费组ID
#spring.kafka.consumer.properties.group.id=defaultConsumerGroup
# 是否自动提交offset
spring.kafka.consumer.enable-auto-commit=false
# 提交offset延时(接收到消息后多久提交offset)
spring.kafka.consumer.auto.commit.interval.ms=1000
# 当kafka中没有初始offset或offset超出范围时将自动重置offset
# earliest:消费当前所有的数据(重置为分区中最小的offset);
# latest:重置为分区中最新的offset(消费分区中新产生的数据);
# none:只要有一个分区不存在已提交的offset,就抛出异常;
spring.kafka.consumer.auto-offset-reset=latest
# 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
spring.kafka.consumer.properties.session.timeout.ms=10000
# 客户端将等待请求的响应的最大时间,如果在这个时间内没有收到响应,客户端将重发请求;超过重试次数将抛异常
spring.kafka.consumer.properties.request.timeout.ms=30000
# 消费端监听的topic不存在时,项目启动会报错(关掉)
spring.kafka.listener.missing-topics-fatal=false
# 批量消费每次最多消费多少条消息
spring.kafka.consumer.max-poll-records=50
# Kafka提供的序列化和反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 设置批量消费
#spring.kafka.listener.type=batch
#manual: 表示手动提交,但是测试下来发现是批量提交
#manual_immediate: 表示手动提交,当调用 Acknowledgment#acknowledge之后立马提交。
#spring.kafka.listener.ack-mode=manual
spring.kafka.listener.ack-mode=manual_immediate
# 经测试也是批量提交的ack , 当消费完 spring.kafka.consumer.max-poll-records 这么多的数据时候,提交
spring.kafka.listener.poll-timeout=50S
复制代码


demo


@Service
public class KafkaService {
  private Logger logger = LoggerFactory.getLogger(KafkaService.class);
  @Resource
    private KafkaTemplate<Object, Object> template;
  public void sendInfo(String topic,String data) {
    template.send(topic, data);
    logger.info("{}:{}",topic,data);
  }
  public void sendProducerInfo(String topic,String data) {
    ProducerRecord<Object, Object> pr = new ProducerRecord<>(topic, data);
    template.send(pr);
    logger.info("{}:{}",topic,data);
  }
  @KafkaListener(id = "webGroup", topics = "topic_input")
    public void listen(String input) {
        logger.info("input value: {}" , input);
    }
  @KafkaListener(id = "webGroup1", topics = "hello")
  public void onMessage(ConsumerRecord<Object, Object> record, Acknowledgment ack,
      @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    System.out.println("单条消费消息:" + record.topic() + "----" + record.partition() + "----" + record.value());
    ack.acknowledge();
  }
  @KafkaListener(id = "webGroup2", topics = "hello")
  public void onMessageButch(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
    for(ConsumerRecord<?, ?> record:records) {
        System.out.println("批量消费消息:" + record.topic() + "----" + record.partition() + "----" + record.value());
    }
    ack.acknowledge();
  }
}


相关文章
|
2月前
|
开发框架 前端开发 网络协议
Spring Boot结合Netty和WebSocket,实现后台向前端实时推送信息
【10月更文挑战第18天】 在现代互联网应用中,实时通信变得越来越重要。WebSocket作为一种在单个TCP连接上进行全双工通信的协议,为客户端和服务器之间的实时数据传输提供了一种高效的解决方案。Netty作为一个高性能、事件驱动的NIO框架,它基于Java NIO实现了异步和事件驱动的网络应用程序。Spring Boot是一个基于Spring框架的微服务开发框架,它提供了许多开箱即用的功能和简化配置的机制。本文将详细介绍如何使用Spring Boot集成Netty和WebSocket,实现后台向前端推送信息的功能。
588 1
|
27天前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
65 5
|
1月前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
44 1
|
1月前
|
Java Spring 容器
SpringBoot读取配置文件的6种方式,包括:通过Environment、@PropertySource、@ConfigurationProperties、@Value读取配置信息
SpringBoot读取配置文件的6种方式,包括:通过Environment、@PropertySource、@ConfigurationProperties、@Value读取配置信息
127 3
|
2月前
|
消息中间件 监控 Ubuntu
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
96 3
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
|
1月前
|
消息中间件 存储 Prometheus
Kafka集群如何配置高可用性
Kafka集群如何配置高可用性
|
2月前
|
消息中间件 分布式计算 Java
大数据-73 Kafka 高级特性 稳定性-事务 相关配置 事务操作Java 幂等性 仅一次发送
大数据-73 Kafka 高级特性 稳定性-事务 相关配置 事务操作Java 幂等性 仅一次发送
39 2
|
2月前
|
消息中间件 Java 大数据
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
79 2
|
2月前
|
消息中间件 NoSQL Kafka
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
200 0
|
4月前
|
消息中间件 开发框架 Java
掌握这一招,Spring Boot与Kafka完美融合,顺序消费不再是难题,让你轻松应对业务挑战!
【8月更文挑战第29天】Spring Boot与Kafka集成广泛用于处理分布式消息队列。本文探讨了在Spring Boot中实现Kafka顺序消费的方法,包括使用单个Partition或消息Key确保消息路由到同一Partition,并设置Consumer并发数为1以保证顺序消费。通过示例代码展示了如何配置Kafka Producer和Consumer,并自定义Partitioner。为确保数据正确性,还建议在业务逻辑中增加顺序校验机制。
187 3