对于Flink CDC,有大佬遇到过这个问题吗?加了表然后从checkpoint启动就报这个

对于Flink CDC,有大佬遇到过这个问题吗?加了表然后从checkpoint启动就报这个 477f926f450d986ae52387002f665b26.png

展开
收起
wenti 2023-01-15 16:38:50 162 分享 版权
1 条回答
写回答
取消 提交回答
  • 原因:

    此错误表示你正在使用的 TableSink(在本例中可能是 Flink CDC 表连接器)没有实现 java.io.Serializable 接口。当 Flink 从检查点恢复时,需要对任务进行序列化,其中包括 TableSink。如果 TableSink 不可序列化,则 Flink 无法恢复任务。

    解决方案:

    有两种方法可以解决此问题:

    使 TableSink 实现 java.io.Serializable 接口: 这是最简单的方法,但是可能需要修改 TableSink 的源代码。
    使用 AggregatingState 或 FoldingState: 这些类是 Flink 提供的序列化状态后端,可以用于存储不可序列化的对象。你可以将 TableSink 状态存储在这些后端中,从而使 TableSink 整体可序列化。
    具体实现:

    方法 1:实现 java.io.Serializable 接口

    在 TableSink 类中添加以下代码:

    public class MyTableSink implements TableSink, java.io.Serializable {
    // ...
    }
    方法 2:使用 AggregatingState 或 FoldingState

    在 TableSink 类中添加以下代码:

    public class MyTableSink implements TableSink {

    private AggregatingState<Long, Long> countState;
    
    @Override
    public void open(Configuration parameters) {
        countState = getRuntimeContext().getAggregatingState(
            new ValueStateDescriptor<>("count", LongSerializer.INSTANCE, 0L));
    }
    
    // ...
    

    }
    在上面的示例中,countState 是一个 AggregatingState,用于存储不可序列化的对象(在本例中为 Long)。

    注意:

    如果 TableSink 只能通过依赖项进行访问,则可能无法直接修改其源代码。在这种情况下,可以使用方法 2。
    AggregatingState 和 FoldingState 仅适用于状态后端,例如 RocksDB 或 StateBackend。

    2024-02-23 15:26:56
    赞同 展开评论

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

收录在圈子:
实时计算 Flink 版(Alibaba Cloud Realtime Compute for Apache Flink,Powered by Ververica)是阿里云基于 Apache Flink 构建的企业级、高性能实时大数据处理系统,由 Apache Flink 创始团队官方出品,拥有全球统一商业化品牌,完全兼容开源 Flink API,提供丰富的企业级增值功能。
还有其他疑问?
咨询AI助理