FlinkSQL 双流Join

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 流处理模式下,双流join

Join过程




只是在流处理模式下,join的两个表是流式的,这就涉及到两边相互等待的问题了:例如左表来了⼀条数据,join 右表时发现没有匹配的记录,这时会直接放弃join吗?还是会等待⼀段时间呢再次join呢?会⽆限等待下去吗?



答案是,两边表⾥的数据都会在状态中保存⼀段时间,直到join上为⽌,默认是⽆限保存,可以设置过期时间(过期 时间类似于session,只要有join上的情况出现就会⾃动延期):




双流join的语法没什么特殊的


SELECT*FROMOrdersINNERJOINProductONOrders.product_id=Product.id;
SELECT*FROMOrdersLEFTJOINProductONOrders.product_id=Product.idSELECT*FROMOrdersRIGHTJOINProductONOrders.product_id=Product.idSELECT*FROMOrdersFULLOUTERJOINProductONOrders.product_id=Product.id





实例代码:

packagecom.blink.sb.join;
importcom.blink.sb.beans.Orders;
importcom.blink.sb.beans.Product;
importorg.apache.flink.configuration.Configuration;
importorg.apache.flink.streaming.api.datastream.DataStream;
importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/*** 这里演示下Flink的join*/publicclassFlinkJoin {
publicstaticvoidmain(String[] args) throwsException {
//1、获取Stream执行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
//2、创建表执行环境StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env);
//设置空闲状态清理时间,默认0标识永不清理//        tEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(5));//也可以用这种方式设置Configurationconfiguration=tEnv.getConfig().getConfiguration();
configuration.setString("table.exec.state.ttl", "5s");
//3、读入订单数据DataStream<Orders>ordersStream=env.socketTextStream("localhost", 8888)
                .map(event-> {
String[] arr=event.split(",");
returnOrders                            .builder()
                            .user(arr[0])
                            .productId(Long.parseLong(arr[1]))
                            .amount(Integer.parseInt(arr[2]))
                            .orderTp(Long.parseLong(arr[3]))
                            .build();
                });
//4、读取产品数据DataStream<Product>productStream=env.socketTextStream("localhost", 9999)
                .map(event-> {
String[] arr=event.split(",");
returnProduct                            .builder()
                            .id(Long.parseLong(arr[0]))
                            .name(arr[1])
                            .build();
                });
tEnv.createTemporaryView("orders",ordersStream);
tEnv.createTemporaryView("product",productStream);
//5、双流JointEnv.sqlQuery("select * from orders o join product p on o.productId=p.id")
                .execute()
                .print();
    }
}
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
5月前
|
流计算
在Flink中,你可以通过以下方法为join操作设置并行度
【2月更文挑战第27天】在Flink中,你可以通过以下方法为join操作设置并行度
103 3
|
SQL 关系型数据库 RDS
|
2月前
|
SQL 安全 流计算
Flink SQL 在快手实践问题之Group Window Aggregate 中的数据倾斜问题如何解决
Flink SQL 在快手实践问题之Group Window Aggregate 中的数据倾斜问题如何解决
57 1
|
4月前
|
Java 数据处理 Apache
实时计算 Flink版产品使用问题之lookup Join hologres的维表,是否可以指定查bitmap
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
SQL 消息中间件 存储
Flink报错问题之flink双流join报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
5月前
|
Java 数据库连接 API
Flink报错问题之用Tumble窗口函数报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
5月前
|
SQL 流计算
Flink SQL提供了行转列的功能,可以通过使用`UNPIVOT`操作来实现
【1月更文挑战第1天】Flink SQL提供了行转列的功能,可以通过使用`UNPIVOT`操作来实现
423 0
|
SQL 关系型数据库 API
Flink--9、双流联结(窗口联结、间隔联结)
Flink--9、双流联结(窗口联结、间隔联结)
|
消息中间件 关系型数据库 MySQL
flink 维表join(一):广播流的使用
flink 维表join(一):广播流的使用
|
消息中间件 缓存 Kafka
Flink 双流 Join 的3种操作示例
在数据库中的静态表上做 OLAP 分析时,两表 join 是非常常见的操作。同理,在流式处理作业中,有时也需要在两条流上做 join 以获得更丰富的信息。Flink DataStream API 为用户提供了3个算子来实现双流 join,分别是:1、join();2、coGroup();3、intervalJoin()
Flink 双流 Join 的3种操作示例