阿里云Flink-自定义kafka format实践及踩坑记录(以protobuf为例)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 阿里云Flink-自定义kafka format实践及踩坑记录(以protobuf为例)

1. protobuf简介

Protocol Buffers(简称:ProtoBuf)是一种开源跨平台的序列化数据结构的协议。其对于存储资料或在网络上进行通信的程序是很有用的。这个方法包含一个接口描述语言,描述一些数据结构,并提供程序工具根据这些描述产生代码,这些代码将用来生成或解析代表这些数据结构的字节流。

Google最初开发了Protocol Buffers用于内部使用。Protocol Buffers的设计目标是简单和性能。与XML相比更小且更快。

参考文档:

  1. 维基百科
  2. 官方文档

2. .proto文件

我们使用proto3的语法简单创建了一个名为User的消息体,其中包含了多种int,bool,string以及不同精度的浮点数字段,并将该文件保存为user.proto。

syntax = "proto3";
// proto 的package名称
// package com.example;
// java package名称,如果不指定会默认用proto的package
// option java_package = "com.example";
// 是否编译成多个文件
// option java_multiple_files = true;
// java的封装类的类名
option java_outer_classname = "UserProtoBuf";
message User {
  int32 age = 1;
  int64 timestamp = 2;
  bool enabled = 3;
  float height = 4;
  double weight = 5;
  string userName = 6;
  string Full_Address = 7;
}

3. .proto -> java.class

将.proto文件编译成java类需要用到protoc工具,参考链接:https://protobuf.dev/programming-guides/proto3/#generating,可以通过如下命令生成java类。

protoc --java_out=./ /path/user.proto

生成的java类整体结构如下所示,作为一个嵌套类,类名整体为我们指定的java_outer_classname参数,即UserProtoBuf。

image.png

4. 序列化与反序列化测试

package org.example;
public class UserProtoTest {
    public static void main(String[] args) throws Exception {
        // 通过getClientPush方法将数据序列化
        byte[] byteData = getClientPush();
        System.out.println("数据字节长度:" + byteData.length);
        /**
         * 接收数据反序列化:将字节数据转化为对象数据。
         */
        UserProtoBuf.User user = UserProtoBuf.User.parseFrom(byteData);
        // 看下对象的tostring方法打印出来的内容
        System.out.println("user obj to string:\n" + user);
        // 随便获取下某些字段进行打印
        System.out.println("UserName=" + user.getUserName());
        System.out.println("Timestamp=" + user.getTimestamp());
        System.out.println("Height=" + user.getHeight());
    }
    /**
     * 模拟发送方,将数据序列化后发送
     *
     * @return
     */
    private static byte[] getClientPush() {
        // 按照定义的数据结构,创建一个对象。
        UserProtoBuf.User.Builder user = UserProtoBuf.User.newBuilder();
        user.setAge(18);
        user.setTimestamp(System.currentTimeMillis());
        user.setEnabled(true);
        user.setHeight(1.74F);
        user.setWeight(70.07D);
        user.setUserName("adam");
        user.setFullAddress("China");
        /**
         * 发送数据序列化:将对象数据转化为字节数据输出
         */
        UserProtoBuf.User userBuild = user.build();
        byte[] bytes = userBuild.toByteArray();
        return bytes;
    }
}

image.png

5. 云Kafka读写protobuf测试

测试环境以云kafka为例

5.1. 写Kafka

package org.example;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
public class KafkaProducerProtoTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        //        alikafka_post-cn-wwo3i0eho00o
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<ip1>:9093,<ip2>:9093,<ip3>:9093");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/Users/adamsun/Downloads/mix.4096.client.truststore.jks");
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        //云消息队列 Kafka 版消息的序列化方式。
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        //请求的最长等待时间。
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
        //设置客户端内部重试次数。
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        //设置客户端内部重试间隔。
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 3000);
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<user_name>\" password=\"<pass_word>\";");
        //构造Producer对象,注意,该对象是线程安全的,一般来说,一个进程内一个Producer对象即可。
        //如果想提高性能,可以多构造几个对象,但不要太多,最好不要超过5个。
        KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
        String topic = "sunyf_topic";
        byte[] value = getProtoTestData();
        try {
            //批量获取Future对象可以加快速度,但注意,批量不要太大。
            List<Future<RecordMetadata>> futures = new ArrayList<Future<RecordMetadata>>(128);
            ProducerRecord<String, byte[]> kafkaMessage = new ProducerRecord<String, byte[]>(topic, value);
            Future<RecordMetadata> metadataFuture = producer.send(kafkaMessage);
            futures.add(metadataFuture);
            producer.flush();
            for (Future<RecordMetadata> future : futures) {
                //同步获得Future对象的结果。
                try {
                    RecordMetadata recordMetadata = future.get();
                    System.out.println("Produce ok:" + recordMetadata.toString());
                } catch (Throwable t) {
                    t.printStackTrace();
                }
            }
        } catch (Exception e) {
            //客户端内部重试之后,仍然发送失败,业务要应对此类错误。
            System.out.println("error occurred");
            e.printStackTrace();
        }
    }
    private static byte[] getProtoTestData() {
        // 按照定义的数据结构,创建一个对象。
        UserProtoBuf.User.Builder user = UserProtoBuf.User.newBuilder();
        user.setAge(18);
        user.setTimestamp(System.currentTimeMillis());
        user.setEnabled(true);
        user.setHeight(1.88F);
        user.setWeight(66.76D);
        user.setUserName("name");
        user.setFullAddress("addr");
        /**
         * 发送数据序列化:将对象数据转化为字节数据输出
         */
        UserProtoBuf.User userBuild = user.build();
        byte[] bytes = userBuild.toByteArray();
        return bytes;
    }
}

5.2. 读Kafka

package org.example;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
public class KafkaConsumerProtoTest  {
    public static void main(String args[]) {
        Properties props = new Properties();
        //设置接入点,通过控制台获取对应Topic的接入点。
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<ip1>:9093,<ip2>:9093,<ip3>:9093");
        //如果是SSL接入点实例,请注释以下第一行代码。
        //可更加实际拉去数据和客户的版本等设置此值,默认30s。
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/Users/adamsun/Downloads/mix.4096.client.truststore.jks");
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
//         * 两次Poll之间的最大允许间隔。
//         * 消费者超过该值没有返回心跳,服务端判断消费者处于非存活状态,服务端将消费者从Group移除并触发Rebalance,默认30s。
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
//         * 设置单次拉取的量,走公网访问时,该参数会有较大影响。
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000);
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000);
        //每次poll的最大数量。
        //注意该值不要改得太大,如果poll太多数据,而不能在下次poll之前消费完,则会触发一次负载均衡,产生卡顿。
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
        //消息的反序列化方式
        // 这里注意下value的序列化格式,默认的是string,这里改成了byte的
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        //当前消费实例所属的消费组,请在控制台申请之后填写。
        //属于同一个组的消费实例,会负载消费消息。
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "sunyf_group2");
        //如果是SSL接入点实例,请取消以下一行代码的注释。
        //Hostname校验改成空。
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<user_name>\" password=\"<pass_word>\";");
        props.put("auto.offset.reset", "earliest");
        //构造消息对象,也即生成一个消费实例。
        KafkaConsumer<String, byte[]> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String, byte[]>(props);
        //设置消费组订阅的Topic,可以订阅多个。
        //如果GROUP_ID_CONFIG是一样,则订阅的Topic也建议设置成一样。
        List<String> subscribedTopics = new ArrayList<String>();
        //如果是SSL接入点实例,请注释以下前五行代码,取消第六行代码的注释。
        //如果需要订阅多个Topic,则在这里add进去即可。
        //每个Topic需要先在控制台进行创建。
        String topicStr = "sunyf_topic";
        String[] topics = topicStr.split(",");
        for (String topic : topics) {
            subscribedTopics.add(topic.trim());
        }
        //subscribedTopics.add(kafkaProperties.getProperty("topic"));
        consumer.subscribe(subscribedTopics);
        //循环消费消息。
        while (true) {
            try {
                ConsumerRecords<String, byte[]> records = consumer.poll(10000);
                //必须在下次poll之前消费完这些数据, 且总耗时不得超过SESSION_TIMEOUT_MS_CONFIG。
                //建议开一个单独的线程池来消费消息,然后异步返回结果。
                for (ConsumerRecord<String, byte[]> record : records) {
                    System.out.println(String.format("Consume partition:%d offset:%d", record.partition(), record.offset()));
                    byte[] value = record.value();
                    UserProtoBuf.User user = UserProtoBuf.User.parseFrom(value);
                    System.out.println("user=" + user);
                    System.out.println("UserName=" + user.getUserName());
                    System.out.println("Timestamp=" + user.getTimestamp());
                    System.out.println("Height=" + user.getHeight());
                }
            } catch (Exception e) {
                try {
                    Thread.sleep(10000);
                } catch (Throwable ignore) {
                }
                e.printStackTrace();
            }
        }
    }
}

我们通过上述代码,对写入Kafka实例的protobuf消息进行读取,我们在代码中重置了消费位点到最早的位点,"auto.offset.reset" = "earliest",从日志中我们也看到了客户端的这一动作,并可以看到从partition:10,offset:0开始消费,成功消费到一个user message并打印出了其中的部分字段。

image.png

6. 编译打包上传到flink

可以通过IDEA在本地进行打包

  1. 通过flink-quick-start的项目为基础,创建打包环境

archetypeGroupId

org.apache.flink

archetypeArtifactId

flink-quickstart-java

archetypeVersion

1.17.2

  1. 添加flink-protobuf的依赖,如下所示的dependency到pom中
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-protobuf</artifactId>
  <version>1.17.2</version>
</dependency>
  1. 添加protoc的java类到项目中
  2. 使用maven对项目进行打包
  3. 对target文件下的打包后的jar进行检查,是否含有protoc编译后的类。这里使用了官网的测试类,相关参考:https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/formats/protobuf/。该proto文件中配置了option java_multiple_files = true; 所以生成了如下三个类:com.example.SimpleTest,com.example.SimpleTestOrBuilder,com.example.SimpleTestOuterClass

image.png

7. flink作业测试

7.1. 上传依赖

image.png

7.2. 作业开发与配置

通过FLINK-SQL创建任务,其中字段类型的映射可以参考:https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/formats/protobuf/#data-type-mapping

CREATE TEMPORARY TABLE simple_test (
  uid BIGINT,
  name STRING,
  category_type INT,
  content BINARY,
  price DOUBLE,
  value_map map<BIGINT, row<v1 BIGINT, v2 INT>>,
  value_arr array<row<v1 BIGINT, v2 INT>>,
  corpus_int INT,
  corpus_str STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'sunyf_topic',
  'properties.bootstrap.servers' = '',
  'properties.group.id' = 'sunyf_group_flink',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'protobuf',
  -- 指定消息体对应的message类
  'protobuf.message-class-name' = 'com.example.SimpleTest',
  'protobuf.ignore-parse-errors' = 'true'
)
;
CREATE TEMPORARY TABLE print_sink (
  uid BIGINT,
  name STRING,
  category_type INT,
  content BINARY,
  price DOUBLE,
  value_map map<BIGINT, row<v1 BIGINT, v2 INT>>,
  value_arr array<row<v1 BIGINT, v2 INT>>,
  corpus_int INT,
  corpus_str STRING
) WITH (
  'connector' = 'print',
  'logger' = 'true'    
)
;
insert into print_sink
select * from simple_test;

任务发布后我们可以看到taskmanager中的printsinkoutputwriter有相关数据的输出,与我们写入kafka时的数据一致。

image.png

8. flink中对proto2和3版本的差异

空值的差异以及处理

protobuf.read-default-values

只有以proto2编译时,此选项才有效,且默认为false。如果为true,则格式将读取空值作为proto文件中定义的默认值。如果为false,则生成null值。如果proto语法是proto3,则该值将强制设置为true,因为proto3的标准是使用默认值。

关于空值的默认值,可以参考文档:https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/formats/protobuf/#null-values

proto2编译

2024-03-25 15:08:55,075 INFO  org.apache.flink.api.common.functions.util.PrintSinkOutputWriter [] - +I[null, xxx, null, null, null, null, null, null, null]
2024-03-25 15:08:55,075 INFO  org.apache.flink.api.common.functions.util.PrintSinkOutputWriter [] - +I[1, xxx, 2, null, null, null, null, null, WEB]

proto3编译

2024-03-26 16:22:20,411 INFO  org.apache.flink.api.common.functions.util.PrintSinkOutputWriter [] - +I[0, xxx, 0, [], 0.0, {}, [], 0, UNIVERSAL]
2024-03-26 16:22:20,412 INFO  org.apache.flink.api.common.functions.util.PrintSinkOutputWriter [] - +I[1, xxx, 2, [], 0.0, {}, [], 0, WEB]

9. 相关报错

9.1. com.google.protobuf.DescriptorsFileDescriptorcannotbecasttocom.google.protobuf.DescriptorsFileDescriptorcannotbecasttocom.google.protobuf.DescriptorsFileDescriptor cannot be cast to com.google.protobuf.DescriptorsDescriptor

9.1.1. 表象

相关堆栈报错如下所示:

org.apache.flink.table.api.ValidationException: SQL validation failed. get org.example.UserProtoBuf descriptors error!
  at org.apache.flink.table.sqlserver.utils.FormatValidatorExceptionUtils.newValidationException(FormatValidatorExceptionUtils.java:41)
  at org.apache.flink.table.sqlserver.utils.ErrorConverter.formatException(ErrorConverter.java:125)
  at org.apache.flink.table.sqlserver.utils.ErrorConverter.toErrorDetail(ErrorConverter.java:60)
  at org.apache.flink.table.sqlserver.utils.ErrorConverter.toGrpcException(ErrorConverter.java:54)
  at org.apache.flink.table.sqlserver.FlinkSqlServiceImpl.validateAndGeneratePlan(FlinkSqlServiceImpl.java:1152)
  at org.apache.flink.table.sqlserver.proto.FlinkSqlServiceGrpc$MethodHandlers.invoke(FlinkSqlServiceGrpc.java:3777)
  at io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
  at io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
  at io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
  at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
  at org.apache.flink.table.sqlserver.interceptor.StatusInterceptor$1.onHalfClose(StatusInterceptor.java:116)
  at io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
  at io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
  at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
  at org.apache.flink.table.sqlserver.interceptor.CancelHandlerRegister$1.onHalfClose(CancelHandlerRegister.java:59)
  at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
  at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:820)
  at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
  at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)
  at java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.IllegalArgumentException: get org.example.UserProtoBuf descriptors error!
  at org.apache.flink.formats.protobuf.util.PbFormatUtils.getDescriptor(PbFormatUtils.java:126)
  at org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.<init>(PbRowDataDeserializationSchema.java:58)
  at org.apache.flink.formats.protobuf.PbDecodingFormat.createRuntimeDecoder(PbDecodingFormat.java:45)
  at org.apache.flink.formats.protobuf.PbDecodingFormat.createRuntimeDecoder(PbDecodingFormat.java:32)
  at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.createDeserialization(KafkaDynamicSource.java:716)
  at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.getScanRuntimeProvider(KafkaDynamicSource.java:385)
  at org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:596)
  at org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:250)
  at org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:180)
  at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:118)
  at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3847)
  at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2716)
  at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2261)
  at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2175)
  at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2120)
  at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:721)
  at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:707)
  at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3693)
  at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:615)
  at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.java:378)
  at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.java:354)
  at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1828)
  at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1504)
  at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:450)
  at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNodeOrFail(SqlToOperationConverter.java:476)
  at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:904)
  at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:423)
  at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:320)
  at org.apache.flink.table.planner.delegation.OperationIterator.convertNext(OperationIterator.java:102)
  at org.apache.flink.table.planner.delegation.OperationIterator.next(OperationIterator.java:86)
  at org.apache.flink.table.planner.delegation.OperationIterator.next(OperationIterator.java:49)
  at org.apache.flink.table.sqlserver.execution.OperationExecutorImpl.validate(OperationExecutorImpl.java:434)
  at org.apache.flink.table.sqlserver.execution.OperationExecutorImpl.validateAndGeneratePlan(OperationExecutorImpl.java:396)
  at org.apache.flink.table.sqlserver.execution.DelegateOperationExecutor.lambda$validateAndGeneratePlan$30(DelegateOperationExecutor.java:273)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.Subject.doAs(Subject.java:422)
  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729)
  at org.apache.flink.table.sqlserver.context.SqlServerSecurityContext.runSecured(SqlServerSecurityContext.java:72)
  at org.apache.flink.table.sqlserver.execution.DelegateOperationExecutor.wrapClassLoader(DelegateOperationExecutor.java:322)
  at org.apache.flink.table.sqlserver.execution.DelegateOperationExecutor.lambda$wrapExecutor$36(DelegateOperationExecutor.java:349)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  ... 3 more
Caused by: java.lang.ClassCastException: com.google.protobuf.Descriptors$FileDescriptor cannot be cast to com.google.protobuf.Descriptors$Descriptor
  at org.apache.flink.formats.protobuf.util.PbFormatUtils.getDescriptor(PbFormatUtils.java:123)
  ... 43 more

9.1.2. 代码分析

JM报错根因乍看之下具有较强隐蔽性,起初怀疑是包冲突,但provided runtime jar以及shade相关类后仍异常。

结合堆栈以及flink-protobuf-1.17.2.jar中源码:

image.png

及SQL代码:

CREATE TEMPORARY TABLE table_name (
...
)
   WITH (
  'connector' = 'kafka',
  'format' = 'protobuf',
  'protobuf.message-class-name' = 'com.example.TcTest',
  'protobuf.ignore-parse-errors'='true'
);

及tc_test.proto文件:

syntax = "proto3";
option java_package = "com.example";
message M1 {
...
}
message M2 {
...
}
message M3 {
...
}

及生成的java类:

public final class TcTest {
  public interface M1OrBuilder extends
      // @@protoc_insertion_point(interface_extends:M1)
      com.google.protobuf.MessageOrBuilder {
      ...
  }
  public static final class M1 extends
      com.google.protobuf.GeneratedMessageV3 implements
      // @@protoc_insertion_point(message_implements:M1)
      M1OrBuilder {
      ...
  }
  public interface M2OrBuilder extends
      // @@protoc_insertion_point(interface_extends:M2)
      com.google.protobuf.MessageOrBuilder {
      ...
  }
  public static final class M2 extends
      com.google.protobuf.GeneratedMessageV3 implements
      // @@protoc_insertion_point(message_implements:M2)
      M2OrBuilder {
      ...
  }
}

综合分析,.proto文件中message消息体众多,且option中没有指定java_outer_classname,则java文件名默认改为驼峰命名的方式,即TcTest.java,且为嵌套的java类,内涵所有的M1,M2等所有message消息体,以子类的形式存在,且所有类中都存在getDescriptor方法,分别用于描述proto文件和message文件,但返回类型不同,TcTest类返回的类型为FileDescriptor,而M1等类返回的类型为Descriptor,在Flink SQL配置时仅指定了主类,与编译参数错位,导致类转换异常。拆分proto文件中message到多个文件后编译或指定子类(如com.example.TcTest$M1)后正常消费上游数据。

9.2. org.codehaus.commons.compiler.CompileException: Line 14, Column 30: IDENTIFIER expected instead of '.'

9.2.1. 表象

相关堆栈报错如下所示:

2024-03-23 23:23:38,852 ERROR org.apache.flink.formats.protobuf.util.PbCodegenUtils        [] - Protobuf codegen compile error: 
package org.apache.flink.formats.protobuf.deserialize;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.binary.BinaryStringData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.GenericArrayData;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import com.google.protobuf.ByteString;
public class GeneratedProtoToRow_916e09b8a900477390c1f944e4a36da6{
public static RowData decode(.UserProtoBuf.User message){
RowData rowData=null;
.UserProtoBuf.User message0 = message;
GenericRowData rowData0 = new GenericRowData(7);
Object elementDataVar1 = null;
elementDataVar1 = message0.getAge();
rowData0.setField(0, elementDataVar1);
Object elementDataVar2 = null;
elementDataVar2 = message0.getTimestamp();
rowData0.setField(1, elementDataVar2);
Object elementDataVar3 = null;
elementDataVar3 = message0.getEnabled();
rowData0.setField(2, elementDataVar3);
Object elementDataVar4 = null;
elementDataVar4 = message0.getHeight();
rowData0.setField(3, elementDataVar4);
Object elementDataVar5 = null;
elementDataVar5 = message0.getWeight();
rowData0.setField(4, elementDataVar5);
Object elementDataVar6 = null;
elementDataVar6 = BinaryStringData.fromString(message0.getUserName().toString());
rowData0.setField(5, elementDataVar6);
Object elementDataVar7 = null;
elementDataVar7 = BinaryStringData.fromString(message0.getFullAddress().toString());
rowData0.setField(6, elementDataVar7);
rowData = rowData0;
return rowData;
}
}
2024-03-23 23:23:38,856 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: simple_test[2153] -> Sink: print_sink[2154] (1/1)#0 (c4aaed5ad4c63a8ba82a47979ffce386_717c7b8afebbfb7137f6f0f99beb2a94_0_0) switched from INITIALIZING to FAILED with failure cause:
org.apache.flink.formats.protobuf.PbCodegenException: org.apache.flink.api.common.InvalidProgramException: Program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.<init>(ProtoToRowConverter.java:124) ~[protobufTest-1.0-SNAPSHOT-1.jar:?]
    at org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.open(PbRowDataDeserializationSchema.java:64) ~[protobufTest-1.0-SNAPSHOT-1.jar:?]
    at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.open(DynamicKafkaDeserializationSchema.java:94) ~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.open(KafkaDeserializationSchemaWrapper.java:47) ~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.connector.kafka.source.KafkaSource.createReader(KafkaSource.java:144) ~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.connector.kafka.source.KafkaSource.createReader(KafkaSource.java:135) ~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.streaming.api.operators.SourceOperator.initReader(SourceOperator.java:318) ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.init(SourceOperatorStreamTask.java:93) ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:778) ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:745) ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:959) ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:928) [flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751) [flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567) [flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at java.lang.Thread.run(Thread.java:879) [?:1.8.0_372]
Caused by: org.apache.flink.api.common.InvalidProgramException: Program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.formats.protobuf.util.PbCodegenUtils.compileClass(PbCodegenUtils.java:262) ~[protobufTest-1.0-SNAPSHOT-1.jar:?]
    at org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.<init>(ProtoToRowConverter.java:116) ~[protobufTest-1.0-SNAPSHOT-1.jar:?]
    ... 14 more
Caused by: org.codehaus.commons.compiler.CompileException: Line 14, Column 30: IDENTIFIER expected instead of '.'
    at org.codehaus.janino.TokenStreamImpl.read(TokenStreamImpl.java:195) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.read(Parser.java:3313) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseQualifiedIdentifier(Parser.java:326) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseReferenceType(Parser.java:2342) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseType(Parser.java:2326) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseFormalParameter(Parser.java:1519) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseFormalParameters(Parser.java:1488) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseMethodDeclarationRest(Parser.java:1392) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseClassBodyDeclaration(Parser.java:938) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseClassBody(Parser.java:736) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseClassDeclarationRest(Parser.java:642) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parsePackageMemberTypeDeclarationRest(Parser.java:370) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseCompilationUnit(Parser.java:241) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.formats.protobuf.util.PbCodegenUtils.compileClass(PbCodegenUtils.java:259) ~[protobufTest-1.0-SNAPSHOT-1.jar:?]
    at org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.<init>(ProtoToRowConverter.java:116) ~[protobufTest-1.0-SNAPSHOT-1.jar:?]
    ... 14 more

9.2.2. 源码分析

通过异常堆栈报错的部分可以看出decode方法的message参数的类型少指定了package,期望应该是例如:com.example.UserProtoBuf.User的形式,但是生成的代码message参数的类型为“.UserProtoBuf.User”

public static RowData decode(.UserProtoBuf.User message){
    ...
}

user.proto文件如下所示,使用protoc编译为UserProtoBuf.java文件后通过IDEA打包成jar并添加到作业依赖中:

syntax = "proto3";
option java_outer_classname = "UserProtoBuf";
message User {
  int32 age = 1;
  int64 timestamp = 2;
  bool enabled = 3;
  float height = 4;
  double weight = 5;
  string userName = 6;
  string Full_Address = 7;
}

Flink SQL:

CREATE TEMPORARY TABLE test (
  ...
) WITH (
  'connector' = 'kafka',
  'topic' = '',
  'properties.bootstrap.servers' = '',
  'properties.group.id' = '',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'protobuf',
  'protobuf.message-class-name' = 'org.example.UserProtoBuf$User',
  'protobuf.ignore-parse-errors' = 'true'
)
;

结合堆栈信息

可以定位到异常抛出的方法为:

org.apache.flink.formats.protobuf.util.PbCodegenUtils.compileClass

image.png

code为org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter类的构造方法ProtoToRowConverter中如下代码调用并传入的,具体的参数值为“codegenAppender.code()”

Class generatedClass = PbCodegenUtils.compileClass(Thread.currentThread().getContextClassLoader(), generatedPackageName + "." + generatedClassName, codegenAppender.code());

codegenAppender对象以及生成的java代码中decode方法message参数的类型是由如下的代码生成的

PbCodegenAppender codegenAppender = new PbCodegenAppender();
codegenAppender.appendSegment("public static RowData decode(" + fullMessageClassName + " message){");

继续追溯fullMessageClassName

// 通过我们传入的类名获取该类的Descriptor对象
Descriptor descriptor = PbFormatUtils.getDescriptor(formatConfig.getMessageClassName());
// 通过该Descriptor对象获取 路径+类名 作为decode方法中message的参数类型
String fullMessageClassName = PbFormatUtils.getFullJavaName(descriptor);

image.png

再继续看PbFormatUtils.getFullJavaName具体是怎么实现的,当前测试类非包装类,会走到else部分的代码块,getOuterProtoPrefix方法会传入自定义protobuf format的FileDescriptor

image.png

getOuterProtoPrefix方法的return部分的代码我们可以看到,在我们这个case中应该是javaPackageName为空导致message的类型为"."开头,没有包含前面的路径(在idea中打包到了org.example中,并在flink sql的参数中进行了指定)。究其原因,javaPackageName的取值在编译的protobuf 类中通过FileDescriptor获取java package或proto package的值(java package为空时)

image.png

但是当前proto文件中,如下两个参数均未指定,而是通过idea对生成的类进行了package的声明,导致无法找到对应的javaPackageName,我们通过local debug同样可以验证我们的推论。

package com.example.xxx.model;
option java_package = "com.example.xxx.model";

image.png

我们可以在protobuf的官网上找到两个参数的相关介绍:https://protobuf.dev/programming-guides/proto3/#options

可以看到java_package与package两个参数的关系如上图getOuterProtoPrefix方法中的逻辑一致,优先为非空的java_package,不然则去proto文件的package参数。

image.png

9.2.3. 问题解决

我们可以按照flink官网的样例,在proto文件中对package以及option java_package两个参数均进行声明,并与idea项目中java打包后的路径一致即可绕过这个问题,让compileClass方法可以通过FileDescriptor找到package名。该问题已经提出issue到社区:https://issues.apache.org/jira/browse/FLINK-35034

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
3天前
|
消息中间件 Kafka 分布式数据库
实时计算 Flink版产品使用合集之如何批量读取Kafka数据
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3天前
|
消息中间件 关系型数据库 Kafka
实时计算 Flink版产品使用合集之使用DTS从RDSMySQL数据库同步数据到云Kafka,增量同步数据延迟时间超过1秒。如何诊断问题并降低延迟
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3天前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错之运行kafka时报错:javax.management.InstanceAlreadyExistsException,该如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
3天前
|
消息中间件 Oracle 关系型数据库
实时计算 Flink版操作报错之连接外部kafka本地执行测试代码报错如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
SQL 流计算 消息中间件
Flink 1.9 实战:使用 SQL 读取 Kafka 并写入 MySQL
《Flink SQL 1.9.0 技术内幕和最佳实践》,许多小伙伴对演示环节的 Demo 代码非常感兴趣,迫不及待地想尝试下,所以写了这篇文章分享下这份代码。
Flink 1.9 实战:使用 SQL 读取 Kafka 并写入 MySQL
|
3天前
|
SQL JSON 资源调度
实时计算 Flink版产品使用合集之如何指定FlinkYarnSession启动的properties文件存放位置
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3天前
|
SQL 监控 Oracle
实时计算 Flink版产品使用合集之如何指定表的隐藏列为主键
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3天前
|
消息中间件 SQL Java
实时计算 Flink版产品使用合集之管理内存webui上一直是百分百是什么导致的
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3天前
|
关系型数据库 测试技术 数据处理
实时计算 Flink版产品使用合集之TaskManager宕机是什么原因
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3天前
|
Java 数据库连接 数据处理
实时计算 Flink版产品使用合集之是否支持将数据直接写入adb
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

热门文章

最新文章

相关产品

  • 实时计算 Flink版