
Is this a bug?

Hi, I'm reading an HBase table with the DataStream API through HBaseRowInputFormat, and I build the schema with HBaseTableSchema (via HBaseTableSchema.fromTableSchema). The code is:

```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment

// "id" (an atomic STRING column) becomes the rowkey; "f1" (a ROW column) becomes a column family
val hbaseTableSchema = TableSchema.builder()
  .add(TableColumn.of("id", DataTypes.STRING()))
  .add(TableColumn.of("f1", DataTypes.ROW(DataTypes.FIELD("value", DataTypes.STRING()))))
  .build()
val schema = HBaseTableSchema.fromTableSchema(hbaseTableSchema)

// hbaseConfig() and tabelName are defined elsewhere in my code
val ds: DataStream[Row] = env.createInput(new HBaseRowInputFormat(
  hbaseConfig(),
  tabelName,
  schema
))
ds.print()
env.execute(this.getClass.getSimpleName)
```

At runtime the job fails with the following error:

```
java.lang.RuntimeException: Row arity of from (2) does not match this serializers field length (1).
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:113)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:58)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
    at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:93)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
```

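I traced the cause to this method in the HBaseRowInputFormat source:

```java
@Override
public TypeInformation getProducedType() {
    // split the fieldNames
    String[] famNames = schema.getFamilyNames();
    TypeInformation<?>[] typeInfos = new TypeInformation[famNames.length];
    int i = 0;
    // one nested RowTypeInfo per column family; the rowkey is never added
    for (String family : famNames) {
        typeInfos[i] = new RowTypeInfo(
            schema.getQualifierTypes(family),
            schema.getQualifierNames(family));
        i++;
    }
    return new RowTypeInfo(typeInfos, famNames);
}
```

The TypeInformation built here leaves out the rowkey type. The declared row type therefore has one field per column family (here only f1), while the emitted rows also carry the rowkey id. That is why RowSerializer.copy sees a row of arity 2 against a serializer with 1 field.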
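The mismatch can be reproduced without a cluster using a minimal, Flink-free Java model of the two field layouts. This is a sketch under stated assumptions, not Flink code. The class and method names (RowArityModel, producedFieldsFamiliesOnly, producedFieldsWithRowKey, checkArity) are hypothetical. The model assumes that the emitted rows carry the rowkey id at index 0 followed by the family f1, which is what the "(2) vs (1)" in the error suggests. The arity check is modeled on the message RowSerializer.copy reports in the stack trace above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model: plain Java lists stand in for Flink's RowTypeInfo field names.
final class RowArityModel {

    // Like the loop in getProducedType(): one field per column family, rowkey ignored.
    static List<String> producedFieldsFamiliesOnly(String[] familyNames) {
        return new ArrayList<>(Arrays.asList(familyNames));
    }

    // Layout that also contains the rowkey, inserted at its (assumed) position.
    static List<String> producedFieldsWithRowKey(String[] familyNames, String rowKeyName, int rowKeyIndex) {
        List<String> fields = new ArrayList<>(Arrays.asList(familyNames));
        fields.add(rowKeyIndex, rowKeyName);
        return fields;
    }

    // Modeled on the arity check whose message appears in the stack trace.
    static void checkArity(int rowArity, int serializerFieldLength) {
        if (rowArity != serializerFieldLength) {
            throw new RuntimeException("Row arity of from (" + rowArity
                + ") does not match this serializers field length (" + serializerFieldLength + ").");
        }
    }

    public static void main(String[] args) {
        String[] families = {"f1"};

        // Assumption: at runtime each row holds the rowkey "id" plus the family "f1".
        List<String> emittedRow = producedFieldsWithRowKey(families, "id", 0);

        // Declared type without the rowkey: the check fails, as in the report.
        List<String> declared = producedFieldsFamiliesOnly(families);
        try {
            checkArity(emittedRow.size(), declared.size());
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }

        // Declared type that includes the rowkey: the check passes.
        List<String> fixed = producedFieldsWithRowKey(families, "id", 0);
        checkArity(emittedRow.size(), fixed.size());
        System.out.println(fixed);
    }
}
```

Running main prints the same "Row arity of from (2) ..." message for the families-only layout, then [id, f1] for the layout that includes the rowkey.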

So is this a bug?

(From the Flink mailing-list archive, compiled by volunteers.)

Posted by JACKJACK, 2021-12-08 10:55:09
1 answer
  • 对的是我!

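    You can follow the approach used in HBaseTableSource: set the rowkey on the HBaseTableSchema and pass the return type explicitly to createInput.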

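    ```java
    HBaseTableSchema hbaseSchema = new HBaseTableSchema();
    hbaseSchema.addColumn(xxx);
    hbaseSchema.setRowKey(xxx);

    // The second argument sets the output type explicitly instead of relying on
    // HBaseRowInputFormat#getProducedType(); getReturnType() and explainSource()
    // are methods of HBaseTableSource itself.
    execEnv.createInput(new HBaseRowInputFormat(conf, tableName, hbaseSchema), getReturnType())
        .name(explainSource());
    ```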

    2021-12-08 11:14:05