8.1 窗口联结(Window Join)
Flink为基于一段时间的双流合并专门提供了一个窗口联结算子,可以定义时间窗口,并将两条流中共享一个公共键(key)的数据放在窗口中进行配对处理。
package org.example.watermark; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; public class WindowJoinDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env .fromElements( Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("b", 3), Tuple2.of("c", 4) ) .assignTimestampsAndWatermarks( WatermarkStrategy .<Tuple2<String, Integer>>forMonotonousTimestamps() .withTimestampAssigner((value, ts) -> value.f1 * 1000L) ); SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env .fromElements( Tuple3.of("a", 1, 1), Tuple3.of("a", 11, 1), Tuple3.of("b", 2, 1), Tuple3.of("b", 12, 1), Tuple3.of("c", 14, 1), Tuple3.of("d", 15, 1) ) .assignTimestampsAndWatermarks( WatermarkStrategy .<Tuple3<String, Integer, Integer>>forMonotonousTimestamps() .withTimestampAssigner((value, ts) -> value.f1 * 1000L) ); // TODO window join // 1. 落在同一个时间窗口范围内才能匹配 // 2. 根据keyby的key,来进行匹配关联 // 3. 只能拿到匹配上的数据,类似有固定时间范围的inner join DataStream<String> join = ds1.join(ds2) .where(r1 -> r1.f0) // ds1的keyby .equalTo(r2 -> r2.f0) // ds2的keyby .window(TumblingEventTimeWindows.of(Time.seconds(10))) .apply(new JoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() { /** * 关联上的数据,调用join方法 * @param first ds1的数据 * @param second ds2的数据 * @return * @throws Exception */ @Override public String join(Tuple2<String, Integer> first, Tuple3<String, Integer, Integer> second) throws Exception { return first + "<----->" + second; } }); join.print(); env.execute(); } }
其实仔细观察可以发现,窗口join的调用语法和我们熟悉的SQL中表的join非常相似:
SELECT * FROM table1 t1, table2 t2 WHERE t1.id = t2.id;
这句SQL中where子句的表达,等价于inner join … on,所以本身表示的是两张表基于id的“内连接”(inner join)。而Flink中的window join,同样类似于inner join。也就是说,最后处理输出的,只有两条流中数据按key配对成功的那些;如果某个窗口中一条流的数据没有任何另一条流的数据匹配,那么就不会调用JoinFunction的.join()方法,也就没有任何输出了。
8.2 间隔联结(Interval Join)
- 间隔联结的原理
间隔联结具体的定义方式是,我们给定两个时间点,分别叫作间隔的“上界”(upperBound)和“下界”(lowerBound);于是对于一条流(不妨叫作A)中的任意一个数据元素a,就可以开辟一段时间间隔:[a.timestamp + lowerBound, a.timestamp + upperBound],即以a的时间戳为中心,下至下界点、上至上界点的一个闭区间:我们就把这段时间作为可以匹配另一条流数据的“窗口”范围。所以对于另一条流(不妨叫B)中的数据元素b,如果它的时间戳落在了这个区间范围内,a和b就可以成功配对,进而进行计算输出结果。所以匹配的条件为:
a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound
这里需要注意,做间隔联结的两条流A和B,也必须基于相同的key;下界lowerBound应该小于等于上界upperBound,两者都可正可负;间隔联结目前只支持事件时间语义。
如下图所示,我们可以清楚地看到间隔联结的方式:
下方的流A去间隔联结上方的流B,所以基于A的每个数据元素,都可以开辟一个间隔区间。我们这里设置下界为-2毫秒,上界为1毫秒。于是对于时间戳为2的A中元素,它的可匹配区间就是[0, 3],流B中有时间戳为0、1的两个元素落在这个范围内,所以就可以得到匹配数据对(2, 0)和(2, 1)。同样地,A中时间戳为3的元素,可匹配区间为[1, 4],B中只有时间戳为1的一个数据可以匹配,于是得到匹配数据对(3, 1)。
所以我们可以看到,间隔联结同样是一种内连接(inner join)。与窗口联结不同的是,interval join做匹配的时间段是基于流中数据的,所以并不确定;而且流B中的数据可以不只在一个区间内被匹配。
stream1 .keyBy(<KeySelector>) .intervalJoin(stream2.keyBy(<KeySelector>)) .between(Time.milliseconds(-2), Time.milliseconds(1)) .process (new ProcessJoinFunction<Integer, Integer, String(){ @Override public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) { out.collect(left + "," + right); } });
- 处理迟到数据
//2. 调用 interval join OutputTag<Tuple2<String, Integer>> ks1LateTag = new OutputTag<>("ks1-late", Types.TUPLE(Types.STRING, Types.INT)); OutputTag<Tuple3<String, Integer, Integer>> ks2LateTag = new OutputTag<>("ks2-late", Types.TUPLE(Types.STRING, Types.INT, Types.INT)); SingleOutputStreamOperator<String> process = ks1.intervalJoin(ks2) .between(Time.seconds(-2), Time.seconds(2)) .sideOutputLeftLateData(ks1LateTag) // 将 ks1的迟到数据,放入侧输出流 .sideOutputRightLateData(ks2LateTag) // 将 ks2的迟到数据,放入侧输出流 .process( new ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() { /** * 两条流的数据匹配上,才会调用这个方法 * @param left ks1的数据 * @param right ks2的数据 * @param ctx 上下文 * @param out 采集器 * @throws Exception */ @Override public void processElement(Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right, Context ctx, Collector<String> out) throws Exception { // 进入这个方法,是关联上的数据 out.collect(left + "<------>" + right); } }); process.print("主流"); process.getSideOutput(ks1LateTag).printToErr("ks1迟到数据"); process.getSideOutput(ks2LateTag).printToErr("ks2迟到数据"); env.execute();