阿里云Flink-自定义kafka format实践及踩坑记录(以protobuf为例)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 阿里云Flink-自定义kafka format实践及踩坑记录(以protobuf为例)

1. protobuf简介

Protocol Buffers(简称:ProtoBuf)是一种开源跨平台的序列化数据结构的协议。其对于存储资料或在网络上进行通信的程序是很有用的。这个方法包含一个接口描述语言,描述一些数据结构,并提供程序工具根据这些描述产生代码,这些代码将用来生成或解析代表这些数据结构的字节流。

Google最初开发了Protocol Buffers用于内部使用。Protocol Buffers的设计目标是简单和性能。与XML相比更小且更快。

参考文档:

  1. 维基百科
  2. 官方文档

2. .proto文件

我们使用proto3的语法简单创建了一个名为User的消息体,其中包含了多种int,bool,string以及不同精度的浮点数字段,并将该文件保存为user.proto。

syntax = "proto3";
// proto 的package名称
// package com.example;
// java package名称,如果不指定会默认用proto的package
// option java_package = "com.example";
// 是否编译成多个文件
// option java_multiple_files = true;
// java的封装类的类名
option java_outer_classname = "UserProtoBuf";
message User {
  int32 age = 1;
  int64 timestamp = 2;
  bool enabled = 3;
  float height = 4;
  double weight = 5;
  string userName = 6;
  string Full_Address = 7;
}

3. .proto -> java.class

将.proto文件编译成java类需要用到protoc工具,参考链接:https://protobuf.dev/programming-guides/proto3/#generating,可以通过如下命令生成java类。

protoc --java_out=./ /path/user.proto

生成的java类整体结构如下所示,作为一个嵌套类,类名整体为我们指定的java_outer_classname参数,即UserProtoBuf。

image.png

4. 序列化与反序列化测试

package org.example;
public class UserProtoTest {
    public static void main(String[] args) throws Exception {
        // 通过getClientPush方法将数据序列化
        byte[] byteData = getClientPush();
        System.out.println("数据字节长度:" + byteData.length);
        /**
         * 接收数据反序列化:将字节数据转化为对象数据。
         */
        UserProtoBuf.User user = UserProtoBuf.User.parseFrom(byteData);
        // 看下对象的tostring方法打印出来的内容
        System.out.println("user obj to string:\n" + user);
        // 随便获取下某些字段进行打印
        System.out.println("UserName=" + user.getUserName());
        System.out.println("Timestamp=" + user.getTimestamp());
        System.out.println("Height=" + user.getHeight());
    }
    /**
     * 模拟发送方,将数据序列化后发送
     *
     * @return
     */
    private static byte[] getClientPush() {
        // 按照定义的数据结构,创建一个对象。
        UserProtoBuf.User.Builder user = UserProtoBuf.User.newBuilder();
        user.setAge(18);
        user.setTimestamp(System.currentTimeMillis());
        user.setEnabled(true);
        user.setHeight(1.74F);
        user.setWeight(70.07D);
        user.setUserName("adam");
        user.setFullAddress("China");
        /**
         * 发送数据序列化:将对象数据转化为字节数据输出
         */
        UserProtoBuf.User userBuild = user.build();
        byte[] bytes = userBuild.toByteArray();
        return bytes;
    }
}

image.png

5. 云Kafka读写protobuf测试

测试环境以云kafka为例

5.1. 写Kafka

package org.example;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
public class KafkaProducerProtoTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        //        alikafka_post-cn-wwo3i0eho00o
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<ip1>:9093,<ip2>:9093,<ip3>:9093");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/Users/adamsun/Downloads/mix.4096.client.truststore.jks");
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        //云消息队列 Kafka 版消息的序列化方式。
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        //请求的最长等待时间。
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
        //设置客户端内部重试次数。
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        //设置客户端内部重试间隔。
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 3000);
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<user_name>\" password=\"<pass_word>\";");
        //构造Producer对象,注意,该对象是线程安全的,一般来说,一个进程内一个Producer对象即可。
        //如果想提高性能,可以多构造几个对象,但不要太多,最好不要超过5个。
        KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
        String topic = "sunyf_topic";
        byte[] value = getProtoTestData();
        try {
            //批量获取Future对象可以加快速度,但注意,批量不要太大。
            List<Future<RecordMetadata>> futures = new ArrayList<Future<RecordMetadata>>(128);
            ProducerRecord<String, byte[]> kafkaMessage = new ProducerRecord<String, byte[]>(topic, value);
            Future<RecordMetadata> metadataFuture = producer.send(kafkaMessage);
            futures.add(metadataFuture);
            producer.flush();
            for (Future<RecordMetadata> future : futures) {
                //同步获得Future对象的结果。
                try {
                    RecordMetadata recordMetadata = future.get();
                    System.out.println("Produce ok:" + recordMetadata.toString());
                } catch (Throwable t) {
                    t.printStackTrace();
                }
            }
        } catch (Exception e) {
            //客户端内部重试之后,仍然发送失败,业务要应对此类错误。
            System.out.println("error occurred");
            e.printStackTrace();
        }
    }
    private static byte[] getProtoTestData() {
        // 按照定义的数据结构,创建一个对象。
        UserProtoBuf.User.Builder user = UserProtoBuf.User.newBuilder();
        user.setAge(18);
        user.setTimestamp(System.currentTimeMillis());
        user.setEnabled(true);
        user.setHeight(1.88F);
        user.setWeight(66.76D);
        user.setUserName("name");
        user.setFullAddress("addr");
        /**
         * 发送数据序列化:将对象数据转化为字节数据输出
         */
        UserProtoBuf.User userBuild = user.build();
        byte[] bytes = userBuild.toByteArray();
        return bytes;
    }
}

5.2. 读Kafka

package org.example;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
public class KafkaConsumerProtoTest  {
    public static void main(String args[]) {
        Properties props = new Properties();
        //设置接入点,通过控制台获取对应Topic的接入点。
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<ip1>:9093,<ip2>:9093,<ip3>:9093");
        //如果是SSL接入点实例,请注释以下第一行代码。
        //可更加实际拉去数据和客户的版本等设置此值,默认30s。
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/Users/adamsun/Downloads/mix.4096.client.truststore.jks");
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
//         * 两次Poll之间的最大允许间隔。
//         * 消费者超过该值没有返回心跳,服务端判断消费者处于非存活状态,服务端将消费者从Group移除并触发Rebalance,默认30s。
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
//         * 设置单次拉取的量,走公网访问时,该参数会有较大影响。
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000);
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000);
        //每次poll的最大数量。
        //注意该值不要改得太大,如果poll太多数据,而不能在下次poll之前消费完,则会触发一次负载均衡,产生卡顿。
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
        //消息的反序列化方式
        // 这里注意下value的序列化格式,默认的是string,这里改成了byte的
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        //当前消费实例所属的消费组,请在控制台申请之后填写。
        //属于同一个组的消费实例,会负载消费消息。
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "sunyf_group2");
        //如果是SSL接入点实例,请取消以下一行代码的注释。
        //Hostname校验改成空。
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<user_name>\" password=\"<pass_word>\";");
        props.put("auto.offset.reset", "earliest");
        //构造消息对象,也即生成一个消费实例。
        KafkaConsumer<String, byte[]> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String, byte[]>(props);
        //设置消费组订阅的Topic,可以订阅多个。
        //如果GROUP_ID_CONFIG是一样,则订阅的Topic也建议设置成一样。
        List<String> subscribedTopics = new ArrayList<String>();
        //如果是SSL接入点实例,请注释以下前五行代码,取消第六行代码的注释。
        //如果需要订阅多个Topic,则在这里add进去即可。
        //每个Topic需要先在控制台进行创建。
        String topicStr = "sunyf_topic";
        String[] topics = topicStr.split(",");
        for (String topic : topics) {
            subscribedTopics.add(topic.trim());
        }
        //subscribedTopics.add(kafkaProperties.getProperty("topic"));
        consumer.subscribe(subscribedTopics);
        //循环消费消息。
        while (true) {
            try {
                ConsumerRecords<String, byte[]> records = consumer.poll(10000);
                //必须在下次poll之前消费完这些数据, 且总耗时不得超过SESSION_TIMEOUT_MS_CONFIG。
                //建议开一个单独的线程池来消费消息,然后异步返回结果。
                for (ConsumerRecord<String, byte[]> record : records) {
                    System.out.println(String.format("Consume partition:%d offset:%d", record.partition(), record.offset()));
                    byte[] value = record.value();
                    UserProtoBuf.User user = UserProtoBuf.User.parseFrom(value);
                    System.out.println("user=" + user);
                    System.out.println("UserName=" + user.getUserName());
                    System.out.println("Timestamp=" + user.getTimestamp());
                    System.out.println("Height=" + user.getHeight());
                }
            } catch (Exception e) {
                try {
                    Thread.sleep(10000);
                } catch (Throwable ignore) {
                }
                e.printStackTrace();
            }
        }
    }
}

我们通过上述代码,对写入Kafka实例的protobuf消息进行读取,我们在代码中重置了消费位点到最早的位点,"auto.offset.reset" = "earliest",从日志中我们也看到了客户端的这一动作,并可以看到从partition:10,offset:0开始消费,成功消费到一个user message并打印出了其中的部分字段。

image.png

6. 编译打包上传到flink

可以通过IDEA在本地进行打包

  1. 通过flink-quick-start的项目为基础,创建打包环境

archetypeGroupId

org.apache.flink

archetypeArtifactId

flink-quickstart-java

archetypeVersion

1.17.2

  1. 添加flink-protobuf的依赖,如下所示的dependency到pom中
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-protobuf</artifactId>
  <version>1.17.2</version>
</dependency>
  1. 添加protoc的java类到项目中
  2. 使用maven对项目进行打包
  3. 对target文件下的打包后的jar进行检查,是否含有protoc编译后的类。这里使用了官网的测试类,相关参考:https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/formats/protobuf/。该proto文件中配置了option java_multiple_files = true; 所以生成了如下三个类:com.example.SimpleTest,com.example.SimpleTestOrBuilder,com.example.SimpleTestOuterClass

image.png

7. flink作业测试

7.1. 上传依赖

image.png

7.2. 作业开发与配置

通过FLINK-SQL创建任务,其中字段类型的映射可以参考:https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/formats/protobuf/#data-type-mapping

CREATE TEMPORARY TABLE simple_test (
  uid BIGINT,
  name STRING,
  category_type INT,
  content BINARY,
  price DOUBLE,
  value_map map<BIGINT, row<v1 BIGINT, v2 INT>>,
  value_arr array<row<v1 BIGINT, v2 INT>>,
  corpus_int INT,
  corpus_str STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'sunyf_topic',
  'properties.bootstrap.servers' = '',
  'properties.group.id' = 'sunyf_group_flink',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'protobuf',
  -- 指定消息体对应的message类
  'protobuf.message-class-name' = 'com.example.SimpleTest',
  'protobuf.ignore-parse-errors' = 'true'
)
;
CREATE TEMPORARY TABLE print_sink (
  uid BIGINT,
  name STRING,
  category_type INT,
  content BINARY,
  price DOUBLE,
  value_map map<BIGINT, row<v1 BIGINT, v2 INT>>,
  value_arr array<row<v1 BIGINT, v2 INT>>,
  corpus_int INT,
  corpus_str STRING
) WITH (
  'connector' = 'print',
  'logger' = 'true'    
)
;
insert into print_sink
select * from simple_test;

任务发布后我们可以看到taskmanager中的printsinkoutputwriter有相关数据的输出,与我们写入kafka时的数据一致。

image.png

8. flink中对proto2和3版本的差异

空值的差异以及处理

protobuf.read-default-values

只有以proto2编译时,此选项才有效,且默认为false。如果为true,则格式将读取空值作为proto文件中定义的默认值。如果为false,则生成null值。如果proto语法是proto3,则该值将强制设置为true,因为proto3的标准是使用默认值。

关于空值的默认值,可以参考文档:https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/formats/protobuf/#null-values

proto2编译

2024-03-25 15:08:55,075 INFO  org.apache.flink.api.common.functions.util.PrintSinkOutputWriter [] - +I[null, xxx, null, null, null, null, null, null, null]
2024-03-25 15:08:55,075 INFO  org.apache.flink.api.common.functions.util.PrintSinkOutputWriter [] - +I[1, xxx, 2, null, null, null, null, null, WEB]

proto3编译

2024-03-26 16:22:20,411 INFO  org.apache.flink.api.common.functions.util.PrintSinkOutputWriter [] - +I[0, xxx, 0, [], 0.0, {}, [], 0, UNIVERSAL]
2024-03-26 16:22:20,412 INFO  org.apache.flink.api.common.functions.util.PrintSinkOutputWriter [] - +I[1, xxx, 2, [], 0.0, {}, [], 0, WEB]

9. 相关报错

9.1. com.google.protobuf.DescriptorsFileDescriptorcannotbecasttocom.google.protobuf.DescriptorsFileDescriptorcannotbecasttocom.google.protobuf.DescriptorsFileDescriptor cannot be cast to com.google.protobuf.DescriptorsDescriptor

9.1.1. 表象

相关堆栈报错如下所示:

org.apache.flink.table.api.ValidationException: SQL validation failed. get org.example.UserProtoBuf descriptors error!
  at org.apache.flink.table.sqlserver.utils.FormatValidatorExceptionUtils.newValidationException(FormatValidatorExceptionUtils.java:41)
  at org.apache.flink.table.sqlserver.utils.ErrorConverter.formatException(ErrorConverter.java:125)
  at org.apache.flink.table.sqlserver.utils.ErrorConverter.toErrorDetail(ErrorConverter.java:60)
  at org.apache.flink.table.sqlserver.utils.ErrorConverter.toGrpcException(ErrorConverter.java:54)
  at org.apache.flink.table.sqlserver.FlinkSqlServiceImpl.validateAndGeneratePlan(FlinkSqlServiceImpl.java:1152)
  at org.apache.flink.table.sqlserver.proto.FlinkSqlServiceGrpc$MethodHandlers.invoke(FlinkSqlServiceGrpc.java:3777)
  at io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
  at io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
  at io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
  at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
  at org.apache.flink.table.sqlserver.interceptor.StatusInterceptor$1.onHalfClose(StatusInterceptor.java:116)
  at io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
  at io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
  at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
  at org.apache.flink.table.sqlserver.interceptor.CancelHandlerRegister$1.onHalfClose(CancelHandlerRegister.java:59)
  at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
  at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:820)
  at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
  at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)
  at java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.IllegalArgumentException: get org.example.UserProtoBuf descriptors error!
  at org.apache.flink.formats.protobuf.util.PbFormatUtils.getDescriptor(PbFormatUtils.java:126)
  at org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.<init>(PbRowDataDeserializationSchema.java:58)
  at org.apache.flink.formats.protobuf.PbDecodingFormat.createRuntimeDecoder(PbDecodingFormat.java:45)
  at org.apache.flink.formats.protobuf.PbDecodingFormat.createRuntimeDecoder(PbDecodingFormat.java:32)
  at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.createDeserialization(KafkaDynamicSource.java:716)
  at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.getScanRuntimeProvider(KafkaDynamicSource.java:385)
  at org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:596)
  at org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:250)
  at org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:180)
  at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:118)
  at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3847)
  at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2716)
  at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2261)
  at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2175)
  at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2120)
  at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:721)
  at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:707)
  at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3693)
  at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:615)
  at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.java:378)
  at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.java:354)
  at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1828)
  at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1504)
  at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:450)
  at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNodeOrFail(SqlToOperationConverter.java:476)
  at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:904)
  at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:423)
  at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:320)
  at org.apache.flink.table.planner.delegation.OperationIterator.convertNext(OperationIterator.java:102)
  at org.apache.flink.table.planner.delegation.OperationIterator.next(OperationIterator.java:86)
  at org.apache.flink.table.planner.delegation.OperationIterator.next(OperationIterator.java:49)
  at org.apache.flink.table.sqlserver.execution.OperationExecutorImpl.validate(OperationExecutorImpl.java:434)
  at org.apache.flink.table.sqlserver.execution.OperationExecutorImpl.validateAndGeneratePlan(OperationExecutorImpl.java:396)
  at org.apache.flink.table.sqlserver.execution.DelegateOperationExecutor.lambda$validateAndGeneratePlan$30(DelegateOperationExecutor.java:273)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.Subject.doAs(Subject.java:422)
  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729)
  at org.apache.flink.table.sqlserver.context.SqlServerSecurityContext.runSecured(SqlServerSecurityContext.java:72)
  at org.apache.flink.table.sqlserver.execution.DelegateOperationExecutor.wrapClassLoader(DelegateOperationExecutor.java:322)
  at org.apache.flink.table.sqlserver.execution.DelegateOperationExecutor.lambda$wrapExecutor$36(DelegateOperationExecutor.java:349)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  ... 3 more
Caused by: java.lang.ClassCastException: com.google.protobuf.Descriptors$FileDescriptor cannot be cast to com.google.protobuf.Descriptors$Descriptor
  at org.apache.flink.formats.protobuf.util.PbFormatUtils.getDescriptor(PbFormatUtils.java:123)
  ... 43 more

9.1.2. 代码分析

JM报错根因乍看之下具有较强隐蔽性,起初怀疑是包冲突,但provided runtime jar以及shade相关类后仍异常。

结合堆栈以及flink-protobuf-1.17.2.jar中源码:

image.png

及SQL代码:

CREATE TEMPORARY TABLE table_name (
...
)
   WITH (
  'connector' = 'kafka',
  'format' = 'protobuf',
  'protobuf.message-class-name' = 'com.example.TcTest',
  'protobuf.ignore-parse-errors'='true'
);

及tc_test.proto文件:

syntax = "proto3";
option java_package = "com.example";
message M1 {
...
}
message M2 {
...
}
message M3 {
...
}

及生成的java类:

public final class TcTest {
  public interface M1OrBuilder extends
      // @@protoc_insertion_point(interface_extends:M1)
      com.google.protobuf.MessageOrBuilder {
      ...
  }
  public static final class M1 extends
      com.google.protobuf.GeneratedMessageV3 implements
      // @@protoc_insertion_point(message_implements:M1)
      M1OrBuilder {
      ...
  }
  public interface M2OrBuilder extends
      // @@protoc_insertion_point(interface_extends:M2)
      com.google.protobuf.MessageOrBuilder {
      ...
  }
  public static final class M2 extends
      com.google.protobuf.GeneratedMessageV3 implements
      // @@protoc_insertion_point(message_implements:M2)
      M2OrBuilder {
      ...
  }
}

综合分析,.proto文件中message消息体众多,且option中没有指定java_outer_classname,则java文件名默认改为驼峰命名的方式,即TcTest.java,且为嵌套的java类,内涵所有的M1,M2等所有message消息体,以子类的形式存在,且所有类中都存在getDescriptor方法,分别用于描述proto文件和message文件,但返回类型不同,TcTest类返回的类型为FileDescriptor,而M1等类返回的类型为Descriptor,在Flink SQL配置时仅指定了主类,与编译参数错位,导致类转换异常。拆分proto文件中message到多个文件后编译或指定子类(如com.example.TcTest$M1)后正常消费上游数据。

9.2. org.codehaus.commons.compiler.CompileException: Line 14, Column 30: IDENTIFIER expected instead of '.'

9.2.1. 表象

相关堆栈报错如下所示:

2024-03-23 23:23:38,852 ERROR org.apache.flink.formats.protobuf.util.PbCodegenUtils        [] - Protobuf codegen compile error: 
package org.apache.flink.formats.protobuf.deserialize;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.binary.BinaryStringData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.GenericArrayData;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import com.google.protobuf.ByteString;
public class GeneratedProtoToRow_916e09b8a900477390c1f944e4a36da6{
public static RowData decode(.UserProtoBuf.User message){
RowData rowData=null;
.UserProtoBuf.User message0 = message;
GenericRowData rowData0 = new GenericRowData(7);
Object elementDataVar1 = null;
elementDataVar1 = message0.getAge();
rowData0.setField(0, elementDataVar1);
Object elementDataVar2 = null;
elementDataVar2 = message0.getTimestamp();
rowData0.setField(1, elementDataVar2);
Object elementDataVar3 = null;
elementDataVar3 = message0.getEnabled();
rowData0.setField(2, elementDataVar3);
Object elementDataVar4 = null;
elementDataVar4 = message0.getHeight();
rowData0.setField(3, elementDataVar4);
Object elementDataVar5 = null;
elementDataVar5 = message0.getWeight();
rowData0.setField(4, elementDataVar5);
Object elementDataVar6 = null;
elementDataVar6 = BinaryStringData.fromString(message0.getUserName().toString());
rowData0.setField(5, elementDataVar6);
Object elementDataVar7 = null;
elementDataVar7 = BinaryStringData.fromString(message0.getFullAddress().toString());
rowData0.setField(6, elementDataVar7);
rowData = rowData0;
return rowData;
}
}
2024-03-23 23:23:38,856 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: simple_test[2153] -> Sink: print_sink[2154] (1/1)#0 (c4aaed5ad4c63a8ba82a47979ffce386_717c7b8afebbfb7137f6f0f99beb2a94_0_0) switched from INITIALIZING to FAILED with failure cause:
org.apache.flink.formats.protobuf.PbCodegenException: org.apache.flink.api.common.InvalidProgramException: Program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.<init>(ProtoToRowConverter.java:124) ~[protobufTest-1.0-SNAPSHOT-1.jar:?]
    at org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.open(PbRowDataDeserializationSchema.java:64) ~[protobufTest-1.0-SNAPSHOT-1.jar:?]
    at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.open(DynamicKafkaDeserializationSchema.java:94) ~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.open(KafkaDeserializationSchemaWrapper.java:47) ~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.connector.kafka.source.KafkaSource.createReader(KafkaSource.java:144) ~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.connector.kafka.source.KafkaSource.createReader(KafkaSource.java:135) ~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.streaming.api.operators.SourceOperator.initReader(SourceOperator.java:318) ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.init(SourceOperatorStreamTask.java:93) ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:778) ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:745) ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:959) ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:928) [flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751) [flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567) [flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at java.lang.Thread.run(Thread.java:879) [?:1.8.0_372]
Caused by: org.apache.flink.api.common.InvalidProgramException: Program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.formats.protobuf.util.PbCodegenUtils.compileClass(PbCodegenUtils.java:262) ~[protobufTest-1.0-SNAPSHOT-1.jar:?]
    at org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.<init>(ProtoToRowConverter.java:116) ~[protobufTest-1.0-SNAPSHOT-1.jar:?]
    ... 14 more
Caused by: org.codehaus.commons.compiler.CompileException: Line 14, Column 30: IDENTIFIER expected instead of '.'
    at org.codehaus.janino.TokenStreamImpl.read(TokenStreamImpl.java:195) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.read(Parser.java:3313) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseQualifiedIdentifier(Parser.java:326) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseReferenceType(Parser.java:2342) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseType(Parser.java:2326) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseFormalParameter(Parser.java:1519) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseFormalParameters(Parser.java:1488) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseMethodDeclarationRest(Parser.java:1392) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseClassBodyDeclaration(Parser.java:938) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseClassBody(Parser.java:736) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseClassDeclarationRest(Parser.java:642) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parsePackageMemberTypeDeclarationRest(Parser.java:370) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseCompilationUnit(Parser.java:241) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.formats.protobuf.util.PbCodegenUtils.compileClass(PbCodegenUtils.java:259) ~[protobufTest-1.0-SNAPSHOT-1.jar:?]
    at org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.<init>(ProtoToRowConverter.java:116) ~[protobufTest-1.0-SNAPSHOT-1.jar:?]
    ... 14 more

9.2.2. 源码分析

通过异常堆栈报错的部分可以看出decode方法的message参数的类型少指定了package,期望应该是例如:com.example.UserProtoBuf.User的形式,但是生成的代码message参数的类型为“.UserProtoBuf.User”

public static RowData decode(.UserProtoBuf.User message){
    ...
}

user.proto文件如下所示,使用protoc编译为UserProtoBuf.java文件后通过IDEA打包成jar并添加到作业依赖中:

syntax = "proto3";
option java_outer_classname = "UserProtoBuf";
message User {
  int32 age = 1;
  int64 timestamp = 2;
  bool enabled = 3;
  float height = 4;
  double weight = 5;
  string userName = 6;
  string Full_Address = 7;
}

Flink SQL:

CREATE TEMPORARY TABLE test (
  ...
) WITH (
  'connector' = 'kafka',
  'topic' = '',
  'properties.bootstrap.servers' = '',
  'properties.group.id' = '',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'protobuf',
  'protobuf.message-class-name' = 'org.example.UserProtoBuf$User',
  'protobuf.ignore-parse-errors' = 'true'
)
;

结合堆栈信息

可以定位到异常抛出的方法为:

org.apache.flink.formats.protobuf.util.PbCodegenUtils.compileClass

image.png

code为org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter类的构造方法ProtoToRowConverter中如下代码调用并传入的,具体的参数值为“codegenAppender.code()”

Class generatedClass = PbCodegenUtils.compileClass(Thread.currentThread().getContextClassLoader(), generatedPackageName + "." + generatedClassName, codegenAppender.code());

codegenAppender对象以及生成的java代码中decode方法message参数的类型是由如下的代码生成的

PbCodegenAppender codegenAppender = new PbCodegenAppender();
codegenAppender.appendSegment("public static RowData decode(" + fullMessageClassName + " message){");

继续追溯fullMessageClassName

// 通过我们传入的类名获取该类的Descriptor对象
Descriptor descriptor = PbFormatUtils.getDescriptor(formatConfig.getMessageClassName());
// 通过该Descriptor对象获取 路径+类名 作为decode方法中message的参数类型
String fullMessageClassName = PbFormatUtils.getFullJavaName(descriptor);

image.png

再继续看PbFormatUtils.getFullJavaName具体是怎么实现的,当前测试类非包装类,会走到else部分的代码块,getOuterProtoPrefix方法会传入自定义protobuf format的FileDescriptor

image.png

getOuterProtoPrefix方法的return部分的代码我们可以看到,在我们这个case中应该是javaPackageName为空导致message的类型为"."开头,没有包含前面的路径(在idea中打包到了org.example中,并在flink sql的参数中进行了指定)。究其原因,javaPackageName的取值在编译的protobuf 类中通过FileDescriptor获取java package或proto package的值(java package为空时)

image.png

但是当前proto文件中,如下两个参数均未指定,而是通过idea对生成的类进行了package的声明,导致无法找到对应的javaPackageName,我们通过local debug同样可以验证我们的推论。

package com.example.xxx.model;
option java_package = "com.example.xxx.model";

image.png

我们可以在protobuf的官网上找到两个参数的相关介绍:https://protobuf.dev/programming-guides/proto3/#options

可以看到java_package与package两个参数的关系如上图getOuterProtoPrefix方法中的逻辑一致,优先为非空的java_package,不然则去proto文件的package参数。

image.png

9.2.3. 问题解决

我们可以按照flink官网的样例,在proto文件中对package以及option java_package两个参数均进行声明,并与idea项目中java打包后的路径一致即可绕过这个问题,让compileClass方法可以通过FileDescriptor找到package名。该问题已经提出issue到社区:https://issues.apache.org/jira/browse/FLINK-35034

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
18天前
|
消息中间件 存储 监控
构建高可用性Apache Kafka集群:从理论到实践
【10月更文挑战第24天】随着大数据时代的到来,数据传输与处理的需求日益增长。Apache Kafka作为一个高性能的消息队列服务,因其出色的吞吐量、可扩展性和容错能力而受到广泛欢迎。然而,在构建大规模生产环境下的Kafka集群时,保证其高可用性是至关重要的。本文将从个人实践经验出发,详细介绍如何构建一个高可用性的Kafka集群,包括集群规划、节点配置以及故障恢复机制等方面。
51 4
|
1月前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
131 0
|
22天前
|
消息中间件 监控 数据可视化
实时计算Flink场景实践和核心功能体验
本文详细评测了阿里云实时计算Flink版,从产品引导、文档帮助、功能满足度等方面进行了全面分析。产品界面设计友好,文档丰富实用,数据开发和运维体验优秀,具备出色的实时性和动态扩展性。同时,提出了针对业务场景的改进建议,包括功能定制化增强、高级分析功能拓展及可视化功能提升。文章还探讨了产品与阿里云内部产品及第三方工具的联动潜力,展示了其在多云架构和跨平台应用中的广阔前景。
54 9
|
23天前
|
运维 监控 安全
实时计算Flink场景实践和核心功能体验
实时计算Flink场景实践和核心功能体验
|
24天前
|
运维 数据可视化 数据处理
实时计算Flink场景实践和核心功能体验 评测
实时计算Flink场景实践和核心功能体验 评测
51 4
|
14天前
|
数据采集 运维 搜索推荐
实时计算Flink场景实践
在数字化时代,实时数据处理愈发重要。本文分享了作者使用阿里云实时计算Flink版和流式数据湖仓Paimon的体验,展示了其在电商场景中的应用,包括数据抽取、清洗、关联和聚合,突出了系统的高效、稳定和低延迟特点。
43 0
|
1月前
|
SQL 消息中间件 分布式计算
大数据-130 - Flink CEP 详解 - CEP开发流程 与 案例实践:恶意登录检测实现
大数据-130 - Flink CEP 详解 - CEP开发流程 与 案例实践:恶意登录检测实现
40 0
|
1月前
|
消息中间件 NoSQL Kafka
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
129 0
|
SQL Kubernetes Cloud Native
开发者社区精选直播合集(三十六)| Flink实践合集
Flink 作为业界公认为最好的流计算引擎,不仅仅局限于做流处理,而是一套兼具流、批、机器学习等多种计算功能的大数据引擎,以其高吞吐低延时的优异实时计算能力、支持海量数据的亚秒级快速响应帮助企业和开发者实现数据算力升级,并成为阿里、腾讯、滴滴、美团、字节跳动、Netflix、Lyft 等国内外知名公司建设实时计算平台的首选。
开发者社区精选直播合集(三十六)|  Flink实践合集
|
2月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。

相关产品

  • 实时计算 Flink版