Apache Kafka - ConsumerInterceptor 实战(2)

简介: Apache Kafka - ConsumerInterceptor 实战(2)

20191116123525638.png



Pre

Apache Kafka - ConsumerInterceptor 实战 (1) 用代码的方式实现了ConsumerInterceptor , 接下来我们用 配置的方式来实现一下 。


思路

如何找配置类

KafkaProperties

5b0110c7e372455ea98d692ededf89e8.png


有些属性是很明显的有的,其他没有的一般都在 Map里


8859f6b86d47461fa81376f2ff8bd6d5.png


那map的 key value 从哪里找呢?

找原生的配置 Kafka Consumer的 都在 ConsumerConfig


979eab0aec934f2e89a8b6ab7915278e.png


找到

public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";


c81113a14b844dd897e09de4b6d27424.png

OK,继续


示例

配置文件



652452561ed74b6b945782037ca78041.png


自定义 拦截器

package net.zf.module.system.kafka.interceptor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
 * @author artisan
 */
@Slf4j
@Component
public class FailureRateInterceptor implements ConsumerInterceptor<Object, Object> {
    /**
     * 消息消费前的拦截处理
     *
     * @param consumerRecords
     * @return
     */
    @Override
    public ConsumerRecords<Object, Object> onConsume(ConsumerRecords<Object, Object> consumerRecords) {
        // TODO
        log.info("FailureRateInterceptor#onConsume");
        // 根据设定的规则计算失败率,并进行判断是否跳过消息的消费
        // 返回ConsumerRecords对象, 继续执行下游的消费逻辑或者直接返回空的ConsumerRecords对象 (ConsumerRecords.EMPTY)
        return consumerRecords;
    }
    /**
     * 消息提交前进行拦截处理
     *
     * @param map
     */
    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {
        log.info("FailureRateInterceptor#onCommit");
    }
    /**
     * 拦截器关闭前进行拦截处理(如果有的话)
     */
    @Override
    public void close() {
        log.info("FailureRateInterceptor#close");
    }
    /**
     * 初始化配置(如果有的话)
     *
     * @param map
     */
    @Override
    public void configure(Map<String, ?> map) {
        log.info("FailureRateInterceptor#configure");
    }
}


使用


ccf608e13a1c4cb9b23b27875ee4ea1d.png


测试

启动服务,发送消息,进行消费


405a54138cf049c18878ff2cb3079aa6.png

小结

在Spring Boot中配置Kafka消费者的拦截器需要进行以下步骤:


首先,创建一个拦截器类,实现Kafka的ConsumerInterceptor接口,定义拦截器的逻辑。

在应用的配置文件(例如application.properties或application.yml)中,添加拦截器相关的配置项,其中包括设置interceptor.class属性为拦截器类的全限定名。

下面是一个示例,演示如何在Spring Boot中配置Kafka消费者的拦截器:


创建拦截器类:

@Slf4j
@Component
public class MyConsumerInterceptor implements ConsumerInterceptor<Object, Object> {
    @Override
    public ConsumerRecords<Object, Object> onConsume(ConsumerRecords<Object, Object> records) {
        // 在消息消费前的处理逻辑
        // ...
        return records;
    }
    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        // 在消息提交前的处理逻辑
        // ...
    }
    @Override
    public void close() {
        // 拦截器关闭前的处理逻辑
        // ...
    }
    @Override
    public void configure(Map<String, ?> configs) {
        // 初始化配置的处理逻辑
        // ...
    }
}


  1. 在应用的配置文件中设置拦截器相关的配置项:
spring.kafka.consumer.properties.interceptor.classes=com.example.MyConsumerInterceptor


或者在application.yml文件中:

spring:
  kafka:
    consumer:
      properties:
        interceptor.classes: com.example.MyConsumerInterceptor


这样配置之后,Spring Boot会自动创建Kafka消费者,并将指定的拦截器应用于消费者。在消费者处理消息的过程中,拦截器的方法将会被调用,可以在这些方法中编写自定义的逻辑来处理消息或拦截操作。

相关文章
|
2月前
|
消息中间件 前端开发 Kafka
【Azure 事件中心】使用Apache Flink 连接 Event Hubs 出错 Kafka error: No resolvable bootstrap urls
【Azure 事件中心】使用Apache Flink 连接 Event Hubs 出错 Kafka error: No resolvable bootstrap urls
|
9天前
|
消息中间件 监控 Kafka
Apache Kafka 成为实时数据流处理的关键组件
【10月更文挑战第8天】随着大数据技术的发展,Apache Kafka 成为实时数据流处理的关键组件。Kafka Manager 提供了一个简洁易用的 Web 界面,方便管理和监控 Kafka 集群。本文详细介绍了 Kafka Manager 的部署步骤和基本使用方法,包括配置文件修改、启动服务、创建和管理 Topic 等操作,帮助你快速上手。
22 3
|
13天前
|
消息中间件 存储 druid
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
29 3
|
13天前
|
消息中间件 druid 大数据
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
21 2
|
13天前
|
消息中间件 分布式计算 druid
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
37 1
|
14天前
|
消息中间件 JavaScript 前端开发
用于全栈数据流的 JavaScript、Node.js 和 Apache Kafka
用于全栈数据流的 JavaScript、Node.js 和 Apache Kafka
35 1
|
16天前
|
消息中间件 druid Kafka
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
43 0
|
2月前
|
消息中间件 Kafka 数据处理
实时数据流处理:Dask Streams 与 Apache Kafka 集成
【8月更文第29天】在现代数据处理领域,实时数据流处理已经成为不可或缺的一部分。随着物联网设备、社交媒体和其他实时数据源的普及,处理这些高吞吐量的数据流成为了一项挑战。Apache Kafka 作为一种高吞吐量的消息队列服务,被广泛应用于实时数据流处理场景中。Dask Streams 是 Dask 库的一个子模块,它为 Python 开发者提供了一个易于使用的实时数据流处理框架。本文将介绍如何将 Dask Streams 与 Apache Kafka 结合使用,以实现高效的数据流处理。
61 0
|
2月前
|
消息中间件 Java Kafka
【Azure 事件中心】开启 Apache Flink 制造者 Producer 示例代码中的日志输出 (连接 Azure Event Hub Kafka 终结点)
【Azure 事件中心】开启 Apache Flink 制造者 Producer 示例代码中的日志输出 (连接 Azure Event Hub Kafka 终结点)

推荐镜像

更多