大数据-59 Kafka 高级特性 消息发送03-自定义拦截器、整体原理剖析

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据-59 Kafka 高级特性 消息发送03-自定义拦截器、整体原理剖析

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(正在更新…)

章节内容

上节我们完成了如下的内容:


Kafka 序列化器

Kafka 自定义序列化器

Kafka 分区器

Kafka 自定义分区器

cffb607f8ab920ed71ebf7323ee061e4_3022a8c3f1c5489b8228ea18283fce55.png

Producer拦截器(Interceptor)和 Consumer拦截器在 Kafka0.10 版本中引入的,主要是Client端的定制化控制逻辑。

对于Producer而言,Interceptor 使得用户在消息发送前以及Producer回调逻辑前有机会对消息做一些定制化的需要,比如修改消息、修改时间等。

同时Producer允许指定多个Interceptor按顺序作用在同一条消息上,形成一个拦截链(Interceptor chain)。


onSend(ProducerRecord):该方法封装进KafkaProducer.send方法中,即运行在主线程中,Producer确保在消息序列化以计算分区前调用该方法。用户可以在该方法中对消息做任何的操作,但最好不要修改消息所属的分区、topic等等,否则会影响分区的计算。

onAckonwledgement(RecordMetadata, Exception):该方法会在消息被应答之前或消息失败时调用,并且通常都是在Producer回调逻辑触发之前,onAcknowledgement运行在Producer的IO线程中,因此不要再该方法中放入很重要的逻辑,并要用于执行一些资源清理工作。

close:关闭Interceptor,主要执行一些资源清理工作。

如果上所述,Interceptor 可能被运行在多个线程中,因为在具体实现时用户需要自行确保线程的安全。

倘若指定了多个Inteceptor,则Producer会按照顺序进行调用他们,并且其中可能抛出的异常会记录到日志中而不是向上传递!!!


自定义拦截器

根据对拦截器的观察学习,我们知道了,要实现自定义的拦截器,我们需要:


实现ProducerInterceptor接口

在KafkaProducer的设置中定义自定义的拦截器

自定义类

(上一节 大数据 Kafka 58 点击跳转)

借用我们刚才实现的 User 类,这里就不再写了。


自定义拦截器

自定义拦截器01

public class Interceptor01<K, V> implements ProducerInterceptor<K, V> {

    @Override
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
        System.out.println("=== 拦截器01 onSend ===");
        // 做一些操作
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        System.out.println("=== 拦截器01 onAcknowledgement ===");
        if (null != exception) {
            // 此处应该记录日志等操作
            exception.printStackTrace();
        }
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

自定义拦截器02

public class Interceptor02<K, V> implements ProducerInterceptor<K, V> {

    @Override
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
        System.out.println("=== 拦截器02 onSend ===");
        // 做一些操作
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        System.out.println("=== 拦截器02 onAcknowledgement ===");
        if (null != exception) {
            // 此处应该记录日志等操作
            exception.printStackTrace();
        }
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

使用拦截器

configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                "icu.wzk.model.Interceptor01,icu.wzk.model.Interceptor02"
        );

原理剖析

整体原理图

主线程

负责消息创建,拦截器,序列化器,分区器操作,并将消息追加到收集器。


RecordAccumulator:


消息收集器RecorderAccumulator每个分区维护一个Deque类型的双端队列

ProducerBatch可以理解为ProducerRecord集合,批量发送有利于提升吞吐量,降低网络影响

由于生产者客户端使用 ByteBuffer 在发送消息之前进行消息保存,并维护了一个 BufferPool 实现 ByteBuffer 的复用。该缓存池只针对特定大小的 ByteBuffer 进行管理,如果消息过大,不能做到重复利用。

每次追加一条ProducerRecord消息,会寻找/新建对应的双端队列,从其尾部获取一个ProducerBatch,判断当前消息的大小是否可以写入到该批次中。若可以写入则写入,若不可以写入则新建一个ProducerBatch。

该线程从消息收集器获取缓存的消息,将其处理 <Node, List> 形式,Node表示集群的Broker的节点。

进一步将 <Node, List> 转换为 <Node, Request> 形式,此时才可以向服务端发送数据。

在发送之前,Sender线程将消息以 Map<NodeId, Deque<Deque> 形式保存到 InFlightRequests中进行缓存。可以通过获取 leastLoadedNode,即当前Node中负载压力最小的一个,以实现消息的尽快发出。


目录
相关文章
|
20天前
|
消息中间件 存储 缓存
大厂面试高频:Kafka 工作原理 ( 详细图解 )
本文详细解析了 Kafka 的核心架构和实现原理,消息中间件是亿级互联网架构的基石,大厂面试高频,非常重要,建议收藏。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
大厂面试高频:Kafka 工作原理 ( 详细图解 )
|
2月前
|
SQL 消息中间件 分布式计算
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
69 5
|
2月前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
160 0
|
2月前
|
消息中间件 存储 druid
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
42 3
|
2月前
|
存储 分布式计算 druid
大数据-155 Apache Druid 架构与原理详解 数据存储 索引服务 压缩机制
大数据-155 Apache Druid 架构与原理详解 数据存储 索引服务 压缩机制
62 3
|
2月前
|
消息中间件 分布式计算 druid
大数据-154 Apache Druid 架构与原理详解 基础架构、架构演进
大数据-154 Apache Druid 架构与原理详解 基础架构、架构演进
49 2
|
2月前
|
消息中间件 druid 大数据
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
33 2
|
2月前
|
消息中间件 分布式计算 druid
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
56 1
|
2月前
|
SQL 消息中间件 分布式计算
大数据-141 - ClickHouse 集群 副本和分片 Zk 的配置 Replicated MergeTree原理详解(一)
大数据-141 - ClickHouse 集群 副本和分片 Zk 的配置 Replicated MergeTree原理详解(一)
64 0
|
2月前
|
SQL 大数据
大数据-141 - ClickHouse 集群 副本和分片 Zk 的配置 Replicated MergeTree原理详解(二)
大数据-141 - ClickHouse 集群 副本和分片 Zk 的配置 Replicated MergeTree原理详解(二)
74 0