flink-sql(table api 编程)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: table api 基本使用tableEnvironment 和 streamTableEnvironment注册表,临时表,持久表Table api 和 table sql 混用table api 和 datastream 混用table api 的输入和输出(kafka)kafka的高级特性option

Table API 标准结构


EXP

下面是一个标准的Table API 的例子


public class FlinkTableStandardStructure {
    public static void main(String[] args) {
        //1、创建TableEnvironment
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()//默认就是StreamingMode
                //.inBatchMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);
        //2、创建source table: 1)读取外部表;2)从Table API或者SQL查询结果创建表
        Table projTable = tEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)),
                        DataTypes.FIELD("name", DataTypes.STRING())
                ),
                row(1, "zhangsan"),
                row(2L, "lisi")
        ).select($("id"), $("name"));
        //注册表到catalog(可选的)
        tEnv.createTemporaryView("sourceTable", projTable);
        //3、创建sink table
        final Schema schema = Schema.newBuilder()
                .column("id", DataTypes.DECIMAL(10, 2))
                .column("name", DataTypes.STRING())
                .build();
        tEnv.createTemporaryTable("sinkTable", TableDescriptor.forConnector("print")
                .schema(schema)
                .build());
        //4、Table API执行查询(可以执行多次查询,中间表可以注册到catalog也可以不注册)
        Table resultTable = tEnv.from("sourceTable").select($("id"), $("name"));
//        Table resultTable = projTable.select($("id"), $("name"));
        //5、输出(包括执行,不需要单独在调用tEnv.execute("job"))
        resultTable.executeInsert("sinkTable");
    }
}



两种TableEnvironment



1、TableEnvironment


标准的做法是创建TableEnvironment,在创建 TableEnv 的时候,可以传⼊⼀个 EnvironmentSettings 或者 TableConfig 参数, ⽤来配置 TableEnvironment 的⼀些特性: 注意:Flink1.14开始删除了其他的执⾏器了,只保留了BlinkPlanner


功能:

  • 注册catalog(可以理解为数据系统实例,⽐如某个HBase集群,某个MySQL服务器)
  • 在catalog注册库和表
  • 加载插件模块
  • 执⾏SQL查询
  • 注册UDF
  • DataStream和Table互转(仅仅在StreamExecutionEnvironment下)


EnvironmentSettings settings = EnvironmentSettings
 .newInstance()
 .useBlinkPlanner()//Flink1.14开始就删除了其他的执⾏器了,只保留了BlinkPlanner
 .inStreamingMode()//默认就是StreamingMode
 //.inBatchMode()
 .build();
TableEnvironment tEnv = TableEnvironment.create(settings);

2、StreamTableEnvironment

如果要混⽤DataStream和Table API/SQL,可以使⽤StreamTableEnvironment:

//1、获取Stream执⾏环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2、创建表执⾏环境
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);


TableEnvironment和StreamExecutionEnvironment⼆选⼀即可。


TableEnviromnet的配置

// instantiate table environment
TableEnvironment tEnv = ...
// access flink configuration
Configuration configuration = tEnv.getConfig().getConfiguration();
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true");
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
configuration.setString("table.exec.mini-batch.size", "5000");

具体配置项请参考如下链接: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/config/


创建表和注册表

有了TableEnvironment,我们接下来就需要创建表并注册表,注册表是可选的。


创建表的两种⽅式


创建表是为了在表上执⾏TableAPI或者SQL查询,可以通过如下两种⽅式创建表



创建表的方式

说明

代码示例

虚拟表

从现有的table创建表,通常是从Table API&SQL查询结果创建 表,这种一般是中间表

tEnv.createTemporaryView("sourceTable", projTable);

常规表

另外⼀种更实⽤的⽅式是通过外部数据创建表,例如⽂件, 数据 库表, 或者消息队列,这种⼀般是创建输⼊表或者输出表,采⽤ connector方式创建

tableEnv.createTable("SourceTableA", sourceDescriptor); tableEnv.createTemporaryTable("SourceTableB", sourceDescriptor)

创建虚拟表

Table projTable = tEnv.fromValues(
 DataTypes.ROW(
 DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)),
 DataTypes.FIELD("name", DataTypes.STRING())
 ),
 row(1, "zhangsan"),
 row(2L, "lisi")
).select($("id"), $("name"));
//createTemporaryView
tEnv.createTemporaryView("sourceTable", projTable);


创建常规表

final TableDescriptor sourceDescriptor = TableDescriptor.forConnector("datagen")
 .schema(Schema.newBuilder()
 .column("f0", DataTypes.STRING())
 .build())
 .option(DataGenOptions.ROWS_PER_SECOND, 100)
 .build();
tableEnv.createTable("SourceTableA", sourceDescriptor);
tableEnv.createTemporaryTable("SourceTableB", sourceDescriptor);
// 也可使⽤SQL DDL
tableEnv.executeSql("CREATE [TEMPORARY] TABLE MyTable (...) WITH (...)")

注意: 具体的Connector可以参考如下链接: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/overview/


临时表和持久表


注册表是为了把计算的中间结果注册为⼀个表,供同⼀Flink session后续访问或者跨Flink session访问


只有常规 的表(TABLES)才分临时表和持久表


表类型

说明


临时表

同一个Flink session可访问,跨Flink session 不能访问


持久表

同一个Flink session和跨Flink session 都能访问


EXP

final TableDescriptor sourceDescriptor = TableDescriptor.forConnector("datagen")
 .schema(Schema.newBuilder()
 .column("f0", DataTypes.STRING())
 .build())
 .option(DataGenOptions.ROWS_PER_SECOND, 100)
 .build();
//持久表
tableEnv.createTable("SourceTableA", sourceDescriptor);
//临时表
tableEnv.createTemporaryTable("SourceTableB", sourceDescriptor)


表的标识符


注册表时可以指定唯⼀标识符,格式为:


Catalog(⽬录).database(数据库).objectname(表名)


如果没有指定Catalog或database,就使⽤当前默认值,即tEnv.useCatalog("...")和tEnv.useDatabase("...")指定的 默认值,如果tEnv没有指定,则默认值default_catalog.default_database



EXP


TableEnvironment tEnv = ...;
//当前Flink会话切换到custom_catalog.custom_database
tEnv.useCatalog("custom_catalog");
tEnv.useDatabase("custom_database");
Table table = ...;
//在custom_catalog.custom_database下,把table注册为⼀个名叫exampleView的视图表
tableEnv.createTemporaryView("exampleView", table);
//在custom_catalog.other_database下,把table注册为⼀个名叫exampleView的视图表
tableEnv.createTemporaryView("other_database.exampleView", table);
//在custom_catalog.custom_database下,把table注册为⼀个名叫exampleView.View的视图表,注意转义字
tableEnv.createTemporaryView("`example.View`", table);
//other_catalog.other_database,把table注册为⼀个名叫exampleView的视图表
tableEnv.createTemporaryView("other_catalog.other_database.exampleView", table);


Table API 查询



常规查询

/**
 * Flink Table API执行聚合操作
 */
public class FlinkTableAggr {
    public static void main(String[] args) throws Exception{
        //1、获取Stream执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2、创建表执行环境
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        env.setParallelism(1);
        //3、读取数据
        DataStream<ClickLogs> clickLogs = env.fromElements(
                "Mary,./home,2022-02-02 12:00:00",
                "Bob,./cart,2022-02-02 12:00:00",
                "Mary,./prod?id=1,2022-02-02 12:00:05",
                "Liz,./home,2022-02-02 12:01:00",
                "Bob,./prod?id=3,2022-02-02 12:01:30",
                "Mary,./prod?id=7,2022-02-02 12:01:45"
        ).map(event -> {
            String[] props = event.split(",");
            return ClickLogs
                    .builder()
                    .user(props[0])
                    .url(props[1])
                    .cTime(props[2])
                    .build();
        });
        //4、流转换为动态表
        Table table = tEnv.fromDataStream(clickLogs);
        //5、执行Table API查询/SQL查询
        /**
         * select
         *  user,
         *  count(url) as cnt
         * from clicks
         * group by user
         */
        Table resultTable = table
                .groupBy($("user"))
                .aggregate($("url").count().as("cnt"))
                .select($("user"),$("cnt"));
//        System.out.println(resultTable.explain());
        //6、将Table转换为DataStream
        //聚合操作必须是撤回流
        DataStream<Tuple2<Boolean,Row>> selectedClickLogs = tEnv.toRetractStream(resultTable,Row.class);
        //7、处理结果:打印/输出
        selectedClickLogs.print();
        //8、执行
        env.execute("FlinkTableAggr");
    }
}


混⽤Table API和SQL查询

public class MixTableAPIAndSQL {
    public static void main(String[] args) {
        //1、创建TableEnvironment
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);
        //2、创建source table
        Table projTable = tEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("user", DataTypes.STRING()),
                        DataTypes.FIELD("url", DataTypes.STRING()),
                        DataTypes.FIELD("cTime", DataTypes.STRING())
                ),
                row("Mary", "./home","2022-02-02 12:00:00"),
                row("Bob", "./cart","2022-02-02 12:00:00"),
                row("Mary", "./prod?id=1","2022-02-02 12:00:05"),
                row("Liz", "./home","2022-02-02 12:01:00"),
                row("Bob", "./prod?id=3","2022-02-02 12:01:30"),
                row("Mary", "./prod?id=7","2022-02-02 12:01:45")
        ).select($("user"), $("url"),$("cTime"));
        //注册表到catalog(可选的)
        tEnv.createTemporaryView("sourceTable", projTable);
        //3、创建sink table
        final Schema schema = Schema.newBuilder()
                .column("user", DataTypes.STRING())
                .column("url", DataTypes.STRING())
                .build();
        tEnv.createTemporaryTable("sinkTable", TableDescriptor.forConnector("print")
                .schema(schema)
                .build());
        //4、Table API和SQL混用
        Table resultTable = tEnv.from("sourceTable").select($("user"), $("url"));
        tEnv.createTemporaryView("resultTableView", resultTable);
        Table result = tEnv.sqlQuery("select * from resultTableView where user = 'Mary'");
        //5、输出(包括执行,不需要单独在调用tEnv.execute("job"))
        result.executeInsert("sinkTable");
        result.printSchema();
    }
}


Table API 和DataStream混用

package com.blink.sb;
import com.blink.sb.beans.ClickLogs;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;
public class MixFlinkTableAndDataStream {
    public static void main(String[] args) throws Exception {
        //1、获取Stream执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2、创建表执行环境
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        //3、读取数据(source)
        DataStream<ClickLogs> clickLogs = env.fromElements(
                "Mary,./home,2022-02-02 12:00:00",
                "Bob,./cart,2022-02-02 12:00:00",
                "Mary,./prod?id=1,2022-02-02 12:00:05",
                "Liz,./home,2022-02-02 12:01:00",
                "Bob,./prod?id=3,2022-02-02 12:01:30",
                "Mary,./prod?id=7,2022-02-02 12:01:45"
        ).map(event -> {
            String[] props = event.split(",");
            return ClickLogs
                    .builder()
                    .user(props[0])
                    .url(props[1])
                    .cTime(props[2])
                    .build();
        });
        //4、流转换为动态表
        Table table = tEnv.fromDataStream(clickLogs);
        //5、执行Table API查询/SQL查询
        Table resultTable = table
                .where($("user").isEqual("Mary"))
                .select($("user"), $("url"), $("cTime"));
        //6、将Table转换为DataStream
        DataStream<ClickLogs> selectedClickLogs = tEnv.toDataStream(resultTable, ClickLogs.class);
        //7、处理结果:打印/输出
        selectedClickLogs.print();
        //8、执行
        env.execute("FlinkTableFirstExample");
    }
}


Table api Connectors (内置连接器)


Flink 的 Table API & SQL通过Connectors连接外部系统,并执⾏批/流⽅式的读写操作。Connectors提供了丰富的 外部系统连接器,根据source和sink的类型,它们⽀持不同的格式,例如 CSV、Avro、Parquet 或 ORC。 ⽬前⽀持的内置Connectors如下表所示(截⽌到Flink 1.14.3):


image.png

注意:如果作为sink⼤家还要注意⽀持的输出模式(Append/Retract/Upsert)


参考:https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/overview/


Table API 输入(kafka)和输出(kafka)

package com.blink.sb;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.Row;
import java.util.concurrent.ExecutionException;
import static org.apache.flink.table.api.Expressions.$;
public class FlinkTableAPIKafka2Kafka {
    public static final String input_topic = "clicklog_input";
    public static final String output_topic = "clicklog_output";
    public static final String output_topic_change = "clicklog_output_change";
    public static void main(String[] args) throws ExecutionException, InterruptedException {
//1、创建TableEnvironment
        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
        //2、创建kafka source table
        final Schema schema = Schema.newBuilder()
                .column("user", DataTypes.STRING())
                .column("url", DataTypes.STRING())
                .column("cTime", DataTypes.STRING())
                .build();
        tEnv.createTemporaryTable("sourceTable", TableDescriptor.forConnector("kafka")
                .schema(schema)
                .format("json")
                .option("topic", input_topic)
                .option("properties.bootstrap.servers", "node02:6667")
                .option("properties.group.id", "testGroup")//每次都从最早的offsets开始
                .option("scan.startup.mode", "latest-offset")
                .build());
        //3、创建kafka sink table
        final Schema sinkSchema = Schema.newBuilder()
                .column("user", DataTypes.STRING())
                .column("url", DataTypes.STRING())
                .column("cTime", DataTypes.STRING())
//                .column("cnt", DataTypes.BIGINT())
                .build();
        tEnv.createTemporaryTable("sinkTable", TableDescriptor.forConnector("kafka")
                .schema(sinkSchema)
                .format("csv")
                .option("topic", output_topic)
                .option("properties.bootstrap.servers", "node02:6667")
                .build());
        //4、Table API从sourceTable读取数据并以csv格式写入sinkTable
        /**
         *
         * 如果报错:Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath
         * 原因:缺省 flink-connector-kafka_${scala.binary.version}  这个库的支持。 这个库支持 upsert-kafka
         * 请查看 https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/upsert-kafka/
         *
         * 这种模式 insert_only
         */
        tEnv.from("sourceTable")
                .select($("user"), $("url"), $("cTime"))
                .executeInsert("sinkTable");
        /**
         * Table sink 'default_catalog.default_database.sinkTable' doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[user, url], select=[user, url, COUNT(url) AS EXPR$0])
         * 向sinkTable中注入数据时, 不支持 分组聚合的table
         *
         * 解决方式: 将聚合类型的table 转换为 ChangelogStream类型的<>DataStream</>, 然后,DataStream.sink 到 kafka 中
         */
        Table select = tEnv.from("sourceTable")
                .groupBy($("user"), $("url"))
                .select($("user"), $("url").count().as("cnt"));
        DataStream<Row> rowDataStream = tEnv.toChangelogStream(select,
                Schema.newBuilder()
                        .column("user", DataTypes.STRING())
                        .column("cnt", DataTypes.BIGINT()).build(),
                ChangelogMode.all());
        DataStream<String> stringDataStream = rowDataStream.map(new MapFunction<Row, String>() {
            @Override
            public String map(Row row) throws Exception {
                return "xxxx";
            }
        });
        stringDataStream.sinkTo(KafkaSink.<String>builder()
                .setBootstrapServers("node02:6667")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(output_topic_change)
                        .setKafkaValueSerializer(StringSerializer.class)
                        .build())
                .build());
    }
}


kafka输入和输出高级特性(更多option)


可以通过更多option来启⽤⾼级特性,kafka connector⽀持的option

详⻅如下链接: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kafka/#connector-options



key和value格式

Kafka消息的key和value均可指定format。

  • 仅指定value format


kafka消息中key是可以选的,可以只指定value format


tEnv.createTemporaryTable("sourceTable", TableDescriptor.forConnector("kafka")
 .schema(schema)
 .format("json")
 ...
 .build());


  • 指定key和value format
tEnv.createTemporaryTable("sourceTable", TableDescriptor.forConnector("kafka")
 .schema(schema)
// .format("json")
 .option("topic",input_topic)
 .option("key.format","json")
 .option("value.format","json")
 ...
 .build())


注意:format("json")和option("value.format","json")⼆选⼀,⼆者等价


Topic和Partition发现

可以通过topic或者topic-pattern来配置主题。



配置项

source

sink

topic

⽀持单个topic或者topic列表,topic列表⽤分号分割:topic-1;topic-2,topic和topic-pattern⼆选⼀

注意:作为sink时不⽀ 持topic列表

topic-pattern

通过topic名字的正则表达式来指定多个topic,例如:test-topic-[0- 9],topic和topic-pattern⼆选⼀

不能作⽤于sink


注意:要允许在作业开始运⾏后发现动态创建的topic,请为 scan.topic-partition-discovery.interval 设置⼀个⾮负 值


source读取位置

作为source是,可以通过scan.startup.mode选项指定从哪个位置开始消费,可选的值如下


选项值

说明

group-offsets(默认 值)

使⽤当前消费组提交到ZK / Kafka的offset开始消费,Flink1.14有bug:https://issues.apach e.org/jira/browse/FLINK-2469

earliest-offset

从earliest offset开始消费(每次)

latest-offset

从latest offset开始消费(每次)

specific-offsets

从⽤户指定的每个分区的特定偏移量开始消费,必须跟scan.startup.specific-offsets配置搭 配使⽤:例如:partition:0,offset:42;partition:1,offset:300


sink分区

当kafka作为sink时,可以通过sink.partitioner指定partitioner。


选项值

说明

default

使用kafka默认的partitioner

fixed

每个Flink分区上的消息最多分区到一个kafka分区,可以降低网络开销

round-robin

只有当kafka消息没有key时,才有用

自定义FlinkKafkaPartitioner的子类



一致性保证

默认情况下,如果启⽤checkpoint,Kafka sink使⽤at-least-once⼀致性语意。 在启⽤checkpoint的前题下,可通过sink.delivery-guarantee来调整⼀致性语意:




选项值

说明

none

Flink不做任何保证,生成的记录可能会丢失或重复

at-least-once(default)

至少一次保证,但可能会重复

exactly-once

使用kafka事务提供精确一次语义


⼀旦启⽤了事物来保证exactly-once语意,⼀定要注意下游消费者要配置isolation.level为read_committed(默认 是read_uncommitted)。









相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
1月前
|
SQL Perl
PL/SQL编程基本概念
PL/SQL编程基本概念
13 0
|
3月前
|
流计算 Windows
Flink窗口与状态编程开发(一)
Flink窗口与状态编程开发(一)
|
2天前
|
SQL Java 数据库连接
Java从入门到精通:2.3.2数据库编程——了解SQL语言,编写基本查询语句
Java从入门到精通:2.3.2数据库编程——了解SQL语言,编写基本查询语句
|
14天前
|
存储 Java 关系型数据库
掌握Java 8 Stream API的艺术:详解流式编程(一)
掌握Java 8 Stream API的艺术:详解流式编程
46 1
|
1月前
|
算法 Linux API
【Linux系统编程】一文了解 Linux目录的创建和删除API 创建、删除与读取
【Linux系统编程】一文了解 Linux目录的创建和删除API 创建、删除与读取
28 0
【Linux系统编程】一文了解 Linux目录的创建和删除API 创建、删除与读取
|
1月前
|
分布式计算 监控 API
flink 入门编程day02
flink 入门编程day02
36 5
|
1月前
|
SQL Go
SQL编程
SQL编程
9 0
|
1月前
|
SQL 分布式计算 测试技术
Flink API的4个层次
【2月更文挑战第28天】
|
1月前
|
Linux API C++
【Linux C/C++ 线程同步 】Linux API 读写锁的编程使用
【Linux C/C++ 线程同步 】Linux API 读写锁的编程使用
21 1
|
1月前
|
消息中间件 SQL Kafka
如何高效接入 Flink: Connecter / Catalog API 核心设计与社区进展
本文整理自阿里云实时计算团队 Apache Flink Committer 和 PMC Member 任庆盛在 FFA 2023 核心技术专场(二)中的分享。
287 0
如何高效接入 Flink: Connecter / Catalog API 核心设计与社区进展

热门文章

最新文章