阿里云Flink-自定义kafka sink partitioner实践及相关踩坑记录

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 阿里云Flink-自定义kafka sink partitioner实践及相关踩坑记录

1. 背景及需求

链路:Flink -> sink kafka

需求:客户需求根据数据的特征,以自定义的逻辑,将不同的数据写到kafka不同的分区中

阿里云官方文档链接:https://help.aliyun.com/zh/flink/developer-reference/kafka-connector?spm=a2c4g.11186623.0.0.bc541be5ucyowS#section-oos-95x-usp

Flink社区官方链接:https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-producers-and-fault-tolerance

2. 需求实现

2.1. JAR作业

2.1.1. 实现分析

jar作业需要有一定的ds开发基础,在官网文档上有如下的介绍,可以通过 producer record来指定该数据的客户自定义分区逻辑。

image.png

通过查看ProducerRecord类的构造方法,我们可以看到在6个重载方法的参数中,partition参数指定了该数据结构写入的分区,具体实现参考:2.1.2

image.png

细心的同学可能会发现,阿里云官网上Flink SQL作业通过实现自定义的FlinkKafkaPartitioner来实现数据的自定义分区写入,那ds的jar作业是否也可以呢?

答案是肯定的,我们可以看到,继承自TwoPhaseCommitSinkFunction的FlinkKafkaProducer类也有很多重载的构造方法,我们可以看到*partitioner相关的构造方法,通过自定义实现FlinkKafkaPartitioner即可,具体实现参考:2.1.3及2.2的Flink SQL作业部分。

image.png

2.1.2. 通过KafkaSerializationSchema实现

publicclassKafkaSinkSimpleTest01 {
publicstaticvoidmain(String[] args) throwsException {
// set up the streaming execution environmentfinalStreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
// 简单添加几条string类型的数据DataStreamSource<String>stream=env.fromElements("赵", "钱", "孙", "李", "周");
// 做个print sink以便观察stream.print("print-test");
// 实现自定义的 KafkaSerializationSchemaKafkaSerializationSchema<String>serializationSchema=newKafkaSerializationSchema<String>() {
// 重写 serialize方法,并重新定义ProducerRecord,以指定数据的分区或者key@OverridepublicProducerRecord<byte[], byte[]>serialize(Stringelement, @NullableLongtimestamp) {
returnnewProducerRecord<byte[], byte[]>(
"sink_topic", // target topic11,  // partition"key-value".getBytes(StandardCharsets.UTF_8),  // keyelement.getBytes(StandardCharsets.UTF_8)); // record contents            }
        };
// 实例化 sinkfunctionFlinkKafkaProducer<String>myProducer=newFlinkKafkaProducer<>(
"sink_topic",             // target topicserializationSchema,    // serialization schemaproperties,             // producer configFlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerancestream.addSink(myProducer);
// execute programenv.execute("Flink Streaming Java API Skeleton");
    }
}

2.1.3. 通过FlinkKafkaPartitioner实现

publicclassKafkaSinkWithSelfDefPartitioner {
publicstaticvoidmain(String[] args) throwsException {
// set up the streaming execution environmentfinalStreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
// 简单添加几条string类型的数据DataStreamSource<String>stream=env.fromElements("赵", "钱", "孙", "李", "周");
stream.print("print-test");
// 在SerializationSchema的实现中不指定分区SerializationSchema<String>serializationSchema=newSerializationSchema<String>(
        ) {
@Overridepublicbyte[] serialize(Stringelement) {
returnelement.getBytes(StandardCharsets.UTF_8);
            }
        };
// 自定义FlinkKafkaPartitioner 根据record的值进行判断,写入不同的分区FlinkKafkaPartitioner<String>flinkKafkaPartitioner=newFlinkKafkaPartitioner<String>() {
@Overridepublicintpartition(Stringrecord, byte[] key, byte[] value, StringtargetTopic, int[] partitions) {
// 主要是根据record的情况,更改这里的实现方法,返回kafka分区的integer类型即可if(record.equals("test")){
return6;
                }else{
return7;
                }
            }
        };
FlinkKafkaProducer<String>myProducer=newFlinkKafkaProducer<>(
"sunyf_topic",
serializationSchema,
properties,
flinkKafkaPartitioner,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
2        );
stream.addSink(myProducer);
// execute programenv.execute("Flink Streaming Java API Skeleton");
    }
}

2.2. Flink SQL作业

image.png

样例代码如下:

packageorg.example;
importorg.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
importorg.apache.flink.table.data.binary.BinaryRowData;
publicclassFlinkSQLKafkaSinkPartitionerextendsFlinkKafkaPartitioner {
@Overridepublicintpartition(Objectrecord, byte[] key, byte[] value, StringtargetTopic, int[] partitions) {
BinaryRowDatabrData=null;
// 在flink sql中配置自定义partitioner record的实际类型是BinaryRowDataif (recordinstanceofBinaryRowData) {
brData= (BinaryRowData) record;
// 可以通过如下类的方法获取该行所有数据for (inti=0; i<brData.getArity(); i++) {
System.out.println("索引: "+i+" :"+brData.getString(i));
            }
Stringdt=brData.getString(2).toString();
intdtInt=Integer.parseInt(dt);
//        单数日期写5分区,偶数写4分区if (dtInt%2==0){
return4;
            }else{
return5;
            }
        } else {
thrownewIllegalArgumentException("Record is not of type BinaryRowData");
        }
    }
}

打包后上传:

image.png

在sql作业的sink中添加如下with

'sink.partitioner' = 'org.example.FlinkSQLKafkaSinkPartitioner'

image.png

整体样例SQL作业

CREATE TEMPORARY TABLE KafkaTable (  `key_id` string,  `a` STRING,  `b` string,  `c` string,  `ts` TIMESTAMP_LTZ(3) METADATA FROM'timestamp' VIRTUAL,  process_topic string metadata from'topic' VIRTUAL,  `partition` int METADATA VIRTUAL,  header MAP<string,string> METADATA from'headers' VIRTUAL,  `leader-epoch` int METADATA VIRTUAL,  `offset` int METADATA VIRTUAL,  `timestamp-type` string METADATA VIRTUAL
) WITH ('connector'='kafka','topic'='source_topic','properties.bootstrap.servers'='','properties.group.id'='group','scan.startup.mode'='earliest-offset','format'='csv','key.format'='csv','value.format'='csv','key.fields'='key_id','key.fields-prefix'='key_','value.fields-include'='EXCEPT_KEY');CREATE TEMPORARY TABLE kafka_sink (  `a` STRING,  `b` string,  `c` string
) WITH ('connector'='kafka','topic'='sink_topic','properties.bootstrap.servers'='','properties.group.id'='group','format'='csv','properties.enable.idempotence'='false','sink.partitioner'='org.example.FlinkSQLKafkaSinkPartitioner');CREATE TEMPORARY TABLE print_sink(  `a` STRING,  `b` string,  `c` string
) WITH ('connector'='print','print-identifier'='print666');BEGIN STATEMENT SET;insertinto print_sink
select a,b,c from KafkaTable
;insertinto kafka_sink
select a,b,c from KafkaTable
;END;

3. 踩坑记录

3.1. 本地idea调试外网读写kafka高安全模式

【问题现象】

大量kafka堆栈抛出的warning信息,报brocker失联,如下所示。

14:05:58,947 WARN  org.apache.kafka.clients.NetworkClient                       [] - [Producer clientId=producer-5] Bootstrap broker 121.40.72.21:9093 (id: -1 rack: null) disconnected

image.png

【问题原因及解决】

需要参考kafka客户端在properties里配置ssl,sasl以及jaas相关配置项,相关配置参数可以参考kafka官网。

  1. https://help.aliyun.com/zh/apsaramq-for-kafka/developer-reference/update-the-ssl-certificate-algorithm?spm=a2c4g.11186623.0.i26
  2. https://github.com/AliwareMQ/aliware-kafka-demos/blob/master/kafka-java-demo/vpc-ssl/src/main/resources/mix.4096.client.truststore.jks
Propertiesproperties=newProperties();
properties.setProperty("bootstrap.servers", "ip:9093,ip:9093,ip:9093");
properties.setProperty("security.protocol", "SASL_SSL");
// SSL 配置// 配置服务端提供的 truststore (CA 证书) 的路径// Configure the path of truststore (CA) provided by the serverproperties.setProperty("ssl.truststore.location", "/file_path/mix.4096.client.truststore.jks");
properties.setProperty("ssl.truststore.password", "KafkaOnsClient");
// SASL 配置// 将 SASL 机制配置为 as SCRAM-SHA-256properties.setProperty("sasl.mechanism", "PLAIN");
properties.setProperty("ssl.endpoint.identification.algorithm", "");
// 配置 JAASproperties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");

3.2. 下游丢数据 or 算子发送的数据与kafka group读取数据数量不一致

【问题现象】

  1. flink souce算子 records sent = 2
  2. metrics曲线显示source算子读取数据 = 4
  3. 指定kafka group在kafka topic中消费点位均达到最大点位,topic中消息共计4条
  4. 作业无任何异常

image.png

image.png

image.png

【问题原因及解决】

指定topic中2条消息key不为空,其余2条消息key为空,在source表中定义了key相关的字段,所以出现了source读取到4条数据(与曲线对应),但是source 算子并未向下游发送key为空的两条消息。

3.3. CORRUPT_MESSAGE

【问题现象】

大量warning报CORRUPT_MESSAGE,下游kafka没有数据,参考:https://www.alibabacloud.com/help/zh/apsaramq-for-kafka/developer-reference/what-do-i-do-if-an-error-is-reported-when-i-use-the-apsaramq-for-kafka-client

2024-01-17 14:43:52,094 WARN  org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.internals.Sender [] - [Producer clientId=producer-1] Got error produce response with correlation id 40 on topic-partition sunyf_sink-0, retrying (2147483615 attempts left). Error: CORRUPT_MESSAGE

image.png

【问题原因及解决】

enable.idempotence 参数在不同vvr版本下默认值不同,vvr6 + 云kafka可以在不做参数修改的情况下正常生产数据,vvr8则需要手动指定为false。

由于云存储的云消息队列 Kafka 版不支持幂等和事务写入,您将无法使用Kafka结果表提供的精确一次语义exactly-once semantic功能,同时您需要在结果表中添加配置项properties.enable.idempotence=false以关闭幂等写入功能

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
186 0
|
2月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
101 0
|
2月前
|
消息中间件 分布式计算 算法
大数据-67 Kafka 高级特性 分区 分配策略 Ranger、RoundRobin、Sticky、自定义分区器
大数据-67 Kafka 高级特性 分区 分配策略 Ranger、RoundRobin、Sticky、自定义分区器
56 3
|
2月前
|
消息中间件 缓存 分布式计算
大数据-59 Kafka 高级特性 消息发送03-自定义拦截器、整体原理剖析
大数据-59 Kafka 高级特性 消息发送03-自定义拦截器、整体原理剖析
37 2
|
2月前
|
消息中间件 存储 Java
大数据-58 Kafka 高级特性 消息发送02-自定义序列化器、自定义分区器 Java代码实现
大数据-58 Kafka 高级特性 消息发送02-自定义序列化器、自定义分区器 Java代码实现
56 3
|
2月前
|
消息中间件 存储 分布式计算
大数据-61 Kafka 高级特性 消息消费02-主题与分区 自定义反序列化 拦截器 位移提交 位移管理 重平衡
大数据-61 Kafka 高级特性 消息消费02-主题与分区 自定义反序列化 拦截器 位移提交 位移管理 重平衡
26 1
|
2月前
|
消息中间件 NoSQL Kafka
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
193 0
|
2月前
|
消息中间件 资源调度 大数据
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
48 0
|
7月前
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用合集之支持sink到多分区的kafka ,还能保持有序吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
消息中间件 Java Kafka
Flink的sink实战之二:kafka
实践如何将flink数据集sink到kafka
311 0
Flink的sink实战之二:kafka

相关产品

  • 实时计算 Flink版
  • 下一篇
    DataWorks