十二、多流转换
多流转换可以分为“分流”和“合流”两大类。目前分流的操作一般是通过侧输出流(side output)来实现,而合流的算子比较丰富,根据不同的需求可以调用union、connect、join以及coGroup等接口进行连接合并操作。
12.1 分流
所谓“分流”,就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,得到完全平等的多个子DataStream,如图所示。一般来说,我们会定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。
12.1.1 简单实现
其实根据条件筛选数据的需求,本身非常容易实现:只要针对同一条流多次独立调用.filter()方法进行筛选,就可以得到拆分之后的流了。例如,我们可以将电商网站收集到的用户行为数据进行一个拆分,根据类型(type)的不同,分为“Mary”的浏览数据、“Bob”的浏览数据等等。那么代码就可以这样实现:
public class SplitStreamByFilter { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<Event> stream = env .addSource(new ClickSource()); // 筛选Mary的浏览行为放入MaryStream流中 DataStream<Event> MaryStream = stream.filter(new FilterFunction<Event>() { @Override public boolean filter(Event value) throws Exception { return value.user.equals("Mary"); } }); // 筛选Bob的购买行为放入BobStream流中 DataStream<Event> BobStream = stream.filter(new FilterFunction<Event>() { @Override public boolean filter(Event value) throws Exception { return value.user.equals("Bob"); } }); // 筛选其他人的浏览行为放入elseStream流中 DataStream<Event> elseStream = stream.filter(new FilterFunction<Event>() { @Override public boolean filter(Event value) throws Exception { return !value.user.equals("Mary") && !value.user.equals("Bob") ; } }); MaryStream.print("Mary pv"); BobStream.print("Bob pv"); elseStream.print("else pv"); env.execute(); } // 这里重新实现了ClickSource(),因为我们在自定义数据源中发送了水位线 public static class ClickSource implements SourceFunction<Event> { private boolean running = true; @Override public void run(SourceContext<Event> sourceContext) throws Exception { Random random = new Random(); String[] userArr = {"Mary", "Bob", "Alice"}; String[] urlArr = {"./home", "./cart", "./prod?id=1"}; while (running) { long currTs = Calendar.getInstance().getTimeInMillis(); // 毫秒时间戳 String username = userArr[random.nextInt(userArr.length)]; String url = urlArr[random.nextInt(urlArr.length)]; Event event = new Event(username, url, currTs); // 使用collectWithTimestamp方法将数据发送出去,并指明数据中的时间戳的字段 sourceContext.collectWithTimestamp(event, event.timestamp); // 发送水位线 sourceContext.emitWatermark(new Watermark(event.timestamp - 1L)); Thread.sleep(1000L); } } @Override public void cancel() { running = false; } } }
这种实现非常简单,但代码显得有些冗余——我们的处理逻辑对拆分出的三条流其实是一样的,却重复写了三次。而且这段代码背后的含义,是将原始数据流stream复制三份,然后对每一份分别做筛选;这明显是不够高效的。我们自然想到,能不能不用复制流,直接用一个算子就把它们都拆分开呢?在早期的版本中,DataStream API中提供了一个.split()方法,专门用来将一条流“切分”成多个。它的基本思路其实就是按照给定的筛选条件,给数据分类“盖戳”;然后基于这条盖戳之后的流,分别拣选想要的“戳”就可以得到拆分后的流。这样我们就不必再对流进行复制了。不过这种方法有一个缺陷:因为只是“盖戳”拣选,所以无法对数据进行转换,分流后的数据类型必须跟原始流保持一致。这就极大地限制了分流操作的应用场景。现在split方法已经淘汰掉了,我们以后分流只使用下面要讲的侧输出流。
12.1.2 使用侧输出流
在Flink 1.13版本中,已经弃用了.split()方法,取而代之的是直接用处理函数(process function)的侧输出流(side output)。关于处理函数中侧输出流的用法,我们已经在7.5节做了详细介绍。简单来说,只需要调用上下文ctx的.output()方法,就可以输出任意类型的数据了。而侧输出流的标记和提取,都离不开一个“输出标签”(OutputTag),它就相当于split()分流时的“戳”,指定了侧输出流的id和类型。
我们可以使用侧输出流将上一小节的分流代码改写如下:
public class SplitStreamByOutputTag { // 定义输出标签,侧输出流的数据类型为三元组(user, url, timestamp) private static OutputTag<Tuple3<String, String, Long>> MaryTag = new OutputTag<Tuple3<String, String, Long>>("Mary-pv"){}; private static OutputTag<Tuple3<String, String, Long>> BobTag = new OutputTag<Tuple3<String, String, Long>>("Bob-pv"){}; public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<Event> stream = env .addSource(new ClickSource()); SingleOutputStreamOperator<Event> processedStream = stream.process(new ProcessFunction<Event, Event>() { @Override public void processElement(Event value, Context ctx, Collector<Event> out) throws Exception { if (value.user.equals("Mary")){ ctx.output(MaryTag, new Tuple3<>(value.user, value.url, value.timestamp)); } else if (value.user.equals("Bob")){ ctx.output(BobTag, new Tuple3<>(value.user, value.url, value.timestamp)); } else { out.collect(value); } } }); processedStream.getSideOutput(MaryTag).print("Mary pv"); processedStream.getSideOutput(BobTag).print("Bob pv"); processedStream.print("else"); env.execute(); } // 这里重新实现了ClickSource(),因为我们在自定义数据源中发送了水位线 public static class ClickSource implements SourceFunction<Event> { private boolean running = true; @Override public void run(SourceContext<Event> sourceContext) throws Exception { Random random = new Random(); String[] userArr = {"Mary", "Bob", "Alice"}; String[] urlArr = {"./home", "./cart", "./prod?id=1"}; while (running) { long currTs = Calendar.getInstance().getTimeInMillis(); // 毫秒时间戳 String username = userArr[random.nextInt(userArr.length)]; String url = urlArr[random.nextInt(urlArr.length)]; Event event = new Event(username, url, currTs); // 使用collectWithTimestamp方法将数据发送出去,并指明数据中的时间戳的字段 sourceContext.collectWithTimestamp(event, event.timestamp); // 发送水位线 sourceContext.emitWatermark(new Watermark(event.timestamp - 1L)); Thread.sleep(1000L); } } @Override public void cancel() { running = false; } } }
12.2 基本合流操作
在实际应用中,我们经常会遇到来源不同的多条流,需要将它们的数据进行联合处理。所以Flink中合流的操作会更加普遍,对应的API也更加丰富。
12.2.1 联合(Union)
最简单的合流操作,就是直接将多条流合在一起,叫作流的“联合”(union),如图所示。联合操作要求必须流中的数据类型必须相同,合并之后的新流会包括所有流中的元素,数据类型不变。
在代码中,我们只要基于DataStream直接调用.union()方法,传入其他DataStream作为参数,就可以实现流的联合了;得到的依然是一个DataStream:
stream1.union(stream2, stream3, ...)
注意:union()的参数可以是多个DataStream,所以联合操作可以实现多条流的合并。
这里需要考虑一个问题。在事件时间语义下,水位线是时间的进度标志;不同的流中可能水位线的进展快慢完全不同,如果它们合并在一起,水位线又该以哪个为准呢?还以要考虑水位线的本质含义,是“之前的所有数据已经到齐了”;所以对于合流之后的水位线,也是要以最小的那个为准,这样才可以保证所有流都不会再传来之前的数据。换句话说,多流合并时处理的时效性是以最慢的那个流为准的。我们自然可以想到,这与之前介绍的并行任务水位线传递的规则是完全一致的;多条流的合并,某种意义上也可以看作是多个并行任务向同一个下游任务汇合的过程。
我们可以用下面的代码做一个简单测试:
public class UnionExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<Tuple2<String, Long>> stream1 = env .socketTextStream("localhost", 8888) .map(new MapFunction<String, Tuple2<String, Long>>() { @Override public Tuple2<String, Long> map(String value) throws Exception { String[] arr = value.split(" "); return Tuple2.of(arr[0], Long.parseLong(arr[1]) * 1000L); } }) .assignTimestampsAndWatermarks( WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps() .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() { @Override public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) { return element.f1; } }) ); SingleOutputStreamOperator<Tuple2<String, Long>> stream2 = env .socketTextStream("localhost", 9999) .map(new MapFunction<String, Tuple2<String, Long>>() { @Override public Tuple2<String, Long> map(String value) throws Exception { String[] arr = value.split(" "); return Tuple2.of(arr[0], Long.parseLong(arr[1]) * 1000L); } }) .assignTimestampsAndWatermarks( WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps() .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() { @Override public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) { return element.f1; } }) ); // 合并两条流进行处理 stream1 .union(stream2) .process(new ProcessFunction<Tuple2<String, Long>, String>() { @Override public void processElement(Tuple2<String, Long> in, Context context, Collector<String> collector) throws Exception { collector.collect("当前水位线是:" + context.timerService().currentWatermark()); } }) .print(); env.execute(); } }
这里为了更清晰地看到水位线的进展,我们创建了两条流来读取socket文本数据,并从数据中提取时间戳作为生成水位线的依据。用union将两条流合并后,用一个ProcessFunction来进行处理,获取当前的水位线进行输出。我们会发现两条流中每输入一个数据,合并之后的流中都会有数据出现;而水位线只有在两条流中水位线最小值增大的时候,才会真正向前推进。
12.2.2 连接(Connect)
流的联合虽然简单,不过受限于数据类型不能改变,灵活性大打折扣,所以实际应用较少出现。除了联合(union),Flink还提供了另外一种方便的合流操作——连接(connect)。
1. 连接流(ConnectedStreams)为了处理更加灵活,连接操作允许流的数据类型不同。但我们知道一个DataStream中的数据只能有唯一的类型,所以连接得到的并不是DataStream,而是一个“连接流”(ConnectedStreams)。连接流可以看成是两条流形式上的“统一”,被放在了一个同一个流中;事实上内部仍保持各自的数据形式不变,彼此之间是相互独立的。要想得到新的DataStream,还需要进一步定义一个“同处理”(co-process)转换操作,用来说明对于不同来源、不同类型的数据,怎样分别进行处理转换、得到统一的输出类型。所以整体上来,两条流的连接就像是“一国两制”,两条流可以保持各自的数据类型、处理方式也可以不同,不过最终还是会统一到同一个DataStream中,如图所示。
在代码实现上,需要分为两步:首先基于一条DataStream调用.connect()方法,传入另外一条DataStream作为参数,将两条流连接起来,得到一个ConnectedStreams;然后再调用同处理方法得到DataStream。这里可以的调用的同处理方法有.map()/.flatMap(),以及.process()方法。
public class CoMapExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStream<Integer> stream1 = env.fromElements(1,2,3); DataStream<Long> stream2 = env.fromElements(1L,2L,3L); ConnectedStreams<Integer, Long> connectedStreams = stream1.connect(stream2); SingleOutputStreamOperator<String> result = connectedStreams.map(new CoMapFunction<Integer, Long, String>() { @Override public String map1(Integer value) { return "Integer: " + value; } @Override public String map2(Long value) { return "Long: " + value; } }); result.print(); env.execute(); } }
上面的代码中,ConnectedStreams有两个类型参数,分别表示内部包含的两条流各自的数据类型;由于需要“一国两制”,因此调用.map()方法时传入的不再是一个简单的MapFunction,而是一个CoMapFunction,表示分别对两条流中的数据执行map操作。这个接口有三个类型参数,依次表示第一条流、第二条流,以及合并后的流中的数据类型。需要实现的方法也非常直白:.map1()就是对第一条流中数据的map操作,.map2()则是针对第二条流。
值得一提的是,ConnectedStreams也可以直接调用.keyBy()进行按键分区的操作,得到的还是一个ConnectedStreams:
connectedStreams.keyBy(keySelector1, keySelector2); 这里传入两个参数keySelector1和keySelector2,是两条流中各自的键选择器;当然也可以直接传入键的位置值(keyPosition),或者键的字段名(field),这与普通的keyBy用法完全一致。ConnectedStreams进行keyBy操作,其实就是把两条流中key相同的数据放到了一起,然后针对来源的流再做各自处理,这在一些场景下非常有用。
2. CoProcessFunction对于连接流ConnectedStreams的处理操作,需要分别定义对两条流的处理转换,因此接口中就会有两个相同的方法需要实现,用数字“1”“2”区分,在两条流中的数据到来时分别调用。我们把这种接口叫作“协同处理函数”(co-process function)。与CoMapFunction类似,如果是调用.flatMap()就需要传入一个CoFlatMapFunction,需要实现flatMap1()、flatMap2()两个方法;而调用.process()时,传入的则是一个CoProcessFunction。
CoProcessFunction也是“处理函数”家族中的一员,用法非常相似。它需要实现的就是processElement1()、processElement2()两个方法,在每个数据到来时,会根据来源的流调用其中的一个方法进行处理。CoProcessFunction同样可以通过上下文ctx来访问timestamp、水位线,并通过TimerService注册定时器;另外也提供了.onTimer()方法,用于定义定时触发的处理操作。
下面是CoProcessFunction的一个具体示例:我们可以实现一个实时对账的需求,也就是app的支付操作和第三方的支付操作的一个双流Join。App的支付事件和第三方的支付事件将会互相等待5秒钟,如果等不来对应的支付事件,那么就输出报警信息。程序如下:
// 实时对账 public class BillCheckExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<Tuple3<String, String, Long>> appStream = env .fromElements( Tuple3.of("order-1", "app", 1000L), Tuple3.of("order-2", "app", 2000L) ) .assignTimestampsAndWatermarks( WatermarkStrategy.<Tuple3<String, String, Long>>forMonotonousTimestamps() .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() { @Override public long extractTimestamp(Tuple3<String, String, Long> in, long l) { return in.f2; } }) ); SingleOutputStreamOperator<Tuple3<String, String, Long>> thirdPartyStream = env .fromElements( Tuple3.of("order-1", "third-party", 3000L), Tuple3.of("order-3", "third-party", 4000L) ) .assignTimestampsAndWatermarks( WatermarkStrategy.<Tuple3<String, String, Long>>forMonotonousTimestamps() .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() { @Override public long extractTimestamp(Tuple3<String, String, Long> in, long l) { return in.f2; } }) ); appStream.keyBy(r -> r.f0).connect(thirdPartyStream.keyBy(r -> r.f0)).process(new Match()).print(); env.execute(); } public static class Match extends CoProcessFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>, String> { // 声明状态变量用来保存app的支付事件 private ValueState<Tuple3<String, String, Long>> appEvent; // 声明状态变量用来保存第三方平台的到账事件 private ValueState<Tuple3<String, String, Long>> thirdPartyEvent; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); appEvent = getRuntimeContext().getState( new ValueStateDescriptor<Tuple3<String, String, Long>>("app", Types.TUPLE( Types.STRING, Types.STRING, Types.LONG )) ); thirdPartyEvent = getRuntimeContext().getState( new ValueStateDescriptor<Tuple3<String, String, Long>>("third-party", Types.TUPLE( Types.STRING, Types.STRING, Types.LONG )) ); } @Override public void processElement1(Tuple3<String, String, Long> app, Context context, Collector<String> collector) throws Exception { if (thirdPartyEvent.value() != null) { collector.collect(app.f0 + " 对账成功"); thirdPartyEvent.clear(); } else { appEvent.update(app); context.timerService().registerEventTimeTimer(app.f2 + 5000L); } } @Override public void processElement2(Tuple3<String, String, Long> thirdParty, Context context, Collector<String> collector) throws Exception { if (appEvent.value() != null) { collector.collect(thirdParty.f0 + " 对账成功"); appEvent.clear(); } else { thirdPartyEvent.update(thirdParty); context.timerService().registerEventTimeTimer(thirdParty.f2 + 5000L); } } @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception { super.onTimer(timestamp, ctx, out); if (appEvent.value() != null) { out.collect(appEvent.value().f0 + "对账失败,订单的第三方支付信息未到"); appEvent.clear(); } if (thirdPartyEvent.value() != null) { out.collect(thirdPartyEvent.value().f0 + "对账失败,订单的app支付信息未到"); thirdPartyEvent.clear(); } } } }
在程序中,我们声明了两个值状态(ValueState)分别用来保存App的支付信息和第三方的支付信息。App的支付信息到达以后,会检查对应的第三方支付信息是否已经先到达(先到达会保存在对应的状态变量中),如果已经到达了,那么对账成功,直接输出对账成功的信息,并将保存第三方支付消息的状态变量清空。如果App对应的第三方支付信息没有到来,那么我们会注册一个5秒钟之后的定时器,也就是说等待第三方支付事件5秒钟。当定时器触发时,检查保存app支付信息的状态变量是否还在,如果还在,说明对应的第三方支付信息没有到来,所以输出报警信息。
12.3 基于时间的合流——双流联结(Join)
可以发现,根据某个key合并两条流,与关系型数据库中表的join操作非常相近。事实上,Flink中两条流的connect操作,就可以通过keyBy指定键进行分组后合并,实现了类似于SQL中的join操作;另外connect支持处理函数,可以使用自定义状态和TimerService灵活实现各种需求,其实已经能够处理双流join的大多数场景。
不过处理函数是底层接口,所以尽管connect能做的事情多,但在一些具体应用场景下还是显得太过抽象了。比如,如果我们希望统计固定时间内两条流数据的匹配情况,那就需要设置定时器、自定义触发逻辑来实现——其实这完全可以用窗口(window)来表示。为了更方便地实现基于时间的合流操作,Flink的DataStrema API提供了内置的join算子。
12.3.1 窗口联结(Window Join)
Flink为基于一段时间的双流合并专门提供了一个窗口联结(window join)算子,可以定义时间窗口,并将两条流中共享一个公共键(key)的数据放在窗口中进行配对处理。
1. 窗口联结的调用窗口联结在代码中的实现,首先需要调用DataStream的.join()方法来合并两条流,得到一个JoinedStreams;接着通过.where()和.equalTo()方法指定两条流中联结的key;然后通过.window()开窗口,并调用.apply()传入联结窗口函数进行处理计算。通用调用形式如下:
stream1.join(stream2) .where(<KeySelector>) .equalTo(<KeySelector>) .window(<WindowAssigner>) .apply(<JoinFunction>)
上面代码中.where()的参数是键选择器(KeySelector),用来指定第一条流中的key;而.equalTo()传入的KeySelector则指定了第二条流中的key。两者相同的元素,如果在同一窗口中,就可以匹配起来,并通过一个“联结函数”(JoinFunction)进行处理了。
这里.window()传入的就是窗口分配器,之前讲到的三种时间窗口都可以用在这里:滚动窗口(tumbling window)、滑动窗口(sliding window)和会话窗口(session window)。
而后面调用.apply()可以看作实现了一个特殊的窗口函数。注意这里只能调用.apply(),没有其他替代的方法。
传入的JoinFunction也是一个函数类接口,使用时需要实现内部的.join()方法。这个方法有两个参数,分别表示两条流中成对匹配的数据。其实仔细观察可以发现,窗口join的调用语法和我们熟悉的SQL中表的join非常相似:
SELECT * FROM table1 t1, table2 t2 WHERE t1.id = t2.id;
这句SQL中where子句的表达,等价于inner join ... on,所以本身表示的是两张表基于id的“内连接”(inner join)。而Flink中的window join,同样类似于inner join。也就是说,最后处理输出的,只有两条流中数据按key配对成功的那些;如果某个窗口中一条流的数据没有任何另一条流的数据匹配,那么就不会调用JoinFunction的.join()方法,也就没有任何输出了。
2. 窗口联结实例在电商网站中,往往需要统计用户不同行为之间的转化,这就需要对不同的行为数据流,按照用户ID进行分组后再合并,以分析它们之间的关联。如果这些是以固定时间周期(比如1小时)来统计的,那我们就可以使用窗口join来实现这样的需求。
下面是一段示例代码:
// 基于窗口的join public class WindowJoinExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStream<Tuple2<String, Long>> stream1 = env .fromElements( Tuple2.of("a", 1000L), Tuple2.of("b", 1000L), Tuple2.of("a", 2000L), Tuple2.of("b", 2000L) ) .assignTimestampsAndWatermarks( WatermarkStrategy .<Tuple2<String, Long>>forMonotonousTimestamps() .withTimestampAssigner( new SerializableTimestampAssigner<Tuple2<String, Long>>() { @Override public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) { return stringLongTuple2.f1; } } ) ); DataStream<Tuple2<String, Long>> stream2 = env .fromElements( Tuple2.of("a", 3000L), Tuple2.of("b", 3000L), Tuple2.of("a", 4000L), Tuple2.of("b", 4000L) ) .assignTimestampsAndWatermarks( WatermarkStrategy .<Tuple2<String, Long>>forMonotonousTimestamps() .withTimestampAssigner( new SerializableTimestampAssigner<Tuple2<String, Long>>() { @Override public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) { return stringLongTuple2.f1; } } ) ); stream1 .join(stream2) .where(r -> r.f0) .equalTo(r -> r.f0) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .apply(new JoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() { @Override public String join(Tuple2<String, Long> left, Tuple2<String, Long> right) throws Exception { return left + "=>" + right; } }) .print(); env.execute(); } }
12.3.2 间隔联结(Interval Join)
在有些场景下,我们要处理的时间间隔可能并不是固定的。这时显然不应该用滚动窗口或滑动窗口来处理——因为匹配的两个数据有可能刚好“卡在”窗口边缘两侧,于是窗口内就都没有匹配了;会话窗口虽然时间不固定,但也明显不适合这个场景。基于时间的窗口联结已经无能为力了。
为了应对这样的需求,Flink提供了一种叫作“间隔联结”(interval join)的合流操作。顾名思义,间隔联结的思路就是针对一条流的每个数据,开辟出其时间戳前后的一段时间间隔,看这期间是否有来自另一条流的数据匹配。
1. 间隔联结的原理间隔联结具体的定义方式是,我们给定两个时间点,分别叫作间隔的“上界”(upperBound)和“下界”(lowerBound);于是对于一条流(不妨叫作A)中的任意一个数据元素a,就可以开辟一段时间间隔:[a.timestamp + lowerBound, a.timestamp + upperBound],即以a的时间戳为中心,下至下界点、上至上界点的一个闭区间:我们就把这段时间作为可以匹配另一条流数据的“窗口”范围。所以对于另一条流(不妨叫B)中的数据元素b,如果它的时间戳落在了这个区间范围内,a和b就可以成功配对,进而进行计算输出结果。所以匹配的条件为:
a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound
这里需要注意,做间隔联结的两条流A和B,也必须基于相同的key;下界lowerBound应该小于等于上界upperBound,两者都可正可负;间隔联结目前只支持事件时间语义。如图所示,我们可以清楚地看到间隔联结的方式:
下方的流A去间隔联结上方的流B,所以基于A的每个数据元素,都可以开辟一个间隔区间。我们这里设置下界为-2毫秒,上界为1毫秒。于是对于时间戳为2的A中元素,它的可匹配区间就是[0, 3],流B中有时间戳为0、1的两个元素落在这个范围内,所以就可以得到匹配数据对(2, 0)和(2, 1)。同样地,A中时间戳为3的元素,可匹配区间为[1, 4],B中只有时间戳为1的一个数据可以匹配,于是得到匹配数据对(3, 1)。
所以我们可以看到,间隔联结同样是一种内连接(inner join)。与窗口联结不同的是,interval join做匹配的时间段是基于流中数据的,所以并不确定;而且流B中的数据可以不只在一个区间内被匹配。
2. 间隔联结的调用间隔联结在代码中,是基于KeyedStream的联结(join)操作。DataStream在keyBy得到KeyedStream之后,可以调用.intervalJoin()来合并两条流,传入的参数同样是一个KeyedStream,两者的key类型应该一致;得到的是一个IntervalJoin类型。后续的操作同样是完全固定的:先通过.between()方法指定间隔的上下界,再调用.process()方法,定义对匹配数据对的处理操作。调用.process()需要传入一个处理函数,这是处理函数家族的最后一员:“处理联结函数”ProcessJoinFunction。
通用调用形式如下:
stream1 .keyBy(<KeySelector>) .intervalJoin(stream2.keyBy(<KeySelector>)) .between(Time.milliseconds(-2), Time.milliseconds(1)) .process (new ProcessJoinFunction<Integer, Integer, String(){ @Override public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) { out.collect(left + "," + right); } });
可以看到,抽象类ProcessJoinFunction就像是ProcessFunction和JoinFunction的结合,内部同样有一个抽象方法.processElement()。与其他处理函数不同的是,它多了一个参数,这自然是因为有来自两条流的数据。参数中left指的就是第一条流中的数据,right则是第二条流中与它匹配的数据。每当检测到一组匹配,就会调用这里的.processElement()方法,经处理转换之后输出结果。
3. 间隔联结实例在电商网站中,某些用户行为往往会有短时间内的强关联。我们这里举一个例子,我们有两条流,一条是下订单的流,一条是浏览数据的流。我们可以针对同一个用户,来做这样一个联结。也就是使用一个用户的下订单的事件和这个用户的最近十分钟的浏览数据进行一个联结查询。
下面是一段示例代码:
// 基于间隔的join public class IntervalJoinExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<Tuple3<String, String, Long>> orderStream = env .fromElements( Tuple3.of("Mary", "order", 20 * 60 * 1000L) ) .assignTimestampsAndWatermarks( WatermarkStrategy.<Tuple3<String, String, Long>>forMonotonousTimestamps() .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() { @Override public long extractTimestamp(Tuple3<String, String, Long> e, long l) { return e.f2; } }) ); SingleOutputStreamOperator<Tuple3<String, String, Long>> pvStream = env .fromElements( Tuple3.of("Mary", "pv", 15 * 60 * 1000L), Tuple3.of("Mary", "pv", 11 * 60 * 1000L), Tuple3.of("Mary", "pv", 9 * 60 * 1000L), Tuple3.of("Mary", "pv", 21 * 60 * 1000L) ) .assignTimestampsAndWatermarks( WatermarkStrategy.<Tuple3<String, String, Long>>forMonotonousTimestamps() .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() { @Override public long extractTimestamp(Tuple3<String, String, Long> e, long l) { return e.f2; } }) ); // a -> b // (lower, higher) orderStream .keyBy(r -> r.f0) .intervalJoin(pvStream.keyBy(r -> r.f0)) .between(Time.minutes(-10), Time.minutes(5)) .process(new ProcessJoinFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>, String>() { @Override public void processElement(Tuple3<String, String, Long> left, Tuple3<String, String, Long> right, Context context, Collector<String> collector) throws Exception { collector.collect(left + "=>" + right); } }) .print(); // b -> a // (-higher, -lower) pvStream .keyBy(r -> r.f0) .intervalJoin(orderStream.keyBy(r -> r.f0)) .between(Time.minutes(-5), Time.minutes(10)) .process(new ProcessJoinFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>, String>() { @Override public void processElement(Tuple3<String, String, Long> left, Tuple3<String, String, Long> right, Context context, Collector<String> collector) throws Exception { collector.collect(right + "->" + left); } }) .print(); env.execute(); } }