I made the following settings:
mEnv.setRuntimeMode(RuntimeExecutionMode.BATCH)
KafkaSource.builder().setBounded(OffsetsInitializer.latest())
The code runs fine, but the job does not exit after it finishes reading the Kafka data.
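For context, a minimal runnable sketch of the setup described above (bootstrap servers, topic, and group id are placeholders, not taken from the original post):

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BoundedKafkaJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment mEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        mEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092") // placeholder
                .setTopics("my-topic")                 // placeholder
                .setGroupId("my-group")                // placeholder
                .setStartingOffsets(OffsetsInitializer.earliest())
                // Stop at the latest offsets observed when the job starts.
                .setBounded(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        mEnv.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source").print();
        mEnv.execute("bounded-kafka-read");
    }
}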
The relevant Flink source code:
/**
* By default the KafkaSource is set to run in {@link Boundedness#CONTINUOUS_UNBOUNDED} manner
* and thus never stops until the Flink job fails or is canceled. To let the KafkaSource run in
* {@link Boundedness#BOUNDED} manner and stops at some point, one can set an {@link
* OffsetsInitializer} to specify the stopping offsets for each partition. When all the
* partitions have reached their stopping offsets, the KafkaSource will then exit.
*
* <p>This method is different from {@link #setUnbounded(OffsetsInitializer)} that after setting
* the stopping offsets with this method, {@link KafkaSource#getBoundedness()} will return
* {@link Boundedness#BOUNDED} instead of {@link Boundedness#CONTINUOUS_UNBOUNDED}.
*
* <p>The following {@link OffsetsInitializer} are commonly used and provided out of the box.
* Users can also implement their own {@link OffsetsInitializer} for custom behaviors.
*
* <ul>
* <li>{@link OffsetsInitializer#latest()} - stop at the latest offsets of the partitions when
* the KafkaSource starts to run.
* <li>{@link OffsetsInitializer#committedOffsets()} - stops at the committed offsets of the
* consumer group.
* <li>{@link OffsetsInitializer#offsets(Map)} - stops at the specified offsets for each
* partition.
* <li>{@link OffsetsInitializer#timestamp(long)} - stops at the specified timestamp for each
* partition. The guarantee of setting the stopping timestamp is that no Kafka records
* whose {@link org.apache.kafka.clients.consumer.ConsumerRecord#timestamp()} is greater
* than the given stopping timestamp will be consumed. However, it is possible that some
* records whose timestamp is smaller than the specified stopping timestamp are not
* consumed.
* </ul>
*
* @param stoppingOffsetsInitializer the {@link OffsetsInitializer} to specify the stopping
* offsets.
* @return this KafkaSourceBuilder.
* @see #setUnbounded(OffsetsInitializer)
*/
public KafkaSourceBuilder<OUT> setBounded(OffsetsInitializer stoppingOffsetsInitializer) {
this.boundedness = Boundedness.BOUNDED;
this.stoppingOffsetsInitializer = stoppingOffsetsInitializer;
return this;
}
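The Javadoc above also points at a more deterministic way to stop: instead of latest(), you can pin explicit stopping offsets or a stopping timestamp. A hedged sketch of both variants (topic name, partitions, offsets, and the timestamp are illustrative placeholders):

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.common.TopicPartition;

public class StoppingOffsetExamples {

    // Stops at explicit per-partition offsets, independent of when the job starts.
    static KafkaSource<String> boundedByOffsets() {
        Map<TopicPartition, Long> stoppingOffsets = new HashMap<>();
        stoppingOffsets.put(new TopicPartition("my-topic", 0), 1_000L); // placeholder offsets
        stoppingOffsets.put(new TopicPartition("my-topic", 1), 2_000L);
        return KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092") // placeholder
                .setTopics("my-topic")                 // placeholder
                .setGroupId("my-group")                // placeholder
                .setBounded(OffsetsInitializer.offsets(stoppingOffsets))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }

    // Stops once record timestamps pass the given epoch-millisecond timestamp.
    static KafkaSource<String> boundedByTimestamp() {
        return KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092") // placeholder
                .setTopics("my-topic")                 // placeholder
                .setGroupId("my-group")                // placeholder
                .setBounded(OffsetsInitializer.timestamp(1_700_000_000_000L)) // placeholder
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }
}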
If your Flink 1.16 batch-mode Kafka job does not exit, the likely cause is that the bounded stopping condition is never reached. As the Javadoc above explains, with setBounded(OffsetsInitializer.latest()) each partition's stopping offset is the latest offset at the moment the job starts, and the source only finishes once every partition has reached its stopping offset. First verify that the builder on which you called setBounded(...) produced the same source instance the job actually runs, so that KafkaSource#getBoundedness() returns Boundedness.BOUNDED rather than CONTINUOUS_UNBOUNDED.
As a pragmatic workaround, you can also bound the job's wall-clock runtime by setting a timeout in the job code.
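A minimal sketch of that timeout workaround, assuming the job is submitted with executeAsync() and cancelled through the returned JobClient if it outlives a deadline (the 10-minute deadline and job name are illustrative):

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TimeoutWorkaround {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Stand-in pipeline; replace with the bounded Kafka pipeline from above.
        env.fromSequence(1, 100).print();

        // Submit without blocking so we keep a handle to the running job.
        JobClient client = env.executeAsync("bounded-kafka-read");
        try {
            // Wait up to 10 minutes (illustrative) for the job to finish on its own.
            client.getJobExecutionResult().get(10, TimeUnit.MINUTES);
        } catch (TimeoutException e) {
            // The job is still running past the deadline; cancel it explicitly.
            client.cancel().get();
        }
    }
}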