开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

Could not perform checkpoint 352 for operator Souc

使用flink14用flinksql实现简单的数据mysql->mysql的实时数据传输,但是总是每隔两天挂掉,报错如上
代码如下
``` public static void main(String[] args) {
final EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
/F:\flinkdoris/

    bsEnv.setStateBackend(new FsStateBackend("file:///home/flink-doris/src/checkpoints/sys_dict_item_prod/"));
    //上线要用另一个checkpoint
    // bsEnv.setStateBackend(new FsStateBackend("file:///home/flink-doris/src/checkpoints/sys_dict_item/"));

// bsEnv.setStateBackend(new StateBackend());
// // 每 1000ms 开始一次 checkpoint
bsEnv.enableCheckpointing(1000);

    bsEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(
            3, // 尝试重启的次数
            Time.of(10, TimeUnit.SECONDS) // 间隔
    ));

//
// // 高级选项:
// 设置模式为精确一次 (这是默认值)
bsEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 确认 checkpoints 之间的时间会进行 500 ms
// bsEnv.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// Checkpoint 必须在一分钟内完成,否则就会被抛弃
// bsEnv.getCheckpointConfig().setCheckpointTimeout(60000);
// 同一时间只允许一个 checkpoint 进行
bsEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 开启在 job 中止后仍然保留的 externalized checkpoints
// bsEnv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

    bsEnv.setParallelism(1);
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

    Configuration configuration = tEnv.getConfig().getConfiguration();
    configuration.setString("pipeline.name", "EC2Flow-Sys_Dict_Item");

    String sourceSql = "CREATE TABLE sys_dict_item (" +
            "    id varchar(32),"+
            "    parent_id varchar(32),"+
            "    code varchar(64),"+
            "    `value` varchar(64),"+
            "    type int,"+
            "    orderno int," +
            "    is_default boolean," +
            "    PRIMARY KEY (id) NOT ENFORCED" +
            ") WITH (" +
            " 'connector' = 'mysql-cdc'," +
            " 'hostname' = 'xxxx'," +   
            " 'port' = '9306'," +
            " 'username' = 'xxxx'," +
            " 'password' = 'xxxx'," +
            " 'database-name' = 'xxxx'," +
            " 'table-name' = 'xxxxx'" +
            ")";
    tEnv.executeSql(sourceSql);


    String insertSql = "CREATE TABLE sink_sys_dict_item( " +
            "    id varchar(32),"+
            "    parent_id varchar(32),"+
            "    code varchar(64),"+
            "    `value` varchar(64),"+
            "    type int,"+
            "    orderno int," +
            "    is_default boolean," +
            "    PRIMARY KEY (id) NOT ENFORCED" +
            ") WITH ( " +
            "'connector' = 'jdbc',"+
            "'url' = 'jdbc:mysql://xxx:3306/so_v1_jmkx?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai'," +
            "'username' = 'xxxxx', " +
            "'password' = 'xxxx', " +
            "'table-name' = 'xxxxx', " +
            "'driver' = 'com.mysql.cj.jdbc.Driver'," +
            "'sink.buffer-flush.max-rows'='1'" +
            ")";
    tEnv.executeSql(insertSql);



    tEnv.executeSql("insert into sink_sys_dict_item(\n" +
            " id                        \n" +
            ",parent_id                 \n" +
            ",code                      \n" +
            ",`value`                   \n" +
            ",type                      \n" +
            ",orderno                   \n" +
            ",is_default                \n" +
            ") \n" +
            "SELECT\n" +
            " id                       \n" +
            ",parent_id                \n" +
            ",code                     \n" +
            ",`value`                  \n" +
            ",type                     \n" +
            ",orderno         \n" +
            ",is_default       \n" +
            "from sys_dict_item");

```

展开
收起
yangxiaoqiang 2024-08-12 09:43:11 107 0
0 条回答
写回答
取消 提交回答

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载