Flink kafka source & sink 源码解析

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 本文基于 Flink 1.9.0 和 Kafka 2.3 版本,对 Flink Kafka source 和 sink 端的源码进行解析,主要分为 Flink-kafka-source 源码解析、Flink-kafka-sink 源码解析两部分。

Flink kafka source & sink 源码解析
原创 吴鹏 Flink 中文社区 4天前
摘要:本文基于 Flink 1.9.0 和 Kafka 2.3 版本,对 Flink Kafka source 和 sink 端的源码进行解析,主要内容分为以下两部分:

1.Flink-kafka-source 源码解析

  • 流程概述
  • 非 checkpoint 模式 offset 的提交
  • checkpoint 模式下 offset 的提交
  • 指定 offset 消费

2.Flink-kafka-sink 源码解析

  • 初始化
  • Task运行
  • 小结

1.Flink-kafka-source 源码解析

流程概述

一般在 Flink 中创建 kafka source 的代码如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//KafkaEventSchema为自定义的数据字段解析类
env.addSource(new FlinkKafkaConsumer<>("foo", new KafkaEventSchema(), properties)

而 Kafka 的 KafkaConsumer API 中消费某个 topic 使用的是 poll 方法如下:

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.poll(Duration.ofMillis(100));

下面将分析这两个流程是如何衔接起来的。

初始化

初始化执行 env.addSource 的时候会创建 StreamSource 对象,即 final StreamSource sourceOperator = new StreamSource<>(function);这里的function 就是传入的 FlinkKafkaConsumer 对象,StreamSource 构造函数中将这个对象传给父类 AbstractUdfStreamOperator 的 userFunction 变量,源码如下:

■ StreamSource.java

public StreamSource(SRC sourceFunction) {
    super(sourceFunction);
    this.chainingStrategy = ChainingStrategy.HEAD;
}

■ AbstractUdfStreamOperator.java

public AbstractUdfStreamOperator(F userFunction) {
   this.userFunction = requireNonNull(userFunction);
   checkUdfCheckpointingPreconditions();
}

Task运行

task 启动后会调用到 SourceStreamTask 中的 performDefaultAction() 方法,这里面会启动一个线程 sourceThread.start();,部分源码如下:

private final LegacySourceFunctionThread sourceThread;

@Override
protected void performDefaultAction(ActionContext context) throws Exception {
    sourceThread.start();
}

在 LegacySourceFunctionThread 的 run 方法中,通过调用 headOperator.run 方法,最终调用了 StreamSource 中的 run 方法,部分源码如下:

public void run(final Object lockingObject,
                final StreamStatusMaintainer streamStatusMaintainer,
                final Output<StreamRecord<OUT>> collector,
                final OperatorChain<?, ?> operatorChain) throws Exception {

  //省略部分代码
  this.ctx = StreamSourceContexts.getSourceContext(
    timeCharacteristic,
    getProcessingTimeService(),
    lockingObject,
    streamStatusMaintainer,
    collector,
    watermarkInterval,
    -1);

  try {
    userFunction.run(ctx);
    //省略部分代码
  } finally {
    // make sure that the context is closed in any case
    ctx.close();
    if (latencyEmitter != null) {
      latencyEmitter.close();
    }
  }
}

这里最重要的就是 userFunction.run(ctx);,这个 userFunction 就是在上面初始化的时候传入的 FlinkKafkaConsumer 对象,也就是说这里实际调用了 FlinkKafkaConsumer 中的 run 方法,而具体的方法实现在其父类 FlinkKafkaConsumerBase中,至此,进入了真正的 kafka 消费阶段。

Kafka消费阶段

在 FlinkKafkaConsumerBase#run 中创建了一个 KafkaFetcher 对象,并最终调用了 kafkaFetcher.runFetchLoop(),这个方法的代码片段如下:

/** The thread that runs the actual KafkaConsumer and hand the record batches to this fetcher. */
private final KafkaConsumerThread consumerThread;

@Override
public void runFetchLoop() throws Exception {
  try {
    final Handover handover = this.handover;

    // kick off the actual Kafka consumer
    consumerThread.start();
    
    //省略部分代码
}

可以看到实际启动了一个 KafkaConsumerThread 线程。进入到 KafkaConsumerThread#run 中,下面只是列出了这个方法的部分源码,完整代码请参考 KafkaConsumerThread.java。

@Override
public void run() {
  // early exit check
  if (!running) {
    return;
  }
  // This method initializes the KafkaConsumer and guarantees it is torn down properly.
  // This is important, because the consumer has multi-threading issues,
  // including concurrent 'close()' calls.
  try {
    this.consumer = getConsumer(kafkaProperties);
  } catch (Throwable t) {
    handover.reportError(t);
    return;
  }
  try {

    // main fetch loop
    while (running) {
      try {
        if (records == null) {
          try {
            records = consumer.poll(pollTimeout);
          } catch (WakeupException we) {
            continue;
          }
        }
      }
      // end main fetch loop
    }
  } catch (Throwable t) {
    handover.reportError(t);
  } finally {
    handover.close();
    try {
      consumer.close();
    } catch (Throwable t) {
      log.warn("Error while closing Kafka consumer", t);
    }
  }
}

至此,终于走到了真正从 kafka 拿数据的代码,即 records = consumer.poll(pollTimeout);。因为 KafkaConsumer 不是线程安全的,所以每个线程都需要生成独立的 KafkaConsumer 对象,即 this.consumer = getConsumer(kafkaProperties);。

KafkaConsumer<byte[], byte[]> getConsumer(Properties kafkaProperties) {
  return new KafkaConsumer<>(kafkaProperties);
}

小结:本节只是介绍了 Flink 消费 kafka 数据的关键流程,下面会更详细的介绍在AT_LEAST_ONCE和EXACTLY_ONCE 不同场景下 FlinkKafkaConsumer 管理 offset 的流程。

非 checkpoint 模式 offset 的提交

消费 kafka topic 最为重要的部分就是对 offset 的管理,对于 kafka 提交 offset 的机制,可以参考 kafka 官方网。

而在 flink kafka source 中 offset 的提交模式有3种:

public enum OffsetCommitMode {

   /** Completely disable offset committing. */
   DISABLED,

   /** Commit offsets back to Kafka only when checkpoints are completed. */
   ON_CHECKPOINTS,

   /** Commit offsets periodically back to Kafka, using the auto commit functionality of internal Kafka clients. */
   KAFKA_PERIODIC;
}

初始化 offsetCommitMode

在 FlinkKafkaConsumerBase#open 方法中初始化 offsetCommitMode:

// determine the offset commit mode
this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
                getIsAutoCommitEnabled(),
                enableCommitOnCheckpoints,
        ((StreamingRuntimeContext)getRuntimeContext()).isCheckpointingEnabled());
  • 方法 getIsAutoCommitEnabled() 的实现如下:
protected boolean getIsAutoCommitEnabled() {
   return getBoolean(properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true) &&
      PropertiesUtil.getLong(properties, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000) > 0;
}
  • 也就是说只有 enable.auto.commit=true 并且 auto.commit.interval.ms>0 这个方法才会返回 true
  • 变量 enableCommitOnCheckpoints 默认是 true,可以调用 setCommitOffsetsOnCheckpoints 改变这个值
  • 当代码中调用了 env.enableCheckpointing 方法,isCheckpointingEnabled 才会返回 true

通过下面的代码返回真正的提交模式:

/**
 * Determine the offset commit mode using several configuration values.
 *
 * @param enableAutoCommit whether or not auto committing is enabled in the provided Kafka properties.
 * @param enableCommitOnCheckpoint whether or not committing on checkpoints is enabled.
 * @param enableCheckpointing whether or not checkpoint is enabled for the consumer.
 *
 * @return the offset commit mode to use, based on the configuration values.
 */
public static OffsetCommitMode fromConfiguration(
      boolean enableAutoCommit,
      boolean enableCommitOnCheckpoint,
      boolean enableCheckpointing) {

   if (enableCheckpointing) {
      // if checkpointing is enabled, the mode depends only on whether committing on checkpoints is enabled
      return (enableCommitOnCheckpoint) ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;
   } else {
      // else, the mode depends only on whether auto committing is enabled in the provided Kafka properties
      return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
   }
}

暂时不考虑 checkpoint 的场景,所以只考虑 (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;。

也就是如果客户端设置了 enable.auto.commit=true 那么就是 KAFKA_PERIODIC,否则就是 KAFKA_DISABLED。

offset 的提交

■ 自动提交

这种方式完全依靠 kafka 自身的特性进行提交,如下方式指定参数即可:

Properties properties = new Properties();
properties.put("enable.auto.commit", "true");
properties.setProperty("auto.commit.interval.ms", "1000");
new FlinkKafkaConsumer<>("foo", new KafkaEventSchema(), properties)

■ 非自动提交

通过上面的分析,如果 enable.auto.commit=false,那么 offsetCommitMode 就是 DISABLED 。

kafka 官方文档中,提到当 enable.auto.commit=false 时候需要手动提交 offset,也就是需要调用 consumer.commitSync(); 方法提交。

但是在 flink 中,非 checkpoint 模式下,不会调用 consumer.commitSync();, 一旦关闭自动提交,意味着 kafka 不知道当前的 consumer group 每次消费到了哪。

可以从两方面证实这个问题:

  • 源码
    KafkaConsumerThread#run 方法中是有 consumer.commitSync();,但是只有当 commitOffsetsAndCallback != null 的时候才会调用。只有开启了checkpoint 功能才会不为 null,这个变量会在后续的文章中详细分析。
  • 测试
    可以通过消费 __consumer_offsets 观察是否有 offset 的提交

重启程序,还是会重复消费之前消费过的数据

小结:本节介绍了在非 checkpoint 模式下,Flink kafka source 提交 offset 的方式,下文会重点介绍 checkpoint 模式下提交 offset 的流程。

checkpoint 模式下 offset 的提交

上面介绍了在没有开启 checkpoint 的时候,offset 的提交方式,下面将重点介绍开启 checkpoint 后,Flink kafka consumer 提交 offset 的方式。

初始化 offsetCommitMode

通过上文可以知道,当调用了 env.enableCheckpointing 方法后 offsetCommitMode 的值就是 ON_CHECKPOINTS,而且会通过下面方法强制关闭 kafka 自动提交功能,这个值很重要,后续很多地方都是根据这个值去判断如何操作的。

/**
 * Make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS.
 * This overwrites whatever setting the user configured in the properties.
 * @param properties - Kafka configuration properties to be adjusted
 * @param offsetCommitMode offset commit mode
 */
static void adjustAutoCommitConfig(Properties properties, OffsetCommitMode offsetCommitMode) {
   if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || offsetCommitMode == OffsetCommitMode.DISABLED) {
      properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
   }
}

保存 offset

在做 checkpoint 的时候会调用 FlinkKafkaConsumerBase#snapshotState 方法,其中 pendingOffsetsToCommit 会保存要提交的 offset。

if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
   // the map cannot be asynchronously updated, because only one checkpoint call can happen
   // on this function at a time: either snapshotState() or notifyCheckpointComplete()
   pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
}

同时,下面的变量会作为 checkpoint 的一部分保存下来,以便恢复时使用。

/** Accessor for state in the operator state backend. */
private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates;

在 snapshotState 方法中会同时保存 offset:

for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
    unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));
}

提交 offset

在 checkpoint 完成以后,task 会调用 notifyCheckpointComplete 方法,里面判断 offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS 的时候,调用fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback); 方法,最终会将要提交的 offset 通过 KafkaFetcher#doCommitInternalOffsetsToKafka 方法中的 consumerThread.setOffsetsToCommit(offsetsToCommit, commitCallback); 保存到 KafkaConsumerThread.java 中的 nextOffsetsToCommit 成员变量里面。

这样就会保证当有需要提交的 offset 的时候,下面代码会执行 consumer.commitAsync,从而完成了手动提交 offset 到 kafka。

final Tuple2<Map<TopicPartition, OffsetAndMetadata>, KafkaCommitCallback> commitOffsetsAndCallback = nextOffsetsToCommit.getAndSet(null);

if (commitOffsetsAndCallback != null) {
  log.debug("Sending async offset commit request to Kafka broker");

  // also record that a commit is already in progress
  // the order here matters! first set the flag, then send the commit command.
  commitInProgress = true;
  consumer.commitAsync(commitOffsetsAndCallback.f0, new CommitCallback(commitOffsetsAndCallback.f1));
}

小结:本节介绍了在 checkpoint 模式下,Flink kafka source 提交 offset 的方式,后续会介绍 consumer 读取 offset 的流程。

指定 offset 消费

消费模式

在 Flink 的 kafka source 中有以下5种模式指定 offset 消费:

public enum StartupMode {

   /** Start from committed offsets in ZK / Kafka brokers of a specific consumer group (default). */
   GROUP_OFFSETS(KafkaTopicPartitionStateSentinel.GROUP_OFFSET),

   /** Start from the earliest offset possible. */
   EARLIEST(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET),

   /** Start from the latest offset. */
   LATEST(KafkaTopicPartitionStateSentinel.LATEST_OFFSET),

   /**
    * Start from user-supplied timestamp for each partition.
    * Since this mode will have specific offsets to start with, we do not need a sentinel value;
    * using Long.MIN_VALUE as a placeholder.
    */
   TIMESTAMP(Long.MIN_VALUE),

   /**
    * Start from user-supplied specific offsets for each partition.
    * Since this mode will have specific offsets to start with, we do not need a sentinel value;
    * using Long.MIN_VALUE as a placeholder.
    */
   SPECIFIC_OFFSETS(Long.MIN_VALUE);
}

默认为 GROUP_OFFSETS,表示根据上一次 group id 提交的 offset 位置开始消费。每个枚举的值其实是一个 long 型的负数,根据不同的模式,在每个 partition 初始化的时候会默认将 offset 设置为这个负数。其他的方式和 kafka 本身的语义类似,就不在赘述。

指定 offset

此处只讨论默认的 GROUP_OFFSETS 方式,下文所有分析都是基于这种模式。但是还是需要区分是否开启了 checkpoint。在开始分析之前需要对几个重要的变量进行说明:

  • subscribedPartitionsToStartOffsets

    • 所属类:FlinkKafkaConsumerBase.java
    • 定义:
/** The set of topic partitions that the source will read, with their initial offsets to start reading from. */
private Map<KafkaTopicPartition, Long> subscribedPartitionsToSt

说明:保存订阅 topic 的所有 partition 以及初始消费的 offset。

  • subscribedPartitionStates

    • 所属类:AbstractFetcher.java
    • 定义:
/** All partitions (and their state) that this fetcher is subscribed to. */
private final List<KafkaTopicPartitionState<KPH>> subscribedPar

说明:保存了所有订阅的 partition 的 offset 等详细信息,例如:

/** The offset within the Kafka partition that we already processed. */
private volatile long offset;
/** The offset of the Kafka partition that has been committed. */
private volatile long committedOffset;

每次消费完数据之后都会更新这些值,这个变量非常的重要,在做 checkpoint 的时候,保存的 offset 等信息都是来自于这个变量。这个变量的初始化如下:

// initialize subscribed partition states with seed partitions
this.subscribedPartitionStates = createPartitionStateHolders(
  seedPartitionsWithInitialOffsets,
  timestampWatermarkMode,
  watermarksPeriodic,
  watermarksPunctuated,
  userCodeClassLoader);

消费之后更新相应的 offset 主要在 KafkaFetcher#runFetchLoop
方法 while 循环中调用 emitRecord(value, partition, record.
offset(), record);。

  • restoredState

    • 所属类:FlinkKafkaConsumerBase.java
    • 定义:
/**
     * The offsets to restore to, if the consumer restores state from a checkpoint.
     *
     * <p>This map will be populated by the {@link #initializeState(FunctionInitializationContext)} method.
     *
     * <p>Using a sorted map as the ordering is important when using restored state
     * to seed the partition discoverer.
     */
private transient volatile TreeMap<KafkaTopicPartition, Long> restoredState;

说明:如果指定了恢复的 checkpoint 路径,启动时候将会读取这个变量里面的内容获取起始 offset,而不再是使用 StartupMode 中的枚举值作为初始的 offset。

  • unionOffsetStates

    • 所属类:FlinkKafkaConsumerBase.java
    • 定义:
/** Accessor for state in the operator state backend. */
private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates;

说明:保存了 checkpoint 要持久化存储的内容,例如每个 partition 已经消费的 offset 等信息

■ 非 checkpoint 模式

在没有开启 checkpoint 的时候,消费 kafka 中的数据,其实就是完全依靠 kafka 自身的机制进行消费。

■ checkpoint 模式

开启 checkpoint 模式以后,会将 offset 等信息持久化存储以便恢复时使用。但是作业重启以后如果由于某种原因读不到 checkpoint 的结果,例如 checkpoint 文件丢失或者没有指定恢复路径等。

  • 第一种情况,如果读取不到 checkpoint 的内容

subscribedPartitionsToStartOffsets 会初始化所有 partition 的起始 offset为 -915623761773L 这个值就表示了当前为 GROUP_OFFSETS 模式。

default:
   for (KafkaTopicPartition seedPartition : allPartitions) {
      subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());
   }

第一次消费之前,指定读取 offset 位置的关键方法是 KafkaConsumerThread#reassignPartitions 代码片段如下:

for (KafkaTopicPartitionState<TopicPartition> newPartitionState : newPartitions) {
  if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET) {
    consumerTmp.seekToBeginning(Collections.singletonList(newPartitionState.getKafkaPartitionHandle()));
    newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
  } else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.LATEST_OFFSET) {
    consumerTmp.seekToEnd(Collections.singletonList(newPartitionState.getKafkaPartitionHandle()));
    newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
  } else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
    // the KafkaConsumer by default will automatically seek the consumer position
    // to the committed group offset, so we do not need to do it.
    newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
  } else {
    consumerTmp.seek(newPartitionState.getKafkaPartitionHandle(), newPartitionState.getOffset() + 1);
  }
}

因为是 GROUP_OFFSET 模式 ,所以会调用 newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1); 需要说明的是,在 state 里面需要存储的是成功消费的最后一条数据的 offset,但是通过 position 这个方法返回的是下一次应该消费的起始 offset,所以需要减1。这里更新这个的目的是为了 checkpoint 的时候可以正确的拿到 offset。

这种情况由于读取不到上次 checkpoint 的结果,所以依旧是依靠 kafka 自身的机制,即根据__consumer_offsets 记录的内容消费。

  • 第二种情况,checkpoint 可以读取到

这种情况下, subscribedPartitionsToStartOffsets 初始的 offset 就是具体从checkpoint 中恢复的内容,这样 KafkaConsumerThread#reassignPartitions 实际走的分支就是:

consumerTmp.seek(newPartitionState.getKafkaPartitionHandle(), newPartitionState.getOffset() + 1);

这里加1的原理同上,state 保存的是最后一次成功消费数据的 offset,所以加1才是现在需要开始消费的 offset。

小结:本节介绍了程序启动时,如何确定从哪个 offset 开始消费,下文会继续分析 flink kafka sink 的相关源码。

2.Flink-kafka-sink 源码解析

初始化

通常添加一个 kafka sink 的代码如下:

input.addSink(
   new FlinkKafkaProducer<>(
      "bar",
      new KafkaSerializationSchemaImpl(),
         properties,
      FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)).name("Example Sink");

初始化执行 env.addSink 的时候会创建 StreamSink 对象,即 StreamSink sinkOperator = new StreamSink<>(clean(sinkFunction));这里的 sinkFunction 就是传入的 FlinkKafkaProducer 对象,StreamSink 构造函数中将这个对象传给父类 AbstractUdfStreamOperator 的 userFunction 变量,源码如下:

■ StreamSink.java

public StreamSink(SinkFunction<IN> sinkFunction) {
  super(sinkFunction);
  chainingStrategy = ChainingStrategy.ALWAYS;
}

■ AbstractUdfStreamOperator.java

public AbstractUdfStreamOperator(F userFunction) {
   this.userFunction = requireNonNull(userFunction);
   checkUdfCheckpointingPreconditions();
}

Task 运行

StreamSink 会调用下面的方法发送数据:

@Override
public void processElement(StreamRecord<IN> element) throws Exception {
   sinkContext.element = element;
   userFunction.invoke(element.getValue(), sinkContext);
}
``

也就是实际调用的是 FlinkKafkaProducer#invoke 方法。在 FlinkKafkaProducer 的构造函数中需要指 FlinkKafkaProducer.Semantic,即:

public enum Semantic {
EXACTLY_ONCE,
AT_LEAST_ONCE,
NONE
}


下面就基于3种语义分别说一下总体的向 kafka 发送数据的流程。

**■ Semantic.NONE**

这种方式不会做任何额外的操作,完全依靠 kafka producer 自身的特性,也就是FlinkKafkaProducer#invoke 里面发送数据之后,Flink 不会再考虑 kafka 是否已经正确的收到数据。

transaction.producer.send(record, callback);


**■ Semantic.AT_LEAST_ONCE**

这种语义下,除了会走上面说到的发送数据的流程外,如果开启了 checkpoint 功能,在 FlinkKafkaProducer#snapshotState 中会首先执行父类的 snapshotState方法,里面最终会执行 FlinkKafkaProducer#preCommit。

@Override
protected void preCommit(FlinkKafkaProducer.KafkaTransactionState transaction) throws FlinkKafkaException {
switch (semantic) {

  case EXACTLY_ONCE:
  case AT_LEAST_ONCE:
     flush(transaction);
     break;
  case NONE:
     break;
  default:
     throw new UnsupportedOperationException("Not implemented semantic");

}
checkErroneous();
}


AT_LEAST_ONCE 会执行了 flush 方法,里面执行了:

transaction.producer.flush();


就是将 send 的数据立即发送给 kafka 服务端,详细含义可以参考 KafkaProducer api:http://kafka.apache.org/23/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

> flush()  
> Invoking this method makes all buffered records immediately available to send (even if linger.ms is greater than 0) and blocks on the completion of the requests associated with these records.  


**■ Semantic.EXACTLY_ONCE**

EXACTLY_ONCE 语义也会执行 send 和 flush 方法,但是同时会开启 kafka producer 的事务机制。FlinkKafkaProducer 中 beginTransaction 的源码如下,可以看到只有是 EXACTLY_ONCE 模式才会真正开始一个事务。

@Override
protected FlinkKafkaProducer.KafkaTransactionState beginTransaction() throws FlinkKafkaException {
switch (semantic) {

  case EXACTLY_ONCE:
     FlinkKafkaInternalProducer<byte[], byte[]> producer = createTransactionalProducer();
     producer.beginTransaction();
     return new FlinkKafkaProducer.KafkaTransactionState(producer.getTransactionalId(), producer);
  case AT_LEAST_ONCE:
  case NONE:
     // Do not create new producer on each beginTransaction() if it is not necessary
     final FlinkKafkaProducer.KafkaTransactionState currentTransaction = currentTransaction();
     if (currentTransaction != null && currentTransaction.producer != null) {
        return new FlinkKafkaProducer.KafkaTransactionState(currentTransaction.producer);
     }
     return new FlinkKafkaProducer.KafkaTransactionState(initNonTransactionalProducer(true));
  default:
     throw new UnsupportedOperationException("Not implemented semantic");

}
}


和 AT_LEAST_ONCE 另一个不同的地方在于 checkpoint 的时候,会将事务相关信息保存到变量 nextTransactionalIdHintState 中,这个变量存储的信息会作为 checkpoint 中的一部分进行持久化。

if (getRuntimeContext().getIndexOfThisSubtask() == 0 && semantic == FlinkKafkaProducer.Semantic.EXACTLY_ONCE) {
checkState(nextTransactionalIdHint != null, "nextTransactionalIdHint must be set for EXACTLY_ONCE");
long nextFreeTransactionalId = nextTransactionalIdHint.nextFreeTransactionalId;

// If we scaled up, some (unknown) subtask must have created new transactional ids from scratch. In that
// case we adjust nextFreeTransactionalId by the range of transactionalIds that could be used for this
// scaling up.
if (getRuntimeContext().getNumberOfParallelSubtasks() > nextTransactionalIdHint.lastParallelism) {

  nextFreeTransactionalId += getRuntimeContext().getNumberOfParallelSubtasks() * kafkaProducersPoolSize;

}

nextTransactionalIdHintState.add(new FlinkKafkaProducer.NextTransactionalIdHint(

  getRuntimeContext().getNumberOfParallelSubtasks(),
  nextFreeTransactionalId));

}


> **小结:**本节介绍了 Flink Kafka Producer 的基本实现原理,后续会详细介绍 Flink 在结合 kafka 的时候如何做到端到端的 Exactly Once 语义的。  


**作者介绍:**

吴鹏,亚信科技资深工程师,Apache Flink Contributor。先后就职于中兴,IBM,华为。目前在亚信科技负责实时流处理引擎产品的研发。
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
25天前
|
消息中间件 监控 Java
联通实时计算平台问题之监控Kafka集群的断传和积压情况要如何操作
联通实时计算平台问题之监控Kafka集群的断传和积压情况要如何操作
|
6天前
|
消息中间件 安全 Kafka
Kafka支持SSL/TLS协议技术深度解析
SSL(Secure Socket Layer,安全套接层)及其继任者TLS(Transport Layer Security,传输层安全)是为网络通信提供安全及数据完整性的一种安全协议。这些协议在传输层对网络连接进行加密,确保数据在传输过程中不被窃取或篡改。
15 0
|
22天前
|
消息中间件 存储 关系型数据库
实时计算 Flink版产品使用问题之如何使用Kafka Connector将数据写入到Kafka
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
22天前
|
消息中间件 监控 Kafka
实时计算 Flink版产品使用问题之处理Kafka数据顺序时,怎么确保事件的顺序性
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
存储 监控 大数据
阿里云实时计算Flink在多行业的应用和实践
本文整理自 Flink Forward Asia 2023 中闭门会的分享。主要分享实时计算在各行业的应用实践,对回归实时计算的重点场景进行介绍以及企业如何使用实时计算技术,并且提供一些在技术架构上的参考建议。
705 7
阿里云实时计算Flink在多行业的应用和实践
|
1月前
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用问题之如何在EMR-Flink的Flink SOL中针对source表单独设置并行度
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
25天前
|
消息中间件 监控 Kafka
联通实时计算平台问题之Flink状态后端数据量较大时,问题排查要如何进行
联通实时计算平台问题之Flink状态后端数据量较大时,问题排查要如何进行
|
1月前
|
消息中间件 监控 Kafka
实时计算 Flink版产品使用问题之怎么调整Flink Web U显示的日志行数
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错合集之从mysql读数据写到hive报错,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
2月前
|
存储 JSON Kubernetes
实时计算 Flink版操作报错合集之 写入hudi时报错,该如何排查
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。

相关产品

  • 实时计算 Flink版
  • 推荐镜像

    更多