Flink之join多流合并

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 笔记

(1)window多流合并


在Flink中支持窗口上的多流合并,即在一个窗口中按照相同条件对两个输入数据流进行关联操作,需要保证输入的Stream要构建在相同的Window上,并使用相同类型的Key作为关联条件。

2.png3.png


(2)Window join


4.png5.png


Windows类型的join都是利用window的机制,先将数据缓存在Window State中,当窗口触发计算时,执行join操作;interval join也是利用state存储数据再处理,区别在于state中的数据有失效机制,依靠数据触发数据清理;


(2.1)Tumbling Window Join详解

滚动窗口关联数据操作是将滚动窗口中相同的Key的两个Datastream数据集中的元素进行关联,并应用用户自定义的JoinFunction计算关联结果。


6.png

Tumbling Window Join代码开发

MySource:

package com.aikfk.flink.base;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
public  class MySource implements SourceFunction<String> {
    @Override
    public void cancel() {
    }
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        String[] datas = {
                "a,1575159390000",
                "a,1575159402000",
                "b,1575159427000",
                "c,1575159382000",
                "b,1575159407000",
                "a,1575159302000"
        };
        for (int k = 0; k < datas.length; k++) {
            Thread.sleep(100);
            ctx.collect(datas[k]);
        }
    }
}

MySource2:

package com.aikfk.flink.base;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
public  class MySource2 implements SourceFunction<String> {
    @Override
    public void cancel() {
    }
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        String[] datas = {
                "a,1575159381300",
                "a,1575159399000",
                "d,1575159397000",
                "f,1575159384000"
        };
        for (int k = 0; k < datas.length; k++) {
            ctx.collect(datas[k]);
        }
    }
}

WindowJoin:

window join之前必须要生成WM,即实现assignTimestampsAndWatermarks方法

package com.aikfk.flink.datastream.window;
import com.aikfk.flink.base.MySource;
import com.aikfk.flink.base.MySource2;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.Duration;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/25 1:57 下午
 */
public class WindowJoin {
    public static void main(String[] args) throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2.生成dataStream1,window join之前必须要生成WM,即实现assignTimestampsAndWatermarks方法
        DataStream<Tuple2<String,Long>> dataStream1 = env.addSource(new MySource()).map(new MapFunction<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(String s) throws Exception {
                String[] words = s.split(",");
                return new Tuple2<>(words[0] , Long.parseLong(words[1]));
            }
        })
        .assignTimestampsAndWatermarks(WatermarkStrategy
                .<Tuple2<String,Long>>forBoundedOutOfOrderness(Duration.ofMinutes(1L))
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String,Long>>() {
                    @Override
                    public long extractTimestamp(Tuple2<String,Long> input, long l) {
                        return input.f1;
                    }
                }));
        // 3.生成dataStream2,window join之前必须要生成WM,即实现assignTimestampsAndWatermarks方法
        DataStream<Tuple2<String,Long>> dataStream2 = env.addSource(new MySource2()).map(new MapFunction<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(String s) throws Exception {
                String[] words = s.split(",");
                return new Tuple2<>(words[0] , Long.parseLong(words[1]));
            }
        }).assignTimestampsAndWatermarks(WatermarkStrategy
                .<Tuple2<String,Long>>forBoundedOutOfOrderness(Duration.ofMinutes(1L))
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String,Long>>() {
                    @Override
                    public long extractTimestamp(Tuple2<String,Long> input, long l) {
                        return input.f1;
                    }
                }));
        // TumblingEvent window join
        dataStream1.join(dataStream2)
                .where(key -> key.f0)
                .equalTo(key -> key.f0)
                .window(TumblingEventTimeWindows.of(Time.minutes(1L)))
                .apply(new JoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, Object>() {
                    @Override
                    public Object join(Tuple2<String, Long> t1, Tuple2<String, Long> t2) throws Exception {
                        return new Tuple4<>(t1.f0,t1.f1,t2.f0,t2.f1);
                    }
                })
                .print();
        env.execute("Window WordCount");
    }
}

运行结果:

11> (a,1575159390000,a,1575159381300)
11> (a,1575159390000,a,1575159399000)
11> (a,1575159402000,a,1575159381300)
11> (a,1575159402000,a,1575159399000)

(2.2)Sliding Window Join详解

两个Datastream数据元素在单个窗口中根据相同的Key进行关联,且关联数据会发生重叠同时滑动窗口关联也是基于内连接,如果一个窗口中只出现了一个Datastream中的Key,则不会输出关联计算结果。

10.png


(2.3)Session Window Join详解

会话窗口是根据Session Gap将数据集划分成不同的窗口,会话窗口关联对两个Stream的数据元素进行窗口关联操作,窗口中含有两个数据集元素,并且元素具有相同的Key,则输出关联计算结果。

11.png


(3)Interval join


12.png13.png




Interval join代码开发:

interval join之前必须要生成WM,即实现assignTimestampsAndWatermarks方法

package com.aikfk.flink.datastream.window;
import com.aikfk.flink.base.MySource;
import com.aikfk.flink.base.MySource2;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import java.time.Duration;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/25 1:57 下午
 */
public class IntervalJoin {
    public static void main(String[] args) throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2.生成dataStream1,interval join之前必须要生成WM,即实现assignTimestampsAndWatermarks方法
        DataStream<Tuple2<String,Long>> dataStream1 = env.addSource(new MySource()).map(new MapFunction<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(String s) throws Exception {
                String[] words = s.split(",");
                return new Tuple2<>(words[0] , Long.parseLong(words[1]));
            }
        })
        .assignTimestampsAndWatermarks(WatermarkStrategy
                .<Tuple2<String,Long>>forBoundedOutOfOrderness(Duration.ofMinutes(1L))
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String,Long>>() {
                    @Override
                    public long extractTimestamp(Tuple2<String,Long> input, long l) {
                        return input.f1;
                    }
                }));
        // 3.生成dataStream2,interval join之前必须要生成WM,即实现assignTimestampsAndWatermarks方法
        DataStream<Tuple2<String,Long>> dataStream2 = env.addSource(new MySource2()).map(new MapFunction<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(String s) throws Exception {
                String[] words = s.split(",");
                return new Tuple2<>(words[0] , Long.parseLong(words[1]));
            }
        }).assignTimestampsAndWatermarks(WatermarkStrategy
                .<Tuple2<String,Long>>forBoundedOutOfOrderness(Duration.ofMinutes(1L))
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String,Long>>() {
                    @Override
                    public long extractTimestamp(Tuple2<String,Long> input, long l) {
                        return input.f1;
                    }
                }));
        // interval join
        dataStream1.keyBy(key -> key.f0)
                .intervalJoin(dataStream2.keyBy(key -> key.f0))
                .between(Time.seconds(0L),Time.seconds(2L))
                .process(new ProcessJoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, Object>() {
                    @Override
                    public void processElement(Tuple2<String, Long> t1,
                                               Tuple2<String, Long> t2,
                                               Context context, Collector<Object> collector) throws Exception {
                        collector.collect(new Tuple4<>(t1.f0,t1.f1,t2.f0,t2.f1));
                    }
                }).print();
        env.execute("Window WordCount");
    }
}


(4)几个流合并区别


join , coGroup , connect, union的区别

join :


可用于DataStream和DataSet。只能2个DataStream一起join,或者2个DataSet一起join

用于DataStream时返回是JoinedStreams,用于DataSet时返回是Join0peratorSets

用于DataStream时需要与窗口同时使用,语法是:join where equalTo window apply,用于DataSet时的语法是:join where equalTo with (where是指定第一个输入的分区字段,equalTo是指定第二个输入的分区字段,这2个字段类型需要一致)

与SQL中的inner join同义,只输出2个实时窗口内或2个数据集合内能匹配上的笛卡尔积,不能匹配上的不输出。

apply方法中或with方法中均可以使用JoinFunction或FlatJoinFunction处理匹配上的数据对(用于DataStream和DataSet时均可)

侧重对2个输入里的数据对进行处理,join方法的入参是单个数据

可以join2个类型不同的流或join2个类型不同的数据集(比如Tuple2<String, Long> join Tuple2<Long, String>),但是匹配的key或field类型要一致,不然报错(比如where中的String与equalTo中的String匹配才行)

coGroup :


可用于DataStream和DataSet。只能2个DataStream一起coGroup,或者2个DataSet一起coGroup

用于DataStream时返回是CoGroupedStreams,用于DataSet时返回是CoGroup0peratorSets

用于DataStream时需要与窗口同时使用,语法是:coGroup where equalTo window apply,用于DataSet时的语法是: coGroup where equalTo with

把2个实时窗口内或2个数据集合内key相同的数据分组同一个分区,key不能匹配上的数据(只在一个窗口或集合内存在的数据)也分组到另一个分区上。

apply方法中或with方法中均可以使用CoGroupFunction对数据分组(用于DataStream和DataSet时均可,无FlatCoGroupFunction)

侧重对2个输入的集合进行处理,coGroup方法的入参是Iterable类型

可以coGroup2个类型不同的流或coGroup2个类型不同的数据集(比如Tuple2<String, Long> join Tuple2<Long,String>),但是匹配的key或field类型要一致,不然报错(比如where中的String与equalTo中的String匹配才行)

connect:


只能用于DataStream,返回是ConnectedStreams。不能用于DataSet.

只能2个流一起connect(stream1.connect(stream2))

connect后可以对2个流分别处理(使用CoMapFunction或CoFlatMapFunction)

可以connect2个类型不同的流(比如Tuple2<String,Long> connect Tuple2<Long,String>)

union:


可以多个流一起合并(stream1.union(stream2, stream3, stream4)),合并结果是一个新Datastream;只能2个DataSet一起

用于DataStream时,返回是Datas tream;用于DataSet时,返回是DataSet;"2

合并,合并结果是一个新DataSet

无论是合并Datastream还是合并DataSet,都不去重,2个源的消息或记录都保存。

不可以union 2个类型不同的流或union 2个类型不同的数据集




相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
6月前
|
流计算
在Flink中,你可以通过以下方法为join操作设置并行度
【2月更文挑战第27天】在Flink中,你可以通过以下方法为join操作设置并行度
128 3
|
3月前
|
消息中间件 SQL Kafka
实时计算 Flink版产品使用问题之使用StarRocks作为Lookup Join的表是否合适
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
消息中间件 存储 Kafka
实时计算 Flink版产品使用问题之 从Kafka读取数据,并与两个仅在任务启动时读取一次的维度表进行内连接(inner join)时,如果没有匹配到的数据会被直接丢弃还是会被存储在内存中
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
资源调度 监控 关系型数据库
实时计算 Flink版操作报错合集之处理大量Join时报错空指针异常,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
实时计算 Flink版操作报错合集之处理大量Join时报错空指针异常,是什么原因
|
3月前
|
存储 监控 Oracle
实时计算 Flink版产品使用问题之如何解决双流Join导致的状态膨胀和资源压力问题
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
Java 数据处理 Apache
实时计算 Flink版产品使用问题之lookup Join hologres的维表,是否可以指定查bitmap
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
SQL 算法 关系型数据库
实时计算 Flink版产品使用合集之全量历史数据比较多,全量同步阶段时间长,是否会同时读取binlog进行合并输出
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
SQL 消息中间件 数据库
实时计算 Flink版操作报错合集之监听表和维表join的时,维表的字段超过两个时就会报错如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
63 1
|
6月前
|
SQL Java API
实时计算 Flink版产品使用合集之使用 left interval join 和 timestamp assigner 进行灰度切换,并发现在灰度完成后水印停滞不前如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版产品使用合集之flinksql多流join,left右边任意一张表数据到后,是否都会更新test中对应的数
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。