开发者社区> 问答> 正文

当我使用Flink SlidingEventTimeWindows时,“缓冲池被破坏”

"当我使用“SlidingEventTimeWindows”时,Flink抛出“java.lang.IllegalStateException:Buffer pool is destroyed”,但是当我改为“SlidingProcessingTimeWindows”时,每件事情都没问题。
我的DataMockSource在这里:

堆栈跟踪如下:
18:37:53,728 WARN org.apache.flink.streaming.api.operators.AbstractStreamOperator - Error while emitting latency marker.

        java.lang.RuntimeException: Buffer pool is destroyed.
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:147)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:683)
at org.apache.flink.streaming.api.operators.StreamSource$LatencyMarksEmitter$1.onProcessingTime(StreamSource.java:151)
at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:330)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.

at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:230)
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:204)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.randomEmit(RecordWriter.java:125)
at org.apache.flink.streaming.runtime.io.StreamRecordWriter.randomEmit(StreamRecordWriter.java:93)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:144)
... 10 more

首先,在My DataMockSource中将“collect”替换为“collectWithTimestamp”,用于生成流数据。执行此操作后,“在发出延迟标记时出错”将在控制台中消失。

其次,用AscendingTimestampExtractor替换BoundedOutOfOrdernessTimestampExtractor,它用于EventTime处理。在我的DataMockSource中,我生成数据并同时发出,因此AscendingTimestampExtractor是生成水印的正确方法。

我在这里发布主要代码,并在github上发布完整项目。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(10000); //

DataStreamSource mockDataDataStreamSource = env.addSource(new DataMockSource());
mockDataDataStreamSource.assignTimestampsAndWatermarks(

new AscendingTimestampExtractor<MockData>() {
  @Override
  public long extractAscendingTimestamp(MockData element) {
    return element.getTimestamp();
  }
});

SingleOutputStreamOperator> countStream = mockDataDataStreamSource

.keyBy(""country"").window(
    SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(10)))

// .allowedLateness(Time.seconds(5))

.process(

    new FlinkEventTimeCountFunction()).name(""count elements"");

countStream.addSink(new SinkFunction>() {
@Override
public void invoke(Tuple2 value, Context context) throws Exception {

System.out.println(value);

}
});

env.execute(""count test "");
我的DataMockSource在这里:

private volatile boolean running = true;
@Override
public void run(SourceContext sourceContext) throws Exception {

while (running){
  MockData mockData = new MockData();
  mockData.setAge(ThreadLocalRandom.current().nextInt(1,99));
  mockData.setCountry(""country ""+ThreadLocalRandom.current().nextInt(2,5));
  mockData.setId(ThreadLocalRandom.current().nextLong());
  mockData.setTimestamp(Instant.now().toEpochMilli());
  // emit record with timestamp
  sourceContext.collectWithTimestamp(mockData,Instant.now().toEpochMilli());

// sourceContext.collect(mockData);

  TimeUnit.SECONDS.sleep(3);
}

}

@Override
public void cancel() {

 running = false;

}"

展开
收起
flink小助手 2018-11-28 15:47:28 7770 0
1 条回答
写回答
取消 提交回答
  • flink小助手会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关flink的问题及回答。

    "在事件时间工作时,你需要在源中或使用assignTimestampsAndWatermarks安排时间戳提取和水印。看起来你没有这样做,这可以解释为什么你不会得到任何输出(事件时间窗口永远不会被触发)。

    此外,你的来源应该有取消方法。像这样的东西:

    private volatile boolean running = true;

    @Override
    public void run(SourceContext ctx) throws Exception {

    while (running) {
        ...
    }

    }

    @Override
    public void cancel() {

    running = false;

    }
    我想这可能解释了你所看到的异常。在作业开始关闭之后,源可能会继续运行并发送延迟标记。"

    2019-07-17 23:16:47
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Apache Flink 流式应用中状态的数据结构定义升级 立即下载
Flink中的两类新型状态存储 立即下载
HBase2.0重新定义小对象实时存取 立即下载