阿里云Flink-自定义kafka sink partitioner实践及相关踩坑记录

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: 阿里云Flink-自定义kafka sink partitioner实践及相关踩坑记录

1. 背景及需求

链路:Flink -> sink kafka

需求:客户需求根据数据的特征,以自定义的逻辑,将不同的数据写到kafka不同的分区中

阿里云官方文档链接:https://help.aliyun.com/zh/flink/developer-reference/kafka-connector?spm=a2c4g.11186623.0.0.bc541be5ucyowS#section-oos-95x-usp

Flink社区官方链接:https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-producers-and-fault-tolerance

2. 需求实现

2.1. JAR作业

2.1.1. 实现分析

jar作业需要有一定的ds开发基础,在官网文档上有如下的介绍,可以通过 producer record来指定该数据的客户自定义分区逻辑。

image.png

通过查看ProducerRecord类的构造方法,我们可以看到在6个重载方法的参数中,partition参数指定了该数据结构写入的分区,具体实现参考:2.1.2

image.png

细心的同学可能会发现,阿里云官网上Flink SQL作业通过实现自定义的FlinkKafkaPartitioner来实现数据的自定义分区写入,那ds的jar作业是否也可以呢?

答案是肯定的,我们可以看到,继承自TwoPhaseCommitSinkFunction的FlinkKafkaProducer类也有很多重载的构造方法,我们可以看到*partitioner相关的构造方法,通过自定义实现FlinkKafkaPartitioner即可,具体实现参考:2.1.3及2.2的Flink SQL作业部分。

image.png

2.1.2. 通过KafkaSerializationSchema实现

publicclassKafkaSinkSimpleTest01 {
publicstaticvoidmain(String[] args) throwsException {
// set up the streaming execution environmentfinalStreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
// 简单添加几条string类型的数据DataStreamSource<String>stream=env.fromElements("赵", "钱", "孙", "李", "周");
// 做个print sink以便观察stream.print("print-test");
// 实现自定义的 KafkaSerializationSchemaKafkaSerializationSchema<String>serializationSchema=newKafkaSerializationSchema<String>() {
// 重写 serialize方法,并重新定义ProducerRecord,以指定数据的分区或者key@OverridepublicProducerRecord<byte[], byte[]>serialize(Stringelement, @NullableLongtimestamp) {
returnnewProducerRecord<byte[], byte[]>(
"sink_topic", // target topic11,  // partition"key-value".getBytes(StandardCharsets.UTF_8),  // keyelement.getBytes(StandardCharsets.UTF_8)); // record contents            }
        };
// 实例化 sinkfunctionFlinkKafkaProducer<String>myProducer=newFlinkKafkaProducer<>(
"sink_topic",             // target topicserializationSchema,    // serialization schemaproperties,             // producer configFlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerancestream.addSink(myProducer);
// execute programenv.execute("Flink Streaming Java API Skeleton");
    }
}

2.1.3. 通过FlinkKafkaPartitioner实现

publicclassKafkaSinkWithSelfDefPartitioner {
publicstaticvoidmain(String[] args) throwsException {
// set up the streaming execution environmentfinalStreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
// 简单添加几条string类型的数据DataStreamSource<String>stream=env.fromElements("赵", "钱", "孙", "李", "周");
stream.print("print-test");
// 在SerializationSchema的实现中不指定分区SerializationSchema<String>serializationSchema=newSerializationSchema<String>(
        ) {
@Overridepublicbyte[] serialize(Stringelement) {
returnelement.getBytes(StandardCharsets.UTF_8);
            }
        };
// 自定义FlinkKafkaPartitioner 根据record的值进行判断,写入不同的分区FlinkKafkaPartitioner<String>flinkKafkaPartitioner=newFlinkKafkaPartitioner<String>() {
@Overridepublicintpartition(Stringrecord, byte[] key, byte[] value, StringtargetTopic, int[] partitions) {
// 主要是根据record的情况,更改这里的实现方法,返回kafka分区的integer类型即可if(record.equals("test")){
return6;
                }else{
return7;
                }
            }
        };
FlinkKafkaProducer<String>myProducer=newFlinkKafkaProducer<>(
"sunyf_topic",
serializationSchema,
properties,
flinkKafkaPartitioner,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
2        );
stream.addSink(myProducer);
// execute programenv.execute("Flink Streaming Java API Skeleton");
    }
}

2.2. Flink SQL作业

image.png

样例代码如下:

packageorg.example;
importorg.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
importorg.apache.flink.table.data.binary.BinaryRowData;
publicclassFlinkSQLKafkaSinkPartitionerextendsFlinkKafkaPartitioner {
@Overridepublicintpartition(Objectrecord, byte[] key, byte[] value, StringtargetTopic, int[] partitions) {
BinaryRowDatabrData=null;
// 在flink sql中配置自定义partitioner record的实际类型是BinaryRowDataif (recordinstanceofBinaryRowData) {
brData= (BinaryRowData) record;
// 可以通过如下类的方法获取该行所有数据for (inti=0; i<brData.getArity(); i++) {
System.out.println("索引: "+i+" :"+brData.getString(i));
            }
Stringdt=brData.getString(2).toString();
intdtInt=Integer.parseInt(dt);
//        单数日期写5分区,偶数写4分区if (dtInt%2==0){
return4;
            }else{
return5;
            }
        } else {
thrownewIllegalArgumentException("Record is not of type BinaryRowData");
        }
    }
}

打包后上传:

image.png

在sql作业的sink中添加如下with

'sink.partitioner' = 'org.example.FlinkSQLKafkaSinkPartitioner'

image.png

整体样例SQL作业

CREATE TEMPORARY TABLE KafkaTable (  `key_id` string,  `a` STRING,  `b` string,  `c` string,  `ts` TIMESTAMP_LTZ(3) METADATA FROM'timestamp' VIRTUAL,  process_topic string metadata from'topic' VIRTUAL,  `partition` int METADATA VIRTUAL,  header MAP<string,string> METADATA from'headers' VIRTUAL,  `leader-epoch` int METADATA VIRTUAL,  `offset` int METADATA VIRTUAL,  `timestamp-type` string METADATA VIRTUAL
) WITH ('connector'='kafka','topic'='source_topic','properties.bootstrap.servers'='','properties.group.id'='group','scan.startup.mode'='earliest-offset','format'='csv','key.format'='csv','value.format'='csv','key.fields'='key_id','key.fields-prefix'='key_','value.fields-include'='EXCEPT_KEY');CREATE TEMPORARY TABLE kafka_sink (  `a` STRING,  `b` string,  `c` string
) WITH ('connector'='kafka','topic'='sink_topic','properties.bootstrap.servers'='','properties.group.id'='group','format'='csv','properties.enable.idempotence'='false','sink.partitioner'='org.example.FlinkSQLKafkaSinkPartitioner');CREATE TEMPORARY TABLE print_sink(  `a` STRING,  `b` string,  `c` string
) WITH ('connector'='print','print-identifier'='print666');BEGIN STATEMENT SET;insertinto print_sink
select a,b,c from KafkaTable
;insertinto kafka_sink
select a,b,c from KafkaTable
;END;

3. 踩坑记录

3.1. 本地idea调试外网读写kafka高安全模式

【问题现象】

大量kafka堆栈抛出的warning信息,报brocker失联,如下所示。

14:05:58,947 WARN  org.apache.kafka.clients.NetworkClient                       [] - [Producer clientId=producer-5] Bootstrap broker 121.40.72.21:9093 (id: -1 rack: null) disconnected

image.png

【问题原因及解决】

需要参考kafka客户端在properties里配置ssl,sasl以及jaas相关配置项,相关配置参数可以参考kafka官网。

  1. https://help.aliyun.com/zh/apsaramq-for-kafka/developer-reference/update-the-ssl-certificate-algorithm?spm=a2c4g.11186623.0.i26
  2. https://github.com/AliwareMQ/aliware-kafka-demos/blob/master/kafka-java-demo/vpc-ssl/src/main/resources/mix.4096.client.truststore.jks
Propertiesproperties=newProperties();
properties.setProperty("bootstrap.servers", "ip:9093,ip:9093,ip:9093");
properties.setProperty("security.protocol", "SASL_SSL");
// SSL 配置// 配置服务端提供的 truststore (CA 证书) 的路径// Configure the path of truststore (CA) provided by the serverproperties.setProperty("ssl.truststore.location", "/file_path/mix.4096.client.truststore.jks");
properties.setProperty("ssl.truststore.password", "KafkaOnsClient");
// SASL 配置// 将 SASL 机制配置为 as SCRAM-SHA-256properties.setProperty("sasl.mechanism", "PLAIN");
properties.setProperty("ssl.endpoint.identification.algorithm", "");
// 配置 JAASproperties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");

3.2. 下游丢数据 or 算子发送的数据与kafka group读取数据数量不一致

【问题现象】

  1. flink souce算子 records sent = 2
  2. metrics曲线显示source算子读取数据 = 4
  3. 指定kafka group在kafka topic中消费点位均达到最大点位,topic中消息共计4条
  4. 作业无任何异常

image.png

image.png

image.png

【问题原因及解决】

指定topic中2条消息key不为空,其余2条消息key为空,在source表中定义了key相关的字段,所以出现了source读取到4条数据(与曲线对应),但是source 算子并未向下游发送key为空的两条消息。

3.3. CORRUPT_MESSAGE

【问题现象】

大量warning报CORRUPT_MESSAGE,下游kafka没有数据,参考:https://www.alibabacloud.com/help/zh/apsaramq-for-kafka/developer-reference/what-do-i-do-if-an-error-is-reported-when-i-use-the-apsaramq-for-kafka-client

2024-01-17 14:43:52,094 WARN  org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.internals.Sender [] - [Producer clientId=producer-1] Got error produce response with correlation id 40 on topic-partition sunyf_sink-0, retrying (2147483615 attempts left). Error: CORRUPT_MESSAGE

image.png

【问题原因及解决】

enable.idempotence 参数在不同vvr版本下默认值不同,vvr6 + 云kafka可以在不做参数修改的情况下正常生产数据,vvr8则需要手动指定为false。

由于云存储的云消息队列 Kafka 版不支持幂等和事务写入,您将无法使用Kafka结果表提供的精确一次语义exactly-once semantic功能,同时您需要在结果表中添加配置项properties.enable.idempotence=false以关闭幂等写入功能

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
9天前
|
人工智能 数据处理 API
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
Apache Flink Agents 是由阿里云、Ververica、Confluent 与 LinkedIn 联合推出的开源子项目,旨在基于 Flink 构建可扩展、事件驱动的生产级 AI 智能体框架,实现数据与智能的实时融合。
136 6
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
|
1月前
|
消息中间件 运维 监控
爆款游戏背后:尚娱如何借助阿里云 Kafka Serverless 轻松驾驭“潮汐流量”?
阿里云 Kafka 不仅为尚娱提供了高可靠、低延迟的消息通道,更通过 Serverless 弹性架构实现了资源利用率和成本效益的双重优化,助力尚娱在快速迭代的游戏市场中实现敏捷运营、稳定交付与可持续增长。
119 21
|
1月前
|
消息中间件 存储 运维
嘉银科技基于阿里云 Kafka Serverless 提升业务弹性能力,节省成本超过 20%
云消息队列 Kafka 版 Serverless 系列凭借其秒级弹性扩展、按需付费、轻运维的优势,助力嘉银科技业务系统实现灵活扩缩容,在业务效率和成本优化上持续取得突破,保证服务的敏捷性和稳定性,并节省超过 20% 的成本。
134 24
消息中间件 存储 传感器
111 0
|
4月前
|
消息中间件 存储 大数据
阿里云消息队列 Kafka 架构及典型应用场景
阿里云消息队列 Kafka 是一款基于 Apache Kafka 的分布式消息中间件,支持消息发布与订阅模型,满足微服务解耦、大数据处理及实时流数据分析需求。其通过存算分离架构优化成本与性能,提供基础版、标准版和专业版三种 Serverless 版本,分别适用于不同业务场景,最高 SLA 达 99.99%。阿里云 Kafka 还具备弹性扩容、多可用区部署、冷热数据缓存隔离等特性,并支持与 Flink、MaxCompute 等生态工具无缝集成,广泛应用于用户行为分析、数据入库等场景,显著提升数据处理效率与实时性。
|
4月前
|
消息中间件 SQL 关系型数据库
Flink CDC + Kafka 加速业务实时化
Flink CDC 是一种支持流批一体的分布式数据集成工具,通过 YAML 配置实现数据传输过程中的路由与转换操作。它已从单一数据源的 CDC 数据流发展为完整的数据同步解决方案,支持 MySQL、Kafka 等多种数据源和目标端(如 Delta Lake、Iceberg)。其核心功能包括多样化数据输入链路、Schema Evolution、Transform 和 Routing 模块,以及丰富的监控指标。相比传统 SQL 和 DataStream 作业,Flink CDC 提供更灵活的 Schema 变更控制和原始 binlog 同步能力。
|
消息中间件 Kafka 流计算
|
2月前
|
存储 分布式计算 数据处理
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
阿里云实时计算Flink团队,全球领先的流计算引擎缔造者,支撑双11万亿级数据处理,推动Apache Flink技术发展。现招募Flink执行引擎、存储引擎、数据通道、平台管控及产品经理人才,地点覆盖北京、杭州、上海。技术深度参与开源核心,打造企业级实时计算解决方案,助力全球企业实现毫秒洞察。
402 0
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
|
9月前
|
消息中间件 关系型数据库 MySQL
Flink CDC 在阿里云实时计算Flink版的云上实践
本文整理自阿里云高级开发工程师阮航在Flink Forward Asia 2024的分享,重点介绍了Flink CDC与实时计算Flink的集成、CDC YAML的核心功能及应用场景。主要内容包括:Flink CDC的发展及其在流批数据处理中的作用;CDC YAML支持的同步链路、Transform和Route功能、丰富的监控指标;典型应用场景如整库同步、Binlog原始数据同步、分库分表同步等;并通过两个Demo展示了MySQL整库同步到Paimon和Binlog同步到Kafka的过程。最后,介绍了未来规划,如脏数据处理、数据限流及扩展数据源支持。
607 0
Flink CDC 在阿里云实时计算Flink版的云上实践

相关产品

  • 实时计算 Flink版