开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

flink mongodb cdc 2.4是不是有bug呀?这里也没有这个类呀?

问题1:flink mongodb cdc 2.4是不是有bug呀?这里也没有这个类呀?image.png
源码上面也是这样呀image.png
image.png
是我打开方式不对吗
问题2:这个问题是因为我导的包有问题吗?flink的版本是1.6image.png

展开
收起
真的很搞笑 2023-07-13 12:13:21 65 0
3 条回答
写回答
取消 提交回答
  • 北京阿里云ACE会长

    在Flink CDC 2.4.0版本中,并没有提供针对MongoDB的CDC模块。因此,如果您需要实现MongoDB的增量数据抓取和数据同步等功能,需要自己编写相关代码,并集成到Flink任务中。

    通常情况下,实现MongoDB的CDC功能需要使用MongoDB的oplog,即操作日志。oplog记录了MongoDB中所有的数据变更操作,包括插入、更新和删除等。通过解析oplog,可以实现增量数据抓取和数据同步等功能。

    下面是一个示例代码,演示如何使用MongoDB的oplog实现增量数据抓取和数据同步:

    java
    Copy
    MongoClient mongoClient = new MongoClient("localhost", 27017);
    MongoDatabase db = mongoClient.getDatabase("mydb");
    MongoCollection collection = db.getCollection("mycollection");

    // 获取oplog集合
    MongoCollection oplog = db.getCollection("oplog.rs");

    // 构造查询条件,过滤出指定集合的操作日志
    Bson filter = Filters.and(
    Filters.eq("ns", "mydb.mycollection"),
    Filters.exists("o")
    );

    // 获取操作日志的游标
    FindIterable cursor = oplog.find(filter)
    .sort(new Document("$natural", 1))
    .cursorType(CursorType.TailableAwait);

    while (true) {
    // 获取下一个操作日志
    Document oplogEntry = cursor.tryNext();

    if (oplogEntry != null) {
        // 解析操作日志,获取变更的数据
        Document document = (Document) oplogEntry.get("o");
        // TODO: 将变更的数据转换为Flink的数据流,并进行处理
    } else {
        // 如果没有新的操作日志,则休眠一段时间
        Thread.sleep(100);
    }
    

    }
    在上述示例中,我们首先获取MongoDB的oplog集合,并通过查询条件过滤出指定集合的操作日志。然后,我们使用tailable cursor的方式获取操作日志的游标,并不断读取下一个操作日志。在每次读取操作日志时,我们解析操作日志,获取变更的数据,并将其转换为Flink的数据流,以供下游任务使用。

    2023-07-30 09:37:23
    赞同 展开评论 打赏
  • 在 Flink CDC 中,针对 MongoDB 的 CDC 连接器确实存在一些版本和包名的变化。根据您提供的信息,可以回答如下:

    1. 问题1:Flink MongoDB CDC 2.4 版本是否有 bug?    根据您提供的截图,显示无法找到 MongoDbSource 类,这可能是由于包名或类路径的变化导致的。但从截图上看不出具体错误信息。建议您检查以下几点:    - 确认您使用的是正确的 Flink MongoDB CDC 版本,并根据该版本的文档和示例进行操作。    - 检查引入的包和依赖项是否正确,并确保版本兼容性。    - 查看类路径和包名是否与您当前的项目结构匹配。

    2. 问题2:导入的包是否存在问题?    从您提供的截图来看,显示导入的包为 flink-connector-mongodb,而您使用的 Flink 版本为 1.6。请注意,Flink 1.6 版本的 MongoDB Connector 包名为 flink-connector-mongodb_2.11(对应 Scala 2.11)。因此,请确保您导入的包名与您正在使用的 Flink 版本相匹配。

    如果您仍然遇到问题,建议您提供更多详细信息,例如完整的错误日志、代码示例以及相关的依赖项和版本等。这将有助于更准确地定位和解决问题。

    另外,对于 MongoDB CDC 的使用,建议查阅相关文档和示例,并参考 Flink 社区或相关技术论坛上的讨论,以获取更多关于特定版本和问题的信息和解决方案。

    2023-07-29 22:28:24
    赞同 展开评论 打赏
  • 回答1:这个类不是不是cdc包下的啊。包名都不一样的,是flink-connector-base包下的,image.png
    回答2:,此回答整理自钉群“Flink CDC 社区”

    2023-07-13 15:53:22
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载