各位大佬,有没有Flink CDC同步mongo数据,出现这个问题的
?flink1.15,cdc2.3,Exception in thread "main" org.bson.BsonInvalidOperationException: Document does not contain key $clusterTime,这个一直获取不到是什么问题哦?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
根据您提供的问题描述,Flink CDC在同步MongoDB数据时出现org.bson.BsonInvalidOperationException: Document does not contain key $clusterTime的异常。以下是针对该问题的详细分析和解决方案:
MongoDB版本兼容性问题
$clusterTime 是 MongoDB 3.6 及以上版本中引入的一个字段,用于支持 Change Stream 特性。如果您的 MongoDB 版本低于 3.6,或者虽然版本符合要求但未正确启用 Change Stream 功能,则可能导致该异常。
MongoDB运行模式不满足要求
Change Stream 需要 MongoDB 在副本集(Replica Set)或分片集群(Sharded Cluster)架构下运行。如果 MongoDB 运行在单节点模式下,且未初始化为单节点副本集(通过 rs.initiate() 命令),则无法使用 Change Stream,从而导致 $clusterTime 字段缺失。
权限配置问题
如果连接 MongoDB 的用户没有足够的权限访问 Change Stream 或相关数据库,则可能导致无法获取 $clusterTime 字段。
Flink CDC连接器配置问题
Flink CDC 连接器可能未正确配置 hosts、username、password 等参数,导致无法正常读取 MongoDB 数据流。
db.runCommand({ getParameter: 1, featureCompatibilityVersion: 1 })
如果返回的 featureCompatibilityVersion 小于 3.6,需要升级 MongoDB 并启用 Change Stream。
rs.initiate()
初始化后,可以通过以下命令确认副本集状态:
rs.status()
read 或 readWrite 权限。authSource 参数:
.option("connection.options", "authSource=用户所在的DB")
MongoDBSource.builder()
.hosts("mongo.example.com:27017")
.username("mongouser")
.password("mongopasswd")
.databaseList("testdb")
.collectionList("testcoll")
.startupOptions(StartupOptions.initial())
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
通过上述步骤,您可以逐步排查并解决 Document does not contain key $clusterTime 的问题。重点在于确保 MongoDB 版本、运行模式、用户权限以及 Flink CDC 配置的正确性。如果问题仍然存在,建议联系阿里云技术支持团队获取进一步帮助。