一起来学kafka之整合SpringBoot深入使用(一)

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 前言目前正在出一个Kafka专题系列教程, 篇幅会较多, 喜欢的话,给个关注❤️ ~本节给大家讲一下Kafka整合SpringBoot中如何进行消息应答以及@SendTo 和 @KafkaListener的讲解~好了, 废话不多说直接开整吧~消息应答有时候,消费者消费消息的时候,我们需要知道它有没有消费完,需要它给我们一个回应,该怎么做呢? 我们可以通过提供的ReplyingKafkaTemplate, 下面通过一个例子来体验一下,新建一个ReceiveCustomerController

前言

目前正在出一个Kafka专题系列教程, 篇幅会较多, 喜欢的话,给个关注❤️ ~

本节给大家讲一下Kafka整合SpringBoot中如何进行消息应答以及@SendTo 和 @KafkaListener的讲解~

好了, 废话不多说直接开整吧~

消息应答

有时候,消费者消费消息的时候,我们需要知道它有没有消费完,需要它给我们一个回应,该怎么做呢? 我们可以通过提供的ReplyingKafkaTemplate, 下面通过一个例子来体验一下,新建一个ReceiveCustomerController

@Slf4j
@RestController
public class ReceiveCustomerController {
    private static final String topic = "hello3";
    private static final String topicCroup = "hello3Group";
    @Bean
    public ConcurrentMessageListenerContainer<String, String> repliesContainer(ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
        ConcurrentMessageListenerContainer<String, String> repliesContainer = containerFactory.createContainer(topic + "_replies");
        repliesContainer.getContainerProperties().setGroupId(topicCroup);
        repliesContainer.setAutoStartup(false);
        return repliesContainer;
    }
    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingTemplate(ProducerFactory<String, String> producerFactory, ConcurrentMessageListenerContainer<String, String> repliesContainer) {
        return new ReplyingKafkaTemplate(producerFactory, repliesContainer);
    }
    @Bean
    public KafkaTemplate kafkaTemplate(ProducerFactory<String, String> producerFactory) {
        return new KafkaTemplate(producerFactory);
    }
    @Autowired
    private ReplyingKafkaTemplate kafkaReplyTemplate;
    @GetMapping("/send/{msg}")
    @Transactional(rollbackFor = RuntimeException.class)
    public void sendMsg(@PathVariable String msg) throws Exception {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, msg);
        RequestReplyFuture<String, String, String> replyFuture = kafkaReplyTemplate.sendAndReceive(record);
        ConsumerRecord<String, String> consumerRecord = replyFuture.get();
        log.info("customer reply >>>> {}: ", consumerRecord.value()); // customer reply >>>>listen: I do it >>> 1:
    }
    @KafkaListener(id = topicCroup, topics = topic)
    @SendTo
    public String listen(String msg) {
        log.info("listen receive msg >>>  {}", msg); // listen receive msg >>>  1
        return "listen: I do it >>> " + msg;
    }
}

 启动应用,测试一下,观察控制台的变化~

 // listen receive msg >>>  1
 // customer reply >>>>listen: I do it >>> 1:

@SendTo

Spring Kafka 中,@SendTo 注解可以用于指定消息被发送到的目标 Topic。当消费者成功消费一个消息后,可以将结果发送到指定的目标 Topic,以供其他消费者进一步处理。

@SendTo 注解可以应用于 Kafka 消费者方法上,以指定消息的处理结果将被发送到哪个 Topic。下面通过一个例子来演示一下如何进行消息的转发~

@Slf4j
@RestController
public class SendToController {
    @Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;
    // 接收消息
    @Transactional(rollbackFor = Exception.class)
    @KafkaListener(id = "input", topics = "inputTopic")
    @SendTo("outputTopic")
    public String processMessage(String message) {
        // 处理消息并返回结果
        log.info("inputTopic >>>> {}", message); // inputTopic >>>> 1
        return "2";
    }
    @KafkaListener(id = "output", topics = "outputTopic")
    public String process1Message(String message) {
        // 处理消息并返回结果
        String result = "Processed message: " + message;
        log.info("outputTopic >>>> {}", result); // outputTopic >>>> Processed message: 2
        return result;
    }
    @Transactional(rollbackFor = Exception.class)
    @GetMapping("/hello")
    public String hello() {
        // 发送消息
        kafkaTemplate.send("inputTopic", "1");
        return "hello";
    }
}

观察控制台的日志信息

inputTopic >>>> 1
outputTopic >>>> Processed message: 2

可以看到消息被转发到outputTopic并且被output消费者成功消费

@KafkaListener

@KafkaListener 是一个注解,用于标记一个方法作为 Kafka 消费者。在 Spring Boot 应用程序中,使用该注解可以方便地处理Kafka 消息。

@KafkaListener 注解可以添加到类级别或方法级别。在类级别添加注解,将指定默认的 Topic消费者组 ID。在方法级别添加注解,则可以使用不同的 Topic消费者组 ID

在前面的几个例子中,带大家已经体验过了,但都是监听一个topic,那么如何去监听多个topic呢? 其实很简单,下面通过一个例子来体验下

@Slf4j
@RestController
public class ListenerController {
    @Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;
    @Transactional(rollbackFor = Exception.class)
    @GetMapping("/hello")
    public String hello() {
        // 发送消息
        kafkaTemplate.send("topic1", "1");
        kafkaTemplate.send("topic2", "2");
        return "hello";
    }
    /**
     * 监听多个topic
     * @param message
     */
    @KafkaListener(topics = {"topic1", "topic2"}, groupId = "group1")
    public void listen(String message) {
        log.info("Received message >>> {}", message);
        // Received message >>> 1
        // Received message >>> 2
    }
}

如上,我们发送了两条不同的消息topic1topic2,通过指定topics = {"topic1", "topic2"}成功消费两条消息

下面一起看一下,@KafkaListener还支持哪些参数?

@KafkaListener 注解支持许多参数,以满足不同的使用场景。以下是常用参数的列表:

  • topics:指定要消费的 Topic 的名称,可以是字符串或字符串数组。必填参数。
  • groupId:指定消费者组 ID。消费者组是一组共享相同 Topic 的消费者的集合。默认值为 "",表示使用默认的消费者组 ID
  • containerFactory:指定要使用的 KafkaListenerContainerFactory 实例的名称。如果没有指定,将使用默认的 KafkaListenerContainerFactory 实例。
  • concurrency:指定要创建的并发消费者的数量。默认值为 1
  • autoStartup:指定容器是否在应用启动时自动启动。默认值为 true
  • id:指定监听器的唯一标识符。默认值为 ""
  • errorHandler:指定在处理消息时出现异常时要使用的 ErrorHandler 实例。
  • properties:指定传递给消费者工厂的 Kafka 消费者配置属性的 Map
  • partitionOffsets: 是一个 Map 类型的参数,该参数用于指定要从 Topic每个分区的哪个偏移量开始消费消息

有几个参数很好理解,没啥好讲的,我们主要看一下containerFactory,errorHandler, partitionOffsets

containerFactory

前面我们使用的都是默认消息监听器,在 Spring Kafka 中,Kafka 消费者可以使用不同的消息监听器容器,例如 ConcurrentKafkaListenerContainerFactoryKafkaMessageListenerContainer 等。每个容器都提供了不同的功能和配置选项,可以根据实际需求进行选择和配置。 如果你需要自定义 Kafka 消费者的配置选项,可以通过在 Spring Boot 配置文件中设置属性来实现。另外一种方法是通过创建 KafkaListenerContainerFactory bean 并配置其属性来实现。

下面通过一个例子来体验一下ConcurrentKafkaListenerContainerFactory~

@Configuration
@EnableKafka
public class KafkaConfig {
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
         //也可以设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
        //factory.setBatchListener(true);
        return factory;
    }
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
}

在上面的示例中,我们创建了一个 ConcurrentKafkaListenerContainerFactorybean,并配置了一些属性,例如:

  • 使用 DefaultKafkaConsumerFactory 作为消费者工厂
  • 设置并发消费者数量为 3
  • 设置轮询超时时间为 3000 毫秒

那么如何使用呢?很简单,只需之指定containerFactory就好了

@Transactional(rollbackFor = Exception.class)
@GetMapping("/hello1")
public String hello1() {
    // 发送消息
    kafkaTemplate.send("topic3", "3");
    return "hello1";
}
@KafkaListener(topics = "topic3", containerFactory = "kafkaListenerContainerFactory")
public void processMessage(String message) {
    // 处理消息
    log.info("hello1 >>>>> {}", message);
}

我们可以通过containerFactory 应用于不同的消费者方法和主题,以满足不同的需求

errorHandler

在 ·Spring Kafka· 中,·ErrorHandler· 接口定义了在处理 Kafka 消息时发生错误时如何处理异常。如果不配置 ErrorHandler,则默认使用 LoggingErrorHandler 将异常记录到日志中。

如果你需要自定义异常处理逻辑,可以通过实现 ErrorHandler 接口并配置其 bean 来实现。

下面通过一个例子来体验一下:

@Slf4j
@Component
public class MyErrorHandler implements KafkaListenerErrorHandler {
    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException e) {
        // 处理异常,例如打印错误日志、发送错误消息等,自定义逻辑处理
        log.error("Error occurred while processing message:  {}", e.getMessage());
        return null;
    }
}

使用错误处理器:

@Transactional(rollbackFor = Exception.class)
@GetMapping("/hello2")
public String hello2() {
    // 发送消息
    kafkaTemplate.send("topic4", "4");
    return "hello2";
}
@KafkaListener(id = "topic4Group", topics = "topic4", errorHandler = "myErrorHandler")
public void processMessage1(String message) {
    // 处理消息
    if(true)
        throw new RuntimeException("消息处理异常");
    log.info("hello2 >>>>> {}", message);
}

观察控制台:

Error occurred while processing message:  Listener method 'public void com.kafka.study.controller.ListenerController.processMessage1(java.lang.String)' threw exception; nested exception is java.lang.RuntimeException: 消息处理异常

结束语

本节还遗留一个partitionOffsets这个我们放到下节给大家讲,涉及到分区 partition偏移量 offset的概念,不是很好理解,下节给大家好好理一下这个概念~

本着把自己知道的都告诉大家,如果本文对您有所帮助,点赞+关注鼓励一下呗~

相关文章

项目源码(源码已更新 欢迎star⭐️)

ElasticSearch 专题学习

项目源码(源码已更新 欢迎star⭐️)

往期并发编程内容推荐

推荐 SpringBoot & SpringCloud (源码已更新 欢迎star⭐️)

博客(阅读体验较佳)







相关文章
|
1月前
|
消息中间件 Java Kafka
Springboot集成高低版本kafka
Springboot集成高低版本kafka
|
8月前
|
消息中间件 负载均衡 Java
Kafka与SpringBoot的整合使用
Kafka与SpringBoot的整合使用
171 0
|
9月前
|
消息中间件 存储 NoSQL
kafka整合springboot以及核心参数的使用
kafka整合springboot以及核心参数的使用
300 0
|
10月前
|
消息中间件 缓存 NoSQL
手把手教你云相册项目简易开发 day1 Kafka+IDEA+Springboot+Redis+MySQL+libvips 简单运行和使用
手把手教你云相册项目简易开发 day1 Kafka+IDEA+Springboot+Redis+MySQL+libvips 简单运行和使用
141 0
|
6月前
|
消息中间件 数据可视化 Java
消息中间件系列教程(22) -Kafka- SpringBoot集成Kafka
消息中间件系列教程(22) -Kafka- SpringBoot集成Kafka
72 0
|
3月前
|
消息中间件 SQL druid
最新版 springboot集成kafka
最新版 springboot集成kafka
32 0
|
9月前
|
消息中间件 数据可视化 Java
SpringBoot3集成Kafka
SpringBoot3集成KafkaKafka是一个开源的分布式事件流平台,常被用于高性能数据管道、流分析、数据集成和关键任务应用,基于Zookeeper协调的处理平台,也是一种消息系统,具有更好的吞吐量、内置分区、复制和容错。
208 0
|
5月前
|
消息中间件 Java Kafka
spring boot 集成kafka
spring boot 集成kafka
56 0
spring boot 集成kafka
|
5月前
|
消息中间件 Java 关系型数据库
【Spring Boot+Kafka+Mysql+HBase】实现分布式优惠券后台应用系统(附源码)
【Spring Boot+Kafka+Mysql+HBase】实现分布式优惠券后台应用系统(附源码)
96 2
|
8月前
|
消息中间件 Java Kafka
SpringBoot整合Kafka(SASL认证配置、处理毒丸消息)
SpringBoot整合Kafka(SASL认证配置、处理毒丸消息)
916 0

热门文章

最新文章