springboot中kafka配置信息记录

简介: springboot中kafka配置信息记录

配置参考



<dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>2.3.8.RELEASE</version>
     </dependency>
复制代码


#-----------------kafka----------------------------
spring.kafka.producer.bootstrap-servers=172.29.43.121:9092,172.29.43.121:9093,172.29.43.121:9094
#----------------生产者配置------------------------
# 重试次数,如果设置可能改变消息顺序。例如消息1重试2成功
spring.kafka.producer.retries=0
#可选0,1,-1 默认1
#0:只要发送过去就返回ack 吞吐量高,可以容忍一些数据丢失
#1:当有个一个副本(leader的)写成功就算成功,如果leader挂了其他broker没有同步则可能出现消息丢失
#-1:保证isr写入数不小于min.insync.replicas(在server.properties 或者创建主题的时候设置)才算成功
spring.kafka.producer.acks=1
# 批量大小 16K=16*1024
spring.kafka.producer.batch-size=16384
# 提交延时
# 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
# linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
spring.kafka.producer.properties.linger.ms=100
# 生产端缓冲区大小 32M=1024*1024*32
spring.kafka.producer.buffer-memory = 33554432
# Kafka提供的序列化和反序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#请求最大字节数,防止数据过大 1M=1024*1024
spring.kafka.producer.properties.max.request.size=1048576
# 自定义分区器 默认org.apache.kafka.clients.producer.internals.DefaultPartitioner
#spring.kafka.producer.properties.partitioner.class=org.apache.kafka.clients.producer.internals.DefaultPartitioner
#----------------初始化消费者配置------------------------
spring.kafka.consumer.bootstrap-servers=172.29.43.121:9092,172.29.43.121:9093,172.29.43.121:9094
# 默认的消费组ID
#spring.kafka.consumer.properties.group.id=defaultConsumerGroup
# 是否自动提交offset
spring.kafka.consumer.enable-auto-commit=false
# 提交offset延时(接收到消息后多久提交offset)
spring.kafka.consumer.auto.commit.interval.ms=1000
# 当kafka中没有初始offset或offset超出范围时将自动重置offset
# earliest:消费当前所有的数据(重置为分区中最小的offset);
# latest:重置为分区中最新的offset(消费分区中新产生的数据);
# none:只要有一个分区不存在已提交的offset,就抛出异常;
spring.kafka.consumer.auto-offset-reset=latest
# 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
spring.kafka.consumer.properties.session.timeout.ms=10000
# 客户端将等待请求的响应的最大时间,如果在这个时间内没有收到响应,客户端将重发请求;超过重试次数将抛异常
spring.kafka.consumer.properties.request.timeout.ms=30000
# 消费端监听的topic不存在时,项目启动会报错(关掉)
spring.kafka.listener.missing-topics-fatal=false
# 批量消费每次最多消费多少条消息
spring.kafka.consumer.max-poll-records=50
# Kafka提供的序列化和反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 设置批量消费
#spring.kafka.listener.type=batch
#manual: 表示手动提交,但是测试下来发现是批量提交
#manual_immediate: 表示手动提交,当调用 Acknowledgment#acknowledge之后立马提交。
#spring.kafka.listener.ack-mode=manual
spring.kafka.listener.ack-mode=manual_immediate
# 经测试也是批量提交的ack , 当消费完 spring.kafka.consumer.max-poll-records 这么多的数据时候,提交
spring.kafka.listener.poll-timeout=50S
复制代码


demo


@Service
public class KafkaService {
  private Logger logger = LoggerFactory.getLogger(KafkaService.class);
  @Resource
    private KafkaTemplate<Object, Object> template;
  public void sendInfo(String topic,String data) {
    template.send(topic, data);
    logger.info("{}:{}",topic,data);
  }
  public void sendProducerInfo(String topic,String data) {
    ProducerRecord<Object, Object> pr = new ProducerRecord<>(topic, data);
    template.send(pr);
    logger.info("{}:{}",topic,data);
  }
  @KafkaListener(id = "webGroup", topics = "topic_input")
    public void listen(String input) {
        logger.info("input value: {}" , input);
    }
  @KafkaListener(id = "webGroup1", topics = "hello")
  public void onMessage(ConsumerRecord<Object, Object> record, Acknowledgment ack,
      @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    System.out.println("单条消费消息:" + record.topic() + "----" + record.partition() + "----" + record.value());
    ack.acknowledge();
  }
  @KafkaListener(id = "webGroup2", topics = "hello")
  public void onMessageButch(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
    for(ConsumerRecord<?, ?> record:records) {
        System.out.println("批量消费消息:" + record.topic() + "----" + record.partition() + "----" + record.value());
    }
    ack.acknowledge();
  }
}


相关文章
|
12天前
|
JavaScript Java 测试技术
基于springboot+vue.js+uniapp小程序的高校实习信息发布网站附带文章源码部署视频讲解等
基于springboot+vue.js+uniapp小程序的高校实习信息发布网站附带文章源码部署视频讲解等
55 20
|
9天前
|
JavaScript Java 测试技术
基于SpringBoot+Vue+uniapp的电影信息推荐APP的详细设计和实现
基于SpringBoot+Vue+uniapp的电影信息推荐APP的详细设计和实现
37 16
|
4天前
|
消息中间件 Java Kafka
集成Kafka到Spring Boot项目中的步骤和配置
集成Kafka到Spring Boot项目中的步骤和配置
25 7
|
5天前
|
消息中间件 Java Kafka
SpringBoot实用开发篇第六章(整合第三方技术,ActiveMQ,RabbitMQ,RocketMQ,Kafka)
SpringBoot实用开发篇第六章(整合第三方技术,ActiveMQ,RabbitMQ,RocketMQ,Kafka)
|
3天前
|
JavaScript Java 测试技术
基于SpringBoot+Vue的研究生导师管理信息系统的详细设计和实现(源码+lw+部署文档+讲解等)
基于SpringBoot+Vue的研究生导师管理信息系统的详细设计和实现(源码+lw+部署文档+讲解等)
19 2
|
3天前
|
JavaScript Java 测试技术
基于SpringBoot+Vue的校园跑腿信息系统的详细设计和实现(源码+lw+部署文档+讲解等)
基于SpringBoot+Vue的校园跑腿信息系统的详细设计和实现(源码+lw+部署文档+讲解等)
18 1
|
4天前
|
Java
基于SpringBoot的餐厅会员管理信息系统【程序资源下载】
基于SpringBoot的餐厅会员管理信息系统【程序资源下载】
6 1
|
11天前
|
JavaScript Java 测试技术
基于SpringBoot+Vue的ITS 信息平台的详细设计和实现
基于SpringBoot+Vue的ITS 信息平台的详细设计和实现
5 0
基于SpringBoot+Vue的ITS 信息平台的详细设计和实现
|
4天前
|
JavaScript Java 测试技术
基于SpringBoot+Vue+uniapp的速达物流信息查询微信小程序的详细设计和实现(源码+lw+部署文档+讲解等)
基于SpringBoot+Vue+uniapp的速达物流信息查询微信小程序的详细设计和实现(源码+lw+部署文档+讲解等)
|
4天前
|
开发框架 安全 Java
信息打点-语言框架&开发组件&FastJson&Shiro&Log4j&SpringBoot等
信息打点-语言框架&开发组件&FastJson&Shiro&Log4j&SpringBoot等