FlinkSQL 双流Join

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 流处理模式下,双流join

Join过程




只是在流处理模式下,join的两个表是流式的,这就涉及到两边相互等待的问题了:例如左表来了⼀条数据,join 右表时发现没有匹配的记录,这时会直接放弃join吗?还是会等待⼀段时间呢再次join呢?会⽆限等待下去吗?



答案是,两边表⾥的数据都会在状态中保存⼀段时间,直到join上为⽌,默认是⽆限保存,可以设置过期时间(过期 时间类似于session,只要有join上的情况出现就会⾃动延期):




双流join的语法没什么特殊的


SELECT*FROMOrdersINNERJOINProductONOrders.product_id=Product.id;
SELECT*FROMOrdersLEFTJOINProductONOrders.product_id=Product.idSELECT*FROMOrdersRIGHTJOINProductONOrders.product_id=Product.idSELECT*FROMOrdersFULLOUTERJOINProductONOrders.product_id=Product.id





实例代码:

packagecom.blink.sb.join;
importcom.blink.sb.beans.Orders;
importcom.blink.sb.beans.Product;
importorg.apache.flink.configuration.Configuration;
importorg.apache.flink.streaming.api.datastream.DataStream;
importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/*** 这里演示下Flink的join*/publicclassFlinkJoin {
publicstaticvoidmain(String[] args) throwsException {
//1、获取Stream执行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
//2、创建表执行环境StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env);
//设置空闲状态清理时间,默认0标识永不清理//        tEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(5));//也可以用这种方式设置Configurationconfiguration=tEnv.getConfig().getConfiguration();
configuration.setString("table.exec.state.ttl", "5s");
//3、读入订单数据DataStream<Orders>ordersStream=env.socketTextStream("localhost", 8888)
                .map(event-> {
String[] arr=event.split(",");
returnOrders                            .builder()
                            .user(arr[0])
                            .productId(Long.parseLong(arr[1]))
                            .amount(Integer.parseInt(arr[2]))
                            .orderTp(Long.parseLong(arr[3]))
                            .build();
                });
//4、读取产品数据DataStream<Product>productStream=env.socketTextStream("localhost", 9999)
                .map(event-> {
String[] arr=event.split(",");
returnProduct                            .builder()
                            .id(Long.parseLong(arr[0]))
                            .name(arr[1])
                            .build();
                });
tEnv.createTemporaryView("orders",ordersStream);
tEnv.createTemporaryView("product",productStream);
//5、双流JointEnv.sqlQuery("select * from orders o join product p on o.productId=p.id")
                .execute()
                .print();
    }
}
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
流计算
在Flink中,你可以通过以下方法为join操作设置并行度
【2月更文挑战第27天】在Flink中,你可以通过以下方法为join操作设置并行度
23 3
|
SQL 关系型数据库 RDS
|
3月前
|
SQL 消息中间件 存储
Flink报错问题之flink双流join报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
3月前
|
消息中间件 Java Kafka
flink问题之在通过TableFunction实现行转列时Row一直是空如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
305 1
|
3月前
|
Java 数据库连接 API
Flink报错问题之用Tumble窗口函数报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
7月前
|
SQL 关系型数据库 API
Flink--9、双流联结(窗口联结、间隔联结)
Flink--9、双流联结(窗口联结、间隔联结)
|
消息中间件 关系型数据库 MySQL
flink 维表join(一):广播流的使用
flink 维表join(一):广播流的使用
|
SQL 消息中间件 Kafka
FlinkSQL 几种join
FlinkSQL 几种join
FlinkSQL 几种join
|
消息中间件 缓存 Kafka
Flink 双流 Join 的3种操作示例
在数据库中的静态表上做 OLAP 分析时,两表 join 是非常常见的操作。同理,在流式处理作业中,有时也需要在两条流上做 join 以获得更丰富的信息。Flink DataStream API 为用户提供了3个算子来实现双流 join,分别是:1、join();2、coGroup();3、intervalJoin()
Flink 双流 Join 的3种操作示例
|
SQL 分布式计算 Hadoop
Flink SQL 如何实现列转行?
在 SQL 任务里面经常会遇到一列转多行的需求,今天就来总结一下在 Flink SQL 里面如何实现列转行的,先来看下面的一个具体案例. 需求 原始数据格式如下: name data JasonLee [{"content_type":"flink","url":"111"},{"content_type":"spark","url":"222"},{"content_type":"hadoop","url":"333"}] data 格式化