阿里云Flink-自定义kafka sink partitioner实践及相关踩坑记录

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 阿里云Flink-自定义kafka sink partitioner实践及相关踩坑记录

1. 背景及需求

链路:Flink -> sink kafka

需求:客户需求根据数据的特征,以自定义的逻辑,将不同的数据写到kafka不同的分区中

阿里云官方文档链接:https://help.aliyun.com/zh/flink/developer-reference/kafka-connector?spm=a2c4g.11186623.0.0.bc541be5ucyowS#section-oos-95x-usp

Flink社区官方链接:https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-producers-and-fault-tolerance

2. 需求实现

2.1. JAR作业

2.1.1. 实现分析

jar作业需要有一定的ds开发基础,在官网文档上有如下的介绍,可以通过 producer record来指定该数据的客户自定义分区逻辑。

image.png

通过查看ProducerRecord类的构造方法,我们可以看到在6个重载方法的参数中,partition参数指定了该数据结构写入的分区,具体实现参考:2.1.2

image.png

细心的同学可能会发现,阿里云官网上Flink SQL作业通过实现自定义的FlinkKafkaPartitioner来实现数据的自定义分区写入,那ds的jar作业是否也可以呢?

答案是肯定的,我们可以看到,继承自TwoPhaseCommitSinkFunction的FlinkKafkaProducer类也有很多重载的构造方法,我们可以看到*partitioner相关的构造方法,通过自定义实现FlinkKafkaPartitioner即可,具体实现参考:2.1.3及2.2的Flink SQL作业部分。

image.png

2.1.2. 通过KafkaSerializationSchema实现

publicclassKafkaSinkSimpleTest01 {
publicstaticvoidmain(String[] args) throwsException {
// set up the streaming execution environmentfinalStreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
// 简单添加几条string类型的数据DataStreamSource<String>stream=env.fromElements("赵", "钱", "孙", "李", "周");
// 做个print sink以便观察stream.print("print-test");
// 实现自定义的 KafkaSerializationSchemaKafkaSerializationSchema<String>serializationSchema=newKafkaSerializationSchema<String>() {
// 重写 serialize方法,并重新定义ProducerRecord,以指定数据的分区或者key@OverridepublicProducerRecord<byte[], byte[]>serialize(Stringelement, @NullableLongtimestamp) {
returnnewProducerRecord<byte[], byte[]>(
"sink_topic", // target topic11,  // partition"key-value".getBytes(StandardCharsets.UTF_8),  // keyelement.getBytes(StandardCharsets.UTF_8)); // record contents            }
        };
// 实例化 sinkfunctionFlinkKafkaProducer<String>myProducer=newFlinkKafkaProducer<>(
"sink_topic",             // target topicserializationSchema,    // serialization schemaproperties,             // producer configFlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerancestream.addSink(myProducer);
// execute programenv.execute("Flink Streaming Java API Skeleton");
    }
}

2.1.3. 通过FlinkKafkaPartitioner实现

publicclassKafkaSinkWithSelfDefPartitioner {
publicstaticvoidmain(String[] args) throwsException {
// set up the streaming execution environmentfinalStreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
// 简单添加几条string类型的数据DataStreamSource<String>stream=env.fromElements("赵", "钱", "孙", "李", "周");
stream.print("print-test");
// 在SerializationSchema的实现中不指定分区SerializationSchema<String>serializationSchema=newSerializationSchema<String>(
        ) {
@Overridepublicbyte[] serialize(Stringelement) {
returnelement.getBytes(StandardCharsets.UTF_8);
            }
        };
// 自定义FlinkKafkaPartitioner 根据record的值进行判断,写入不同的分区FlinkKafkaPartitioner<String>flinkKafkaPartitioner=newFlinkKafkaPartitioner<String>() {
@Overridepublicintpartition(Stringrecord, byte[] key, byte[] value, StringtargetTopic, int[] partitions) {
// 主要是根据record的情况,更改这里的实现方法,返回kafka分区的integer类型即可if(record.equals("test")){
return6;
                }else{
return7;
                }
            }
        };
FlinkKafkaProducer<String>myProducer=newFlinkKafkaProducer<>(
"sunyf_topic",
serializationSchema,
properties,
flinkKafkaPartitioner,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
2        );
stream.addSink(myProducer);
// execute programenv.execute("Flink Streaming Java API Skeleton");
    }
}

2.2. Flink SQL作业

image.png

样例代码如下:

packageorg.example;
importorg.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
importorg.apache.flink.table.data.binary.BinaryRowData;
publicclassFlinkSQLKafkaSinkPartitionerextendsFlinkKafkaPartitioner {
@Overridepublicintpartition(Objectrecord, byte[] key, byte[] value, StringtargetTopic, int[] partitions) {
BinaryRowDatabrData=null;
// 在flink sql中配置自定义partitioner record的实际类型是BinaryRowDataif (recordinstanceofBinaryRowData) {
brData= (BinaryRowData) record;
// 可以通过如下类的方法获取该行所有数据for (inti=0; i<brData.getArity(); i++) {
System.out.println("索引: "+i+" :"+brData.getString(i));
            }
Stringdt=brData.getString(2).toString();
intdtInt=Integer.parseInt(dt);
//        单数日期写5分区,偶数写4分区if (dtInt%2==0){
return4;
            }else{
return5;
            }
        } else {
thrownewIllegalArgumentException("Record is not of type BinaryRowData");
        }
    }
}

打包后上传:

image.png

在sql作业的sink中添加如下with

'sink.partitioner' = 'org.example.FlinkSQLKafkaSinkPartitioner'

image.png

整体样例SQL作业

CREATE TEMPORARY TABLE KafkaTable (  `key_id` string,  `a` STRING,  `b` string,  `c` string,  `ts` TIMESTAMP_LTZ(3) METADATA FROM'timestamp' VIRTUAL,  process_topic string metadata from'topic' VIRTUAL,  `partition` int METADATA VIRTUAL,  header MAP<string,string> METADATA from'headers' VIRTUAL,  `leader-epoch` int METADATA VIRTUAL,  `offset` int METADATA VIRTUAL,  `timestamp-type` string METADATA VIRTUAL
) WITH ('connector'='kafka','topic'='source_topic','properties.bootstrap.servers'='','properties.group.id'='group','scan.startup.mode'='earliest-offset','format'='csv','key.format'='csv','value.format'='csv','key.fields'='key_id','key.fields-prefix'='key_','value.fields-include'='EXCEPT_KEY');CREATE TEMPORARY TABLE kafka_sink (  `a` STRING,  `b` string,  `c` string
) WITH ('connector'='kafka','topic'='sink_topic','properties.bootstrap.servers'='','properties.group.id'='group','format'='csv','properties.enable.idempotence'='false','sink.partitioner'='org.example.FlinkSQLKafkaSinkPartitioner');CREATE TEMPORARY TABLE print_sink(  `a` STRING,  `b` string,  `c` string
) WITH ('connector'='print','print-identifier'='print666');BEGIN STATEMENT SET;insertinto print_sink
select a,b,c from KafkaTable
;insertinto kafka_sink
select a,b,c from KafkaTable
;END;

3. 踩坑记录

3.1. 本地idea调试外网读写kafka高安全模式

【问题现象】

大量kafka堆栈抛出的warning信息,报brocker失联,如下所示。

14:05:58,947 WARN  org.apache.kafka.clients.NetworkClient                       [] - [Producer clientId=producer-5] Bootstrap broker 121.40.72.21:9093 (id: -1 rack: null) disconnected

image.png

【问题原因及解决】

需要参考kafka客户端在properties里配置ssl,sasl以及jaas相关配置项,相关配置参数可以参考kafka官网。

  1. https://help.aliyun.com/zh/apsaramq-for-kafka/developer-reference/update-the-ssl-certificate-algorithm?spm=a2c4g.11186623.0.i26
  2. https://github.com/AliwareMQ/aliware-kafka-demos/blob/master/kafka-java-demo/vpc-ssl/src/main/resources/mix.4096.client.truststore.jks
Propertiesproperties=newProperties();
properties.setProperty("bootstrap.servers", "ip:9093,ip:9093,ip:9093");
properties.setProperty("security.protocol", "SASL_SSL");
// SSL 配置// 配置服务端提供的 truststore (CA 证书) 的路径// Configure the path of truststore (CA) provided by the serverproperties.setProperty("ssl.truststore.location", "/file_path/mix.4096.client.truststore.jks");
properties.setProperty("ssl.truststore.password", "KafkaOnsClient");
// SASL 配置// 将 SASL 机制配置为 as SCRAM-SHA-256properties.setProperty("sasl.mechanism", "PLAIN");
properties.setProperty("ssl.endpoint.identification.algorithm", "");
// 配置 JAASproperties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");

3.2. 下游丢数据 or 算子发送的数据与kafka group读取数据数量不一致

【问题现象】

  1. flink souce算子 records sent = 2
  2. metrics曲线显示source算子读取数据 = 4
  3. 指定kafka group在kafka topic中消费点位均达到最大点位,topic中消息共计4条
  4. 作业无任何异常

image.png

image.png

image.png

【问题原因及解决】

指定topic中2条消息key不为空,其余2条消息key为空,在source表中定义了key相关的字段,所以出现了source读取到4条数据(与曲线对应),但是source 算子并未向下游发送key为空的两条消息。

3.3. CORRUPT_MESSAGE

【问题现象】

大量warning报CORRUPT_MESSAGE,下游kafka没有数据,参考:https://www.alibabacloud.com/help/zh/apsaramq-for-kafka/developer-reference/what-do-i-do-if-an-error-is-reported-when-i-use-the-apsaramq-for-kafka-client

2024-01-17 14:43:52,094 WARN  org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.internals.Sender [] - [Producer clientId=producer-1] Got error produce response with correlation id 40 on topic-partition sunyf_sink-0, retrying (2147483615 attempts left). Error: CORRUPT_MESSAGE

image.png

【问题原因及解决】

enable.idempotence 参数在不同vvr版本下默认值不同,vvr6 + 云kafka可以在不做参数修改的情况下正常生产数据,vvr8则需要手动指定为false。

由于云存储的云消息队列 Kafka 版不支持幂等和事务写入,您将无法使用Kafka结果表提供的精确一次语义exactly-once semantic功能,同时您需要在结果表中添加配置项properties.enable.idempotence=false以关闭幂等写入功能

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
SQL 运维 网络安全
【实践】基于Hologres+Flink搭建GitHub实时数据查询
本文介绍了如何利用Flink和Hologres构建GitHub公开事件数据的实时数仓,并对接BI工具实现数据实时分析。流程包括创建VPC、Hologres、OSS、Flink实例,配置Hologres内部表,通过Flink实时写入数据至Hologres,查询实时数据,以及清理资源等步骤。
|
2月前
|
消息中间件 存储 监控
构建高可用性Apache Kafka集群:从理论到实践
【10月更文挑战第24天】随着大数据时代的到来,数据传输与处理的需求日益增长。Apache Kafka作为一个高性能的消息队列服务,因其出色的吞吐量、可扩展性和容错能力而受到广泛欢迎。然而,在构建大规模生产环境下的Kafka集群时,保证其高可用性是至关重要的。本文将从个人实践经验出发,详细介绍如何构建一个高可用性的Kafka集群,包括集群规划、节点配置以及故障恢复机制等方面。
110 4
|
1天前
|
SQL 存储 Apache
基于 Flink 进行增量批计算的探索与实践
本文整理自阿里云高级技术专家、Apache Flink PMC朱翥老师在Flink Forward Asia 2024的分享,内容分为三部分:背景介绍、工作介绍和总结展望。首先介绍了增量计算的定义及其与批计算、流计算的区别,阐述了增量计算的优势及典型需求场景,并解释了为何选择Flink进行增量计算。其次,详细描述了当前的工作进展,包括增量计算流程、执行计划生成、控制消费数据量级及执行进度记录恢复等关键技术点。最后,展示了增量计算的简单示例、性能测评结果,并对未来工作进行了规划。
150 3
基于 Flink 进行增量批计算的探索与实践
|
15天前
|
消息中间件 JSON 数据库
探索Flink动态CEP:杭州银行的实战案例
本文由杭州银行大数据工程师唐占峰、欧阳武林撰写,介绍Flink动态CEP的定义、应用场景、技术实现及使用方式。Flink动态CEP是基于Flink的复杂事件处理库,支持在不重启服务的情况下动态更新规则,适应快速变化的业务需求。文章详细阐述了其在反洗钱、反欺诈和实时营销等金融领域的应用,并展示了某金融机构的实际应用案例。通过动态CEP,用户可以实时调整规则,提高系统的灵活性和响应速度,降低维护成本。文中还提供了具体的代码示例和技术细节,帮助读者理解和使用Flink动态CEP。
334 2
探索Flink动态CEP:杭州银行的实战案例
|
8天前
|
存储 物联网 大数据
探索阿里云 Flink 物化表:原理、优势与应用场景全解析
阿里云Flink的物化表是流批一体化平台中的关键特性,支持低延迟实时更新、灵活查询性能、无缝流批处理和高容错性。它广泛应用于电商、物联网和金融等领域,助力企业高效处理实时数据,提升业务决策能力。实践案例表明,物化表显著提高了交易欺诈损失率的控制和信贷审批效率,推动企业在数字化转型中取得竞争优势。
47 14
|
29天前
|
流计算 开发者
【开发者评测】实时计算Flink场景实践和核心功能体验测评获奖名单公布!
【开发者评测】实时计算Flink场景实践和核心功能体验测评获奖名单公布!
|
2月前
|
运维 数据挖掘 网络安全
场景实践 | 基于Flink+Hologres搭建GitHub实时数据分析
基于Flink和Hologres构建的实时数仓方案在数据开发运维体验、成本与收益等方面均表现出色。同时,该产品还具有与其他产品联动组合的可能性,能够为企业提供更全面、更智能的数据处理和分析解决方案。
|
3月前
|
消息中间件 监控 数据可视化
实时计算Flink场景实践和核心功能体验
本文详细评测了阿里云实时计算Flink版,从产品引导、文档帮助、功能满足度等方面进行了全面分析。产品界面设计友好,文档丰富实用,数据开发和运维体验优秀,具备出色的实时性和动态扩展性。同时,提出了针对业务场景的改进建议,包括功能定制化增强、高级分析功能拓展及可视化功能提升。文章还探讨了产品与阿里云内部产品及第三方工具的联动潜力,展示了其在多云架构和跨平台应用中的广阔前景。
120 9
|
3月前
|
运维 数据可视化 数据处理
实时计算Flink场景实践和核心功能体验 评测
实时计算Flink场景实践和核心功能体验 评测
83 5
|
3月前
|
运维 监控 安全
实时计算Flink场景实践和核心功能体验
实时计算Flink场景实践和核心功能体验

相关产品

  • 实时计算 Flink版