一.问题分析
原始程序使用 EventTime,JobGraph 为 Source + KeyBy + ProcessFunction + Window + Sink 形式,其中 ProcessFunction 内设置了 ValueState 与 onTimer 的机制,由于需要定时更新一些任务需要的实时变量,故引入 BroadcastStream 实现实时变量的不定时更新,经过修改后的 JobGraph 为 Source + keyd-Broadcast-Process + window + Sink,如图所示:
编辑
任务执行后发现第三步 Co-Process-Broadcast-keyed Stage 部分 Records-Received 正常,但是 Bytes Sent 消失了,即下游没有 (onTimer) 的数据写入到 window ,导致所有数据卡在第三步,window 无法触发:
编辑
再看下游算子发现 WaterMark 均为空:
编辑
这时才想起来,自己的任务设定为 EventTime,原始 Source 设置了时间戳和 watermark 但新增的 BroadcastStream 未设置时间戳和 watermark 遂导致另外一个 Source 不更新 watermark,导致整个任务的 watermark 无法推进,从而导致任务卡死。所以解决方法就是给 BroadcastStream 实现 assignTimestampsAndWatermarks 方法设置时间戳和 watermark 即可。
二.问题修复
broadcastStream 主题框架如下,如果需要完整的示例可以参考: DataStream Broadcast 示例详解
DataStream<T> output = dataStream .connect(BroadcastStream) .process( // KeyedBroadcastProcessFunction 中的类型参数表示: // 1. key stream 中的 key 类型 // 2. 非广播流中的元素类型 // 3. 广播流中的元素类型 // 4. 结果的类型,在这里是 string new KeyedBroadcastProcessFunction<Ks, In1, In2, Out>() { // 模式匹配逻辑 } );
下面详细说下 BroadcastStream,任务失败之前我的 BroadcastStream 是这样定义的:
MapStateDescriptor<String, T> descriptor = new MapStateDescriptor<>("T", String.class, T.class); BroadcastStream<T> contextStream = env .addSource(new SelfDefinedSource()) .setParallelism(1) .broadcast(descriptor);
这里通过自定义的 SelfDefinedSource 函数定期读取更新的在线变量,并后续通过 ctx.collect() 生产到 BroadcastStream 中,由于未对该数据流设置 watermark 导致任务失败,下面继承 BoundedOutOfOrdernessTimestampExtractor 类为该数据流设置 watermark:
public static class ContextTSExtrator extends BoundedOutOfOrdernessTimestampExtractor<T> { public MultiSortContextTSExtrator() { super(Time.seconds(MAX_SEND_EVENT_DELAY)); } @Override public long extractTimestamp(T t) { return System.currentTimeMillis(); } }
这里直接设置为 System.currentTimeMillis,如果需要从 T 中获取时间戳也可以生成 T 时绑定其 EventTimeStamp,随后为源数据通过 assign 方法增加时间戳:
BroadcastStream<T> contextStream = env .addSource(new SelfDefinedSource()) .assignTimestampsAndWatermarks(newContextTSExtrator()) .setParallelism(1) .broadcast(descriptor);
增加完时间戳后,任务正常执行,可以看到流程图上 keyd-Broadcast-Process 部分和 window 部分也都有了实时更新的 watermark,任务正常进行:
编辑
三.优化
processFunction 中处理后的数据设置时间=t 的过期时间,即 eventTime + t 为 onTimer 的时机,BroadCastStream 则时每隔 X 分钟读取线上最新变量并更新最后通过 context.collect(T) 发出,X 分钟通过 Times.sleep() 实现。理想情况每隔数据的间隔为 t ,但根据 stage 和日志中的日志发现每隔元素处理到 sink 的间隔为 t + X,即延时处理的时间与定时读取实时变量的间隔一致。
上述新增的 waterMark 为 System.currentimeMilles,但是因为每隔 X 分钟执行一次,所以 broadcastStream 发出的 waterMark 总是落后的,基于这个事实继续排查问题。
1.单流 Watermark 机制
首先看一下每个数据源的 watermark 更新机制,这个可以在 AssignerWithPeriodicWatermarks 类中看到,默认不需要我们复写:
public final Watermark getCurrentWatermark() { long potentialWM = this.currentMaxTimestamp - this.maxOutOfOrderness; if (potentialWM >= this.lastEmittedWatermark) { this.lastEmittedWatermark = potentialWM; } return new Watermark(this.lastEmittedWatermark);
将当前元素的时间戳减去最大的迟到容忍时间获取 poetntialWaterMark,如果大于上一次发出的 waterMark,则重新赋值,即取 currentWaterMark = Max(poentitialWaterMark,lastEmittedWaterMark),该值代表 Flink 认为小于该 currentWaterMark 值的数据都到了。
2.多流 WaterMark 机制
co-broadcast-keyed-stream 属于双流任务,每个流都包含一个 waterMark,两个流同时发出 waterMar,Flink 底层通过继承 TwoInputStreamOperator 接口完成对双流 element 和 waterMark 的处理:
public interface TwoInputStreamOperator<IN1, IN2, OUT> extends StreamOperator<OUT> { void processElement1(StreamRecord<IN1> var1) throws Exception; void processElement2(StreamRecord<IN2> var1) throws Exception; void processWatermark1(Watermark var1) throws Exception; void processWatermark2(Watermark var1) throws Exception; void processLatencyMarker1(LatencyMarker var1) throws Exception; void processLatencyMarker2(LatencyMarker var1) throws Exception; }
基于我们上面遇到的延时问题,这里需要重点关注 processWatermark 的逻辑,processWatermark 的逻辑放在了 AbstractStreamOperator 抽象类下:
public void processWatermark(Watermark mark) throws Exception { if (this.timeServiceManager != null) { this.timeServiceManager.advanceWatermark(mark); } this.output.emitWatermark(mark); } public void processWatermark1(Watermark mark) throws Exception { this.input1Watermark = mark.getTimestamp(); long newMin = Math.min(this.input1Watermark, this.input2Watermark); if (newMin > this.combinedWatermark) { this.combinedWatermark = newMin; this.processWatermark(new Watermark(this.combinedWatermark)); } } public void processWatermark2(Watermark mark) throws Exception { this.input2Watermark = mark.getTimestamp(); long newMin = Math.min(this.input1Watermark, this.input2Watermark); if (newMin > this.combinedWatermark) { this.combinedWatermark = newMin; this.processWatermark(new Watermark(this.combinedWatermark)); } }
针对 Stream1 和 Stream2 分别执行 processWatermark1 和 processWatermark2,其内部通过 Min 方法选取两个流中最小的一个作为新的最小值 WaterMark,然后再和 lastEmittedWatermark 进行比较再推进整个任务的 watermark 流动。所以针对上述等待时间 X 过长的问题,我们需要提高 BroadcastStream 的 watermark 生产效率,即缩短产生 T 的间隔。
3.缩短 Watermark 发送间隔
将间隔 X 缩短至非常小的间隔 x,需要给生成的 T 增加一个 valid 属性,给 Source 函数增加一个 epoch 控制,只有当 epoch * x = X 的时候,读取线上更新并发送一个 valid = true 的 T,其余时间发送 valid = false 的 T(null),随后在 ProcessBroadcastValue 时判断 valid 状态,只有为 true 时才更新 Broadcast 的 value 到各个 task。
override def run(sourceContext: SourceFunction.SourceContext[T]): Unit = { Client client = new Socket(host, port); while (isRunning) { val t = if (epoch % X == 0) { ... 读取线上变量 re.setValid() // 生效 re } else { val re = new T(null) re } sourceContext.collect(t) TimeUnit.SECONDS.sleep(x) epoch += 1 } }
例如之前以 X=300s 的间隔生成 T,将间隔缩短为 x=5s,则推算出 epoch = 300/5 = 60,所以 if 逻辑内为 epoch % 60 == 0,其余时间设计以此类推。
4.更优的方案
上述方案把间隔从 X 优化至 x,但是任务执行的数据源其实是另一个 DataStream,BroadcastStream 只负责定时更新 DataStream 需要的变量,所以最好的方案是任务 watermark 流动完全取决于 DataStream。这时再回看双流 processWatermark 函数:
public void processWatermarkX(Watermark mark) throws Exception { this.input2Watermark = mark.getTimestamp(); long newMin = Math.min(this.input1Watermark, this.input2Watermark); if (newMin > this.combinedWatermark) { this.combinedWatermark = newMin; this.processWatermark(new Watermark(this.combinedWatermark)); } }
newMin 是通过取双流最小,然后再以最小的为基准,退化为单流更新 WaterMark 更新逻辑,只取决于 DataStream ,那我们把 Broadcast 的 waterMark 设定为 Max 不就完了,这样每次 newMin 的值都取决于 DataStream,从而保证 watrtMark 流动完全取决于 DataStream,这样最后很短的间隔 x 也没有了,任务执行的延迟完全为 onTimer 设置的 valueState 的过期时间。
修改 BroadcastStream 设定时间戳和 watermark 的函数:
直接设置为 Watermark 类自带的 MAX_WARTERMARK.getTimeStamp(),自己设置一个非常大的时间戳也可以。
public static class ContextTSExtrator extends BoundedOutOfOrdernessTimestampExtractor<T> { public MultiSortContextTSExtrator() { super(Time.seconds(MAX_SEND_EVENT_DELAY)); } @Override public long extractTimestamp(T t) { return Watermark.MAX_WATERMARK.getTimestamp(); } }
其值为:
static final Watermark MAX_WATERMARK = new Watermark(9223372036854775807L);
四.总结
通过增加和修改 watermark 逻辑,增加 broadcastStream 的双流任务终于正常执行,且数据流动完全取决于原始数据流,非常的完美。出现这样的问题还是对 Flink 的 watermark 机制不是很清晰,后续还需要继续加深对 watermark 的理解。