1. Background and Requirements
Pipeline: Flink -> Kafka sink
Requirement: based on characteristics of the data, the customer wants to apply custom logic to write different records to different Kafka partitions.
2. Implementing the Requirement
2.1. JAR Jobs
2.1.1. Implementation Analysis
JAR jobs require some DataStream (DS) development experience. As introduced in the official documentation, you can specify custom partitioning logic for a record through the ProducerRecord.
Looking at the constructors of the ProducerRecord class, among the parameters of its six overloads the partition parameter specifies which partition the record is written to; see 2.1.2 for a concrete implementation, and the short sketch right below for the constructor itself.
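A minimal sketch of the overload that takes an explicit partition (the topic name, partition number, key, and value below are placeholders):

import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerRecordPartitionSketch {
    public static void main(String[] args) {
        // The overload ProducerRecord(String topic, Integer partition, K key, V value)
        // pins the record to a specific partition of the target topic.
        ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
                "sink_topic",                                    // target topic (placeholder)
                1,                                               // explicit partition number
                "some-key".getBytes(StandardCharsets.UTF_8),     // key (may be null)
                "some-value".getBytes(StandardCharsets.UTF_8));  // record value
        System.out.println(record);
    }
}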
Careful readers may notice that, in the Alibaba Cloud documentation, Flink SQL jobs achieve custom partitioned writes by implementing a custom FlinkKafkaPartitioner. Can DS JAR jobs do the same?
Yes. The FlinkKafkaProducer class, which extends TwoPhaseCommitSinkFunction, also has many overloaded constructors, including several that take a *partitioner argument; you only need to provide a custom FlinkKafkaPartitioner implementation. See 2.1.3 and the Flink SQL job part in 2.2 for concrete implementations, and the sketch below for how such a constructor is wired up.
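As a sketch (the topic name and partitioning logic are placeholders), the overload that takes an Optional<FlinkKafkaPartitioner> can be used as follows; this overload defaults to at-least-once semantics, while 2.1.3 shows the variant that additionally takes a Semantic and a producer pool size:

import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

public class PartitionerConstructorSketch {
    public static FlinkKafkaProducer<String> buildProducer(Properties properties) {
        // plain value serialization; the partition is decided by the partitioner, not here
        SerializationSchema<String> serializationSchema = new SerializationSchema<String>() {
            @Override
            public byte[] serialize(String element) {
                return element.getBytes(StandardCharsets.UTF_8);
            }
        };
        FlinkKafkaPartitioner<String> partitioner = new FlinkKafkaPartitioner<String>() {
            @Override
            public int partition(String record, byte[] key, byte[] value,
                                 String targetTopic, int[] partitions) {
                // placeholder logic: spread records over the partitions of the target topic
                return partitions[Math.floorMod(record.hashCode(), partitions.length)];
            }
        };
        // Overload: FlinkKafkaProducer(String defaultTopic, SerializationSchema<IN>,
        //                              Properties, Optional<FlinkKafkaPartitioner<IN>>)
        return new FlinkKafkaProducer<>(
                "sink_topic",            // target topic (placeholder)
                serializationSchema,
                properties,              // producer config supplied by the caller
                Optional.of(partitioner));
    }
}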
2.1.2. Implementation via KafkaSerializationSchema
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaSinkSimpleTest01 {
    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // add a few String records for testing
        DataStreamSource<String> stream = env.fromElements("赵", "钱", "孙", "李", "周");
        // add a print sink for observation
        stream.print("print-test");
        // implement a custom KafkaSerializationSchema
        KafkaSerializationSchema<String> serializationSchema = new KafkaSerializationSchema<String>() {
            // override serialize and build the ProducerRecord yourself
            // to specify the partition and/or key of the record
            @Override
            public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
                return new ProducerRecord<byte[], byte[]>(
                        "sink_topic",                                  // target topic
                        11,                                            // partition
                        "key-value".getBytes(StandardCharsets.UTF_8),  // key
                        element.getBytes(StandardCharsets.UTF_8));     // record contents
            }
        };
        // producer config (bootstrap servers below are a placeholder; fill in your own)
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "host:9093");
        // instantiate the sink function
        FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
                "sink_topic",                               // target topic
                serializationSchema,                        // serialization schema
                properties,                                 // producer config
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);  // fault-tolerance
        stream.addSink(myProducer);
        // execute program
        env.execute("Flink Streaming Java API Skeleton");
    }
}
2.1.3. Implementation via FlinkKafkaPartitioner
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

public class KafkaSinkWithSelfDefPartitioner {
    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // add a few String records for testing
        DataStreamSource<String> stream = env.fromElements("赵", "钱", "孙", "李", "周");
        stream.print("print-test");
        // do not specify any partition in the SerializationSchema implementation
        SerializationSchema<String> serializationSchema = new SerializationSchema<String>() {
            @Override
            public byte[] serialize(String element) {
                return element.getBytes(StandardCharsets.UTF_8);
            }
        };
        // custom FlinkKafkaPartitioner: inspect the record value and route it to different partitions
        FlinkKafkaPartitioner<String> flinkKafkaPartitioner = new FlinkKafkaPartitioner<String>() {
            @Override
            public int partition(String record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
                // adapt this logic to your data; just return the Kafka partition number as an int
                if (record.equals("test")) {
                    return 6;
                } else {
                    return 7;
                }
            }
        };
        // producer config (bootstrap servers below are a placeholder; fill in your own)
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "host:9093");
        FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
                "sunyf_topic",
                serializationSchema,
                properties,
                flinkKafkaPartitioner,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
                2);
        stream.addSink(myProducer);
        // execute program
        env.execute("Flink Streaming Java API Skeleton");
    }
}
2.2. Flink SQL Jobs
Sample code:
package org.example;

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.data.binary.BinaryRowData;

public class FlinkSQLKafkaSinkPartitioner extends FlinkKafkaPartitioner {
    public int partition(Object record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        BinaryRowData brData = null;
        // when a custom partitioner is configured in Flink SQL, the actual record type is BinaryRowData
        if (record instanceof BinaryRowData) {
            brData = (BinaryRowData) record;
            // all fields of the row can be accessed through the class's getters
            for (int i = 0; i < brData.getArity(); i++) {
                System.out.println("index: " + i + " : " + brData.getString(i));
            }
            String dt = brData.getString(2).toString();
            int dtInt = Integer.parseInt(dt);
            // odd dates go to partition 5, even dates go to partition 4
            if (dtInt % 2 == 0) {
                return 4;
            } else {
                return 5;
            }
        } else {
            throw new IllegalArgumentException("Record is not of type BinaryRowData");
        }
    }
}
After packaging, upload the JAR:
Add the following WITH option to the sink of the SQL job:
'sink.partitioner' = 'org.example.FlinkSQLKafkaSinkPartitioner'
Complete sample SQL job:
CREATE TEMPORARY TABLE KafkaTable (
  `key_id` STRING,
  `a` STRING,
  `b` STRING,
  `c` STRING,
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
  process_topic STRING METADATA FROM 'topic' VIRTUAL,
  `partition` INT METADATA VIRTUAL,
  header MAP<STRING, STRING> METADATA FROM 'headers' VIRTUAL,
  `leader-epoch` INT METADATA VIRTUAL,
  `offset` INT METADATA VIRTUAL,
  `timestamp-type` STRING METADATA VIRTUAL
) WITH (
  'connector' = 'kafka',
  'topic' = 'source_topic',
  'properties.bootstrap.servers' = '',
  'properties.group.id' = 'group',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv',
  'key.format' = 'csv',
  'value.format' = 'csv',
  'key.fields' = 'key_id',
  'key.fields-prefix' = 'key_',
  'value.fields-include' = 'EXCEPT_KEY'
);

CREATE TEMPORARY TABLE kafka_sink (
  `a` STRING,
  `b` STRING,
  `c` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'sink_topic',
  'properties.bootstrap.servers' = '',
  'properties.group.id' = 'group',
  'format' = 'csv',
  'properties.enable.idempotence' = 'false',
  'sink.partitioner' = 'org.example.FlinkSQLKafkaSinkPartitioner'
);

CREATE TEMPORARY TABLE print_sink (
  `a` STRING,
  `b` STRING,
  `c` STRING
) WITH (
  'connector' = 'print',
  'print-identifier' = 'print666'
);

BEGIN STATEMENT SET;
INSERT INTO print_sink SELECT a, b, c FROM KafkaTable;
INSERT INTO kafka_sink SELECT a, b, c FROM KafkaTable;
END;
3. Pitfalls
3.1. Debugging locally in IDEA: reading/writing Kafka in high-security mode over the public network
[Symptom]
The Kafka client logs a large number of warnings reporting that the broker disconnected, as shown below.
14:05:58,947 WARN org.apache.kafka.clients.NetworkClient [] - [Producer clientId=producer-5] Bootstrap broker 121.40.72.21:9093 (id: -1 rack: null) disconnected
[Cause and Solution]
Following the Kafka client documentation, configure the SSL, SASL, and JAAS options in the properties; see the official Kafka documentation for the relevant configuration parameters.
- https://help.aliyun.com/zh/apsaramq-for-kafka/developer-reference/update-the-ssl-certificate-algorithm?spm=a2c4g.11186623.0.i26
- https://github.com/AliwareMQ/aliware-kafka-demos/blob/master/kafka-java-demo/vpc-ssl/src/main/resources/mix.4096.client.truststore.jks
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "ip:9093,ip:9093,ip:9093");
properties.setProperty("security.protocol", "SASL_SSL");
// SSL configuration
// configure the path of the truststore (CA certificate) provided by the server
properties.setProperty("ssl.truststore.location", "/file_path/mix.4096.client.truststore.jks");
properties.setProperty("ssl.truststore.password", "KafkaOnsClient");
// SASL configuration: set the SASL mechanism (PLAIN in this example)
properties.setProperty("sasl.mechanism", "PLAIN");
properties.setProperty("ssl.endpoint.identification.algorithm", "");
// JAAS configuration
properties.setProperty("sasl.jaas.config",
        "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");
3.2. Data lost downstream, or the number of records sent by the operator does not match what the Kafka group reads
[Symptom]
- The Flink source operator shows records sent = 2
- The metrics curve shows the source operator read 4 records
- The consumer offsets of the specified Kafka group have reached the latest offsets of the topic, which contains 4 messages in total
- The job shows no exceptions
[Cause and Solution]
Two of the messages in the topic have a non-null key, while the other two have a null key. Because key-related fields are declared in the source table, the source reads all 4 records (matching the metrics curve), but the source operator does not forward the two messages whose key is null. A possible workaround is sketched below.
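If the key fields are not actually needed downstream, one possible workaround (my assumption, not part of the original troubleshooting notes) is to declare the source table without key.format/key.fields, so that messages with a null key are not dropped. A sketch:

CREATE TEMPORARY TABLE KafkaTableNoKey (
  `a` STRING,
  `b` STRING,
  `c` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'source_topic',
  'properties.bootstrap.servers' = '',
  'properties.group.id' = 'group',
  'scan.startup.mode' = 'earliest-offset',
  -- only a value format; no key.format / key.fields declared
  'format' = 'csv'
);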
3.3. CORRUPT_MESSAGE
[Symptom]
A large number of warnings report CORRUPT_MESSAGE and no data arrives in the downstream Kafka topic. See: https://www.alibabacloud.com/help/zh/apsaramq-for-kafka/developer-reference/what-do-i-do-if-an-error-is-reported-when-i-use-the-apsaramq-for-kafka-client
2024-01-17 14:43:52,094 WARN org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.internals.Sender [] - [Producer clientId=producer-1] Got error produce response with correlation id 40 on topic-partition sunyf_sink-0, retrying (2147483615 attempts left). Error: CORRUPT_MESSAGE
[Cause and Solution]
The default value of the enable.idempotence parameter differs across VVR versions: on VVR 6 with cloud Kafka, data can be produced without changing any parameters, while on VVR 8 it must be explicitly set to false.
Because ApsaraMQ for Kafka instances backed by cloud storage do not support idempotent or transactional writes, the exactly-once semantics provided by the Kafka result table cannot be used, and you need to add the configuration item properties.enable.idempotence=false to the result table to disable idempotent writes, as in the sink DDL sketched below.
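For reference, the relevant sink table options (mirroring the kafka_sink table from section 2.2; the bootstrap servers are left blank as in that example) look like this:

CREATE TEMPORARY TABLE kafka_sink (
  `a` STRING,
  `b` STRING,
  `c` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'sink_topic',
  'properties.bootstrap.servers' = '',
  'format' = 'csv',
  -- disable idempotent writes; required on VVR 8 with cloud-storage Kafka instances
  'properties.enable.idempotence' = 'false'
);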