Flink SQL问题之复杂JSON解析如何解决

简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:ddl es 报错


源码如下:

CREATE TABLE buy_cnt_per_hour (

 hour_of_day BIGINT,

 buy_cnt BIGINT

) WITH (

 'connector.type' = 'elasticsearch',

 'connector.version' = '6',

 'connector.hosts' = 'http://localhost:9200',

 'connector.index' = 'buy_cnt_per_hour',

 'connector.document-type' = 'user_behavior',

 'connector.bulk-flush.max-actions' = '1',

 'format.type' = 'json',

 'update-mode' = 'append'

)

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.EnvironmentSettings;

import org.apache.flink.table.api.Table;

import org.apache.flink.table.api.java.StreamTableEnvironment;

import org.apache.flink.types.Row;

public class ESTest {

public static void main(String[] args) throws Exception {

//2、设置运行环境

StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);

streamEnv.setParallelism(1);

String sinkDDL = " CREATE TABLE test_es ( hour_of_day BIGINT,buy_cnt BIGINT "

  • ") WITH ( 'connector.type' = 'elasticsearch','connector.version' = '6',"
  • "'connector.hosts' = 'http://localhost:9200','connector.index' = 'buy_cnt_per_hour',"
  • "'connector.document-type' = 'user_behavior',"
  • "'connector.bulk-flush.max-actions' = '1',\n" + "'format.type' = 'json',"
  • "'update-mode' = 'append' )";

tableEnv.sqlUpdate(sinkDDL);

Table table = tableEnv.sqlQuery("select * from test_es ");

tableEnv.toRetractStream(table, Row.class).print();

streamEnv.execute("");

}

}

具体error

The matching candidates: org.apache.flink.table.sources.CsvAppendTableSourceFactory Mismatched properties: 'connector.type' expects 'filesystem', but is 'elasticsearch' 'format.type' expects 'csv', but is 'json'The following properties are requested: connector.bulk-flush.max-actions=1 connector.document-type=user_behavior connector.hosts=http://localhost:9200 connector.index=buy_cnt_per_hour connector.type=elasticsearch connector.version=6 format.type=json schema.0.data-type=BIGINT schema.0.name=hour_of_day schema.1.data-type=BIGINT schema.1.name=buy_cnt update-mode=append


参考回答:

这个报错,在flink 1.11 最新版本我也遇见了,跟你同样的操作

真正原因是这个ddl 是flink 的sink table,是数据写入端,不能打印数据。

而tableEnv.toRetractStream(table, Row.class).print();

这个打印的数据方法只适合flink 的Source Table,也就是数据输入端,比如kafka table就可以正常使用。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372141


问题二:DataStream统计uv问题


大家好!

     想问下,现在在用DataStream的api来统计每天的UV,代码如下,有2个使用问题:

     1、在使用Tumbling窗口的时候,由于使用窗口跨度是1天(Time.days(1)),只有以一天结束的时候,才能输出一个uv值,

          这样时间等待太长了,所以加了一个trigger,每来一条都触发一次窗口,不知道这样的用法没有问题。

     2、还有想问下在窗口结束后,里面的state状态会自动释放吗?还是要自己手动设置TTL的。

DataStream<UvPer10Min&gt; uvPer10MinDataStream = userBehaviorSource

.windowAll(TumblingProcessingTimeWindows.of(Time.days(1L)))

.trigger(CountTrigger.of(1L))

.evictor(CountEvictor.of(0L, true))

.process(new ProcessAllWindowFunction<UserBehavior, UvPer10Min, TimeWindow&gt;() {

private transient MapState<String, String&gt; userIdState;

private transient ValueState<Long&gt; uvCountState;

&nbsp; &nbsp;


参考回答:

我建议你用ContinuousEventTimeTrigger,可以在窗口范围内,连续触发。

你这个countTrigger,促发次数太多了,而且你后面是processWindowFunction,导致计算压力比较大。

建议你用aggregateWindowFuntion


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372140


问题三:Flink SQL如何将多个表的查询结果(列不同)聚合成一张表


select a.table_tmp1.r1 / a.table_tmp2.r2

这个是对同一行的数据进行操作,所以你需要先对table_tmp1和table_tmp2做一个join,将两个表的数据根据条件合并成一张表。

zilong xiao <acidzz163@gmail.com> 于2020年7月8日周三 下午8:55写道:

> 列如下面这样,需要查询table1 & table2,分别查询不同的字段

> 在最外层做比值,flink貌似语法检查不通过,应该怎么写这样的SQL呢,有前辈可以指导下不~

> select a.table_tmp1.r1 / a.table_tmp2.r2 as value0 from

> (

> (SELECT r1 FROM table1) AS table_tmp1, (SELECT r2 FROM table2) AS

> table_tmp2,

> )as a


参考回答:

你得有个join条件连接两张表的


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372135


问题四:一个source多个sink的同步问题


是1个小时才到来。10:00- 11:00的数据,11:01分到来。

但是现在的问题是这个数据来了,我的第一个sink马上就保存到数据库了, 11:02进数据库。但是第二个sink,因为有tumble window,所以10:00- 11:00的数据,需要到12:01,才会触发这个窗口。


参考回答:

窗口的触发逻辑就是这样的,必须watermark达到了窗口结束时间才会触发,可能10-11点的窗口中的数据最大只有10:59呢


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372131


问题五:Flink SQL复杂JSON解析


像如下这种JSON输入,

{

"id": 1,

"many_names": [

{"name": "foo"},

{"name": "bar"}

]

}

输出表两行 id 1, name foo | id 1, name bar

最佳实践是从Kafka读到后,调用TableFunction这个UDTF转出多行?

还是Flink SQL有更方便的操作,从Source读出来就能把数组展开。

来自 Outlookhttp://aka.ms/weboutlook


参考回答:

我理解最佳实践是第一种,先读出来array,再用table function展开成多行。 实际上把array转成多行是Flink 内置支持的,可以参考[1]的”Expanding arrays into a relation“部分 [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#joins 


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372126

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
4月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
简介:本文整理自阿里云高级技术专家李麟在Flink Forward Asia 2025新加坡站的分享,介绍了Flink 2.1 SQL在实时数据处理与AI融合方面的关键进展,包括AI函数集成、Join优化及未来发展方向,助力构建高效实时AI管道。
860 43
|
4月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
本文整理自阿里云的高级技术专家、Apache Flink PMC 成员李麟老师在 Flink Forward Asia 2025 新加坡[1]站 —— 实时 AI 专场中的分享。将带来关于 Flink 2.1 版本中 SQL 在实时数据处理和 AI 方面进展的话题。
321 0
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
|
5月前
|
SQL 消息中间件 Kafka
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是 Apache Flink 提供的 SQL 引擎,支持流批一体处理,统一操作流数据与批数据,具备高性能、低延迟、丰富数据源支持及标准 SQL 兼容性,适用于实时与离线数据分析。
937 1
|
9月前
|
数据采集 JSON 数据可视化
JSON数据解析实战:从嵌套结构到结构化表格
在信息爆炸的时代,从杂乱数据中提取精准知识图谱是数据侦探的挑战。本文以Google Scholar为例,解析嵌套JSON数据,提取文献信息并转换为结构化表格,通过Graphviz制作技术关系图谱,揭示文献间的隐秘联系。代码涵盖代理IP、请求头设置、JSON解析及可视化,提供完整实战案例。
599 4
JSON数据解析实战:从嵌套结构到结构化表格
|
11月前
|
JSON 前端开发 搜索推荐
关于商品详情 API 接口 JSON 格式返回数据解析的示例
本文介绍商品详情API接口返回的JSON数据解析。最外层为`product`对象,包含商品基本信息(如id、name、price)、分类信息(category)、图片(images)、属性(attributes)、用户评价(reviews)、库存(stock)和卖家信息(seller)。每个字段详细描述了商品的不同方面,帮助开发者准确提取和展示数据。具体结构和字段含义需结合实际业务需求和API文档理解。
|
11月前
|
SQL 大数据 数据处理
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是为应对传统数据处理框架中流批分离的问题而诞生的,它融合了SQL的简洁性和Flink的强大流批处理能力,降低了大数据处理门槛。其核心工作原理包括生成逻辑执行计划、查询优化和构建算子树,确保高效执行。Flink SQL 支持过滤、投影、聚合、连接和窗口等常用算子,实现了流批一体处理,极大提高了开发效率和代码复用性。通过统一的API和语法,Flink SQL 能够灵活应对实时和离线数据分析场景,为企业提供强大的数据处理能力。
2055 27
|
11月前
|
JSON 小程序 UED
微信小程序 app.json 配置文件解析与应用
本文介绍了微信小程序中 `app.json` 配置文件的详细
1718 12
|
11月前
|
JSON 缓存 API
解析电商商品详情API接口系列,json数据示例参考
电商商品详情API接口是电商平台的重要组成部分,提供了商品的详细信息,支持用户进行商品浏览和购买决策。通过合理的API设计和优化,可以提升系统性能和用户体验。希望本文的解析和示例能够为开发者提供参考,帮助构建高效、可靠的电商系统。
453 12
|
SQL 存储 缓存
Flink SQL Deduplication 去重以及如何获取最新状态操作
Flink SQL Deduplication 是一种高效的数据去重功能,支持多种数据类型和灵活的配置选项。它通过哈希表、时间窗口和状态管理等技术实现去重,适用于流处理和批处理场景。本文介绍了其特性、原理、实际案例及源码分析,帮助读者更好地理解和应用这一功能。
913 14
|
12月前
|
JSON JavaScript 前端开发
一次采集JSON解析错误的修复
两段采集来的JSON格式数据存在格式问题,直接使用PHP的`json_decode`会报错。解决思路包括:1) 手动格式化并逐行排查错误;2) 使用PHP-V8JS扩展在JavaScript环境中解析。具体方案一是通过正则表达式和字符串替换修复格式,方案二是利用V8Js引擎执行JS代码并返回JSON字符串,最终实现正确解析。 简介: 两段采集的JSON数据因掺杂JavaScript代码导致PHP解析失败。解决方案包括手动格式化修复和使用PHP-V8JS扩展在JavaScript环境中解析,确保JSON数据能被正确处理。

热门文章

最新文章

相关产品

  • 实时计算 Flink版
  • 推荐镜像

    更多
  • DNS