Flink SQL 编程(二)| 学习笔记

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
Elasticsearch Serverless通用抵扣包,测试体验金 200元
简介: 快速学习 Flink SQL 编程。

开发者学堂课程【Apache Flink 入门到实战 - Flink 开源社区出品  Flink SQL 编程(二)】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/632/detail/10032


Flink SQL 编程(二)

 

四. 用 window 聚合和非 windo 聚合知识解决实际问题

需求1(filter)

统计出现在纽约的车。

那么如何计算出现在纽约的行车记录呢? 

刚刚在 rides 表里讲到有这样两个字段: 

lon:FLOAT      //经度

lat:FLOAT      //纬度

通过经度和纬度可以计算出在地图上的点,通过这个点可以得出行车记录是不是发生在纽约,所以要开发一个是否在纽约的 udf。

(开发 udf 时,运行时代码最好用 java 写。)

以下是一份已经做好的的代码:

package  com.dataartisans.udfs;

import org.apache.flink.table.functions.ScalarFunction;

import static com.dataartisans.udfs.util.GeoUtils.isInnyc:

/**

* Table API / SQL Scalar UDF to check if a coordinate is in NYC.

*/

public class IsInNYC extends ScaLarFunction {

public booLean eval(float Lon, float Lat){

return isInNYC(Lon, Lat);

解释一下这段 udf 的代码:

要实现一个 udf,首先要继承自 scalarfunction 这个类,位置在 package 的下面。

然后要声明一个 eval 方法。eval 的参数可以任意定义,这里的定义是两个 float 的类型的经度和纬度,返回的类型是一个 Boolean:是不是在纽约。

纽约的经度纬度如果在这个经纬度范围里,那么就可以确定哪条是发生在纽约的行车记录。

如果想让这个 udf 在 SQL 里面运转的话,首先需要打包:

输入

SQL-udfs git:(master) mvn clean package

打包完成之后, target 上会有一个 jar 包,包含了开发的这些类。然后把这个 jar 包拖到 SQL client  的 lab 上面。

在 yaml 的配置文件里定义刚刚开发的函数,定义的这个函数在 function 下面可以定义 N 个,我们只需要定义一个isInNYC 的函数,它的名字就是 isInNYC。这样就注册好了。

注册好了用 SQL 试一下:

输入

Flink SQL>show function

可以看到 isInNYC 已经在表里了:

Flink SQL> show functions;

timeDiff

toCoords

isInNYC

toAreaId

Drivers

然后用 udf 进行一些操作

输入

Flink SQL> select * from Rides where isInNYC(lon, lat); 

出现以下界面。

可以得出:出现在纽约的行车记录已经被选出。

接下来的需求是把它放到 view 里,因为后面要经常用到纽约这个需求。这样直接用 view 即可。

这就涉及到 CREATE VIEW 的语法。

需求2:

做一个无限流的聚合,计算搭载每种乘客数量的行车事件数。搭载一个乘客的行车数有多少个?大概两个乘客的行车数有多少个?

分析:Key 是每种乘客数量。它的 group by 里是没有开创这个函数的。一般来说,我们要用 count 去计算。

它的 query 写出来是比较简单的: 

1 SELECT

2 psgCnt,

3 COUNT(*)as cnt

4 FROM Rides

5 GROUP BY psgCnt;

需求3(Window Agg)

为了持续地监测城市的交通流量,计算每个区域每5分钟的进入的车辆数。

我们只关心纽约的区域交通情况,并且只关心至少有5辆车子进入的区域

toAreaId 里面的 udf 已经计算好了,通过经度和纬度我们能够定位到地球上一个区域Id。

写出它的 Query:

Select

toAreald(lon,lat),AS area,

isStart,

TUMBLE_END(rideTime,INTERVAL '5'MINUTE), AS window_end,

COUNT(*)as AS cnt

FROM Rides

WHERE isInNYC(lon, lat)

GROUP BY

toAreald(lon,lat),

isStart,

TUMBLE(rideTime,INTERVAL ‘5’MINUTE)

HAVING COUNT(*)>=5;

根据它的区域 ID 和五分钟的窗口来做聚合。

image.png

由图可见窗口的结束时间是不断在变化的,而且区域 ID 和 window

的 END 都没有重复的结果。印证了 window 聚合的特点:只输出了一

次,不会再重复输出。

需求4(write to Kafka)

将每10分钟的搭乘的乘客数写入 Kafka

结果表: Sink_TenMinPsgCnts 

Yaml表:

-name:Sink_ TenMinPsgCnts

type:sink

update-mode: append

schema:-name: cntStart

type:TIMESTAMP

-name:cntEnd

type:TIMESTAMP

-name:cnt

type:LONG  connector:

property-version:1

type: kafka

version: 0.11

topic:TenMinPsgCnts

startup-mode: earliest-offset

properties:

- key: zookeeper.connect

value: zookeeper:2181

-key:bootstrap.servers

value:kafka:9092

- key: group.id

value: trainingGroup

format:

property-version:1

type: json

输出模式是 append,因为 window 的输出没有更新。

Schema 有三个字段,起始时间、结束时间以及个数。

存在的类型是 kafka。

Properties 是位置的信息。

由 json 的方式储存。

Sink 表的定义和 source 表的定义,模式是一样的,只是 type 不一样。

Flink SQL > show tables;

DriverChanges

Fares

Rides

Sink_AreaCnts

Sink_TenMinPsgCnts

Flink SQL > describe Sink_TenMinPsgCnts

Root

I--cntStart:Timestamp

I--cntEnd:Timestamp

I-- cnt :long

针对需求,执行以下 query:

Flink SQL > SELECT

TUMBLE_START(rideTime,INTERVAL'10'MINUTE),

TUMBLE_END(rideTime, INTERVAL '10' MINUTE),

SUM(psgCnt) as cnt

FROM Rides

GROUP BY TUMBLE(rideTime, INTERVAL '10' MINUTE);

用 query 做出的结果,我们需要把它写到 kafka 里面

只需要在刚刚写出的 query 上面一行加上 INSERT INTO 这一语法

写到 Sink_TenMinPsgCnts 表里:

INSERT INTO Sink_TenMinPsgCnts

SELECT

TUMBLE_START(rideTime,INTERVAL'10'MINUTE), TUMBLE_END(rideTime,INTERVAL '10' MINUTE),

SUM(psgCnt)as  cnt

FROM Rides

GROUP BY TUMBLE(rideTime, INTERVAL '10'MINUTE);

然后用 kafka 的命令监控 kafka 这一 topic 执行输入的情况:

输入

docker-compose exec sql-client /opt/kafka-client/bin/kafka-console-consumer.sh   --boot

得出此表:

sql-training git:(master) ✘docker-compose exec sql-client /opt/kafka-client/bin/kafka-console-consumer.sh--bootstrap-server kafka:9092--topic TenMinPsgCnts --from-beginning

[2019-04-29 12:55:47,317] WARN Error while fetching metadata with correlation id 2: {TenMinPsgCnts=LEADER_NOT_AVAILABLE}(org.apache.kafka.clients.NetworkClient)

然后执行刚刚的 query

Flink SQL >INSERT INTO Sink_TenMinPsgCnts

SELECT

TUMBLE_START(rideTime,INTERVAL'10'MINUTE), TUMBLE_END(rideTime,INTERVAL '10' MINUTE),

SUM(psgCnt)as  cnt

FROM Rides

GROUP BY TUMBLE(rideTime, INTERVAL '10'MINUTE);

显示错误

因为 schema 不匹配,现在 Flink 在 INSERT INTO 的时候是强类型匹配,要输入进去一定要输入相同的类型,所以query 里还要加入 cast:

输入

Flink SQL >INSERT INTO Sink_TenMinPsgCnts

SELECT

TUMBLE_START(rideTime,INTERVAL'10'MINUTE), TUMBLE_END(rideTime,INTERVAL '10' MINUTE),

CAST(SUM(psgCnt)as BIGINT) as cnt

FROM Rides

GROUP BY TUMBLE(rideTime, INTERVAL '10'MINUTE);  

这样就提交成功了。 

打开 docker 可以看到这边结果已经输出了:

cntStart":1357008600000,"cntEnd":1357009200000,"cnt":180721} cntStart":1357009200000,"cntEnd":1357009800000,"cnt":18024} cntStart":1357009800000,"cntEnd":1357010400000,"cnt":172017} cntStart":1357010400000,"cntEnd":1357011000000,"cnt":16870} cntStart":1357011000000,"cntEnd":1357011600000,"cnt":15888}

cntStart":1357011600000,"cntEnd":1357012200000,"cnt":15221} cntStart":1357012200000,"cntEnd":1357012800000,"cnt":14427}

说明已经成功地把 query 的结果插入到 kafka 里面了。

然后看 job 的运行情况

最上面一行是刚刚提交的 query。

下面的框里面,第一个是从 source 里面读,然后做了一个 window,后面接了一个 sink 连到 kafka 里面。

需求5(write to ES)

从每个区域出发的行车数,写入到ES。

结果表:Sink AreaCnts

要根据行车区域(区域 id)来做聚合,也就是统计行车数

写一下 query: 

SELECT

toAreaId(lon, lat) as area,

COUNT(*)as cnt

FROM Rides

GROUP BY toAreaId(lon, lat)

可以看出跟之前的 group by query 不太一样,之前是 window 的聚合,每次只产生一个结果 ,不会更新,输出的是append 的模式。

group by 没有窗口,是在无限流上的聚合,输出的结果是 update 的结果,是不断的更新之前的结果。所以后面如果要接 sink,也是 update 的 sink。

看 yaml 文件的配置: 

-name: Sink_AreaCnts

type: sink

update-mode: upsert

schema:

-name: areaId

type: INT

-name: cnt

type:LONG

connector:

type: elasticsearch

version: 6

hosts

-hostname: "elasticsearch"

port: 9200

protocol:"http"

index:"area-cnts"

document-type:"areacnt"

key-delimiter: "$"

format:

property-version:1

type: json

update-mode 是 upsert 的mode,所以它能够接受非 window 聚合的结果。

Schema 里有两个字段,一个是 areaId,一个是 Cnt。

定义 elasticsearch 的索引的名字为 area-cnts。

Format 类型是 json。

由此可以看出跟之前的主要区别点是 update-mode 是 upsert。目前 elasticsearch 支持 upsert 模式,但是像 kafka是不支持的。所以如果要把 group 的聚合写到结果表里,需要选一个可存储的结果表。

如果要把 query 写入结果表的话,要在前面加上 INSERT INTO Sink_AreaCnts

因为原 query 数据量过大,所以要加一个限制:where isInNYC (lon,lat)

Flink SQL >INSERT INTO Sink_AreaCnts

SELECT

toAreaId(lon, lat) as areaId,

COUNT(*)as cnt

FROM Rides

Where isInNYC(lon, lat)

GROUP BY toAreaId(lon, lat)

就可以得出需求5的答案了。

 

五. 答疑环节

1,问:请问统计用户当前点击事件发生时间倒推过去一小时内的点击数,用什么窗口?比如用户在13:24发生的一个点击事件要统计12:24到13:24的点击数。

答:目前 Flink SQL 的窗口还不支持这种功能。 

2,问:一个表既是 source 表又充当 sink 表该怎么定义?

答:目前在 SQL client 里,如果一个表又是 source 又是 sink,在老版本里需要定义两遍,在新的版本里面 Type 等于both 可以定义这种类型。 

3,问:无限流聚合没有窗口函数,SQL client 中如何设置 State TTL?如果不能设置,如何解决?

答:SQL client 中的配置 execution的item,里面可以配置 min 和 ma x 的 idle-state 的时间,用这两个参数和用代码写是一样的效果。

4,问:Flink SQL 批处理数据采用什么方式定时调度

答:目前不支持定时调度。

5,问:多表进行关联是需要依次生成中间表关联还是直接多表关联

答:可以直接多表进行关联。

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
23天前
|
算法 数据挖掘 数据库
通过 SQL 快速使用 OceanBase 向量检索学习笔记
通过 SQL 快速使用 OceanBase 向量检索学习笔记
|
29天前
|
SQL 数据库
SQL 学习笔记 - 多表关系与多表查询
数据库多表关系包括一对多、多对多和一对一,常用外键关联。多表查询方式有隐式/显式内连接、外连接、子查询等,支持别名和条件筛选。子查询分为标量、列、行、表子查询,常用于复杂查询场景。
|
7月前
|
SQL 数据库连接 Linux
数据库编程:在PHP环境下使用SQL Server的方法。
看看你吧,就像一个调皮的小丑鱼在一片广阔的数据库海洋中游弋,一路上吞下大小数据如同海中的珍珠。不管有多少难关,只要记住这个流程,剩下的就只是探索未知的乐趣,沉浸在这个充满挑战的数据库海洋中。
149 16
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
385 15
|
9月前
|
SQL 大数据 数据处理
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是为应对传统数据处理框架中流批分离的问题而诞生的,它融合了SQL的简洁性和Flink的强大流批处理能力,降低了大数据处理门槛。其核心工作原理包括生成逻辑执行计划、查询优化和构建算子树,确保高效执行。Flink SQL 支持过滤、投影、聚合、连接和窗口等常用算子,实现了流批一体处理,极大提高了开发效率和代码复用性。通过统一的API和语法,Flink SQL 能够灵活应对实时和离线数据分析场景,为企业提供强大的数据处理能力。
1684 27
|
10月前
|
SQL 存储 缓存
Flink SQL Deduplication 去重以及如何获取最新状态操作
Flink SQL Deduplication 是一种高效的数据去重功能,支持多种数据类型和灵活的配置选项。它通过哈希表、时间窗口和状态管理等技术实现去重,适用于流处理和批处理场景。本文介绍了其特性、原理、实际案例及源码分析,帮助读者更好地理解和应用这一功能。
715 14
|
11月前
|
SQL 数据挖掘 Python
数据分析编程:SQL,Python or SPL?
数据分析编程用什么,SQL、python or SPL?话不多说,直接上代码,对比明显,明眼人一看就明了:本案例涵盖五个数据分析任务:1) 计算用户会话次数;2) 球员连续得分分析;3) 连续三天活跃用户数统计;4) 新用户次日留存率计算;5) 股价涨跌幅分析。每个任务基于相应数据表进行处理和计算。
|
12月前
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
226 0
|
2月前
|
存储 分布式计算 数据处理
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
阿里云实时计算Flink团队,全球领先的流计算引擎缔造者,支撑双11万亿级数据处理,推动Apache Flink技术发展。现招募Flink执行引擎、存储引擎、数据通道、平台管控及产品经理人才,地点覆盖北京、杭州、上海。技术深度参与开源核心,打造企业级实时计算解决方案,助力全球企业实现毫秒洞察。
379 0
「48小时极速反馈」阿里云实时计算Flink广招天下英雄

热门文章

最新文章