【Flink-API】Table API & SQL 以及自定义UDF函数

简介: 【Flink-API】Table API & SQL 以及自定义UDF函数

一、 Flink Table API & SQL简介


1.1 Table API & SQL的背景


Flink虽然已经拥有了强大的DataStream/DataSet API,而且非常的灵活,但是需要熟练使用Eva或Scala的编程Flink编程API编写程序,为了满足流计算和批计算中的各种场景需求,同时降低用户使用门槛,Flink供- -种关系型的API来实现流与批的统一,那么这就是Flink的Table & SQL API。

自2015年开始,阿里巴巴开始调研开源流计算引擎,最终决定基于Flink打造新一代计算引擎,针对Flink存在的不足进行优化和改进,并且在2019年初将最终代码开源,也就是我们熟知的Blink。Blink 在原来的Flink基础_上最显著的一个贡献就是Flink SQL的实现。


1.2 Table API & SQL的特点


Table & SQL API是-种关系型API,用户可以像操作mysql数据库表一样的操作数据, 而不需要写java代码完成Flink Function,更不需要手工的优化java代码调优。另外,SQL 作为一个非程序员可操作的语言,学习成本很低,如果一个系统提供SQL支持,将很容易被用户接受。

●Table API & SQL是关系型声明式的,是处理关系型结构化数据的

●Table API & SQL批流统一 ,支持stream流计算和batch离线计算

●Table API & SQL查询能够被有效的优化,查询可以高效的执行

●Table API & SQL编程比较容易,但是灵活度没有DataStream/DataSet API和底层Low-leve |API强


20200924172416804.png


二、离线计算TableAPI & SQL


2.1 ●BatchSQLEnvironmept (离线批处理Table API)

public class BachWordCountSQL {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
        DataSet<WordCount> input = env.fromElements(
                new WordCount("storm", 1L),
                new WordCount("flink", 1L),
                new WordCount("hadoop", 1L),
                new WordCount("flink", 1L),
                new WordCount("storm", 1L),
                new WordCount("storm", 1L)
        );
        tEnv.registerDataSet("wordcount",input,"word,counts");
        String sql = "select word,sum(counts) as counts from wordcount group by word" +
                "having sum(counts) >=2 order by counts desc";
        Table table = tEnv.sqlQuery(sql);
        DataSet<WordCount> result = tEnv.toDataSet(table, WordCount.class);
        result.print();
    }
}

2.2 ●BatchTableEnvironmept (离线批处理Table API)

public class BachWordCountTable {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
        DataSet<WordCount> input = env.fromElements(
                new WordCount("storm", 1L),
                new WordCount("flink", 1L),
                new WordCount("hadoop", 1L),
                new WordCount("flink", 1L),
                new WordCount("storm", 1L),
                new WordCount("storm", 1L)
        );
        Table table = tEnv.fromDataSet(input);
        Table filtered = table.groupBy("word")
                .select("word,counts.sum as counts")
                .filter("counts>=2")
                .orderBy("counts.desc");
        DataSet<WordCount> wordCountDataSet = tEnv.toDataSet(filtered, WordCount.class);
        wordCountDataSet.print();
    }
}

执行结果:


20200924192835516.png

三、实时计算TableAPI & SQL


3.1 ●StreamSQLEnvironment (实时流处理Table API)

public class StreamSqlWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1.实时的table的上下文
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // socket 数据源[hadoop spark flink]
        DataStreamSource<String> lines = env.socketTextStream("192.168.52.200", 8888);
        SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> out) throws Exception {
                Arrays.stream(line.split(" ")).forEach(out::collect);
            }
        });
        //2.注册成为表
        tableEnv.registerDataStream("t_wordcount",words,"word");
        //3.SQL
        Table table = tableEnv.sqlQuery("SELECT word,COUNT(1) counts FROM t_wordcount GROUP BY word");
        //4.结果
        DataStream<Tuple2<Boolean, WordCount>> dataStream = tableEnv.toRetractStream(table, WordCount.class);
        dataStream.print();
        env.execute();
    }
}

运行结果如下:

20200924190153331.png


3.2 ●StreamTableEnvironment (实时流处理Table API)

    //2.注册成为表
        Table table = tableEnv.fromDataStream(words, "word");
        Table table2 = table.groupBy("word").select("word,count(1) as counts");
        DataStream<Tuple2<Boolean, Row>> dataStream = tableEnv.toRetractStream(table2, Row.class);
        dataStream.print();
        env.execute();


四、Window窗口和TableAPI & SQL


4.1 Thumb滚动窗口


实现滚动不同窗口内相同用户的金额计算,将窗口的起始结束时间,金额相加。


数据如下:

1000,user01,p1,5

2000,user01,p1,5

2000,user02,p1,3

3000,user01,p1,5

9999,user02,p1,3

19999,user01,p1,5

程序如下:

public class TumblingEventTimeWindowTable {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        DataStreamSource<String> socketDataStream = env.socketTextStream("192.168.52.200", 8888);
        SingleOutputStreamOperator<Row> rowDataStream = socketDataStream.map(new MapFunction<String, Row>() {
            @Override
            public Row map(String line) throws Exception {
                String[] fields = line.split(",");
                Long time = Long.parseLong(fields[0]);
                String uid = fields[1];
                String pid = fields[2];
                Double money = Double.parseDouble(fields[3]);
                return Row.of(time, uid, pid, money);
            }
        }).returns(Types.ROW(Types.LONG, Types.STRING, Types.STRING, Types.DOUBLE));
        SingleOutputStreamOperator<Row> waterMarkRow = rowDataStream.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Row>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(Row row) {
                        return (long) row.getField(0);
                    }
                }
        );
        tableEnv.registerDataStream("t_orders",waterMarkRow,"atime,uid,pid,money,rowtime.rowtime");
        Table table = tableEnv.scan("t_orders")
                .window(Tumble.over("10.seconds").on("rowtime").as("win"))
                .groupBy("uid,win")
                .select("uid,win.start,win.end,win.rowtime,money.sum as total");
        tableEnv.toAppendStream(table,Row.class).print();
        env.execute();
    }
}

运行结果如下:

20200924200216704.png


五、Kafka数据源—>Table API & SQL


5.1 KafkaToSQL

public class KafkaWordCountToSql {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.connect(new Kafka()
                .version("universal")
                .topic("json-input")
                .startFromEarliest()
                .property("bootstrap.servers","hadoop1:9092")
        ).withFormat(new Json().deriveSchema()).withSchema(new Schema()
                .field("name", TypeInformation.of(String.class))
                .field("gender",TypeInformation.of(String.class))
        ).inAppendMode().registerTableSource("kafkaSource");
        Table select = tableEnv.scan("kafkaSource").groupBy("gender")
                .select("gender,count(1) as counts");
        tableEnv.toRetractStream(select, Row.class).print();
        env.execute();
    }
}


相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
5月前
|
SQL 人工智能 数据挖掘
如何在`score`表中正确使用`COUNT`和`AVG`函数?SQL聚合函数COUNT与AVG使用指南
本文三桥君通过score表实例解析SQL聚合函数COUNT和AVG的常见用法。详解COUNT(studentNo)、COUNT(score)、COUNT()的区别,以及AVG函数对数值/字符型字段的不同处理,特别指出AVG()是无效语法。实战部分提供6个典型查询案例及结果,包含创建表、插入数据的完整SQL代码。产品专家三桥君强调正确理解函数特性(如空值处理、字段类型限制)对数据分析的重要性,帮助开发者避免常见误区,提升查询效率。
361 0
|
数据处理 数据安全/隐私保护 流计算
Flink 三种时间窗口、窗口处理函数使用及案例
Flink 是处理无界数据流的强大工具,提供了丰富的窗口机制。本文介绍了三种时间窗口(滚动窗口、滑动窗口和会话窗口)及其使用方法,包括时间窗口的概念、窗口处理函数的使用和实际案例。通过这些机制,可以灵活地对数据流进行分析和计算,满足不同的业务需求。
1546 27
|
SQL Oracle 关系型数据库
SQL优化-使用联合索引和函数索引
在一次例行巡检中,发现一条使用 `to_char` 函数将日期转换为字符串的 SQL 语句 CPU 利用率很高。为了优化该语句,首先分析了 where 条件中各列的选择性,并创建了不同类型的索引,包括普通索引、函数索引和虚拟列索引。通过对比不同索引的执行计划,最终确定了使用复合索引(包含函数表达式)能够显著降低查询成本,提高执行效率。
251 3
|
SQL 数据库 数据库管理
数据库SQL函数应用技巧与方法
在数据库管理中,SQL函数是处理和分析数据的强大工具
|
SQL 数据库 索引
SQL中COUNT函数结合条件使用的技巧与方法
在SQL查询中,COUNT函数是一个非常常用的聚合函数,用于计算表中满足特定条件的记录数
2527 5
|
SQL 关系型数据库 MySQL
SQL日期函数
SQL日期函数
271 0
|
SQL 消息中间件 分布式计算
大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce
大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce
221 0
|
5月前
|
存储 分布式计算 数据处理
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
阿里云实时计算Flink团队,全球领先的流计算引擎缔造者,支撑双11万亿级数据处理,推动Apache Flink技术发展。现招募Flink执行引擎、存储引擎、数据通道、平台管控及产品经理人才,地点覆盖北京、杭州、上海。技术深度参与开源核心,打造企业级实时计算解决方案,助力全球企业实现毫秒洞察。
578 0
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。