Flink问题之嵌套 json 中string 数组的解析异常如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:嵌套 json 中string 数组的解析异常


我使用 flink 1.9 处理嵌套 json, 它嵌套了一个string数组,构造出的 table schema结构为:

Row(parsedResponse: BasicArrayTypeInfo , timestamp: Long)

执行作业后会发生报错如下,出现 object 类型和string 类型的转换错误

Caused by: java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast

to [Ljava.lang.String;

at

org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.copy(StringArraySerializer.java:35)

at

org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)

at

org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)

大佬们知道该怎么修改么?

我的json 的结构如下:

{"parsedResponse":["apple", "banana", "orange"], "timestamp": "1522253345"}

P.S:

如果把 string 数组改为 long 数组或者 double 数组执行对应的操作可以正确运行,目前来看只有 string 数组出现问题。


参考回答:

看了下代码,这确实是Flink 1.9里面的一个bug[1], 原因没有 source 没有正确处理legacy type 和新的 type,这个issue没有在1.9的分支上修复,可以升级到1.10.1试下。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372075


问题二:【Flink的shuffle mode】


现在就两种:pipeline和batch

batch的话是block住,直到执行完毕才发给下游的,所以这个shuffle mode一般只对批作业有用。

理论上可以per transformation的来设置,see PartitionTransformation.


参考回答:

那就是说datasream默认模式就是pipeline,而批模式是batch,批的模式是存在shuffle情况下,需要等shuffle操作造成,才能发送到下游.那如果批应用有shuffle操作和没有shuffle的,是都要等这个shuffle操作完成了才能一起发给下游,还是说其他非shuffle操作完成了可以先发给下游,不用等shuffle操作完成一起再发送?


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372073


问题三:Flink 1.11 SQL作业中调用UDTF 出现“No match found for func


本人基于Flink 1.11 SNAPSHOT 在 Flink sql 作业中使用 UDTF, UDTF 的定义如下:

@FunctionHint( input = {@DataTypeHint("STRING"), @DataTypeHint("STRING")}, output = @DataTypeHint("STRING") ) public class Split extends TableFunction { public Split(){} public void eval(String str, String ch) { if (str == null || str.isEmpty()) { return; } else { String[] ss = str.split(ch); for (String s : ss) { collect(s); } } } }

在flink sql中通过 create function splitByChar as '..Split' 来创建这个function,在tableEnv 中调用executeSql(....) 来完成对这个 function的注册,在sql 后面的计算逻辑中 通过以下方式来调用这个UDTF create view view_source_1 as select dateTime,itime`, lng,lat,net,event_info, cast(split_index(T.s, '_', 0) as int) as time_page from view_source as a left join LATERAL TABLE (splitByChar('a,b,c',',')) as T(s) on true;

结果一直出现以下错误信息: org.apache.flink.table.api.ValidationException: SQL validation failed. From line 3, column 25 to line 3, column 47: No match found for function signature splitByChar( , ) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:629) .................... Caused by: org.apache.calcite.runtime.CalciteContextException: From line 3, column 25 to line 3, column 47: r( , ) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824) at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089) at org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1882) at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:305) at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845) at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800) at org.apache.calcite.sql.validate.ProcedureNamespace.validateImpl(ProcedureNamespace.java:57) at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3256) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3303) at org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:86) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3247) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510) at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60) at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084) at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059) at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141) ... 8 more Caused by: org.apache.calcite.sql.validate.SqlValidatorException: No match found for function signature splitByChar( , ) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457) at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550)

之前在flink 1.10 里面使用是正常的, 问下各位大佬有没有在flink 1.11 遇到过这个错误, 麻烦提供一下帮助。


参考回答:

我感觉这应该是新版本的udf的bug,我在本地也可以复现。 已经建了一个issue[1] 来跟进。

[1] https://issues.apache.org/jira/browse/FLINK-18520


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372070


问题四:flink sql 读写写kafka表的时候可以指定消息的key吗


 flink sql 写kafka表的时候可以指定消息的key吗?

看官网的kafka connector没有找到消息key相关的说明

如果可以的话,如何指定?


参考回答:

目前还不支持的,社区有一个 FLIP-107[1] 在计划做这个事情。[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records#FLIP107:Readingtablecolumnsfromdifferentpartsofsourcerecords-Kafka:ETL:read,transformandwritebackwithkey,value.Allfieldsofthekeyarepresentinthevalueaswell. https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records#FLIP107:Readingtablecolumnsfromdifferentpartsofsourcerecords-Kafka:ETL:read,transformandwritebackwithkey,value.Allfieldsofthekeyarepresentinthevalueaswell.


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372063


问题五:DataStream的state问题


想问下,在给state设置ttl的时候,如下面的代码:                    StateTtlConfig ttlConfig = StateTtlConfig

                           .newBuilder(Time.days(1))

                           .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)

                           .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)

                           .build();

设置了1天时间之后失效,例如2020-07-07 08:30:00点开始的job,那失效时间是这个时间段2020-07-07 00:00:00~2020-07-07 23:59:59,还是job上线之后,2020-07-07 08:30:00~2020-07-08 08:30:00这个时间段?


参考回答:

是最后一次 access 的时间到当前的时间超过了你设置的 ttl 间隔,比如你配置的是 OnCreateAndWrite

那么就是创建和写操作之后的 1 天,这个 state 会变成 expired,具体的可以参考文档[1]

[1]

https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/stream/state/state.html#%E7%8A%B6%E6%80%81%E6%9C%89%E6%95%88%E6%9C%9F-ttl


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372062

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
打赏
0
1
1
0
842
分享
相关文章
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
219 3
探索阿里云 Flink 物化表:原理、优势与应用场景全解析
阿里云Flink的物化表是流批一体化平台中的关键特性,支持低延迟实时更新、灵活查询性能、无缝流批处理和高容错性。它广泛应用于电商、物联网和金融等领域,助力企业高效处理实时数据,提升业务决策能力。实践案例表明,物化表显著提高了交易欺诈损失率的控制和信贷审批效率,推动企业在数字化转型中取得竞争优势。
192 16
Java更新数据库报错:Data truncation: Cannot create a JSON value from a string with CHARACTER SET 'binary'.
在Java中,使用mybatis-plus更新实体类对象到mysql,其中一个字段对应数据库中json数据类型,更新时报错:Data truncation: Cannot create a JSON value from a string with CHARACTER SET 'binary'.
627 4
Java更新数据库报错:Data truncation: Cannot create a JSON value from a string with CHARACTER SET 'binary'.
【Java面试题汇总】Java基础篇——String+集合+泛型+IO+异常+反射(2023版)
String常量池、String、StringBuffer、Stringbuilder有什么区别、List与Set的区别、ArrayList和LinkedList的区别、HashMap底层原理、ConcurrentHashMap、HashMap和Hashtable的区别、泛型擦除、ABA问题、IO多路复用、BIO、NIO、O、异常处理机制、反射
实时计算 Flink版操作报错合集之处理大量Join时报错空指针异常,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
实时计算 Flink版操作报错合集之处理大量Join时报错空指针异常,是什么原因
JavaScript基础&实战(5)js中的数组、forEach遍历、Date对象、Math、String对象
这篇文章介绍了JavaScript中的数组、Date对象、Math对象以及包装类(String、Number、Boolean),并详细讲解了数组的创建、方法(如forEach、push、pop、unshift、slice、splice)和遍历操作,以及工厂方法创建对象和原型对象的概念。
JavaScript基础&实战(5)js中的数组、forEach遍历、Date对象、Math、String对象
实时计算 Flink版操作报错合集之整库同步mysql到starRock提交任务异常,该如何处理
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
深入解析String数组的操作与性能优化策略
深入解析String数组的操作与性能优化策略
为什么String跟JSON不是同个东西?
很多人会误解JSON仅仅是序列化后的String,但这样的表述并不完全准确。JSON本质上是以字符串(String)形式表示的数据交换格式,但它不仅仅是一个字符串,而是具有特定语法和结构的字符串。 很经常遇到的一个场景: 后端:我给你返回了一段JSON,你转化下再遍历吧。

相关产品

  • 实时计算 Flink版
  • 推荐镜像

    更多
    AI助理

    你好,我是AI助理

    可以解答问题、推荐解决方案等