Pre
Apache Kafka - ConsumerInterceptor 实战 (1) 用代码的方式实现了ConsumerInterceptor , 接下来我们用 配置的方式来实现一下 。
思路
如何找配置类
KafkaProperties
有些属性是很明显的有的,其他没有的一般都在 Map里
那map的 key value 从哪里找呢?
找原生的配置 Kafka Consumer的 都在 ConsumerConfig
找到
public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";
OK,继续
示例
配置文件
自定义 拦截器
package net.zf.module.system.kafka.interceptor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerInterceptor; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.springframework.stereotype.Component; import java.util.Map; /** * @author artisan */ @Slf4j @Component public class FailureRateInterceptor implements ConsumerInterceptor<Object, Object> { /** * 消息消费前的拦截处理 * * @param consumerRecords * @return */ @Override public ConsumerRecords<Object, Object> onConsume(ConsumerRecords<Object, Object> consumerRecords) { // TODO log.info("FailureRateInterceptor#onConsume"); // 根据设定的规则计算失败率,并进行判断是否跳过消息的消费 // 返回ConsumerRecords对象, 继续执行下游的消费逻辑或者直接返回空的ConsumerRecords对象 (ConsumerRecords.EMPTY) return consumerRecords; } /** * 消息提交前进行拦截处理 * * @param map */ @Override public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) { log.info("FailureRateInterceptor#onCommit"); } /** * 拦截器关闭前进行拦截处理(如果有的话) */ @Override public void close() { log.info("FailureRateInterceptor#close"); } /** * 初始化配置(如果有的话) * * @param map */ @Override public void configure(Map<String, ?> map) { log.info("FailureRateInterceptor#configure"); } }
使用
测试
启动服务,发送消息,进行消费
小结
在Spring Boot中配置Kafka消费者的拦截器需要进行以下步骤:
首先,创建一个拦截器类,实现Kafka的ConsumerInterceptor接口,定义拦截器的逻辑。
在应用的配置文件(例如application.properties或application.yml)中,添加拦截器相关的配置项,其中包括设置interceptor.class属性为拦截器类的全限定名。
下面是一个示例,演示如何在Spring Boot中配置Kafka消费者的拦截器:
创建拦截器类:
@Slf4j @Component public class MyConsumerInterceptor implements ConsumerInterceptor<Object, Object> { @Override public ConsumerRecords<Object, Object> onConsume(ConsumerRecords<Object, Object> records) { // 在消息消费前的处理逻辑 // ... return records; } @Override public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) { // 在消息提交前的处理逻辑 // ... } @Override public void close() { // 拦截器关闭前的处理逻辑 // ... } @Override public void configure(Map<String, ?> configs) { // 初始化配置的处理逻辑 // ... } }
- 在应用的配置文件中设置拦截器相关的配置项:
spring.kafka.consumer.properties.interceptor.classes=com.example.MyConsumerInterceptor
或者在application.yml
文件中:
spring: kafka: consumer: properties: interceptor.classes: com.example.MyConsumerInterceptor
这样配置之后,Spring Boot会自动创建Kafka消费者,并将指定的拦截器应用于消费者。在消费者处理消息的过程中,拦截器的方法将会被调用,可以在这些方法中编写自定义的逻辑来处理消息或拦截操作。