Flink CDC 同步 MySQL 数据写入paimon,写入时间戳字段会毫秒补0,mysql数据2023-05-19 08:59:12 / cdc to paimon数据2023-05-19 08:59:12.000 ,如何解决?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
Flink CDC(Change Data Capture)在同步MySQL数据到Paimon时,可能会遇到时间戳字段毫秒补0的情况。这是因为Flink CDC默认将时间戳字段转换为精确到毫秒的时间戳,而MySQL的时间戳字段可能只精确到秒。这种情况下,毫秒部分会被补零。
要解决这个问题,可以采取以下几种方法:
方法一:让 Flink SQL 的时间戳列精度与 MySQL 保持一致。MySQL 的 DATETIME(不带小数秒)应声明为 TIMESTAMP(0) 而不是 TIMESTAMP(3);声明为 TIMESTAMP(3) 正是毫秒位被补 0 的原因。示例:
CREATE TABLE mysql_source (
  id INT,
  ts TIMESTAMP(0), -- seconds precision, matches a MySQL DATETIME without fractional seconds
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'database-name' = 'testdb',
  'table-name' = 'test_table',
  'username' = 'root',
  'password' = 'password',
  'scan.startup.mode' = 'initial'
);
CREATE TABLE paimon_sink (
  id INT,
  ts TIMESTAMP(0), -- same seconds precision as the source; no ".000" padding on write
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'paimon',
  'path' = 'hdfs://localhost:9000/paimon/test_table'
  -- NOTE(review): removed 'sink.format' = 'json' — not a valid Paimon option;
  -- the data file format is configured via 'file.format' if needed.
);
INSERT INTO paimon_sink
SELECT
  id,
  -- CAST to TIMESTAMP(0) truncates the fractional-second part. Do NOT use
  -- TO_TIMESTAMP_LTZ(ts, 3): that function expects a numeric epoch value, not a
  -- TIMESTAMP column, and precision 3 would keep the zero-padded milliseconds.
  CAST(ts AS TIMESTAMP(0)) AS ts
FROM mysql_source;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class CdcToPaimon {

    /**
     * Streams a MySQL table into Paimon via Flink SQL, keeping the timestamp
     * column at seconds precision ({@code TIMESTAMP(0)}) so MySQL DATETIME
     * values such as {@code 2023-05-19 08:59:12} are not written as
     * {@code 2023-05-19 08:59:12.000}.
     *
     * <p>The original version of this example mixed in a DataStream +
     * {@code JdbcSink} branch that (a) used {@code java.sql} types without
     * importing them, (b) called {@code toAppendStream} on a CDC changelog
     * source (which emits updates/deletes and cannot be an append stream),
     * and (c) wrote back into the MySQL source table instead of Paimon. A
     * pure Table API pipeline is both correct and sufficient here.
     *
     * @param args unused
     * @throws Exception if job submission or execution fails
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        String username = "root";
        String password = "password";

        // MySQL CDC source. ts is TIMESTAMP(0): a MySQL DATETIME without
        // fractional seconds maps to seconds precision — declaring it as
        // TIMESTAMP(3) is exactly what causes the ".000" padding.
        tEnv.executeSql(
                "CREATE TABLE mysql_source (" +
                        "id INT," +
                        "ts TIMESTAMP(0)," + // seconds precision, no millisecond padding
                        "PRIMARY KEY (id) NOT ENFORCED" +
                        ") WITH (" +
                        "'connector' = 'mysql-cdc'," +
                        "'hostname' = 'localhost'," +
                        "'port' = '3306'," +
                        "'database-name' = 'testdb'," +
                        "'table-name' = 'test_table'," +
                        "'username' = '" + username + "'," +
                        "'password' = '" + password + "'," +
                        "'scan.startup.mode' = 'initial'" +
                        ")"
        );

        // Paimon sink with the same column precision as the source.
        tEnv.executeSql(
                "CREATE TABLE paimon_sink (" +
                        "id INT," +
                        "ts TIMESTAMP(0)," + // must match the source precision
                        "PRIMARY KEY (id) NOT ENFORCED" +
                        ") WITH (" +
                        "'connector' = 'paimon'," +
                        "'path' = 'hdfs://localhost:9000/paimon/test_table'" +
                        ")"
        );

        // A SQL INSERT submits the streaming job itself; env.execute() is not
        // needed (and would fail) for a pure Table API pipeline with no
        // DataStream transformations. The explicit CAST keeps the precision
        // correct even if the source column were later declared wider.
        tEnv.executeSql(
                "INSERT INTO paimon_sink " +
                        "SELECT id, CAST(ts AS TIMESTAMP(0)) AS ts FROM mysql_source"
        ).await();
    }
}
方法二:在 mysql-cdc 源表的 WITH 参数中显式指定数据库会话时区,避免时区换算导致的时间值偏差(注意:正确的选项名是 'server-time-zone',而不是诸如 flink.sql.connectors.mysql-cdc.time-zone 之类的配置键):
'server-time-zone' = 'UTC'
通过上述方法之一或组合使用,应该可以解决Flink CDC同步MySQL数据到Paimon时时间戳字段毫秒补0的问题。请根据您的具体情况选择合适的方法进行调整。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。