【Flink】小白级入门,Flink sql 的基础用法(上)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 【Flink】小白级入门,Flink sql 的基础用法

Flink sql 是什么

sql 的诞生就是为了简化我们对数据开发,可以使用少量的 sql 代码,帮助我完成对数据的查询,分析等功能

声明式 & 易于理解

对于用户只需要表达我想要什么,具体处理逻辑交给框架,系统处理,用户无需关心,对于一些非专业的开发人员有了解 sql,并且 sql 相对我们学习 java,c 等语言更简单,学习成本更低,如果跨团队,或者非大数据开发人员,也可以通过 sql 来进行 flink 任务的开发

自动调优

查询优化器,会对我们编写的 sql 进行优化,生成效率更好的执行计划,所以用户不需要了解底层细节,即高效的获取结果

稳定

sql 语义发展几十年是一个很稳定的语言,少有变动,当我们引擎的升级,甚至替换成另一个引擎,都可以做到兼容地,平滑地升级,无需更改我们的已经编写好的 sql 代码

流批统一的基础

对于 flink 通过 sql 的表达式,来完成流批的统一,一套 sql 代码,既可以跑流任务,也可以跑批任务,减少我们开发的成本

Flink sql 使用

数据类型
-- 字符串类型
# char类型
CHAR
CHAR(n) -- n在 1 和 2147483647 之间  未设置n=1
# 字符串类型
VARCHAR
VARCHAR(n)  -- n在 1 和 2147483647 之间  未设置n=1
STRING  -- 等于最大的varchar(max)
# 二进制类型
BINARY
BINARY(n) -- 范围同上
# 可变长度二进制类型
VARBINARY
VARBINARY(n)  -- 类似于string
BYTES
-- 数字类型
# 带有精度的十进制数字类型  -- 类似于java中的
DECIMAL
DECIMAL(p)
DECIMAL(p, s)
DEC
DEC(p)
DEC(p, s)
NUMERIC
NUMERIC(p)
NUMERIC(p, s)
# 带符号
TINYINT  -- -128 to 127
SMALLINT -- -32768 to 32767
# 不带符号的
INT  -- 2147483,648 to 2147483647
INTEGER
BIGINT  --  -9223372036854775808 to 9223372036854775807
# 带小数的
FLOAT
DOUBLE
-- 时间类型
#日期
DATE  -- 2020-10-12
#时间
TIME
TIME(p) -- 10:10:12.p  不指定p,p= 0
#时间戳
TIMESTAMP
TIMESTAMP(p) -- 2020-12-12 12:10:11.p
-- 其他类型
#
ARRAY<t>
t ARRAY
#map类型
MAP<kt, vt>
-- 对应java的类型
Class                    Type
  java.lang.String              STRING
  java.lang.Boolean              BOOLEAN
  boolean                   BOOLEAN NOT NULL
  java.lang.Byte               TINYINT
  byte                    TINYINT NOT NULL
  java.lang.Short               SMALLINT
  short                    SMALLINT NOT NULL
  java.lang.Integer              INT
  int                     INT NOT NULL
  java.lang.Long               BIGINT
  long                    BIGINT NOT NULL
  java.lang.Float               FLOAT
  float                    FLOAT NOT NULL
  java.lang.Double              DOUBLE
  double                   DOUBLE NOT NULL
  java.sql.Date                DATE
  java.time.LocalDate             DATE
  java.sql.Time                TIME(0)
  java.time.LocalTime             TIME(9)
  java.sql.Timestamp             TIMESTAMP(9)
  java.time.LocalDateTime      TIMESTAMP(9)
  java.time.OffsetDateTime     TIMESTAMP(9) WITH TIME ZONE
  java.time.Instant         TIMESTAMP(9) WITH LOCAL TIME ZONE
  java.time.Duration        INVERVAL SECOND(9)
  java.time.Period I        NTERVAL YEAR(4) TO MONTH
  byte[]              BYTES
  T[]                ARRAY<T>
  java.util.Map<K, V>         MAP<K, V>
系统函数 & 自定义函数
/*
下面是1.12版本的系统内置的函数,具体我们可以到官网查看,根据需求使用即可
https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html
*/
// TODO 主要介绍自定义函数
/*
 udf 和 udaf 需要定义eval方法,实现自己的逻辑,具体系统会调用对应的方法
 udf : 传入一个值/多个/或者不传入,返回一个新的值,可以重载该方法,具体会根据传入的参数调用对应eval烦恼歌发 类似`map`算子,作用于sql
 udaf : 自定义聚合函数,根据自己的逻辑定义累加器
 udtf : 用作与表中,可返回一个或多个值,
*/
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import java.sql.SQLException;
public class UDFDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,
                EnvironmentSettings.newInstance().build());
        // 注册函数
        tEnv.registerFunction("customFunc1", new CustomUDF());
        tEnv.registerFunction("customFunc2", new CustomUDAF());
        tEnv.registerFunction("customFunc3", new CustomUDTF());
    }
    static class Acc {
        int result;
        public Integer gerResult() {
            return result;
        }
        public Acc merge(Acc acc) {
            result = acc.gerResult() + result;
            return this;
        }
        public void incr() {
            result++;
        }
    }
    static class CustomUDF extends ScalarFunction {
        // UDF 需要定义该方法
        public int eval(String str) {
            int hc = 0;
            for (char c : str.toUpperCase().toCharArray()) {
                hc = hashCode() >> c;
            }
            hc = hc - 1 - str.length();
            hc = hc >> 7;
            return hc;
        }
    }
    static class CustomUDTF extends TableFunction<Row> {
        // udtf 需要定义该方法,在该方法实现逻辑
        public void eval(String str) throws SQLException {
            if (str != null) {
                for (String s : str.split(",")) {
                    Row row = new Row(2);
                    row.setField(0, s);
                    row.setField(1, 1);
                    collect(row);
                }
            }
        }
        @Override
        public TypeInformation<Row> getResultType() {
            return new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
        }
    }
    static class CustomUDAF extends AggregateFunction<Integer, Acc> {
        @Override
        public Integer getValue(Acc accumulator) {
            return accumulator.gerResult();
        }
        @Override
        public Acc createAccumulator() {
            return new Acc();
        }
      // 累加
        public void accumulate(Acc acc,String input){
                if("*".equals(input)){
                    return;
                }
                acc.incr();
        }
        public void accumulate(Acc acc){
                acc.incr();
        }
    }
}
简单案例
代码

flink sql 中时间机制本质与 dataStream api 相同,只不过使用少于区别,稍加注意即可,注意指定 watermark 需要使用 sql 中 timestamp(3)类型(具体对应 java 类型可根据上面类型自行判断),设置 watermark 后可使用 ROWTIEM 字段(具体看 sql 代码),没有设置可直接使用 PROCTIME 字段

注意 : 不同的时间语义要严格对应环境配置的时间语义,否则可能出现异常

时间字段为两种,属于非用户指定字段,设置完时间语义后,根据需求使用具体的时间字段

ROWTIME : 事件时间

PROCTIME : 处理时间字段

场景 :

  • join : 场景与双流 join 或者 维表 join,目前 flink 支持的不是很好
  • topN & 去重 : 语法基本相同,row_num > 1 即 topN , 当=1 则是去重操作

topN 场景一些热搜,排名等内容

去重顾名思义,就是为了去重,去重会涉及到 retract 流(以后会详细讲)内容,会更新之前已经存在的结果

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
1月前
|
SQL 分布式计算 大数据
SparkSQL 入门指南:小白也能懂的大数据 SQL 处理神器
在大数据处理的领域,SparkSQL 是一种非常强大的工具,它可以让开发人员以 SQL 的方式处理和查询大规模数据集。SparkSQL 集成了 SQL 查询引擎和 Spark 的分布式计算引擎,使得我们可以在分布式环境下执行 SQL 查询,并能利用 Spark 的强大计算能力进行数据分析。
|
6月前
|
SQL Oracle 关系型数据库
【YashanDB知识库】共享利用Python脚本解决Oracle的SQL脚本@@用法
【YashanDB知识库】共享利用Python脚本解决Oracle的SQL脚本@@用法
|
6月前
|
SQL Oracle 关系型数据库
【YashanDB知识库】共享利用Python脚本解决Oracle的SQL脚本@@用法
本文来自YashanDB官网,介绍如何处理Oracle客户端sql*plus中使用@@调用同级目录SQL脚本的场景。崖山数据库23.2.x.100已支持@@用法,但旧版本可通过Python脚本批量重写SQL文件,将@@替换为绝对路径。文章通过Oracle示例展示了具体用法,并提供Python脚本实现自动化处理,最后调整批处理脚本以适配YashanDB运行环境。
|
8月前
|
SQL 大数据 数据处理
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是为应对传统数据处理框架中流批分离的问题而诞生的,它融合了SQL的简洁性和Flink的强大流批处理能力,降低了大数据处理门槛。其核心工作原理包括生成逻辑执行计划、查询优化和构建算子树,确保高效执行。Flink SQL 支持过滤、投影、聚合、连接和窗口等常用算子,实现了流批一体处理,极大提高了开发效率和代码复用性。通过统一的API和语法,Flink SQL 能够灵活应对实时和离线数据分析场景,为企业提供强大的数据处理能力。
1411 27
|
8月前
|
SQL 存储 机器学习/深度学习
如何让SQL速度飞起来 入门YashanDB优化器
优化器,SQL引擎的核心组成部分,是数据库中用于把关系表达式转换成最优执行计划的核心组件,影响数据库系统执行性能的关键组件之一。
78 15
|
9月前
|
SQL 存储 缓存
Flink SQL Deduplication 去重以及如何获取最新状态操作
Flink SQL Deduplication 是一种高效的数据去重功能,支持多种数据类型和灵活的配置选项。它通过哈希表、时间窗口和状态管理等技术实现去重,适用于流处理和批处理场景。本文介绍了其特性、原理、实际案例及源码分析,帮助读者更好地理解和应用这一功能。
637 14
|
11月前
|
SQL 数据库
SQL数据库基础语法入门
[link](http://www.vvo.net.cn/post/082935.html)
|
11月前
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
195 0
|
SQL 关系型数据库 MySQL
sql入门基础
好用的mysql客户端 https://www.quora.com/What-is-the-best-free-DB-schema-design-tool https://www.quora.com/What-is-the-best-MySQL-client-for-Mac-OS-X-or-Windows MySql string 函数 http://dev.
682 0
|
12月前
|
关系型数据库 MySQL 网络安全
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")

热门文章

最新文章