前言
目前正在出一个Kafka专题
系列教程, 篇幅会较多, 喜欢的话,给个关注❤️ ~
本节给大家讲一下Kafka整合SpringBoot
中如何进行消息应答以及@SendTo 和 @KafkaListener
的讲解~
好了, 废话不多说直接开整吧~
消息应答
有时候,消费者消费消息的时候,我们需要知道它有没有消费完,需要它给我们一个回应,该怎么做呢? 我们可以通过提供的ReplyingKafkaTemplate
, 下面通过一个例子来体验一下,新建一个ReceiveCustomerController
@Slf4j @RestController public class ReceiveCustomerController { private static final String topic = "hello3"; private static final String topicCroup = "hello3Group"; @Bean public ConcurrentMessageListenerContainer<String, String> repliesContainer(ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) { ConcurrentMessageListenerContainer<String, String> repliesContainer = containerFactory.createContainer(topic + "_replies"); repliesContainer.getContainerProperties().setGroupId(topicCroup); repliesContainer.setAutoStartup(false); return repliesContainer; } @Bean public ReplyingKafkaTemplate<String, String, String> replyingTemplate(ProducerFactory<String, String> producerFactory, ConcurrentMessageListenerContainer<String, String> repliesContainer) { return new ReplyingKafkaTemplate(producerFactory, repliesContainer); } @Bean public KafkaTemplate kafkaTemplate(ProducerFactory<String, String> producerFactory) { return new KafkaTemplate(producerFactory); } @Autowired private ReplyingKafkaTemplate kafkaReplyTemplate; @GetMapping("/send/{msg}") @Transactional(rollbackFor = RuntimeException.class) public void sendMsg(@PathVariable String msg) throws Exception { ProducerRecord<String, String> record = new ProducerRecord<>(topic, msg); RequestReplyFuture<String, String, String> replyFuture = kafkaReplyTemplate.sendAndReceive(record); ConsumerRecord<String, String> consumerRecord = replyFuture.get(); log.info("customer reply >>>> {}: ", consumerRecord.value()); // customer reply >>>>listen: I do it >>> 1: } @KafkaListener(id = topicCroup, topics = topic) @SendTo public String listen(String msg) { log.info("listen receive msg >>> {}", msg); // listen receive msg >>> 1 return "listen: I do it >>> " + msg; } }
启动应用,测试一下,观察控制台的变化~
// listen receive msg >>> 1 // customer reply >>>>listen: I do it >>> 1:
@SendTo
在 Spring Kafka
中,@SendTo
注解可以用于指定消息被发送到的目标 Topic
。当消费者成功消费一个消息后,可以将结果发送到指定的目标 Topic
,以供其他消费者进一步处理。
@SendTo
注解可以应用于 Kafka
消费者方法上,以指定消息的处理结果将被发送到哪个 Topic
。下面通过一个例子来演示一下如何进行消息的转发~
@Slf4j @RestController public class SendToController { @Autowired private KafkaTemplate<Object, Object> kafkaTemplate; // 接收消息 @Transactional(rollbackFor = Exception.class) @KafkaListener(id = "input", topics = "inputTopic") @SendTo("outputTopic") public String processMessage(String message) { // 处理消息并返回结果 log.info("inputTopic >>>> {}", message); // inputTopic >>>> 1 return "2"; } @KafkaListener(id = "output", topics = "outputTopic") public String process1Message(String message) { // 处理消息并返回结果 String result = "Processed message: " + message; log.info("outputTopic >>>> {}", result); // outputTopic >>>> Processed message: 2 return result; } @Transactional(rollbackFor = Exception.class) @GetMapping("/hello") public String hello() { // 发送消息 kafkaTemplate.send("inputTopic", "1"); return "hello"; } }
观察控制台的日志信息
inputTopic >>>> 1 outputTopic >>>> Processed message: 2
可以看到消息被转发到outputTopic
并且被output
消费者成功消费
@KafkaListener
@KafkaListener
是一个注解
,用于标记一个方法作为 Kafka 消费者
。在 Spring Boot
应用程序中,使用该注解可以方便地处理Kafka
消息。
@KafkaListener
注解可以添加到类级别或方法级别。在类级别添加注解,将指定默认的 Topic
和消费者组 ID
。在方法级别添加注解,则可以使用不同的 Topic
和消费者组 ID
。
在前面的几个例子中,带大家已经体验过了,但都是监听一个topic
,那么如何去监听多个topic
呢? 其实很简单,下面通过一个例子来体验下
@Slf4j @RestController public class ListenerController { @Autowired private KafkaTemplate<Object, Object> kafkaTemplate; @Transactional(rollbackFor = Exception.class) @GetMapping("/hello") public String hello() { // 发送消息 kafkaTemplate.send("topic1", "1"); kafkaTemplate.send("topic2", "2"); return "hello"; } /** * 监听多个topic * @param message */ @KafkaListener(topics = {"topic1", "topic2"}, groupId = "group1") public void listen(String message) { log.info("Received message >>> {}", message); // Received message >>> 1 // Received message >>> 2 } }
如上,我们发送了两条不同的消息topic1
和topic2
,通过指定topics = {"topic1", "topic2"}
成功消费两条消息
下面一起看一下,@KafkaListener
还支持哪些参数?
@KafkaListener
注解支持许多参数,以满足不同的使用场景。以下是常用参数的列表:
topics
:指定要消费的Topic
的名称,可以是字符串或字符串数组。必填参数。groupId
:指定消费者组ID
。消费者组是一组共享相同Topic
的消费者的集合。默认值为""
,表示使用默认的消费者组ID
。containerFactory
:指定要使用的KafkaListenerContainerFactory
实例的名称。如果没有指定,将使用默认的KafkaListenerContainerFactory
实例。concurrency
:指定要创建的并发消费者的数量。默认值为1
。autoStartup
:指定容器是否在应用启动时自动启动。默认值为true
。id
:指定监听器的唯一标识符。默认值为""
。errorHandler
:指定在处理消息时出现异常时要使用的ErrorHandler
实例。properties
:指定传递给消费者工厂的Kafka
消费者配置属性的Map
。partitionOffsets
: 是一个Map
类型的参数,该参数用于指定要从Topic
的每个分区
的哪个偏移量
开始消费消息
有几个参数很好理解,没啥好讲的,我们主要看一下containerFactory
,errorHandler
, partitionOffsets
containerFactory
前面我们使用的都是默认
的消息监听器
,在 Spring Kafka
中,Kafka 消费者
可以使用不同的消息监听器容器
,例如 ConcurrentKafkaListenerContainerFactory
、KafkaMessageListenerContainer
等。每个容器都提供了不同的功能和配置选项,可以根据实际需求进行选择和配置。 如果你需要自定义 Kafka 消费者
的配置选项,可以通过在 Spring Boot
配置文件中设置属性来实现。另外一种方法是通过创建 KafkaListenerContainerFactory bean
并配置其属性来实现。
下面通过一个例子来体验一下ConcurrentKafkaListenerContainerFactory
~
@Configuration @EnableKafka public class KafkaConfig { @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); //也可以设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG //factory.setBatchListener(true); return factory; } @Bean public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } }
在上面的示例中,我们创建了一个 ConcurrentKafkaListenerContainerFactory
的 bean
,并配置了一些属性,例如:
- 使用
DefaultKafkaConsumerFactory
作为消费者工厂
; - 设置并发消费者数量为
3
; - 设置轮询超时时间为
3000 毫秒
。
那么如何使用呢?很简单,只需之指定containerFactory
就好了
@Transactional(rollbackFor = Exception.class) @GetMapping("/hello1") public String hello1() { // 发送消息 kafkaTemplate.send("topic3", "3"); return "hello1"; } @KafkaListener(topics = "topic3", containerFactory = "kafkaListenerContainerFactory") public void processMessage(String message) { // 处理消息 log.info("hello1 >>>>> {}", message); }
我们可以通过containerFactory
应用于不同的消费者方法和主题,以满足不同的需求
errorHandler
在 ·Spring Kafka· 中,·ErrorHandler· 接口定义了在处理 Kafka
消息时发生错误时如何处理异常。如果不配置 ErrorHandler
,则默认使用 LoggingErrorHandler
将异常记录到日志中。
如果你需要自定义异常处理逻辑,可以通过实现 ErrorHandler
接口并配置其 bean
来实现。
下面通过一个例子来体验一下:
@Slf4j @Component public class MyErrorHandler implements KafkaListenerErrorHandler { @Override public Object handleError(Message<?> message, ListenerExecutionFailedException e) { // 处理异常,例如打印错误日志、发送错误消息等,自定义逻辑处理 log.error("Error occurred while processing message: {}", e.getMessage()); return null; } }
使用错误处理器:
@Transactional(rollbackFor = Exception.class) @GetMapping("/hello2") public String hello2() { // 发送消息 kafkaTemplate.send("topic4", "4"); return "hello2"; } @KafkaListener(id = "topic4Group", topics = "topic4", errorHandler = "myErrorHandler") public void processMessage1(String message) { // 处理消息 if(true) throw new RuntimeException("消息处理异常"); log.info("hello2 >>>>> {}", message); }
观察控制台:
Error occurred while processing message: Listener method 'public void com.kafka.study.controller.ListenerController.processMessage1(java.lang.String)' threw exception; nested exception is java.lang.RuntimeException: 消息处理异常
结束语
本节还遗留一个partitionOffsets
这个我们放到下节给大家讲,涉及到分区 partition
和偏移量 offset
的概念,不是很好理解,下节给大家好好理一下这个概念~
本着把自己知道的都告诉大家,如果本文对您有所帮助,点赞+关注
鼓励一下呗~
相关文章
项目源码(源码已更新 欢迎star⭐️)
ElasticSearch 专题学习
- 利用docker搭建es集群
- 一起来学ElasticSearch(一)
- 一起来学ElasticSearch(二)
- 一起来学ElasticSearch(三)
- 一起来学ElasticSearch(四)
- 一起来学ElasticSearch(五)
- 一起来学ElasticSearch(六)
- 一起来学ElasticSearch(七)
- 一起来学ElasticSearch(八)
- 一起来学ElasticSearch(九)
- 一起来学ElasticSearch(十)
- 一起来学ElasticSearch之整合SpringBoot(一)
- 一起来学ElasticSearch之整合SpringBoot(二)
- 一起来学ElasticSearch之整合SpringBoot(三)
项目源码(源码已更新 欢迎star⭐️)
往期并发编程内容推荐
- Java多线程专题之线程与进程概述
- Java多线程专题之线程类和接口入门
- Java多线程专题之进阶学习Thread(含源码分析)
- Java多线程专题之Callable、Future与FutureTask(含源码分析)
- 面试官: 有了解过线程组和线程优先级吗
- 面试官: 说一下线程的生命周期过程
- 面试官: 说一下线程间的通信
- 面试官: 说一下Java的共享内存模型
- 面试官: 有了解过指令重排吗,什么是happens-before
- 面试官: 有了解过volatile关键字吗 说说看
- 面试官: 有了解过Synchronized吗 说说看
- Java多线程专题之Lock锁的使用
- 面试官: 有了解过ReentrantLock的底层实现吗?说说看
- 面试官: 有了解过CAS和原子操作吗?说说看
- Java多线程专题之线程池的基本使用
- 面试官: 有了解过线程池的工作原理吗?说说看
- 面试官: 线程池是如何做到线程复用的?有了解过吗,说说看
- 面试官: 阻塞队列有了解过吗?说说看
- 面试官: 阻塞队列的底层实现有了解过吗? 说说看
- 面试官: 同步容器和并发容器有用过吗? 说说看
- 面试官: CopyOnWrite容器有了解过吗? 说说看
- 面试官: Semaphore在项目中有使用过吗?说说看(源码剖析)
- 面试官: Exchanger在项目中有使用过吗?说说看(源码剖析)
- 面试官: CountDownLatch有了解过吗?说说看(源码剖析)
- 面试官: CyclicBarrier有了解过吗?说说看(源码剖析)
- 面试官: Phaser有了解过吗?说说看
- 面试官: Fork/Join 有了解过吗?说说看(含源码分析)
- 面试官: Stream并行流有了解过吗?说说看
推荐 SpringBoot & SpringCloud (源码已更新 欢迎star⭐️)
- springboot-all
地址
: github.com/qiuChenglei…- SpringBoot系列教程合集
- 一起来学SpringCloud合集
- SpringCloud整合 Oauth2+Gateway+Jwt+Nacos 实现授权码模式的服务认证(一)
- SpringCloud整合 Oauth2+Gateway+Jwt+Nacos 实现授权码模式的服务认证(二)