各位大佬,有没有Flink CDC同步mongo数据,出现这个问题的?

各位大佬,有没有Flink CDC同步mongo数据,出现这个问题的image.png ?flink1.15,cdc2.3,Exception in thread "main" org.bson.BsonInvalidOperationException: Document does not contain key $clusterTime,这个一直获取不到是什么问题哦?

展开
收起
真的很搞笑 2023-05-23 18:35:00 180 分享 版权
阿里云 AI 助理回答

根据您提供的问题描述,Flink CDC在同步MongoDB数据时出现org.bson.BsonInvalidOperationException: Document does not contain key $clusterTime的异常。以下是针对该问题的详细分析和解决方案:


问题原因

  1. MongoDB版本兼容性问题
    $clusterTime 是 MongoDB 3.6 及以上版本中引入的一个字段,用于支持 Change Stream 特性。如果您的 MongoDB 版本低于 3.6,或者虽然版本符合要求但未正确启用 Change Stream 功能,则可能导致该异常。

  2. MongoDB运行模式不满足要求
    Change Stream 需要 MongoDB 在副本集(Replica Set)或分片集群(Sharded Cluster)架构下运行。如果 MongoDB 运行在单节点模式下,且未初始化为单节点副本集(通过 rs.initiate() 命令),则无法使用 Change Stream,从而导致 $clusterTime 字段缺失。

  3. 权限配置问题
    如果连接 MongoDB 的用户没有足够的权限访问 Change Stream 或相关数据库,则可能导致无法获取 $clusterTime 字段。

  4. Flink CDC连接器配置问题
    Flink CDC 连接器可能未正确配置 hostsusernamepassword 等参数,导致无法正常读取 MongoDB 数据流。


解决方案

1. 检查MongoDB版本

  • 确保 MongoDB 版本为 3.6及以上,建议使用 4.0及以上版本
  • 检查 MongoDB 是否启用了 Change Stream 功能:
    db.runCommand({ getParameter: 1, featureCompatibilityVersion: 1 })
    

    如果返回的 featureCompatibilityVersion 小于 3.6,需要升级 MongoDB 并启用 Change Stream。

2. 确保MongoDB运行在副本集模式

  • 如果 MongoDB 运行在单节点模式下,请将其初始化为单节点副本集:
    rs.initiate()
    

    初始化后,可以通过以下命令确认副本集状态:

    rs.status()
    

3. 检查用户权限

  • 确保连接 MongoDB 的用户具有以下权限:
    • readreadWrite 权限。
    • 对 Change Stream 的访问权限。
  • 如果用户是在特定数据库下创建的,请在 Flink CDC 配置中添加 authSource 参数:
    .option("connection.options", "authSource=用户所在的DB")
    

4. 验证Flink CDC连接器配置

  • 确保 Flink CDC 连接器的配置正确,例如:
    MongoDBSource.builder()
      .hosts("mongo.example.com:27017")
      .username("mongouser")
      .password("mongopasswd")
      .databaseList("testdb")
      .collectionList("testcoll")
      .startupOptions(StartupOptions.initial())
      .deserializer(new JsonDebeziumDeserializationSchema())
      .build();
    
    • hosts: 确保 MongoDB 主机地址和端口正确。
    • username/password: 确保用户名和密码正确,并具有相应权限。
    • databaseList/collectionList: 确保指定的数据库和集合存在。

5. 升级Flink CDC和MongoDB驱动

  • 您提到使用的是 Flink 1.15 和 CDC 2.3,建议检查是否为最新版本。如果不是,请升级到最新版本以修复潜在的兼容性问题。
  • 同时,确保使用的 MongoDB Java Driver 版本与 MongoDB 服务器版本兼容。

其他注意事项

  • 日志排查
    如果问题仍未解决,建议查看 Flink 作业日志和 MongoDB 日志,定位具体的错误信息。
  • 测试环境验证
    在测试环境中模拟相同的配置,逐步排查问题来源。

总结

通过上述步骤,您可以逐步排查并解决 Document does not contain key $clusterTime 的问题。重点在于确保 MongoDB 版本、运行模式、用户权限以及 Flink CDC 配置的正确性。如果问题仍然存在,建议联系阿里云技术支持团队获取进一步帮助。

有帮助
无帮助
AI 助理回答生成答案可能存在不准确,仅供参考
0 条回答
写回答
取消 提交回答

大数据领域前沿技术分享与交流,这里不止有技术干货、学习心得、企业实践、社区活动,还有未来。

还有其他疑问?
咨询AI助理