Flink(十三)【Flink SQL(上)SqlClient、DDL、查询】(2)https://developer.aliyun.com/article/1532276
5、 查询
5.1、 DataGen & Print
1)创建数据生成器源表
CREATE TABLE source ( id INT, ts BIGINT, vc INT ) WITH ( -- flink 自带的数据生成器 'connector' = 'datagen', -- 每s生成的数据条数 'rows-per-second'='1', -- 生成类型 sequence代表自增序列,需要指定起始值和结束值 'fields.id.kind'='sequence', -- id字段自增起始值 'fields.id.start'='1', -- id字段自增结束值 'fields.id.end'='10000', -- ts字段的生成类型 'fields.ts.kind'='sequence', 'fields.ts.start'='1', 'fields.ts.end'='1000000', -- vc字段类型 随机值 'fields.vc.kind'='random', -- 最小值 1 'fields.vc.min'='1', -- 最大值 100 'fields.vc.max'='100' ); CREATE TABLE sink ( id INT, ts BIGINT, vc INT ) WITH ( connector' = 'print' );
2)查询源表
查询数据:select * from source;
注意:如果发现刷新不动,就退出去查看一下log4j输出了什么警告,有的警告可以忽略,但是有的可能就是原因。比如我是因为没有在环境变量中添加 HADOOP_CONF_DIR ,导致我的数据生成器不生成数据。
我们可以看到结果显示模式是 table 模式,这是默认的显示模式,我们在前面的常用配置里讲过,还有一种 changelog 模式可以设置。
我们再次查询:
我们可以看到,这种模式下,它的显示比 table 模式多了一列 op ,代表操作,+I 代表新增数据,撤回就是 -U。
此外还有一种模式叫做 tableau:
可以看到,这种模式喜爱,我们不会进入那个专门的数据展示界面,更加方便。
select * from source;
3)插入sink表并查询
创建 Sink表:
我们试着把 source 中的数据输出到 sink:
insert into sink select * from source;
可以看到它给我们返回了 一个 Job Id,我们可以直接查询 sink 表,或者也可以在 Web UI 中查看:
select * from sink;
或者
5.2、With子句
WITH提供了一种编写辅助语句的方法,以便在较大的查询中使用。这些语句通常被称为公共表表达式(Common Table Expression, CTE),可以认为它们定义了仅为一个查询而存在的临时视图。
1)语法
WITH <with_item_definition> [ , ... ] SELECT ... FROM ...; <with_item_defintion>: with_item_name (column_name[, ...n]) AS ( <select_query> )
2)案例
我们查询这个临时表就相当于执行了 with 内部的查询,比如下面:
WITH source_with_total AS ( SELECT id, vc+10 AS total FROM source ) -- 注意这里没有分号 这两个句子是一个作业里面的 SELECT id, SUM(total) FROM source_with_total GROUP BY id;
我们查询 source_with_total 就相当于查询了它内部的语句:select id,vc+10 as total from source;当然,我们在查这张临时表的时候可以选择字段。
需要注意的地方就是我们生成临时表的句子和查询临时表的句子是一个语句没有分号的,它们同属于一个作业,这个临时表只在这里生效,就像帮我们的查询语句简化了一下,作业结束它也就不存在了。我们完全可以写成这样:
select id,vc+10 as total from source;
5.3、SELECT & WHERE 子句
1)语法
SELECT select_list FROM table_expression [ WHERE boolean_expression ]
2)案例
-- 自定义 Source 的数据 -- 不需要给表 t 的字段显示添加类型(添加会报错) flink会自动识别 SELECT id, price FROM (VALUES (1, 2.0), (2, 3.1)) AS t (order_id, price); SELECT vc + 10 FROM source WHERE id >10;
通过查询结果,我们可以知道id=10的这条数据它的 vc 是<=10 的。
5.4、SELECT DISTINCT 子句
用作根据 key 进行数据去重
SELECT DISTINCT vc FROM source;
对于流查询,计算查询结果所需的状态可能无限增长。状态大小取决于不同行数。可以设置适当的状态生存时间(TTL)的查询配置,以防止状态过大。但是,这可能会影响查询结果的正确性。如某个 key 的数据过期从状态中删除了,那么下次再来这么一个 key,由于在状态中找不到,就又会输出一遍。
Flink(十三)【Flink SQL(上)SqlClient、DDL、查询】(4)https://developer.aliyun.com/article/1532280