kafka SpringBoot

简介: kafka SpringBoot

kafka SpringBoot

启动zookeeper

./bin/zookeeper-server-start.sh ./config/zookeeper.properties

启动kafka

 ./bin/kafka-server-start.sh ./config/server.properties

创建topic

 bin/kafka-topics.sh --create --topic topic1 --bootstrap-server localhost:9092

SpringBoot 引入依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

application.yml配置

server:
  port: 8080


spring:
  application:
    name: paw-kafka
  #kafka
  kafka:
    # 连接kafka的地址,多个地址用逗号分隔
    bootstrap-servers: localhost:9092
    #producer
    producer:
      #若设置大于0的值,客户端会将发送失败的记录重新发送
      retries: 0
      #当将多个记录被发送到同一个分区时Producer将尝试将记录组合到更少的请求中。这有助于提升客户端和服务器端的性能。这个配置控制一个批次的默认大小(以字节为单位)。16384是缺省的配置
      batch-size: 16384
      #Producer 用来缓冲等待被发送到服务器的记录的总字节数,33554432是缺省配置
      buffer-memory: 33554432
      #关键字的序列化类
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      #值的序列化类
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        linger.ms: 1

    #cousumer
    consumer:
      enable-auto-commit: false
      auto-commit-interval: 100ms
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: test-group-id
      properties:
        session.timeout.ms: 15000

创建生产者、消费者

生产者通过KafkaTemplate.send发送消息,send可以指定key, 同一key发往同一分区,会发给同一消费者保证时序性。
消费者通过@KafkaListener监听topic,groupId指定分组,kafka将消息发往所有的分组,同一分组会选择一个消费者。

@Service
@Slf4j
public class KfkService {
  @Autowired
  private KafkaTemplate<Integer,String> kafkaTemplate;


  //消费者:监听topic1,groupId1
  @KafkaListener(topics = {"topic1"},groupId = "groupId1")
  public void consumer1(ConsumerRecord<Integer,String> record){
    log.info("consumer1 kfk consume message start...");
    log.info("consumer1 kfk consume message topic:{},msg:{}",record.topic(),record.value());
    log.info("consumer1 kfk consume message end...");
  }
  //消费者:监听topic1,groupId2
  @KafkaListener(topics = {"topic1"},groupId = "groupId2")
  public void consumer3(ConsumerRecord<Integer,String> record){
    log.info("consumer3 kfk consume message start...");
    log.info("consumer3 kfk consume message topic:{},msg:{}",record.topic(),record.value());
    log.info("consumer3 kfk consume message end...");
  }
  //消费者:监听topic1,groupId2
  @KafkaListener(topics = {"topic1"},groupId = "groupId2")
  public void consumer2(ConsumerRecord<Integer,String> record){
    log.info("consumer2 kfk consume message start...");
    log.info("consumer2 kfk consume message topic:{},msg:{}",record.topic(),record.value());
    log.info("consumer2 kfk consume message end...");
  }

  //生产者
  public void sendMsg(String topic , String msg){
    log.info("开始发送kfk消息,topic:{},msg:{}",topic,msg);

    ListenableFuture<SendResult<Integer, String>> sendMsg = kafkaTemplate.send(topic, msg);
    //消息确认
    sendMsg.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
      @Override
      public void onFailure(Throwable throwable) {
        log.error("send error,ex:{},topic:{},msg:{}",throwable,topic,msg);
      }

      @Override
      public void onSuccess(SendResult<Integer, String> stringStringSendResult) {
        log.info("send success,topic:{},msg:{}",topic,msg);
      }
    });
    log.info("kfk send end!");
  }

}

测试类

@RestController
public class KfkController {
    @Autowired
    private KfkService kfkService;
    @GetMapping("/send")
    public String send(){
        kfkService.sendMsg("topic1","I am topic msg");
        return "success-topic1";
    }
}

gitee: https://gitee.com/tg_seahorse/paw-demos paw-kafka项目

相关文章
|
22天前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
55 5
|
24天前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
37 1
|
2月前
|
消息中间件 Java 大数据
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
76 2
|
4月前
|
消息中间件 开发框架 Java
掌握这一招,Spring Boot与Kafka完美融合,顺序消费不再是难题,让你轻松应对业务挑战!
【8月更文挑战第29天】Spring Boot与Kafka集成广泛用于处理分布式消息队列。本文探讨了在Spring Boot中实现Kafka顺序消费的方法,包括使用单个Partition或消息Key确保消息路由到同一Partition,并设置Consumer并发数为1以保证顺序消费。通过示例代码展示了如何配置Kafka Producer和Consumer,并自定义Partitioner。为确保数据正确性,还建议在业务逻辑中增加顺序校验机制。
171 3
|
4月前
|
消息中间件 Java Kafka
|
4月前
|
消息中间件 Java Kafka
|
4月前
|
消息中间件 安全 Java
Spring Boot 基于 SCRAM 认证集成 Kafka 的详解
【8月更文挑战第4天】本文详解Spring Boot结合SCRAM认证集成Kafka的过程。SCRAM为Kafka提供安全身份验证。首先确认Kafka服务已启用SCRAM,并准备认证凭据。接着,在`pom.xml`添加`spring-kafka`依赖,并在`application.properties`中配置Kafka属性,包括SASL_SSL协议与SCRAM-SHA-256机制。创建生产者与消费者类以实现消息的发送与接收功能。最后,通过实际消息传递测试集成效果与认证机制的有效性。
185 4
|
5月前
|
消息中间件 Java Kafka
spring boot 整合kafka
spring boot 整合kafka
66 8
|
4月前
|
消息中间件 Kafka Java
Spring 框架与 Kafka 联姻,竟引发软件世界的革命风暴!事件驱动架构震撼登场!
【8月更文挑战第31天】《Spring 框架与 Kafka 集成:实现事件驱动架构》介绍如何利用 Spring 框架的强大功能与 Kafka 分布式流平台结合,构建灵活且可扩展的事件驱动系统。通过添加 Spring Kafka 依赖并配置 Kafka 连接信息,可以轻松实现消息的生产和消费。文中详细展示了如何设置 `KafkaTemplate`、`ProducerFactory` 和 `ConsumerFactory`,并通过示例代码说明了生产者发送消息及消费者接收消息的具体实现。这一组合为构建高效可靠的分布式应用程序提供了有力支持。
120 0
|
4月前
|
消息中间件 Java Kafka
SpringBoot Kafka SSL接入点PLAIN机制收发消息
SpringBoot Kafka SSL接入点PLAIN机制收发消息
41 0