Flink SQL问题之复杂JSON解析如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:ddl es 报错


源码如下:

CREATE TABLE buy_cnt_per_hour (

 hour_of_day BIGINT,

 buy_cnt BIGINT

) WITH (

 'connector.type' = 'elasticsearch',

 'connector.version' = '6',

 'connector.hosts' = 'http://localhost:9200',

 'connector.index' = 'buy_cnt_per_hour',

 'connector.document-type' = 'user_behavior',

 'connector.bulk-flush.max-actions' = '1',

 'format.type' = 'json',

 'update-mode' = 'append'

)

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.EnvironmentSettings;

import org.apache.flink.table.api.Table;

import org.apache.flink.table.api.java.StreamTableEnvironment;

import org.apache.flink.types.Row;

public class ESTest {

public static void main(String[] args) throws Exception {

//2、设置运行环境

StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);

streamEnv.setParallelism(1);

String sinkDDL = " CREATE TABLE test_es ( hour_of_day BIGINT,buy_cnt BIGINT "

  • ") WITH ( 'connector.type' = 'elasticsearch','connector.version' = '6',"
  • "'connector.hosts' = 'http://localhost:9200','connector.index' = 'buy_cnt_per_hour',"
  • "'connector.document-type' = 'user_behavior',"
  • "'connector.bulk-flush.max-actions' = '1',\n" + "'format.type' = 'json',"
  • "'update-mode' = 'append' )";

tableEnv.sqlUpdate(sinkDDL);

Table table = tableEnv.sqlQuery("select * from test_es ");

tableEnv.toRetractStream(table, Row.class).print();

streamEnv.execute("");

}

}

具体error

The matching candidates: org.apache.flink.table.sources.CsvAppendTableSourceFactory Mismatched properties: 'connector.type' expects 'filesystem', but is 'elasticsearch' 'format.type' expects 'csv', but is 'json'The following properties are requested: connector.bulk-flush.max-actions=1 connector.document-type=user_behavior connector.hosts=http://localhost:9200 connector.index=buy_cnt_per_hour connector.type=elasticsearch connector.version=6 format.type=json schema.0.data-type=BIGINT schema.0.name=hour_of_day schema.1.data-type=BIGINT schema.1.name=buy_cnt update-mode=append


参考回答:

这个报错,在flink 1.11 最新版本我也遇见了,跟你同样的操作

真正原因是这个ddl 是flink 的sink table,是数据写入端,不能打印数据。

而tableEnv.toRetractStream(table, Row.class).print();

这个打印的数据方法只适合flink 的Source Table,也就是数据输入端,比如kafka table就可以正常使用。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372141


问题二:DataStream统计uv问题


大家好!

     想问下,现在在用DataStream的api来统计每天的UV,代码如下,有2个使用问题:

     1、在使用Tumbling窗口的时候,由于使用窗口跨度是1天(Time.days(1)),只有以一天结束的时候,才能输出一个uv值,

          这样时间等待太长了,所以加了一个trigger,每来一条都触发一次窗口,不知道这样的用法没有问题。

     2、还有想问下在窗口结束后,里面的state状态会自动释放吗?还是要自己手动设置TTL的。

DataStream<UvPer10Min&gt; uvPer10MinDataStream = userBehaviorSource

.windowAll(TumblingProcessingTimeWindows.of(Time.days(1L)))

.trigger(CountTrigger.of(1L))

.evictor(CountEvictor.of(0L, true))

.process(new ProcessAllWindowFunction<UserBehavior, UvPer10Min, TimeWindow&gt;() {

private transient MapState<String, String&gt; userIdState;

private transient ValueState<Long&gt; uvCountState;

&nbsp; &nbsp;


参考回答:

我建议你用ContinuousEventTimeTrigger,可以在窗口范围内,连续触发。

你这个countTrigger,促发次数太多了,而且你后面是processWindowFunction,导致计算压力比较大。

建议你用aggregateWindowFuntion


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372140


问题三:Flink SQL如何将多个表的查询结果(列不同)聚合成一张表


select a.table_tmp1.r1 / a.table_tmp2.r2

这个是对同一行的数据进行操作,所以你需要先对table_tmp1和table_tmp2做一个join,将两个表的数据根据条件合并成一张表。

zilong xiao <acidzz163@gmail.com> 于2020年7月8日周三 下午8:55写道:

> 列如下面这样,需要查询table1 & table2,分别查询不同的字段

> 在最外层做比值,flink貌似语法检查不通过,应该怎么写这样的SQL呢,有前辈可以指导下不~

> select a.table_tmp1.r1 / a.table_tmp2.r2 as value0 from

> (

> (SELECT r1 FROM table1) AS table_tmp1, (SELECT r2 FROM table2) AS

> table_tmp2,

> )as a


参考回答:

你得有个join条件连接两张表的


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372135


问题四:一个source多个sink的同步问题


是1个小时才到来。10:00- 11:00的数据,11:01分到来。

但是现在的问题是这个数据来了,我的第一个sink马上就保存到数据库了, 11:02进数据库。但是第二个sink,因为有tumble window,所以10:00- 11:00的数据,需要到12:01,才会触发这个窗口。


参考回答:

窗口的触发逻辑就是这样的,必须watermark达到了窗口结束时间才会触发,可能10-11点的窗口中的数据最大只有10:59呢


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372131


问题五:Flink SQL复杂JSON解析


像如下这种JSON输入,

{

"id": 1,

"many_names": [

{"name": "foo"},

{"name": "bar"}

]

}

输出表两行 id 1, name foo | id 1, name bar

最佳实践是从Kafka读到后,调用TableFunction这个UDTF转出多行?

还是Flink SQL有更方便的操作,从Source读出来就能把数组展开。

来自 Outlookhttp://aka.ms/weboutlook


参考回答:

我理解最佳实践是第一种,先读出来array,再用table function展开成多行。 实际上把array转成多行是Flink 内置支持的,可以参考[1]的”Expanding arrays into a relation“部分 [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#joins 


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372126

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
4天前
|
SQL Kubernetes 关系型数据库
实时计算 Flink版产品使用合集之怎么解析 MySQL DDL 语句
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
20 2
|
4天前
|
SQL API 流计算
实时计算 Flink版产品使用合集之在Mac M1下的Docker环境中开启SQL Server代理的操作步骤是什么
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
19 1
|
2天前
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错合集之报错显示“Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT and DELETE"是什么意思
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
17 0
|
2天前
|
Oracle 关系型数据库 数据库
实时计算 Flink版操作报错合集之执行Flink job,报错“Could not execute SQL statement. Reason:org.apache.flink.table.api.ValidationException: One or more required options are missing”,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
38 0
|
2天前
|
网络安全 流计算 Python
实时计算 Flink版操作报错合集之Flink sql-client 针对kafka的protobuf格式数据建表,报错:java.lang.ClassNotFoundException 如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
16 1
|
3天前
|
SQL Java 关系型数据库
实时计算 Flink版操作报错合集之通过flink sql形式同步数据到hudi中,本地启动mian方法报错如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
22 8
|
3天前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用合集之flink sql ROW_NUMBER()回退更新的机制,有相关文档介绍吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
14 1
|
4天前
|
SQL 流计算 API
实时计算 Flink版产品使用合集之ClickHouse-JDBC 写入数据时,发现写入的目标表名称与 PreparedStatement 中 SQL 的表名不一致如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
9 0
|
4天前
|
SQL 数据处理 API
实时计算 Flink版产品使用合集之遇到SQL Server锁表问题如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
10 0
|
4天前
|
消息中间件 关系型数据库 网络安全
实时计算 Flink版操作报错合集之Flink sql-client 针对kafka的protobuf格式数据建表,报错:java.lang.ClassNotFoundException 如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
14 1

热门文章

最新文章

  • 1
    实时计算 Flink版操作报错合集之遇到报错:"An OperatorEvent from an OperatorCoordinator to a task was lost. Triggering task failover to ensure consistency." ,该怎么办
    15
  • 2
    实时计算 Flink版操作报错合集之在连接Oracle 19c时报错如何解决
    23
  • 3
    实时计算 Flink版操作报错合集之写入 Kafka 报错 "Failed to send data to Kafka: Failed to allocate memory within the configured max blocking time 60000 ms",该怎么解决
    13
  • 4
    实时计算 Flink版操作报错合集之报错显示“Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT and DELETE"是什么意思
    17
  • 5
    实时计算 Flink版操作报错合集之报错io.debezium.DebeziumException: The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot. 是什么原因
    16
  • 6
    实时计算 Flink版操作报错合集之本地打成jar包,运行报错,idea运行不报错,是什么导致的
    12
  • 7
    实时计算 Flink版操作报错合集之使用 Event Time Temporal Join 关联多个 HBase 后,Kafka 数据的某个字段变为 null 是什么原因导致的
    17
  • 8
    实时计算 Flink版操作报错合集之使用 Event Time Temporal Join 关联多个 HBase 后,Kafka 数据的某个字段变为 null 是什么原因导致的
    21
  • 9
    实时计算 Flink版操作报错合集之查询sqlserver ,全量阶段出现报错如何解决
    16
  • 10
    实时计算 Flink版操作报错合集之执行Flink job,报错“Could not execute SQL statement. Reason:org.apache.flink.table.api.ValidationException: One or more required options are missing”,该怎么办
    38
  • 相关产品

  • 实时计算 Flink版
  • 推荐镜像

    更多