Flink之join多流合并

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 笔记

(1)window多流合并


在Flink中支持窗口上的多流合并,即在一个窗口中按照相同条件对两个输入数据流进行关联操作,需要保证输入的Stream要构建在相同的Window上,并使用相同类型的Key作为关联条件。

2.png3.png


(2)Window join


4.png5.png


Windows类型的join都是利用window的机制,先将数据缓存在Window State中,当窗口触发计算时,执行join操作;interval join也是利用state存储数据再处理,区别在于state中的数据有失效机制,依靠数据触发数据清理;


(2.1)Tumbling Window Join详解

滚动窗口关联数据操作是将滚动窗口中相同的Key的两个Datastream数据集中的元素进行关联,并应用用户自定义的JoinFunction计算关联结果。


6.png

Tumbling Window Join代码开发

MySource:

package com.aikfk.flink.base;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
public  class MySource implements SourceFunction<String> {
    @Override
    public void cancel() {
    }
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        String[] datas = {
                "a,1575159390000",
                "a,1575159402000",
                "b,1575159427000",
                "c,1575159382000",
                "b,1575159407000",
                "a,1575159302000"
        };
        for (int k = 0; k < datas.length; k++) {
            Thread.sleep(100);
            ctx.collect(datas[k]);
        }
    }
}

MySource2:

package com.aikfk.flink.base;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
public  class MySource2 implements SourceFunction<String> {
    @Override
    public void cancel() {
    }
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        String[] datas = {
                "a,1575159381300",
                "a,1575159399000",
                "d,1575159397000",
                "f,1575159384000"
        };
        for (int k = 0; k < datas.length; k++) {
            ctx.collect(datas[k]);
        }
    }
}

WindowJoin:

window join之前必须要生成WM,即实现assignTimestampsAndWatermarks方法

package com.aikfk.flink.datastream.window;
import com.aikfk.flink.base.MySource;
import com.aikfk.flink.base.MySource2;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.Duration;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/25 1:57 下午
 */
public class WindowJoin {
    public static void main(String[] args) throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2.生成dataStream1,window join之前必须要生成WM,即实现assignTimestampsAndWatermarks方法
        DataStream<Tuple2<String,Long>> dataStream1 = env.addSource(new MySource()).map(new MapFunction<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(String s) throws Exception {
                String[] words = s.split(",");
                return new Tuple2<>(words[0] , Long.parseLong(words[1]));
            }
        })
        .assignTimestampsAndWatermarks(WatermarkStrategy
                .<Tuple2<String,Long>>forBoundedOutOfOrderness(Duration.ofMinutes(1L))
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String,Long>>() {
                    @Override
                    public long extractTimestamp(Tuple2<String,Long> input, long l) {
                        return input.f1;
                    }
                }));
        // 3.生成dataStream2,window join之前必须要生成WM,即实现assignTimestampsAndWatermarks方法
        DataStream<Tuple2<String,Long>> dataStream2 = env.addSource(new MySource2()).map(new MapFunction<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(String s) throws Exception {
                String[] words = s.split(",");
                return new Tuple2<>(words[0] , Long.parseLong(words[1]));
            }
        }).assignTimestampsAndWatermarks(WatermarkStrategy
                .<Tuple2<String,Long>>forBoundedOutOfOrderness(Duration.ofMinutes(1L))
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String,Long>>() {
                    @Override
                    public long extractTimestamp(Tuple2<String,Long> input, long l) {
                        return input.f1;
                    }
                }));
        // TumblingEvent window join
        dataStream1.join(dataStream2)
                .where(key -> key.f0)
                .equalTo(key -> key.f0)
                .window(TumblingEventTimeWindows.of(Time.minutes(1L)))
                .apply(new JoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, Object>() {
                    @Override
                    public Object join(Tuple2<String, Long> t1, Tuple2<String, Long> t2) throws Exception {
                        return new Tuple4<>(t1.f0,t1.f1,t2.f0,t2.f1);
                    }
                })
                .print();
        env.execute("Window WordCount");
    }
}

运行结果:

11> (a,1575159390000,a,1575159381300)
11> (a,1575159390000,a,1575159399000)
11> (a,1575159402000,a,1575159381300)
11> (a,1575159402000,a,1575159399000)

(2.2)Sliding Window Join详解

两个Datastream数据元素在单个窗口中根据相同的Key进行关联,且关联数据会发生重叠同时滑动窗口关联也是基于内连接,如果一个窗口中只出现了一个Datastream中的Key,则不会输出关联计算结果。

10.png


(2.3)Session Window Join详解

会话窗口是根据Session Gap将数据集划分成不同的窗口,会话窗口关联对两个Stream的数据元素进行窗口关联操作,窗口中含有两个数据集元素,并且元素具有相同的Key,则输出关联计算结果。

11.png


(3)Interval join


12.png13.png




Interval join代码开发:

interval join之前必须要生成WM,即实现assignTimestampsAndWatermarks方法

package com.aikfk.flink.datastream.window;
import com.aikfk.flink.base.MySource;
import com.aikfk.flink.base.MySource2;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import java.time.Duration;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/3/25 1:57 下午
 */
public class IntervalJoin {
    public static void main(String[] args) throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2.生成dataStream1,interval join之前必须要生成WM,即实现assignTimestampsAndWatermarks方法
        DataStream<Tuple2<String,Long>> dataStream1 = env.addSource(new MySource()).map(new MapFunction<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(String s) throws Exception {
                String[] words = s.split(",");
                return new Tuple2<>(words[0] , Long.parseLong(words[1]));
            }
        })
        .assignTimestampsAndWatermarks(WatermarkStrategy
                .<Tuple2<String,Long>>forBoundedOutOfOrderness(Duration.ofMinutes(1L))
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String,Long>>() {
                    @Override
                    public long extractTimestamp(Tuple2<String,Long> input, long l) {
                        return input.f1;
                    }
                }));
        // 3.生成dataStream2,interval join之前必须要生成WM,即实现assignTimestampsAndWatermarks方法
        DataStream<Tuple2<String,Long>> dataStream2 = env.addSource(new MySource2()).map(new MapFunction<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(String s) throws Exception {
                String[] words = s.split(",");
                return new Tuple2<>(words[0] , Long.parseLong(words[1]));
            }
        }).assignTimestampsAndWatermarks(WatermarkStrategy
                .<Tuple2<String,Long>>forBoundedOutOfOrderness(Duration.ofMinutes(1L))
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String,Long>>() {
                    @Override
                    public long extractTimestamp(Tuple2<String,Long> input, long l) {
                        return input.f1;
                    }
                }));
        // interval join
        dataStream1.keyBy(key -> key.f0)
                .intervalJoin(dataStream2.keyBy(key -> key.f0))
                .between(Time.seconds(0L),Time.seconds(2L))
                .process(new ProcessJoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, Object>() {
                    @Override
                    public void processElement(Tuple2<String, Long> t1,
                                               Tuple2<String, Long> t2,
                                               Context context, Collector<Object> collector) throws Exception {
                        collector.collect(new Tuple4<>(t1.f0,t1.f1,t2.f0,t2.f1));
                    }
                }).print();
        env.execute("Window WordCount");
    }
}


(4)几个流合并区别


join , coGroup , connect, union的区别

join :


可用于DataStream和DataSet。只能2个DataStream一起join,或者2个DataSet一起join

用于DataStream时返回是JoinedStreams,用于DataSet时返回是Join0peratorSets

用于DataStream时需要与窗口同时使用,语法是:join where equalTo window apply,用于DataSet时的语法是:join where equalTo with (where是指定第一个输入的分区字段,equalTo是指定第二个输入的分区字段,这2个字段类型需要一致)

与SQL中的inner join同义,只输出2个实时窗口内或2个数据集合内能匹配上的笛卡尔积,不能匹配上的不输出。

apply方法中或with方法中均可以使用JoinFunction或FlatJoinFunction处理匹配上的数据对(用于DataStream和DataSet时均可)

侧重对2个输入里的数据对进行处理,join方法的入参是单个数据

可以join2个类型不同的流或join2个类型不同的数据集(比如Tuple2<String, Long> join Tuple2<Long, String>),但是匹配的key或field类型要一致,不然报错(比如where中的String与equalTo中的String匹配才行)

coGroup :


可用于DataStream和DataSet。只能2个DataStream一起coGroup,或者2个DataSet一起coGroup

用于DataStream时返回是CoGroupedStreams,用于DataSet时返回是CoGroup0peratorSets

用于DataStream时需要与窗口同时使用,语法是:coGroup where equalTo window apply,用于DataSet时的语法是: coGroup where equalTo with

把2个实时窗口内或2个数据集合内key相同的数据分组同一个分区,key不能匹配上的数据(只在一个窗口或集合内存在的数据)也分组到另一个分区上。

apply方法中或with方法中均可以使用CoGroupFunction对数据分组(用于DataStream和DataSet时均可,无FlatCoGroupFunction)

侧重对2个输入的集合进行处理,coGroup方法的入参是Iterable类型

可以coGroup2个类型不同的流或coGroup2个类型不同的数据集(比如Tuple2<String, Long> join Tuple2<Long,String>),但是匹配的key或field类型要一致,不然报错(比如where中的String与equalTo中的String匹配才行)

connect:


只能用于DataStream,返回是ConnectedStreams。不能用于DataSet.

只能2个流一起connect(stream1.connect(stream2))

connect后可以对2个流分别处理(使用CoMapFunction或CoFlatMapFunction)

可以connect2个类型不同的流(比如Tuple2<String,Long> connect Tuple2<Long,String>)

union:


可以多个流一起合并(stream1.union(stream2, stream3, stream4)),合并结果是一个新Datastream;只能2个DataSet一起

用于DataStream时,返回是Datas tream;用于DataSet时,返回是DataSet;"2

合并,合并结果是一个新DataSet

无论是合并Datastream还是合并DataSet,都不去重,2个源的消息或记录都保存。

不可以union 2个类型不同的流或union 2个类型不同的数据集




相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
流计算
在Flink中,你可以通过以下方法为join操作设置并行度
【2月更文挑战第27天】在Flink中,你可以通过以下方法为join操作设置并行度
23 3
|
5月前
|
流计算
Flink 多个stream合并聚合
Flink 多个stream合并聚合
44 0
Flink 多个stream合并聚合
|
6月前
|
SQL 存储 API
Flink教程(20)- Flink高级特性(双流Join)
Flink教程(20)- Flink高级特性(双流Join)
94 0
|
7月前
|
流计算
在Flink中,Regular Join(包括Left Join)的结果顺序是由Flink的分区策略和数据的分布方式共同决定的
在Flink中,Regular Join(包括Left Join)的结果顺序是由Flink的分区策略和数据的分布方式共同决定的
27 1
|
2月前
|
Oracle 关系型数据库 MySQL
Flink CDC产品常见问题之从EARLIEST_OFFSET启动就报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
2月前
|
存储 监控 数据库
Flink CDC产品常见问题之Lookup Join之后再分组聚合部分数据从零开始如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
3月前
|
SQL 消息中间件 存储
Flink报错问题之flink双流join报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
4月前
|
SQL Java 数据库连接
这个问题是由于Flink在执行SQL语句时,无法找到合适的表工厂来处理JOIN操作。
【1月更文挑战第17天】【1月更文挑战第85篇】这个问题是由于Flink在执行SQL语句时,无法找到合适的表工厂来处理JOIN操作。
23 8
|
5月前
|
数据安全/隐私保护 流计算
Flink的Interval Join是基于水印(Watermark)和时间窗口(Time Window)实现的
Flink的Interval Join是基于水印(Watermark)和时间窗口(Time Window)实现的
102 2
|
5月前
|
流计算
请问一下Flink怎么给join设置parallelism?
请问一下Flink怎么给join设置parallelism?
29 0