Flink1.7.2 Source、Window数据交互源码分析

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Source如何按partition发射数据到对应的Window Window 如何处理对应的Source发过来的对应的partition数据 理解Flink 数据从Source到Window,上下游数据交换的过程

Flink1.7.2 Source、Window数据交互源码分析

源码

概述

  • Source如何按partition发射数据到对应的Window
  • Window 如何处理对应的Source发过来的对应的partition数据
  • 理解Flink 数据从Source到Window,上下游数据交换的过程

StreamGraph 图

输入数据

1 2 3 4 5 6 7 8 9 10

WordCount程序

package com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc.parallelism

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

/**
  * nc -lk 1234  输入数据
  */
object SocketWindowWordCountLocal {



  def main(args: Array[String]): Unit = {


    val port = 1234
    // get the execution environment
   // val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment


    val configuration : Configuration = getConfiguration(true)

    val env:StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(1,configuration)





    // get input data by connecting to the socket
    val dataStream = env.socketTextStream("localhost", port, '\n')



    import org.apache.flink.streaming.api.scala._
    val textResult = dataStream.flatMap( w => w.split("\\s") ).map( w => WordWithCount(w,1))
      .keyBy("word")
      /**
        * 每20秒刷新一次,相当于重新开始计数,
        * 好处,不需要一直拿所有的数据统计
        * 只需要在指定时间间隔内的增量数据,减少了数据规模
        */
      .timeWindow(Time.seconds(5))
      //.countWindow(3)
      //.countWindow(3,1)
      //.countWindowAll(3)


      .sum("count" )

    textResult
      .setParallelism(3)
      .print()




    if(args == null || args.size ==0){


      println("==================================以下为执行计划==================================")
      println("执行地址(firefox效果更好):https://flink.apache.org/visualizer")
      //执行计划
      println(env.getExecutionPlan)
      println("==================================以上为执行计划 JSON串==================================\n")
      //StreamGraph
     //println(env.getStreamGraph.getStreamingPlanAsJSON)



      //JsonPlanGenerator.generatePlan(jobGraph)

      env.execute("默认作业")

    }else{
      env.execute(args(0))
    }

    println("结束")

  }


  // Data type for words with count
  case class WordWithCount(word: String, count: Long){
    //override def toString: String = Thread.currentThread().getName + word + " : " + count
  }


  def getConfiguration(isDebug:Boolean = false):Configuration = {

    val configuration : Configuration = new Configuration()

    if(isDebug){
      val timeout = "100000 s"
      val timeoutHeartbeatPause = "1000000 s"
      configuration.setString("akka.ask.timeout",timeout)
      configuration.setString("akka.lookup.timeout",timeout)
      configuration.setString("akka.tcp.timeout",timeout)
      configuration.setString("akka.transport.heartbeat.interval",timeout)
      configuration.setString("akka.transport.heartbeat.pause",timeoutHeartbeatPause)
      configuration.setString("akka.watch.heartbeat.pause",timeout)
      configuration.setInteger("heartbeat.interval",10000000)
      configuration.setInteger("heartbeat.timeout",50000000)
    }


    configuration
  }


}

源码分析(Source)

RecordWriter.emit

  • record 数据为: WordWithCount(1,1)
  • 也就是Source.通过Socket.connect 得到流数据后,按行拆分,进行flatMap,map函数之后,得到的数据
  • numChannels为Source partitions,这个并行度是在DataStream.setParallelism(3)设置的
  • channelSelector.selectChannels(record, numChannels),这个是按key,进行hash,算得当前元素所分的partition,即为change
  • 续续调用 RecordWriter.emit()

    public void emit(T record) throws IOException, InterruptedException {
        emit(record, channelSelector.selectChannels(record, numChannels));
    }

RecordWriter.emit

  • 经调试,按key,hash % 并行度,分配的数据如下

    key:2     partition:0
    key:3     partition:0
    key:4     partition:0
    key:6     partition:0
    
    key:1     partition:1
    key:5     partition:1
    key:8     partition:1
    key:10    partition:1
    
    key:7     partition:2
    key:9     partition:2
  • record进行序列化,数据长度写进ByteBuffer lengthBuffer,数据写进ByteBuffer dataBuffer;

    serializer.serializeRecord(record);
  • 调用RecordWriter.copyFromSerializerToTargetChannel(channel)往通道中写数据
    private void emit(T record, int[] targetChannels) throws IOException, InterruptedException {
        serializer.serializeRecord(record);

        boolean pruneAfterCopying = false;
        for (int channel : targetChannels) {
            if (copyFromSerializerToTargetChannel(channel)) {
                pruneAfterCopying = true;
            }
        }

        // Make sure we don't hold onto the large intermediate serialization buffer for too long
        if (pruneAfterCopying) {
            serializer.prune();
        }
    }

RecordWriter.copyFromSerializerToTargetChannel

  • getBufferBuilder(targetChannel)通过channel,得到BufferBuilder,就是得到当前的partition写入数据对象BufferBuilder,其实就是操作ResultPartition.subPartitions

    BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
  • 每个partition写入的数据,对应到各自的window,这样就实现了,在source端将数据分区,对应的window处理source对应的分区数据

    subpartitions = {ResultSubpartition[3]@5749} 
    0 = {PipelinedSubpartition@5771} "PipelinedSubpartition#0 [number of buffers: 0 (0 bytes), number of buffers in backlog: 0, finished? false, read view? true]"
    1 = {PipelinedSubpartition@5772} "PipelinedSubpartition#1 [number of buffers: 0 (0 bytes), number of buffers in backlog: 0, finished? false, read view? true]"
    2 = {PipelinedSubpartition@5773} "PipelinedSubpartition#2 [number of buffers: 0 (0 bytes), number of buffers in backlog: 0, finished? false, read view? true]"
  • BufferConsumer是消费BufferBuilder中的数据,BufferBuilder.append 是产生数据
/**
     * @param targetChannel
     * @return <tt>true</tt> if the intermediate serialization buffer should be pruned
     */
    private boolean copyFromSerializerToTargetChannel(int targetChannel) throws IOException, InterruptedException {
        // We should reset the initial position of the intermediate serialization buffer before
        // copying, so the serialization results can be copied to multiple target buffers.
        serializer.reset();

        boolean pruneTriggered = false;
        BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
        SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);
        while (result.isFullBuffer()) {
            numBytesOut.inc(bufferBuilder.finish());
            numBuffersOut.inc();

            // If this was a full record, we are done. Not breaking out of the loop at this point
            // will lead to another buffer request before breaking out (that would not be a
            // problem per se, but it can lead to stalls in the pipeline).
            if (result.isFullRecord()) {
                pruneTriggered = true;
                bufferBuilders[targetChannel] = Optional.empty();
                break;
            }

            bufferBuilder = requestNewBufferBuilder(targetChannel);
            result = serializer.copyToBufferBuilder(bufferBuilder);
        }
        checkState(!serializer.hasSerializedData(), "All data should be written at once");

        if (flushAlways) {
            targetPartition.flush(targetChannel);
        }
        return pruneTriggered;
    }

rdWrecoiter.flushAll

  • 把bufferBuilders存储着BufferBuilder 数组,数组个数,对应着并行度的个数,每个BufferBuilder对应着ResultSubpartition.subpartitions的partition的PipelinedSubpartition,会对这些PipelinedSubpartition进行Flush,并给对应的Window进行处理
  • 调用PipelinedSubpartition.flushAll()
    public void flushAll() {
        targetPartition.flushAll();
    }

PipelinedSubpartition.flushAll()

  • 调用PipelinedSubpartition.notifyDataAvailable
  • 调用PipelinedSubpartitionView.notifyDataAvailable
public void flush() {
        final boolean notifyDataAvailable;
        synchronized (buffers) {
            if (buffers.isEmpty()) {
                return;
            }
            // if there is more then 1 buffer, we already notified the reader
            // (at the latest when adding the second buffer)
            notifyDataAvailable = !flushRequested && buffers.size() == 1;
            flushRequested = true;
        }
        if (notifyDataAvailable) {
            notifyDataAvailable();
        }
    }

PipelinedSubpartitionView.notifyDataAvailable

  • 调用LocalInputChannel.notifyDataAvailable
    public void notifyDataAvailable() {
        availabilityListener.notifyDataAvailable();
    }

LocalInputChannel.notifyDataAvailable

  • 调用InputChannel.notifyChannelNonEmpty()
  • 调用SingleInputGate.notifyChannelNonEmpty
    public void notifyDataAvailable() {
        notifyChannelNonEmpty();
    }

SingleInputGate.queueChannel

  • 通知Window,有数据产生了,可以开始消费了(处理数据)

      inputChannelsWithData.notifyAll();
private void queueChannel(InputChannel channel) {
        int availableChannels;

        synchronized (inputChannelsWithData) {
            if (enqueuedInputChannelsWithData.get(channel.getChannelIndex())) {
                return;
            }
            availableChannels = inputChannelsWithData.size();

            inputChannelsWithData.add(channel);
            enqueuedInputChannelsWithData.set(channel.getChannelIndex());

            if (availableChannels == 0) {
                inputChannelsWithData.notifyAll();
            }
        }

        if (availableChannels == 0) {
            InputGateListener listener = inputGateListener;
            if (listener != null) {
                listener.notifyInputGateNonEmpty(this);
            }
        }
    }

源码分析(Window)

OneInputStreamTask.run()

  • 调用StreamInputProcessor.processInput()进行处理,这个函数,会进行阻塞,如果没有读到数据的话
  • 注意,这是Window的处理线程,有多少个并行度,就会开多少个Window,就有多少个这个方法在一直处理Source不断发过来的数据
    protected void run() throws Exception {
        // cache processor reference on the stack, to make the code more JIT friendly
        final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;

        while (running && inputProcessor.processInput()) {
            // all the work happens in the "processInput" method
        }
    }

StreamInputProcessor.processInput()

  • 这是一个阻塞的方法,读取Source中对应的partition中的数据,调用BarrierTracker.getNextNonBlocked()

    final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
public boolean processInput() throws Exception {
        if (isFinished) {
            return false;
        }
        if (numRecordsIn == null) {
            try {
                numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
            } catch (Exception e) {
                LOG.warn("An exception occurred during the metrics setup.", e);
                numRecordsIn = new SimpleCounter();
            }
        }

        while (true) {
            if (currentRecordDeserializer != null) {
                DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);

                if (result.isBufferConsumed()) {
                    currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                    currentRecordDeserializer = null;
                }

                if (result.isFullRecord()) {
                    StreamElement recordOrMark = deserializationDelegate.getInstance();

                    if (recordOrMark.isWatermark()) {
                        // handle watermark
                        statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
                        continue;
                    } else if (recordOrMark.isStreamStatus()) {
                        // handle stream status
                        statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);
                        continue;
                    } else if (recordOrMark.isLatencyMarker()) {
                        // handle latency marker
                        synchronized (lock) {
                            streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
                        }
                        continue;
                    } else {
                        // now we can do the actual processing
                        StreamRecord<IN> record = recordOrMark.asRecord();
                        synchronized (lock) {
                            numRecordsIn.inc();
                            streamOperator.setKeyContextElement1(record);
                            streamOperator.processElement(record);
                        }
                        return true;
                    }
                }
            }

            final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
            if (bufferOrEvent != null) {
                if (bufferOrEvent.isBuffer()) {
                    currentChannel = bufferOrEvent.getChannelIndex();
                    currentRecordDeserializer = recordDeserializers[currentChannel];
                    currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
                }
                else {
                    // Event received
                    final AbstractEvent event = bufferOrEvent.getEvent();
                    if (event.getClass() != EndOfPartitionEvent.class) {
                        throw new IOException("Unexpected event: " + event);
                    }
                }
            }
            else {
                isFinished = true;
                if (!barrierHandler.isEmpty()) {
                    throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
                }
                return false;
            }
        }
    }

BarrierTracker.getNextNonBlocked()

  • 调用inputGate.getNextBufferOrEvent();调到的是SingleInputGate.getNextBufferOrEvent()去读取Source发过来到当前Window中的数据
    public BufferOrEvent getNextNonBlocked() throws Exception {
        while (true) {
            Optional<BufferOrEvent> next = inputGate.getNextBufferOrEvent();
            if (!next.isPresent()) {
                // buffer or input exhausted
                return null;
            }

            BufferOrEvent bufferOrEvent = next.get();
            if (bufferOrEvent.isBuffer()) {
                return bufferOrEvent;
            }
            else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
                processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
            }
            else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
                processCheckpointAbortBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
            }
            else {
                // some other event
                return bufferOrEvent;
            }
        }
    }

SingleInputGate.getNextBufferOrEvent

  • 这个才真正读Source发过来对应的partition,对应当前的Window中的数据,currentChannel.getNextBuffer();
  • inputChannelsWithData 这个对象进行线程阻塞,通知,就是一个开关,Source发过来的数据处理完了就关了,Source有新的数据发过来,就打开了,开关打开时,就可以进行Window数据处理了
private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking) throws IOException, InterruptedException {
        if (hasReceivedAllEndOfPartitionEvents) {
            return Optional.empty();
        }

        if (isReleased) {
            throw new IllegalStateException("Released");
        }

        requestPartitions();

        InputChannel currentChannel;
        boolean moreAvailable;
        Optional<BufferAndAvailability> result = Optional.empty();

        do {
            synchronized (inputChannelsWithData) {
                while (inputChannelsWithData.size() == 0) {
                    if (isReleased) {
                        throw new IllegalStateException("Released");
                    }

                    if (blocking) {
                        inputChannelsWithData.wait();
                    }
                    else {
                        return Optional.empty();
                    }
                }

                currentChannel = inputChannelsWithData.remove();
                enqueuedInputChannelsWithData.clear(currentChannel.getChannelIndex());
                moreAvailable = !inputChannelsWithData.isEmpty();
            }

            result = currentChannel.getNextBuffer();
        } while (!result.isPresent());

        // this channel was now removed from the non-empty channels queue
        // we re-add it in case it has more data, because in that case no "non-empty" notification
        // will come for that channel
        if (result.get().moreAvailable()) {
            queueChannel(currentChannel);
            moreAvailable = true;
        }

        final Buffer buffer = result.get().buffer();
        if (buffer.isBuffer()) {
            return Optional.of(new BufferOrEvent(buffer, currentChannel.getChannelIndex(), moreAvailable));
        }
        else {
            final AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());

            if (event.getClass() == EndOfPartitionEvent.class) {
                channelsWithEndOfPartitionEvents.set(currentChannel.getChannelIndex());

                if (channelsWithEndOfPartitionEvents.cardinality() == numberOfInputChannels) {
                    // Because of race condition between:
                    // 1. releasing inputChannelsWithData lock in this method and reaching this place
                    // 2. empty data notification that re-enqueues a channel
                    // we can end up with moreAvailable flag set to true, while we expect no more data.
                    checkState(!moreAvailable || !pollNextBufferOrEvent().isPresent());
                    moreAvailable = false;
                    hasReceivedAllEndOfPartitionEvents = true;
                }

                currentChannel.notifySubpartitionConsumed();

                currentChannel.releaseAllResources();
            }

            return Optional.of(new BufferOrEvent(event, currentChannel.getChannelIndex(), moreAvailable));
        }
    }

输出数据

  • 注意,打印输出的数据,是有规则的,按partition打印的,而且,parition打印的顺序,是在window时按key去重,此时没有排序,然后发送给sink进行打印输出
--------------------------
partition:0
WordWithCount(2,1)
WordWithCount(6,1)
WordWithCount(4,1)
WordWithCount(3,1)

partition:1
WordWithCount(1,1)
WordWithCount(10,1)
WordWithCount(8,1)
WordWithCount(5,1)

partition:2
WordWithCount(7,1)
WordWithCount(9,1)


``

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
11天前
|
SQL 安全 数据处理
揭秘数据脱敏神器:Flink SQL的神秘力量,守护你的数据宝藏!
【9月更文挑战第7天】在大数据时代,数据管理和处理尤为重要,尤其在保障数据安全与隐私方面。本文探讨如何利用Flink SQL实现数据脱敏,为实时数据处理提供有效的隐私保护方案。数据脱敏涉及在处理、存储或传输前对敏感数据进行加密、遮蔽或替换,以遵守数据保护法规(如GDPR)。Flink SQL通过内置函数和表达式支持这一过程。
31 2
|
20天前
|
消息中间件 Kafka 数据处理
实时计算 Flink版产品使用问题之如何处理数据并记录每条数据的变更
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
20天前
|
消息中间件 存储 关系型数据库
实时计算 Flink版产品使用问题之同步时,上游批量删除大量数据(如20万条),如何提高删除效率
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
20天前
|
消息中间件 存储 关系型数据库
实时计算 Flink版产品使用问题之如何使用Kafka Connector将数据写入到Kafka
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
存储 监控 大数据
阿里云实时计算Flink在多行业的应用和实践
本文整理自 Flink Forward Asia 2023 中闭门会的分享。主要分享实时计算在各行业的应用实践,对回归实时计算的重点场景进行介绍以及企业如何使用实时计算技术,并且提供一些在技术架构上的参考建议。
688 7
阿里云实时计算Flink在多行业的应用和实践
|
30天前
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用问题之如何在EMR-Flink的Flink SOL中针对source表单独设置并行度
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
23天前
|
消息中间件 监控 Kafka
联通实时计算平台问题之Flink状态后端数据量较大时,问题排查要如何进行
联通实时计算平台问题之Flink状态后端数据量较大时,问题排查要如何进行
|
30天前
|
消息中间件 监控 Kafka
实时计算 Flink版产品使用问题之怎么调整Flink Web U显示的日志行数
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错合集之从mysql读数据写到hive报错,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。