Flink SQL 编程(二)| 学习笔记

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 快速学习 Flink SQL 编程。

开发者学堂课程【Apache Flink 入门到实战 - Flink 开源社区出品  Flink SQL 编程(二)】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/632/detail/10032


Flink SQL 编程(二)

 

四. 用 window 聚合和非 windo 聚合知识解决实际问题

需求1(filter)

统计出现在纽约的车。

那么如何计算出现在纽约的行车记录呢? 

刚刚在 rides 表里讲到有这样两个字段: 

lon:FLOAT      //经度

lat:FLOAT      //纬度

通过经度和纬度可以计算出在地图上的点,通过这个点可以得出行车记录是不是发生在纽约,所以要开发一个是否在纽约的 udf。

(开发 udf 时,运行时代码最好用 java 写。)

以下是一份已经做好的的代码:

package  com.dataartisans.udfs;

import org.apache.flink.table.functions.ScalarFunction;

import static com.dataartisans.udfs.util.GeoUtils.isInnyc:

/**

* Table API / SQL Scalar UDF to check if a coordinate is in NYC.

*/

public class IsInNYC extends ScaLarFunction {

public booLean eval(float Lon, float Lat){

return isInNYC(Lon, Lat);

解释一下这段 udf 的代码:

要实现一个 udf,首先要继承自 scalarfunction 这个类,位置在 package 的下面。

然后要声明一个 eval 方法。eval 的参数可以任意定义,这里的定义是两个 float 的类型的经度和纬度,返回的类型是一个 Boolean:是不是在纽约。

纽约的经度纬度如果在这个经纬度范围里,那么就可以确定哪条是发生在纽约的行车记录。

如果想让这个 udf 在 SQL 里面运转的话,首先需要打包:

输入

SQL-udfs git:(master) mvn clean package

打包完成之后, target 上会有一个 jar 包,包含了开发的这些类。然后把这个 jar 包拖到 SQL client  的 lab 上面。

在 yaml 的配置文件里定义刚刚开发的函数,定义的这个函数在 function 下面可以定义 N 个,我们只需要定义一个isInNYC 的函数,它的名字就是 isInNYC。这样就注册好了。

注册好了用 SQL 试一下:

输入

Flink SQL>show function

可以看到 isInNYC 已经在表里了:

Flink SQL> show functions;

timeDiff

toCoords

isInNYC

toAreaId

Drivers

然后用 udf 进行一些操作

输入

Flink SQL> select * from Rides where isInNYC(lon, lat); 

出现以下界面。

可以得出:出现在纽约的行车记录已经被选出。

接下来的需求是把它放到 view 里,因为后面要经常用到纽约这个需求。这样直接用 view 即可。

这就涉及到 CREATE VIEW 的语法。

需求2:

做一个无限流的聚合,计算搭载每种乘客数量的行车事件数。搭载一个乘客的行车数有多少个?大概两个乘客的行车数有多少个?

分析:Key 是每种乘客数量。它的 group by 里是没有开创这个函数的。一般来说,我们要用 count 去计算。

它的 query 写出来是比较简单的: 

1 SELECT

2 psgCnt,

3 COUNT(*)as cnt

4 FROM Rides

5 GROUP BY psgCnt;

需求3(Window Agg)

为了持续地监测城市的交通流量,计算每个区域每5分钟的进入的车辆数。

我们只关心纽约的区域交通情况,并且只关心至少有5辆车子进入的区域

toAreaId 里面的 udf 已经计算好了,通过经度和纬度我们能够定位到地球上一个区域Id。

写出它的 Query:

Select

toAreald(lon,lat),AS area,

isStart,

TUMBLE_END(rideTime,INTERVAL '5'MINUTE), AS window_end,

COUNT(*)as AS cnt

FROM Rides

WHERE isInNYC(lon, lat)

GROUP BY

toAreald(lon,lat),

isStart,

TUMBLE(rideTime,INTERVAL ‘5’MINUTE)

HAVING COUNT(*)>=5;

根据它的区域 ID 和五分钟的窗口来做聚合。

image.png

由图可见窗口的结束时间是不断在变化的,而且区域 ID 和 window

的 END 都没有重复的结果。印证了 window 聚合的特点:只输出了一

次,不会再重复输出。

需求4(write to Kafka)

将每10分钟的搭乘的乘客数写入 Kafka

结果表: Sink_TenMinPsgCnts 

Yaml表:

-name:Sink_ TenMinPsgCnts

type:sink

update-mode: append

schema:-name: cntStart

type:TIMESTAMP

-name:cntEnd

type:TIMESTAMP

-name:cnt

type:LONG  connector:

property-version:1

type: kafka

version: 0.11

topic:TenMinPsgCnts

startup-mode: earliest-offset

properties:

- key: zookeeper.connect

value: zookeeper:2181

-key:bootstrap.servers

value:kafka:9092

- key: group.id

value: trainingGroup

format:

property-version:1

type: json

输出模式是 append,因为 window 的输出没有更新。

Schema 有三个字段,起始时间、结束时间以及个数。

存在的类型是 kafka。

Properties 是位置的信息。

由 json 的方式储存。

Sink 表的定义和 source 表的定义,模式是一样的,只是 type 不一样。

Flink SQL > show tables;

DriverChanges

Fares

Rides

Sink_AreaCnts

Sink_TenMinPsgCnts

Flink SQL > describe Sink_TenMinPsgCnts

Root

I--cntStart:Timestamp

I--cntEnd:Timestamp

I-- cnt :long

针对需求,执行以下 query:

Flink SQL > SELECT

TUMBLE_START(rideTime,INTERVAL'10'MINUTE),

TUMBLE_END(rideTime, INTERVAL '10' MINUTE),

SUM(psgCnt) as cnt

FROM Rides

GROUP BY TUMBLE(rideTime, INTERVAL '10' MINUTE);

用 query 做出的结果,我们需要把它写到 kafka 里面

只需要在刚刚写出的 query 上面一行加上 INSERT INTO 这一语法

写到 Sink_TenMinPsgCnts 表里:

INSERT INTO Sink_TenMinPsgCnts

SELECT

TUMBLE_START(rideTime,INTERVAL'10'MINUTE), TUMBLE_END(rideTime,INTERVAL '10' MINUTE),

SUM(psgCnt)as  cnt

FROM Rides

GROUP BY TUMBLE(rideTime, INTERVAL '10'MINUTE);

然后用 kafka 的命令监控 kafka 这一 topic 执行输入的情况:

输入

docker-compose exec sql-client /opt/kafka-client/bin/kafka-console-consumer.sh   --boot

得出此表:

sql-training git:(master) ✘docker-compose exec sql-client /opt/kafka-client/bin/kafka-console-consumer.sh--bootstrap-server kafka:9092--topic TenMinPsgCnts --from-beginning

[2019-04-29 12:55:47,317] WARN Error while fetching metadata with correlation id 2: {TenMinPsgCnts=LEADER_NOT_AVAILABLE}(org.apache.kafka.clients.NetworkClient)

然后执行刚刚的 query

Flink SQL >INSERT INTO Sink_TenMinPsgCnts

SELECT

TUMBLE_START(rideTime,INTERVAL'10'MINUTE), TUMBLE_END(rideTime,INTERVAL '10' MINUTE),

SUM(psgCnt)as  cnt

FROM Rides

GROUP BY TUMBLE(rideTime, INTERVAL '10'MINUTE);

显示错误

因为 schema 不匹配,现在 Flink 在 INSERT INTO 的时候是强类型匹配,要输入进去一定要输入相同的类型,所以query 里还要加入 cast:

输入

Flink SQL >INSERT INTO Sink_TenMinPsgCnts

SELECT

TUMBLE_START(rideTime,INTERVAL'10'MINUTE), TUMBLE_END(rideTime,INTERVAL '10' MINUTE),

CAST(SUM(psgCnt)as BIGINT) as cnt

FROM Rides

GROUP BY TUMBLE(rideTime, INTERVAL '10'MINUTE);  

这样就提交成功了。 

打开 docker 可以看到这边结果已经输出了:

cntStart":1357008600000,"cntEnd":1357009200000,"cnt":180721} cntStart":1357009200000,"cntEnd":1357009800000,"cnt":18024} cntStart":1357009800000,"cntEnd":1357010400000,"cnt":172017} cntStart":1357010400000,"cntEnd":1357011000000,"cnt":16870} cntStart":1357011000000,"cntEnd":1357011600000,"cnt":15888}

cntStart":1357011600000,"cntEnd":1357012200000,"cnt":15221} cntStart":1357012200000,"cntEnd":1357012800000,"cnt":14427}

说明已经成功地把 query 的结果插入到 kafka 里面了。

然后看 job 的运行情况

最上面一行是刚刚提交的 query。

下面的框里面,第一个是从 source 里面读,然后做了一个 window,后面接了一个 sink 连到 kafka 里面。

需求5(write to ES)

从每个区域出发的行车数,写入到ES。

结果表:Sink AreaCnts

要根据行车区域(区域 id)来做聚合,也就是统计行车数

写一下 query: 

SELECT

toAreaId(lon, lat) as area,

COUNT(*)as cnt

FROM Rides

GROUP BY toAreaId(lon, lat)

可以看出跟之前的 group by query 不太一样,之前是 window 的聚合,每次只产生一个结果 ,不会更新,输出的是append 的模式。

group by 没有窗口,是在无限流上的聚合,输出的结果是 update 的结果,是不断的更新之前的结果。所以后面如果要接 sink,也是 update 的 sink。

看 yaml 文件的配置: 

-name: Sink_AreaCnts

type: sink

update-mode: upsert

schema:

-name: areaId

type: INT

-name: cnt

type:LONG

connector:

type: elasticsearch

version: 6

hosts

-hostname: "elasticsearch"

port: 9200

protocol:"http"

index:"area-cnts"

document-type:"areacnt"

key-delimiter: "$"

format:

property-version:1

type: json

update-mode 是 upsert 的mode,所以它能够接受非 window 聚合的结果。

Schema 里有两个字段,一个是 areaId,一个是 Cnt。

定义 elasticsearch 的索引的名字为 area-cnts。

Format 类型是 json。

由此可以看出跟之前的主要区别点是 update-mode 是 upsert。目前 elasticsearch 支持 upsert 模式,但是像 kafka是不支持的。所以如果要把 group 的聚合写到结果表里,需要选一个可存储的结果表。

如果要把 query 写入结果表的话,要在前面加上 INSERT INTO Sink_AreaCnts

因为原 query 数据量过大,所以要加一个限制:where isInNYC (lon,lat)

Flink SQL >INSERT INTO Sink_AreaCnts

SELECT

toAreaId(lon, lat) as areaId,

COUNT(*)as cnt

FROM Rides

Where isInNYC(lon, lat)

GROUP BY toAreaId(lon, lat)

就可以得出需求5的答案了。

 

五. 答疑环节

1,问:请问统计用户当前点击事件发生时间倒推过去一小时内的点击数,用什么窗口?比如用户在13:24发生的一个点击事件要统计12:24到13:24的点击数。

答:目前 Flink SQL 的窗口还不支持这种功能。 

2,问:一个表既是 source 表又充当 sink 表该怎么定义?

答:目前在 SQL client 里,如果一个表又是 source 又是 sink,在老版本里需要定义两遍,在新的版本里面 Type 等于both 可以定义这种类型。 

3,问:无限流聚合没有窗口函数,SQL client 中如何设置 State TTL?如果不能设置,如何解决?

答:SQL client 中的配置 execution的item,里面可以配置 min 和 ma x 的 idle-state 的时间,用这两个参数和用代码写是一样的效果。

4,问:Flink SQL 批处理数据采用什么方式定时调度

答:目前不支持定时调度。

5,问:多表进行关联是需要依次生成中间表关联还是直接多表关联

答:可以直接多表进行关联。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
1天前
|
SQL Java 关系型数据库
实时计算 Flink版操作报错合集之通过flink sql形式同步数据到hudi中,本地启动mian方法报错如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
17 8
|
1天前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用合集之flink sql ROW_NUMBER()回退更新的机制,有相关文档介绍吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
11 1
|
2天前
|
SQL 流计算 API
实时计算 Flink版产品使用合集之ClickHouse-JDBC 写入数据时,发现写入的目标表名称与 PreparedStatement 中 SQL 的表名不一致如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
8 0
|
2天前
|
SQL 数据处理 API
实时计算 Flink版产品使用合集之遇到SQL Server锁表问题如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
8 0
|
2天前
|
SQL API 流计算
实时计算 Flink版产品使用合集之在Mac M1下的Docker环境中开启SQL Server代理的操作步骤是什么
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
18 1
|
4天前
|
SQL NoSQL Java
Flink SQL 问题之执行报错如何解决
Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。
209 2
|
4天前
|
SQL 消息中间件 分布式数据库
flink sql问题之连接HBase报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
201 0
|
4天前
|
SQL JSON Java
Flink SQL 问题之重启报错如何解决
Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。
46 3
|
4天前
|
SQL Java API
Flink SQL 问题之窗口函数报错如何解决
Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。
24 1
|
4天前
|
SQL 消息中间件 关系型数据库
Flink SQL 问题之提交执行报错如何解决
Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。
97 2