Flink-Table-&-SQL

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 简介Apache Flink具有两个关系API - 表API和SQL - 用于统一流和批处理。Table API是Scala和Java的语言集成查询API,允许以非常直观的方式组合来自关系运算符的查询,Table API和SQL接口彼此紧密集成,以及Flink的DataStream和DataSet API。

简介

Apache Flink具有两个关系API - 表API和SQL - 用于统一流和批处理。Table API是Scala和Java的语言集成查询API,允许以非常直观的方式组合来自关系运算符的查询,Table API和SQL接口彼此紧密集成,以及Flink的DataStream和DataSet API。您可以轻松地在基于API构建的所有API和库之间切换。例如,您可以使用CEP库从DataStream中提取模式,然后使用Table API分析模式,或者可以在预处理上运行Gelly图算法之前使用SQL查询扫描,过滤和聚合批处理表数据。

Flink SQL的编程模型

创建一个TableEnvironment

TableEnvironment是Table API和SQL集成的核心概念,它主要负责:
  1、在内部目录中注册一个Table
  2、注册一个外部目录
  3、执行SQL查询
  4、注册一个用户自定义函数(标量、表及聚合)
  5、将DataStream或者DataSet转换成Table
  6、持有ExecutionEnvironment或者StreamExecutionEnvironment的引用
一个Table总是会绑定到一个指定的TableEnvironment中,相同的查询不同的TableEnvironment是无法通过join、union合并在一起。
TableEnvironment有一个在内部通过表名组织起来的表目录,Table API或者SQL查询可以访问注册在目录中的表,并通过名称来引用它们。

在目录中注册表

TableEnvironment允许通过各种源来注册一个表:

  1、一个已存在的Table对象,通常是Table API或者SQL查询的结果

     Table projTable = tableEnv.scan("X").select(...);

  2、TableSource,可以访问外部数据如文件、数据库或者消息系统

     TableSource csvSource = new CsvTableSource("/path/to/file", ...);

  3、DataStream或者DataSet程序中的DataStream或者DataSet

     //将DataSet转换为Table
     Table table= tableEnv.fromDataSet(tableset);

注册TableSink

注册TableSink可用于将 Table API或SQL查询的结果发送到外部存储系统,例如数据库,键值存储,消息队列或文件系统(在不同的编码中,例如,CSV,Apache [Parquet] ,Avro,ORC],......):
  

TableSink csvSink = new CsvTableSink("/path/to/file", ...); 
  
  2、 String[] fieldNames = {"a", "b", "c"}; 
                TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG}; 
                tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink);

实战案例一

基于Flink SQL的WordCount:

public class WordCountSQL {

    public static void main(String[] args) throws Exception{

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        List list  =  new ArrayList();
        String wordsStr = "Hello Flink Hello TOM";
        String[] words = wordsStr.split("\\W+");
        for(String word : words){
            WC wc = new WC(word, 1);
            list.add(wc);
        }
        DataSet<WC> input = env.fromCollection(list);
        tEnv.registerDataSet("WordCount", input, "word, frequency");
        Table table = tEnv.sqlQuery(
                "SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word");
        DataSet<WC> result = tEnv.toDataSet(table, WC.class);
        result.print();
    }//main

    public static class WC {
        public String word;//hello
        public long frequency;//1

        // public constructor to make it a Flink POJO
        public WC() {}

        public WC(String word, long frequency) {
            this.word = word;
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return "WC " + word + " " + frequency;
        }
    }

}

输出如下:

WC TOM 1
WC Hello 2
WC Flink 1

实战案例二

本例稍微复杂,首先读取一个文件中的内容进行统计,并写入到另外一个文件中:

public class SQLTest {

    public static void main(String[] args) throws Exception{

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);
        env.setParallelism(1);

        DataSource<String> input = env.readTextFile("test.txt");
        input.print();
        //转换成dataset
        DataSet<Orders> topInput = input.map(new MapFunction<String, Orders>() {
            @Override
            public Orders map(String s) throws Exception {
                String[] splits = s.split(" ");
                return new Orders(Integer.valueOf(splits[0]), String.valueOf(splits[1]),String.valueOf(splits[2]), Double.valueOf(splits[3]));
            }
        });
        //将DataSet转换为Table
        Table order = tableEnv.fromDataSet(topInput);
        //orders表名
        tableEnv.registerTable("Orders",order);

        Table tapiResult = tableEnv.scan("Orders").select("name");
        tapiResult.printSchema();

        Table sqlQuery = tableEnv.sqlQuery("select name, sum(price) as total from Orders group by name order by total desc");

        //转换回dataset
        DataSet<Result> result = tableEnv.toDataSet(sqlQuery, Result.class);

        //将dataset map成tuple输出
        /*result.map(new MapFunction<Result, Tuple2<String,Double>>() {
            @Override
            public Tuple2<String, Double> map(Result result) throws Exception {
                String name = result.name;
                Double total = result.total;
                return Tuple2.of(name,total);
            }
        }).print();*/


        TableSink sink = new CsvTableSink("SQLTEST.txt", "|");
        //writeToSink

        /*sqlQuery.writeToSink(sink);
        env.execute();*/

        String[] fieldNames = {"name", "total"};
        TypeInformation[] fieldTypes = {Types.STRING, Types.DOUBLE};
        tableEnv.registerTableSink("SQLTEST", fieldNames, fieldTypes, sink);
        sqlQuery.insertInto("SQLTEST");
        env.execute();
    }

    /**
     * 源数据的映射类
     */
    public static class Orders {
        /**
         * 序号,姓名,书名,价格
         */
        public Integer id;
        public String name;
        public String book;
        public Double price;

        public Orders() {
            super();
        }
        public Orders(Integer id, String name, String book, Double price) {
            this.id = id;
            this.name = name;
            this.book = book;
            this.price = price;
        }
    }
    /**
     * 统计结果对应的类
     */
    public static class Result {
        public String name;
        public Double total;

        public Result() {}
    }
    }//
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
4月前
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
213 15
|
18天前
|
SQL 存储 缓存
Flink SQL Deduplication 去重以及如何获取最新状态操作
Flink SQL Deduplication 是一种高效的数据去重功能,支持多种数据类型和灵活的配置选项。它通过哈希表、时间窗口和状态管理等技术实现去重,适用于流处理和批处理场景。本文介绍了其特性、原理、实际案例及源码分析,帮助读者更好地理解和应用这一功能。
95 14
|
3月前
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
64 0
|
4月前
|
SQL 安全 数据处理
揭秘数据脱敏神器:Flink SQL的神秘力量,守护你的数据宝藏!
【9月更文挑战第7天】在大数据时代,数据管理和处理尤为重要,尤其在保障数据安全与隐私方面。本文探讨如何利用Flink SQL实现数据脱敏,为实时数据处理提供有效的隐私保护方案。数据脱敏涉及在处理、存储或传输前对敏感数据进行加密、遮蔽或替换,以遵守数据保护法规(如GDPR)。Flink SQL通过内置函数和表达式支持这一过程。
96 2
|
4月前
|
SQL 大数据 数据处理
奇迹降临!解锁 Flink SQL 简单高效的终极秘籍,开启数据处理的传奇之旅!
【9月更文挑战第7天】在大数据处理领域,Flink SQL 因其强大功能与简洁语法成为开发者首选。本文分享了编写高效 Flink SQL 的实用技巧:理解数据特征及业务需求;灵活运用窗口函数(如 TUMBLE 和 HOP);优化连接操作,优先采用等值连接;合理选择数据类型以减少计算资源消耗。结合实际案例(如实时电商数据分析),并通过定期性能测试与调优,助力开发者在大数据处理中更得心应手,挖掘更多价值信息。
54 1
|
5月前
|
SQL 流计算
Flink SQL 在快手实践问题之由于meta信息变化导致的state向前兼容问题如何解决
Flink SQL 在快手实践问题之由于meta信息变化导致的state向前兼容问题如何解决
56 1
|
5月前
|
SQL 安全 流计算
Flink SQL 在快手实践问题之Group Window Aggregate 中的数据倾斜问题如何解决
Flink SQL 在快手实践问题之Group Window Aggregate 中的数据倾斜问题如何解决
94 1
|
5月前
|
SQL 资源调度 流计算
慢sql治理问题之在 Flink 中, userjar 分发问题如何优化
慢sql治理问题之在 Flink 中, userjar 分发问题如何优化
|
5月前
|
SQL 设计模式 数据处理
Flink SQL 在快手实践问题之状态兼容的终极方案特点内容如何解决
Flink SQL 在快手实践问题之状态兼容的终极方案特点内容如何解决
31 0