1. Introduction to Protobuf
Protocol Buffers (ProtoBuf for short) is an open-source, cross-platform protocol for serializing structured data. It is useful for programs that store data or communicate over a network. The approach consists of an interface description language for describing data structures, plus tooling that generates code from those descriptions; the generated code produces and parses the byte streams that represent those data structures.
Google originally developed Protocol Buffers for internal use. Protocol Buffers was designed for simplicity and performance: it is smaller and faster than XML.
References:
2. The .proto file
Using proto3 syntax, we create a simple message named User, containing int, bool and string fields as well as floating-point fields of different precision, and save it as user.proto.
syntax = "proto3"; // proto 的package名称 // package com.example; // java package名称,如果不指定会默认用proto的package // option java_package = "com.example"; // 是否编译成多个文件 // option java_multiple_files = true; // java的封装类的类名 option java_outer_classname = "UserProtoBuf"; message User { int32 age = 1; int64 timestamp = 2; bool enabled = 3; float height = 4; double weight = 5; string userName = 6; string Full_Address = 7; }
3. .proto -> java.class
Compiling a .proto file into Java classes requires the protoc tool (see https://protobuf.dev/programming-guides/proto3/#generating). The Java class can be generated with the following command.
protoc --java_out=./ /path/user.proto
The overall structure of the generated Java class is shown below: it is a nested class whose outer class name is the java_outer_classname we specified, i.e. UserProtoBuf.
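As an abbreviated outline (illustrative only, not compilable source; the real protoc output additionally contains builders, parsers and descriptor plumbing), the generated file looks roughly like this:

public final class UserProtoBuf {

    // Read-only accessor interface generated for the User message.
    public interface UserOrBuilder extends com.google.protobuf.MessageOrBuilder { ... }

    // Nested message class corresponding to "message User" in user.proto,
    // with getters such as getAge()/getUserName(), plus newBuilder(),
    // parseFrom(byte[]) and toByteArray().
    public static final class User extends com.google.protobuf.GeneratedMessageV3
        implements UserOrBuilder { ... }
}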
4. Serialization and deserialization test
package org.example;

public class UserProtoTest {

    public static void main(String[] args) throws Exception {
        // Serialize the data via getClientPush.
        byte[] byteData = getClientPush();
        System.out.println("Serialized byte length: " + byteData.length);

        /**
         * Receiver-side deserialization: turn the byte data back into an object.
         */
        UserProtoBuf.User user = UserProtoBuf.User.parseFrom(byteData);
        // Print the object's toString output.
        System.out.println("user obj to string:\n" + user);
        // Print a few individual fields.
        System.out.println("UserName=" + user.getUserName());
        System.out.println("Timestamp=" + user.getTimestamp());
        System.out.println("Height=" + user.getHeight());
    }

    /**
     * Simulates the sender: serialize the data before sending.
     *
     * @return serialized bytes
     */
    private static byte[] getClientPush() {
        // Build an object according to the defined schema.
        UserProtoBuf.User.Builder user = UserProtoBuf.User.newBuilder();
        user.setAge(18);
        user.setTimestamp(System.currentTimeMillis());
        user.setEnabled(true);
        user.setHeight(1.74F);
        user.setWeight(70.07D);
        user.setUserName("adam");
        user.setFullAddress("China");

        /**
         * Sender-side serialization: turn the object into bytes for output.
         */
        UserProtoBuf.User userBuild = user.build();
        byte[] bytes = userBuild.toByteArray();
        return bytes;
    }
}
5. Reading and writing protobuf with cloud Kafka
The tests below use a cloud Kafka instance as the environment.
5.1. Writing to Kafka
package org.example;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;

public class KafkaProducerProtoTest {

    public static void main(String[] args) {
        Properties props = new Properties();
        // alikafka_post-cn-wwo3i0eho00o
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<ip1>:9093,<ip2>:9093,<ip3>:9093");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/Users/adamsun/Downloads/mix.4096.client.truststore.jks");
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        // Serializers for the cloud Kafka messages.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        // Maximum time a request may block.
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
        // Number of internal client retries.
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        // Interval between internal client retries.
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 3000);
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<user_name>\" password=\"<pass_word>\";");

        // Construct the producer. It is thread safe; one instance per process is usually enough.
        // For higher throughput a few more instances can be created, but preferably no more than five.
        KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);

        String topic = "sunyf_topic";
        byte[] value = getProtoTestData();

        try {
            // Collecting Future objects in batches speeds things up, but keep the batch size moderate.
            List<Future<RecordMetadata>> futures = new ArrayList<Future<RecordMetadata>>(128);
            ProducerRecord<String, byte[]> kafkaMessage = new ProducerRecord<String, byte[]>(topic, value);
            Future<RecordMetadata> metadataFuture = producer.send(kafkaMessage);
            futures.add(metadataFuture);
            producer.flush();
            for (Future<RecordMetadata> future : futures) {
                // Block on the Future to get the send result.
                try {
                    RecordMetadata recordMetadata = future.get();
                    System.out.println("Produce ok:" + recordMetadata.toString());
                } catch (Throwable t) {
                    t.printStackTrace();
                }
            }
        } catch (Exception e) {
            // If sending still fails after the client's internal retries,
            // the application must handle the error itself.
            System.out.println("error occurred");
            e.printStackTrace();
        }
    }

    private static byte[] getProtoTestData() {
        // Build an object according to the defined schema.
        UserProtoBuf.User.Builder user = UserProtoBuf.User.newBuilder();
        user.setAge(18);
        user.setTimestamp(System.currentTimeMillis());
        user.setEnabled(true);
        user.setHeight(1.88F);
        user.setWeight(66.76D);
        user.setUserName("name");
        user.setFullAddress("addr");

        /**
         * Sender-side serialization: turn the object into bytes for output.
         */
        UserProtoBuf.User userBuild = user.build();
        byte[] bytes = userBuild.toByteArray();
        return bytes;
    }
}
5.2. Reading from Kafka
package org.example;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;

public class KafkaConsumerProtoTest {

    public static void main(String args[]) {
        Properties props = new Properties();
        // Bootstrap servers; obtain the endpoint of the topic from the console.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<ip1>:9093,<ip2>:9093,<ip3>:9093");
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/Users/adamsun/Downloads/mix.4096.client.truststore.jks");
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        // Session timeout, default 30s. If the broker receives no heartbeat within this interval,
        // the consumer is considered dead, removed from the group, and a rebalance is triggered.
        // Tune it according to the actual fetch load and client version.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        // Maximum amount of data fetched per request; this matters a lot when accessing over the public network.
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000);
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000);
        // Maximum number of records per poll.
        // Do not set this too high: if the records cannot be processed before the next poll,
        // a rebalance is triggered and consumption stalls.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
        // Message deserializers.
        // Note the value deserializer: the default is String, changed to byte[] here.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // Consumer group of this instance; create it in the console first.
        // Instances in the same group share the consumption load.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "sunyf_group2");
        // Disable hostname verification.
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<user_name>\" password=\"<pass_word>\";");
        props.put("auto.offset.reset", "earliest");

        // Construct the consumer instance.
        KafkaConsumer<String, byte[]> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String, byte[]>(props);

        // Topics subscribed by this consumer group; multiple topics can be subscribed.
        // Consumers with the same GROUP_ID_CONFIG should subscribe to the same topics.
        // Each topic must be created in the console beforehand.
        List<String> subscribedTopics = new ArrayList<String>();
        String topicStr = "sunyf_topic";
        String[] topics = topicStr.split(",");
        for (String topic : topics) {
            subscribedTopics.add(topic.trim());
        }
        consumer.subscribe(subscribedTopics);

        // Consume messages in a loop.
        while (true) {
            try {
                ConsumerRecords<String, byte[]> records = consumer.poll(10000);
                // All records must be processed before the next poll, within SESSION_TIMEOUT_MS_CONFIG.
                // Preferably hand them off to a separate thread pool and return asynchronously.
                for (ConsumerRecord<String, byte[]> record : records) {
                    System.out.println(String.format("Consume partition:%d offset:%d", record.partition(), record.offset()));
                    byte[] value = record.value();
                    UserProtoBuf.User user = UserProtoBuf.User.parseFrom(value);
                    System.out.println("user=" + user);
                    System.out.println("UserName=" + user.getUserName());
                    System.out.println("Timestamp=" + user.getTimestamp());
                    System.out.println("Height=" + user.getHeight());
                }
            } catch (Exception e) {
                try {
                    Thread.sleep(10000);
                } catch (Throwable ignore) {
                }
                e.printStackTrace();
            }
        }
    }
}
With the code above we read back the protobuf messages written to the Kafka instance. The consumer offset is reset to the earliest position ("auto.offset.reset" = "earliest"); the client log shows this action, consumption starts from partition:10, offset:0, and one user message is successfully consumed with some of its fields printed.
6. Compiling, packaging, and uploading to Flink
The jar can be built locally with IDEA:
- Create the packaging project from the flink-quickstart archetype, using the parameters below (a sample generation command follows the table):
archetypeGroupId | org.apache.flink
archetypeArtifactId | flink-quickstart-java
archetypeVersion | 1.17.2
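For reference, a typical command to generate the project from this archetype (Maven will then prompt for your own groupId and artifactId):

mvn archetype:generate \
  -DarchetypeGroupId=org.apache.flink \
  -DarchetypeArtifactId=flink-quickstart-java \
  -DarchetypeVersion=1.17.2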
- Add the flink-protobuf dependency to the pom, as shown below:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-protobuf</artifactId>
    <version>1.17.2</version>
</dependency>
- Add the protoc-generated Java classes to the project.
- Package the project with Maven.
- Check the packaged jar under the target directory and confirm that it contains the protoc-compiled classes (a quick check command follows this list). The test message from the official documentation is used here, see https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/formats/protobuf/. Because the proto file sets option java_multiple_files = true;, the following three classes are generated: com.example.SimpleTest, com.example.SimpleTestOrBuilder, and com.example.SimpleTestOuterClass.
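One quick way to verify the jar contents (the jar name below is a placeholder for your actual build artifact):

jar tf target/your-artifact-1.0-SNAPSHOT.jar | grep 'com/example/SimpleTest'
# Expected to include, among inner classes:
# com/example/SimpleTest.class
# com/example/SimpleTestOrBuilder.class
# com/example/SimpleTestOuterClass.class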
7. Flink job test
7.1. Uploading dependencies
7.2. Job development and configuration
The job is created with Flink SQL; for the field type mapping see https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/formats/protobuf/#data-type-mapping
CREATE TEMPORARY TABLE simple_test (
  uid BIGINT,
  name STRING,
  category_type INT,
  content BINARY,
  price DOUBLE,
  value_map map<BIGINT, row<v1 BIGINT, v2 INT>>,
  value_arr array<row<v1 BIGINT, v2 INT>>,
  corpus_int INT,
  corpus_str STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'sunyf_topic',
  'properties.bootstrap.servers' = '',
  'properties.group.id' = 'sunyf_group_flink',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'protobuf',
  -- message class corresponding to the payload
  'protobuf.message-class-name' = 'com.example.SimpleTest',
  'protobuf.ignore-parse-errors' = 'true'
);

CREATE TEMPORARY TABLE print_sink (
  uid BIGINT,
  name STRING,
  category_type INT,
  content BINARY,
  price DOUBLE,
  value_map map<BIGINT, row<v1 BIGINT, v2 INT>>,
  value_arr array<row<v1 BIGINT, v2 INT>>,
  corpus_int INT,
  corpus_str STRING
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);

insert into print_sink select * from simple_test;
After the job is deployed, the PrintSinkOutputWriter entries in the taskmanager log show the output records, which match the data we wrote to Kafka.
8. Differences between proto2 and proto3 in Flink
Null-value differences and handling
protobuf.read-default-values
This option only takes effect when the classes are compiled with proto2, and it defaults to false. If true, the format reads null values as the default values defined in the proto file; if false, it produces NULL values. If the proto syntax is proto3, the value is forced to true, because using default values is the proto3 standard.
For the default values used in place of nulls, see https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/formats/protobuf/#null-values
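For a proto2-compiled message class, the option can be set in the source table's WITH clause, for example (a minimal sketch that reuses the topic and message class from the earlier example):

CREATE TEMPORARY TABLE simple_test_proto2 (
  uid BIGINT,
  name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'sunyf_topic',
  'properties.bootstrap.servers' = '',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'protobuf',
  'protobuf.message-class-name' = 'com.example.SimpleTest',
  -- only effective for proto2-compiled classes; proto3 forces it to true
  'protobuf.read-default-values' = 'true'
);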
Compiled with proto2
2024-03-25 15:08:55,075 INFO  org.apache.flink.api.common.functions.util.PrintSinkOutputWriter [] - +I[null, xxx, null, null, null, null, null, null, null]
2024-03-25 15:08:55,075 INFO  org.apache.flink.api.common.functions.util.PrintSinkOutputWriter [] - +I[1, xxx, 2, null, null, null, null, null, WEB]
Compiled with proto3
2024-03-26 16:22:20,411 INFO  org.apache.flink.api.common.functions.util.PrintSinkOutputWriter [] - +I[0, xxx, 0, [], 0.0, {}, [], 0, UNIVERSAL]
2024-03-26 16:22:20,412 INFO  org.apache.flink.api.common.functions.util.PrintSinkOutputWriter [] - +I[1, xxx, 2, [], 0.0, {}, [], 0, WEB]
9. Related errors
9.1. com.google.protobuf.Descriptors$FileDescriptor cannot be cast to com.google.protobuf.Descriptors$Descriptor
9.1.1. Symptom
The relevant stack trace is shown below:
org.apache.flink.table.api.ValidationException: SQL validation failed. get org.example.UserProtoBuf descriptors error! at org.apache.flink.table.sqlserver.utils.FormatValidatorExceptionUtils.newValidationException(FormatValidatorExceptionUtils.java:41) at org.apache.flink.table.sqlserver.utils.ErrorConverter.formatException(ErrorConverter.java:125) at org.apache.flink.table.sqlserver.utils.ErrorConverter.toErrorDetail(ErrorConverter.java:60) at org.apache.flink.table.sqlserver.utils.ErrorConverter.toGrpcException(ErrorConverter.java:54) at org.apache.flink.table.sqlserver.FlinkSqlServiceImpl.validateAndGeneratePlan(FlinkSqlServiceImpl.java:1152) at org.apache.flink.table.sqlserver.proto.FlinkSqlServiceGrpc$MethodHandlers.invoke(FlinkSqlServiceGrpc.java:3777) at io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172) at io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35) at io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23) at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40) at org.apache.flink.table.sqlserver.interceptor.StatusInterceptor$1.onHalfClose(StatusInterceptor.java:116) at io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35) at io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23) at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40) at org.apache.flink.table.sqlserver.interceptor.CancelHandlerRegister$1.onHalfClose(CancelHandlerRegister.java:59) at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331) at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:820) at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622) at java.lang.Thread.run(Thread.java:834) Caused by: java.lang.IllegalArgumentException: get org.example.UserProtoBuf descriptors error! 
at org.apache.flink.formats.protobuf.util.PbFormatUtils.getDescriptor(PbFormatUtils.java:126) at org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.<init>(PbRowDataDeserializationSchema.java:58) at org.apache.flink.formats.protobuf.PbDecodingFormat.createRuntimeDecoder(PbDecodingFormat.java:45) at org.apache.flink.formats.protobuf.PbDecodingFormat.createRuntimeDecoder(PbDecodingFormat.java:32) at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.createDeserialization(KafkaDynamicSource.java:716) at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.getScanRuntimeProvider(KafkaDynamicSource.java:385) at org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:596) at org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:250) at org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:180) at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:118) at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3847) at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2716) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2261) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2175) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2120) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:721) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:707) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3693) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:615) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.java:378) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.java:354) at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1828) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1504) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:450) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNodeOrFail(SqlToOperationConverter.java:476) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:904) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:423) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:320) at org.apache.flink.table.planner.delegation.OperationIterator.convertNext(OperationIterator.java:102) at org.apache.flink.table.planner.delegation.OperationIterator.next(OperationIterator.java:86) at org.apache.flink.table.planner.delegation.OperationIterator.next(OperationIterator.java:49) at org.apache.flink.table.sqlserver.execution.OperationExecutorImpl.validate(OperationExecutorImpl.java:434) at org.apache.flink.table.sqlserver.execution.OperationExecutorImpl.validateAndGeneratePlan(OperationExecutorImpl.java:396) at 
org.apache.flink.table.sqlserver.execution.DelegateOperationExecutor.lambda$validateAndGeneratePlan$30(DelegateOperationExecutor.java:273) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729) at org.apache.flink.table.sqlserver.context.SqlServerSecurityContext.runSecured(SqlServerSecurityContext.java:72) at org.apache.flink.table.sqlserver.execution.DelegateOperationExecutor.wrapClassLoader(DelegateOperationExecutor.java:322) at org.apache.flink.table.sqlserver.execution.DelegateOperationExecutor.lambda$wrapExecutor$36(DelegateOperationExecutor.java:349) at java.util.concurrent.FutureTask.run(FutureTask.java:266) ... 3 more Caused by: java.lang.ClassCastException: com.google.protobuf.Descriptors$FileDescriptor cannot be cast to com.google.protobuf.Descriptors$Descriptor at org.apache.flink.formats.protobuf.util.PbFormatUtils.getDescriptor(PbFormatUtils.java:123) ... 43 more
9.1.2. Code analysis
The root cause of the JM error is fairly well hidden at first glance. A dependency conflict was suspected initially, but the exception persisted after marking the runtime jar as provided and shading the relevant classes.
Combining the stack trace with the source of flink-protobuf-1.17.2.jar,
the SQL:
CREATE TEMPORARY TABLE table_name (
  ...
) WITH (
  'connector' = 'kafka',
  'format' = 'protobuf',
  'protobuf.message-class-name' = 'com.example.TcTest',
  'protobuf.ignore-parse-errors' = 'true'
);
the tc_test.proto file:
syntax = "proto3"; option java_package = "com.example"; message M1 { ... } message M2 { ... } message M3 { ... }
and the generated Java class:
public final class TcTest {

    public interface M1OrBuilder extends
        // @@protoc_insertion_point(interface_extends:M1)
        com.google.protobuf.MessageOrBuilder { ... }

    public static final class M1 extends
        com.google.protobuf.GeneratedMessageV3 implements
        // @@protoc_insertion_point(message_implements:M1)
        M1OrBuilder { ... }

    public interface M2OrBuilder extends
        // @@protoc_insertion_point(interface_extends:M2)
        com.google.protobuf.MessageOrBuilder { ... }

    public static final class M2 extends
        com.google.protobuf.GeneratedMessageV3 implements
        // @@protoc_insertion_point(message_implements:M2)
        M2OrBuilder { ... }
}
Putting this together: the .proto file contains many message definitions and does not set java_outer_classname, so the generated Java file name defaults to the camel-cased proto file name, TcTest.java, and all the messages (M1, M2, ...) become nested classes of this single outer class. Both the outer class and the message classes expose a static getDescriptor method, but with different return types: the outer class TcTest describes the proto file and returns a FileDescriptor, while message classes such as M1 describe a single message and return a Descriptor. The Flink SQL configuration specified only the outer class, which does not match what the format expects, hence the ClassCastException. After splitting the messages into separate proto files before compiling, or specifying the nested class (e.g. com.example.TcTest$M1), the upstream data is consumed normally.
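A minimal illustration of the mismatch (class names follow the example above; this assumes the protoc-generated TcTest class is on the classpath):

import com.example.TcTest;
import com.google.protobuf.Descriptors;

public class DescriptorMismatchDemo {
    public static void main(String[] args) {
        // The outer wrapper class describes the whole .proto file.
        Descriptors.FileDescriptor fileDescriptor = TcTest.getDescriptor();
        System.out.println("file: " + fileDescriptor.getName());

        // Each nested message class describes a single message.
        Descriptors.Descriptor messageDescriptor = TcTest.M1.getDescriptor();
        System.out.println("message: " + messageDescriptor.getFullName());

        // The Flink protobuf format needs a message-level Descriptor, so
        // 'protobuf.message-class-name' must point at com.example.TcTest$M1,
        // not at the outer class com.example.TcTest.
    }
}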
9.2. org.codehaus.commons.compiler.CompileException: Line 14, Column 30: IDENTIFIER expected instead of '.'
9.2.1. Symptom
The relevant stack trace is shown below:
2024-03-23 23:23:38,852 ERROR org.apache.flink.formats.protobuf.util.PbCodegenUtils [] - Protobuf codegen compile error: package org.apache.flink.formats.protobuf.deserialize; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.ArrayData; import org.apache.flink.table.data.binary.BinaryStringData; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.GenericMapData; import org.apache.flink.table.data.GenericArrayData; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.HashMap; import com.google.protobuf.ByteString; public class GeneratedProtoToRow_916e09b8a900477390c1f944e4a36da6{ public static RowData decode(.UserProtoBuf.User message){ RowData rowData=null; .UserProtoBuf.User message0 = message; GenericRowData rowData0 = new GenericRowData(7); Object elementDataVar1 = null; elementDataVar1 = message0.getAge(); rowData0.setField(0, elementDataVar1); Object elementDataVar2 = null; elementDataVar2 = message0.getTimestamp(); rowData0.setField(1, elementDataVar2); Object elementDataVar3 = null; elementDataVar3 = message0.getEnabled(); rowData0.setField(2, elementDataVar3); Object elementDataVar4 = null; elementDataVar4 = message0.getHeight(); rowData0.setField(3, elementDataVar4); Object elementDataVar5 = null; elementDataVar5 = message0.getWeight(); rowData0.setField(4, elementDataVar5); Object elementDataVar6 = null; elementDataVar6 = BinaryStringData.fromString(message0.getUserName().toString()); rowData0.setField(5, elementDataVar6); Object elementDataVar7 = null; elementDataVar7 = BinaryStringData.fromString(message0.getFullAddress().toString()); rowData0.setField(6, elementDataVar7); rowData = rowData0; return rowData; } } 2024-03-23 23:23:38,856 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: simple_test[2153] -> Sink: print_sink[2154] (1/1)#0 (c4aaed5ad4c63a8ba82a47979ffce386_717c7b8afebbfb7137f6f0f99beb2a94_0_0) switched from INITIALIZING to FAILED with failure cause: org.apache.flink.formats.protobuf.PbCodegenException: org.apache.flink.api.common.InvalidProgramException: Program cannot be compiled. This is a bug. Please file an issue. at org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.<init>(ProtoToRowConverter.java:124) ~[protobufTest-1.0-SNAPSHOT-1.jar:?] at org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.open(PbRowDataDeserializationSchema.java:64) ~[protobufTest-1.0-SNAPSHOT-1.jar:?] 
at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.open(DynamicKafkaDeserializationSchema.java:94) ~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT] at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.open(KafkaDeserializationSchemaWrapper.java:47) ~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT] at org.apache.flink.connector.kafka.source.KafkaSource.createReader(KafkaSource.java:144) ~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT] at org.apache.flink.connector.kafka.source.KafkaSource.createReader(KafkaSource.java:135) ~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT] at org.apache.flink.streaming.api.operators.SourceOperator.initReader(SourceOperator.java:318) ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.init(SourceOperatorStreamTask.java:93) ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:778) ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:745) ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:959) ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:928) [flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751) [flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567) [flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at java.lang.Thread.run(Thread.java:879) [?:1.8.0_372] Caused by: org.apache.flink.api.common.InvalidProgramException: Program cannot be compiled. This is a bug. Please file an issue. at org.apache.flink.formats.protobuf.util.PbCodegenUtils.compileClass(PbCodegenUtils.java:262) ~[protobufTest-1.0-SNAPSHOT-1.jar:?] at org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.<init>(ProtoToRowConverter.java:116) ~[protobufTest-1.0-SNAPSHOT-1.jar:?] ... 14 more Caused by: org.codehaus.commons.compiler.CompileException: Line 14, Column 30: IDENTIFIER expected instead of '.' 
at org.codehaus.janino.TokenStreamImpl.read(TokenStreamImpl.java:195) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at org.codehaus.janino.Parser.read(Parser.java:3313) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at org.codehaus.janino.Parser.parseQualifiedIdentifier(Parser.java:326) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at org.codehaus.janino.Parser.parseReferenceType(Parser.java:2342) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at org.codehaus.janino.Parser.parseType(Parser.java:2326) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at org.codehaus.janino.Parser.parseFormalParameter(Parser.java:1519) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at org.codehaus.janino.Parser.parseFormalParameters(Parser.java:1488) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at org.codehaus.janino.Parser.parseMethodDeclarationRest(Parser.java:1392) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at org.codehaus.janino.Parser.parseClassBodyDeclaration(Parser.java:938) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at org.codehaus.janino.Parser.parseClassBody(Parser.java:736) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at org.codehaus.janino.Parser.parseClassDeclarationRest(Parser.java:642) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at org.codehaus.janino.Parser.parsePackageMemberTypeDeclarationRest(Parser.java:370) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at org.codehaus.janino.Parser.parseCompilationUnit(Parser.java:241) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at org.apache.flink.formats.protobuf.util.PbCodegenUtils.compileClass(PbCodegenUtils.java:259) ~[protobufTest-1.0-SNAPSHOT-1.jar:?] at org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.<init>(ProtoToRowConverter.java:116) ~[protobufTest-1.0-SNAPSHOT-1.jar:?] ... 14 more
9.2.2. Source code analysis
The exception stack shows that the type of the decode method's message parameter is missing its package prefix: the expected form would be something like com.example.UserProtoBuf.User, but the generated code declares the parameter type as ".UserProtoBuf.User".
public static RowData decode(.UserProtoBuf.User message){ ... }
The user.proto file is shown below; it was compiled into UserProtoBuf.java with protoc, packaged into a jar with IDEA, and added to the job as a dependency:
syntax = "proto3"; option java_outer_classname = "UserProtoBuf"; message User { int32 age = 1; int64 timestamp = 2; bool enabled = 3; float height = 4; double weight = 5; string userName = 6; string Full_Address = 7; }
Flink SQL:
CREATE TEMPORARY TABLE test (
  ...
) WITH (
  'connector' = 'kafka',
  'topic' = '',
  'properties.bootstrap.servers' = '',
  'properties.group.id' = '',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'protobuf',
  'protobuf.message-class-name' = 'org.example.UserProtoBuf$User',
  'protobuf.ignore-parse-errors' = 'true'
);
Combining this with the stack trace, the method that throws the exception can be located:
org.apache.flink.formats.protobuf.util.PbCodegenUtils.compileClass
The code argument is passed in by the following call inside the constructor of org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter; its concrete value is codegenAppender.code().
Class generatedClass =
    PbCodegenUtils.compileClass(
        Thread.currentThread().getContextClassLoader(),
        generatedPackageName + "." + generatedClassName,
        codegenAppender.code());
The codegenAppender object, and the type of the decode method's message parameter in the generated Java code, are produced by the following code:
PbCodegenAppender codegenAppender = new PbCodegenAppender();
codegenAppender.appendSegment(
    "public static RowData decode(" + fullMessageClassName + " message){");
Tracing fullMessageClassName further:
// Obtain the Descriptor of the class we specified in protobuf.message-class-name.
Descriptor descriptor = PbFormatUtils.getDescriptor(formatConfig.getMessageClassName());
// Use that Descriptor to obtain "package + class name", which becomes the type
// of the decode method's message parameter.
String fullMessageClassName = PbFormatUtils.getFullJavaName(descriptor);
Looking at how PbFormatUtils.getFullJavaName is implemented: the test class here is not a nested message class, so execution falls into the else branch, and getOuterProtoPrefix is called with the FileDescriptor of our protobuf format.
From the return statement of getOuterProtoPrefix we can see that in this case javaPackageName is empty, which is why the message parameter type starts with "." and lacks the package prefix (in IDEA the class was packaged under org.example, which is what the Flink SQL option specifies). The root cause is that javaPackageName is resolved from the FileDescriptor of the compiled protobuf class: it takes the java package, falling back to the proto package when the java package is empty.
But in the proto file used here, neither of the two options below is declared; the package statement was only added to the generated class inside the IDEA project, so the javaPackageName cannot be resolved. A local debug session confirms this conclusion.
package com.example.xxx.model;

option java_package = "com.example.xxx.model";
Both options are described in the protobuf documentation: https://protobuf.dev/programming-guides/proto3/#options
As shown, the relationship between java_package and package matches the logic of the getOuterProtoPrefix method above: the non-empty java_package takes priority, otherwise the package declared in the proto file is used.
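A simplified sketch of that resolution logic (an illustrative paraphrase of the behavior described above, not the actual Flink implementation):

// Illustrative only: resolve the Java package prefix from a FileDescriptor,
// preferring a non-empty java_package option and falling back to the proto package.
static String resolveJavaPackage(com.google.protobuf.Descriptors.FileDescriptor fd) {
    String javaPackage = fd.getOptions().getJavaPackage();
    return javaPackage != null && !javaPackage.isEmpty() ? javaPackage : fd.getPackage();
}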
9.2.3. Resolution
Following the example on the Flink website, declare both package and option java_package in the proto file and keep them consistent with the Java package path used when building the jar in IDEA. This works around the problem and lets compileClass resolve the package name via the FileDescriptor. The issue has been reported to the community: https://issues.apache.org/jira/browse/FLINK-35034
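For this example, a corrected user.proto could look like the following (a sketch assuming the generated class is packaged under org.example, as in the Flink SQL above):

syntax = "proto3";

// Declare both, and keep them consistent with the package used when building the jar.
package org.example;
option java_package = "org.example";
option java_outer_classname = "UserProtoBuf";

message User {
  int32 age = 1;
  int64 timestamp = 2;
  bool enabled = 3;
  float height = 4;
  double weight = 5;
  string userName = 6;
  string Full_Address = 7;
}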