1. MongoDB database operations
1.1. CRUD operations
I came to MongoDB as a beginner from the Flink side. This section records some basic operations, plus the statements used to write and modify test data. Experienced readers can skip ahead.
// List all collections under the current database
show tables

// ------------- SELECT -------------
// Return all documents
db.test_sink02.find()
// Filter by condition
db.test_sink02.find({_id: "40"})
db.test_sink02.find({v: 320771142})

// ------------- INSERT -------------
// Insert a document directly
db.test_sink02.insert({p: 121, v: 232})

// ------------- UPDATE -------------
// Match v = 320771142 and replace p with "666".
// $set can replace fields of any type; $inc only works on numeric fields.
db.test_sink02.update({v: 320771142}, {$set: {"p": "666"}})
// save() behaves like an upsert: if no document with _id "40" exists it inserts,
// otherwise it updates the existing document.
db.test_sink02.save({_id: "40", count: 40, test1: "OK", p: 999})

// ------------- DELETE -------------
db.test_sink02.remove({_id: "40"})
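To make the `$set` vs `$inc` distinction above concrete, here is a minimal hedged sketch against the same test_sink02 collection and the document inserted above (where p starts out numeric):

// $inc atomically adds to a numeric field: p goes from 121 to 131.
db.test_sink02.update({v: 232}, {$inc: {p: 10}})
// $set simply overwrites, regardless of the field's current type.
db.test_sink02.update({v: 232}, {$set: {p: "now-a-string"}})
// Running $inc again would now fail ("Cannot apply $inc to a value of
// non-numeric type"), because p is no longer a number.
db.test_sink02.find({v: 232})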
1.2. MongoDB Shell
Download the MongoDB Shell for a local macOS environment.
Reference: https://www.mongodb.com/docs/mongodb-shell/
Download: https://www.mongodb.com/try/download/shell
Install and use: https://www.mongodb.com/docs/mongodb-shell/install/#std-label-macos-install-archive
mongosh --host dds-xx-pub.mongodb.rds.aliyuncs.com --port 3717 -u root -p
Connections over the public internet may fail; this is caused by the IP address whitelist. See: https://help.aliyun.com/document_detail/114304.html?spm=a2c4g.54957.0.0.448929e0YP8QTj#section-6vm-is9-ani
First open the whitelist to all IPs. Once the local MongoDB Shell connects successfully, run the command below to see which clients are currently connected, and add them to the whitelist.
use admin
db.runCommand({currentOp: 1, "active": true})
Add the relevant IPs from the output to the whitelist, and you have pinned down the local IP address.
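To save copying addresses out by hand, here is a hedged mongosh sketch that collects the distinct client IPs from that output (it assumes the standard inprog array with a client field; availability can vary with server version and privileges):

// Collect the distinct client IPs of currently active operations.
const ops = db.getSiblingDB("admin").runCommand({currentOp: 1, active: true});
const clients = new Set();
ops.inprog.forEach(op => { if (op.client) clients.add(op.client.split(":")[0]); });
print("Client IPs to whitelist:");
clients.forEach(ip => print("  " + ip));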
1.2.1. Open question on the IP whitelist
!!! For local Java debugging of Flink, adding the IP whitelist as described above still does not work; I had to open it to all IPs. I am not sure whether this is because flink-mongodb-cdc works via snapshot + oplog rather than an ordinary client session, and therefore connects from other IPs that this method cannot detect. If anyone understands this, guidance would be greatly appreciated.
1.3. MongoDB role-based authorization
use admin
db.createRole({
  role: "flinkrole",
  privileges: [{
    // Grant these actions on all non-system collections in all databases
    resource: { db: "", collection: "" },
    actions: [
      "splitVector", "listDatabases", "listCollections",
      "collStats", "find", "changeStream"
    ]
  }],
  roles: [
    // Read config.collections and config.chunks,
    // used for snapshot splitting on sharded clusters.
    { role: 'read', db: 'config' }
  ]
});
db.createUser({
  user: 'username',
  pwd: 'password',
  roles: [{ role: 'flinkrole', db: 'admin' }]
});
Confirming the versions required by Flink CDC:
db.version()   // check the server version
rs.conf()      // check protocolVersion
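For convenience, both checks can go into one hedged mongosh snippet (a sketch; the MongoDB CDC connector documentation requires change streams, i.e. MongoDB 3.6+ deployed as a replica set or sharded cluster with replica set protocolVersion 1):

// Print both values the connector cares about in one go.
print("MongoDB version:  " + db.version());                // expect >= 3.6
print("protocolVersion:  " + rs.conf().protocolVersion);   // expect 1 (pv1)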
2. MongoDB Flink sink
The MongoDB connector built into Alibaba Cloud's fully managed Flink (VVP) supports sink only; here is a test write.
--********************************************************************--
-- Author:         sunyf
-- Created Time:   2023-07-19 10:22:59
-- Description:    Write your description here
-- Hints:          You can use SET statements to modify the configuration
--********************************************************************--
CREATE TEMPORARY TABLE datagen_source (
  v INT,
  p INT
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '2'
);

CREATE TEMPORARY TABLE mongodb_sink (
  v INT,
  p INT
) WITH (
  'connector' = 'mongodb',
  -- if both 'database' and the uri specify a database, 'database' wins
  'database' = 'sunyf',
  -- effectively the table name; MongoDB calls it a collection
  'collection' = 'test_sink02',
  'uri' = 'mongodb://root:password@dds-xx.mongodb.rds.aliyuncs.com:3717,dds-xx.mongodb.rds.aliyuncs.com:3717/admin?replicaSet=mgset-xx'
  -- uri example from the Flink docs: mongodb://root:xxxxx@dds-xxxxxx.mongodb.rds.aliyuncs.com:3717/admin?replicaSet=mgset-xxxxxxx
  -- uri from the MongoDB console:    mongodb://root:****@dds-xx.mongodb.rds.aliyuncs.com:3717,dds-xx.mongodb.rds.aliyuncs.com:3717/admin?replicaSet=mgset-xx
);

INSERT INTO mongodb_sink SELECT v, p FROM datagen_source;
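Once the job has been running for a moment, a hedged sanity check from mongosh (assuming the sunyf database and test_sink02 collection from the DDL above):

use sunyf
// datagen emits ~2 rows per second, so the count should keep growing.
db.test_sink02.countDocuments()
db.test_sink02.find().limit(5)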
3. mongodb-cdc source
3.1. Flink SQL on VVP
3.1.1. Download & upload the custom connector
- Reference: the official Ververica open-source docs: https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mongodb-cdc%28ZH%29.html
- This time I downloaded the 2.3.0 jar (release build), as shown in Figure 1.
- Some users notice parameters such as `scan.startup.mode` and `scan.startup.timestamp-millis` in the docs; these are new features that only arrived in version 2.5 (as of July 24, 2023). To use them you have to build 2.5.0 yourself (the snapshot version mentioned in the docs); see Figures 2 and 3.
3.1.2. Flink SQL code
--********************************************************************--
-- Author:         sunyf
-- Created Time:   2023-07-20 11:53:46
-- Description:    Write your description here
-- Hints:          You can use SET statements to modify the configuration
--********************************************************************--
-- Register the MongoDB collection as the table `products` in Flink SQL
CREATE TEMPORARY TABLE products (
  _id STRING, -- must be declared
  p INT,
  PRIMARY KEY (_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  -- this cannot be the high-availability uri;
  -- a single node must be configured
  'hosts' = 'dds-xx.mongodb.rds.aliyuncs.com:3717',
  'username' = 'root',
  'password' = 'password',
  'database' = 'sunyf',
  'collection' = 'test_sink02'
  -- so far this option appears to make no difference either way
  -- ,'connection.options' = 'replicaSet=mgset-69737988'
);

CREATE TEMPORARY TABLE print_sink (
  _id STRING, -- must be declared
  p INT,
  PRIMARY KEY (_id) NOT ENFORCED
) COMMENT '__comment'
WITH (
  'connector' = 'print',
  'print-identifier' = 'sunyf666->'
);

-- Read snapshot and change events from the `products` collection
INSERT INTO print_sink SELECT * FROM products;
3.1.3. Sample CDC log
2023-07-21 13:54:19,875 INFO  org.apache.flink.api.common.functions.util.PrintSinkOutputWriter [] - sunyf666->>-D[40,999]
2023-07-21 13:28:02,844 INFO  org.apache.flink.api.common.functions.util.PrintSinkOutputWriter [] - sunyf666->>-U[40,555]
2023-07-21 13:28:02,944 INFO  org.apache.flink.api.common.functions.util.PrintSinkOutputWriter [] - sunyf666->>+U[40,999]
2023-07-21 13:26:59,660 INFO  org.apache.flink.api.common.functions.util.PrintSinkOutputWriter [] - sunyf666->>+I[40,555]
3.1.4. Building 2.5.0-SNAPSHOT (July 24, 2023)
Download the source from GitHub and build it:
cd ~
git init
git lfs install
git clone https://github.com/ververica/flink-cdc-connectors.git
cd flink-cdc-connectors
mvn clean install -DskipTests
Upload the freshly built jar; the path is as follows:
~/flink-cdc-connectors/flink-sql-connector-mongodb-cdc/target/flink-sql-connector-mongodb-cdc-2.5-SNAPSHOT.jar
Upload it to the VVP platform as a custom connector, choosing the Flink version that matches the one used for the build. The connector's properties now span as many as 3 pages, including the scan.startup.mode option our customer asked about.
3.1.5. Exploring the scan.startup.mode options
Note: the MongoDB collection contained only 3 documents; after both jobs had started, one more record was added with the following change statement.
// MongoDB change statement
db.flink_use.save({_id: 41, name: "lisi", age: 16})
--********************************************************************--
-- Author:         sunyf
-- Created Time:   2023-07-20 11:53:46
-- Description:    Write your description here
-- Hints:          You can use SET statements to modify the configuration
--********************************************************************--
-- Register the MongoDB collection as the table `products` in Flink SQL
CREATE TEMPORARY TABLE products (
  operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
  _id STRING, -- must be declared
  `name` STRING,
  age INT,
  PRIMARY KEY (_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  -- this cannot be the high-availability uri
  'hosts' = 's-xx.mongodb.rds.aliyuncs.com:3717',
  'username' = 'root',
  'password' = 'password',
  'database' = 'sunyf',
  'collection' = 'flink_use',
  -- 'initial'        snapshot + incremental (default)
  -- 'latest-offset'  skip the snapshot
  'scan.startup.mode' = 'initial'
  -- this option also appears to be optional
  -- ,'connection.options' = 'replicaSet=mgset-69737988'
);

CREATE TEMPORARY TABLE print_sink (
  operation_ts TIMESTAMP_LTZ(3),
  _id STRING, -- must be declared
  `name` STRING,
  age INT,
  PRIMARY KEY (_id) NOT ENFORCED
) COMMENT '__comment'
WITH (
  'connector' = 'print',
  'print-identifier' = 'sunyf666->'
);

-- Read snapshot and change events from the collection
INSERT INTO print_sink SELECT * FROM products;
3.1.5.1. initial (default)
Snapshot (3 rows) + incremental (1 row)
3.1.5.2. latest-offset
Incremental only (1 row)
4. Flink Java on macOS (IntelliJ IDEA)
4.1. Sample code 1
The source extends SourceFunction.
4.1.1. pom
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>FlinkQuickStart</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>Flink Quickstart</name>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.13.1</flink.version>
    <target.java.version>1.8</target.java.version>
    <scala.binary.version>2.11</scala.binary.version>
    <maven.compiler.source>${target.java.version}</maven.compiler.source>
    <maven.compiler.target>${target.java.version}</maven.compiler.target>
    <log4j.version>2.12.1</log4j.version>
  </properties>

  <repositories>
    <repository>
      <id>apache.snapshots</id>
      <name>Apache Development Snapshot Repository</name>
      <url>https://repository.apache.org/content/repositories/snapshots/</url>
      <releases><enabled>false</enabled></releases>
      <snapshots><enabled>true</enabled></snapshots>
    </repository>
  </repositories>

  <dependencies>
    <!-- Apache Flink dependencies -->
    <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-common</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <!--
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    -->

    <!-- Add connector dependencies here. They must be in the default scope (compile). -->
    <!-- Example:
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    -->
    <dependency>
      <groupId>com.alibaba.ververica</groupId>
      <artifactId>ververica-connector-continuous-odps</artifactId>
      <version>1.13-vvr-4.0.15</version>
    </dependency>
    <dependency>
      <groupId>com.ververica</groupId>
      <artifactId>flink-connector-mysql-cdc</artifactId>
      <version>2.1.1</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba.ververica</groupId>
      <artifactId>ververica-connector-sls</artifactId>
      <version>1.15-vvr-6.0.2-3</version>
    </dependency>
    <dependency>
      <groupId>com.ververica</groupId>
      <artifactId>flink-connector-mongodb-cdc</artifactId>
      <!-- The dependency is available only for stable releases, SNAPSHOT dependency need build by yourself. -->
      <version>2.3.0</version>
    </dependency>

    <!-- Add logging framework, to produce console output when running in the IDE. -->
    <!-- These dependencies are excluded from the application JAR by default. -->
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-slf4j-impl</artifactId>
      <version>${log4j.version}</version>
      <scope>runtime</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-api</artifactId>
      <version>${log4j.version}</version>
      <scope>runtime</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-core</artifactId>
      <version>${log4j.version}</version>
      <scope>runtime</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <!-- Java Compiler -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>${target.java.version}</source>
          <target>${target.java.version}</target>
        </configuration>
      </plugin>

      <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
      <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <executions>
          <!-- Run shade goal on package phase -->
          <execution>
            <phase>package</phase>
            <goals><goal>shade</goal></goals>
            <configuration>
              <artifactSet>
                <excludes>
                  <exclude>org.apache.flink:force-shading</exclude>
                  <exclude>com.google.code.findbugs:jsr305</exclude>
                  <exclude>org.slf4j:*</exclude>
                  <exclude>org.apache.logging.log4j:*</exclude>
                </excludes>
              </artifactSet>
              <filters>
                <filter>
                  <!-- Do not copy the signatures in the META-INF folder.
                       Otherwise, this might cause SecurityExceptions when using the JAR. -->
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <!--
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>org.example.StreamingJob</mainClass>
                </transformer>
              </transformers>
              -->
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>

    <pluginManagement>
      <plugins>
        <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
        <plugin>
          <groupId>org.eclipse.m2e</groupId>
          <artifactId>lifecycle-mapping</artifactId>
          <version>1.0.0</version>
          <configuration>
            <lifecycleMappingMetadata>
              <pluginExecutions>
                <pluginExecution>
                  <pluginExecutionFilter>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <versionRange>[3.1.1,)</versionRange>
                    <goals><goal>shade</goal></goals>
                  </pluginExecutionFilter>
                  <action><ignore/></action>
                </pluginExecution>
                <pluginExecution>
                  <pluginExecutionFilter>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <versionRange>[3.1,)</versionRange>
                    <goals>
                      <goal>testCompile</goal>
                      <goal>compile</goal>
                    </goals>
                  </pluginExecutionFilter>
                  <action><ignore/></action>
                </pluginExecution>
              </pluginExecutions>
            </lifecycleMappingMetadata>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>
4.1.2. java
package org.example;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mongodb.MongoDBSource;

// 2023-07-20
// Neither of these two MongoDB jobs runs at the moment;
// they get stuck at the following log line:
// 2023-07-20 11:56:43,167 [debezium-engine] INFO com.mongodb.kafka.connect.source.MongoSourceTask [] - Resuming the change stream after the previous offset: {_data=BsonString{value='8264B8B079000000012B0229296E04'}}
// See https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mongodb-cdc%28ZH%29.html#id10
// The behavior is identical for a local Flink jar and one uploaded to the VVP platform.
public class MongoDBSourceExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        SourceFunction<String> sourceFunction = MongoDBSource.<String>builder()
                .hosts("s-xx-pub.mongodb.rds.aliyuncs.com:3717")
                .username("root")
                .password("password")
                .databaseList("sunyf")             // databases to capture; regular expressions supported
                .collectionList("sunyf.flink_use") // collections to capture; regular expressions supported
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        // one checkpoint every 5 minutes
        env.enableCheckpointing(30000 * 2 * 5);
        env.addSource(sourceFunction)
                .print().setParallelism(1); // parallelism 1 for the sink to preserve message ordering
        env.execute();
    }
}
4.1.3. Sample data and logs
4.2. Sample code 2
4.2.1. pom
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>FlinkMongoDBCDC</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>Flink Quickstart Job</name>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.16.1</flink.version>
    <target.java.version>1.8</target.java.version>
    <scala.binary.version>2.11</scala.binary.version>
    <maven.compiler.source>${target.java.version}</maven.compiler.source>
    <maven.compiler.target>${target.java.version}</maven.compiler.target>
    <log4j.version>2.12.1</log4j.version>
  </properties>

  <repositories>
    <repository>
      <id>apache.snapshots</id>
      <name>Apache Development Snapshot Repository</name>
      <url>https://repository.apache.org/content/repositories/snapshots/</url>
      <releases><enabled>false</enabled></releases>
      <snapshots><enabled>true</enabled></snapshots>
    </repository>
  </repositories>

  <dependencies>
    <!-- Apache Flink dependencies -->
    <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <!--
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    -->
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <!--
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    -->
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>1.16.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-common</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-base</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-runtime</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-runtime-web</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!--
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-shaded-guava</artifactId>
      <version>30.1.1-jre-16.1</version>
    </dependency>
    -->
    <!--
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    -->
    <!--
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    -->

    <!-- Add connector dependencies here. They must be in the default scope (compile). -->
    <!-- Example:
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    -->
    <dependency>
      <groupId>com.ververica</groupId>
      <artifactId>flink-connector-mongodb-cdc</artifactId>
      <!-- The dependency is available only for stable releases, SNAPSHOT dependency need build by yourself. -->
      <version>2.3.0</version>
    </dependency>
    <dependency>
      <groupId>com.ververica</groupId>
      <artifactId>flink-connector-mysql-cdc</artifactId>
      <version>2.3.0</version>
    </dependency>

    <!-- Add logging framework, to produce console output when running in the IDE. -->
    <!-- These dependencies are excluded from the application JAR by default. -->
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-slf4j-impl</artifactId>
      <version>${log4j.version}</version>
      <scope>runtime</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-api</artifactId>
      <version>${log4j.version}</version>
      <scope>runtime</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-core</artifactId>
      <version>${log4j.version}</version>
      <scope>runtime</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <!-- Java Compiler -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>${target.java.version}</source>
          <target>${target.java.version}</target>
        </configuration>
      </plugin>

      <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
      <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <executions>
          <!-- Run shade goal on package phase -->
          <execution>
            <phase>package</phase>
            <goals><goal>shade</goal></goals>
            <configuration>
              <artifactSet>
                <excludes>
                  <exclude>org.apache.flink:force-shading</exclude>
                  <exclude>com.google.code.findbugs:jsr305</exclude>
                  <exclude>org.slf4j:*</exclude>
                  <exclude>org.apache.logging.log4j:*</exclude>
                </excludes>
              </artifactSet>
              <filters>
                <filter>
                  <!-- Do not copy the signatures in the META-INF folder.
                       Otherwise, this might cause SecurityExceptions when using the JAR. -->
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>org.example.StreamingJob</mainClass>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>

    <pluginManagement>
      <plugins>
        <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
        <plugin>
          <groupId>org.eclipse.m2e</groupId>
          <artifactId>lifecycle-mapping</artifactId>
          <version>1.0.0</version>
          <configuration>
            <lifecycleMappingMetadata>
              <pluginExecutions>
                <pluginExecution>
                  <pluginExecutionFilter>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <versionRange>[3.1.1,)</versionRange>
                    <goals><goal>shade</goal></goals>
                  </pluginExecutionFilter>
                  <action><ignore/></action>
                </pluginExecution>
                <pluginExecution>
                  <pluginExecutionFilter>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <versionRange>[3.1,)</versionRange>
                    <goals>
                      <goal>testCompile</goal>
                      <goal>compile</goal>
                    </goals>
                  </pluginExecutionFilter>
                  <action><ignore/></action>
                </pluginExecution>
              </pluginExecutions>
            </lifecycleMappingMetadata>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>
4.2.2. java
package org.example;

import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MongoDBIncrementalSourceExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString(RestOptions.BIND_PORT, "8081");
        conf.setString(WebOptions.LOG_PATH, "tmp/log/job.log");
        conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "tmp/log/job.log");
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

        MongoDBSource<String> mongoSource = MongoDBSource.<String>builder()
                .hosts("s-xxx-pub.mongodb.rds.aliyuncs.com:3717")
                .username("root")
                .password("password")
                .databaseList("sunyf")
                .collectionList("sunyf.flink_use")
                .deserializer(new JsonDebeziumDeserializationSchema())
                // .connectionOptions("replicaSet=mgset-69849026")
                .build();

        // enable checkpointing (every 5 minutes)
        env.enableCheckpointing(30000 * 2 * 5);
        env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MongoDBIncrementalSource")
                .setParallelism(1)
                .disableChaining()
                .print("sunyf666")
                .setParallelism(1);
        env.execute("Print MongoDB Snapshot + Change Stream");
    }
}
4.2.3. Sample data, logs, and UI
4.3. Pitfalls and errors
4.3.1. Prematurely reached end of stream
See the error below. Root cause: the network whitelist.
15:58:56.414 [cluster-ClusterId{value='64be2f26dd9b4604202b4460', description='null'}-s-2ze7067a7f9ed974-pub.mongodb.rds.aliyuncs.com:3717] INFO org.mongodb.driver.cluster - Exception in monitor thread while connecting to server s-2ze7067a7f9ed974-pub.mongodb.rds.aliyuncs.com:3717
com.mongodb.MongoSocketReadException: Prematurely reached end of stream
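A hedged way to check from the same machine whether the whitelist is really the culprit (a sketch; assumes mongosh can reach the same host with the same credentials):

// If this ping hangs or the connection drops, the local IP is most likely
// not on the instance's whitelist; if it returns { ok: 1 }, look elsewhere
// (credentials, TLS, routing).
db.getSiblingDB("admin").runCommand({ping: 1})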
4.3.2. java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava30/com/google/common/util/concurrent/ThreadFactoryBuilder
See the error below. Root cause: a dependency conflict. The pom's dependency analyzer shows that Flink 1.13.1 uses Guava 18, while flink-connector-mongodb-cdc uses Guava 30.1.1.
Upgrading Flink to 1.16.1 resolves it; both sets of jars then use the same Guava version (mvn dependency:tree is a quick way to confirm which versions are actually resolved).
java.lang.RuntimeException: One or more fetchers have encountered exception
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava30/com/google/common/util/concurrent/ThreadFactoryBuilder
Caused by: java.lang.ClassNotFoundException: org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder