阿里云Flink-自定义kafka sink partitioner实践及相关踩坑记录

简介: 阿里云Flink-自定义kafka sink partitioner实践及相关踩坑记录

1. 背景及需求

链路:Flink -> sink kafka

需求:客户需求根据数据的特征,以自定义的逻辑,将不同的数据写到kafka不同的分区中

阿里云官方文档链接:https://help.aliyun.com/zh/flink/developer-reference/kafka-connector?spm=a2c4g.11186623.0.0.bc541be5ucyowS#section-oos-95x-usp

Flink社区官方链接:https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-producers-and-fault-tolerance

2. 需求实现

2.1. JAR作业

2.1.1. 实现分析

jar作业需要有一定的ds开发基础,在官网文档上有如下的介绍,可以通过 producer record来指定该数据的客户自定义分区逻辑。

image.png

通过查看ProducerRecord类的构造方法,我们可以看到在6个重载方法的参数中,partition参数指定了该数据结构写入的分区,具体实现参考:2.1.2

image.png

细心的同学可能会发现,阿里云官网上Flink SQL作业通过实现自定义的FlinkKafkaPartitioner来实现数据的自定义分区写入,那ds的jar作业是否也可以呢?

答案是肯定的,我们可以看到,继承自TwoPhaseCommitSinkFunction的FlinkKafkaProducer类也有很多重载的构造方法,我们可以看到*partitioner相关的构造方法,通过自定义实现FlinkKafkaPartitioner即可,具体实现参考:2.1.3及2.2的Flink SQL作业部分。

image.png

2.1.2. 通过KafkaSerializationSchema实现

publicclassKafkaSinkSimpleTest01 {
publicstaticvoidmain(String[] args) throwsException {
// set up the streaming execution environmentfinalStreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
// 简单添加几条string类型的数据DataStreamSource<String>stream=env.fromElements("赵", "钱", "孙", "李", "周");
// 做个print sink以便观察stream.print("print-test");
// 实现自定义的 KafkaSerializationSchemaKafkaSerializationSchema<String>serializationSchema=newKafkaSerializationSchema<String>() {
// 重写 serialize方法,并重新定义ProducerRecord,以指定数据的分区或者key@OverridepublicProducerRecord<byte[], byte[]>serialize(Stringelement, @NullableLongtimestamp) {
returnnewProducerRecord<byte[], byte[]>(
"sink_topic", // target topic11,  // partition"key-value".getBytes(StandardCharsets.UTF_8),  // keyelement.getBytes(StandardCharsets.UTF_8)); // record contents            }
        };
// 实例化 sinkfunctionFlinkKafkaProducer<String>myProducer=newFlinkKafkaProducer<>(
"sink_topic",             // target topicserializationSchema,    // serialization schemaproperties,             // producer configFlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerancestream.addSink(myProducer);
// execute programenv.execute("Flink Streaming Java API Skeleton");
    }
}

2.1.3. 通过FlinkKafkaPartitioner实现

publicclassKafkaSinkWithSelfDefPartitioner {
publicstaticvoidmain(String[] args) throwsException {
// set up the streaming execution environmentfinalStreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
// 简单添加几条string类型的数据DataStreamSource<String>stream=env.fromElements("赵", "钱", "孙", "李", "周");
stream.print("print-test");
// 在SerializationSchema的实现中不指定分区SerializationSchema<String>serializationSchema=newSerializationSchema<String>(
        ) {
@Overridepublicbyte[] serialize(Stringelement) {
returnelement.getBytes(StandardCharsets.UTF_8);
            }
        };
// 自定义FlinkKafkaPartitioner 根据record的值进行判断,写入不同的分区FlinkKafkaPartitioner<String>flinkKafkaPartitioner=newFlinkKafkaPartitioner<String>() {
@Overridepublicintpartition(Stringrecord, byte[] key, byte[] value, StringtargetTopic, int[] partitions) {
// 主要是根据record的情况,更改这里的实现方法,返回kafka分区的integer类型即可if(record.equals("test")){
return6;
                }else{
return7;
                }
            }
        };
FlinkKafkaProducer<String>myProducer=newFlinkKafkaProducer<>(
"sunyf_topic",
serializationSchema,
properties,
flinkKafkaPartitioner,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
2        );
stream.addSink(myProducer);
// execute programenv.execute("Flink Streaming Java API Skeleton");
    }
}

2.2. Flink SQL作业

image.png

样例代码如下:

packageorg.example;
importorg.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
importorg.apache.flink.table.data.binary.BinaryRowData;
publicclassFlinkSQLKafkaSinkPartitionerextendsFlinkKafkaPartitioner {
@Overridepublicintpartition(Objectrecord, byte[] key, byte[] value, StringtargetTopic, int[] partitions) {
BinaryRowDatabrData=null;
// 在flink sql中配置自定义partitioner record的实际类型是BinaryRowDataif (recordinstanceofBinaryRowData) {
brData= (BinaryRowData) record;
// 可以通过如下类的方法获取该行所有数据for (inti=0; i<brData.getArity(); i++) {
System.out.println("索引: "+i+" :"+brData.getString(i));
            }
Stringdt=brData.getString(2).toString();
intdtInt=Integer.parseInt(dt);
//        单数日期写5分区,偶数写4分区if (dtInt%2==0){
return4;
            }else{
return5;
            }
        } else {
thrownewIllegalArgumentException("Record is not of type BinaryRowData");
        }
    }
}

打包后上传:

image.png

在sql作业的sink中添加如下with

'sink.partitioner' = 'org.example.FlinkSQLKafkaSinkPartitioner'

image.png

整体样例SQL作业

CREATE TEMPORARY TABLE KafkaTable (  `key_id` string,  `a` STRING,  `b` string,  `c` string,  `ts` TIMESTAMP_LTZ(3) METADATA FROM'timestamp' VIRTUAL,  process_topic string metadata from'topic' VIRTUAL,  `partition` int METADATA VIRTUAL,  header MAP<string,string> METADATA from'headers' VIRTUAL,  `leader-epoch` int METADATA VIRTUAL,  `offset` int METADATA VIRTUAL,  `timestamp-type` string METADATA VIRTUAL
) WITH ('connector'='kafka','topic'='source_topic','properties.bootstrap.servers'='','properties.group.id'='group','scan.startup.mode'='earliest-offset','format'='csv','key.format'='csv','value.format'='csv','key.fields'='key_id','key.fields-prefix'='key_','value.fields-include'='EXCEPT_KEY');CREATE TEMPORARY TABLE kafka_sink (  `a` STRING,  `b` string,  `c` string
) WITH ('connector'='kafka','topic'='sink_topic','properties.bootstrap.servers'='','properties.group.id'='group','format'='csv','properties.enable.idempotence'='false','sink.partitioner'='org.example.FlinkSQLKafkaSinkPartitioner');CREATE TEMPORARY TABLE print_sink(  `a` STRING,  `b` string,  `c` string
) WITH ('connector'='print','print-identifier'='print666');BEGIN STATEMENT SET;insertinto print_sink
select a,b,c from KafkaTable
;insertinto kafka_sink
select a,b,c from KafkaTable
;END;

3. 踩坑记录

3.1. 本地idea调试外网读写kafka高安全模式

【问题现象】

大量kafka堆栈抛出的warning信息,报brocker失联,如下所示。

14:05:58,947 WARN  org.apache.kafka.clients.NetworkClient                       [] - [Producer clientId=producer-5] Bootstrap broker 121.40.72.21:9093 (id: -1 rack: null) disconnected

image.png

【问题原因及解决】

需要参考kafka客户端在properties里配置ssl,sasl以及jaas相关配置项,相关配置参数可以参考kafka官网。

  1. https://help.aliyun.com/zh/apsaramq-for-kafka/developer-reference/update-the-ssl-certificate-algorithm?spm=a2c4g.11186623.0.i26
  2. https://github.com/AliwareMQ/aliware-kafka-demos/blob/master/kafka-java-demo/vpc-ssl/src/main/resources/mix.4096.client.truststore.jks
Propertiesproperties=newProperties();
properties.setProperty("bootstrap.servers", "ip:9093,ip:9093,ip:9093");
properties.setProperty("security.protocol", "SASL_SSL");
// SSL 配置// 配置服务端提供的 truststore (CA 证书) 的路径// Configure the path of truststore (CA) provided by the serverproperties.setProperty("ssl.truststore.location", "/file_path/mix.4096.client.truststore.jks");
properties.setProperty("ssl.truststore.password", "KafkaOnsClient");
// SASL 配置// 将 SASL 机制配置为 as SCRAM-SHA-256properties.setProperty("sasl.mechanism", "PLAIN");
properties.setProperty("ssl.endpoint.identification.algorithm", "");
// 配置 JAASproperties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");

3.2. 下游丢数据 or 算子发送的数据与kafka group读取数据数量不一致

【问题现象】

  1. flink souce算子 records sent = 2
  2. metrics曲线显示source算子读取数据 = 4
  3. 指定kafka group在kafka topic中消费点位均达到最大点位,topic中消息共计4条
  4. 作业无任何异常

image.png

image.png

image.png

【问题原因及解决】

指定topic中2条消息key不为空,其余2条消息key为空,在source表中定义了key相关的字段,所以出现了source读取到4条数据(与曲线对应),但是source 算子并未向下游发送key为空的两条消息。

3.3. CORRUPT_MESSAGE

【问题现象】

大量warning报CORRUPT_MESSAGE,下游kafka没有数据,参考:https://www.alibabacloud.com/help/zh/apsaramq-for-kafka/developer-reference/what-do-i-do-if-an-error-is-reported-when-i-use-the-apsaramq-for-kafka-client

2024-01-17 14:43:52,094 WARN  org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.internals.Sender [] - [Producer clientId=producer-1] Got error produce response with correlation id 40 on topic-partition sunyf_sink-0, retrying (2147483615 attempts left). Error: CORRUPT_MESSAGE

image.png

【问题原因及解决】

enable.idempotence 参数在不同vvr版本下默认值不同,vvr6 + 云kafka可以在不做参数修改的情况下正常生产数据,vvr8则需要手动指定为false。

由于云存储的云消息队列 Kafka 版不支持幂等和事务写入,您将无法使用Kafka结果表提供的精确一次语义exactly-once semantic功能,同时您需要在结果表中添加配置项properties.enable.idempotence=false以关闭幂等写入功能

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
6月前
|
人工智能 数据处理 API
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
Apache Flink Agents 是由阿里云、Ververica、Confluent 与 LinkedIn 联合推出的开源子项目,旨在基于 Flink 构建可扩展、事件驱动的生产级 AI 智能体框架,实现数据与智能的实时融合。
1099 6
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
消息中间件 存储 传感器
396 0
|
10月前
|
资源调度 Kubernetes 流计算
Flink在B站的大规模云原生实践
本文基于哔哩哔哩资深开发工程师丁国涛在Flink Forward Asia 2024云原生专场的分享,围绕Flink On K8S的实践展开。内容涵盖五个部分:背景介绍、功能及稳定性优化、性能优化、运维优化和未来展望。文章详细分析了从YARN迁移到K8S的优势与挑战,包括资源池统一、环境一致性改进及隔离性提升,并针对镜像优化、Pod异常处理、启动速度优化等问题提出解决方案。此外,还探讨了多机房容灾、负载均衡及潮汐混部等未来发展方向,为Flink云原生化提供了全面的技术参考。
570 9
Flink在B站的大规模云原生实践
|
10月前
|
消息中间件 SQL 关系型数据库
Flink CDC + Kafka 加速业务实时化
Flink CDC 是一种支持流批一体的分布式数据集成工具,通过 YAML 配置实现数据传输过程中的路由与转换操作。它已从单一数据源的 CDC 数据流发展为完整的数据同步解决方案,支持 MySQL、Kafka 等多种数据源和目标端(如 Delta Lake、Iceberg)。其核心功能包括多样化数据输入链路、Schema Evolution、Transform 和 Routing 模块,以及丰富的监控指标。相比传统 SQL 和 DataStream 作业,Flink CDC 提供更灵活的 Schema 变更控制和原始 binlog 同步能力。
|
11月前
|
SQL 存储 NoSQL
Flink x Paimon 在抖音集团生活服务的落地实践
本文整理自抖音集团数据工程师陆魏与流式计算工程冯向宇在Flink Forward Asia 2024的分享,聚焦抖音生活服务业务中的实时数仓技术演变及Paimon湖仓实践。文章分为三部分:背景及现状、Paimon湖仓实践与技术优化。通过引入Paimon,解决了传统实时数仓开发效率低、资源浪费、稳定性差等问题,显著提升了开发运维效率、节省资源并增强了任务稳定性。同时,文中详细探讨了Paimon在维表实践、宽表建设、标签变更检测等场景的应用,并介绍了其核心技术优化与未来规划。
1091 10
Flink x Paimon 在抖音集团生活服务的落地实践
|
11月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
314 11
|
11月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
在数字化转型中,企业亟需从海量数据中快速提取价值并转化为业务增长动力。5月15日19:00-21:00,阿里云三位技术专家将讲解Kafka与Flink的强强联合方案,帮助企业零门槛构建分布式实时分析平台。此组合广泛应用于实时风控、用户行为追踪等场景,具备高吞吐、弹性扩缩容及亚秒级响应优势。直播适合初学者、开发者和数据工程师,参与还有机会领取定制好礼!扫描海报二维码或点击链接预约直播:[https://developer.aliyun.com/live/255088](https://developer.aliyun.com/live/255088)
666 35
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
|
11月前
|
资源调度 Kubernetes 调度
网易游戏 Flink 云原生实践
本文分享了网易游戏在Flink实时计算领域的资源管理与架构演进经验,从Yarn到K8s云原生,再到混合云的实践历程。文章详细解析了各阶段的技术挑战与解决方案,包括资源隔离、弹性伸缩、自动扩缩容及服务混部等关键能力的实现。通过混合云架构,网易游戏显著提升了资源利用率,降低了30%机器成本,小作业计算成本下降40%,并为未来性能优化、流批一体及智能运维奠定了基础。
633 9
网易游戏 Flink 云原生实践
|
12月前
|
存储 监控 数据挖掘
京东物流基于Flink & StarRocks的湖仓建设实践
本文整理自京东物流高级数据开发工程师梁宝彬在Flink Forward Asia 2024的分享,聚焦实时湖仓的探索与建设、应用实践、问题思考及未来展望。内容涵盖京东物流通过Flink和Paimon等技术构建实时湖仓体系的过程,解决复杂业务场景下的数据分析挑战,如多维OLAP分析、大屏监控等。同时,文章详细介绍了基于StarRocks的湖仓一体方案,优化存储成本并提升查询效率,以及存算分离的应用实践。最后,对未来数据服务的发展方向进行了展望,计划推广长周期数据存储服务和原生数据湖建设,进一步提升数据分析能力。
1113 1
京东物流基于Flink & StarRocks的湖仓建设实践
|
存储 运维 BI
万字长文带你深入广告场景Paimon+Flink全链路探索与实践
本文将结合实时、离线数据研发痛点和当下Paimon的特性,以实例呈现低门槛、低成本、分钟级延迟的流批一体化方案,点击文章阅读详细内容~

相关产品

  • 实时计算 Flink版