开发者社区> 问答> 正文

flink1.16 batch模式读取kafka任务不退出

做了如下设置

  1. mEnv.setRuntimeMode(RuntimeExecutionMode.BATCH)

  2. KafkaSource.builder().setBounded(OffsetsInitializer.latest())

代码能正常运行,但是读完kafka数据不退出

flink源码:
/**

 * By default the KafkaSource is set to run in {@link Boundedness#CONTINUOUS_UNBOUNDED} manner
 * and thus never stops until the Flink job fails or is canceled. To let the KafkaSource run in
 * {@link Boundedness#BOUNDED} manner and stops at some point, one can set an {@link
 * OffsetsInitializer} to specify the stopping offsets for each partition. When all the
 * partitions have reached their stopping offsets, the KafkaSource will then exit.
 *
 * <p>This method is different from {@link #setUnbounded(OffsetsInitializer)} that after setting
 * the stopping offsets with this method, {@link KafkaSource#getBoundedness()} will return
 * {@link Boundedness#BOUNDED} instead of {@link Boundedness#CONTINUOUS_UNBOUNDED}.
 *
 * <p>The following {@link OffsetsInitializer} are commonly used and provided out of the box.
 * Users can also implement their own {@link OffsetsInitializer} for custom behaviors.
 *
 * <ul>
 *   <li>{@link OffsetsInitializer#latest()} - stop at the latest offsets of the partitions when
 *       the KafkaSource starts to run.
 *   <li>{@link OffsetsInitializer#committedOffsets()} - stops at the committed offsets of the
 *       consumer group.
 *   <li>{@link OffsetsInitializer#offsets(Map)} - stops at the specified offsets for each
 *       partition.
 *   <li>{@link OffsetsInitializer#timestamp(long)} - stops at the specified timestamp for each
 *       partition. The guarantee of setting the stopping timestamp is that no Kafka records
 *       whose {@link org.apache.kafka.clients.consumer.ConsumerRecord#timestamp()} is greater
 *       than the given stopping timestamp will be consumed. However, it is possible that some
 *       records whose timestamp is smaller than the specified stopping timestamp are not
 *       consumed.
 * </ul>
 *
 * @param stoppingOffsetsInitializer the {@link OffsetsInitializer} to specify the stopping
 *     offsets.
 * @return this KafkaSourceBuilder.
 * @see #setUnbounded(OffsetsInitializer)
 */
public KafkaSourceBuilder<OUT> setBounded(OffsetsInitializer stoppingOffsetsInitializer) {
    this.boundedness = Boundedness.BOUNDED;
    this.stoppingOffsetsInitializer = stoppingOffsetsInitializer;
    return this;
}

展开
收起
fmajtyzgacjxo 2023-09-18 11:53:40 326 0
1 条回答
写回答
取消 提交回答
  • 如果您在使用Flink 1.16 batch模式读取Kafka任务时遇到了任务不退出的问题,可能是因为您的任务没有正确地设置退出条件。在Flink中,任务的退出条件可以通过设置超时时间或者设置退出条件函数来实现。
    如果您想要设置超时时间,可以在任务的代码中设置一个超时时间

    2023-09-18 18:27:54
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载