FlinkSQL 双流Join

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 流处理模式下,双流join

Join过程




只是在流处理模式下,join的两个表是流式的,这就涉及到两边相互等待的问题了:例如左表来了⼀条数据,join 右表时发现没有匹配的记录,这时会直接放弃join吗?还是会等待⼀段时间呢再次join呢?会⽆限等待下去吗?



答案是,两边表⾥的数据都会在状态中保存⼀段时间,直到join上为⽌,默认是⽆限保存,可以设置过期时间(过期 时间类似于session,只要有join上的情况出现就会⾃动延期):




双流join的语法没什么特殊的


SELECT*FROMOrdersINNERJOINProductONOrders.product_id=Product.id;
SELECT*FROMOrdersLEFTJOINProductONOrders.product_id=Product.idSELECT*FROMOrdersRIGHTJOINProductONOrders.product_id=Product.idSELECT*FROMOrdersFULLOUTERJOINProductONOrders.product_id=Product.id





实例代码:

packagecom.blink.sb.join;
importcom.blink.sb.beans.Orders;
importcom.blink.sb.beans.Product;
importorg.apache.flink.configuration.Configuration;
importorg.apache.flink.streaming.api.datastream.DataStream;
importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/*** 这里演示下Flink的join*/publicclassFlinkJoin {
publicstaticvoidmain(String[] args) throwsException {
//1、获取Stream执行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
//2、创建表执行环境StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env);
//设置空闲状态清理时间,默认0标识永不清理//        tEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(5));//也可以用这种方式设置Configurationconfiguration=tEnv.getConfig().getConfiguration();
configuration.setString("table.exec.state.ttl", "5s");
//3、读入订单数据DataStream<Orders>ordersStream=env.socketTextStream("localhost", 8888)
                .map(event-> {
String[] arr=event.split(",");
returnOrders                            .builder()
                            .user(arr[0])
                            .productId(Long.parseLong(arr[1]))
                            .amount(Integer.parseInt(arr[2]))
                            .orderTp(Long.parseLong(arr[3]))
                            .build();
                });
//4、读取产品数据DataStream<Product>productStream=env.socketTextStream("localhost", 9999)
                .map(event-> {
String[] arr=event.split(",");
returnProduct                            .builder()
                            .id(Long.parseLong(arr[0]))
                            .name(arr[1])
                            .build();
                });
tEnv.createTemporaryView("orders",ordersStream);
tEnv.createTemporaryView("product",productStream);
//5、双流JointEnv.sqlQuery("select * from orders o join product p on o.productId=p.id")
                .execute()
                .print();
    }
}
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
打赏
0
0
0
0
15
分享
相关文章
出现了 FlinkServerException 或 FlinkSQLException 错误
出现了 FlinkServerException 或 FlinkSQLException 错误
766 1
京东物流基于Flink & StarRocks的湖仓建设实践
本文整理自京东物流高级数据开发工程师梁宝彬在Flink Forward Asia 2024的分享,聚焦实时湖仓的探索与建设、应用实践、问题思考及未来展望。内容涵盖京东物流通过Flink和Paimon等技术构建实时湖仓体系的过程,解决复杂业务场景下的数据分析挑战,如多维OLAP分析、大屏监控等。同时,文章详细介绍了基于StarRocks的湖仓一体方案,优化存储成本并提升查询效率,以及存算分离的应用实践。最后,对未来数据服务的发展方向进行了展望,计划推广长周期数据存储服务和原生数据湖建设,进一步提升数据分析能力。
298 1
京东物流基于Flink & StarRocks的湖仓建设实践
Flink报错问题之Flink报错:Table sink 'a' doesn't support consuming update and delete changes which is produced by node如何解决
Flink报错通常是指在使用Apache Flink进行实时数据处理时遇到的错误和异常情况;本合集致力于收集Flink运行中的报错信息和解决策略,以便开发者及时排查和修复问题,优化Flink作业的稳定性。
万字长文:一文彻底搞懂Elasticsearch中Geo数据类型查询、聚合、排序
万字长文:一文彻底搞懂Elasticsearch中Geo数据类型查询、聚合、排序
95535 140
实时计算 Flink版操作报错合集之TaskExecutor 如何解决ElasticsearchConnectorOptions类被废弃的问题
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
基于Hadoop的大数据可视化方法
【8月更文第28天】在大数据时代,有效地处理和分析海量数据对于企业来说至关重要。Hadoop作为一个强大的分布式数据处理框架,能够处理PB级别的数据量。然而,仅仅完成数据处理还不够,还需要将这些数据转化为易于理解的信息,这就是数据可视化的重要性所在。本文将详细介绍如何使用Hadoop处理后的数据进行有效的可视化分析,并会涉及一些流行的可视化工具如Tableau、Qlik等。
742 0
实时计算 Flink版操作报错之遇到设置之后报错:java.sql.BatchUpdateException: ORA-01461:,如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
Flink报错问题之提交flink sql任务报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等

登录插画

登录以查看您的控制台资源

管理云资源
状态一览
快捷访问