springboot中kafka配置信息记录

简介: springboot中kafka配置信息记录

配置参考



<dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>2.3.8.RELEASE</version>
     </dependency>
复制代码


#-----------------kafka----------------------------
spring.kafka.producer.bootstrap-servers=172.29.43.121:9092,172.29.43.121:9093,172.29.43.121:9094
#----------------生产者配置------------------------
# 重试次数,如果设置可能改变消息顺序。例如消息1重试2成功
spring.kafka.producer.retries=0
#可选0,1,-1 默认1
#0:只要发送过去就返回ack 吞吐量高,可以容忍一些数据丢失
#1:当有个一个副本(leader的)写成功就算成功,如果leader挂了其他broker没有同步则可能出现消息丢失
#-1:保证isr写入数不小于min.insync.replicas(在server.properties 或者创建主题的时候设置)才算成功
spring.kafka.producer.acks=1
# 批量大小 16K=16*1024
spring.kafka.producer.batch-size=16384
# 提交延时
# 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
# linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
spring.kafka.producer.properties.linger.ms=100
# 生产端缓冲区大小 32M=1024*1024*32
spring.kafka.producer.buffer-memory = 33554432
# Kafka提供的序列化和反序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#请求最大字节数,防止数据过大 1M=1024*1024
spring.kafka.producer.properties.max.request.size=1048576
# 自定义分区器 默认org.apache.kafka.clients.producer.internals.DefaultPartitioner
#spring.kafka.producer.properties.partitioner.class=org.apache.kafka.clients.producer.internals.DefaultPartitioner
#----------------初始化消费者配置------------------------
spring.kafka.consumer.bootstrap-servers=172.29.43.121:9092,172.29.43.121:9093,172.29.43.121:9094
# 默认的消费组ID
#spring.kafka.consumer.properties.group.id=defaultConsumerGroup
# 是否自动提交offset
spring.kafka.consumer.enable-auto-commit=false
# 提交offset延时(接收到消息后多久提交offset)
spring.kafka.consumer.auto.commit.interval.ms=1000
# 当kafka中没有初始offset或offset超出范围时将自动重置offset
# earliest:消费当前所有的数据(重置为分区中最小的offset);
# latest:重置为分区中最新的offset(消费分区中新产生的数据);
# none:只要有一个分区不存在已提交的offset,就抛出异常;
spring.kafka.consumer.auto-offset-reset=latest
# 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
spring.kafka.consumer.properties.session.timeout.ms=10000
# 客户端将等待请求的响应的最大时间,如果在这个时间内没有收到响应,客户端将重发请求;超过重试次数将抛异常
spring.kafka.consumer.properties.request.timeout.ms=30000
# 消费端监听的topic不存在时,项目启动会报错(关掉)
spring.kafka.listener.missing-topics-fatal=false
# 批量消费每次最多消费多少条消息
spring.kafka.consumer.max-poll-records=50
# Kafka提供的序列化和反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 设置批量消费
#spring.kafka.listener.type=batch
#manual: 表示手动提交,但是测试下来发现是批量提交
#manual_immediate: 表示手动提交,当调用 Acknowledgment#acknowledge之后立马提交。
#spring.kafka.listener.ack-mode=manual
spring.kafka.listener.ack-mode=manual_immediate
# 经测试也是批量提交的ack , 当消费完 spring.kafka.consumer.max-poll-records 这么多的数据时候,提交
spring.kafka.listener.poll-timeout=50S
复制代码


demo


@Service
public class KafkaService {
  private Logger logger = LoggerFactory.getLogger(KafkaService.class);
  @Resource
    private KafkaTemplate<Object, Object> template;
  public void sendInfo(String topic,String data) {
    template.send(topic, data);
    logger.info("{}:{}",topic,data);
  }
  public void sendProducerInfo(String topic,String data) {
    ProducerRecord<Object, Object> pr = new ProducerRecord<>(topic, data);
    template.send(pr);
    logger.info("{}:{}",topic,data);
  }
  @KafkaListener(id = "webGroup", topics = "topic_input")
    public void listen(String input) {
        logger.info("input value: {}" , input);
    }
  @KafkaListener(id = "webGroup1", topics = "hello")
  public void onMessage(ConsumerRecord<Object, Object> record, Acknowledgment ack,
      @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    System.out.println("单条消费消息:" + record.topic() + "----" + record.partition() + "----" + record.value());
    ack.acknowledge();
  }
  @KafkaListener(id = "webGroup2", topics = "hello")
  public void onMessageButch(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
    for(ConsumerRecord<?, ?> record:records) {
        System.out.println("批量消费消息:" + record.topic() + "----" + record.partition() + "----" + record.value());
    }
    ack.acknowledge();
  }
}


相关文章
消息中间件 Java Kafka
35 0
|
16天前
|
消息中间件 Java Kafka
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
本文深入解析了 Kafka 和 RabbitMQ 两大主流消息队列在 Spring 微服务中的应用与对比。内容涵盖消息队列的基本原理、Kafka 与 RabbitMQ 的核心概念、各自优势及典型用例,并结合 Spring 生态的集成方式,帮助开发者根据实际需求选择合适的消息中间件,提升系统解耦、可扩展性与可靠性。
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
|
1月前
|
安全 算法 Java
在Spring Boot中应用Jasypt以加密配置信息。
通过以上步骤,可以在Spring Boot应用中有效地利用Jasypt对配置信息进行加密,这样即使配置文件被泄露,其中的敏感信息也不会直接暴露给攻击者。这是一种在不牺牲操作复杂度的情况下提升应用安全性的简便方法。
677 10
|
2月前
|
人工智能 安全 Java
Spring Boot yml 配置敏感信息加密
本文介绍了如何在 Spring Boot 项目中使用 Jasypt 实现配置文件加密,包含添加依赖、配置密钥、生成加密值、在配置中使用加密值及验证步骤,并提供了注意事项,确保敏感信息的安全管理。
774 1
|
5月前
|
消息中间件 Java Kafka
Spring Boot整合kafka
本文简要记录了Spring Boot与Kafka的整合过程。首先通过Docker搭建Kafka环境,包括Zookeeper和Kafka服务的配置文件。接着引入Spring Kafka依赖,并在`application.properties`中配置生产者和消费者参数。随后创建Kafka配置类,定义Topic及重试机制。最后实现生产者发送消息和消费者监听消息的功能,支持手动ACK确认。此方案适用于快速构建基于Spring Boot的Kafka消息系统。
1027 7
|
6月前
|
Java 微服务 Spring
微服务——SpringBoot使用归纳——Spring Boot中的项目属性配置——少量配置信息的情形
在微服务架构中,随着业务复杂度增加,项目可能需要调用多个微服务。为避免使用`@Value`注解逐一引入配置的繁琐,可通过定义配置类(如`MicroServiceUrl`)并结合`@ConfigurationProperties`注解实现批量管理。此方法需在配置文件中设置微服务地址(如订单、用户、购物车服务),并通过`@Component`将配置类纳入Spring容器。最后,在Controller中通过`@Resource`注入配置类即可便捷使用,提升代码可维护性。
101 0
|
6月前
|
Java 测试技术 微服务
微服务——SpringBoot使用归纳——Spring Boot中的项目属性配置——少量配置信息的情形
本课主要讲解Spring Boot项目中的属性配置方法。在实际开发中,测试与生产环境的配置往往不同,因此不应将配置信息硬编码在代码中,而应使用配置文件管理,如`application.yml`。例如,在微服务架构下,可通过配置文件设置调用其他服务的地址(如订单服务端口8002),并利用`@Value`注解在代码中读取这些配置值。这种方式使项目更灵活,便于后续修改和维护。
94 0
|
6月前
|
消息中间件 Java Kafka
SpringBoot使用Kafka生产者、消费者
SpringBoot使用Kafka生产者、消费者
254 10
|
7月前
|
消息中间件 Java Kafka
【Azure Kafka】使用Spring Cloud Stream Binder Kafka 发送并接收 Event Hub 消息及解决并发报错
reactor.core.publisher.Sinks$EmissionException: Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially.
122 5
|
10月前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
545 5