01 引言
在前面的博客,我们学习了Flink
的TableAP和SQL
,有兴趣的同学可以参阅下:
- 《Flink教程(01)- Flink知识图谱》
- 《Flink教程(02)- Flink入门》
- 《Flink教程(03)- Flink环境搭建》
- 《Flink教程(04)- Flink入门案例》
- 《Flink教程(05)- Flink原理简单分析》
- 《Flink教程(06)- Flink批流一体API(Source示例)》
- 《Flink教程(07)- Flink批流一体API(Transformation示例)》
- 《Flink教程(08)- Flink批流一体API(Sink示例)》
- 《Flink教程(09)- Flink批流一体API(Connectors示例)》
- 《Flink教程(10)- Flink批流一体API(其它)》
- 《Flink教程(11)- Flink高级API(Window)》
- 《Flink教程(12)- Flink高级API(Time与Watermaker)》
- 《Flink教程(13)- Flink高级API(状态管理)》
- 《Flink教程(14)- Flink高级API(容错机制)》
- 《Flink教程(15)- Flink高级API(并行度)》
- 《Flink教程(16)- Flink Table与SQL》
本文主要讲的是Flink Table
与SQL
的一些案例。
02 Flink Table&SQL 案例
2.1 案例1(DataStream SQL统计)
需求:将DataStream
注册为Table
和View
并进行SQL
统计。
代码如下:
import static org.apache.flink.table.api.Expressions.$; /** * 案例1:将DataStream注册为Table和View并进行SQL统计 * * @author : YangLinWei * @createTime: 2022/3/8 2:07 下午 */ public class Demo1 { public static void main(String[] args) throws Exception { //1.准备环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); //StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); //2.Source DataStream<Order> orderA = env.fromCollection(Arrays.asList( new Order(1L, "beer", 3), new Order(1L, "diaper", 4), new Order(3L, "rubber", 2))); DataStream<Order> orderB = env.fromCollection(Arrays.asList( new Order(2L, "pen", 3), new Order(2L, "rubber", 3), new Order(4L, "beer", 1))); //3.注册表 // convert DataStream to Table Table tableA = tEnv.fromDataStream(orderA, $("user"), $("product"), $("amount")); // register DataStream as Table tEnv.createTemporaryView("OrderB", orderB, $("user"), $("product"), $("amount")); //4.执行查询 System.out.println(tableA); // union the two tables Table resultTable = tEnv.sqlQuery( "SELECT * FROM " + tableA + " WHERE amount > 2 " + "UNION ALL " + "SELECT * FROM OrderB WHERE amount < 2" ); //5.输出结果 DataStream<Order> resultDS = tEnv.toAppendStream(resultTable, Order.class); resultDS.print(); env.execute(); } @Data @NoArgsConstructor @AllArgsConstructor public static class Order { public Long user; public String product; public int amount; } }
运行结果:
2.2 案例2(DataStream Table&SQL统计)
需求:使用SQL
和Table
两种方式对DataStream
中的单词进行统计。
示例代码如下(SQL方式):
import static org.apache.flink.table.api.Expressions.$; /** * 使用SQL和Table两种方式对DataStream中的单词进行统计。 * * @author : YangLinWei * @createTime: 2022/3/8 2:13 下午 */ public class Demo02 { public static void main(String[] args) throws Exception { //1.准备环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); //2.Source DataStream<WC> input = env.fromElements( new WC("Hello", 1), new WC("World", 1), new WC("Hello", 1) ); //3.注册表 tEnv.createTemporaryView("WordCount", input, $("word"), $("frequency")); //4.执行查询 Table resultTable = tEnv.sqlQuery("SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word"); //5.输出结果 //toAppendStream doesn't support consuming update changes which is produced by node GroupAggregate //DataStream<WC> resultDS = tEnv.toAppendStream(resultTable, WC.class); DataStream<Tuple2<Boolean, WC>> resultDS = tEnv.toRetractStream(resultTable, WC.class); resultDS.print(); env.execute(); } @Data @NoArgsConstructor @AllArgsConstructor public static class WC { public String word; public long frequency; } }
运行结果:
示例代码如下(Table方式):
/** * 使用SQL和Table两种方式对DataStream中的单词进行统计 * * @author : YangLinWei * @createTime: 2022/3/8 2:15 下午 */ public class Demo02Table { public static void main(String[] args) throws Exception { //1.准备环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); //2.Source DataStream<WC> input = env.fromElements( new WC("Hello", 1), new WC("World", 1), new WC("Hello", 1) ); //3.注册表 Table table = tEnv.fromDataStream(input); //4.执行查询 Table resultTable = table .groupBy($("word")) .select($("word"), $("frequency").sum().as("frequency")) .filter($("frequency").isEqual(2)); //5.输出结果 DataStream<Tuple2<Boolean, WC>> resultDS = tEnv.toRetractStream(resultTable, WC.class); resultDS.print(); env.execute(); } @Data @NoArgsConstructor @AllArgsConstructor public static class WC { public String word; public long frequency; } }
2.3 案例3(SQL与滚动窗口)
需求:使用Flink SQL
来统计5秒内 每个用户的订单总数、订单的最大金额、订单的最小金额,也就是每隔5秒统计最近5秒的每个用户的订单总数、订单的最大金额、订单的最小金额。
上面的需求使用流处理的Window
的基于时间的滚动窗口就可以搞定!
编码步骤:
- 创建环境
- 使用自定义函数模拟实时流数据
- 设置事件时间和
Watermaker
- 注册表
- 执行
sql
-可以使用sql
风格或table
风格 - 输出结果
- 触发执行
SQL方式实现:
/** * SQL方式实现 * * @author : YangLinWei * @createTime: 2022/3/8 2:19 下午 */ public class Demo3SQL { public static void main(String[] args) throws Exception { //1.准备环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); //2.Source DataStreamSource<Order> orderDS = env.addSource(new RichSourceFunction<Order>() { private Boolean isRunning = true; @Override public void run(SourceContext<Order> ctx) throws Exception { Random random = new Random(); while (isRunning) { Order order = new Order(UUID.randomUUID().toString(), random.nextInt(3), random.nextInt(101), System.currentTimeMillis()); TimeUnit.SECONDS.sleep(1); ctx.collect(order); } } @Override public void cancel() { isRunning = false; } }); //3.Transformation DataStream<Order> watermakerDS = orderDS .assignTimestampsAndWatermarks( WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(2)) .withTimestampAssigner((event, timestamp) -> event.getCreateTime()) ); //4.注册表 tEnv.createTemporaryView("t_order", watermakerDS, $("orderId"), $("userId"), $("money"), $("createTime").rowtime()); //5.执行SQL String sql = "select " + "userId," + "count(*) as totalCount," + "max(money) as maxMoney," + "min(money) as minMoney " + "from t_order " + "group by userId," + "tumble(createTime, interval '5' second)"; Table ResultTable = tEnv.sqlQuery(sql); //6.Sink //将SQL的执行结果转换成DataStream再打印出来 //toAppendStream → 将计算后的数据append到结果DataStream中去 //toRetractStream → 将计算后的新的数据在DataStream原数据的基础上更新true或是删除false DataStream<Tuple2<Boolean, Row>> resultDS = tEnv.toRetractStream(ResultTable, Row.class); resultDS.print(); env.execute(); } @Data @AllArgsConstructor @NoArgsConstructor public static class Order { private String orderId; private Integer userId; private Integer money; private Long createTime; } }
Table实现方式:
/** * Table方式 * * @author : YangLinWei * @createTime: 2022/3/8 2:26 下午 */ public class Demo3Table { public static void main(String[] args) throws Exception { //1.准备环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); //2.Source DataStreamSource<Order> orderDS = env.addSource(new RichSourceFunction<Order>() { private Boolean isRunning = true; @Override public void run(SourceContext<Order> ctx) throws Exception { Random random = new Random(); while (isRunning) { Order order = new Order(UUID.randomUUID().toString(), random.nextInt(3), random.nextInt(101), System.currentTimeMillis()); TimeUnit.SECONDS.sleep(1); ctx.collect(order); } } @Override public void cancel() { isRunning = false; } }); //3.Transformation DataStream<Order> watermakerDS = orderDS .assignTimestampsAndWatermarks( WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(2)) .withTimestampAssigner((event, timestamp) -> event.getCreateTime()) ); //4.注册表 tEnv.createTemporaryView("t_order", watermakerDS, $("orderId"), $("userId"), $("money"), $("createTime").rowtime()); //查看表约束 tEnv.from("t_order").printSchema(); //5.TableAPI查询 Table ResultTable = tEnv.from("t_order") //.window(Tumble.over("5.second").on("createTime").as("tumbleWindow")) .window(Tumble.over(lit(5).second()) .on($("createTime")) .as("tumbleWindow")) .groupBy($("tumbleWindow"), $("userId")) .select( $("userId"), $("userId").count().as("totalCount"), $("money").max().as("maxMoney"), $("money").min().as("minMoney")); //6.将SQL的执行结果转换成DataStream再打印出来 DataStream<Tuple2<Boolean, Row>> resultDS = tEnv.toRetractStream(ResultTable, Row.class); resultDS.print(); //7.excute env.execute(); } @Data @AllArgsConstructor @NoArgsConstructor public static class Order { private String orderId; private Integer userId; private Integer money; private Long createTime; } }
2.4 案例4(SQL消费Kafka)
需求:从Kafka
中消费数据并过滤出状态为success
的数据再写入到Kafka
{"user_id": "1", "page_id":"1", "status": "success"} {"user_id": "1", "page_id":"1", "status": "success"} {"user_id": "1", "page_id":"1", "status": "success"} {"user_id": "1", "page_id":"1", "status": "success"} {"user_id": "1", "page_id":"1", "status": "fail"}
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic input_kafka /export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic output_kafka /export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic input_kafka /export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic output_kafka --from-beginning
代码实现:
- https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/
- https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/kafka.html
/** * 从Kafka中消费数据并过滤出状态为success的数据再写入到Kafka * * @author : YangLinWei * @createTime: 2022/3/8 2:30 下午 */ public class Demo4 { public static void main(String[] args) throws Exception { //1.准备环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); //2.Source TableResult inputTable = tEnv.executeSql( "CREATE TABLE input_kafka (\n" + " `user_id` BIGINT,\n" + " `page_id` BIGINT,\n" + " `status` STRING\n" + ") WITH (\n" + " 'connector' = 'kafka',\n" + " 'topic' = 'input_kafka',\n" + " 'properties.bootstrap.servers' = 'node1:9092',\n" + " 'properties.group.id' = 'testGroup',\n" + " 'scan.startup.mode' = 'latest-offset',\n" + " 'format' = 'json'\n" + ")" ); TableResult outputTable = tEnv.executeSql( "CREATE TABLE output_kafka (\n" + " `user_id` BIGINT,\n" + " `page_id` BIGINT,\n" + " `status` STRING\n" + ") WITH (\n" + " 'connector' = 'kafka',\n" + " 'topic' = 'output_kafka',\n" + " 'properties.bootstrap.servers' = 'node1:9092',\n" + " 'format' = 'json',\n" + " 'sink.partitioner' = 'round-robin'\n" + ")" ); String sql = "select " + "user_id," + "page_id," + "status " + "from input_kafka " + "where status = 'success'"; Table ResultTable = tEnv.sqlQuery(sql); DataStream<Tuple2<Boolean, Row>> resultDS = tEnv.toRetractStream(ResultTable, Row.class); resultDS.print(); tEnv.executeSql("insert into output_kafka select * from " + ResultTable); //7.excute env.execute(); } }
03 Flink SQL常用算子
3.1 SELECT
SELECT :用于从 DataSet/DataStream
中选择数据,用于筛选出某些列。
示例:
SELECT * FROM Table;
// 取出表中的所有列SELECT name,age FROM Table;
// 取出表中name
和age
两列
与此同时 SELECT
语句中可以使用函数和别名,例如我们上面提到的 WordCount
中:
SELECT word, COUNT(word) FROM table GROUP BY word; • 1
3.2 WHERE
WHERE :用于从数据集/流中过滤数据,与 SELECT
一起使用,用于根据某些条件对关系做水平分割,即选择符合条件的记录。
示例:
SELECT name,age FROM Table where name LIKE ‘% 小明 %’;
SELECT * FROM Table WHERE age = 20;
WHERE
是从原数据中进行过滤,那么在WHERE
条件中,Flink SQL
同样支持 =、<、>、<>、>=、<=
,以及 AND
、OR
等表达式的组合,最终满足过滤条件的数据会被选择出来。并且 WHERE
可以结合IN、NOT IN
联合使用。举个例子:
SELECT name, age FROM Table WHERE name IN (SELECT name FROM Table2)
3.3 DISTINCT
DISTINCT: 用于从数据集/流中去重根据 SELECT
的结果进行去重。
示例:
SELECT DISTINCT name FROM Table;
对于流式查询,计算查询结果所需的 State
可能会无限增长,用户需要自己控制查询的状态范围,以防止状态过大。
3.4 GROUP BY
GROUP BY :是对数据进行分组操作。例如我们需要计算成绩明细表中,每个学生的总分。
示例:
SELECT name, SUM(score) as TotalScore FROM Table GROUP BY name;
3.5 UNION 和 UNION ALL
UNION: 用于将两个结果集合并起来,要求两个结果集字段完全一致,包括字段类型、字段顺序。不同于 UNION ALL 的是,UNION 会对结果数据去重。
示例:
SELECT * FROM T1 UNION (ALL) SELECT * FROM T2; • 1
3.6 JOIN
JOIN :用于把来自两个表的数据联合起来形成结果表,Flink
支持的JOIN
类型包括:
- JOIN - INNER JOIN
- LEFT JOIN - LEFT OUTER JOIN
- RIGHT JOIN - RIGHT OUTER JOIN
- FULL JOIN - FULL OUTER JOIN
这里的 JOIN
的语义和我们在关系型数据库中使用的 JOIN
语义一致。
示例:JOIN
(将订单表数据和商品表进行关联)
SELECT * FROM Orders INNER JOIN Product ON Orders.productId = Product.id
LEFT JOIN
与 JOIN
的区别是当右表没有与左边相 JOIN
的数据时候,右边对应的字段补NULL
输出,RIGHT JOIN
相当于LEFT JOIN
左右两个表交互一下位置。FULL JOIN
相当于RIGHT JOIN
和 LEFT JOIN
之后进行UNION ALL
操作,示例:
SELECT * FROM Orders LEFT JOIN Product ON Orders.productId = Product.id SELECT * FROM Orders RIGHT JOIN Product ON Orders.productId = Product.id SELECT * FROM Orders FULL OUTER JOIN Product ON Orders.productId = Product.id
3.7 Group Window
根据窗口数据划分的不同,目前 Apache Flink
有如下 3 种 Bounded Window:
- 滚动窗口(
Tumble
):窗口数据有固定的大小,窗口数据无叠加; - 滑动窗口(
Hop
):窗口数据有固定大小,并且有固定的窗口重建频率,窗口数据有叠加; - 会话窗口(
Session
):窗口数据没有固定的大小,根据窗口数据活跃程度划分窗口,窗口数据无叠加。
3.7.1 Tumble Window滚动窗口
Tumble
滚动窗口有固定大小,窗口数据不重叠,具体语义如下:
Tumble 滚动窗口对应的语法如下:
SELECT [gk], [TUMBLE_START(timeCol, size)], [TUMBLE_END(timeCol, size)], agg1(col1), ... aggn(colN) FROM Tab1 GROUP BY [gk], TUMBLE(timeCol, size)
其中:
- [gk] :决定了是否需要按照字段进行聚合;
- TUMBLE_START: 代表窗口开始时间;
- TUMBLE_END :代表窗口结束时间;
- timeCol :是流表中表示时间字段;
- size :表示窗口的大小,如 秒、分钟、小时、天。
举个例子,假如我们要计算每个人每天的订单量,按照user
进行聚合分组:
SELECT user, TUMBLE_START(rowtime, INTERVAL ‘1’ DAY) as wStart, SUM(amount) FROM Orders GROUP BY TUMBLE(rowtime, INTERVAL ‘1’ DAY), user;
3.7.2 Hop Window滑动窗口
Hop
滑动窗口和滚动窗口类似,窗口有固定的size
,与滚动窗口不同的是滑动窗口可以通过slide
参数控制滑动窗口的新建频率。因此当 slide
值小于窗口size
的值的时候多个滑动窗口会重叠,具体语义如下:
Hop 滑动窗口对应语法如下:
SELECT [gk], [HOP_START(timeCol, slide, size)] , [HOP_END(timeCol, slide, size)], agg1(col1), ... aggN(colN) FROM Tab1 GROUP BY [gk], HOP(timeCol, slide, size)
每次字段的意思和 Tumble 窗口类似:
- [gk] :决定了是否需要按照字段进行聚合;
- HOP_START: 表示窗口开始时间;
- HOP_END: 表示窗口结束时间;
- timeCol :表示流表中表示时间字段;
- slide: 表示每次窗口滑动的大小;
- size: 表示整个窗口的大小,如 秒、分钟、小时、天。
举例说明,我们要每过一小时计算一次过去 24 小时内每个商品的销量:
SELECT product, SUM(amount) FROM Orders GROUP BY product,HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY)
3.7.3 Session Window会话时间窗口
会话时间窗口没有固定的持续时间,但它们的界限由interval
不活动时间定义,即如果在定义的间隙期间没有出现事件,则会话窗口关闭。
Seeeion 会话窗口对应语法如下:
SELECT [gk], SESSION_START(timeCol, gap) AS winStart, SESSION_END(timeCol, gap) AS winEnd, agg1(col1), ... aggn(colN) FROM Tab1 GROUP BY [gk], SESSION(timeCol, gap)
字段解析:
- [gk] :决定了是否需要按照字段进行聚合;
- SESSION_START :表示窗口开始时间;
- SESSION_END :表示窗口结束时间;
- timeCol :表示流表中表示时间字段;
- gap :表示窗口数据非活跃周期的时长。
例如,我们需要计算每个用户访问时间 12 小时内的订单量:
SELECT user, SESSION_START(rowtime, INTERVAL ‘12’ HOUR) AS sStart, SESSION_ROWTIME(rowtime, INTERVAL ‘12’ HOUR) AS sEnd, SUM(amount) FROM Orders GROUP BY SESSION(rowtime, INTERVAL ‘12’ HOUR), user
04 文末
本文主要讲解Flink Table
和SQL
的案例以及Flink SQL
常用算子的总结,谢谢大家的阅读,本文完!