开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

flinkcdc,用哪个序列化器,才能拿到binlog里面的数据操作时间?

5abe0b7abc41452a86bd8def646672fd.png
flinkcdc,用哪个序列化器,才能拿到binlog里面的数据操作时间?

展开
收起
小易01 2023-07-19 17:56:52 96 0
2 条回答
写回答
取消 提交回答
  • 北京阿里云ACE会长

    在 Flink CDC 中,可以使用自定义的反序列化器来解析 CDC 数据源的数据,以获取 binlog 中的数据操作时间。根据您的需求,可以使用不同的反序列化器来实现。
    如果您使用的是 Flink CDC 内置的反序列化器,可以通过在反序列化器中添加对应的字段,来获取 binlog 中的数据操作时间。例如,如果您使用的是 MySQL CDC 数据源,并且使用了内置的 MySQLRecordDeserializationSchema,可以在 schema 中增加对应的字段来获取 binlog 中的数据操作时间。示例代码如下:
    java
    Copy
    public class MyRecordDeserializationSchema implements DebeziumDeserializationSchema {

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<MyRecord> collector) throws Exception {
        Struct value = (Struct) sourceRecord.value();
        String id = value.getString("id");
        String name = value.getString("name");
        int age = value.getInt32("age");
        long timestamp = sourceRecord.timestamp(); // 获取 binlog 中的数据操作时间
        collector.collect(new MyRecord(id, name, age, timestamp));
    }
    
    @Override
    public TypeInformation<MyRecord> getProducedType() {
        return TypeInformation.of(MyRecord.class);
    }
    

    }
    在这个例子中,使用 sourceRecord.timestamp() 方法来获取 binlog 中的数据操作时间,并将其添加到 MyRecord 对象中。这样,在 Flink 程序中就可以获取到从 CDC 数据源中读取的每条数据的操作时间。

    2023-07-29 19:54:31
    赞同 展开评论 打赏
  • 意中人就是我呀!

    直接把这个mysqlsource输出b3865e1057a788a18a673eaebb2319d7.png
    flink-connector-mysql-cdc包自带。a11faa2cccb92b98fc145fc48d1fcb70.png
    f05c6094a5fad1438b4a8deceda80463.png
    这样启动拿到的就是最开始binlog了,你试试这样启动,你就能看到差异了。
    此回答整理至钉群“Flink CDC 社区”。"

    2023-07-19 18:57:53
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关电子书

更多
PolarDB-X 2.0 全局 Binlog 与备份恢复能 立即下载
低代码开发师(初级)实战教程 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载