Flink教程(17)- Flink Table与SQL(案例与SQL算子)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink教程(17)- Flink Table与SQL(案例与SQL算子)

01 引言

在前面的博客,我们学习了FlinkTableAP和SQL,有兴趣的同学可以参阅下:

本文主要讲的是Flink TableSQL的一些案例。

02 Flink Table&SQL 案例

2.1 案例1(DataStream SQL统计)

需求:将DataStream注册为TableView并进行SQL统计。

代码如下

import static org.apache.flink.table.api.Expressions.$;
/**
 * 案例1:将DataStream注册为Table和View并进行SQL统计
 *
 * @author : YangLinWei
 * @createTime: 2022/3/8 2:07 下午
 */
public class Demo1 {
    public static void main(String[] args) throws Exception {
        //1.准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        //StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        //2.Source
        DataStream<Order> orderA = env.fromCollection(Arrays.asList(
                new Order(1L, "beer", 3),
                new Order(1L, "diaper", 4),
                new Order(3L, "rubber", 2)));
        DataStream<Order> orderB = env.fromCollection(Arrays.asList(
                new Order(2L, "pen", 3),
                new Order(2L, "rubber", 3),
                new Order(4L, "beer", 1)));
        //3.注册表
        // convert DataStream to Table
        Table tableA = tEnv.fromDataStream(orderA, $("user"), $("product"), $("amount"));
        // register DataStream as Table
        tEnv.createTemporaryView("OrderB", orderB, $("user"), $("product"), $("amount"));
        //4.执行查询
        System.out.println(tableA);
        // union the two tables
        Table resultTable = tEnv.sqlQuery(
                "SELECT * FROM " + tableA + " WHERE amount > 2 " +
                        "UNION ALL " +
                        "SELECT * FROM OrderB WHERE amount < 2"
        );
        //5.输出结果
        DataStream<Order> resultDS = tEnv.toAppendStream(resultTable, Order.class);
        resultDS.print();
        env.execute();
    }
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order {
        public Long user;
        public String product;
        public int amount;
    }
}

运行结果:

2.2 案例2(DataStream Table&SQL统计)

需求:使用SQLTable两种方式对DataStream中的单词进行统计。

示例代码如下(SQL方式):

import static org.apache.flink.table.api.Expressions.$;
/**
 * 使用SQL和Table两种方式对DataStream中的单词进行统计。
 *
 * @author : YangLinWei
 * @createTime: 2022/3/8 2:13 下午
 */
public class Demo02 {
    public static void main(String[] args) throws Exception {
        //1.准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        //2.Source
        DataStream<WC> input = env.fromElements(
                new WC("Hello", 1),
                new WC("World", 1),
                new WC("Hello", 1)
        );
        //3.注册表
        tEnv.createTemporaryView("WordCount", input, $("word"), $("frequency"));
        //4.执行查询
        Table resultTable = tEnv.sqlQuery("SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word");
        //5.输出结果
        //toAppendStream doesn't support consuming update changes which is produced by node GroupAggregate
        //DataStream<WC> resultDS = tEnv.toAppendStream(resultTable, WC.class);
        DataStream<Tuple2<Boolean, WC>> resultDS = tEnv.toRetractStream(resultTable, WC.class);
        resultDS.print();
        env.execute();
    }
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class WC {
        public String word;
        public long frequency;
    }
}

运行结果:

示例代码如下(Table方式):

/**
 * 使用SQL和Table两种方式对DataStream中的单词进行统计
 *
 * @author : YangLinWei
 * @createTime: 2022/3/8 2:15 下午
 */
public class Demo02Table {
    public static void main(String[] args) throws Exception {
        //1.准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        //2.Source
        DataStream<WC> input = env.fromElements(
                new WC("Hello", 1),
                new WC("World", 1),
                new WC("Hello", 1)
        );
        //3.注册表
        Table table = tEnv.fromDataStream(input);
        //4.执行查询
        Table resultTable = table
                .groupBy($("word"))
                .select($("word"), $("frequency").sum().as("frequency"))
                .filter($("frequency").isEqual(2));
        //5.输出结果
        DataStream<Tuple2<Boolean, WC>> resultDS = tEnv.toRetractStream(resultTable, WC.class);
        resultDS.print();
        env.execute();
    }
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class WC {
        public String word;
        public long frequency;
    }
}

2.3 案例3(SQL与滚动窗口)

需求:使用Flink SQL来统计5秒内 每个用户的订单总数、订单的最大金额、订单的最小金额,也就是每隔5秒统计最近5秒的每个用户的订单总数、订单的最大金额、订单的最小金额。

上面的需求使用流处理的Window的基于时间的滚动窗口就可以搞定!

编码步骤:

  1. 创建环境
  2. 使用自定义函数模拟实时流数据
  3. 设置事件时间和Watermaker
  4. 注册表
  5. 执行sql-可以使用sql风格或table风格
  6. 输出结果
  7. 触发执行

SQL方式实现:

/**
 * SQL方式实现
 *
 * @author : YangLinWei
 * @createTime: 2022/3/8 2:19 下午
 */
public class Demo3SQL {
    public static void main(String[] args) throws Exception {
        //1.准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        //2.Source
        DataStreamSource<Order> orderDS = env.addSource(new RichSourceFunction<Order>() {
            private Boolean isRunning = true;
            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                Random random = new Random();
                while (isRunning) {
                    Order order = new Order(UUID.randomUUID().toString(), random.nextInt(3), random.nextInt(101), System.currentTimeMillis());
                    TimeUnit.SECONDS.sleep(1);
                    ctx.collect(order);
                }
            }
            @Override
            public void cancel() {
                isRunning = false;
            }
        });
        //3.Transformation
        DataStream<Order> watermakerDS = orderDS
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                .withTimestampAssigner((event, timestamp) -> event.getCreateTime())
                );
        //4.注册表
        tEnv.createTemporaryView("t_order", watermakerDS,
                $("orderId"), $("userId"), $("money"), $("createTime").rowtime());
        //5.执行SQL
        String sql = "select " +
                "userId," +
                "count(*) as totalCount," +
                "max(money) as maxMoney," +
                "min(money) as minMoney " +
                "from t_order " +
                "group by userId," +
                "tumble(createTime, interval '5' second)";
        Table ResultTable = tEnv.sqlQuery(sql);
        //6.Sink
        //将SQL的执行结果转换成DataStream再打印出来
        //toAppendStream → 将计算后的数据append到结果DataStream中去
        //toRetractStream  → 将计算后的新的数据在DataStream原数据的基础上更新true或是删除false
        DataStream<Tuple2<Boolean, Row>> resultDS = tEnv.toRetractStream(ResultTable, Row.class);
        resultDS.print();
        env.execute();
    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private String orderId;
        private Integer userId;
        private Integer money;
        private Long createTime;
    }
}

Table实现方式:

/**
 * Table方式
 *
 * @author : YangLinWei
 * @createTime: 2022/3/8 2:26 下午
 */
public class Demo3Table {
    public static void main(String[] args) throws Exception {
        //1.准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        //2.Source
        DataStreamSource<Order> orderDS = env.addSource(new RichSourceFunction<Order>() {
            private Boolean isRunning = true;
            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                Random random = new Random();
                while (isRunning) {
                    Order order = new Order(UUID.randomUUID().toString(), random.nextInt(3), random.nextInt(101), System.currentTimeMillis());
                    TimeUnit.SECONDS.sleep(1);
                    ctx.collect(order);
                }
            }
            @Override
            public void cancel() {
                isRunning = false;
            }
        });
        //3.Transformation
        DataStream<Order> watermakerDS = orderDS
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                .withTimestampAssigner((event, timestamp) -> event.getCreateTime())
                );
        //4.注册表
        tEnv.createTemporaryView("t_order", watermakerDS,
                $("orderId"), $("userId"), $("money"), $("createTime").rowtime());
        //查看表约束
        tEnv.from("t_order").printSchema();
        //5.TableAPI查询
        Table ResultTable = tEnv.from("t_order")
                //.window(Tumble.over("5.second").on("createTime").as("tumbleWindow"))
                .window(Tumble.over(lit(5).second())
                        .on($("createTime"))
                        .as("tumbleWindow"))
                .groupBy($("tumbleWindow"), $("userId"))
                .select(
                        $("userId"),
                        $("userId").count().as("totalCount"),
                        $("money").max().as("maxMoney"),
                        $("money").min().as("minMoney"));
        //6.将SQL的执行结果转换成DataStream再打印出来
        DataStream<Tuple2<Boolean, Row>> resultDS = tEnv.toRetractStream(ResultTable, Row.class);
        resultDS.print();
        //7.excute
        env.execute();
    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private String orderId;
        private Integer userId;
        private Integer money;
        private Long createTime;
    }
}

2.4 案例4(SQL消费Kafka)

需求:从Kafka中消费数据并过滤出状态为success的数据再写入到Kafka

{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "fail"}
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic input_kafka
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic output_kafka
/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic input_kafka
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic output_kafka --from-beginning

代码实现:

/**
 * 从Kafka中消费数据并过滤出状态为success的数据再写入到Kafka
 *
 * @author : YangLinWei
 * @createTime: 2022/3/8 2:30 下午
 */
public class Demo4 {
    public static void main(String[] args) throws Exception {
        //1.准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        //2.Source
        TableResult inputTable = tEnv.executeSql(
                "CREATE TABLE input_kafka (\n" +
                        "  `user_id` BIGINT,\n" +
                        "  `page_id` BIGINT,\n" +
                        "  `status` STRING\n" +
                        ") WITH (\n" +
                        "  'connector' = 'kafka',\n" +
                        "  'topic' = 'input_kafka',\n" +
                        "  'properties.bootstrap.servers' = 'node1:9092',\n" +
                        "  'properties.group.id' = 'testGroup',\n" +
                        "  'scan.startup.mode' = 'latest-offset',\n" +
                        "  'format' = 'json'\n" +
                        ")"
        );
        TableResult outputTable = tEnv.executeSql(
                "CREATE TABLE output_kafka (\n" +
                        "  `user_id` BIGINT,\n" +
                        "  `page_id` BIGINT,\n" +
                        "  `status` STRING\n" +
                        ") WITH (\n" +
                        "  'connector' = 'kafka',\n" +
                        "  'topic' = 'output_kafka',\n" +
                        "  'properties.bootstrap.servers' = 'node1:9092',\n" +
                        "  'format' = 'json',\n" +
                        "  'sink.partitioner' = 'round-robin'\n" +
                        ")"
        );
        String sql = "select " +
                "user_id," +
                "page_id," +
                "status " +
                "from input_kafka " +
                "where status = 'success'";
        Table ResultTable = tEnv.sqlQuery(sql);
        DataStream<Tuple2<Boolean, Row>> resultDS = tEnv.toRetractStream(ResultTable, Row.class);
        resultDS.print();
        tEnv.executeSql("insert into output_kafka select * from " + ResultTable);
        //7.excute
        env.execute();
    }
}

03 Flink SQL常用算子

3.1 SELECT

SELECT :用于从 DataSet/DataStream 中选择数据,用于筛选出某些列。

示例:

  • SELECT * FROM Table;// 取出表中的所有列
  • SELECT name,age FROM Table;// 取出表中 nameage两列

与此同时 SELECT语句中可以使用函数和别名,例如我们上面提到的 WordCount中:

SELECT word, COUNT(word) FROM table GROUP BY word; 
• 1

3.2 WHERE

WHERE :用于从数据集/流中过滤数据,与 SELECT 一起使用,用于根据某些条件对关系做水平分割,即选择符合条件的记录。

示例:

  • SELECT name,age FROM Table where name LIKE ‘% 小明 %’;
  • SELECT * FROM Table WHERE age = 20;

WHERE是从原数据中进行过滤,那么在WHERE条件中,Flink SQL同样支持 =、<、>、<>、>=、<=,以及 ANDOR等表达式的组合,最终满足过滤条件的数据会被选择出来。并且 WHERE 可以结合IN、NOT IN联合使用。举个例子:

SELECT name, age
FROM Table
WHERE name IN (SELECT name FROM Table2)

3.3 DISTINCT

DISTINCT: 用于从数据集/流中去重根据 SELECT 的结果进行去重。

示例

SELECT DISTINCT name FROM Table;

对于流式查询,计算查询结果所需的 State可能会无限增长,用户需要自己控制查询的状态范围,以防止状态过大。

3.4 GROUP BY

GROUP BY :是对数据进行分组操作。例如我们需要计算成绩明细表中,每个学生的总分。

示例:

SELECT name, SUM(score) as TotalScore FROM Table GROUP BY name;

3.5 UNION 和 UNION ALL

UNION: 用于将两个结果集合并起来,要求两个结果集字段完全一致,包括字段类型、字段顺序。不同于 UNION ALL 的是,UNION 会对结果数据去重。

示例:

SELECT * FROM T1 UNION (ALL) SELECT * FROM T2;
• 1

3.6 JOIN

JOIN :用于把来自两个表的数据联合起来形成结果表,Flink支持的JOIN 类型包括:

  • JOIN - INNER JOIN
  • LEFT JOIN - LEFT OUTER JOIN
  • RIGHT JOIN - RIGHT OUTER JOIN
  • FULL JOIN - FULL OUTER JOIN

这里的 JOIN的语义和我们在关系型数据库中使用的 JOIN 语义一致。

示例:JOIN(将订单表数据和商品表进行关联)

SELECT * FROM Orders INNER JOIN Product ON Orders.productId = Product.id

LEFT JOINJOIN 的区别是当右表没有与左边相 JOIN 的数据时候,右边对应的字段补NULL输出,RIGHT JOIN 相当于LEFT JOIN左右两个表交互一下位置。FULL JOIN相当于RIGHT JOINLEFT JOIN之后进行UNION ALL 操作,示例:

SELECT * FROM Orders LEFT JOIN Product ON Orders.productId = Product.id
SELECT * FROM Orders RIGHT JOIN Product ON Orders.productId = Product.id
SELECT * FROM Orders FULL OUTER JOIN Product ON Orders.productId = Product.id

3.7 Group Window

根据窗口数据划分的不同,目前 Apache Flink有如下 3 种 Bounded Window:

  • 滚动窗口(Tumble:窗口数据有固定的大小,窗口数据无叠加;
  • 滑动窗口(Hop:窗口数据有固定大小,并且有固定的窗口重建频率,窗口数据有叠加;
  • 会话窗口(Session:窗口数据没有固定的大小,根据窗口数据活跃程度划分窗口,窗口数据无叠加。

3.7.1 Tumble Window滚动窗口

Tumble 滚动窗口有固定大小,窗口数据不重叠,具体语义如下:

Tumble 滚动窗口对应的语法如下:

SELECT 
    [gk],
    [TUMBLE_START(timeCol, size)], 
    [TUMBLE_END(timeCol, size)], 
    agg1(col1), 
    ... 
    aggn(colN)
FROM Tab1
GROUP BY [gk], TUMBLE(timeCol, size)

其中:

  • [gk] :决定了是否需要按照字段进行聚合;
  • TUMBLE_START: 代表窗口开始时间;
  • TUMBLE_END :代表窗口结束时间;
  • timeCol :是流表中表示时间字段;
  • size :表示窗口的大小,如 秒、分钟、小时、天。

举个例子,假如我们要计算每个人每天的订单量,按照user进行聚合分组:

SELECT user, TUMBLE_START(rowtime, INTERVAL ‘1’ DAY) as wStart, SUM(amount) 
FROM Orders 
GROUP BY TUMBLE(rowtime, INTERVAL ‘1’ DAY), user;

3.7.2 Hop Window滑动窗口

Hop 滑动窗口和滚动窗口类似,窗口有固定的size,与滚动窗口不同的是滑动窗口可以通过slide参数控制滑动窗口的新建频率。因此当 slide 值小于窗口size 的值的时候多个滑动窗口会重叠,具体语义如下:

Hop 滑动窗口对应语法如下:

SELECT 
    [gk], 
    [HOP_START(timeCol, slide, size)] ,  
    [HOP_END(timeCol, slide, size)],
    agg1(col1), 
    ... 
    aggN(colN) 
FROM Tab1
GROUP BY [gk], HOP(timeCol, slide, size)

每次字段的意思和 Tumble 窗口类似:

  • [gk] :决定了是否需要按照字段进行聚合;
  • HOP_START: 表示窗口开始时间;
  • HOP_END: 表示窗口结束时间;
  • timeCol :表示流表中表示时间字段;
  • slide: 表示每次窗口滑动的大小;
  • size: 表示整个窗口的大小,如 秒、分钟、小时、天。

举例说明,我们要每过一小时计算一次过去 24 小时内每个商品的销量:

SELECT product, SUM(amount) 
FROM Orders 
GROUP BY product,HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY)

3.7.3 Session Window会话时间窗口

会话时间窗口没有固定的持续时间,但它们的界限由interval不活动时间定义,即如果在定义的间隙期间没有出现事件,则会话窗口关闭。

Seeeion 会话窗口对应语法如下:

SELECT 
    [gk], 
    SESSION_START(timeCol, gap) AS winStart,  
    SESSION_END(timeCol, gap) AS winEnd,
    agg1(col1),
     ... 
    aggn(colN)
FROM Tab1
GROUP BY [gk], SESSION(timeCol, gap)

字段解析:

  • [gk] :决定了是否需要按照字段进行聚合;
  • SESSION_START :表示窗口开始时间;
  • SESSION_END :表示窗口结束时间;
  • timeCol :表示流表中表示时间字段;
  • gap :表示窗口数据非活跃周期的时长。

例如,我们需要计算每个用户访问时间 12 小时内的订单量:

SELECT user, SESSION_START(rowtime, INTERVAL ‘12’ HOUR) AS sStart, SESSION_ROWTIME(rowtime, INTERVAL ‘12’ HOUR) AS sEnd, SUM(amount) 
FROM Orders 
GROUP BY SESSION(rowtime, INTERVAL ‘12’ HOUR), user

04 文末

本文主要讲解Flink TableSQL的案例以及Flink SQL常用算子的总结,谢谢大家的阅读,本文完!

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
1月前
|
SQL 存储 API
Flink Materialized Table:构建流批一体 ETL
本文整理自阿里云智能集团 Apache Flink Committer 刘大龙老师在2024FFA流批一体论坛的分享,涵盖三部分内容:数据工程师用户故事、Materialized Table 构建流批一体 ETL 及 Demo。文章通过案例分析传统 Lambda 架构的挑战,介绍了 Materialized Table 如何简化流批处理,提供统一 API 和声明式 ETL,实现高效的数据处理和维护。最后展示了基于 Flink 和 Paimon 的实际演示,帮助用户更好地理解和应用这一技术。
369 7
Flink Materialized Table:构建流批一体 ETL
|
16天前
|
消息中间件 JSON 数据库
探索Flink动态CEP:杭州银行的实战案例
探索Flink动态CEP:杭州银行的实战案例
|
16天前
|
SQL 存储 API
Flink Materialized Table:构建流批一体 ETL
Flink Materialized Table:构建流批一体 ETL
|
17天前
|
SQL 数据库
数据库数据恢复—SQL Server报错“错误 823”的数据恢复案例
SQL Server数据库附加数据库过程中比较常见的报错是“错误 823”,附加数据库失败。 如果数据库有备份则只需还原备份即可。但是如果没有备份,备份时间太久,或者其他原因导致备份不可用,那么就需要通过专业手段对数据库进行数据恢复。
|
2月前
|
SQL 大数据 数据处理
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是为应对传统数据处理框架中流批分离的问题而诞生的,它融合了SQL的简洁性和Flink的强大流批处理能力,降低了大数据处理门槛。其核心工作原理包括生成逻辑执行计划、查询优化和构建算子树,确保高效执行。Flink SQL 支持过滤、投影、聚合、连接和窗口等常用算子,实现了流批一体处理,极大提高了开发效率和代码复用性。通过统一的API和语法,Flink SQL 能够灵活应对实时和离线数据分析场景,为企业提供强大的数据处理能力。
285 26
|
3月前
|
消息中间件 JSON 数据库
探索Flink动态CEP:杭州银行的实战案例
本文由杭州银行大数据工程师唐占峰、欧阳武林撰写,介绍Flink动态CEP的定义、应用场景、技术实现及使用方式。Flink动态CEP是基于Flink的复杂事件处理库,支持在不重启服务的情况下动态更新规则,适应快速变化的业务需求。文章详细阐述了其在反洗钱、反欺诈和实时营销等金融领域的应用,并展示了某金融机构的实际应用案例。通过动态CEP,用户可以实时调整规则,提高系统的灵活性和响应速度,降低维护成本。文中还提供了具体的代码示例和技术细节,帮助读者理解和使用Flink动态CEP。
639 2
探索Flink动态CEP:杭州银行的实战案例
|
3月前
|
数据处理 数据安全/隐私保护 流计算
Flink 三种时间窗口、窗口处理函数使用及案例
Flink 是处理无界数据流的强大工具,提供了丰富的窗口机制。本文介绍了三种时间窗口(滚动窗口、滑动窗口和会话窗口)及其使用方法,包括时间窗口的概念、窗口处理函数的使用和实际案例。通过这些机制,可以灵活地对数据流进行分析和计算,满足不同的业务需求。
312 27
|
3月前
|
SQL 存储 缓存
Flink SQL Deduplication 去重以及如何获取最新状态操作
Flink SQL Deduplication 是一种高效的数据去重功能,支持多种数据类型和灵活的配置选项。它通过哈希表、时间窗口和状态管理等技术实现去重,适用于流处理和批处理场景。本文介绍了其特性、原理、实际案例及源码分析,帮助读者更好地理解和应用这一功能。
232 14
|
4月前
|
SQL 数据库
gbase 8a 数据库 SQL优化案例-关联顺序优化
gbase 8a 数据库 SQL优化案例-关联顺序优化
|
6月前
|
关系型数据库 MySQL 网络安全
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")

热门文章

最新文章