Flink Source Code: An Analysis of Broadcast State


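Broadcast State is a special type of Operator State. It was introduced for use cases in which the records of one stream must be broadcast to all downstream tasks, where they are used to maintain the same state in every subtask. That broadcast state can then be accessed while the records of a second stream are processed. Broadcast state has a few characteristics of its own:


It must be defined as a Map structure.


It can only be modified on the broadcast side; the non-broadcast side cannot modify it.


At runtime, Broadcast State is kept only in memory.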


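At this point you probably have the following questions:


1. Why must broadcast state be defined as a Map? Can't I use another state type?


2. Why can broadcast state only be modified on the broadcast side? Why can't the non-broadcast side modify it?


3. Why can broadcast state only be kept in memory? Can't it use the RocksDB state backend?


With these three questions in mind, let's read the relevant source code and answer them.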


broadcast source code


/**
 * Sets the partitioning of the {@link DataStream} so that the output elements are broadcasted
 * to every parallel instance of the next operation. In addition, it implicitly as many {@link
 * org.apache.flink.api.common.state.BroadcastState broadcast states} as the specified
 * descriptors which can be used to store the element of the stream.
 *
 * @param broadcastStateDescriptors the descriptors of the broadcast states to create.
 * @return A {@link BroadcastStream} which can be used in the {@link #connect(BroadcastStream)}
 *     to create a {@link BroadcastConnectedStream} for further processing of the elements.
 */
@PublicEvolving
public BroadcastStream<T> broadcast(
        final MapStateDescriptor<?, ?>... broadcastStateDescriptors) {
    Preconditions.checkNotNull(broadcastStateDescriptors);
    final DataStream<T> broadcastStream = setConnectionType(new BroadcastPartitioner<>());
    return new BroadcastStream<>(environment, broadcastStream, broadcastStateDescriptors);
}


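The broadcast method takes MapStateDescriptor arguments, that is, descriptors for Map-shaped state, so you must define a MapStateDescriptor when you use it; passing any other descriptor type does not compile. The main reason is that broadcast state exists to be joined with the non-broadcast stream. Think of it as a two-stream join: a join needs a key, and two records only match when their keys are equal, so a Map (key-value) structure fits this scenario best. The key holds the join field and the value can be broadcast data of any type; when joining, you only need to fetch the broadcast state and call state.get(key) to get the broadcast data.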
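To make the join analogy concrete, here is a minimal plain-Java sketch with no Flink dependency. The class RuleLookupSketch and its methods onBroadcast and onEvent are hypothetical. A HashMap stands in for the broadcast MapState and is keyed by the join field, and String values stand in for arbitrary broadcast payloads.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model (no Flink): a HashMap stands in for the broadcast MapState.
// All names are hypothetical and only illustrate the key-based lookup.
class RuleLookupSketch {
    // key = join field, value = broadcast payload of any type
    private final Map<String, String> broadcastState = new HashMap<>();

    // Broadcast side: store each broadcast record under its join key.
    void onBroadcast(String key, String payload) {
        broadcastState.put(key, payload);
    }

    // Non-broadcast side: "join" by looking up the record's key; null means no match.
    String onEvent(String key) {
        String payload = broadcastState.get(key);
        return payload == null ? null : key + " -> " + payload;
    }

    public static void main(String[] args) {
        RuleLookupSketch op = new RuleLookupSketch();
        op.onBroadcast("flink", "stream engine");
        op.onBroadcast("spark", "batch engine");
        System.out.println(op.onEvent("flink")); // flink -> stream engine
        System.out.println(op.onEvent("hive"));  // null (no broadcast entry)
    }
}
```

In real Flink code, the same lookup happens inside processElement through ctx.getBroadcastState(descriptor).get(key). The sketch only shows that a key-value structure reduces the match to a single lookup.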


process source code


@PublicEvolving
public <KEY, OUT> SingleOutputStreamOperator<OUT> process(
        final KeyedBroadcastProcessFunction<KEY, IN1, IN2, OUT> function) {
    // Get the type information of the output data
    TypeInformation<OUT> outTypeInfo =
            TypeExtractor.getBinaryOperatorReturnType(
                    function,
                    KeyedBroadcastProcessFunction.class,
                    1,
                    2,
                    3,
                    TypeExtractor.NO_INDEX,
                    getType1(),
                    getType2(),
                    Utils.getCallLocationName(),
                    true);
    return process(function, outTypeInfo);
}


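process takes a KeyedBroadcastProcessFunction<KEY, IN1, IN2, OUT>. Compared with an ordinary KeyedProcessFunction<K, I, O>, it clearly has one extra type parameter: this process is fed by two upstream streams, so it needs two input types. It then calls the overloaded process method.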


process (overload) source code


@PublicEvolving
public <KEY, OUT> SingleOutputStreamOperator<OUT> process(
        final KeyedBroadcastProcessFunction<KEY, IN1, IN2, OUT> function,
        final TypeInformation<OUT> outTypeInfo) {
    Preconditions.checkNotNull(function);
    Preconditions.checkArgument(
            nonBroadcastStream instanceof KeyedStream,
            "A KeyedBroadcastProcessFunction can only be used on a keyed stream.");
    return transform(function, outTypeInfo);
}


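Apart from a null check and verifying that the non-broadcast stream is a KeyedStream (a KeyedBroadcastProcessFunction can only be used on a keyed stream), this process method does nothing else. It simply calls transform.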


transform source code


@Internal
private <KEY, OUT> SingleOutputStreamOperator<OUT> transform(
        final KeyedBroadcastProcessFunction<KEY, IN1, IN2, OUT> userFunction,
        final TypeInformation<OUT> outTypeInfo) {
    // read the output type of the input Transforms to coax out errors about MissingTypeInfo
    nonBroadcastStream.getType();
    broadcastStream.getType();
    KeyedStream<IN1, KEY> keyedInputStream = (KeyedStream<IN1, KEY>) nonBroadcastStream;
    // Build the KeyedBroadcastStateTransformation
    final KeyedBroadcastStateTransformation<KEY, IN1, IN2, OUT> transformation =
            new KeyedBroadcastStateTransformation<>(
                    "Co-Process-Broadcast-Keyed",
                    nonBroadcastStream.getTransformation(),
                    broadcastStream.getTransformation(),
                    clean(userFunction),
                    broadcastStateDescriptors,
                    keyedInputStream.getKeyType(),
                    keyedInputStream.getKeySelector(),
                    outTypeInfo,
                    environment.getParallelism());
    @SuppressWarnings({"unchecked", "rawtypes"})
    final SingleOutputStreamOperator<OUT> returnStream =
            new SingleOutputStreamOperator(environment, transformation);
    // Add it to the List<Transformation<?>>
    getExecutionEnvironment().addOperator(transformation);
    return returnStream;
}


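The transform method does two main things:


1. It first builds the corresponding KeyedBroadcastStateTransformation object. KeyedBroadcastStateTransformation is itself a subclass of Transformation.


2. It then adds the new transformation to the List<Transformation<?>>. When the StreamGraph is built later, the Transformations are read from this list.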


getStreamGraph source code


@Internal
public StreamGraph getStreamGraph(boolean clearTransformations) {
    final StreamGraph streamGraph = getStreamGraphGenerator(transformations).generate();
    if (clearTransformations) {
        transformations.clear();
    }
    return streamGraph;
}


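The main job of getStreamGraph is to generate the StreamGraph, and this is where the List<Transformation<?>> built in the previous step is used. Since this article focuses on Flink broadcast streams, only the broadcast-related source code is analyzed.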
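The collect-then-generate flow described so far can be modeled in a few lines of plain Java. This is a simplified sketch, not Flink code. EnvSketch is a hypothetical class, strings stand in for Transformation objects, and the returned list stands in for a StreamGraph. Like getStreamGraph, it can clear the pending list. Like getStreamGraphGenerator (shown next), it gives the generator a copy.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simplified model of how the environment collects transformations and builds a graph.
// Strings stand in for Transformation objects; a List stands in for the StreamGraph.
class EnvSketch {
    private final List<String> transformations = new ArrayList<>();

    // Mirrors addOperator: each API call (map, process, ...) appends its transformation.
    void addOperator(String transformation) {
        transformations.add(transformation);
    }

    // Mirrors getStreamGraph(clearTransformations): the generator works on a copy,
    // so transformations added later cannot affect the graph being built.
    List<String> getStreamGraph(boolean clearTransformations) {
        if (transformations.isEmpty()) {
            throw new IllegalStateException(
                    "No operators defined in streaming topology. Cannot execute.");
        }
        List<String> graph = Collections.unmodifiableList(new ArrayList<>(transformations));
        if (clearTransformations) {
            transformations.clear();
        }
        return graph;
    }

    // Number of transformations still waiting to be turned into a graph.
    int pending() {
        return transformations.size();
    }

    public static void main(String[] args) {
        EnvSketch env = new EnvSketch();
        env.addOperator("Source");
        env.addOperator("Co-Process-Broadcast-Keyed");
        List<String> graph = env.getStreamGraph(true);
        System.out.println(graph);         // [Source, Co-Process-Broadcast-Keyed]
        System.out.println(env.pending()); // 0
    }
}
```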


getStreamGraphGenerator source code


private StreamGraphGenerator getStreamGraphGenerator(List<Transformation<?>> transformations) {
    if (transformations.size() <= 0) {
        throw new IllegalStateException(
                "No operators defined in streaming topology. Cannot execute.");
    }
    // We copy the transformation so that newly added transformations cannot intervene with the
    // stream graph generation.
    return new StreamGraphGenerator(
                    new ArrayList<>(transformations), config, checkpointCfg, configuration)
            .setStateBackend(defaultStateBackend)
            .setChangelogStateBackendEnabled(changelogStateBackendEnabled)
            .setSavepointDir(defaultSavepointDirectory)
            .setChaining(isChainingEnabled)
            .setUserArtifacts(cacheFile)
            .setTimeCharacteristic(timeCharacteristic)
            .setDefaultBufferTimeout(bufferTimeout)
            .setSlotSharingGroupResource(slotSharingGroupResources);
}


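getStreamGraphGenerator mainly constructs a StreamGraphGenerator object, passing it a copy of the transformation list so that transformations added later cannot interfere. Once the generator is constructed, its generate method can be called to produce the StreamGraph. Before looking at generate, let's first look at StreamGraphGenerator's static initializer block.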


StreamGraphGenerator source code (static block)


static {
    @SuppressWarnings("rawtypes")
    Map<Class<? extends Transformation>, TransformationTranslator<?, ? extends Transformation>>
            tmp = new HashMap<>();
    tmp.put(OneInputTransformation.class, new OneInputTransformationTranslator<>());
    tmp.put(TwoInputTransformation.class, new TwoInputTransformationTranslator<>());
    tmp.put(MultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
    tmp.put(KeyedMultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
    tmp.put(SourceTransformation.class, new SourceTransformationTranslator<>());
    tmp.put(SinkTransformation.class, new SinkTransformationTranslator<>());
    tmp.put(LegacySinkTransformation.class, new LegacySinkTransformationTranslator<>());
    tmp.put(LegacySourceTransformation.class, new LegacySourceTransformationTranslator<>());
    tmp.put(UnionTransformation.class, new UnionTransformationTranslator<>());
    tmp.put(PartitionTransformation.class, new PartitionTransformationTranslator<>());
    tmp.put(SideOutputTransformation.class, new SideOutputTransformationTranslator<>());
    tmp.put(ReduceTransformation.class, new ReduceTransformationTranslator<>());
    tmp.put(
            TimestampsAndWatermarksTransformation.class,
            new TimestampsAndWatermarksTransformationTranslator<>());
    tmp.put(BroadcastStateTransformation.class, new BroadcastStateTransformationTranslator<>());
    tmp.put(
            KeyedBroadcastStateTransformation.class,
            new KeyedBroadcastStateTransformationTranslator<>());
    translatorMap = Collections.unmodifiableMap(tmp);
}


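The static block runs once, when the StreamGraphGenerator class is initialized and before any instance is created. It builds a Map from each Transformation class to its TransformationTranslator, and this Map is used later.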


StreamGraphGenerator#transform source code (excerpt)


// Look up the TransformationTranslator registered for this Transformation's class
final TransformationTranslator<?, Transformation<?>> translator =
        (TransformationTranslator<?, Transformation<?>>)
                translatorMap.get(transform.getClass());
Collection<Integer> transformedIds;
if (translator != null) {
    transformedIds = translate(translator, transform);
} else {
    transformedIds = legacyTransform(transform);
}


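After the StreamGraphGenerator object is constructed, its generate method is called, which in turn calls transform. transform looks up the matching TransformationTranslator in the Map built above and calls its translate method. If no translator is registered for the class, it falls back to legacyTransform.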
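The lookup-with-fallback above can be sketched in plain Java. The nested types are hypothetical stand-ins for Flink's Transformation and TransformationTranslator classes. The map is built once in a static block, made unmodifiable, and queried by the exact runtime class of the transformation.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Simplified model of StreamGraphGenerator's dispatch: a static, unmodifiable map from
// transformation class to translator, with a legacy fallback when no entry exists.
// The nested types are hypothetical stand-ins, not Flink classes.
class TranslatorDispatchSketch {
    interface Transformation {}
    static class TwoInputTransformation implements Transformation {}
    static class KeyedBroadcastStateTransformation implements Transformation {}
    static class UnknownTransformation implements Transformation {}

    interface Translator {
        String translate(Transformation t);
    }

    private static final Map<Class<? extends Transformation>, Translator> TRANSLATORS;

    static {
        Map<Class<? extends Transformation>, Translator> tmp = new HashMap<>();
        tmp.put(TwoInputTransformation.class, t -> "TwoInputTransformationTranslator");
        tmp.put(KeyedBroadcastStateTransformation.class,
                t -> "KeyedBroadcastStateTransformationTranslator");
        TRANSLATORS = Collections.unmodifiableMap(tmp);
    }

    // Exact-class lookup, as in translatorMap.get(transform.getClass()).
    static String transform(Transformation t) {
        Translator translator = TRANSLATORS.get(t.getClass());
        return translator != null ? translator.translate(t) : "legacyTransform";
    }

    public static void main(String[] args) {
        System.out.println(transform(new KeyedBroadcastStateTransformation()));
        System.out.println(transform(new UnknownTransformation())); // legacyTransform
    }
}
```

Because the lookup uses getClass(), a subclass of a registered transformation would not match and would take the legacy path. Both the sketch and the Flink excerpt above behave this way.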


translate#translateForStreaming#translateForStreamingInternal source code


@Override
protected Collection<Integer> translateForStreamingInternal(
        final KeyedBroadcastStateTransformation<KEY, IN1, IN2, OUT> transformation,
        final Context context) {
    checkNotNull(transformation);
    checkNotNull(context);
    // Build the CoBroadcastWithKeyedOperator
    CoBroadcastWithKeyedOperator<KEY, IN1, IN2, OUT> operator =
            new CoBroadcastWithKeyedOperator<>(
                    transformation.getUserFunction(),
                    transformation.getBroadcastStateDescriptors());
    return translateInternal(
            transformation,
            transformation.getRegularInput(),
            transformation.getBroadcastInput(),
            SimpleOperatorFactory.of(operator),
            transformation.getStateKeyType(),
            transformation.getKeySelector(),
            null /* no key selector on broadcast input */,
            context);
}


translate eventually calls the translateForStreamingInternal method of KeyedBroadcastStateTransformationTranslator. That method builds a CoBroadcastWithKeyedOperator from the UserFunction (the user's code) and the broadcastStateDescriptors (the broadcast state descriptors).


CoBroadcastWithKeyedOperator source code


/**
 * A {@link TwoInputStreamOperator} for executing {@link KeyedBroadcastProcessFunction
 * KeyedBroadcastProcessFunctions}.
 *
 * @param <KS> The key type of the input keyed stream.
 * @param <IN1> The input type of the keyed (non-broadcast) side.
 * @param <IN2> The input type of the broadcast side.
 * @param <OUT> The output type of the operator.
 */
@Internal
public class CoBroadcastWithKeyedOperator<KS, IN1, IN2, OUT>
        extends AbstractUdfStreamOperator<OUT, KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>>
        implements TwoInputStreamOperator<IN1, IN2, OUT>, Triggerable<KS, VoidNamespace> {
    private static final long serialVersionUID = 5926499536290284870L;
    private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors;
    private transient TimestampedCollector<OUT> collector;
    private transient Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates;
    private transient ReadWriteContextImpl rwContext;
    private transient ReadOnlyContextImpl rContext;
    private transient OnTimerContextImpl onTimerContext;
    public CoBroadcastWithKeyedOperator(
            final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function,
            final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors) {
        super(function);
        this.broadcastStateDescriptors = Preconditions.checkNotNull(broadcastStateDescriptors);
    }
    @Override
    public void open() throws Exception {
        super.open();
        InternalTimerService<VoidNamespace> internalTimerService =
                getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
        TimerService timerService = new SimpleTimerService(internalTimerService);
        collector = new TimestampedCollector<>(output);
        this.broadcastStates = new HashMap<>(broadcastStateDescriptors.size());
        for (MapStateDescriptor<?, ?> descriptor : broadcastStateDescriptors) {
            broadcastStates.put(
                    descriptor,
                    // Initialize the state implementation instance
                    getOperatorStateBackend().getBroadcastState(descriptor));
        }
        rwContext =
                new ReadWriteContextImpl(
                        getExecutionConfig(),
                        getKeyedStateBackend(),
                        userFunction,
                        broadcastStates,
                        timerService);
        rContext =
                new ReadOnlyContextImpl(
                        getExecutionConfig(), userFunction, broadcastStates, timerService);
        onTimerContext =
                new OnTimerContextImpl(
                        getExecutionConfig(), userFunction, broadcastStates, timerService);
    }
    @Override
    public void processElement1(StreamRecord<IN1> element) throws Exception {
        collector.setTimestamp(element);
        rContext.setElement(element);
        userFunction.processElement(element.getValue(), rContext, collector);
        rContext.setElement(null);
    }
    @Override
    public void processElement2(StreamRecord<IN2> element) throws Exception {
        collector.setTimestamp(element);
        rwContext.setElement(element);
        userFunction.processBroadcastElement(element.getValue(), rwContext, collector);
        rwContext.setElement(null);
    }
    private class ReadWriteContextImpl
            extends KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.Context {
        private final ExecutionConfig config;
        private final KeyedStateBackend<KS> keyedStateBackend;
        private final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> states;
        private final TimerService timerService;
        private StreamRecord<IN2> element;
        ReadWriteContextImpl(
                final ExecutionConfig executionConfig,
                final KeyedStateBackend<KS> keyedStateBackend,
                final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function,
                final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates,
                final TimerService timerService) {
            function.super();
            this.config = Preconditions.checkNotNull(executionConfig);
            this.keyedStateBackend = Preconditions.checkNotNull(keyedStateBackend);
            this.states = Preconditions.checkNotNull(broadcastStates);
            this.timerService = Preconditions.checkNotNull(timerService);
        }
        void setElement(StreamRecord<IN2> e) {
            this.element = e;
        }
        @Override
        public Long timestamp() {
            checkState(element != null);
            return element.getTimestamp();
        }
        @Override
        public <K, V> BroadcastState<K, V> getBroadcastState(
                MapStateDescriptor<K, V> stateDescriptor) {
            Preconditions.checkNotNull(stateDescriptor);
            stateDescriptor.initializeSerializerUnlessSet(config);
            BroadcastState<K, V> state = (BroadcastState<K, V>) states.get(stateDescriptor);
            if (state == null) {
                throw new IllegalArgumentException(
                        "The requested state does not exist. "
                                + "Check for typos in your state descriptor, or specify the state descriptor "
                                + "in the datastream.broadcast(...) call if you forgot to register it.");
            }
            return state;
        }
        @Override
        public <X> void output(OutputTag<X> outputTag, X value) {
            checkArgument(outputTag != null, "OutputTag must not be null.");
            output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp()));
        }
        @Override
        public long currentProcessingTime() {
            return timerService.currentProcessingTime();
        }
        @Override
        public long currentWatermark() {
            return timerService.currentWatermark();
        }
        @Override
        public <VS, S extends State> void applyToKeyedState(
                final StateDescriptor<S, VS> stateDescriptor,
                final KeyedStateFunction<KS, S> function)
                throws Exception {
            keyedStateBackend.applyToAllKeys(
                    VoidNamespace.INSTANCE,
                    VoidNamespaceSerializer.INSTANCE,
                    Preconditions.checkNotNull(stateDescriptor),
                    Preconditions.checkNotNull(function));
        }
    }
    private class ReadOnlyContextImpl extends ReadOnlyContext {
        private final ExecutionConfig config;
        private final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> states;
        private final TimerService timerService;
        private StreamRecord<IN1> element;
        ReadOnlyContextImpl(
                final ExecutionConfig executionConfig,
                final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function,
                final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates,
                final TimerService timerService) {
            function.super();
            this.config = Preconditions.checkNotNull(executionConfig);
            this.states = Preconditions.checkNotNull(broadcastStates);
            this.timerService = Preconditions.checkNotNull(timerService);
        }
        void setElement(StreamRecord<IN1> e) {
            this.element = e;
        }
        @Override
        public Long timestamp() {
            checkState(element != null);
            return element.hasTimestamp() ? element.getTimestamp() : null;
        }
        @Override
        public TimerService timerService() {
            return timerService;
        }
        @Override
        public long currentProcessingTime() {
            return timerService.currentProcessingTime();
        }
        @Override
        public long currentWatermark() {
            return timerService.currentWatermark();
        }
        @Override
        public <X> void output(OutputTag<X> outputTag, X value) {
            checkArgument(outputTag != null, "OutputTag must not be null.");
            output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp()));
        }
        @Override
        public <K, V> ReadOnlyBroadcastState<K, V> getBroadcastState(
                MapStateDescriptor<K, V> stateDescriptor) {
            Preconditions.checkNotNull(stateDescriptor);
            stateDescriptor.initializeSerializerUnlessSet(config);
            ReadOnlyBroadcastState<K, V> state =
                    (ReadOnlyBroadcastState<K, V>) states.get(stateDescriptor);
            if (state == null) {
                throw new IllegalArgumentException(
                        "The requested state does not exist. "
                                + "Check for typos in your state descriptor, or specify the state descriptor "
                                + "in the datastream.broadcast(...) call if you forgot to register it.");
            }
            return state;
        }
        @Override
        @SuppressWarnings("unchecked")
        public KS getCurrentKey() {
            return (KS) CoBroadcastWithKeyedOperator.this.getCurrentKey();
        }
    }
    private class OnTimerContextImpl
            extends KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.OnTimerContext {
        private final ExecutionConfig config;
        private final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> states;
        private final TimerService timerService;
        private TimeDomain timeDomain;
        private InternalTimer<KS, VoidNamespace> timer;
        OnTimerContextImpl(
                final ExecutionConfig executionConfig,
                final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function,
                final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates,
                final TimerService timerService) {
            function.super();
            this.config = Preconditions.checkNotNull(executionConfig);
            this.states = Preconditions.checkNotNull(broadcastStates);
            this.timerService = Preconditions.checkNotNull(timerService);
        }
        @Override
        public Long timestamp() {
            checkState(timer != null);
            return timer.getTimestamp();
        }
        @Override
        public TimeDomain timeDomain() {
            checkState(timeDomain != null);
            return timeDomain;
        }
        @Override
        public KS getCurrentKey() {
            return timer.getKey();
        }
        @Override
        public TimerService timerService() {
            return timerService;
        }
        @Override
        public long currentProcessingTime() {
            return timerService.currentProcessingTime();
        }
        @Override
        public long currentWatermark() {
            return timerService.currentWatermark();
        }
        @Override
        public <X> void output(OutputTag<X> outputTag, X value) {
            checkArgument(outputTag != null, "OutputTag must not be null.");
            output.collect(outputTag, new StreamRecord<>(value, timer.getTimestamp()));
        }
        @Override
        public <K, V> ReadOnlyBroadcastState<K, V> getBroadcastState(
                MapStateDescriptor<K, V> stateDescriptor) {
            Preconditions.checkNotNull(stateDescriptor);
            stateDescriptor.initializeSerializerUnlessSet(config);
            ReadOnlyBroadcastState<K, V> state =
                    (ReadOnlyBroadcastState<K, V>) states.get(stateDescriptor);
            if (state == null) {
                throw new IllegalArgumentException(
                        "The requested state does not exist. "
                                + "Check for typos in your state descriptor, or specify the state descriptor "
                                + "in the datastream.broadcast(...) call if you forgot to register it.");
            }
            return state;
        }
    }
}


Before analyzing the CoBroadcastWithKeyedOperator source code, let's look at its UML diagram.


[Figure: CoBroadcastWithKeyedOperator UML diagram]


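As the diagram shows, CoBroadcastWithKeyedOperator implements the TwoInputStreamOperator interface. As the name suggests, this is the StreamOperator interface for operators with two input streams. CoBroadcastWithKeyedOperator implements it because it is fed by two upstream streams. Let's look at the TwoInputStreamOperator source code.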


TwoInputStreamOperator source code


/**
 * Interface for stream operators with two inputs. Use {@link
 * org.apache.flink.streaming.api.operators.AbstractStreamOperator} as a base class if you want to
 * implement a custom operator.
 *
 * @param <IN1> The input type of the operator
 * @param <IN2> The input type of the operator
 * @param <OUT> The output type of the operator
 */
@PublicEvolving
public interface TwoInputStreamOperator<IN1, IN2, OUT> extends StreamOperator<OUT> {
    /**
     * Processes one element that arrived on the first input of this two-input operator. This method
     * is guaranteed to not be called concurrently with other methods of the operator.
     */
    void processElement1(StreamRecord<IN1> element) throws Exception;
    /**
     * Processes one element that arrived on the second input of this two-input operator. This
     * method is guaranteed to not be called concurrently with other methods of the operator.
     */
    void processElement2(StreamRecord<IN2> element) throws Exception;
}


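The TwoInputStreamOperator interface defines two methods. processElement1 handles elements from the first input, and processElement2 handles elements from the second. In CoBroadcastWithKeyedOperator, the first input is the non-broadcast (keyed) stream and the second is the broadcast stream. As a result, processElement1 processes the non-broadcast data and processElement2 processes the broadcast data.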
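The split between the two methods can be modeled as follows. TwoInputSketch, RecordingOperator and route are hypothetical names. route plays the role of the task that feeds the operator. It hands over one element at a time and picks the method by input index, so the methods are never called concurrently.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal model of a two-input operator: input 1 (keyed, non-broadcast) goes to
// processElement1, input 2 (broadcast) goes to processElement2. Not Flink classes.
class TwoInputSketch {
    interface TwoInputOperator<IN1, IN2> {
        void processElement1(IN1 element);
        void processElement2(IN2 element);
    }

    // Records every call so the routing can be inspected.
    static class RecordingOperator implements TwoInputOperator<String, String> {
        final List<String> calls = new ArrayList<>();
        public void processElement1(String e) { calls.add("non-broadcast:" + e); }
        public void processElement2(String e) { calls.add("broadcast:" + e); }
    }

    // Dispatch one element by the index of the input it arrived on.
    static void route(TwoInputOperator<String, String> op, int inputIndex, String element) {
        if (inputIndex == 1) {
            op.processElement1(element);
        } else if (inputIndex == 2) {
            op.processElement2(element);
        } else {
            throw new IllegalArgumentException("unknown input " + inputIndex);
        }
    }

    public static void main(String[] args) {
        RecordingOperator op = new RecordingOperator();
        route(op, 2, "flink"); // a broadcast record from stream B
        route(op, 1, "hive");  // a keyed record from stream A
        System.out.println(op.calls); // [broadcast:flink, non-broadcast:hive]
    }
}
```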


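Now back to the open method of CoBroadcastWithKeyedOperator. It first initializes broadcastStates, which stores the MapStateDescriptor -> BroadcastState mapping. It then creates the ReadWriteContextImpl and ReadOnlyContextImpl objects (plus OnTimerContextImpl, which also exposes only read-only broadcast state). As the names suggest, ReadWriteContextImpl can both read and write the state, while ReadOnlyContextImpl can only read it. The open method also does one more important thing: it initializes the broadcast state implementation through getOperatorStateBackend().getBroadcastState(descriptor).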


getBroadcastState source code


public <K, V> BroadcastState<K, V> getBroadcastState(
        final MapStateDescriptor<K, V> stateDescriptor) throws StateMigrationException {
    Preconditions.checkNotNull(stateDescriptor);
    String name = Preconditions.checkNotNull(stateDescriptor.getName());
    BackendWritableBroadcastState<K, V> previous =
            (BackendWritableBroadcastState<K, V>) accessedBroadcastStatesByName.get(name);
    if (previous != null) {
        checkStateNameAndMode(
                previous.getStateMetaInfo().getName(),
                name,
                previous.getStateMetaInfo().getAssignmentMode(),
                OperatorStateHandle.Mode.BROADCAST);
        return previous;
    }
    stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
    TypeSerializer<K> broadcastStateKeySerializer =
            Preconditions.checkNotNull(stateDescriptor.getKeySerializer());
    TypeSerializer<V> broadcastStateValueSerializer =
            Preconditions.checkNotNull(stateDescriptor.getValueSerializer());
    BackendWritableBroadcastState<K, V> broadcastState =
            (BackendWritableBroadcastState<K, V>) registeredBroadcastStates.get(name);
    if (broadcastState == null) {
        broadcastState =
                new HeapBroadcastState<>(
                        new RegisteredBroadcastStateBackendMetaInfo<>(
                                name,
                                OperatorStateHandle.Mode.BROADCAST,
                                broadcastStateKeySerializer,
                                broadcastStateValueSerializer));
        registeredBroadcastStates.put(name, broadcastState);
    } else {
        // has restored state; check compatibility of new state access
        checkStateNameAndMode(
                broadcastState.getStateMetaInfo().getName(),
                name,
                broadcastState.getStateMetaInfo().getAssignmentMode(),
                OperatorStateHandle.Mode.BROADCAST);
        RegisteredBroadcastStateBackendMetaInfo<K, V> restoredBroadcastStateMetaInfo =
                broadcastState.getStateMetaInfo();
        // check whether new serializers are incompatible
        TypeSerializerSchemaCompatibility<K> keyCompatibility =
                restoredBroadcastStateMetaInfo.updateKeySerializer(broadcastStateKeySerializer);
        if (keyCompatibility.isIncompatible()) {
            throw new StateMigrationException(
                    "The new key typeSerializer for broadcast state must not be incompatible.");
        }
        TypeSerializerSchemaCompatibility<V> valueCompatibility =
                restoredBroadcastStateMetaInfo.updateValueSerializer(
                        broadcastStateValueSerializer);
        if (valueCompatibility.isIncompatible()) {
            throw new StateMigrationException(
                    "The new value typeSerializer for broadcast state must not be incompatible.");
        }
        broadcastState.setStateMetaInfo(restoredBroadcastStateMetaInfo);
    }
    accessedBroadcastStatesByName.put(name, broadcastState);
    return broadcastState;
}


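The main job of getBroadcastState is to initialize a HeapBroadcastState object, the concrete implementation of broadcast state. If a state with the same name has already been accessed, or has been restored from a snapshot, the method reuses it instead, after checking the assignment mode and serializer compatibility.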
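The lookup order inside getBroadcastState can be summarized in a simplified plain-Java sketch. BroadcastRegistrySketch and its nested HeapState are hypothetical stand-ins for the operator state backend and HeapBroadcastState. The assignment-mode and serializer-compatibility checks are omitted. The two maps keep the names used in the Flink code.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of getBroadcastState's lookup order:
// (1) states already accessed, (2) states restored from a snapshot, (3) create a new one.
// HeapState is a hypothetical stand-in for HeapBroadcastState; checks are omitted.
class BroadcastRegistrySketch {
    static class HeapState {
        final String name;
        final Map<Object, Object> backingMap = new HashMap<>();
        HeapState(String name) { this.name = name; }
    }

    final Map<String, HeapState> registeredBroadcastStates = new HashMap<>(); // incl. restored
    final Map<String, HeapState> accessedBroadcastStatesByName = new HashMap<>();

    HeapState getBroadcastState(String name) {
        HeapState previous = accessedBroadcastStatesByName.get(name);
        if (previous != null) {
            return previous; // the same instance on repeated access
        }
        HeapState state = registeredBroadcastStates.get(name);
        if (state == null) {
            state = new HeapState(name); // nothing restored: start from an empty map
            registeredBroadcastStates.put(name, state);
        } // else: restored state; Flink checks serializer compatibility at this point
        accessedBroadcastStatesByName.put(name, state);
        return state;
    }

    public static void main(String[] args) {
        BroadcastRegistrySketch backend = new BroadcastRegistrySketch();
        HeapState a = backend.getBroadcastState("rules");
        HeapState b = backend.getBroadcastState("rules");
        System.out.println(a == b); // true
    }
}
```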


HeapBroadcastState source code


/**
 * A {@link BroadcastState Broadcast State} backed a heap-based {@link Map}.
 *
 * @param <K> The key type of the elements in the {@link BroadcastState Broadcast State}.
 * @param <V> The value type of the elements in the {@link BroadcastState Broadcast State}.
 */
public class HeapBroadcastState<K, V> implements BackendWritableBroadcastState<K, V> {
    /** Meta information of the state, including state name, assignment mode, and serializer. */
    private RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo;
    /** The internal map the holds the elements of the state. */
    private final Map<K, V> backingMap;
    /** A serializer that allows to perform deep copies of internal map state. */
    private final MapSerializer<K, V> internalMapCopySerializer;
    HeapBroadcastState(RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo) {
        this(stateMetaInfo, new HashMap<>());
    }
    private HeapBroadcastState(
            final RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo,
            final Map<K, V> internalMap) {
        this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo);
        this.backingMap = Preconditions.checkNotNull(internalMap);
        this.internalMapCopySerializer =
                new MapSerializer<>(
                        stateMetaInfo.getKeySerializer(), stateMetaInfo.getValueSerializer());
    }
    private HeapBroadcastState(HeapBroadcastState<K, V> toCopy) {
        this(
                toCopy.stateMetaInfo.deepCopy(),
                toCopy.internalMapCopySerializer.copy(toCopy.backingMap));
    }
    @Override
    public void setStateMetaInfo(RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo) {
        this.stateMetaInfo = stateMetaInfo;
    }
    @Override
    public RegisteredBroadcastStateBackendMetaInfo<K, V> getStateMetaInfo() {
        return stateMetaInfo;
    }
    @Override
    public HeapBroadcastState<K, V> deepCopy() {
        return new HeapBroadcastState<>(this);
    }
    @Override
    public void clear() {
        backingMap.clear();
    }
    @Override
    public String toString() {
        return "HeapBroadcastState{"
                + "stateMetaInfo="
                + stateMetaInfo
                + ", backingMap="
                + backingMap
                + ", internalMapCopySerializer="
                + internalMapCopySerializer
                + '}';
    }
    @Override
    public long write(FSDataOutputStream out) throws IOException {
        long partitionOffset = out.getPos();
        DataOutputView dov = new DataOutputViewStreamWrapper(out);
        dov.writeInt(backingMap.size());
        for (Map.Entry<K, V> entry : backingMap.entrySet()) {
            getStateMetaInfo().getKeySerializer().serialize(entry.getKey(), dov);
            getStateMetaInfo().getValueSerializer().serialize(entry.getValue(), dov);
        }
        return partitionOffset;
    }
    @Override
    public V get(K key) {
        return backingMap.get(key);
    }
    @Override
    public void put(K key, V value) {
        backingMap.put(key, value);
    }
    @Override
    public void putAll(Map<K, V> map) {
        backingMap.putAll(map);
    }
    @Override
    public void remove(K key) {
        backingMap.remove(key);
    }
    @Override
    public boolean contains(K key) {
        return backingMap.containsKey(key);
    }
    @Override
    public Iterator<Map.Entry<K, V>> iterator() {
        return backingMap.entrySet().iterator();
    }
    @Override
    public Iterable<Map.Entry<K, V>> entries() {
        return backingMap.entrySet();
    }
    @Override
    public Iterable<Map.Entry<K, V>> immutableEntries() {
        return Collections.unmodifiableSet(backingMap.entrySet());
    }
}


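The HeapBroadcastState code is simple. It mostly reads and writes the state, and under the hood it is just operating on a HashMap. For snapshots, write serializes the entry count followed by every key and value.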
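As a rough illustration of what write produces, the sketch below serializes a map with the same layout: an int entry count, then each key followed by its value. It assumes String keys and values and uses DataOutputStream.writeUTF as a stand-in for Flink's TypeSerializers, so the bytes are not Flink's actual format. HeapStateFormatSketch and its read method are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Model of HeapBroadcastState's snapshot layout: int count, then key/value pairs.
// writeUTF stands in for the key/value TypeSerializers (assumes String keys and values).
class HeapStateFormatSketch {
    static byte[] write(Map<String, String> backingMap) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(backingMap.size()); // like dov.writeInt(backingMap.size())
        for (Map.Entry<String, String> e : backingMap.entrySet()) {
            out.writeUTF(e.getKey());    // key serializer
            out.writeUTF(e.getValue());  // value serializer
        }
        out.flush();
        return bytes.toByteArray();
    }

    // Reads the layout back: the count tells how many pairs follow.
    static Map<String, String> read(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        int size = in.readInt();
        Map<String, String> map = new HashMap<>(size);
        for (int i = 0; i < size; i++) {
            String key = in.readUTF();
            map.put(key, in.readUTF());
        }
        return map;
    }

    public static void main(String[] args) throws IOException {
        Map<String, String> state = new HashMap<>();
        state.put("flink", "stream");
        state.put("spark", "batch");
        System.out.println(read(write(state)).equals(state)); // true
    }
}
```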


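Back in CoBroadcastWithKeyedOperator, processElement1 passes ReadOnlyContextImpl to the user function, while processElement2 passes ReadWriteContextImpl. In other words, state can only be modified on the broadcast side and cannot be modified on the non-broadcast side. This answers the second question above.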


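Both the broadcast side and the non-broadcast side can obtain the state, but getBroadcastState returns different types. ReadWriteContextImpl returns a BroadcastState, while ReadOnlyContextImpl returns a ReadOnlyBroadcastState, which offers no methods for modifying the state.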
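The read-only/read-write split is enforced through types, and the following plain-Java sketch mimics it. ReadOnlyState, ReadWriteState and HeapState are hypothetical stand-ins for ReadOnlyBroadcastState, BroadcastState and HeapBroadcastState. Both views return the same backing object.

```java
import java.util.HashMap;
import java.util.Map;

// Type-level model: the read-write interface extends the read-only one, and each
// context exposes the same backing map through a different static type.
// All names are hypothetical stand-ins for the Flink interfaces.
class ContextSplitSketch {
    interface ReadOnlyState<K, V> {
        V get(K key);
        boolean contains(K key);
    }

    interface ReadWriteState<K, V> extends ReadOnlyState<K, V> {
        void put(K key, V value);
        void remove(K key);
    }

    static class HeapState<K, V> implements ReadWriteState<K, V> {
        private final Map<K, V> backingMap = new HashMap<>();
        public V get(K key) { return backingMap.get(key); }
        public boolean contains(K key) { return backingMap.containsKey(key); }
        public void put(K key, V value) { backingMap.put(key, value); }
        public void remove(K key) { backingMap.remove(key); }
    }

    private final HeapState<String, String> state = new HeapState<>();

    // Like ReadWriteContextImpl.getBroadcastState: used on the broadcast side.
    ReadWriteState<String, String> readWriteView() { return state; }

    // Like ReadOnlyContextImpl.getBroadcastState: used on the non-broadcast side.
    // readOnlyView().put(...) does not compile, because put is not on ReadOnlyState.
    ReadOnlyState<String, String> readOnlyView() { return state; }

    public static void main(String[] args) {
        ContextSplitSketch op = new ContextSplitSketch();
        op.readWriteView().put("flink", "rule-1");
        System.out.println(op.readOnlyView().get("flink")); // rule-1
    }
}
```

ReadOnlyContextImpl in the code above casts the same HeapBroadcastState to ReadOnlyBroadcastState. The restriction is therefore a compile-time one: the object behind the read-only view is still writable, but it is not exposed as writable.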


[Figure: UML diagram of BroadcastState, ReadOnlyBroadcastState and HeapBroadcastState]


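The BroadcastState interface extends ReadOnlyBroadcastState, which in turn extends the State interface. The implementation of BroadcastState used by the state backend is HeapBroadcastState, and the name alone tells you that broadcast state is stored in JVM heap memory. Underneath it is just a Map: the backingMap in the diagram holds the state data. The getBroadcastState method shown earlier belongs to the operator state backend, and Flink uses this heap-based operator state backend no matter which state backend is configured. Choosing RocksDB only changes where keyed state is kept, so even with RocksDB, broadcast state lives on the heap. This answers the third question above.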


To explain the second question further, here is a concrete scenario.


An example



[Figure: BroadcastStream example]


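Consider the scenario in the figure. Stream A reads data from Kafka and is keyed with keyBy, producing a KeyedStream. Stream B reads data from MySQL and is broadcast, producing a BroadcastStream. Stream B contains two records, flink and spark, which are broadcast to every downstream subtask, so subtask-0 and subtask-1 both hold flink and spark in their broadcast state. Next, two records, flink and hive, are written to Kafka. After keyBy, flink is routed to subtask-0 and hive to subtask-1. The flink record can clearly be matched against the broadcast data, while the hive record cannot. Suppose the non-broadcast side (stream A) were allowed to modify the state, for example with subtask-0 adding flink and subtask-1 adding hive. Each subtask would then apply a different update, and the broadcast state on subtask-0 and subtask-1 would become inconsistent. To guarantee that all parallel instances of the operator hold the same broadcast state, the design forbids modifying the state on the non-broadcast side.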
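The scenario can be replayed with a small simulation. BroadcastConsistencySketch is hypothetical. Each subtask keeps its own HashMap copy of the broadcast state, broadcast records are applied to every copy, and a keyed record reaches exactly one subtask. The subtask assignment (flink to 0, hive to 1) is hard-coded to match the example rather than computed with Flink's key-group hashing. writeFromKeyedSide models the write that Flink forbids.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simulation of the example: one HashMap per subtask stands in for that subtask's copy
// of the broadcast state. Subtask indices are chosen by hand, not by Flink's hashing.
class BroadcastConsistencySketch {
    final List<Map<String, String>> subtaskStates = new ArrayList<>();

    BroadcastConsistencySketch(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            subtaskStates.add(new HashMap<>());
        }
    }

    // Broadcast side: every subtask applies the same update, so the copies stay identical.
    void broadcast(String key, String value) {
        for (Map<String, String> state : subtaskStates) {
            state.put(key, value);
        }
    }

    // The write Flink forbids: only the subtask that owns the keyed record would apply it.
    void writeFromKeyedSide(int subtask, String key, String value) {
        subtaskStates.get(subtask).put(key, value);
    }

    // True if every subtask holds exactly the same broadcast state.
    boolean consistent() {
        for (Map<String, String> state : subtaskStates) {
            if (!state.equals(subtaskStates.get(0))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        BroadcastConsistencySketch job = new BroadcastConsistencySketch(2);
        job.broadcast("flink", "from-mysql");
        job.broadcast("spark", "from-mysql");
        System.out.println(job.consistent()); // true
        job.writeFromKeyedSide(0, "flink", "from-kafka"); // Kafka record flink -> subtask-0
        job.writeFromKeyedSide(1, "hive", "from-kafka");  // Kafka record hive  -> subtask-1
        System.out.println(job.consistent()); // false
    }
}
```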


Summary


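Broadcast State is a special type of Operator State. It is mainly used to join a low-throughput (small-volume) stream with another, primary data stream. Broadcast state must be defined as a Map and can only be modified on the broadcast side; the non-broadcast side can read the state but cannot modify it. Broadcast state is kept only in heap memory, and every parallel instance holds a full copy, so give the TaskManagers enough memory when you use it. This article explained, from the source code, why Flink is designed this way, to give you a deeper understanding of broadcast state.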

