Apache Kafka - KIP-42: Add Producer and Consumer Interceptors

简介:

kafka 0.10.0.0 released

 

Interceptors的概念应该来自flume

参考,http://blog.csdn.net/xiao_jun_0820/article/details/38111305

比如,flume提供的

Timestamp Interceptor

Host Interceptor

Static Interceptor

Regex Filtering Interceptor

Regex Extractor Interceptor

可以对于流过的message进行一些包装,比如插入时间,host,或做些过滤等etl操作

 

所以kafka在producer和consumer端也都提供这样的Interceptors接口,

 

ProducerInterceptor

复制代码
/**
 * A plugin interface to allow things to intercept events happening to a producer record,
 * such as sending producer record or getting an acknowledgement when a record gets published
 */
public interface ProducerInterceptor<K, V> extends Configurable {
    /**
     * This is called when client sends record to KafkaProducer, before key and value gets serialized.
     * @param record the record from client
     * @return record that is either original record passed to this method or new record with modified key and value.
     */
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
 
    /**
     * This is called when the send has been acknowledged
     * @param metadata The metadata for the record that was sent (i.e. the partition and offset). The metadata information may be only partially filled, if an error occurred. Topic will be always set, and if partition is not -1, partition will be set partition set/assigned to this record.
     * @param exception The exception thrown during processing of this record. Null if no error occurred.
     */
    public void onAcknowledgement(RecordMetadata metadata, Exception exception);
   
    /**
     * This is called when interceptor is closed
     */
    public void close();
}
复制代码

onSend() will be called in KafkaProducer.send(), before key and value gets serialized and before partition gets assigned.

If the implementation modifies key and/or value, it must return modified key and value in a new ProducerRecord object.

onAcknowledgement() will be called when the send is acknowledged. It has same API as Callback.onCompletion(), and is called just before Callback.onCompletion() is called.

多个multiple interceptors之间是可以串联的

ProducerInterceptor APIs will be called from multiple threads: onSend() will be called on submitting thread and onAcknowledgement() will be called on producer I/O thread.

 

ConsumerInterceptor

复制代码
/**
 * A plugin interface to allow things to intercept Consumer events such as receiving a record or record being consumed
 * by a client.
 */
public interface ConsumerInterceptor<K, V> extends Configurable {
    /**
     * This is called when the records are about to be returned to the client.
     * @param records records to be consumed by the client. Null if record dropped/ignored/discarded (non consumable)
     * @return records that is either original 'records' passed to this method or modified set of records
     */
    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
   
    /**
     * This is called when offsets get committed
     * This method will be called when the commit request sent to the server has been acknowledged.
     * @param offsets A map of the offsets and associated metadata that this callback applies to
     */
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
   
    /**
     * This is called when interceptor is closed
     */
    public void close();
}
复制代码

onConsume() will be called in KafkaConsumer.poll(), just before poll() returns ConsumerRecords.

onCommit() will be called when offsets get committed: just before OffsetCommitCallback.onCompletion() is called and in ConsumerCoordinator.commitOffsetsSync() on successful commit.

Since new consumer is single-threaded, ConsumerInterceptor API will be called from a single thread.

 

总结,

Interceptor作为一种plugin可以做些,对message的decorate或cleaning或filtering等一些轻量的工作,最主要的用途还是用于监控,trace message

Interceptor可以串联执行

Interceptor必须要轻量,因为如果耗时就会影响链路的throughput

 

confluent公司也提供相应的interceptor产品,用于data stream的监控

http://docs.confluent.io/3.0.0/control-center/docs/clients.html

 

同时,为了更好的监控和audit

Currently, RecordMetadata contains topic/partition, offset, and timestamp (KIP-32). 
We propose to add remaining record's metadata in RecordMetadata: checksum and record size. Both checksum and record size are useful for monitoring and audit.

For symmetry, we also propose to expose the same metadata on consumer side and make available to interceptors.

We will add checksum and record size fields to RecordMetadata and ConsumerRecord.

public final class RecordMetadata {

private final long offset;

private final TopicPartition topicPartition;

private final long checksum;                <<== NEW: checksum of the record

private final int size;                     <<== NEW: record size in bytes(before compression)

 

public final class ConsumerRecord<K, V> {

.......

private final long checksum;               <<== NEW: checksum of the record

private final int size;                    <<== NEW: record size in bytes (after decompression)

相关文章
|
10月前
|
消息中间件 安全 Kafka
Apache Kafka安全加固指南:保护你的消息传递系统
【10月更文挑战第24天】在现代企业环境中,数据的安全性和隐私保护至关重要。Apache Kafka作为一款广泛使用的分布式流处理平台,其安全性直接影响着业务的稳定性和用户数据的安全。作为一名资深的Kafka使用者,我深知加强Kafka安全性的重要性。本文将从个人角度出发,分享我在实践中积累的经验,帮助读者了解如何有效地保护Kafka消息传递系统的安全性。
591 7
|
10月前
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
474 5
|
10月前
|
消息中间件 存储 监控
构建高可用性Apache Kafka集群:从理论到实践
【10月更文挑战第24天】随着大数据时代的到来,数据传输与处理的需求日益增长。Apache Kafka作为一个高性能的消息队列服务,因其出色的吞吐量、可扩展性和容错能力而受到广泛欢迎。然而,在构建大规模生产环境下的Kafka集群时,保证其高可用性是至关重要的。本文将从个人实践经验出发,详细介绍如何构建一个高可用性的Kafka集群,包括集群规划、节点配置以及故障恢复机制等方面。
297 4
|
10月前
|
消息中间件 监控 大数据
优化Apache Kafka性能:最佳实践与调优策略
【10月更文挑战第24天】作为一名已经对Apache Kafka有所了解并有实际使用经验的开发者,我深知在大数据处理和实时数据流传输中,Kafka的重要性不言而喻。然而,在面对日益增长的数据量和业务需求时,如何保证系统的高性能和稳定性成为了摆在我们面前的一个挑战。本文将从我的个人视角出发,分享一些关于如何通过合理的配置和调优来提高Kafka性能的经验和建议。
294 4
|
10月前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
521 5
|
10月前
|
消息中间件 Kafka
使用kafka consumer加载数据加载异常并且报source table and destination table are not same错误解决办法
使用kafka consumer加载数据加载异常并且报source table and destination table are not same错误解决办法
|
10月前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
277 1
|
10月前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
300 2
|
10月前
|
消息中间件 Ubuntu Java
Ubuntu系统上安装Apache Kafka
Ubuntu系统上安装Apache Kafka
|
10月前
|
消息中间件 监控 Kafka
Apache Kafka 成为处理实时数据流的关键组件。Kafka Manager 提供了一个简洁的 Web 界面
随着大数据技术的发展,Apache Kafka 成为处理实时数据流的关键组件。Kafka Manager 提供了一个简洁的 Web 界面,方便管理和监控 Kafka 集群。本文详细介绍了 Kafka Manager 的部署步骤和基本使用方法,包括配置文件的修改、启动命令、API 示例代码等,帮助你快速上手并有效管理 Kafka 集群。
162 0

推荐镜像

更多