FlinkSQL 双流Join

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 流处理模式下,双流join

Join过程




只是在流处理模式下,join的两个表是流式的,这就涉及到两边相互等待的问题了:例如左表来了⼀条数据,join 右表时发现没有匹配的记录,这时会直接放弃join吗?还是会等待⼀段时间呢再次join呢?会⽆限等待下去吗?



答案是,两边表⾥的数据都会在状态中保存⼀段时间,直到join上为⽌,默认是⽆限保存,可以设置过期时间(过期 时间类似于session,只要有join上的情况出现就会⾃动延期):




双流join的语法没什么特殊的


SELECT*FROMOrdersINNERJOINProductONOrders.product_id=Product.id;
SELECT*FROMOrdersLEFTJOINProductONOrders.product_id=Product.idSELECT*FROMOrdersRIGHTJOINProductONOrders.product_id=Product.idSELECT*FROMOrdersFULLOUTERJOINProductONOrders.product_id=Product.id





实例代码:

packagecom.blink.sb.join;
importcom.blink.sb.beans.Orders;
importcom.blink.sb.beans.Product;
importorg.apache.flink.configuration.Configuration;
importorg.apache.flink.streaming.api.datastream.DataStream;
importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/*** 这里演示下Flink的join*/publicclassFlinkJoin {
publicstaticvoidmain(String[] args) throwsException {
//1、获取Stream执行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
//2、创建表执行环境StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env);
//设置空闲状态清理时间,默认0标识永不清理//        tEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(5));//也可以用这种方式设置Configurationconfiguration=tEnv.getConfig().getConfiguration();
configuration.setString("table.exec.state.ttl", "5s");
//3、读入订单数据DataStream<Orders>ordersStream=env.socketTextStream("localhost", 8888)
                .map(event-> {
String[] arr=event.split(",");
returnOrders                            .builder()
                            .user(arr[0])
                            .productId(Long.parseLong(arr[1]))
                            .amount(Integer.parseInt(arr[2]))
                            .orderTp(Long.parseLong(arr[3]))
                            .build();
                });
//4、读取产品数据DataStream<Product>productStream=env.socketTextStream("localhost", 9999)
                .map(event-> {
String[] arr=event.split(",");
returnProduct                            .builder()
                            .id(Long.parseLong(arr[0]))
                            .name(arr[1])
                            .build();
                });
tEnv.createTemporaryView("orders",ordersStream);
tEnv.createTemporaryView("product",productStream);
//5、双流JointEnv.sqlQuery("select * from orders o join product p on o.productId=p.id")
                .execute()
                .print();
    }
}
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
SQL 关系型数据库 RDS
|
2月前
|
SQL 消息中间件 存储
Flink报错问题之flink双流join报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
3月前
|
SQL 流计算
Flink SQL提供了行转列的功能,可以通过使用`UNPIVOT`操作来实现
【1月更文挑战第1天】Flink SQL提供了行转列的功能,可以通过使用`UNPIVOT`操作来实现
120 0
|
6月前
|
SQL 关系型数据库 API
Flink--9、双流联结(窗口联结、间隔联结)
Flink--9、双流联结(窗口联结、间隔联结)
|
消息中间件 关系型数据库 MySQL
flink 维表join(一):广播流的使用
flink 维表join(一):广播流的使用
|
SQL 消息中间件 Kafka
FlinkSQL 几种join
FlinkSQL 几种join
FlinkSQL 几种join
|
消息中间件 缓存 Kafka
Flink 双流 Join 的3种操作示例
在数据库中的静态表上做 OLAP 分析时,两表 join 是非常常见的操作。同理,在流式处理作业中,有时也需要在两条流上做 join 以获得更丰富的信息。Flink DataStream API 为用户提供了3个算子来实现双流 join,分别是:1、join();2、coGroup();3、intervalJoin()
Flink 双流 Join 的3种操作示例
|
SQL 消息中间件 缓存
Flink SQL 实战:双流 join 场景应用
大家都知道在使用 SQL 进行数据分析的过程中,join 是经常要使用的操作。在离线场景中,join 的数据集是有边界的,可以缓存数据有边界的数据集进行查询,有Nested Loop/Hash Join/Sort Merge Join 等多表 join;而在实时场景中,join 两侧的数据都是无边界的数据流,所以缓存数据集对长时间 job 来说,存储和查询压力很大。如何从容应对各种流式场景?
Flink SQL 实战:双流 join 场景应用
|
SQL 缓存 运维
Flink SQL 如何实现数据流的 Join?
Join 的实现依赖于缓存整个数据集,而 Streaming SQL Join 的对象却是无限的数据流,内存压力和计算效率在长期运行来说都是不可避免的问题。下文将结合 SQL 的发展解析 Flink SQL 是如何解决这些问题并实现两个数据流的 Join。
Flink SQL 如何实现数据流的 Join?