【源码解读】Flink-Kafka中的序列器和分区器

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 【源码解读】Flink-Kafka中的序列器和分区器

开篇导语

Flink将数据sink至Kafka的过程中,在初始化生产者对象FlinkKafkaProducer时通常会采用默认的分区器和序列化器,这样数据只会发送至指定Topic的某一个分区中。对于存在多分区的Topic我们一般要自定义分区器和序列化器,指定数据发送至不同分区的逻辑。

此篇博客所涉及的组件版本

Flink:1.10.0

Kafka:2.3.0

序列化器

在Kafka生产者将数据写入至Kafka集群中时,为了能够在网络中传输数据对象,需要先将数据进行序列化处理,对于初学者来说,在初始化生产者对象时,一般都会采用默认的序列化器。

默认的序列化器不会对数据进行任何操作,也不会生成key。如果我们需要指定数据的key或者在数据发送前进行一些定制化的操作,那么我们就需要自定义序列化器,并且在初始化生产者对象时指定我们自己的序列化器。

分区器

对于Kakfa中一个topic存在多个分区的情况下,我们怎么知道发送的数据会被分配到哪个分区呢,这时候就要通过分区器来进行区分。

在Kafka中,主要有以下四种数据分区策略

第一种分区策略:给定了分区号,直接将数据发送到指定的分区里面去

第二种分区策略:没有给定分区号,给定数据的key值,通过key取hashCode进行分区

第三种分区策略:既没有给定分区号,也没有给定key值,直接轮询进行分区

第四种分区策略:自定义分区

分区器就是以上分区策略的代码实现。

Flink中的Kafka序列化器

源码解读

在之前的Flink版中中,自定义Kafka序列化器都是实现KeyedSerializationSchema接口,看一下它的源码:

//表示当前接口已经不推荐使用
@Deprecated
@PublicEvolving
//当前接口实现时需要指定生产者所要传输的对象类型
public interface KeyedSerializationSchema<T> extends Serializable {
     //根据传入的对象生成key并序列化为字节数组
    //如果没有生成key,这个方法可返回null。
    byte[] serializeKey(T element);
    //根据传入的对象生成value并序列化为字节数组
    byte[] serializeValue(T element);
    //根据传入的对象指定需要发送的Topic
    //此方法可以返回null,因为在初始化生产者对象的时候就已经指定了Topic。
    String getTargetTopic(T element);
}

以上接口存在三个方法,但是每个输入的参数都是一样的,代码复用性低。

于是现在的Flink版本一般推荐实现KafkaSerializationSchema接口来实现序列化器,看一下它的源码:

//当前接口实现时需要指定生产者所要传输的对象类型
@PublicEvolving
public interface KafkaSerializationSchema<T> extends Serializable {
    /**
     * Serializes given element and returns it as a {@link ProducerRecord}.
     *
     * @param element element to be serialized
     * @param timestamp timestamp (can be null)
     * @return Kafka {@link ProducerRecord}
     */
    ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp);
}

可以看到当前接口只需要实现serialize方法,根据传入的对象构造并返回一个ProducerRecord对象。

我们来看一下ProducerRecord类的源码:

package org.apache.kafka.clients.producer;
import java.util.Objects;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
public class ProducerRecord<K, V> {
    private final String topic;
    private final Integer partition;
    private final Headers headers;
    private final K key;
    private final V value;
    private final Long timestamp;
    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {
        if (topic == null) {
            throw new IllegalArgumentException("Topic cannot be null.");
        } else if (timestamp != null && timestamp < 0L) {
            throw new IllegalArgumentException(String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestamp));
        } else if (partition != null && partition < 0) {
            throw new IllegalArgumentException(String.format("Invalid partition: %d. Partition number should always be non-negative or null.", partition));
        } else {
            this.topic = topic;
            this.partition = partition;
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
            this.headers = new RecordHeaders(headers);
        }
    }
    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
        this(topic, partition, timestamp, key, value, (Iterable)null);
    }
    public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
        this(topic, partition, (Long)null, key, value, headers);
    }
    public ProducerRecord(String topic, Integer partition, K key, V value) {
        this(topic, partition, (Long)null, key, value, (Iterable)null);
    }
    public ProducerRecord(String topic, K key, V value) {
        this(topic, (Integer)null, (Long)null, key, value, (Iterable)null);
    }
    public ProducerRecord(String topic, V value) {
        this(topic, (Integer)null, (Long)null, (Object)null, value, (Iterable)null);
    }
    public String topic() {
        return this.topic;
    }
    public Headers headers() {
        return this.headers;
    }
    public K key() {
        return this.key;
    }
    public V value() {
        return this.value;
    }
    public Long timestamp() {
        return this.timestamp;
    }
    public Integer partition() {
        return this.partition;
    }
    public String toString() {
        String headers = this.headers == null ? "null" : this.headers.toString();
        String key = this.key == null ? "null" : this.key.toString();
        String value = this.value == null ? "null" : this.value.toString();
        String timestamp = this.timestamp == null ? "null" : this.timestamp.toString();
        return "ProducerRecord(topic=" + this.topic + ", partition=" + this.partition + ", headers=" + headers + ", key=" + key + ", value=" + value + ", timestamp=" + timestamp + ")";
    }
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        } else if (!(o instanceof ProducerRecord)) {
            return false;
        } else {
            ProducerRecord<?, ?> that = (ProducerRecord)o;
            return Objects.equals(this.key, that.key) && Objects.equals(this.partition, that.partition) && Objects.equals(this.topic, that.topic) && Objects.equals(this.headers, that.headers) && Objects.equals(this.value, that.value) && Objects.equals(this.timestamp, that.timestamp);
        }
    }
    public int hashCode() {
        int result = this.topic != null ? this.topic.hashCode() : 0;
        result = 31 * result + (this.partition != null ? this.partition.hashCode() : 0);
        result = 31 * result + (this.headers != null ? this.headers.hashCode() : 0);
        result = 31 * result + (this.key != null ? this.key.hashCode() : 0);
        result = 31 * result + (this.value != null ? this.value.hashCode() : 0);
        result = 31 * result + (this.timestamp != null ? this.timestamp.hashCode() : 0);
        return result;
    }
}

可以看到ProducerRecord在初始化时需要以下参数:

String topic;//分区名称,不可以为空

Integer partition;//当前记录需要写入的分区值,可以为空

Headers headers;//kafka头信息,可以为空

K key;//当前记录的key,可以为空

V value;//当前记录的实际value,不可以为空

Long timestamp;//指定生产者创建当前记录的时间戳,可以为空

在ProducerRecord的多个重构的构造函数中,参数最少的一个只需要传入topic名称和value即可。其他值都可为空。

public ProducerRecord(String topic, V value) {
        this(topic, (Integer)null, (Long)null, (Object)null, value, (Iterable)null);
    }

自定义序列化器示例

基于上述知识,我们可以通过实现KafkaSerializationSchema自定义序列化器,以下是一个最简单的自定义序列化器

package lenrnflink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
import javax.annotation.Nullable;
import java.util.Properties;
public class Test {
    //实现KafkaSerializationSchema接口来自定义序列化器,传入的参数设定为String类型。
    public static class MySerializationSchema implements KafkaSerializationSchema<String> {
        @Override
        public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
            //返回一条记录,指定topic为test,分区为0,key为null,value为传入对象转化而成的字节数组。
            return new ProducerRecord<>("test",0,null,element.getBytes());
        }
    }
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "cdh1:9092,cdh2:9092,cdh3:9092,cdh4:9092,cdh5:9092");
        //初始化Flink-Kafka生产者,并将自定义序列化器作为参数传递。
        FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<String>("test",new MySerializationSchema(),properties,FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
        DataStream<String> dataStream = environment.readTextFile("E:/test.txt","GB2312");
        dataStream.addSink(kafkaSink);
        environment.execute("WordCount");
    }
}

以上只是一个最简单的自定义序列化器,用户在自己编写的时候,可以结合业务逻辑,根据传递过来的对象,在序列化器中指定其topic,partition,key和value等。

Flink中的Kafka分区器

源码解读

在Flink中,自定义Kafka分区器需要继承FlinkKafkaPartitioner抽象类,看一下源码:

@PublicEvolving
public abstract class FlinkKafkaPartitioner<T> implements Serializable {
    private static final long serialVersionUID = -9086719227828020494L;
     //当前方法接收上游传过来的并行实例ID和并行实例总数。
     //不是抽象方法,可以不用在子类实现。
    public void open(int parallelInstanceId, int parallelInstances) {
        // overwrite this method if needed.
    }
    /**
     * @param record the record value
     * @param key serialized key of the record
     * @param value serialized value of the record
     * @param targetTopic target topic for the record
     * @param partitions found partitions for the target topic
     * @return the id of the target partition
     */
     //当前方法为抽象方法,需要在子类中实现其方法体。指定其具体的分区。
    public abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}

此类主要为open方法用于接收上游传过来的并行实例ID和总数。和partition抽象方法,进行指定分区的具体操作。

Flink中根据此实现了一个默认的分区器FlinkFixedPartitioner,看其源码:

@PublicEvolving
public class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {
    private static final long serialVersionUID = -3785320239953858777L;
    private int parallelInstanceId;
    @Override
    public void open(int parallelInstanceId, int parallelInstances) {
        Preconditions.checkArgument(parallelInstanceId >= 0, "Id of this subtask cannot be negative.");
        Preconditions.checkArgument(parallelInstances > 0, "Number of subtasks must be larger than 0.");
        //接收上游传过来的并行实例ID,并将值传递给成员变量,用于分区操作。
        this.parallelInstanceId = parallelInstanceId;
    }
    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        Preconditions.checkArgument(
            partitions != null && partitions.length > 0,
            "Partitions of the target topic is empty.");
        //通过Flink并行实例的id去和Kafka分区的数量取余来决定这个实例的数据写到哪个Kafka分区
        return partitions[parallelInstanceId % partitions.length];
    }
    @Override
    public boolean equals(Object o) {
        return this == o || o instanceof FlinkFixedPartitioner;
    }
    @Override
    public int hashCode() {
        return FlinkFixedPartitioner.class.hashCode();
    }
}

通过源码可以看出,当前分区器主要是通过Flink并行实例的id和Kafka分区的数量取余来决定这个实例的数据写到哪个Kafka分区,并且一个实例只写Kafka中的一个分区。

这样做的好处最大限度的利用了Flink和Kafka的可扩展性,提高数据处理效率。

自定义分区器示例

package lenrnflink;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.kafka.clients.producer.ProducerRecord;
import javax.annotation.Nullable;
import java.util.Optional;
import java.util.Properties;
public class Test {
    //继承FlinkKafkaPartitioner自定义分区器
    public static class KafkaPartitioner extends FlinkKafkaPartitioner<String> {
        @Override
        //当前分区器的逻辑为,如果传入的对象开头为1,则分区的值返回1,开头为2,则分区的值返回2,其他则返回0。
        public int partition(String s, byte[] bytes, byte[] bytes1, String s2, int[] ints) {
            if(s.startsWith("1")){
                return 1;
            }
            if(s.startsWith("2")){
                return 2;
            }
            return 0;
        }
    }
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "cdh1:9092,cdh2:9092,cdh3:9092,cdh4:9092,cdh5:9092");
        //必须通过Optional容器添加自定义分区器。
        Optional<FlinkKafkaPartitioner> ps = Optional.of(new KafkaPartitioner());
        //初始化Flink-kafka生产者时,指定自定义分区器。
        FlinkKafkaProducer kafkaSink = new FlinkKafkaProducer("test",new SimpleStringSchema(), properties, ps);
        DataStream<String> dataStream = environment.readTextFile("E:/test.txt","GB2312");
        dataStream.addSink(kafkaSink);
        environment.execute("WordCount");
    }
}

以上为一个非常简单的自定义分区器,用户可根据业务需要制定一个属于自己的分区器。

结束语

以下为FlinkKafkaProducer源码

/** @deprecated */
    @Deprecated
    public FlinkKafkaProducer(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
        this(topicId, (KeyedSerializationSchema)(new KeyedSerializationSchemaWrapper(serializationSchema)), getPropertiesFromBrokerList(brokerList), (Optional)Optional.of(new FlinkFixedPartitioner()));
    }
    /** @deprecated */
    @Deprecated
    public FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) {
        this(topicId, (KeyedSerializationSchema)(new KeyedSerializationSchemaWrapper(serializationSchema)), producerConfig, (Optional)Optional.of(new FlinkFixedPartitioner()));
    }
    /** @deprecated */
    @Deprecated
    public FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
        this(topicId, (KeyedSerializationSchema)(new KeyedSerializationSchemaWrapper(serializationSchema)), producerConfig, (Optional)customPartitioner);
    }
    /** @deprecated */
    @Deprecated
    public FlinkKafkaProducer(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) {
        this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), Optional.of(new FlinkFixedPartitioner()));
    }
    /** @deprecated */
    @Deprecated
    public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) {
        this(topicId, serializationSchema, producerConfig, Optional.of(new FlinkFixedPartitioner()));
    }
    /** @deprecated */
    @Deprecated
    public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaProducer.Semantic semantic) {
        this(topicId, serializationSchema, producerConfig, Optional.of(new FlinkFixedPartitioner()), semantic, 5);
    }
    /** @deprecated */
    @Deprecated
    public FlinkKafkaProducer(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
        this(defaultTopicId, serializationSchema, producerConfig, customPartitioner, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE, 5);
    }
    /** @deprecated */
    @Deprecated
    public FlinkKafkaProducer(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<IN>> customPartitioner, FlinkKafkaProducer.Semantic semantic, int kafkaProducersPoolSize) {
        this(defaultTopicId, serializationSchema, (FlinkKafkaPartitioner)customPartitioner.orElse((Object)null), (KafkaSerializationSchema)null, producerConfig, semantic, kafkaProducersPoolSize);
    }
    public FlinkKafkaProducer(String defaultTopic, KafkaSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaProducer.Semantic semantic) {
        this(defaultTopic, serializationSchema, producerConfig, semantic, 5);
    }
    public FlinkKafkaProducer(String defaultTopic, KafkaSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaProducer.Semantic semantic, int kafkaProducersPoolSize) {
        this(defaultTopic, (KeyedSerializationSchema)null, (FlinkKafkaPartitioner)null, serializationSchema, producerConfig, semantic, kafkaProducersPoolSize);
    }

在阅读Flink中的Kafka生产者源码FlinkKafkaProducer时发现其多个构造函数,凡是参数中包含FlinkKafkaProducer的都被标记为了deprecated,说明官方已经不推荐使用自定义分区器来进行数据的分区操作。

只有参数包含KafkaSerializationSchema的两个构造函数是正常的,说明现在官方推荐使用KafkaSerializationSchema接口来进行序列化的操作。

并且阅读源码的过程中可以发现,KafkaSerializationSchema中也有对数据的分区操作。只需要结合KafkaContextAware接口即可实现获取Flink并行实例ID和数量的功能。

相关文章
|
3月前
|
消息中间件 分布式计算 算法
大数据-63 Kafka 高级特性 分区 副本机制 宕机恢复 Leader选举
大数据-63 Kafka 高级特性 分区 副本机制 宕机恢复 Leader选举
62 5
大数据-63 Kafka 高级特性 分区 副本机制 宕机恢复 Leader选举
|
3月前
|
消息中间件 SQL 分布式计算
大数据-64 Kafka 高级特性 分区Partition 分区重新分配 实机实测重分配
大数据-64 Kafka 高级特性 分区Partition 分区重新分配 实机实测重分配
126 7
|
2月前
|
消息中间件 负载均衡 Kafka
【赵渝强老师】Kafka的主题与分区
Kafka 中的消息按主题分类,生产者发送消息到特定主题,消费者订阅主题消费。主题可分多个分区,每个分区仅属一个主题。消息追加到分区时,Broker 分配唯一偏移量地址,确保消息在分区内的顺序性。Kafka 保证分区有序而非主题有序。示例中,Topic A 有 3 个分区,分区可分布于不同 Broker 上,支持负载均衡和容错。视频讲解及图示详见原文。
|
2月前
|
消息中间件 监控 负载均衡
在Kafka中,如何进行主题的分区和复制?
在Kafka中,如何进行主题的分区和复制?
|
3月前
|
消息中间件 监控 负载均衡
在Kafka中,如何进行主题的分区和复制?
在Kafka中,如何进行主题的分区和复制?
|
2月前
|
消息中间件 Kafka
【赵渝强老师】Kafka分区的副本机制
在Kafka中,每个主题可有多个分区,每个分区有多个副本。其中仅有一个副本为Leader,负责对外服务,其余为Follower。当Leader所在Broker宕机时,Follower可被选为新的Leader,实现高可用。文中附有示意图及视频讲解。
|
5月前
|
消息中间件 负载均衡 Kafka
Kafka分区分配策略大揭秘:RoundRobin、Range、Sticky,你真的了解它们吗?
【8月更文挑战第24天】Kafka是一款突出高吞吐量、可扩展性和数据持久性的分布式流处理平台。其核心特性之一是分区分配策略,对于实现系统的负载均衡和高可用性至关重要。Kafka支持三种主要的分区分配策略:RoundRobin(轮询)、Range(范围)和Sticky(粘性)。RoundRobin策略通过轮询方式均衡分配分区;Range策略根据主题分区数和消费者数量分配;而Sticky策略则在保持原有分配的基础上动态调整,以确保各消费者负载均衡。理解这些策略有助于优化Kafka性能并满足不同业务场景需求。
403 59
|
3月前
|
消息中间件 JSON 大数据
大数据-65 Kafka 高级特性 分区 Broker自动再平衡 ISR 副本 宕机恢复再重平衡 实测
大数据-65 Kafka 高级特性 分区 Broker自动再平衡 ISR 副本 宕机恢复再重平衡 实测
84 4
|
3月前
|
消息中间件 分布式计算 算法
大数据-67 Kafka 高级特性 分区 分配策略 Ranger、RoundRobin、Sticky、自定义分区器
大数据-67 Kafka 高级特性 分区 分配策略 Ranger、RoundRobin、Sticky、自定义分区器
59 3
|
3月前
|
消息中间件 JSON 大数据
大数据-66 Kafka 高级特性 分区Partition 副本因子Replication Factor replicas动态修改 线上动态修改副本数
大数据-66 Kafka 高级特性 分区Partition 副本因子Replication Factor replicas动态修改 线上动态修改副本数
60 1