Fully Managed Flink (VVP): Using a Custom mongodb-cdc-connector in Practice

Summary: This article walks through using MongoDB as both a source and a sink with Alibaba Cloud's fully managed Flink (VVP) and the open-source mongodb-cdc-connector.

1. MongoDB database operations

1.1. CRUD basics

I came to MongoDB fresh from Flink, so this section records a few basic operations along with the statements used to write and change the test data; experienced readers can skip ahead. A Java-driver sketch of the same operations follows the shell commands below.

-- List all collections under a given database
show tables;
-------------
-- select
-------------
-- Return all documents
db.test_sink02.find()
-- Return documents matching a filter
db.test_sink02.find({_id:"40"})
db.test_sink02.find({v:320771142})
-------------
-- insert
-------------
-- Insert a document directly
db.test_sink02.insert( { p : 121, v : 232 })
-------------
-- update
-------------
-- Match v=320771142 and set p to 666; $set works on fields of any type, $inc only on numeric fields
db.test_sink02.update( { v : 320771142 } , { $set : { "p" : "666"} } );
-- Works like an upsert: if no document with _id=40 exists it inserts, otherwise it updates
db.test_sink02.save({_id:"40",count:40,test1:"OK",p:999})
-------------
-- delete
-------------
db.test_sink02.remove({_id:"40"})
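
As referenced above, here is a minimal Java sketch of the same CRUD operations using the MongoDB Java sync driver (org.mongodb:mongodb-driver-sync). The connection string is a placeholder; the database and collection names are the ones used in the shell examples.

package org.example;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.ReplaceOptions;
import com.mongodb.client.model.Updates;
import org.bson.Document;

public class MongoCrudSketch {
    public static void main(String[] args) {
        // Placeholder URI; substitute the real host, port and credentials
        try (MongoClient client = MongoClients.create("mongodb://root:password@dds-xx.mongodb.rds.aliyuncs.com:3717/admin")) {
            MongoCollection<Document> coll = client.getDatabase("sunyf").getCollection("test_sink02");

            // insert: db.test_sink02.insert({p: 121, v: 232})
            coll.insertOne(new Document("p", 121).append("v", 232));

            // find with a filter: db.test_sink02.find({v: 320771142})
            coll.find(Filters.eq("v", 320771142)).forEach(doc -> System.out.println(doc.toJson()));

            // update: db.test_sink02.update({v: 320771142}, {$set: {p: "666"}})
            coll.updateOne(Filters.eq("v", 320771142), Updates.set("p", "666"));

            // save (upsert by _id): db.test_sink02.save({_id: "40", count: 40, test1: "OK", p: 999})
            Document replacement = new Document("_id", "40").append("count", 40).append("test1", "OK").append("p", 999);
            coll.replaceOne(Filters.eq("_id", "40"), replacement, new ReplaceOptions().upsert(true));

            // remove: db.test_sink02.remove({_id: "40"})
            coll.deleteOne(Filters.eq("_id", "40"));
        }
    }
}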

1.2. mongodb shell

Download the MongoDB shell for the local macOS environment.

Reference: https://www.mongodb.com/docs/mongodb-shell/

Download: https://www.mongodb.com/try/download/shell

Installation and usage: https://www.mongodb.com/docs/mongodb-shell/install/#std-label-macos-install-archive

mongosh --host dds-xx-pub.mongodb.rds.aliyuncs.com --port 3717 -u root -p

Connections over the public network may fail because of the IP allowlist; see: https://help.aliyun.com/document_detail/114304.html?spm=a2c4g.54957.0.0.448929e0YP8QTj#section-6vm-is9-ani

Open the allowlist to all IPs first. Once the local MongoDB shell connects successfully, run the command below to see which clients are connected, then add those IPs to the allowlist.

use admin
db.runCommand({currentOp: 1, "active" : true})

Add the relevant IPs from the output to the allowlist; this pins the allowlist down to the local IP address.
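
The same check can be run from Java. Below is a minimal sketch with the MongoDB Java sync driver, assuming a user that is allowed to run currentOp against the admin database; the URI is a placeholder.

package org.example;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import org.bson.Document;

public class CurrentOpSketch {
    public static void main(String[] args) {
        // Placeholder URI; the user must be allowed to run currentOp on the admin database
        try (MongoClient client = MongoClients.create("mongodb://root:password@dds-xx-pub.mongodb.rds.aliyuncs.com:3717/admin")) {
            Document result = client.getDatabase("admin")
                    .runCommand(new Document("currentOp", 1).append("active", true));
            // Each in-progress operation reports the client address as "ip:port"
            for (Document op : result.getList("inprog", Document.class)) {
                if (op.containsKey("client")) {
                    System.out.println(op.getString("client"));
                }
            }
        }
    }
}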

1.2.1. An open question about the IP allowlist

!!! As of now, when debugging the Flink job locally in Java, adding allowlist entries found with the method above still does not work; the allowlist has to be opened to all IPs. It may be that flink-mongodb-cdc reads via snapshot + oplog rather than a regular client session, so it connects from other IPs that this method cannot detect. Guidance from anyone who understands the cause would be greatly appreciated.

image.png

1.3. MongoDB role authorization

use admin;
db.createRole(
    {
        role: "flinkrole",
        privileges: [{
            // Grant these actions on all non-system collections in all databases
            resource: { db: "", collection: "" },
            actions: [
                "splitVector",
                "listDatabases",
                "listCollections",
                "collStats",
                "find",
                "changeStream" ]
        }],
        roles: [
           // Read access to config.collections and config.chunks,
           // used for snapshot splitting on sharded clusters.
            { role: 'read', db: 'config' }
        ]
    }
);
db.createUser(
  {
      user: 'username',
      pwd: 'password',
      roles: [
         { role: 'flinkrole', db: 'admin' }
      ]
  }
);

Confirm the MongoDB version requirements of Flink CDC (a Java sketch of the same checks follows the two shell commands):

db.version() # check the server version
rs.conf()    # check the protocolVersion
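
These checks can also be scripted. A small sketch with the Java sync driver (host and credentials are placeholders) runs buildInfo for the server version and replSetGetConfig, the command behind rs.conf(), to read the replica set protocolVersion.

package org.example;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import org.bson.Document;

public class VersionCheckSketch {
    public static void main(String[] args) {
        // Placeholder URI; the user needs permission to run these admin commands
        try (MongoClient client = MongoClients.create("mongodb://root:password@dds-xx.mongodb.rds.aliyuncs.com:3717/admin")) {
            // Equivalent of db.version()
            Document buildInfo = client.getDatabase("admin").runCommand(new Document("buildInfo", 1));
            System.out.println("server version: " + buildInfo.getString("version"));

            // Equivalent of rs.conf(); protocolVersion sits inside the returned config document
            Document rsConf = client.getDatabase("admin").runCommand(new Document("replSetGetConfig", 1));
            Document config = rsConf.get("config", Document.class);
            System.out.println("protocolVersion: " + config.get("protocolVersion"));
        }
    }
}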

2. MongoDB Flink sink

The MongoDB connector built into Alibaba Cloud's fully managed VVP only supports sink, so here is a test that writes to it.

--********************************************************************--
-- Author:         sunyf
-- Created Time:   2023-07-19 10:22:59
-- Description:    Write your description here
-- Hints:          You can use SET statements to modify the configuration
--********************************************************************--
CREATE TEMPORARY TABLE datagen_source (
  v INT,
  p INT
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '2'
);

CREATE TEMPORARY TABLE mongodb_sink (
  v INT,
  p INT
) WITH (
  'connector' = 'mongodb',
  -- If both 'database' and the database in the uri are specified, 'database' takes precedence
  'database' = 'sunyf',
  -- Effectively the table name; MongoDB calls it a collection
  'collection' = 'test_sink02',
  'uri' = 'mongodb://root:password@dds-xx.mongodb.rds.aliyuncs.com:3717,dds-xx.mongodb.rds.aliyuncs.com:3717/admin?replicaSet=mgset-xx'
  -- uri example from the Flink documentation: mongodb://root:xxxxx@dds-xxxxxx.mongodb.rds.aliyuncs.com:3717/admin?replicaSet=mgset-xxxxxxx
  -- uri provided by the MongoDB console: mongodb://root:****@dds-xx.mongodb.rds.aliyuncs.com:3717,dds-xx.mongodb.rds.aliyuncs.com:3717/admin?replicaSet=mgset-xx
);

INSERT INTO mongodb_sink SELECT v, p FROM datagen_source;

3. mongodb-cdc source

3.1. flink sql on vvp

3.1.1. Download & upload the custom connector

  1. Refer to the official open-source Flink CDC documentation: https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mongodb-cdc%28ZH%29.html
  2. This time the 2.3.0 jar (release build) shown in figure 1 below was downloaded.
  3. Some users notice parameters such as `scan.startup.mode` and `scan.startup.timestamp-millis` in the documentation; these are new features that only exist from version 2.5 (as of July 24, 2023). To use them you have to build the 2.5.0 version yourself (the snapshot version mentioned in the docs); see figures 2 and 3.

image.png

image.png

image.png

3.1.2. Flink SQL code

--********************************************************************--
-- Author:         sunyf
-- Created Time:   2023-07-20 11:53:46
-- Description:    Write your description here
-- Hints:          You can use SET statements to modify the configuration
--********************************************************************--
-- Register the MongoDB table `products` in Flink SQL
CREATE TEMPORARY TABLE products (
  _id STRING, -- must be declared
  p INT,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  -- This cannot be the high-availability uri; a single node must be configured
  'hosts' = 'dds-xx.mongodb.rds.aliyuncs.com:3717',
  'username' = 'root',
  'password' = 'password',
  'database' = 'sunyf',
  'collection' = 'test_sink02'
  -- This option does not appear to make a difference either way
  -- ,'connection.options' = 'replicaSet=mgset-69737988'
);

CREATE TEMPORARY TABLE print_sink (
  _id STRING, -- must be declared
  p INT,
  PRIMARY KEY(_id) NOT ENFORCED
) COMMENT '__comment'
WITH (
  'connector' = 'print',
  'print-identifier' = 'sunyf666->'
);

-- Read snapshot and change events from the `products` collection
INSERT INTO print_sink
SELECT * FROM products;

3.1.3. Sample CDC log output

2023-07-21 13:54:19,875 INFO  org.apache.flink.api.common.functions.util.PrintSinkOutputWriter [] - sunyf666->> -D[40,999]
2023-07-21 13:28:02,844 INFO  org.apache.flink.api.common.functions.util.PrintSinkOutputWriter [] - sunyf666->> -U[40,555]
2023-07-21 13:28:02,944 INFO  org.apache.flink.api.common.functions.util.PrintSinkOutputWriter [] - sunyf666->> +U[40,999]
2023-07-21 13:26:59,660 INFO  org.apache.flink.api.common.functions.util.PrintSinkOutputWriter [] - sunyf666->> +I[40,555]

3.1.4. Building 2.5.0-SNAPSHOT (July 24, 2023)

Download the source code from GitHub and build it:

cd ~
git init
git lfs install
git clone https://github.com/ververica/flink-cdc-connectors.git
cd flink-cdc-connectors
mvn clean install -DskipTests

image.png

Upload the freshly built jar; its path is:

~/flink-cdc-connectors/flink-sql-connector-mongodb-cdc/target/flink-sql-connector-mongodb-cdc-2.5-SNAPSHOT.jar

When uploading the custom connector to the VVP platform, select the Flink version matching the one used at build time. The connector's properties now span as many as three pages, including the scan.startup.mode option our customer asked about.

image.png

3.1.5. Exploring the scan.startup.mode options

Setup: the MongoDB collection holds only 3 documents; after the two jobs were started, one more record was added with the change statement below.

-- MongoDB change statement
db.flink_use.save({_id:41,name:"lisi",age:16})

--********************************************************************--
-- Author:         sunyf
-- Created Time:   2023-07-20 11:53:46
-- Description:    Write your description here
-- Hints:          You can use SET statements to modify the configuration
--********************************************************************--
-- Register the MongoDB table `products` in Flink SQL
CREATE TEMPORARY TABLE products (
  operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
  _id STRING, -- must be declared
  `name` STRING,
  age INT,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  -- This cannot be the high-availability uri
  'hosts' = 's-xx.mongodb.rds.aliyuncs.com:3717',
  'username' = 'root',
  'password' = 'password',
  'database' = 'sunyf',
  'collection' = 'flink_use',
  -- initial: snapshot + incremental (default)
  -- latest-offset: do not read the snapshot
  'scan.startup.mode' = 'initial'
  -- This option can also be omitted
  -- ,'connection.options' = 'replicaSet=mgset-69737988'
);

CREATE TEMPORARY TABLE print_sink (
  operation_ts TIMESTAMP_LTZ(3),
  _id STRING, -- must be declared
  `name` STRING,
  age INT,
  PRIMARY KEY(_id) NOT ENFORCED
) COMMENT '__comment'
WITH (
  'connector' = 'print',
  'print-identifier' = 'sunyf666->'
);

-- Read snapshot and change events from the `products` collection
INSERT INTO print_sink
SELECT * FROM products;

3.1.5.1. initial (default)

Snapshot (3 rows) + incremental (1 row)

image.png

3.1.5.2. latest-offset

Incremental only (1 row)

image.png
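
The same choice can be made in a DataStream job through the source builder. The sketch below is an assumption based on the 2.5-SNAPSHOT API, where the incremental MongoDBSource builder exposes startupOptions(...) (this is not available in the 2.3.0 release); hosts and credentials are placeholders.

package org.example;

import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class StartupModeSketch {
    public static void main(String[] args) {
        // Assumed 2.5-SNAPSHOT builder API: startupOptions() picks initial vs. latest-offset
        MongoDBSource<String> source = MongoDBSource.<String>builder()
                .hosts("s-xx.mongodb.rds.aliyuncs.com:3717")      // placeholder host
                .username("root")
                .password("password")                              // placeholder credentials
                .databaseList("sunyf")
                .collectionList("sunyf.flink_use")
                .startupOptions(StartupOptions.latest())           // or StartupOptions.initial()
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();
        // Hand `source` to env.fromSource(...) exactly as in the DataStream example in section 4.2
    }
}

StartupOptions.initial() would reproduce the snapshot-plus-incremental behavior above, while StartupOptions.latest() reads only new changes.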

4. Flink Java on macOS (IntelliJ IDEA)

4.1. Sample code 1

The source is based on SourceFunction.

4.1.1. pom

<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.example</groupId>
  <artifactId>FlinkQuickStart</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>Flink Quickstart</name>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.13.1</flink.version>
    <target.java.version>1.8</target.java.version>
    <scala.binary.version>2.11</scala.binary.version>
    <maven.compiler.source>${target.java.version}</maven.compiler.source>
    <maven.compiler.target>${target.java.version}</maven.compiler.target>
    <log4j.version>2.12.1</log4j.version>
  </properties>

  <repositories>
    <repository>
      <id>apache.snapshots</id>
      <name>Apache Development Snapshot Repository</name>
      <url>https://repository.apache.org/content/repositories/snapshots/</url>
      <releases><enabled>false</enabled></releases>
      <snapshots><enabled>true</enabled></snapshots>
    </repository>
  </repositories>

  <dependencies>
    <!-- Apache Flink dependencies -->
    <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-common</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <!--
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    -->
    <!-- Add connector dependencies here. They must be in the default scope (compile). -->
    <!-- Example:
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    -->
    <dependency>
      <groupId>com.alibaba.ververica</groupId>
      <artifactId>ververica-connector-continuous-odps</artifactId>
      <version>1.13-vvr-4.0.15</version>
    </dependency>
    <dependency>
      <groupId>com.ververica</groupId>
      <artifactId>flink-connector-mysql-cdc</artifactId>
      <version>2.1.1</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba.ververica</groupId>
      <artifactId>ververica-connector-sls</artifactId>
      <version>1.15-vvr-6.0.2-3</version>
    </dependency>
    <dependency>
      <groupId>com.ververica</groupId>
      <artifactId>flink-connector-mongodb-cdc</artifactId>
      <!-- The dependency is available only for stable releases, SNAPSHOT dependency need build by yourself. -->
      <version>2.3.0</version>
    </dependency>
    <!-- Add logging framework, to produce console output when running in the IDE. -->
    <!-- These dependencies are excluded from the application JAR by default. -->
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-slf4j-impl</artifactId>
      <version>${log4j.version}</version>
      <scope>runtime</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-api</artifactId>
      <version>${log4j.version}</version>
      <scope>runtime</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-core</artifactId>
      <version>${log4j.version}</version>
      <scope>runtime</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <!-- Java Compiler -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>${target.java.version}</source>
          <target>${target.java.version}</target>
        </configuration>
      </plugin>
      <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
      <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <executions>
          <!-- Run shade goal on package phase -->
          <execution>
            <phase>package</phase>
            <goals><goal>shade</goal></goals>
            <configuration>
              <artifactSet>
                <excludes>
                  <exclude>org.apache.flink:force-shading</exclude>
                  <exclude>com.google.code.findbugs:jsr305</exclude>
                  <exclude>org.slf4j:*</exclude>
                  <exclude>org.apache.logging.log4j:*</exclude>
                </excludes>
              </artifactSet>
              <filters>
                <filter>
                  <!-- Do not copy the signatures in the META-INF folder.
                       Otherwise, this might cause SecurityExceptions when using the JAR. -->
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <!--
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>org.example.StreamingJob</mainClass>
                </transformer>
              </transformers>
              -->
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>

    <pluginManagement>
      <plugins>
        <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
        <plugin>
          <groupId>org.eclipse.m2e</groupId>
          <artifactId>lifecycle-mapping</artifactId>
          <version>1.0.0</version>
          <configuration>
            <lifecycleMappingMetadata>
              <pluginExecutions>
                <pluginExecution>
                  <pluginExecutionFilter>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <versionRange>[3.1.1,)</versionRange>
                    <goals><goal>shade</goal></goals>
                  </pluginExecutionFilter>
                  <action><ignore/></action>
                </pluginExecution>
                <pluginExecution>
                  <pluginExecutionFilter>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <versionRange>[3.1,)</versionRange>
                    <goals>
                      <goal>testCompile</goal>
                      <goal>compile</goal>
                    </goals>
                  </pluginExecutionFilter>
                  <action><ignore/></action>
                </pluginExecution>
              </pluginExecutions>
            </lifecycleMappingMetadata>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>


4.1.2. java

package org.example;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mongodb.MongoDBSource;

// 2023-07-20
// These MongoDB jobs currently fail to run; they hang at the log line below:
// 2023-07-20 11:56:43,167 [debezium-engine] INFO  com.mongodb.kafka.connect.source.MongoSourceTask [] - Resuming the change stream after the previous offset: {_data=BsonString{value='8264B8B079000000012B0229296E04'}}
// See https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mongodb-cdc%28ZH%29.html#id10
// The behavior is the same whether the Flink jar runs locally or is uploaded to the VVP platform.
public class MongoDBSourceExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        SourceFunction<String> sourceFunction = MongoDBSource.<String>builder()
                .hosts("s-xx-pub.mongodb.rds.aliyuncs.com:3717")
                .username("root")
                .password("password")
                .databaseList("sunyf")              // databases to capture; regular expressions are supported
                .collectionList("sunyf.flink_use")  // collections to capture; regular expressions are supported
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();
        // One checkpoint every 5 minutes
        env.enableCheckpointing(30000 * 2 * 5);
        env.addSource(sourceFunction)
                .print().setParallelism(1); // use parallelism 1 for the sink to keep message order
        env.execute();
    }
}

4.1.3. Sample data and logs

image.png

4.2. Sample code 2

4.2.1. pom

<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.example</groupId>
  <artifactId>FlinkMongoDBCDC</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>Flink Quickstart Job</name>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.16.1</flink.version>
    <target.java.version>1.8</target.java.version>
    <scala.binary.version>2.11</scala.binary.version>
    <maven.compiler.source>${target.java.version}</maven.compiler.source>
    <maven.compiler.target>${target.java.version}</maven.compiler.target>
    <log4j.version>2.12.1</log4j.version>
  </properties>

  <repositories>
    <repository>
      <id>apache.snapshots</id>
      <name>Apache Development Snapshot Repository</name>
      <url>https://repository.apache.org/content/repositories/snapshots/</url>
      <releases><enabled>false</enabled></releases>
      <snapshots><enabled>true</enabled></snapshots>
    </repository>
  </repositories>

  <dependencies>
    <!-- Apache Flink dependencies -->
    <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <!--
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    -->
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <!--
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    -->
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>1.16.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-common</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-base</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-runtime</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-runtime-web</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!--
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-shaded-guava</artifactId>
      <version>30.1.1-jre-16.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    -->
    <!-- Add connector dependencies here. They must be in the default scope (compile). -->
    <!-- Example:
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    -->
    <dependency>
      <groupId>com.ververica</groupId>
      <artifactId>flink-connector-mongodb-cdc</artifactId>
      <!-- The dependency is available only for stable releases, SNAPSHOT dependency need build by yourself. -->
      <version>2.3.0</version>
    </dependency>
    <dependency>
      <groupId>com.ververica</groupId>
      <artifactId>flink-connector-mysql-cdc</artifactId>
      <version>2.3.0</version>
    </dependency>
    <!-- Add logging framework, to produce console output when running in the IDE. -->
    <!-- These dependencies are excluded from the application JAR by default. -->
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-slf4j-impl</artifactId>
      <version>${log4j.version}</version>
      <scope>runtime</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-api</artifactId>
      <version>${log4j.version}</version>
      <scope>runtime</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-core</artifactId>
      <version>${log4j.version}</version>
      <scope>runtime</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <!-- Java Compiler -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>${target.java.version}</source>
          <target>${target.java.version}</target>
        </configuration>
      </plugin>
      <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
      <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <executions>
          <!-- Run shade goal on package phase -->
          <execution>
            <phase>package</phase>
            <goals><goal>shade</goal></goals>
            <configuration>
              <artifactSet>
                <excludes>
                  <exclude>org.apache.flink:force-shading</exclude>
                  <exclude>com.google.code.findbugs:jsr305</exclude>
                  <exclude>org.slf4j:*</exclude>
                  <exclude>org.apache.logging.log4j:*</exclude>
                </excludes>
              </artifactSet>
              <filters>
                <filter>
                  <!-- Do not copy the signatures in the META-INF folder.
                       Otherwise, this might cause SecurityExceptions when using the JAR. -->
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>org.example.StreamingJob</mainClass>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>

    <pluginManagement>
      <plugins>
        <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
        <plugin>
          <groupId>org.eclipse.m2e</groupId>
          <artifactId>lifecycle-mapping</artifactId>
          <version>1.0.0</version>
          <configuration>
            <lifecycleMappingMetadata>
              <pluginExecutions>
                <pluginExecution>
                  <pluginExecutionFilter>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <versionRange>[3.1.1,)</versionRange>
                    <goals><goal>shade</goal></goals>
                  </pluginExecutionFilter>
                  <action><ignore/></action>
                </pluginExecution>
                <pluginExecution>
                  <pluginExecutionFilter>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <versionRange>[3.1,)</versionRange>
                    <goals>
                      <goal>testCompile</goal>
                      <goal>compile</goal>
                    </goals>
                  </pluginExecutionFilter>
                  <action><ignore/></action>
                </pluginExecution>
              </pluginExecutions>
            </lifecycleMappingMetadata>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>


4.2.2. java

package org.example;

import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MongoDBIncrementalSourceExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString(RestOptions.BIND_PORT, "8081");
        conf.setString(WebOptions.LOG_PATH, "tmp/log/job.log");
        conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "tmp/log/job.log");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        MongoDBSource<String> mongoSource = MongoDBSource.<String>builder()
                .hosts("s-xxx-pub.mongodb.rds.aliyuncs.com:3717")
                .username("root")
                .password("password")
                .databaseList("sunyf")
                .collectionList("sunyf.flink_use")
                .deserializer(new JsonDebeziumDeserializationSchema())
//              .connectionOptions("replicaSet=mgset-69849026")
                .build();
        // Enable checkpointing
        env.enableCheckpointing(30000 * 2 * 5);
        env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MongoDBIncrementalSource")
                .setParallelism(1)
                .disableChaining()
                .print("sunyf666")
                .setParallelism(1);
        env.execute("Print MongoDB Snapshot + Change Stream");
    }
}


4.2.3. Sample data, logs, and UI

image.png

image.png

4.3. Pitfalls and errors

4.3.1. Prematurely reached end of stream

See the error below; the root cause is the network IP allowlist.

15:58:56.414 [cluster-ClusterId{value='64be2f26dd9b4604202b4460', description='null'}-s-2ze7067a7f9ed974-pub.mongodb.rds.aliyuncs.com:3717] INFO org.mongodb.driver.cluster - Exception in monitor thread while connecting to server s-2ze7067a7f9ed974-pub.mongodb.rds.aliyuncs.com:3717
com.mongodb.MongoSocketReadException: Prematurely reached end of stream

4.3.2. java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava30/com/google/common/util/concurrent/ThreadFactoryBuilder

See the error below; the root cause is a dependency conflict. The pom's dependency analyzer shows that Flink 1.13.1 uses Guava 18, while flink-cdc-mongodb uses 30.1.1.

Upgrading Flink to 1.16.1 resolves it, since both sets of jars then use the same shaded Guava version.

java.lang.RuntimeException: One or more fetchers have encountered exception
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava30/com/google/common/util/concurrent/ThreadFactoryBuilder
Caused by: java.lang.ClassNotFoundException: org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder

image.png

