阿里云Flink-自定义kafka format实践及踩坑记录(以protobuf为例)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 阿里云Flink-自定义kafka format实践及踩坑记录(以protobuf为例)

1. protobuf简介

Protocol Buffers(简称:ProtoBuf)是一种开源跨平台的序列化数据结构的协议。其对于存储资料或在网络上进行通信的程序是很有用的。这个方法包含一个接口描述语言,描述一些数据结构,并提供程序工具根据这些描述产生代码,这些代码将用来生成或解析代表这些数据结构的字节流。

Google最初开发了Protocol Buffers用于内部使用。Protocol Buffers的设计目标是简单和性能。与XML相比更小且更快。

参考文档:

  1. 维基百科
  2. 官方文档

2. .proto文件

我们使用proto3的语法简单创建了一个名为User的消息体,其中包含了多种int,bool,string以及不同精度的浮点数字段,并将该文件保存为user.proto。

syntax = "proto3";
// proto 的package名称
// package com.example;
// java package名称,如果不指定会默认用proto的package
// option java_package = "com.example";
// 是否编译成多个文件
// option java_multiple_files = true;
// java的封装类的类名
option java_outer_classname = "UserProtoBuf";
message User {
  int32 age = 1;
  int64 timestamp = 2;
  bool enabled = 3;
  float height = 4;
  double weight = 5;
  string userName = 6;
  string Full_Address = 7;
}

3. .proto -> java.class

将.proto文件编译成java类需要用到protoc工具,参考链接:https://protobuf.dev/programming-guides/proto3/#generating,可以通过如下命令生成java类。

protoc --java_out=./ /path/user.proto

生成的java类整体结构如下所示,作为一个嵌套类,类名整体为我们指定的java_outer_classname参数,即UserProtoBuf。

image.png

4. 序列化与反序列化测试

package org.example;
public class UserProtoTest {
    public static void main(String[] args) throws Exception {
        // 通过getClientPush方法将数据序列化
        byte[] byteData = getClientPush();
        System.out.println("数据字节长度:" + byteData.length);
        /**
         * 接收数据反序列化:将字节数据转化为对象数据。
         */
        UserProtoBuf.User user = UserProtoBuf.User.parseFrom(byteData);
        // 看下对象的tostring方法打印出来的内容
        System.out.println("user obj to string:\n" + user);
        // 随便获取下某些字段进行打印
        System.out.println("UserName=" + user.getUserName());
        System.out.println("Timestamp=" + user.getTimestamp());
        System.out.println("Height=" + user.getHeight());
    }
    /**
     * 模拟发送方,将数据序列化后发送
     *
     * @return
     */
    private static byte[] getClientPush() {
        // 按照定义的数据结构,创建一个对象。
        UserProtoBuf.User.Builder user = UserProtoBuf.User.newBuilder();
        user.setAge(18);
        user.setTimestamp(System.currentTimeMillis());
        user.setEnabled(true);
        user.setHeight(1.74F);
        user.setWeight(70.07D);
        user.setUserName("adam");
        user.setFullAddress("China");
        /**
         * 发送数据序列化:将对象数据转化为字节数据输出
         */
        UserProtoBuf.User userBuild = user.build();
        byte[] bytes = userBuild.toByteArray();
        return bytes;
    }
}

image.png

5. 云Kafka读写protobuf测试

测试环境以云kafka为例

5.1. 写Kafka

package org.example;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
public class KafkaProducerProtoTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        //        alikafka_post-cn-wwo3i0eho00o
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<ip1>:9093,<ip2>:9093,<ip3>:9093");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/Users/adamsun/Downloads/mix.4096.client.truststore.jks");
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        //云消息队列 Kafka 版消息的序列化方式。
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        //请求的最长等待时间。
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
        //设置客户端内部重试次数。
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        //设置客户端内部重试间隔。
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 3000);
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<user_name>\" password=\"<pass_word>\";");
        //构造Producer对象,注意,该对象是线程安全的,一般来说,一个进程内一个Producer对象即可。
        //如果想提高性能,可以多构造几个对象,但不要太多,最好不要超过5个。
        KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
        String topic = "sunyf_topic";
        byte[] value = getProtoTestData();
        try {
            //批量获取Future对象可以加快速度,但注意,批量不要太大。
            List<Future<RecordMetadata>> futures = new ArrayList<Future<RecordMetadata>>(128);
            ProducerRecord<String, byte[]> kafkaMessage = new ProducerRecord<String, byte[]>(topic, value);
            Future<RecordMetadata> metadataFuture = producer.send(kafkaMessage);
            futures.add(metadataFuture);
            producer.flush();
            for (Future<RecordMetadata> future : futures) {
                //同步获得Future对象的结果。
                try {
                    RecordMetadata recordMetadata = future.get();
                    System.out.println("Produce ok:" + recordMetadata.toString());
                } catch (Throwable t) {
                    t.printStackTrace();
                }
            }
        } catch (Exception e) {
            //客户端内部重试之后,仍然发送失败,业务要应对此类错误。
            System.out.println("error occurred");
            e.printStackTrace();
        }
    }
    private static byte[] getProtoTestData() {
        // 按照定义的数据结构,创建一个对象。
        UserProtoBuf.User.Builder user = UserProtoBuf.User.newBuilder();
        user.setAge(18);
        user.setTimestamp(System.currentTimeMillis());
        user.setEnabled(true);
        user.setHeight(1.88F);
        user.setWeight(66.76D);
        user.setUserName("name");
        user.setFullAddress("addr");
        /**
         * 发送数据序列化:将对象数据转化为字节数据输出
         */
        UserProtoBuf.User userBuild = user.build();
        byte[] bytes = userBuild.toByteArray();
        return bytes;
    }
}

5.2. 读Kafka

package org.example;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
public class KafkaConsumerProtoTest  {
    public static void main(String args[]) {
        Properties props = new Properties();
        //设置接入点,通过控制台获取对应Topic的接入点。
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<ip1>:9093,<ip2>:9093,<ip3>:9093");
        //如果是SSL接入点实例,请注释以下第一行代码。
        //可更加实际拉去数据和客户的版本等设置此值,默认30s。
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/Users/adamsun/Downloads/mix.4096.client.truststore.jks");
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
//         * 两次Poll之间的最大允许间隔。
//         * 消费者超过该值没有返回心跳,服务端判断消费者处于非存活状态,服务端将消费者从Group移除并触发Rebalance,默认30s。
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
//         * 设置单次拉取的量,走公网访问时,该参数会有较大影响。
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000);
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000);
        //每次poll的最大数量。
        //注意该值不要改得太大,如果poll太多数据,而不能在下次poll之前消费完,则会触发一次负载均衡,产生卡顿。
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
        //消息的反序列化方式
        // 这里注意下value的序列化格式,默认的是string,这里改成了byte的
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        //当前消费实例所属的消费组,请在控制台申请之后填写。
        //属于同一个组的消费实例,会负载消费消息。
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "sunyf_group2");
        //如果是SSL接入点实例,请取消以下一行代码的注释。
        //Hostname校验改成空。
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<user_name>\" password=\"<pass_word>\";");
        props.put("auto.offset.reset", "earliest");
        //构造消息对象,也即生成一个消费实例。
        KafkaConsumer<String, byte[]> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String, byte[]>(props);
        //设置消费组订阅的Topic,可以订阅多个。
        //如果GROUP_ID_CONFIG是一样,则订阅的Topic也建议设置成一样。
        List<String> subscribedTopics = new ArrayList<String>();
        //如果是SSL接入点实例,请注释以下前五行代码,取消第六行代码的注释。
        //如果需要订阅多个Topic,则在这里add进去即可。
        //每个Topic需要先在控制台进行创建。
        String topicStr = "sunyf_topic";
        String[] topics = topicStr.split(",");
        for (String topic : topics) {
            subscribedTopics.add(topic.trim());
        }
        //subscribedTopics.add(kafkaProperties.getProperty("topic"));
        consumer.subscribe(subscribedTopics);
        //循环消费消息。
        while (true) {
            try {
                ConsumerRecords<String, byte[]> records = consumer.poll(10000);
                //必须在下次poll之前消费完这些数据, 且总耗时不得超过SESSION_TIMEOUT_MS_CONFIG。
                //建议开一个单独的线程池来消费消息,然后异步返回结果。
                for (ConsumerRecord<String, byte[]> record : records) {
                    System.out.println(String.format("Consume partition:%d offset:%d", record.partition(), record.offset()));
                    byte[] value = record.value();
                    UserProtoBuf.User user = UserProtoBuf.User.parseFrom(value);
                    System.out.println("user=" + user);
                    System.out.println("UserName=" + user.getUserName());
                    System.out.println("Timestamp=" + user.getTimestamp());
                    System.out.println("Height=" + user.getHeight());
                }
            } catch (Exception e) {
                try {
                    Thread.sleep(10000);
                } catch (Throwable ignore) {
                }
                e.printStackTrace();
            }
        }
    }
}

我们通过上述代码,对写入Kafka实例的protobuf消息进行读取,我们在代码中重置了消费位点到最早的位点,"auto.offset.reset" = "earliest",从日志中我们也看到了客户端的这一动作,并可以看到从partition:10,offset:0开始消费,成功消费到一个user message并打印出了其中的部分字段。

image.png

6. 编译打包上传到flink

可以通过IDEA在本地进行打包

  1. 通过flink-quick-start的项目为基础,创建打包环境

archetypeGroupId

org.apache.flink

archetypeArtifactId

flink-quickstart-java

archetypeVersion

1.17.2

  1. 添加flink-protobuf的依赖,如下所示的dependency到pom中
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-protobuf</artifactId>
  <version>1.17.2</version>
</dependency>
  1. 添加protoc的java类到项目中
  2. 使用maven对项目进行打包
  3. 对target文件下的打包后的jar进行检查,是否含有protoc编译后的类。这里使用了官网的测试类,相关参考:https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/formats/protobuf/。该proto文件中配置了option java_multiple_files = true; 所以生成了如下三个类:com.example.SimpleTest,com.example.SimpleTestOrBuilder,com.example.SimpleTestOuterClass

image.png

7. flink作业测试

7.1. 上传依赖

image.png

7.2. 作业开发与配置

通过FLINK-SQL创建任务,其中字段类型的映射可以参考:https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/formats/protobuf/#data-type-mapping

CREATE TEMPORARY TABLE simple_test (
  uid BIGINT,
  name STRING,
  category_type INT,
  content BINARY,
  price DOUBLE,
  value_map map<BIGINT, row<v1 BIGINT, v2 INT>>,
  value_arr array<row<v1 BIGINT, v2 INT>>,
  corpus_int INT,
  corpus_str STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'sunyf_topic',
  'properties.bootstrap.servers' = '',
  'properties.group.id' = 'sunyf_group_flink',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'protobuf',
  -- 指定消息体对应的message类
  'protobuf.message-class-name' = 'com.example.SimpleTest',
  'protobuf.ignore-parse-errors' = 'true'
)
;
CREATE TEMPORARY TABLE print_sink (
  uid BIGINT,
  name STRING,
  category_type INT,
  content BINARY,
  price DOUBLE,
  value_map map<BIGINT, row<v1 BIGINT, v2 INT>>,
  value_arr array<row<v1 BIGINT, v2 INT>>,
  corpus_int INT,
  corpus_str STRING
) WITH (
  'connector' = 'print',
  'logger' = 'true'    
)
;
insert into print_sink
select * from simple_test;

任务发布后我们可以看到taskmanager中的printsinkoutputwriter有相关数据的输出,与我们写入kafka时的数据一致。

image.png

8. flink中对proto2和3版本的差异

空值的差异以及处理

protobuf.read-default-values

只有以proto2编译时,此选项才有效,且默认为false。如果为true,则格式将读取空值作为proto文件中定义的默认值。如果为false,则生成null值。如果proto语法是proto3,则该值将强制设置为true,因为proto3的标准是使用默认值。

关于空值的默认值,可以参考文档:https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/formats/protobuf/#null-values

proto2编译

2024-03-25 15:08:55,075 INFO  org.apache.flink.api.common.functions.util.PrintSinkOutputWriter [] - +I[null, xxx, null, null, null, null, null, null, null]
2024-03-25 15:08:55,075 INFO  org.apache.flink.api.common.functions.util.PrintSinkOutputWriter [] - +I[1, xxx, 2, null, null, null, null, null, WEB]

proto3编译

2024-03-26 16:22:20,411 INFO  org.apache.flink.api.common.functions.util.PrintSinkOutputWriter [] - +I[0, xxx, 0, [], 0.0, {}, [], 0, UNIVERSAL]
2024-03-26 16:22:20,412 INFO  org.apache.flink.api.common.functions.util.PrintSinkOutputWriter [] - +I[1, xxx, 2, [], 0.0, {}, [], 0, WEB]

9. 相关报错

9.1. com.google.protobuf.DescriptorsFileDescriptorcannotbecasttocom.google.protobuf.DescriptorsFileDescriptorcannotbecasttocom.google.protobuf.DescriptorsFileDescriptor cannot be cast to com.google.protobuf.DescriptorsDescriptor

9.1.1. 表象

相关堆栈报错如下所示:

org.apache.flink.table.api.ValidationException: SQL validation failed. get org.example.UserProtoBuf descriptors error!
  at org.apache.flink.table.sqlserver.utils.FormatValidatorExceptionUtils.newValidationException(FormatValidatorExceptionUtils.java:41)
  at org.apache.flink.table.sqlserver.utils.ErrorConverter.formatException(ErrorConverter.java:125)
  at org.apache.flink.table.sqlserver.utils.ErrorConverter.toErrorDetail(ErrorConverter.java:60)
  at org.apache.flink.table.sqlserver.utils.ErrorConverter.toGrpcException(ErrorConverter.java:54)
  at org.apache.flink.table.sqlserver.FlinkSqlServiceImpl.validateAndGeneratePlan(FlinkSqlServiceImpl.java:1152)
  at org.apache.flink.table.sqlserver.proto.FlinkSqlServiceGrpc$MethodHandlers.invoke(FlinkSqlServiceGrpc.java:3777)
  at io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
  at io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
  at io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
  at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
  at org.apache.flink.table.sqlserver.interceptor.StatusInterceptor$1.onHalfClose(StatusInterceptor.java:116)
  at io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
  at io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
  at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
  at org.apache.flink.table.sqlserver.interceptor.CancelHandlerRegister$1.onHalfClose(CancelHandlerRegister.java:59)
  at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
  at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:820)
  at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
  at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)
  at java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.IllegalArgumentException: get org.example.UserProtoBuf descriptors error!
  at org.apache.flink.formats.protobuf.util.PbFormatUtils.getDescriptor(PbFormatUtils.java:126)
  at org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.<init>(PbRowDataDeserializationSchema.java:58)
  at org.apache.flink.formats.protobuf.PbDecodingFormat.createRuntimeDecoder(PbDecodingFormat.java:45)
  at org.apache.flink.formats.protobuf.PbDecodingFormat.createRuntimeDecoder(PbDecodingFormat.java:32)
  at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.createDeserialization(KafkaDynamicSource.java:716)
  at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.getScanRuntimeProvider(KafkaDynamicSource.java:385)
  at org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:596)
  at org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:250)
  at org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:180)
  at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:118)
  at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3847)
  at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2716)
  at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2261)
  at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2175)
  at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2120)
  at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:721)
  at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:707)
  at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3693)
  at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:615)
  at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.java:378)
  at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.java:354)
  at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1828)
  at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1504)
  at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:450)
  at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNodeOrFail(SqlToOperationConverter.java:476)
  at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:904)
  at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:423)
  at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:320)
  at org.apache.flink.table.planner.delegation.OperationIterator.convertNext(OperationIterator.java:102)
  at org.apache.flink.table.planner.delegation.OperationIterator.next(OperationIterator.java:86)
  at org.apache.flink.table.planner.delegation.OperationIterator.next(OperationIterator.java:49)
  at org.apache.flink.table.sqlserver.execution.OperationExecutorImpl.validate(OperationExecutorImpl.java:434)
  at org.apache.flink.table.sqlserver.execution.OperationExecutorImpl.validateAndGeneratePlan(OperationExecutorImpl.java:396)
  at org.apache.flink.table.sqlserver.execution.DelegateOperationExecutor.lambda$validateAndGeneratePlan$30(DelegateOperationExecutor.java:273)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.Subject.doAs(Subject.java:422)
  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729)
  at org.apache.flink.table.sqlserver.context.SqlServerSecurityContext.runSecured(SqlServerSecurityContext.java:72)
  at org.apache.flink.table.sqlserver.execution.DelegateOperationExecutor.wrapClassLoader(DelegateOperationExecutor.java:322)
  at org.apache.flink.table.sqlserver.execution.DelegateOperationExecutor.lambda$wrapExecutor$36(DelegateOperationExecutor.java:349)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  ... 3 more
Caused by: java.lang.ClassCastException: com.google.protobuf.Descriptors$FileDescriptor cannot be cast to com.google.protobuf.Descriptors$Descriptor
  at org.apache.flink.formats.protobuf.util.PbFormatUtils.getDescriptor(PbFormatUtils.java:123)
  ... 43 more

9.1.2. 代码分析

JM报错根因乍看之下具有较强隐蔽性,起初怀疑是包冲突,但provided runtime jar以及shade相关类后仍异常。

结合堆栈以及flink-protobuf-1.17.2.jar中源码:

image.png

及SQL代码:

CREATE TEMPORARY TABLE table_name (
...
)
   WITH (
  'connector' = 'kafka',
  'format' = 'protobuf',
  'protobuf.message-class-name' = 'com.example.TcTest',
  'protobuf.ignore-parse-errors'='true'
);

及tc_test.proto文件:

syntax = "proto3";
option java_package = "com.example";
message M1 {
...
}
message M2 {
...
}
message M3 {
...
}

及生成的java类:

public final class TcTest {
  public interface M1OrBuilder extends
      // @@protoc_insertion_point(interface_extends:M1)
      com.google.protobuf.MessageOrBuilder {
      ...
  }
  public static final class M1 extends
      com.google.protobuf.GeneratedMessageV3 implements
      // @@protoc_insertion_point(message_implements:M1)
      M1OrBuilder {
      ...
  }
  public interface M2OrBuilder extends
      // @@protoc_insertion_point(interface_extends:M2)
      com.google.protobuf.MessageOrBuilder {
      ...
  }
  public static final class M2 extends
      com.google.protobuf.GeneratedMessageV3 implements
      // @@protoc_insertion_point(message_implements:M2)
      M2OrBuilder {
      ...
  }
}

综合分析,.proto文件中message消息体众多,且option中没有指定java_outer_classname,则java文件名默认改为驼峰命名的方式,即TcTest.java,且为嵌套的java类,内涵所有的M1,M2等所有message消息体,以子类的形式存在,且所有类中都存在getDescriptor方法,分别用于描述proto文件和message文件,但返回类型不同,TcTest类返回的类型为FileDescriptor,而M1等类返回的类型为Descriptor,在Flink SQL配置时仅指定了主类,与编译参数错位,导致类转换异常。拆分proto文件中message到多个文件后编译或指定子类(如com.example.TcTest$M1)后正常消费上游数据。

9.2. org.codehaus.commons.compiler.CompileException: Line 14, Column 30: IDENTIFIER expected instead of '.'

9.2.1. 表象

相关堆栈报错如下所示:

2024-03-23 23:23:38,852 ERROR org.apache.flink.formats.protobuf.util.PbCodegenUtils        [] - Protobuf codegen compile error: 
package org.apache.flink.formats.protobuf.deserialize;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.binary.BinaryStringData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.GenericArrayData;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import com.google.protobuf.ByteString;
public class GeneratedProtoToRow_916e09b8a900477390c1f944e4a36da6{
public static RowData decode(.UserProtoBuf.User message){
RowData rowData=null;
.UserProtoBuf.User message0 = message;
GenericRowData rowData0 = new GenericRowData(7);
Object elementDataVar1 = null;
elementDataVar1 = message0.getAge();
rowData0.setField(0, elementDataVar1);
Object elementDataVar2 = null;
elementDataVar2 = message0.getTimestamp();
rowData0.setField(1, elementDataVar2);
Object elementDataVar3 = null;
elementDataVar3 = message0.getEnabled();
rowData0.setField(2, elementDataVar3);
Object elementDataVar4 = null;
elementDataVar4 = message0.getHeight();
rowData0.setField(3, elementDataVar4);
Object elementDataVar5 = null;
elementDataVar5 = message0.getWeight();
rowData0.setField(4, elementDataVar5);
Object elementDataVar6 = null;
elementDataVar6 = BinaryStringData.fromString(message0.getUserName().toString());
rowData0.setField(5, elementDataVar6);
Object elementDataVar7 = null;
elementDataVar7 = BinaryStringData.fromString(message0.getFullAddress().toString());
rowData0.setField(6, elementDataVar7);
rowData = rowData0;
return rowData;
}
}
2024-03-23 23:23:38,856 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: simple_test[2153] -> Sink: print_sink[2154] (1/1)#0 (c4aaed5ad4c63a8ba82a47979ffce386_717c7b8afebbfb7137f6f0f99beb2a94_0_0) switched from INITIALIZING to FAILED with failure cause:
org.apache.flink.formats.protobuf.PbCodegenException: org.apache.flink.api.common.InvalidProgramException: Program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.<init>(ProtoToRowConverter.java:124) ~[protobufTest-1.0-SNAPSHOT-1.jar:?]
    at org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.open(PbRowDataDeserializationSchema.java:64) ~[protobufTest-1.0-SNAPSHOT-1.jar:?]
    at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.open(DynamicKafkaDeserializationSchema.java:94) ~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.open(KafkaDeserializationSchemaWrapper.java:47) ~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.connector.kafka.source.KafkaSource.createReader(KafkaSource.java:144) ~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.connector.kafka.source.KafkaSource.createReader(KafkaSource.java:135) ~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.streaming.api.operators.SourceOperator.initReader(SourceOperator.java:318) ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.init(SourceOperatorStreamTask.java:93) ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:778) ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:745) ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:959) ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:928) [flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751) [flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567) [flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at java.lang.Thread.run(Thread.java:879) [?:1.8.0_372]
Caused by: org.apache.flink.api.common.InvalidProgramException: Program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.formats.protobuf.util.PbCodegenUtils.compileClass(PbCodegenUtils.java:262) ~[protobufTest-1.0-SNAPSHOT-1.jar:?]
    at org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.<init>(ProtoToRowConverter.java:116) ~[protobufTest-1.0-SNAPSHOT-1.jar:?]
    ... 14 more
Caused by: org.codehaus.commons.compiler.CompileException: Line 14, Column 30: IDENTIFIER expected instead of '.'
    at org.codehaus.janino.TokenStreamImpl.read(TokenStreamImpl.java:195) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.read(Parser.java:3313) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseQualifiedIdentifier(Parser.java:326) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseReferenceType(Parser.java:2342) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseType(Parser.java:2326) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseFormalParameter(Parser.java:1519) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseFormalParameters(Parser.java:1488) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseMethodDeclarationRest(Parser.java:1392) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseClassBodyDeclaration(Parser.java:938) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseClassBody(Parser.java:736) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseClassDeclarationRest(Parser.java:642) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parsePackageMemberTypeDeclarationRest(Parser.java:370) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseCompilationUnit(Parser.java:241) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.formats.protobuf.util.PbCodegenUtils.compileClass(PbCodegenUtils.java:259) ~[protobufTest-1.0-SNAPSHOT-1.jar:?]
    at org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.<init>(ProtoToRowConverter.java:116) ~[protobufTest-1.0-SNAPSHOT-1.jar:?]
    ... 14 more

9.2.2. 源码分析

通过异常堆栈报错的部分可以看出decode方法的message参数的类型少指定了package,期望应该是例如:com.example.UserProtoBuf.User的形式,但是生成的代码message参数的类型为“.UserProtoBuf.User”

public static RowData decode(.UserProtoBuf.User message){
    ...
}

user.proto文件如下所示,使用protoc编译为UserProtoBuf.java文件后通过IDEA打包成jar并添加到作业依赖中:

syntax = "proto3";
option java_outer_classname = "UserProtoBuf";
message User {
  int32 age = 1;
  int64 timestamp = 2;
  bool enabled = 3;
  float height = 4;
  double weight = 5;
  string userName = 6;
  string Full_Address = 7;
}

Flink SQL:

CREATE TEMPORARY TABLE test (
  ...
) WITH (
  'connector' = 'kafka',
  'topic' = '',
  'properties.bootstrap.servers' = '',
  'properties.group.id' = '',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'protobuf',
  'protobuf.message-class-name' = 'org.example.UserProtoBuf$User',
  'protobuf.ignore-parse-errors' = 'true'
)
;

结合堆栈信息

可以定位到异常抛出的方法为:

org.apache.flink.formats.protobuf.util.PbCodegenUtils.compileClass

image.png

code为org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter类的构造方法ProtoToRowConverter中如下代码调用并传入的,具体的参数值为“codegenAppender.code()”

Class generatedClass = PbCodegenUtils.compileClass(Thread.currentThread().getContextClassLoader(), generatedPackageName + "." + generatedClassName, codegenAppender.code());

codegenAppender对象以及生成的java代码中decode方法message参数的类型是由如下的代码生成的

PbCodegenAppender codegenAppender = new PbCodegenAppender();
codegenAppender.appendSegment("public static RowData decode(" + fullMessageClassName + " message){");

继续追溯fullMessageClassName

// 通过我们传入的类名获取该类的Descriptor对象
Descriptor descriptor = PbFormatUtils.getDescriptor(formatConfig.getMessageClassName());
// 通过该Descriptor对象获取 路径+类名 作为decode方法中message的参数类型
String fullMessageClassName = PbFormatUtils.getFullJavaName(descriptor);

image.png

再继续看PbFormatUtils.getFullJavaName具体是怎么实现的,当前测试类非包装类,会走到else部分的代码块,getOuterProtoPrefix方法会传入自定义protobuf format的FileDescriptor

image.png

getOuterProtoPrefix方法的return部分的代码我们可以看到,在我们这个case中应该是javaPackageName为空导致message的类型为"."开头,没有包含前面的路径(在idea中打包到了org.example中,并在flink sql的参数中进行了指定)。究其原因,javaPackageName的取值在编译的protobuf 类中通过FileDescriptor获取java package或proto package的值(java package为空时)

image.png

但是当前proto文件中,如下两个参数均未指定,而是通过idea对生成的类进行了package的声明,导致无法找到对应的javaPackageName,我们通过local debug同样可以验证我们的推论。

package com.example.xxx.model;
option java_package = "com.example.xxx.model";

image.png

我们可以在protobuf的官网上找到两个参数的相关介绍:https://protobuf.dev/programming-guides/proto3/#options

可以看到java_package与package两个参数的关系如上图getOuterProtoPrefix方法中的逻辑一致,优先为非空的java_package,不然则去proto文件的package参数。

image.png

9.2.3. 问题解决

我们可以按照flink官网的样例,在proto文件中对package以及option java_package两个参数均进行声明,并与idea项目中java打包后的路径一致即可绕过这个问题,让compileClass方法可以通过FileDescriptor找到package名。该问题已经提出issue到社区:https://issues.apache.org/jira/browse/FLINK-35034

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
17天前
|
SQL 存储 分布式计算
阿里巴巴瓴羊基于 Flink 实时计算的优化和实践
本⽂整理⾃阿里云智能集团技术专家王柳焮⽼师在 Flink Forward Asia 2023 中平台建设专场的分享。
290 2
阿里巴巴瓴羊基于 Flink 实时计算的优化和实践
|
14天前
|
运维 JavaScript Java
Serverless 应用引擎产品使用之在阿里云函数计算中想为两个不同的服务分别开通自定义域名如何解决
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
26 1
|
1天前
|
安全 Devops 测试技术
深入了解阿里云云效DevOps:构建高效软件开发实践
阿里云云效DevOps,集成CI/CD与自动化测试,提升开发效率。支持持续集成确保代码质量,自动化测试加速交付,多环境及灰度发布保障安全可靠性。助团队构建高效开发实践,增强竞争力。
7 1
|
4天前
|
Java Linux 数据安全/隐私保护
Docker自定义JDK镜像并拉取至阿里云镜像仓库全攻略
Docker自定义JDK镜像并拉取至阿里云镜像仓库全攻略
|
6天前
|
存储 监控 Apache
查询提速11倍、资源节省70%,阿里云数据库内核版 Apache Doris 在网易日志和时序场景的实践
网易的灵犀办公和云信利用 Apache Doris 改进了大规模日志和时序数据处理,取代了 Elasticsearch 和 InfluxDB。Doris 实现了更低的服务器资源消耗和更高的查询性能,相比 Elasticsearch,查询速度提升至少 11 倍,存储资源节省达 70%。Doris 的列式存储、高压缩比和倒排索引等功能,优化了日志和时序数据的存储与分析,降低了存储成本并提高了查询效率。在灵犀办公和云信的实际应用中,Doris 显示出显著的性能优势,成功应对了数据增长带来的挑战。
查询提速11倍、资源节省70%,阿里云数据库内核版 Apache Doris 在网易日志和时序场景的实践
|
12天前
|
测试技术 块存储 开发者
阿里云块存储团队软件工程实践
本文介绍了阿里云团队软件工程实际开发流程,并简述了开发过程中遇到的一些问题。且附带案例,以及遇到案例中出现的情况应当如何应对。
|
14天前
|
运维 IDE Serverless
Serverless 应用引擎产品使用之阿里函数计算中,阿里云容器镜像服务(Container Registry)中创建自定义镜像,然后将其部署到FC上如何解决
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
28 0
|
14天前
|
运维 网络协议 安全
Serverless 应用引擎产品使用之阿里云函数计算中添加自定义域名进行域名DNS验证如何解决
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
22 1
|
27天前
|
SQL 运维 DataWorks
Flink CDC在阿里云DataWorks数据集成应用实践
本文整理自阿里云 DataWorks 数据集成团队的高级技术专家 王明亚(云时)老师在 Flink Forward Asia 2023 中数据集成专场的分享。
518 2
Flink CDC在阿里云DataWorks数据集成应用实践
|
2天前
|
弹性计算 运维 监控
解密阿里云弹性计算:探索云服务器ECS的核心功能
阿里云ECS是核心计算服务,提供弹性云服务器资源,支持实例按需配置、集群管理和监控,集成安全防护,确保服务稳定、安全,助力高效业务运营。
14 0