Flink问题之嵌套 json 中string 数组的解析异常如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:嵌套 json 中string 数组的解析异常


我使用 flink 1.9 处理嵌套 json, 它嵌套了一个string数组,构造出的 table schema结构为:

Row(parsedResponse: BasicArrayTypeInfo , timestamp: Long)

执行作业后会发生报错如下,出现 object 类型和string 类型的转换错误

Caused by: java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast

to [Ljava.lang.String;

at

org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.copy(StringArraySerializer.java:35)

at

org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)

at

org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)

大佬们知道该怎么修改么?

我的json 的结构如下:

{"parsedResponse":["apple", "banana", "orange"], "timestamp": "1522253345"}

P.S:

如果把 string 数组改为 long 数组或者 double 数组执行对应的操作可以正确运行,目前来看只有 string 数组出现问题。


参考回答:

看了下代码,这确实是Flink 1.9里面的一个bug[1], 原因没有 source 没有正确处理legacy type 和新的 type,这个issue没有在1.9的分支上修复,可以升级到1.10.1试下。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372075


问题二:【Flink的shuffle mode】


现在就两种:pipeline和batch

batch的话是block住,直到执行完毕才发给下游的,所以这个shuffle mode一般只对批作业有用。

理论上可以per transformation的来设置,see PartitionTransformation.


参考回答:

那就是说datasream默认模式就是pipeline,而批模式是batch,批的模式是存在shuffle情况下,需要等shuffle操作造成,才能发送到下游.那如果批应用有shuffle操作和没有shuffle的,是都要等这个shuffle操作完成了才能一起发给下游,还是说其他非shuffle操作完成了可以先发给下游,不用等shuffle操作完成一起再发送?


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372073


问题三:Flink 1.11 SQL作业中调用UDTF 出现“No match found for func


本人基于Flink 1.11 SNAPSHOT 在 Flink sql 作业中使用 UDTF, UDTF 的定义如下:

@FunctionHint( input = {@DataTypeHint("STRING"), @DataTypeHint("STRING")}, output = @DataTypeHint("STRING") ) public class Split extends TableFunction { public Split(){} public void eval(String str, String ch) { if (str == null || str.isEmpty()) { return; } else { String[] ss = str.split(ch); for (String s : ss) { collect(s); } } } }

在flink sql中通过 create function splitByChar as '..Split' 来创建这个function,在tableEnv 中调用executeSql(....) 来完成对这个 function的注册,在sql 后面的计算逻辑中 通过以下方式来调用这个UDTF create view view_source_1 as select dateTime,itime`, lng,lat,net,event_info, cast(split_index(T.s, '_', 0) as int) as time_page from view_source as a left join LATERAL TABLE (splitByChar('a,b,c',',')) as T(s) on true;

结果一直出现以下错误信息: org.apache.flink.table.api.ValidationException: SQL validation failed. From line 3, column 25 to line 3, column 47: No match found for function signature splitByChar( , ) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:629) .................... Caused by: org.apache.calcite.runtime.CalciteContextException: From line 3, column 25 to line 3, column 47: r( , ) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824) at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089) at org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1882) at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:305) at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845) at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800) at org.apache.calcite.sql.validate.ProcedureNamespace.validateImpl(ProcedureNamespace.java:57) at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3256) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3303) at org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:86) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3247) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510) at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60) at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084) at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059) at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141) ... 8 more Caused by: org.apache.calcite.sql.validate.SqlValidatorException: No match found for function signature splitByChar( , ) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457) at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550)

之前在flink 1.10 里面使用是正常的, 问下各位大佬有没有在flink 1.11 遇到过这个错误, 麻烦提供一下帮助。


参考回答:

我感觉这应该是新版本的udf的bug,我在本地也可以复现。 已经建了一个issue[1] 来跟进。

[1] https://issues.apache.org/jira/browse/FLINK-18520


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372070


问题四:flink sql 读写写kafka表的时候可以指定消息的key吗


 flink sql 写kafka表的时候可以指定消息的key吗?

看官网的kafka connector没有找到消息key相关的说明

如果可以的话,如何指定?


参考回答:

目前还不支持的,社区有一个 FLIP-107[1] 在计划做这个事情。[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records#FLIP107:Readingtablecolumnsfromdifferentpartsofsourcerecords-Kafka:ETL:read,transformandwritebackwithkey,value.Allfieldsofthekeyarepresentinthevalueaswell. https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records#FLIP107:Readingtablecolumnsfromdifferentpartsofsourcerecords-Kafka:ETL:read,transformandwritebackwithkey,value.Allfieldsofthekeyarepresentinthevalueaswell.


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372063


问题五:DataStream的state问题


想问下,在给state设置ttl的时候,如下面的代码:                    StateTtlConfig ttlConfig = StateTtlConfig

                           .newBuilder(Time.days(1))

                           .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)

                           .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)

                           .build();

设置了1天时间之后失效,例如2020-07-07 08:30:00点开始的job,那失效时间是这个时间段2020-07-07 00:00:00~2020-07-07 23:59:59,还是job上线之后,2020-07-07 08:30:00~2020-07-08 08:30:00这个时间段?


参考回答:

是最后一次 access 的时间到当前的时间超过了你设置的 ttl 间隔,比如你配置的是 OnCreateAndWrite

那么就是创建和写操作之后的 1 天,这个 state 会变成 expired,具体的可以参考文档[1]

[1]

https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/stream/state/state.html#%E7%8A%B6%E6%80%81%E6%9C%89%E6%95%88%E6%9C%9F-ttl


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372062

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
1月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
63 3
|
1月前
|
SQL 存储 JSON
SQL,解析 json
SQL,解析 json
66 8
|
2月前
|
安全 Java API
【Java面试题汇总】Java基础篇——String+集合+泛型+IO+异常+反射(2023版)
String常量池、String、StringBuffer、Stringbuilder有什么区别、List与Set的区别、ArrayList和LinkedList的区别、HashMap底层原理、ConcurrentHashMap、HashMap和Hashtable的区别、泛型擦除、ABA问题、IO多路复用、BIO、NIO、O、异常处理机制、反射
【Java面试题汇总】Java基础篇——String+集合+泛型+IO+异常+反射(2023版)
|
3月前
|
存储 JSON API
淘系API接口(解析返回的json数据)商品详情数据解析助力开发者
——在成长的路上,我们都是同行者。这篇关于商品详情API接口的文章,希望能帮助到您。期待与您继续分享更多API接口的知识,请记得关注Anzexi58哦! 淘宝API接口(如淘宝开放平台提供的API)允许开发者获取淘宝商品的各种信息,包括商品详情。然而,需要注意的是,直接访问淘宝的商品数据API通常需要商家身份或开发者权限,并且需要遵循淘宝的API使用协议。
淘系API接口(解析返回的json数据)商品详情数据解析助力开发者
|
2月前
|
JSON API 数据格式
requests库中json参数与data参数使用方法的深入解析
选择 `data`或 `json`取决于你的具体需求,以及服务器端期望接收的数据格式。
215 2
|
2月前
|
JSON 前端开发 JavaScript
解析JSON文件
解析JSON文件
122 9
|
1月前
|
JSON JavaScript API
商品详情数据接口解析返回的JSON数据(API接口整套流程)
商品详情数据接口解析返回的JSON数据是API接口使用中的一个重要环节,它涉及从发送请求到接收并处理响应的整个流程。以下是一个完整的API接口使用流程,包括如何解析返回的JSON数据:
|
2月前
|
存储 JSON API
Python编程:解析HTTP请求返回的JSON数据
使用Python处理HTTP请求和解析JSON数据既直接又高效。`requests`库的简洁性和强大功能使得发送请求、接收和解析响应变得异常简单。以上步骤和示例提供了一个基础的框架,可以根据你的具体需求进行调整和扩展。通过合适的异常处理,你的代码将更加健壮和可靠,为用户提供更加流畅的体验。
159 0
|
3月前
|
JSON API 数据格式
基于服务器响应的实时天气数据进行JSON解析的详细代码及其框架
【8月更文挑战第25天】这段资料介绍了一个使用Python从服务器获取实时天气数据并解析JSON格式数据的基本框架。主要分为三个部分:一是安装必要的`requests`库以发起HTTP请求获取数据,同时利用Python内置的`json`库处理JSON数据;二是提供了具体的代码实现,包括获取天气数据的`get_weather_data`函数和解析数据的`parse_weather_data`函数;三是对代码逻辑进行了详细说明,包括如何通过API获取数据以及如何解析这些数据来获取温度和天气描述等信息。用户需要根据实际使用的天气API调整代码中的API地址、参数和字段名称。
|
3月前
|
JSON 数据格式 索引
【Azure Developer】Azure Logic App 示例: 解析 Request Body 的 JSON 的表达式? triggerBody()?
【Azure Developer】Azure Logic App 示例: 解析 Request Body 的 JSON 的表达式? triggerBody()?

相关产品

  • 实时计算 Flink版
  • 推荐镜像

    更多