Flink SQL问题之复杂JSON解析如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:ddl es 报错


源码如下:

CREATE TABLE buy_cnt_per_hour (

 hour_of_day BIGINT,

 buy_cnt BIGINT

) WITH (

 'connector.type' = 'elasticsearch',

 'connector.version' = '6',

 'connector.hosts' = 'http://localhost:9200',

 'connector.index' = 'buy_cnt_per_hour',

 'connector.document-type' = 'user_behavior',

 'connector.bulk-flush.max-actions' = '1',

 'format.type' = 'json',

 'update-mode' = 'append'

)

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.EnvironmentSettings;

import org.apache.flink.table.api.Table;

import org.apache.flink.table.api.java.StreamTableEnvironment;

import org.apache.flink.types.Row;

public class ESTest {

public static void main(String[] args) throws Exception {

//2、设置运行环境

StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);

streamEnv.setParallelism(1);

String sinkDDL = " CREATE TABLE test_es ( hour_of_day BIGINT,buy_cnt BIGINT "

  • ") WITH ( 'connector.type' = 'elasticsearch','connector.version' = '6',"
  • "'connector.hosts' = 'http://localhost:9200','connector.index' = 'buy_cnt_per_hour',"
  • "'connector.document-type' = 'user_behavior',"
  • "'connector.bulk-flush.max-actions' = '1',\n" + "'format.type' = 'json',"
  • "'update-mode' = 'append' )";

tableEnv.sqlUpdate(sinkDDL);

Table table = tableEnv.sqlQuery("select * from test_es ");

tableEnv.toRetractStream(table, Row.class).print();

streamEnv.execute("");

}

}

具体error

The matching candidates: org.apache.flink.table.sources.CsvAppendTableSourceFactory Mismatched properties: 'connector.type' expects 'filesystem', but is 'elasticsearch' 'format.type' expects 'csv', but is 'json'The following properties are requested: connector.bulk-flush.max-actions=1 connector.document-type=user_behavior connector.hosts=http://localhost:9200 connector.index=buy_cnt_per_hour connector.type=elasticsearch connector.version=6 format.type=json schema.0.data-type=BIGINT schema.0.name=hour_of_day schema.1.data-type=BIGINT schema.1.name=buy_cnt update-mode=append


参考回答:

这个报错,在flink 1.11 最新版本我也遇见了,跟你同样的操作

真正原因是这个ddl 是flink 的sink table,是数据写入端,不能打印数据。

而tableEnv.toRetractStream(table, Row.class).print();

这个打印的数据方法只适合flink 的Source Table,也就是数据输入端,比如kafka table就可以正常使用。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372141


问题二:DataStream统计uv问题


大家好!

     想问下,现在在用DataStream的api来统计每天的UV,代码如下,有2个使用问题:

     1、在使用Tumbling窗口的时候,由于使用窗口跨度是1天(Time.days(1)),只有以一天结束的时候,才能输出一个uv值,

          这样时间等待太长了,所以加了一个trigger,每来一条都触发一次窗口,不知道这样的用法没有问题。

     2、还有想问下在窗口结束后,里面的state状态会自动释放吗?还是要自己手动设置TTL的。

DataStream<UvPer10Min&gt; uvPer10MinDataStream = userBehaviorSource

.windowAll(TumblingProcessingTimeWindows.of(Time.days(1L)))

.trigger(CountTrigger.of(1L))

.evictor(CountEvictor.of(0L, true))

.process(new ProcessAllWindowFunction<UserBehavior, UvPer10Min, TimeWindow&gt;() {

private transient MapState<String, String&gt; userIdState;

private transient ValueState<Long&gt; uvCountState;

&nbsp; &nbsp;


参考回答:

我建议你用ContinuousEventTimeTrigger,可以在窗口范围内,连续触发。

你这个countTrigger,促发次数太多了,而且你后面是processWindowFunction,导致计算压力比较大。

建议你用aggregateWindowFuntion


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372140


问题三:Flink SQL如何将多个表的查询结果(列不同)聚合成一张表


select a.table_tmp1.r1 / a.table_tmp2.r2

这个是对同一行的数据进行操作,所以你需要先对table_tmp1和table_tmp2做一个join,将两个表的数据根据条件合并成一张表。

zilong xiao <acidzz163@gmail.com> 于2020年7月8日周三 下午8:55写道:

> 列如下面这样,需要查询table1 & table2,分别查询不同的字段

> 在最外层做比值,flink貌似语法检查不通过,应该怎么写这样的SQL呢,有前辈可以指导下不~

> select a.table_tmp1.r1 / a.table_tmp2.r2 as value0 from

> (

> (SELECT r1 FROM table1) AS table_tmp1, (SELECT r2 FROM table2) AS

> table_tmp2,

> )as a


参考回答:

你得有个join条件连接两张表的


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372135


问题四:一个source多个sink的同步问题


是1个小时才到来。10:00- 11:00的数据,11:01分到来。

但是现在的问题是这个数据来了,我的第一个sink马上就保存到数据库了, 11:02进数据库。但是第二个sink,因为有tumble window,所以10:00- 11:00的数据,需要到12:01,才会触发这个窗口。


参考回答:

窗口的触发逻辑就是这样的,必须watermark达到了窗口结束时间才会触发,可能10-11点的窗口中的数据最大只有10:59呢


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372131


问题五:Flink SQL复杂JSON解析


像如下这种JSON输入,

{

"id": 1,

"many_names": [

{"name": "foo"},

{"name": "bar"}

]

}

输出表两行 id 1, name foo | id 1, name bar

最佳实践是从Kafka读到后,调用TableFunction这个UDTF转出多行?

还是Flink SQL有更方便的操作,从Source读出来就能把数组展开。

来自 Outlookhttp://aka.ms/weboutlook


参考回答:

我理解最佳实践是第一种,先读出来array,再用table function展开成多行。 实际上把array转成多行是Flink 内置支持的,可以参考[1]的”Expanding arrays into a relation“部分 [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#joins 


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372126

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
1月前
|
JSON JavaScript 前端开发
C++ 智能指针与 JSON 处理:高级编程技巧与常见问题解析
C++ 智能指针与 JSON 处理:高级编程技巧与常见问题解析
269 0
|
5天前
|
SQL 分布式计算 资源调度
一文解析 ODPS SQL 任务优化方法原理
本文重点尝试从ODPS SQL的逻辑执行计划和Logview中的执行计划出发,分析日常数据研发过程中各种优化方法背后的原理,覆盖了部分调优方法的分析,从知道怎么优化,到为什么这样优化,以及还能怎样优化。
|
15天前
|
存储 JSON JavaScript
「Python系列」Python JSON数据解析
在Python中解析JSON数据通常使用`json`模块。`json`模块提供了将JSON格式的数据转换为Python对象(如列表、字典等)以及将Python对象转换为JSON格式的数据的方法。
31 0
|
1月前
|
JSON JavaScript 数据格式
【深入探究C++ JSON库】解析JSON元素的层级管理与遍历手段
【深入探究C++ JSON库】解析JSON元素的层级管理与遍历手段
95 2
|
1月前
|
XML JSON API
深入解析C++ JSON库:nlohmann::json:: parse的内部机制与应用
深入解析C++ JSON库:nlohmann::json:: parse的内部机制与应用
52 0
|
1月前
|
SQL JSON Kubernetes
Seata常见问题之服务端 error日志没有输出,客户端执行sql报错如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
107 0
|
1月前
|
SQL 存储 Kubernetes
Seata常见问题之mybatisplus的批量插入方法报SQL错误如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
26 0
|
1月前
|
JSON 数据格式
人脸检测解析json的工具类face_test
人脸检测解析json的工具类face_test
14 0
|
1月前
|
SQL 关系型数据库 API
Star 4.7k!高效SQL Parser!纯Python开发!自称目前最快的纯Python SQL解析器!
Star 4.7k!高效SQL Parser!纯Python开发!自称目前最快的纯Python SQL解析器!
|
1月前
|
JSON JavaScript 前端开发
如何在Python中解析JSON响应?
【2月更文挑战第26天】【2月更文挑战第92篇】如何在Python中解析JSON响应?

相关产品

  • 实时计算 Flink版