一起来学kafka之整合SpringBoot深入使用(一)

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 前言目前正在出一个Kafka专题系列教程, 篇幅会较多, 喜欢的话,给个关注❤️ ~本节给大家讲一下Kafka整合SpringBoot中如何进行消息应答以及@SendTo 和 @KafkaListener的讲解~好了, 废话不多说直接开整吧~消息应答有时候,消费者消费消息的时候,我们需要知道它有没有消费完,需要它给我们一个回应,该怎么做呢? 我们可以通过提供的ReplyingKafkaTemplate, 下面通过一个例子来体验一下,新建一个ReceiveCustomerController

前言

目前正在出一个Kafka专题系列教程, 篇幅会较多, 喜欢的话,给个关注❤️ ~

本节给大家讲一下Kafka整合SpringBoot中如何进行消息应答以及@SendTo 和 @KafkaListener的讲解~

好了, 废话不多说直接开整吧~

消息应答

有时候,消费者消费消息的时候,我们需要知道它有没有消费完,需要它给我们一个回应,该怎么做呢? 我们可以通过提供的ReplyingKafkaTemplate, 下面通过一个例子来体验一下,新建一个ReceiveCustomerController

@Slf4j
@RestController
public class ReceiveCustomerController {
    private static final String topic = "hello3";
    private static final String topicCroup = "hello3Group";
    @Bean
    public ConcurrentMessageListenerContainer<String, String> repliesContainer(ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
        ConcurrentMessageListenerContainer<String, String> repliesContainer = containerFactory.createContainer(topic + "_replies");
        repliesContainer.getContainerProperties().setGroupId(topicCroup);
        repliesContainer.setAutoStartup(false);
        return repliesContainer;
    }
    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingTemplate(ProducerFactory<String, String> producerFactory, ConcurrentMessageListenerContainer<String, String> repliesContainer) {
        return new ReplyingKafkaTemplate(producerFactory, repliesContainer);
    }
    @Bean
    public KafkaTemplate kafkaTemplate(ProducerFactory<String, String> producerFactory) {
        return new KafkaTemplate(producerFactory);
    }
    @Autowired
    private ReplyingKafkaTemplate kafkaReplyTemplate;
    @GetMapping("/send/{msg}")
    @Transactional(rollbackFor = RuntimeException.class)
    public void sendMsg(@PathVariable String msg) throws Exception {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, msg);
        RequestReplyFuture<String, String, String> replyFuture = kafkaReplyTemplate.sendAndReceive(record);
        ConsumerRecord<String, String> consumerRecord = replyFuture.get();
        log.info("customer reply >>>> {}: ", consumerRecord.value()); // customer reply >>>>listen: I do it >>> 1:
    }
    @KafkaListener(id = topicCroup, topics = topic)
    @SendTo
    public String listen(String msg) {
        log.info("listen receive msg >>>  {}", msg); // listen receive msg >>>  1
        return "listen: I do it >>> " + msg;
    }
}

 启动应用,测试一下,观察控制台的变化~

 // listen receive msg >>>  1
 // customer reply >>>>listen: I do it >>> 1:

@SendTo

Spring Kafka 中,@SendTo 注解可以用于指定消息被发送到的目标 Topic。当消费者成功消费一个消息后,可以将结果发送到指定的目标 Topic,以供其他消费者进一步处理。

@SendTo 注解可以应用于 Kafka 消费者方法上,以指定消息的处理结果将被发送到哪个 Topic。下面通过一个例子来演示一下如何进行消息的转发~

@Slf4j
@RestController
public class SendToController {
    @Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;
    // 接收消息
    @Transactional(rollbackFor = Exception.class)
    @KafkaListener(id = "input", topics = "inputTopic")
    @SendTo("outputTopic")
    public String processMessage(String message) {
        // 处理消息并返回结果
        log.info("inputTopic >>>> {}", message); // inputTopic >>>> 1
        return "2";
    }
    @KafkaListener(id = "output", topics = "outputTopic")
    public String process1Message(String message) {
        // 处理消息并返回结果
        String result = "Processed message: " + message;
        log.info("outputTopic >>>> {}", result); // outputTopic >>>> Processed message: 2
        return result;
    }
    @Transactional(rollbackFor = Exception.class)
    @GetMapping("/hello")
    public String hello() {
        // 发送消息
        kafkaTemplate.send("inputTopic", "1");
        return "hello";
    }
}

观察控制台的日志信息

inputTopic >>>> 1
outputTopic >>>> Processed message: 2

可以看到消息被转发到outputTopic并且被output消费者成功消费

@KafkaListener

@KafkaListener 是一个注解,用于标记一个方法作为 Kafka 消费者。在 Spring Boot 应用程序中,使用该注解可以方便地处理Kafka 消息。

@KafkaListener 注解可以添加到类级别或方法级别。在类级别添加注解,将指定默认的 Topic消费者组 ID。在方法级别添加注解,则可以使用不同的 Topic消费者组 ID

在前面的几个例子中,带大家已经体验过了,但都是监听一个topic,那么如何去监听多个topic呢? 其实很简单,下面通过一个例子来体验下

@Slf4j
@RestController
public class ListenerController {
    @Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;
    @Transactional(rollbackFor = Exception.class)
    @GetMapping("/hello")
    public String hello() {
        // 发送消息
        kafkaTemplate.send("topic1", "1");
        kafkaTemplate.send("topic2", "2");
        return "hello";
    }
    /**
     * 监听多个topic
     * @param message
     */
    @KafkaListener(topics = {"topic1", "topic2"}, groupId = "group1")
    public void listen(String message) {
        log.info("Received message >>> {}", message);
        // Received message >>> 1
        // Received message >>> 2
    }
}

如上,我们发送了两条不同的消息topic1topic2,通过指定topics = {"topic1", "topic2"}成功消费两条消息

下面一起看一下,@KafkaListener还支持哪些参数?

@KafkaListener 注解支持许多参数,以满足不同的使用场景。以下是常用参数的列表:

  • topics:指定要消费的 Topic 的名称,可以是字符串或字符串数组。必填参数。
  • groupId:指定消费者组 ID。消费者组是一组共享相同 Topic 的消费者的集合。默认值为 "",表示使用默认的消费者组 ID
  • containerFactory:指定要使用的 KafkaListenerContainerFactory 实例的名称。如果没有指定,将使用默认的 KafkaListenerContainerFactory 实例。
  • concurrency:指定要创建的并发消费者的数量。默认值为 1
  • autoStartup:指定容器是否在应用启动时自动启动。默认值为 true
  • id:指定监听器的唯一标识符。默认值为 ""
  • errorHandler:指定在处理消息时出现异常时要使用的 ErrorHandler 实例。
  • properties:指定传递给消费者工厂的 Kafka 消费者配置属性的 Map
  • partitionOffsets: 是一个 Map 类型的参数,该参数用于指定要从 Topic每个分区的哪个偏移量开始消费消息

有几个参数很好理解,没啥好讲的,我们主要看一下containerFactory,errorHandler, partitionOffsets

containerFactory

前面我们使用的都是默认消息监听器,在 Spring Kafka 中,Kafka 消费者可以使用不同的消息监听器容器,例如 ConcurrentKafkaListenerContainerFactoryKafkaMessageListenerContainer 等。每个容器都提供了不同的功能和配置选项,可以根据实际需求进行选择和配置。 如果你需要自定义 Kafka 消费者的配置选项,可以通过在 Spring Boot 配置文件中设置属性来实现。另外一种方法是通过创建 KafkaListenerContainerFactory bean 并配置其属性来实现。

下面通过一个例子来体验一下ConcurrentKafkaListenerContainerFactory~

@Configuration
@EnableKafka
public class KafkaConfig {
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
         //也可以设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
        //factory.setBatchListener(true);
        return factory;
    }
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
}

在上面的示例中,我们创建了一个 ConcurrentKafkaListenerContainerFactorybean,并配置了一些属性,例如:

  • 使用 DefaultKafkaConsumerFactory 作为消费者工厂
  • 设置并发消费者数量为 3
  • 设置轮询超时时间为 3000 毫秒

那么如何使用呢?很简单,只需之指定containerFactory就好了

@Transactional(rollbackFor = Exception.class)
@GetMapping("/hello1")
public String hello1() {
    // 发送消息
    kafkaTemplate.send("topic3", "3");
    return "hello1";
}
@KafkaListener(topics = "topic3", containerFactory = "kafkaListenerContainerFactory")
public void processMessage(String message) {
    // 处理消息
    log.info("hello1 >>>>> {}", message);
}

我们可以通过containerFactory 应用于不同的消费者方法和主题,以满足不同的需求

errorHandler

在 ·Spring Kafka· 中,·ErrorHandler· 接口定义了在处理 Kafka 消息时发生错误时如何处理异常。如果不配置 ErrorHandler,则默认使用 LoggingErrorHandler 将异常记录到日志中。

如果你需要自定义异常处理逻辑,可以通过实现 ErrorHandler 接口并配置其 bean 来实现。

下面通过一个例子来体验一下:

@Slf4j
@Component
public class MyErrorHandler implements KafkaListenerErrorHandler {
    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException e) {
        // 处理异常,例如打印错误日志、发送错误消息等,自定义逻辑处理
        log.error("Error occurred while processing message:  {}", e.getMessage());
        return null;
    }
}

使用错误处理器:

@Transactional(rollbackFor = Exception.class)
@GetMapping("/hello2")
public String hello2() {
    // 发送消息
    kafkaTemplate.send("topic4", "4");
    return "hello2";
}
@KafkaListener(id = "topic4Group", topics = "topic4", errorHandler = "myErrorHandler")
public void processMessage1(String message) {
    // 处理消息
    if(true)
        throw new RuntimeException("消息处理异常");
    log.info("hello2 >>>>> {}", message);
}

观察控制台:

Error occurred while processing message:  Listener method 'public void com.kafka.study.controller.ListenerController.processMessage1(java.lang.String)' threw exception; nested exception is java.lang.RuntimeException: 消息处理异常

结束语

本节还遗留一个partitionOffsets这个我们放到下节给大家讲,涉及到分区 partition偏移量 offset的概念,不是很好理解,下节给大家好好理一下这个概念~

本着把自己知道的都告诉大家,如果本文对您有所帮助,点赞+关注鼓励一下呗~

相关文章

项目源码(源码已更新 欢迎star⭐️)

ElasticSearch 专题学习

项目源码(源码已更新 欢迎star⭐️)

往期并发编程内容推荐

推荐 SpringBoot & SpringCloud (源码已更新 欢迎star⭐️)

博客(阅读体验较佳)







相关文章
|
2月前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
93 5
|
2月前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
75 1
|
3月前
|
消息中间件 Java 大数据
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
92 2
|
5月前
|
消息中间件 开发框架 Java
掌握这一招,Spring Boot与Kafka完美融合,顺序消费不再是难题,让你轻松应对业务挑战!
【8月更文挑战第29天】Spring Boot与Kafka集成广泛用于处理分布式消息队列。本文探讨了在Spring Boot中实现Kafka顺序消费的方法,包括使用单个Partition或消息Key确保消息路由到同一Partition,并设置Consumer并发数为1以保证顺序消费。通过示例代码展示了如何配置Kafka Producer和Consumer,并自定义Partitioner。为确保数据正确性,还建议在业务逻辑中增加顺序校验机制。
248 3
|
5月前
|
消息中间件 Java Kafka
|
5月前
|
消息中间件 Java Kafka
|
5月前
|
消息中间件 安全 Java
Spring Boot 基于 SCRAM 认证集成 Kafka 的详解
【8月更文挑战第4天】本文详解Spring Boot结合SCRAM认证集成Kafka的过程。SCRAM为Kafka提供安全身份验证。首先确认Kafka服务已启用SCRAM,并准备认证凭据。接着,在`pom.xml`添加`spring-kafka`依赖,并在`application.properties`中配置Kafka属性,包括SASL_SSL协议与SCRAM-SHA-256机制。创建生产者与消费者类以实现消息的发送与接收功能。最后,通过实际消息传递测试集成效果与认证机制的有效性。
219 4
|
6月前
|
消息中间件 Java Kafka
spring boot 整合kafka
spring boot 整合kafka
75 8
|
5月前
|
消息中间件 Kafka Java
Spring 框架与 Kafka 联姻,竟引发软件世界的革命风暴!事件驱动架构震撼登场!
【8月更文挑战第31天】《Spring 框架与 Kafka 集成:实现事件驱动架构》介绍如何利用 Spring 框架的强大功能与 Kafka 分布式流平台结合,构建灵活且可扩展的事件驱动系统。通过添加 Spring Kafka 依赖并配置 Kafka 连接信息,可以轻松实现消息的生产和消费。文中详细展示了如何设置 `KafkaTemplate`、`ProducerFactory` 和 `ConsumerFactory`,并通过示例代码说明了生产者发送消息及消费者接收消息的具体实现。这一组合为构建高效可靠的分布式应用程序提供了有力支持。
131 0
|
5月前
|
消息中间件 Java Kafka
SpringBoot Kafka SSL接入点PLAIN机制收发消息
SpringBoot Kafka SSL接入点PLAIN机制收发消息
50 0