各位,我 kafkasource 设置了但是在flink中没生效?

各位,我 kafkasource 设置了 isolation.level = read_committed,但是在flink中没生效image.png image.png 各位碰到过吗

展开
收起
真的很搞笑 2023-05-16 17:10:39 251 分享 版权
1 条回答
写回答
取消 提交回答
  • 存在即是合理

    可能是因为 Flink 默认使用的是 ReadUncommitted 隔离级别,而不是 ReadCommitted。可以试试通过以下方式来强制使用 ReadCommitted:

    1. 在 KafkaSource 中设置 auto.offset.reset 为 latest,这样 Flink 将从最新的偏移量开始读取消息。
    2. 在 Flink 程序中设置 stream.execution.isolation.level 为 read_committed。
    3. 在 Flink 程序中使用 FlinkKafkaConsumer 的 withTransactionalProperties() 方法,并将 auto.offset.reset 和 isolation.level 参数传递给它。例如:
    FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties -> {
        properties.setAutoOffsetReset(AutoOffsetReset.latest);
        properties.setIsolationLevel(Connection.IsolationLevel.READ_COMMITTED);
    });
    consumer.withTransactionalProperties(properties -> {
        properties.setAutoOffsetReset(AutoOffsetReset.latest);
        properties.setIsolationLevel(Connection.IsolationLevel.READ_COMMITTED);
    }).subscribe(data -> System.out.println(data));
    
    2023-05-18 13:34:06
    赞同 展开评论

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

还有其他疑问?
咨询AI助理