大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(已更完)

Spark(已更完)

Flink(正在更新!)

章节内容

上节我们完成了如下的内容:


Flink CEP 案例

检测交易活跃用户

超时未交付

Flink SQL

Flink SQL 是 Apache Flink 提供的一种高层次的查询语言接口,它基于 SQL 标准,为开发者提供了处理流式数据和批处理数据的能力。Flink SQL 允许用户使用标准 SQL 查询语言在数据流和数据表上执行复杂的操作,适用于多种应用场景,如实时分析、数据流处理、机器学习等。下面是 Flink SQL 的一些重要概念和功能:


流与批统一的查询模式

Flink SQL 的一大特点是流处理和批处理的统一性。通过同一套 SQL 语法,用户可以同时处理静态数据(批处理)和动态数据(流处理)。这使得应用程序的开发更加简化,因为可以用相同的逻辑编写实时流数据处理和历史数据的查询。


动态表 (Dynamic Tables)

Flink SQL 通过动态表的概念将流数据建模为不断变化的表。这种动态表随着时间推移不断更新,数据的每个变化(插入、更新、删除)都会影响表的状态。通过动态表的概念,Flink 可以使用 SQL 查询连续的流数据,并在查询执行时获得不断更新的结果。


窗口操作 (Windowing)

在流式数据处理场景中,窗口操作非常重要。Flink SQL 提供了多种类型的窗口操作,包括:


滚动窗口 (Tumbling Window):将数据按照固定长度分割成不重叠的窗口。

滑动窗口 (Sliding Window):窗口之间存在重叠,数据可能被分配到多个窗口。

会话窗口 (Session Window):窗口由活动间隔定义,不同的事件可能会聚合在一个窗口中。

连接操作 (Joins)

Flink SQL 支持多种连接操作:


流与流的连接:允许用户将多个流结合在一起,基于时间或键进行匹配。

流与表的连接:将静态表与流数据进行匹配,从而使流式数据处理能够结合历史数据或参考数据。

时态表连接 (Temporal Table Join):用于将流数据与一个时态表进行连接,时态表会随着时间不断更新。

内置函数和自定义函数

Flink SQL 提供了丰富的内置函数,涵盖了字符串操作、数学运算、时间日期处理、聚合操作等。此外,Flink SQL 还支持用户自定义函数(UDF、UDTF、UDAF),用户可以根据具体需求扩展 SQL 的功能。


Table API 与 SQL API 的互操作性

Flink 提供了两种高级数据处理 API:


Table API:一种与关系代数类似的编程接口,支持链式调用,功能类似于 SQL。

SQL API:用户可以直接使用标准 SQL 语句进行数据处理。

Table API 和 SQL API 具有很高的互操作性,用户可以在同一个程序中混合使用这两者。例如,可以先用 Table API 进行表定义和部分操作,再通过 SQL 语句执行复杂的查询。


支持多种数据源和数据接收器

Flink SQL 支持连接多种数据源和数据接收器,如 Kafka、文件系统、数据库(如 MySQL、PostgreSQL)、Hive、HBase 等。通过 SQL 语法,用户可以轻松地将流数据写入这些外部系统,也可以从这些系统中读取数据进行处理。


状态管理与容错机制

Flink SQL 继承了 Flink 强大的状态管理和容错机制。在流处理任务中,Flink SQL 能够有效地处理有状态的计算,并保证在失败时自动恢复。基于 Flink 的检查点(Checkpointing)和保存点(Savepoint)机制,Flink SQL 提供了 Exactly-Once 的状态一致性保障。


实时分析与 ETL

Flink SQL 可以用于实时数据的分析与处理,常用于构建实时 ETL (Extract, Transform, Load) 流程。例如,用户可以通过 SQL 查询对从 Kafka、数据库等数据源接收到的流数据进行清洗、过滤、转换,并将结果写入到其他系统中(如 Elasticsearch、HDFS、JDBC)。


HelloWorld

添加依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table</artifactId>
    <type>pom</type>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.12</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.12</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

依赖说明:


flink-table-api-java-bridge_2.12:桥接器,主要负责 TableAPI 和 DataStream/DataSetAPI 的连接支持,按照语言分Java和Scala。

flink-table-planner-blink_2.12:计划期,是TableAPI最主要的部分,提供了运行时环境和生成程序执行计划的Planner。

如果是生产环境,则已经有 planner,就只需要有bridge就可以了

flink-table:基础依赖

编写代码

package icu.wzk;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;


public class TableApiDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
        DataStreamSource<Tuple2<String, Integer>> data = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
            @Override
            public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
                while (true) {
                    ctx.collect(new Tuple2<>("name", 10));
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {

            }
        });

        // =======================
        // Table 方式
        Table table = tableEnvironment.fromDataStream(data, $("name"), $("age"));
        // 对Table的数据查询
        Table name = table.select($("name"));
        // 将数据输出到控制台
        DataStream<Tuple2<Boolean, Row>> result = tableEnvironment.toRetractStream(name, Row.class);
        result.print();
        System.out.println("=========================");
        // =======================
        // SQL 方式
        tableEnvironment.createTemporaryView("users",data, $("name"), $("age"));
        String sql = "select name from users";
        table = tableEnvironment.sqlQuery(sql);
        result = tableEnvironment.toRetractStream(table, Row.class);
        result.print();
        System.out.println("=========================");
        env.execute("TableApiDemo");
    }

}

运行代码

控制台会一直不间断的输出如下的内容:

=========================
=========================
1> (true,name)
6> (true,name)
2> (true,name)
7> (true,name)
3> (true,name)
8> (true,name)
4> (true,name)
1> (true,name)
5> (true,name)
2> (true,name)
6> (true,name)
3> (true,name)

控制台的运行结果如下所示:

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
16天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
47 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
7天前
|
SQL 存储 算法
比 SQL 快出数量级的大数据计算技术
SQL 是大数据计算中最常用的工具,但在实际应用中,SQL 经常跑得很慢,浪费大量硬件资源。例如,某银行的反洗钱计算在 11 节点的 Vertica 集群上跑了 1.5 小时,而用 SPL 重写后,单机只需 26 秒。类似地,电商漏斗运算和时空碰撞任务在使用 SPL 后,性能也大幅提升。这是因为 SQL 无法写出低复杂度的算法,而 SPL 提供了更强大的数据类型和基础运算,能够实现高效计算。
|
1月前
|
SQL 分布式计算 NoSQL
大数据-164 Apache Kylin Cube优化 案例1 定义衍生维度与对比 超详细
大数据-164 Apache Kylin Cube优化 案例1 定义衍生维度与对比 超详细
28 1
大数据-164 Apache Kylin Cube优化 案例1 定义衍生维度与对比 超详细
|
17天前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
48 1
zdl
|
3天前
|
消息中间件 运维 大数据
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
17 0
|
1月前
|
分布式计算 大数据 Linux
大数据体系知识学习(二):WordCount案例实现及错误总结
这篇文章介绍了如何使用PySpark进行WordCount操作,包括环境配置、代码实现、运行结果和遇到的错误。作者在运行过程中遇到了Py4JJavaError和JAVA_HOME未设置的问题,并通过导入findspark初始化和设置环境变量解决了这些问题。文章还讨论了groupByKey和reduceByKey的区别。
26 1
|
1月前
|
消息中间件 存储 druid
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
40 3
|
1月前
|
存储 大数据 分布式数据库
大数据-165 Apache Kylin Cube优化 案例 2 定义衍生维度及对比 & 聚合组 & RowKeys
大数据-165 Apache Kylin Cube优化 案例 2 定义衍生维度及对比 & 聚合组 & RowKeys
33 1
|
1月前
|
消息中间件 druid 大数据
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
32 2
|
1月前
|
消息中间件 分布式计算 druid
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
52 1