flink问题之在通过TableFunction实现行转列时Row一直是空如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:flink1.10 定义表时,把json数组声明成STRING类型的,查询出来是空


可以通过以下步骤还原车祸现场: kafka topic: test_action kafka message: {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": "id002", "actionName": "bbb"} ] }

代码Problem2.java: package com.flink;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.types.Row;

/** * * 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型, * 那么在eval方法接收到的就是Row[], * 问题出在,Row[]中的数据获取不到,里面的元素都是NULL * * 现在思路:就是在定义表的时候,把ARRYA看成STRING, * 现在的问题,就是查询出来,都是空 * * kafka topic: test_action * * kafka message: * {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": "id002", "actionName": "bbb"} ] } */ public class Problem2 {

public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings envSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env, envSettings); bsEnv.registerFunction("explode3", new ExplodeFunction());

String ddlSource = "CREATE TABLE actionTable3 (\n" + " action STRING\n" + ") WITH (\n" + " 'connector.type' = 'kafka',\n" + " 'connector.version' = '0.11',\n" + " 'connector.topic' = 'test_action',\n" + " 'connector.startup-mode' = 'earliest-offset',\n" + " 'connector.properties.zookeeper.connect' = 'localhost:2181',\n" + " 'connector.properties.bootstrap.servers' = 'localhost:9092',\n" + " 'update-mode' = 'append',\n" + " 'format.type' = 'json',\n" + " 'format.derive-schema' = 'false',\n" + " 'format.json-schema' = '{"type": "object", "properties": {"action": {"type": "string"} } }'" + ")"; System.out.println(ddlSource); bsEnv.sqlUpdate(ddlSource);

Table table = bsEnv.sqlQuery("select * from actionTable3"); // Table table = bsEnv.sqlQuery("select * from actionTable2, LATERAL TABLE(explode3(action)) as T(word)"); table.printSchema(); bsEnv.toAppendStream(table, Row.class) .print();// 输出都是空

bsEnv.execute("ARRAY tableFunction Problem"); } }


参考回答:

这是一个已知问题[1],你可以看下这个issue,是否可以解决你的问题?

[1] https://issues.apache.org/jira/browse/FLINK-18002


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372052


问题二:flink1.10在通过TableFunction实现行转列时,Row一直是空


我的业务场景,是想实现一个行转列的效果。然后通过自定义tableFunction来实现。 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型, 那么在eval方法接收到的就是Row[], 问题出在,Row[]中的数据获取不到,里面的元素都是NULL

通过下面的步骤和代码可还原车祸场景: kafka topic: test_action kafka message: {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": "id002", "actionName": "bbb"} ] }

代码1:Problem.java package com.flink;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.types.Row;

/** * * 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型, * 那么在eval方法接收到的就是Row[], * 问题出在,Row[]中的数据获取不到,里面的元素都是NULL * * kafka topic: test_action * * kafka message: * {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": "id002", "actionName": "bbb"} ] } */ public class Problem {

public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings envSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env, envSettings); bsEnv.registerFunction("explode2", new ExplodeFunction());

String ddlSource = "CREATE TABLE actionTable (\n" + " action ARRAY<\n" + " ROW<" + " actionID STRING,\n" + " actionName STRING\n" + " >\n" + " >\n" + ") WITH (\n" + " 'connector.type' = 'kafka',\n" + " 'connector.version' = '0.11',\n" + " 'connector.topic' = 'test_action',\n" + " 'connector.startup-mode' = 'earliest-offset',\n" + " 'connector.properties.zookeeper.connect' = 'localhost:2181',\n" + " 'connector.properties.bootstrap.servers' = 'localhost:9092',\n" + " 'update-mode' = 'append',\n" + " 'format.type' = 'json'\n" + ")"; bsEnv.sqlUpdate(ddlSource);

// Table table = bsEnv.sqlQuery("select action from actionTable"); Table table = bsEnv.sqlQuery("select * from actionTable, LATERAL TABLE(explode2(action)) as T(word)"); table.printSchema(); bsEnv.toAppendStream(table, Row.class) .print("==tb==");

bsEnv.execute("ARRAY tableFunction Problem"); } }

代码2:ExplodeFunction.java package com.flink;

import org.apache.flink.table.functions.TableFunction; import org.apache.flink.types.Row;

import java.util.ArrayList; import java.util.Arrays;

public class ExplodeFunction extends TableFunction {

public void eval(Row[] values) { System.out.println(values.length); if (values.length > 0) { for (Row row : values) { if (row != null) {// 这里debug出来的row总是空 ArrayList list = new ArrayList<>(); for (int i = 0; i < row.getArity(); i++) { Object field = row.getField(i); list.add(field); }

collector.collect(Row.of(Arrays.toString(list.toArray()))); } } } } }


参考回答:

当前还不支持 Row[] 作为参数。目前有一个 issue 在解决这个问题,可以关注下。 https://issues.apache.org/jira/browse/FLINK-17855


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372038


问题三:作业因为异常restart后,频繁OOM


目前遇到的问题是作业如果因为网络抖动或者硬件故障导致的pod被失联而fail,在pod重生后,作业自动restart,作业运行一段时间(半小时到1小时不等)很容易出现其他pod因为oom被os

kill的现象,然后反复循环,pod 被kill越来越频繁。目前的解决方法是手动销毁这个集群,重新构建一个集群后重启作业,就恢复正常。

如果单纯heap的状态后台,作业restart不会出现这样的问题。

有一些不成熟的猜测,作业在fail后,native memory没有释放干净,pod的limit假设为10G,那么job

restart后只有8G,TM还是按照10G的标准运行,pod使用的内存就会超过10G而被os kill(纯属猜测)。

请问大家是否有什么好的提议或者解决方法?

其中一次系统内核日志如下:

Jun 30 21:59:15 flink-tm-1 kernel: memory: usage 28672000kB, limit

28672000kB, failcnt 11225

Jun 30 21:59:15 flink-tm-1 kernel: memory+swap: usage 28672000kB, limit

9007199254740988kB, failcnt 0

Jun 30 21:59:15 flink-tm-1 kernel: kmem: usage 0kB, limit

9007199254740988kB, failcnt 0

Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup stats for

/kubepods.slice/kubepods-pod5ad5d2ea_5faa_4a11_96b4_39271ab76e99.slice:

cache:0KB rss:0KB rss_huge:0KB mapped_file:0KB swap:0K

B inactive_anon:0KB active_anon:0KB inactive_file:0KB active_file:0KB

unevictable:0KB

Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup stats for

/kubepods.slice/kubepods-pod5ad5d2ea_5faa_4a11_96b4_39271ab76e99.slice/docker-fe101418a3b2a7c534e89b4ac73d29b04070eb923220a5b1

7338850bbdb3817a.scope: cache:0KB rss:44KB rss_huge:0KB mapped_file:0KB

swap:0KB inactive_anon:0KB active_anon:44KB inactive_file:0KB

active_file:0KB unevictable:0KB

Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup stats for

/kubepods.slice/kubepods-pod5ad5d2ea_5faa_4a11_96b4_39271ab76e99.slice/docker-a2295e812a828738810a8f1ae69cd48e99ef98b9e1038158a6e33f81524cc02a.scope:

cache:180KB rss:28671776KB rss_huge:26437632KB mapped_file:144KB swap:0KB

inactive_anon:0KB active_anon:28671760KB inactive_file:4KB

active_file:4KB

unevictable:0KB

Jun 30 21:59:15 flink-tm-1 kernel: [ pid ] uid tgid total_vm rss

nr_ptes swapents oom_score_adj name

Jun 30 21:59:15 flink-tm-1 kernel: [16875] 0 16875 253 1

4 0 -998 pause

Jun 30 21:59:15 flink-tm-1 kernel: [17274] 0 17274 1369 421

7 0 -998 bash

Jun 30 21:59:15 flink-tm-1 kernel: [18089] 0 18089 10824832 7174316

14500 0 -998 java

Jun 30 21:59:15 flink-tm-1 kernel: [18348] 0 18348 1017 196

6 0 -998 tail

Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup out of memory: Kill

process 26824 (Window(Tumbling) score 4 or sacrifice child

Jun 30 21:59:15 flink-tm-1 kernel: Killed process 18089 (java)

total-vm:43299328kB, anon-rss:28669084kB, file-rss:28180kB, shmem-rss:0kB

Looking forward to your reply and help.


参考回答:

很早以前遇到这个问题, standalone 模式下 metaspace 释放不掉, 感觉是一个比较严重的 bug

https://issues.apache.org/jira/browse/FLINK-11205 这边有过讨论


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/371862


问题四:Flink job不定期就会重启,版本是1.9


Flink job经常不定期重启,看了异常日志基本都是下面这种,可以帮忙解释下什么原因吗?

2020-07-01 20:20:43.875 [flink-akka.actor.default-dispatcher-27] INFO akka.remote.RemoteActorRefProvider$RemotingTerminator flink-akka.remote.default-remote-dispatcher-22 - Remoting shut down. 2020-07-01 20:20:43.875 [flink-akka.actor.default-dispatcher-27] INFO akka.remote.RemoteActorRefProvider$RemotingTerminator flink-akka.remote.default-remote-dispatcher-22 - Remoting shut down. 2020-07-01 20:20:43.875 [flink-metrics-16] INFO akka.remote.RemoteActorRefProvider$RemotingTerminator flink-metrics-akka.remote.default-remote-dispatcher-14 - Remoting shut down. 2020-07-01 20:20:43.875 [flink-metrics-16] INFO akka.remote.RemoteActorRefProvider$RemotingTerminator flink-metrics-akka.remote.default-remote-dispatcher-14 - Remoting shut down. 2020-07-01 20:20:43.891 [flink-metrics-16] INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopped Akka RPC service. 2020-07-01 20:20:43.895 [flink-akka.actor.default-dispatcher-15] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Terminating cluster entrypoint process YarnJobClusterEntrypoint with exit code 2. java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/resourcemanager#-781959047]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for AskTimeoutException is that the recipient actor didn't send a reply. at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:871) at akka.dispatch.OnComplete.internal(Future.scala:263) at akka.dispatch.OnComplete.internal(Future.scala:261) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74) at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:644) at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205) at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599) at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109) at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597) at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328) at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:279) at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:283) at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:235) at java.lang.Thread.run(Thread.java:745) Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/resourcemanager#-781959047]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for AskTimeoutException is that the recipient actor didn't send a reply. at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635) at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635) at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:648) ... 9 common frames omitted


参考回答:

从报错信息看是 Akka 的 RPC 调用超时,因为是 LocalFencedMessage 所以基本上可以排除网络问题。 建议看一下 JM 进程的 GC 压力以及线程数量,是否存在压力过大 RPC 来不及响应的情况。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/371674


问题五:flink1.9读取阿里Mq问题


flink1.9读取阿里RocketMQ 如何设置AccessKey,SecretKey 参数

finalRMQConnectionConfigconnectionConfig=newRMQConnectionConfig.Builder().setHost("localhost").setPort(5000)....build();


参考回答:

社区版本的 Flink 应该默认没有和 RocketMQ 连接的 Connector,在 RocketMQ 的社区项目中看到和 Flink 整合的模块:

https://github.com/apache/rocketmq-externals/tree/master/rocketmq-flink

你说的 AccessKey,SecretKey 参数应该是 ACL 权限校验,看了代码应该是不支持的,不过可以自己去进行扩展。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
2月前
|
消息中间件 Kafka Apache
flink问题之Row一直是空如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
32 1
|
3月前
|
SQL 流计算
Flink SQL提供了行转列的功能,可以通过使用`UNPIVOT`操作来实现
【1月更文挑战第1天】Flink SQL提供了行转列的功能,可以通过使用`UNPIVOT`操作来实现
104 0
|
2月前
|
消息中间件 Kafka Apache
Apache Flink 是一个开源的分布式流处理框架
Apache Flink 是一个开源的分布式流处理框架
476 5
|
4月前
|
SQL 运维 API
Apache Flink 学习教程----持续更新
Apache Flink 学习教程----持续更新
234 0
|
25天前
|
SQL Java API
官宣|Apache Flink 1.19 发布公告
Apache Flink PMC(项目管理委员)很高兴地宣布发布 Apache Flink 1.19.0。
1037 1
官宣|Apache Flink 1.19 发布公告
|
27天前
|
SQL Apache 流计算
Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
【2月更文挑战第25天】Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
130 3
|
1月前
|
XML Java Apache
Apache Flink自定义 logback xml配置
Apache Flink自定义 logback xml配置
128 0
|
1月前
|
消息中间件 Java Kafka
Apache Hudi + Flink作业运行指南
Apache Hudi + Flink作业运行指南
77 1
|
1月前
|
缓存 分布式计算 Apache
Apache Hudi与Apache Flink更好地集成,最新方案了解下?
Apache Hudi与Apache Flink更好地集成,最新方案了解下?
57 0
|
1月前
|
监控 Apache 开发工具
Apache Flink 1.12.2集成Hudi 0.9.0运行指南
Apache Flink 1.12.2集成Hudi 0.9.0运行指南
64 0

相关产品

  • 实时计算 Flink版