Flink – metrics V1.2

Introduction

WebRuntimeMonitor

 

.GET("/jobs/:jobid/vertices/:vertexid/metrics", handler(new JobVertexMetricsHandler(metricFetcher)))
.GET("/jobs/:jobid/metrics", handler(new JobMetricsHandler(metricFetcher)))
.GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY + "/metrics", handler(new TaskManagerMetricsHandler(metricFetcher)))
.GET("/jobmanager/metrics", handler(new JobManagerMetricsHandler(metricFetcher)))
 

JobVertexMetricsHandler

 

AbstractMetricsHandler

 

MetricFetcher

The core is the fetchMetrics function, which pulls metric data from the JobManager:

private void fetchMetrics() {
    try {
        Option<scala.Tuple2<ActorGateway, Integer>> jobManagerGatewayAndWebPort = retriever.getJobManagerGatewayAndWebPort();
        if (jobManagerGatewayAndWebPort.isDefined()) {
            ActorGateway jobManager = jobManagerGatewayAndWebPort.get()._1(); // obtain the JobManager's ActorGateway

            /**
             * Remove all metrics that belong to a job that is not running and no longer archived.
             */
            Future<Object> jobDetailsFuture = jobManager.ask(new RequestJobDetails(true, true), timeout); // request the current job details
            jobDetailsFuture
                .onSuccess(new OnSuccess<Object>() {
                    @Override
                    public void onSuccess(Object result) throws Throwable {
                        MultipleJobsDetails details = (MultipleJobsDetails) result;
                        ArrayList<String> toRetain = new ArrayList<>();
                        for (JobDetails job : details.getRunningJobs()) {
                            toRetain.add(job.getJobId().toString());
                        }
                        for (JobDetails job : details.getFinishedJobs()) {
                            toRetain.add(job.getJobId().toString());
                        }
                        synchronized (metrics) {
                            metrics.jobs.keySet().retainAll(toRetain); // keep only running and finished jobs, drop everything else
                        }
                    }
                }, ctx);
            logErrorOnFailure(jobDetailsFuture, "Fetching of JobDetails failed.");

            String jobManagerPath = jobManager.path();
            String queryServicePath = jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME;
            ActorRef jobManagerQueryService = actorSystem.actorFor(queryServicePath);

            queryMetrics(jobManagerQueryService); // query the JobManager's metrics

            /**
             * We first request the list of all registered task managers from the job manager, and then
             * request the respective metric dump from each task manager.
             *
             * All stored metrics that do not belong to a registered task manager will be removed.
             */
            Future<Object> registeredTaskManagersFuture = jobManager.ask(JobManagerMessages.getRequestRegisteredTaskManagers(), timeout); // request all registered TaskManagers
            registeredTaskManagersFuture
                .onSuccess(new OnSuccess<Object>() {
                    @Override
                    public void onSuccess(Object result) throws Throwable {
                        Iterable<Instance> taskManagers = ((JobManagerMessages.RegisteredTaskManagers) result).asJavaIterable();
                        List<String> activeTaskManagers = new ArrayList<>();
                        for (Instance taskManager : taskManagers) { // iterate over the TaskManagers
                            activeTaskManagers.add(taskManager.getId().toString());

                            String taskManagerPath = taskManager.getTaskManagerGateway().getAddress();
                            String queryServicePath = taskManagerPath.substring(0, taskManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + taskManager.getTaskManagerID().getResourceIdString();
                            ActorRef taskManagerQueryService = actorSystem.actorFor(queryServicePath);

                            queryMetrics(taskManagerQueryService); // query each TaskManager's metrics
                        }
                        synchronized (metrics) { // remove all metrics belonging to unregistered task managers
                            metrics.taskManagers.keySet().retainAll(activeTaskManagers); // drop metrics of unregistered TaskManagers
                        }
                    }
                }, ctx);
            logErrorOnFailure(registeredTaskManagersFuture, "Fetching list of registered TaskManagers failed.");
        }
    } catch (Exception e) {
        LOG.warn("Exception while fetching metrics.", e);
    }
}
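The query-service actor path above is derived from the JobManager's actor path by swapping the last path segment. A minimal sketch of that string manipulation (the akka path below is a made-up example):

```java
public class QueryServicePathDemo {
    // Mirrors MetricQueryService.METRIC_QUERY_SERVICE_NAME
    static final String METRIC_QUERY_SERVICE_NAME = "MetricQueryService";

    // Replace the last path segment with the query-service name,
    // as fetchMetrics does for the JobManager path.
    static String queryServicePath(String jobManagerPath) {
        return jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1)
            + METRIC_QUERY_SERVICE_NAME;
    }

    public static void main(String[] args) {
        String jmPath = "akka.tcp://flink@jobmanager:6123/user/jobmanager"; // example path
        System.out.println(queryServicePath(jmPath));
        // akka.tcp://flink@jobmanager:6123/user/MetricQueryService
    }
}
```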

 

queryMetrics

/**
 * Requests a metric dump from the given actor.
 *
 * @param actor ActorRef to request the dump from
 */
private void queryMetrics(ActorRef actor) {
    Future<Object> metricQueryFuture = new BasicGateway(actor).ask(MetricQueryService.getCreateDump(), timeout); // request a metrics dump
    metricQueryFuture
        .onSuccess(new OnSuccess<Object>() {
            @Override
            public void onSuccess(Object result) throws Throwable {
                addMetrics(result);
            }
        }, ctx);
    logErrorOnFailure(metricQueryFuture, "Fetching metrics failed.");
}

private void addMetrics(Object result) throws IOException {
    byte[] data = (byte[]) result;
    List<MetricDump> dumpedMetrics = deserializer.deserialize(data);
    for (MetricDump metric : dumpedMetrics) {
        metrics.add(metric); // add the dumped metric to the metric store
    }
}

 

MetricStore

Metrics are stored as instantaneous values in nested HashMaps:

final JobManagerMetricStore jobManager = new JobManagerMetricStore();
final Map<String, TaskManagerMetricStore> taskManagers = new HashMap<>();
final Map<String, JobMetricStore> jobs = new HashMap<>();

 

public static class JobManagerMetricStore extends ComponentMetricStore {
}

private static abstract class ComponentMetricStore {
    public final Map<String, String> metrics = new HashMap<>(); // the store is just a map

    public String getMetric(String name, String defaultValue) {
        String value = this.metrics.get(name);
        return value != null
            ? value
            : defaultValue;
    }
}
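A sketch of how a lookup against this nested-map store works; the class below is a simplified stand-in for ComponentMetricStore, and the metric name is purely illustrative:

```java
import java.util.HashMap;
import java.util.Map;

public class NestedStoreDemo {
    // Simplified stand-in for ComponentMetricStore: one flat map per component.
    static class ComponentStore {
        final Map<String, String> metrics = new HashMap<>();
        String getMetric(String name, String defaultValue) {
            String v = metrics.get(name);
            return v != null ? v : defaultValue;
        }
    }

    public static void main(String[] args) {
        // Outer level: taskmanager-id -> component store, mirroring MetricStore.taskManagers
        Map<String, ComponentStore> taskManagers = new HashMap<>();
        ComponentStore tm = new ComponentStore();
        tm.metrics.put("Status.JVM.CPU.Load", "0.12"); // values are stored as String snapshots
        taskManagers.put("tm-1", tm);

        // Lookup: outer map by id, inner map by metric name, with a default
        System.out.println(taskManagers.get("tm-1").getMetric("Status.JVM.CPU.Load", "0"));
        // 0.12
    }
}
```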

 

MetricQueryService

public class MetricQueryService extends UntypedActor {
    private static final Logger LOG = LoggerFactory.getLogger(MetricQueryService.class);

    public static final String METRIC_QUERY_SERVICE_NAME = "MetricQueryService";

    private static final CharacterFilter FILTER = new CharacterFilter() {
        @Override
        public String filterCharacters(String input) {
            return replaceInvalidChars(input);
        }
    };

    private final MetricDumpSerializer serializer = new MetricDumpSerializer();

    private final Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges = new HashMap<>();
    private final Map<Counter, Tuple2<QueryScopeInfo, String>> counters = new HashMap<>();
    private final Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms = new HashMap<>();
    private final Map<Meter, Tuple2<QueryScopeInfo, String>> meters = new HashMap<>();

 

On receiving a CreateDump request:

} else if (message instanceof CreateDump) {
    byte[] dump = serializer.serialize(counters, gauges, histograms, meters);
    getSender().tell(dump, getSelf());

 

Start

   /**
     * Starts the MetricQueryService actor in the given actor system.
     *
     * @param actorSystem The actor system running the MetricQueryService
     * @param resourceID resource ID to disambiguate the actor name
     * @return actor reference to the MetricQueryService
     */
    public static ActorRef startMetricQueryService(ActorSystem actorSystem, ResourceID resourceID) {
        String actorName = resourceID == null
            ? METRIC_QUERY_SERVICE_NAME
            : METRIC_QUERY_SERVICE_NAME + "_" + resourceID.getResourceIdString();
        return actorSystem.actorOf(Props.create(MetricQueryService.class), actorName);
    }

 

In MetricRegistry, metrics are registered with the query service:

if (queryService != null) {
    MetricQueryService.notifyOfAddedMetric(queryService, metric, metricName, group);
}

 

Collection points

numRecordsIn

StreamInputProcessor –> processInput

    @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
    public boolean processInput(OneInputStreamOperator<IN, ?> streamOperator, final Object lock) throws Exception {
        if (numRecordsIn == null) {
            numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
        }
        //......
        
                        
        // now we can do the actual processing
        StreamRecord<IN> record = recordOrMark.asRecord();
        synchronized (lock) {
            numRecordsIn.inc(); // incremented just before processElement runs
            streamOperator.setKeyContextElement1(record);
            streamOperator.processElement(record);
        }
        return true;

For chained operators,

ChainingOutput

private static class ChainingOutput<T> implements Output<StreamRecord<T>> {
    
    protected final OneInputStreamOperator<T, ?> operator;
    protected final Counter numRecordsIn;

    public ChainingOutput(OneInputStreamOperator<T, ?> operator) {
        this.operator = operator;
        this.numRecordsIn = ((OperatorMetricGroup) operator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter(); // initialize the counter
    }

    @Override
    public void collect(StreamRecord<T> record) {
        try {
            numRecordsIn.inc(); // for chained operators, processElement is invoked from collect()
            operator.setKeyContextElement1(record);
            operator.processElement(record);
        }
        catch (Exception e) {
            throw new ExceptionInChainedOperatorException(e);
        }
    }

 

numRecordsOut

When AbstractStreamOperator is set up, it wraps its output in a CountingOutput:

    @Override
    public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
        this.container = containingTask;
        this.config = config;
        
        this.metrics = container.getEnvironment().getMetricGroup().addOperator(config.getOperatorName());
        this.output = new CountingOutput(output, ((OperatorMetricGroup) this.metrics).getIOMetricGroup().getNumRecordsOutCounter()); // wrap the output in a CountingOutput

This output is what processWatermark and processElement use to emit data:

output.emitWatermark(mark);

 

    public class CountingOutput implements Output<StreamRecord<OUT>> {
        private final Output<StreamRecord<OUT>> output;
        private final Counter numRecordsOut;

        public CountingOutput(Output<StreamRecord<OUT>> output, Counter counter) {
            this.output = output;
            this.numRecordsOut = counter;
        }

        @Override
        public void emitWatermark(Watermark mark) {
            output.emitWatermark(mark);
        }

        @Override
        public void emitLatencyMarker(LatencyMarker latencyMarker) {
            output.emitLatencyMarker(latencyMarker);
        }

        @Override
        public void collect(StreamRecord<OUT> record) {
            numRecordsOut.inc(); // increment numRecordsOut on emit
            output.collect(record);
        }

        @Override
        public void close() {
            output.close();
        }
    }

 

numRecordsInPerSecond, numRecordsOutPerSecond

In OperatorIOMetricGroup:
public OperatorIOMetricGroup(OperatorMetricGroup parentMetricGroup) {
        super(parentMetricGroup);
        numRecordsIn = parentMetricGroup.counter(MetricNames.IO_NUM_RECORDS_IN);
        numRecordsOut = parentMetricGroup.counter(MetricNames.IO_NUM_RECORDS_OUT);
        numRecordsInRate = parentMetricGroup.meter(MetricNames.IO_NUM_RECORDS_IN_RATE, new MeterView(numRecordsIn, 60));
        numRecordsOutRate = parentMetricGroup.meter(MetricNames.IO_NUM_RECORDS_OUT_RATE, new MeterView(numRecordsOut, 60));
    }

As shown, numRecordsInRate and numRecordsOutRate are simply MeterViews over the numRecordsIn and numRecordsOut counters.

public class MeterView implements Meter, View {
    /** The underlying counter maintaining the count */
    private final Counter counter;
    /** The time-span over which the average is calculated */
    private final int timeSpanInSeconds;
    /** Circular array containing the history of values */
    private final long[] values;
    /** The index in the array for the current time */
    private int time = 0;
    /** The last rate we computed */
    private double currentRate = 0;

    public MeterView(Counter counter, int timeSpanInSeconds) {
        this.counter = counter;
        this.timeSpanInSeconds = timeSpanInSeconds - (timeSpanInSeconds % UPDATE_INTERVAL_SECONDS); // round down to a multiple of UPDATE_INTERVAL_SECONDS (5)
        this.values = new long[this.timeSpanInSeconds / UPDATE_INTERVAL_SECONDS + 1]; // e.g. for timeSpanInSeconds = 60, 13 values are kept (12 intervals plus the current one)
    }

    @Override
    public void markEvent() {
        this.counter.inc();
    }

    @Override
    public void markEvent(long n) {
        this.counter.inc(n);
    }

    @Override
    public long getCount() {
        return counter.getCount();
    }

    @Override
    public double getRate() { // returns the pre-computed average rate
        return currentRate;
    }

    @Override
    public void update() { // invoked every UPDATE_INTERVAL_SECONDS
        time = (time + 1) % values.length;
        values[time] = counter.getCount();
        currentRate =  ((double) (values[time] - values[(time + 1) % values.length]) / timeSpanInSeconds); // values holds the counter's history over timeSpanInSeconds, so newest minus oldest, divided by the span, is the rate
    }
}

This implementation is rather tricky; not a great design.
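The ring-buffer arithmetic can be verified with a stripped-down, standalone version of update(); UPDATE_INTERVAL_SECONDS is assumed to be 5, matching Flink's View interface:

```java
public class MeterViewSketch {
    static final int UPDATE_INTERVAL_SECONDS = 5;

    final int timeSpanInSeconds;
    final long[] values;
    int time = 0;
    double currentRate = 0;
    long count = 0; // stand-in for the wrapped Counter

    MeterViewSketch(int timeSpanInSeconds) {
        this.timeSpanInSeconds = timeSpanInSeconds - (timeSpanInSeconds % UPDATE_INTERVAL_SECONDS);
        this.values = new long[this.timeSpanInSeconds / UPDATE_INTERVAL_SECONDS + 1];
    }

    void update() { // called every UPDATE_INTERVAL_SECONDS
        time = (time + 1) % values.length;
        values[time] = count;
        // newest snapshot minus the oldest one, divided by the window length
        currentRate = ((double) (values[time] - values[(time + 1) % values.length])) / timeSpanInSeconds;
    }

    public static void main(String[] args) {
        MeterViewSketch m = new MeterViewSketch(60); // 13 slots: 12 intervals + current
        // Simulate a counter growing by 10 events/second, i.e. 50 per 5s interval
        for (int i = 0; i < 20; i++) {
            m.count += 50;
            m.update();
        }
        System.out.println(m.currentRate); // 10.0 once the window is full
    }
}
```

Once the ring buffer has wrapped, the rate converges to the true average over the window; before that, the oldest slot is still zero and the reported rate is understated.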

MetricRegistry creates a ViewUpdater when the first View metric is registered:
    public void register(Metric metric, String metricName, AbstractMetricGroup group) {
        try {
            if (reporters != null) {
                for (int i = 0; i < reporters.size(); i++) {
                    MetricReporter reporter = reporters.get(i);
                    if (reporter != null) {
                        FrontMetricGroup front = new FrontMetricGroup<AbstractMetricGroup<?>>(i, group);
                        reporter.notifyOfAddedMetric(metric, metricName, front);
                    }
                }
            }
            if (queryService != null) {
                MetricQueryService.notifyOfAddedMetric(queryService, metric, metricName, group);
            }
            if (metric instanceof View) {
                if (viewUpdater == null) {
                    viewUpdater = new ViewUpdater(executor);
                }
                viewUpdater.notifyOfAddedView((View) metric);
            }
        } catch (Exception e) {
            LOG.error("Error while registering metric.", e);
        }
    }

When a metric is registered, besides being passed to the reporters and the MetricQueryService, it is also registered with the ViewUpdater if it implements View.

    public ViewUpdater(ScheduledExecutorService executor) {
        executor.scheduleWithFixedDelay(new ViewUpdaterTask(lock, toAdd, toRemove), 5, UPDATE_INTERVAL_SECONDS, TimeUnit.SECONDS);
    }

The ViewUpdater periodically runs a ViewUpdaterTask, which calls update() on each registered view.
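What that periodic task amounts to can be sketched as follows (names are illustrative; the real ViewUpdaterTask additionally drains add/remove queues under a lock before iterating):

```java
import java.util.HashSet;
import java.util.Set;

public class ViewUpdaterSketch {
    interface View { void update(); }

    // The task just walks all registered views and refreshes them.
    static class UpdaterTask implements Runnable {
        final Set<View> views = new HashSet<>();
        @Override
        public void run() {
            for (View v : views) {
                v.update();
            }
        }
    }

    public static void main(String[] args) {
        final int[] updates = {0};
        UpdaterTask task = new UpdaterTask();
        task.views.add(() -> updates[0]++);
        // In Flink this run() is invoked by a ScheduledExecutorService every 5 seconds;
        // here we invoke it directly to show the effect.
        task.run();
        task.run();
        System.out.println(updates[0]); // 2
    }
}
```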

 

numBytesInLocal, numBytesInRemote

In RemoteInputChannel and LocalInputChannel:

    public LocalInputChannel(
        SingleInputGate inputGate,
        int channelIndex,
        ResultPartitionID partitionId,
        ResultPartitionManager partitionManager,
        TaskEventDispatcher taskEventDispatcher,
        int initialBackoff,
        int maxBackoff,
        TaskIOMetricGroup metrics) {

        super(inputGate, channelIndex, partitionId, initialBackoff, maxBackoff, metrics.getNumBytesInLocalCounter()); //metrics.getNumBytesInLocalCounter()
        
    public RemoteInputChannel(
        SingleInputGate inputGate,
        int channelIndex,
        ResultPartitionID partitionId,
        ConnectionID connectionId,
        ConnectionManager connectionManager,
        int initialBackOff,
        int maxBackoff,
        TaskIOMetricGroup metrics) {

        super(inputGate, channelIndex, partitionId, initialBackOff, maxBackoff, metrics.getNumBytesInRemoteCounter()); // metrics.getNumBytesInRemoteCounter()

Both of them, in BufferAndAvailability getNextBuffer(), call:

numBytesIn.inc(next.getSize());

 

numBytesOut

RecordWriter
public class RecordWriter<T extends IOReadableWritable> {
    private Counter numBytesOut = new SimpleCounter();
    
    public void emit(T record) throws IOException, InterruptedException {
        for (int targetChannel : channelSelector.selectChannels(record, numChannels)) {
            sendToTarget(record, targetChannel);
        }
    }
    
    private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
        RecordSerializer<T> serializer = serializers[targetChannel];

        synchronized (serializer) {
            SerializationResult result = serializer.addRecord(record);

            while (result.isFullBuffer()) {
                Buffer buffer = serializer.getCurrentBuffer();

                if (buffer != null) {
                    numBytesOut.inc(buffer.getSize()); // count numBytesOut
                    writeAndClearBuffer(buffer, targetChannel, serializer);

                    // If this was a full record, we are done. Not breaking
                    // out of the loop at this point will lead to another
                    // buffer request before breaking out (that would not be
                    // a problem per se, but it can lead to stalls in the
                    // pipeline).
                    if (result.isFullRecord()) {
                        break;
                    }
                } else {
                    buffer = targetPartition.getBufferProvider().requestBufferBlocking();
                    result = serializer.setNextBuffer(buffer);
                }
            }
        }
    }
RecordWriterOutput.collect –> StreamRecordWriter.emit –> RecordWriter.emit
 

inputQueueLength, outputQueueLength, inPoolUsage, outPoolUsage

TaskIOMetricGroup
   /**
     * Initialize Buffer Metrics for a task
     */
    public void initializeBufferMetrics(Task task) {
        final MetricGroup buffers = addGroup("buffers");
        buffers.gauge("inputQueueLength", new InputBuffersGauge(task));
        buffers.gauge("outputQueueLength", new OutputBuffersGauge(task));
        buffers.gauge("inPoolUsage", new InputBufferPoolUsageGauge(task));
        buffers.gauge("outPoolUsage", new OutputBufferPoolUsageGauge(task));
    }
 

inputQueueLength

for (SingleInputGate inputGate : task.getAllInputGates()) {
    totalBuffers += inputGate.getNumberOfQueuedBuffers();
}
inputGate.getNumberOfQueuedBuffers
for (InputChannel channel : inputChannels.values()) {
    if (channel instanceof RemoteInputChannel) { // only RemoteInputChannels are counted
        totalBuffers += ((RemoteInputChannel) channel).getNumberOfQueuedBuffers();
    }
}
getNumberOfQueuedBuffers
/**
     * The received buffers. Received buffers are enqueued by the network I/O thread and the queue
     * is consumed by the receiving task thread.
     */
    private final Queue<Buffer> receivedBuffers = new ArrayDeque<>();

    public int getNumberOfQueuedBuffers() {
        synchronized (receivedBuffers) {
            return receivedBuffers.size();
        }
    }
 
outputQueueLength
for (ResultPartition producedPartition : task.getProducedPartitions()) {
    totalBuffers += producedPartition.getNumberOfQueuedBuffers();
}
ResultPartition getNumberOfQueuedBuffers
for (ResultSubpartition subpartition : subpartitions) {
    totalBuffers += subpartition.getNumberOfQueuedBuffers();
}

SpillableSubpartition getNumberOfQueuedBuffers

class SpillableSubpartition extends ResultSubpartition {
    /** Buffers are kept in this queue as long as we weren't asked to release any. */
    private final ArrayDeque<Buffer> buffers = new ArrayDeque<>();
    
    @Override
    public int getNumberOfQueuedBuffers() {
        return buffers.size();
    }

inputQueueLength, outputQueueLength

These metrics count the buffers currently held by the input channels and result partitions. Buffers are released once consumed, so if the pipeline is flowing smoothly, both lengths should stay small.

 

inPoolUsage
int usedBuffers = 0;
int bufferPoolSize = 0;

for (SingleInputGate inputGate : task.getAllInputGates()) {
    usedBuffers += inputGate.getBufferPool().bestEffortGetNumOfUsedBuffers();
    bufferPoolSize += inputGate.getBufferPool().getNumBuffers();
}

if (bufferPoolSize != 0) {
    return ((float) usedBuffers) / bufferPoolSize;
} else {
    return 0.0f;
}

bestEffortGetNumOfUsedBuffers()

@Override
public int bestEffortGetNumOfUsedBuffers() {
    return Math.max(0, numberOfRequestedMemorySegments - availableMemorySegments.size());
}

numberOfRequestedMemorySegments is how many segments have been requested from the buffer pool, and availableMemorySegments is how many are currently available, so their difference is the number in use.
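A hypothetical worked example of that subtraction and of the resulting usage ratio:

```java
public class PoolUsageDemo {
    // Mirrors bestEffortGetNumOfUsedBuffers: requested minus currently available
    static int usedBuffers(int requestedSegments, int availableSegments) {
        return Math.max(0, requestedSegments - availableSegments);
    }

    public static void main(String[] args) {
        int used = usedBuffers(32, 12);          // 32 requested, 12 still available
        System.out.println(used);                // 20
        System.out.println((float) used / 32);   // 0.625 -> the inPoolUsage value
    }
}
```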

 
outPoolUsage
int usedBuffers = 0;
int bufferPoolSize = 0;

for (ResultPartition resultPartition : task.getProducedPartitions()) {
    usedBuffers += resultPartition.getBufferPool().bestEffortGetNumOfUsedBuffers();
    bufferPoolSize += resultPartition.getBufferPool().getNumBuffers();
}

if (bufferPoolSize != 0) {
    return ((float) usedBuffers) / bufferPoolSize;
} else {
    return 0.0f;
}
Similar to inPoolUsage, this also inspects the buffer pool, so inPoolUsage and outPoolUsage reflect the buffer-pool utilization of the InputGates and ResultPartitions. The buffer pool is created when the InputGate registers with the NetworkEnvironment:
// Setup the buffer pool for each buffer reader
final SingleInputGate[] inputGates = task.getAllInputGates();

for (SingleInputGate gate : inputGates) {
    BufferPool bufferPool = null;

    try {
        bufferPool = networkBufferPool.createBufferPool(gate.getNumberOfInputChannels(), false);
        gate.setBufferPool(bufferPool);
    }

As shown, the default pool size equals the number of input channels. If the pool is exhausted, the InputGate and ResultPartition can no longer take in new data.

 

latency

In AbstractStreamOperator, during setup():

protected LatencyGauge latencyGauge;
latencyGauge = this.metrics.gauge("latency", new LatencyGauge(historySize));
 
Note that metrics here is an OperatorMetricGroup:
this.metrics = container.getEnvironment().getMetricGroup().addOperator(config.getOperatorName());

TaskMetricGroup

    public OperatorMetricGroup addOperator(String name) {
        OperatorMetricGroup operator = new OperatorMetricGroup(this.registry, this, name);

        synchronized (this) {
            OperatorMetricGroup previous = operators.put(name, operator);
            if (previous == null) {
                // no operator group so far
                return operator;
            } else {
                // already had an operator group. restore that one.
                operators.put(name, previous);
                return previous;
            }
        }
    }

 

 

Definition of LatencyGauge:

/**
     * The gauge uses a HashMap internally to avoid classloading issues when accessing
     * the values using JMX.
     */
    protected static class LatencyGauge implements Gauge<Map<String, HashMap<String, Double>>> {
    
        // LatencySourceDescriptor: contains the vertexID and subtaskIndex
        // DescriptiveStatistics: the statistics module
        private final Map<LatencySourceDescriptor, DescriptiveStatistics> latencyStats = new HashMap<>();
        private final int historySize;

        LatencyGauge(int historySize) {
            this.historySize = historySize;
        }

        public void reportLatency(LatencyMarker marker, boolean isSink) {
            LatencySourceDescriptor sourceDescriptor = LatencySourceDescriptor.of(marker, !isSink);
            DescriptiveStatistics sourceStats = latencyStats.get(sourceDescriptor);
            if (sourceStats == null) { // lazily initialize the DescriptiveStatistics
                // 512 element window (4 kb)
                sourceStats = new DescriptiveStatistics(this.historySize);
                latencyStats.put(sourceDescriptor, sourceStats);
            }
            long now = System.currentTimeMillis();
            sourceStats.addValue(now - marker.getMarkedTime()); // latency = current time minus the time the source emitted the marker
        }

        @Override
        public Map<String, HashMap<String, Double>> getValue() {
            while (true) {
                try {
                    Map<String, HashMap<String, Double>> ret = new HashMap<>();
                    for (Map.Entry<LatencySourceDescriptor, DescriptiveStatistics> source : latencyStats.entrySet()) {
                        HashMap<String, Double> sourceStatistics = new HashMap<>(6);
                        sourceStatistics.put("max", source.getValue().getMax());
                        sourceStatistics.put("mean", source.getValue().getMean());
                        sourceStatistics.put("min", source.getValue().getMin());
                        sourceStatistics.put("p50", source.getValue().getPercentile(50));
                        sourceStatistics.put("p95", source.getValue().getPercentile(95));
                        sourceStatistics.put("p99", source.getValue().getPercentile(99));
                        ret.put(source.getKey().toString(), sourceStatistics);
                    }
                    return ret;
                    // Concurrent access onto the "latencyStats" map could cause
                    // ConcurrentModificationExceptions. To avoid unnecessary blocking
                    // of the reportLatency() method, we retry this operation until
                    // it succeeds.
                } catch(ConcurrentModificationException ignore) {
                    LOG.debug("Unable to report latency statistics", ignore);
                }
            }
        }
    }
  

It is odd that Gauge.getValue returns a map. How many entries latencyStats holds depends on how many sources there are and on each source's parallelism, because it records the latency from each subtask of each source operator to the current operator's subtask.

        public static LatencySourceDescriptor of(LatencyMarker marker, boolean ignoreSubtaskIndex) {
            if (ignoreSubtaskIndex) {
                return new LatencySourceDescriptor(marker.getVertexID(), -1);
            } else {
                return new LatencySourceDescriptor(marker.getVertexID(), marker.getSubtaskIndex());
            }

        }

The LatencySourceDescriptor is built from the vertexID and the subtaskIndex; if the subtask index is ignored, it is set to -1.
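A simplified sketch of this per-source bookkeeping, replacing DescriptiveStatistics with a plain list and the descriptor with a string key (both are stand-ins, not the actual Flink classes):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LatencyStatsSketch {
    // Simplified stand-in for LatencySourceDescriptor: "vertexID,subtaskIndex"
    static String key(int vertexID, int subtaskIndex, boolean ignoreSubtaskIndex) {
        return vertexID + "," + (ignoreSubtaskIndex ? -1 : subtaskIndex);
    }

    final Map<String, List<Long>> latencyStats = new HashMap<>();

    // Record one marker: latency = now minus the marker's emission time
    void reportLatency(int vertexID, int subtaskIndex, long markedTime, long now) {
        String k = key(vertexID, subtaskIndex, false);
        latencyStats.computeIfAbsent(k, x -> new ArrayList<>()).add(now - markedTime);
    }

    double mean(String k) {
        return latencyStats.get(k).stream().mapToLong(Long::longValue).average().orElse(0);
    }

    public static void main(String[] args) {
        LatencyStatsSketch g = new LatencyStatsSketch();
        // Markers from source vertex 1, subtask 0, emitted 10/20/30 ms before "now"
        g.reportLatency(1, 0, 990, 1000);
        g.reportLatency(1, 0, 980, 1000);
        g.reportLatency(1, 0, 970, 1000);
        System.out.println(g.mean("1,0")); // 20.0
    }
}
```

One map entry per (source vertex, subtask) pair, exactly as described above; the real gauge additionally reports min/max and percentiles from the windowed statistics.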

 

 

Flow

StreamSource

defines a LatencyMarksEmitter:

private static class LatencyMarksEmitter<OUT> {
        private final ScheduledFuture<?> latencyMarkTimer;

        public LatencyMarksEmitter(
                final ProcessingTimeService processingTimeService,
                final Output<StreamRecord<OUT>> output,
                long latencyTrackingInterval,
                final int vertexID,
                final int subtaskIndex) {

            latencyMarkTimer = processingTimeService.scheduleAtFixedRate( // periodically emit latency markers based on processing time
                new ProcessingTimeCallback() {
                    @Override
                    public void onProcessingTime(long timestamp) throws Exception {
                        try {
                            // ProcessingTimeService callbacks are executed under the checkpointing lock
                            output.emitLatencyMarker(new LatencyMarker(timestamp, vertexID, subtaskIndex)); // emit a LatencyMarker stamped with the current processing time
                        } catch (Throwable t) {
                            // we catch the Throwables here so that we don't trigger the processing
                            // timer services async exception handler
                            LOG.warn("Error while emitting latency marker.", t);
                        }
                    }
                },
                0L,
                latencyTrackingInterval);
        }

 

In source.run, when isLatencyTrackingEnabled is true, the latency-marker emitter is scheduled:

public void run(final Object lockingObject, final Output<StreamRecord<OUT>> collector) throws Exception {
        final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();

        LatencyMarksEmitter latencyEmitter = null;
        if(getExecutionConfig().isLatencyTrackingEnabled()) {
            latencyEmitter = new LatencyMarksEmitter<>(
                getProcessingTimeService(),
                collector,
                getExecutionConfig().getLatencyTrackingInterval(),
                getOperatorConfig().getVertexID(),
                getRuntimeContext().getIndexOfThisSubtask());
        }

 

 

StreamInputProcessor –> processInput

If the element is a latency marker:

else if(recordOrMark.isLatencyMarker()) {
    // handle latency marker
    synchronized (lock) {
        streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
    }
    continue;
}

For chained operators, ChainingOutput:

private static class ChainingOutput<T> implements Output<StreamRecord<T>> {
    
    protected final OneInputStreamOperator<T, ?> operator;
    protected final Counter numRecordsIn;

    @Override
    public void emitLatencyMarker(LatencyMarker latencyMarker) {
        try {
            operator.processLatencyMarker(latencyMarker);
        }
        catch (Exception e) {
            throw new ExceptionInChainedOperatorException(e);
        }
    }

 

AbstractStreamOperator

public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
        reportOrForwardLatencyMarker(latencyMarker);
    }

 

protected void reportOrForwardLatencyMarker(LatencyMarker marker) {
        // all operators are tracking latencies
        this.latencyGauge.reportLatency(marker, false);

        // everything except sinks forwards latency markers
        this.output.emitLatencyMarker(marker);
    }

This invokes latencyGauge.reportLatency (logic shown above), and then the marker is forwarded downstream via emitLatencyMarker.
