各位大佬,flink cdc有查询进度的接口吗,比如开启全量+增量模式,如何知道目前的进度百分比?

各位大佬,flink cdc有查询进度的接口吗,比如开启全量+增量模式,如何知道目前的进度百分比?image.png

展开
收起
真的很搞笑 2023-07-13 13:21:45 218 分享 版权
3 条回答
写回答
取消 提交回答
  • 北京阿里云ACE会长

    在 Flink CDC 中,可以通过 REST API 来查询任务的进度信息。Flink CDC 的 REST API 提供了丰富的接口,可以查询任务的状态、进度、统计信息等等。

    如果使用 Flink CDC 中的全量+增量模式进行数据同步,可以通过以下接口来查询任务的进度信息:

    查询任务的状态信息:可以通过 /jobs/:jobid 接口来查询任务的状态信息,其中 :jobid 是任务的 ID。例如:

    apache
    Copy
    GET /jobs/8fd8f4b80f4b4b4f8d21179c6d42f9c0 HTTP/1.1
    返回的结果中包含了任务的状态信息,例如:

    json
    Copy
    {
    "jid": "8fd8f4b80f4b4b4f8d21179c6d42f9c0",
    "name": "MyJob",
    "state": "RUNNING",
    "start-time": 1628547014333,
    "end-time": -1,
    "duration": 1976073,
    "now": 1628548980406,
    "timestamps": {
    "CREATED": 1628547014333,
    "SCHEDULED": 1628547014349,
    "RUNNING": 1628547014351
    }
    }
    查询任务的统计信息:可以通过 /jobs/:jobid/vertices/:vertexid 接口来查询任务的统计信息,其中 :jobid 是任务的 ID,:vertexid 是任务的算子 ID。例如:

    apache
    Copy
    GET /jobs/8fd8f4b80f4b4b4f8d21179c6d42f9c0/vertices/49fd3ef689f6c1d7c4ab2a4ab35bf2a4 HTTP/1.1
    返回的结果中包含了任务的统计信息,例如:

    json
    Copy
    {
    "id": "49fd3ef689f6c1d7c4ab2a4ab35bf2a4",
    "name": "MongoDB CDC Source",
    "parallelism": 1,
    "status": {
    "isBackPressured": false,
    "numBytesInLocal": 0,
    "numBytesInRemote": 0,
    "numBytesOut": 0,
    "numBytesOutRecovered": 0,
    "numRecordsInLocal": 1000,
    "numRecordsInRemote": 0,
    "numRecordsOut": 1000,
    "numRecordsOutRecovered": 0
    }
    }
    在 Flink CDC 中,全量+增量模式的数据同步通常是通过两个算子实现的:一个用于全量导入数据,另一个用于增量更新数据。

    2023-07-29 22:45:03
    赞同 展开评论
  • 目前,Flink CDC(Changelog Data Capture)还没有内置的查询进度接口来获取全量+增量模式的进度百分比。但是,从 Flink 1.18 版本开始,引入了 Enumerator 功能,该功能可以用于获取数据源的元信息和进度报告。

    使用 Enumerator,您可以通过以下步骤获取 Flink CDC 的进度百分比:

    1. 更新到 Flink 1.18 版本:确保您的 Flink 环境已经升级到 1.18 版本或更高版本。

    2. 创建并配置 CDC 连接器:在您的 Flink CDC 应用程序中,创建并配置 CDC 连接器,指定全量+增量模式。例如,可以使用类似如下的代码配置 CDC 连接器:

       java    // 创建 CDC 连接器    MySQLSource<MyRecord> source = MySQLSource.<MyRecord>builder()        .hostname("localhost")        .port(3306)        .database("my_database")        .table("my_table")        .startupMode(StartupMode.INITIAL_FROM_LATEST_OFFSET)        .build();        // 将 CDC 连接器作为 Source 添加到 Flink 程序中    DataStream<MyRecord> stream = env.addSource(source);    

    3. 使用 Enumerator 获取进度报告:根据 Flink 1.18 的文档,您可以使用 Enumerator API 来获取数据源的进度报告。具体实现方式和代码示例,请参考 Flink 文档中关于 Enumerator 的部分,并查看您所使用的具体 CDC 连接器的文档和示例代码。

    请注意,以上信息基于 Flink 1.18 版本及以上版本。如果您正在使用较低版本的 Flink,可能暂时无法获取全量+增量模式的进度百分比。建议您关注 Flink 的更新和官方文档,以获取最新的功能和解决方案。

    2023-07-29 21:42:27
    赞同 展开评论
  • 这个得等新版本,1.18版本才支持enumerator的 指标汇报,ps:这里我说的是Flink 引擎的版本,此回答整理自钉群“Flink CDC 社区”

    2023-07-13 16:04:40
    赞同 展开评论

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

还有其他疑问?
咨询AI助理