开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

大佬们,Flink 中我mysql-cdc,想取当时数据在mysql中产生的时间,怎么取,ts_?

大佬们,Flink 中我mysql-cdc,想取当时数据在mysql中产生的时间,怎么取,ts_ms 取的好像是cdc执行的时间?

展开
收起
真的很搞笑 2023-07-03 15:16:00 200 0
2 条回答
写回答
取消 提交回答
  • 北京阿里云ACE会长

    在 MySQL 中使用 CDC(Change Data Capture)技术进行数据变更抓取时,可以通过获取 binlog 中的 timestamp 来获取数据变更的时间。在 Flink 中使用 MySQL-CDC 抓取到的数据中,可以通过以下两种方式获取数据变更的时间:

    使用 Flink-Debezium 库:Flink-Debezium 库是 Flink 官方提供的一个 Debezium Connector,可以方便地将 CDC 数据源集成到 Flink 程序中。在 Flink-Debezium 库中,可以通过 Debezium 的 SourceRecord 中的 timestamp 字段获取数据变更的时间,该字段是 binlog 中的时间戳,表示数据变更的时间。

    使用 CDC 数据中的时间戳字段:在 MySQL-CDC 抓取到的数据中,可以通过查看 CDC 数据的具体格式和字段定义,来确定数据变更的时间戳字段。通常情况下,CDC 数据中会包含一个时间戳字段,例如 binlog 中的 timestamp 字段或者 MySQL 5.6 以上版本中的 row_update_time 字段。可以通过读取该字段来获取数据变更的时间。

    2023-07-30 09:39:50
    赞同 展开评论 打赏
  • 在 Flink 中使用 MySQL CDC,要获取数据在 MySQL 中产生的时间,可以使用 source 函数提供的元信息(metadata)来获取特定字段的值。

    默认情况下,当使用 Flink CDC 连接到 MySQL 数据源时,会将每条记录的变更时间(cdc执行的时间)写入到字段 ts_ms 中。如果您想获取 MySQL 中实际产生数据的时间,需要在 MySQL 表中有相应的字段记录数据的生成时间,并确保该字段的值能够传递到 Flink 的数据流中。

    假设 MySQL 表有一个名为 create_time 的字段来记录数据的生成时间,您可以通过以下示例代码来获取该字段的值:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    
    // 创建 MySQL CDC 数据源
    DataStream<Row> mysqlCdcStream = env.addSource(new MySqlCdcSource(...));
    
    // 将 create_time 字段作为事件时间戳
    DataStream<Row> withEventTimeStream = mysqlCdcStream.assignTimestampsAndWatermarks(
      WatermarkStrategy.<Row>forMonotonousTimestamps()
        .withTimestampAssigner((event, timestamp) -> {
          // 获取 create_time 字段的值作为事件时间戳
          long eventTime = event.getField("create_time").toTimestamp().getTime();
          return eventTime;
        })
    );
    
    // 在后续操作中可以使用事件时间戳
    withEventTimeStream
      .keyBy(...)
      .window(...)
      .process(...);
    
    env.execute("MySQL CDC");
    

    在上述示例中,通过 assignTimestampsAndWatermarks 方法设置了事件时间戳,并指定了 create_time 字段的值作为事件时间戳。然后,在后续操作中可以使用该事件时间戳进行窗口计算等操作。

    需要根据实际情况修改代码,确保字段名称和数据类型与实际表结构一致。

    2023-07-30 09:41:22
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 热门讨论

    热门文章

    相关电子书

    更多
    搭建电商项目架构连接MySQL 立即下载
    搭建4层电商项目架构,实战连接MySQL 立即下载
    PolarDB MySQL引擎重磅功能及产品能力盛大发布 立即下载

    相关镜像