I'm using Flink 1.14 with Flink SQL to implement a simple real-time MySQL-to-MySQL data sync, but the job dies roughly every two days with the error shown above.
The code is as follows:
```
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public static void main(String[] args) {
    final EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().inStreamingMode().build();
    StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    bsEnv.setStateBackend(new FsStateBackend("file:///home/flink-doris/src/checkpoints/sys_dict_item_prod/"));
    // use a different checkpoint path in production
    // bsEnv.setStateBackend(new FsStateBackend("file:///home/flink-doris/src/checkpoints/sys_dict_item/"));

    // start a checkpoint every 1000 ms
    bsEnv.enableCheckpointing(1000);
    bsEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(
            3,                              // number of restart attempts
            Time.of(10, TimeUnit.SECONDS)   // delay between attempts
    ));

    // advanced options:
    // set the checkpointing mode to exactly-once (this is the default)
    bsEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    // require at least 500 ms between checkpoints
    // bsEnv.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
    // checkpoints must complete within one minute, otherwise they are discarded
    // bsEnv.getCheckpointConfig().setCheckpointTimeout(60000);
    // allow only one checkpoint at a time
    bsEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    // retain externalized checkpoints after the job is cancelled
    // bsEnv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    bsEnv.setParallelism(1);

    StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
    Configuration configuration = tEnv.getConfig().getConfiguration();
    configuration.setString("pipeline.name", "EC2Flow-Sys_Dict_Item");

    String sourceSql = "CREATE TABLE sys_dict_item (" +
            "  id varchar(32)," +
            "  parent_id varchar(32)," +
            "  code varchar(64)," +
            "  `value` varchar(64)," +
            "  type int," +
            "  orderno int," +
            "  is_default boolean," +
            "  PRIMARY KEY (id) NOT ENFORCED" +
            ") WITH (" +
            "  'connector' = 'mysql-cdc'," +
            "  'hostname' = 'xxxx'," +
            "  'port' = '9306'," +
            "  'username' = 'xxxx'," +
            "  'password' = 'xxxx'," +
            "  'database-name' = 'xxxx'," +
            "  'table-name' = 'xxxxx'" +
            ")";
    tEnv.executeSql(sourceSql);

    String insertSql = "CREATE TABLE sink_sys_dict_item (" +
            "  id varchar(32)," +
            "  parent_id varchar(32)," +
            "  code varchar(64)," +
            "  `value` varchar(64)," +
            "  type int," +
            "  orderno int," +
            "  is_default boolean," +
            "  PRIMARY KEY (id) NOT ENFORCED" +
            ") WITH (" +
            "  'connector' = 'jdbc'," +
            "  'url' = 'jdbc:mysql://xxx:3306/so_v1_jmkx?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai'," +
            "  'username' = 'xxxxx'," +
            "  'password' = 'xxxx'," +
            "  'table-name' = 'xxxxx'," +
            "  'driver' = 'com.mysql.cj.jdbc.Driver'," +
            "  'sink.buffer-flush.max-rows' = '1'" +
            ")";
    tEnv.executeSql(insertSql);

    tEnv.executeSql(
            "INSERT INTO sink_sys_dict_item (id, parent_id, code, `value`, type, orderno, is_default) " +
            "SELECT id, parent_id, code, `value`, type, orderno, is_default FROM sys_dict_item");
}
```
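One thing worth noting about the restart configuration above (a hedged observation, not a confirmed diagnosis): `fixedDelayRestart(3, ...)` grants only three restart attempts over the whole lifetime of the job, so once a few transient failures accumulate (for example, dropped MySQL connections), the job fails permanently. A failure-rate strategy combined with retained externalized checkpoints lets a long-running job survive occasional failures and be resumed manually if it does die. Below is a minimal `flink-conf.yaml` sketch for Flink 1.14; the interval and delay values are illustrative assumptions, not recommendations from the original post:

```
# Restart as long as there are fewer than 3 failures per 5-minute window
restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s

# Keep the last externalized checkpoint when the job is cancelled or fails,
# so it can be resumed with: flink run -s <checkpointPath> ...
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
```

The same settings can also be applied programmatically (`RestartStrategies.failureRateRestart(...)` and `CheckpointConfig.enableExternalizedCheckpoints(...)`, the latter already present but commented out in the code above).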