Flink sql 问题之主动使数据延时一段时间如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:Flink sql 主动使数据延时一段时间有什么方案


我们有这样一个场景,双流join,一个快流,一个慢流,想让快流等一段时间,目的是能提高join的命中率。

FLink sql有什么方案实现吗?


参考回答:

我们也遇到过类似场景。

如果你的数据里面有事件时间,可以写个udf来判断下,如果事件时间-当前时间 小于某个阈值,可以sleep一下。

如果没有事件时间,那就不太好直接搞了,我们是自己搞了一个延迟维表,就是保证每条数据进到维表join算子后等固定时间后再去join。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/370136


问题二:Flink SQL 关键字 user?


我发现我创建表 字段名为 user会报错。 user是关键字吗,还是其他原因

"CREATE TABLE ods_foo (\n" +

" id INT,\n" +

" user ARRAY<ROW >\n" +

") WITH (

Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "user" at line 3, column 5.

Was expecting one of:


参考回答:

是的,user是关键字,关键字列表可以参考[1].

如果遇到关键字,可以使用 ` 来处理,比如:

CREATE TABLE user (...) WITH (...);

[1]

https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/#reserved-keywords


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/370135


问题三:flink1.10 定义表时,把json数组声明成STRING类型的,查询出来是空


可以通过以下步骤还原车祸现场: kafka topic: test_action kafka message: {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": "id002", "actionName": "bbb"} ] }

代码Problem2.java: package com.flink;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.types.Row;

/** * * 在自定义tableFunction的时候,如果SQL中传入的是ARRAY类型, * 那么在eval方法接收到的就是Row[], * 问题出在,Row[]中的数据获取不到,里面的元素都是NULL * * 现在思路:就是在定义表的时候,把ARRYA看成STRING, * 现在的问题,就是查询出来,都是空 * * kafka topic: test_action * * kafka message: * {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": "id002", "actionName": "bbb"} ] } */ public class Problem2 {

public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings envSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env, envSettings); bsEnv.registerFunction("explode3", new ExplodeFunction());

String ddlSource = "CREATE TABLE actionTable3 (\n" + " action STRING\n" + ") WITH (\n" + " 'connector.type' = 'kafka',\n" + " 'connector.version' = '0.11',\n" + " 'connector.topic' = 'test_action',\n" + " 'connector.startup-mode' = 'earliest-offset',\n" + " 'connector.properties.zookeeper.connect' = 'localhost:2181',\n" + " 'connector.properties.bootstrap.servers' = 'localhost:9092',\n" + " 'update-mode' = 'append',\n" + " 'format.type' = 'json',\n" + " 'format.derive-schema' = 'false',\n" + " 'format.json-schema' = '{"type": "object", "properties": {"action": {"type": "string"} } }'" + ")"; System.out.println(ddlSource); bsEnv.sqlUpdate(ddlSource);

Table table = bsEnv.sqlQuery("select * from actionTable3"); // Table table = bsEnv.sqlQuery("select * from actionTable2, LATERAL TABLE(explode3(action)) as T(word)"); table.printSchema(); bsEnv.toAppendStream(table, Row.class) .print();// 输出都是空

bsEnv.execute("ARRAY tableFunction Problem"); } }


参考回答:

这是一个已知问题[1],你可以看下这个issue,是否可以解决你的问题?

[1] https://issues.apache.org/jira/browse/FLINK-18002


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/370134


问题四:【Flink的shuffle mode】


看Flink源码时候,在应用中使用keyBy后,源码的transformations会有shuffle mode方法,这个shuffle mode看是UNDEFINED的。

那么,shuffle mode有哪些方式?在应用里面可以设置么?


参考回答:

现在就两种:pipeline和batch

batch的话是block住,直到执行完毕才发给下游的,所以这个shuffle mode一般只对批作业有用。

理论上可以per transformation的来设置,see PartitionTransformation.


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/370133


问题五:Flink job不定期就会重启,版本是1.9


Flink job经常不定期重启,看了异常日志基本都是下面这种,可以帮忙解释下什么原因吗?

2020-07-01 20:20:43.875 [flink-akka.actor.default-dispatcher-27] INFO akka.remote.RemoteActorRefProvider$RemotingTerminator flink-akka.remote.default-remote-dispatcher-22 - Remoting shut down. 2020-07-01 20:20:43.875 [flink-akka.actor.default-dispatcher-27] INFO akka.remote.RemoteActorRefProvider$RemotingTerminator flink-akka.remote.default-remote-dispatcher-22 - Remoting shut down. 2020-07-01 20:20:43.875 [flink-metrics-16] INFO akka.remote.RemoteActorRefProvider$RemotingTerminator flink-metrics-akka.remote.default-remote-dispatcher-14 - Remoting shut down. 2020-07-01 20:20:43.875 [flink-metrics-16] INFO akka.remote.RemoteActorRefProvider$RemotingTerminator flink-metrics-akka.remote.default-remote-dispatcher-14 - Remoting shut down. 2020-07-01 20:20:43.891 [flink-metrics-16] INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopped Akka RPC service. 2020-07-01 20:20:43.895 [flink-akka.actor.default-dispatcher-15] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Terminating cluster entrypoint process YarnJobClusterEntrypoint with exit code 2. java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/resourcemanager#-781959047]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for AskTimeoutException is that the recipient actor didn't send a reply. at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:871) at akka.dispatch.OnComplete.internal(Future.scala:263) at akka.dispatch.OnComplete.internal(Future.scala:261) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74) at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:644) at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205) at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599) at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109) at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597) at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328) at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:279) at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:283) at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:235) at java.lang.Thread.run(Thread.java:745) Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/resourcemanager#-781959047]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for AskTimeoutException is that the recipient actor didn't send a reply. at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635) at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635) at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:648) ... 9 common frames omitted


参考回答:

从报错信息看是 Akka 的 RPC 调用超时,因为是 LocalFencedMessage 所以基本上可以排除网络问题。 建议看一下 JM 进程的 GC 压力以及线程数量,是否存在压力过大 RPC 来不及响应的情况。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/370132

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
存储 监控 数据处理
flink 向doris 数据库写入数据时出现背压如何排查?
本文介绍了如何确定和解决Flink任务向Doris数据库写入数据时遇到的背压问题。首先通过Flink Web UI和性能指标监控识别背压,然后从Doris数据库性能、网络连接稳定性、Flink任务数据处理逻辑及资源配置等方面排查原因,并通过分析相关日志进一步定位问题。
186 61
|
16天前
|
SQL 存储 缓存
Flink SQL Deduplication 去重以及如何获取最新状态操作
Flink SQL Deduplication 是一种高效的数据去重功能,支持多种数据类型和灵活的配置选项。它通过哈希表、时间窗口和状态管理等技术实现去重,适用于流处理和批处理场景。本文介绍了其特性、原理、实际案例及源码分析,帮助读者更好地理解和应用这一功能。
90 14
|
2月前
|
SQL 存储 缓存
SQL Server 数据太多如何优化
11种优化方案供你参考,优化 SQL Server 数据库性能得从多个方面着手,包括硬件配置、数据库结构、查询优化、索引管理、分区分表、并行处理等。通过合理的索引、查询优化、数据分区等技术,可以在数据量增大时保持较好的性能。同时,定期进行数据库维护和清理,保证数据库高效运行。
|
3月前
|
SQL 移动开发 Oracle
SQL语句实现查询连续六天数据的方法与技巧
在数据库查询中,有时需要筛选出符合特定时间连续性条件的数据记录
|
3月前
|
运维 数据处理 Apache
数据实时计算产品对比测评报告:阿里云实时计算Flink版
数据实时计算产品对比测评报告:阿里云实时计算Flink版
|
3月前
|
SQL 存储 关系型数据库
添加数据到数据库的SQL语句详解与实践技巧
在数据库管理中,添加数据是一个基本操作,它涉及到向表中插入新的记录
|
3月前
|
SQL 数据挖掘 数据库
SQL查询每秒的数据:技巧、方法与性能优化
id="">SQL查询功能详解 SQL(Structured Query Language,结构化查询语言)是一种专门用于与数据库进行沟通和操作的语言
|
3月前
|
分布式计算 监控 大数据
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
87 1
|
3月前
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
62 0
|
3月前
|
SQL 分布式计算 大数据
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(一)
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(一)
61 0

相关产品

  • 实时计算 Flink版