Custom mongodb-cdc-connector practice on fully managed Flink (VVP)

Summary: This article walks through MongoDB source and sink practice based on Alibaba Cloud fully managed Flink and the open-source mongodb-cdc-connector.

1. MongoDB database operations

1.1. Basic CRUD

Coming to MongoDB fresh from Flink, I recorded some basic operations here, along with the statements used to write and change the test data; experienced readers can skip this part.

// list all collections (tables) in the current database
show tables;

// ------------- select -------------
// list all documents
db.test_sink02.find()
// query with a filter
db.test_sink02.find({_id:"40"})
db.test_sink02.find({v:320771142})

// ------------- insert -------------
// insert a document directly
db.test_sink02.insert( { p : 121, v : 232 })

// ------------- update -------------
// match v=320771142 and set p to 666; $set works for fields of any type, $inc only for numeric ones
db.test_sink02.update( { v : 320771142 } , { $set : { "p" : "666"} } );
// this behaves like an upsert: if no document with _id "40" exists it inserts, otherwise it updates
db.test_sink02.save({_id:"40",count:40,test1:"OK",p:999})

// ------------- delete -------------
db.test_sink02.remove({_id:"40"})

1.2. mongodb shell

Download the MongoDB Shell for a local macOS environment.

Reference: https://www.mongodb.com/docs/mongodb-shell/

Download: https://www.mongodb.com/try/download/shell

Install and use: https://www.mongodb.com/docs/mongodb-shell/install/#std-label-macos-install-archive

mongosh --host dds-xx-pub.mongodb.rds.aliyuncs.com --port 3717 -u root -p

Connections over the public internet may fail; this is caused by the IP allowlist. See: https://help.aliyun.com/document_detail/114304.html?spm=a2c4g.54957.0.0.448929e0YP8QTj#section-6vm-is9-ani

First open the allowlist to all IPs. Once the local mongodb shell connects successfully, run the command below to see which clients are connected, then add those IPs to the allowlist.

use admin
db.runCommand({currentOp: 1, "active" : true})

Put the relevant IPs from the output into the allowlist, which narrows it back down to the local IP address.
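
Along the same lines, here is a minimal mongosh sketch, not part of the original steps, that only assumes the standard currentOp output shape (an inprog array whose client connections carry a client field) and prints the distinct client IPs to add to the allowlist:

// Sketch: list the distinct client IPs currently connected; run against the admin database.
// Assumes the standard currentOp document shape; entries without a `client` field are skipped.
use admin
const clientIps = new Set();
db.currentOp(true).inprog.forEach(op => {
  if (op.client) {
    clientIps.add(op.client.split(":")[0]); // drop the port, keep the IP
  }
});
clientIps.forEach(ip => print(ip));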

1.2.1. Open question on the IP allowlist

!!! For local Java debugging of Flink, adding the IPs found with the method above to the allowlist still does not work; the allowlist has to be opened to all IPs. I am not sure whether this is because flink-mongodb-cdc reads via snapshot + oplog rather than a plain client session, so the traffic comes from other IPs that this command cannot detect. Guidance from anyone who understands this would be greatly appreciated.

image.png

1.3. MongoDB role and user authorization

use admin;
db.createRole(
    {
        role: "flinkrole",
        privileges: [{
            // grant privileges on all non-system collections in all databases
            resource: { db: "", collection: "" },
            actions: [
                "splitVector",
                "listDatabases",
                "listCollections",
                "collStats",
                "find",
                "changeStream" ]
        }],
        roles: [
           // read access to config.collections and config.chunks,
           // used for snapshot splitting on sharded clusters.
            { role: 'read', db: 'config' }
        ]
    }
);
db.createUser(
  {
      user: 'your_username',
      pwd: 'your_password',
      roles: [
         { role: 'flinkrole', db: 'admin' }
      ]
  }
);
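
As a quick sanity check (a sketch added here, not part of the original steps), mongosh can confirm that the role and user were created with the expected privileges:

use admin
// show the custom role together with the privileges granted above
db.getRole("flinkrole", { showPrivileges: true })
// show the user and its assigned roles ("your_username" is the placeholder used above)
db.getUser("your_username")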

Confirm that the MongoDB setup meets the Flink CDC requirements (per the connector docs, change streams need MongoDB 3.6+ running as a replica set or sharded cluster):

db.version() # check the server version
rs.conf()    # check protocolVersion

2. mongodb flink sink

The MongoDB connector built into Alibaba Cloud fully managed Flink (VVP) supports sink only, so here is a test write.

--********************************************************************--
-- Author:         sunyf
-- Created Time:   2023-07-19 10:22:59
-- Description:    Write your description here
-- Hints:          You can use SET statements to modify the configuration
--********************************************************************--
CREATE TEMPORARY TABLE datagen_source (
  v INT,
  p INT
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '2'
);

CREATE TEMPORARY TABLE mongodb_sink (
  v INT,
  p INT
) WITH (
  'connector' = 'mongodb',
  -- if a database is specified both here and in the uri, this option takes precedence
  'database' = 'sunyf',
  -- effectively the table name; MongoDB calls it a collection
  'collection' = 'test_sink02',
  'uri' = 'mongodb://root:your_password@dds-xx.mongodb.rds.aliyuncs.com:3717,dds-xx.mongodb.rds.aliyuncs.com:3717/admin?replicaSet=mgset-xx'
  -- uri example from the Flink docs: mongodb://root:xxxxx@dds-xxxxxx.mongodb.rds.aliyuncs.com:3717/admin?replicaSet=mgset-xxxxxxx
  -- uri shown in the MongoDB console: mongodb://root:****@dds-xx.mongodb.rds.aliyuncs.com:3717,dds-xx.mongodb.rds.aliyuncs.com:3717/admin?replicaSet=mgset-xx
);

INSERT INTO mongodb_sink
SELECT v, p FROM datagen_source;
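
Once the job has been running for a while, the writes can be verified from mongosh (a usage sketch; database and collection names as in the sink table above):

use sunyf
// count the documents written by the datagen -> mongodb sink job
db.test_sink02.countDocuments()
// peek at a few of the written documents
db.test_sink02.find().limit(3)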

3. mongodb-cdc source

3.1. flink sql on vvp

3.1.1. Download & upload the custom connector

  1. See the flink-cdc open-source docs: https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mongodb-cdc%28ZH%29.html
  2. The jar downloaded this time is the 2.3.0 release jar shown in Figure 1 below.
  3. Some users will notice options such as `scan.startup.mode` and `scan.startup.timestamp-millis` in the docs; these are new features that only arrive in version 2.5 (as of July 24, 2023), so using them requires building 2.5.0 manually (the SNAPSHOT version mentioned in the docs); see Figures 2 and 3.

image.png (Figure 1)

image.png (Figure 2)

image.png (Figure 3)

3.1.2. Flink SQL code

--********************************************************************--
-- Author:         sunyf
-- Created Time:   2023-07-20 11:53:46
-- Description:    Write your description here
-- Hints:          You can use SET statements to modify the configuration
--********************************************************************--
-- register the MongoDB collection as the table `products` in Flink SQL
CREATE TEMPORARY TABLE products (
  _id STRING, -- must be declared
  p INT,
  PRIMARY KEY (_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  -- the highly-available (multi-host) uri cannot be used here;
  -- a single node has to be configured
  'hosts' = 'dds-xx.mongodb.rds.aliyuncs.com:3717',
  'username' = 'root',
  'password' = 'your_password',
  'database' = 'sunyf',
  'collection' = 'test_sink02'
  -- this option appears to make no difference whether it is set or not
  -- ,'connection.options' = 'replicaSet=mgset-69737988'
);

CREATE TEMPORARY TABLE print_sink (
  _id STRING, -- must be declared
  p INT,
  PRIMARY KEY (_id) NOT ENFORCED
) COMMENT '__comment'
WITH (
  'connector' = 'print',
  'print-identifier' = 'sunyf666->'
);

-- read snapshot and change events from the `products` collection
INSERT INTO print_sink
SELECT * FROM products;

3.1.3. Sample CDC log output

2023-07-21 13:54:19,875 INFO  org.apache.flink.api.common.functions.util.PrintSinkOutputWriter [] - sunyf666->> -D[40,999]
2023-07-21 13:28:02,844 INFO  org.apache.flink.api.common.functions.util.PrintSinkOutputWriter [] - sunyf666->> -U[40,555]
2023-07-21 13:28:02,944 INFO  org.apache.flink.api.common.functions.util.PrintSinkOutputWriter [] - sunyf666->> +U[40,999]
2023-07-21 13:26:59,660 INFO  org.apache.flink.api.common.functions.util.PrintSinkOutputWriter [] - sunyf666->> +I[40,555]

Here +I is an insert, -U/+U are the before/after images of an update, and -D is a delete.

3.1.4. Building 2.5.0-SNAPSHOT (as of July 24, 2023)

Download the source from GitHub and build it:

cd ~
git init
git lfs install
git clone https://github.com/ververica/flink-cdc-connectors.git
cd flink-cdc-connectors
mvn clean install -DskipTests

image.png

Upload the freshly built jar; its path is:

~/flink-cdc-connectors/flink-sql-connector-mongodb-cdc/target/flink-sql-connector-mongodb-cdc-2.5-SNAPSHOT.jar

When uploading the custom connector to the VVP platform, choose the Flink version that matches the one used for the build. The connector now exposes three pages of properties, including the scan.startup.mode option this customer needed.

image.png

3.1.5. Exploring the scan.startup.mode options

Note: the MongoDB collection contains only 3 documents; after both jobs had started, one more record was added with the change statement below.

// MongoDB change statement
db.flink_use.save({_id:41,name:"lisi",age:16})

--********************************************************************--
-- Author:         sunyf
-- Created Time:   2023-07-20 11:53:46
-- Description:    Write your description here
-- Hints:          You can use SET statements to modify the configuration
--********************************************************************--
-- register the MongoDB collection as the table `products` in Flink SQL
CREATE TEMPORARY TABLE products (
  operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
  _id STRING, -- must be declared
  `name` STRING,
  age INT,
  PRIMARY KEY (_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  -- the highly-available (multi-host) uri cannot be used here
  'hosts' = 's-xx.mongodb.rds.aliyuncs.com:3717',
  'username' = 'root',
  'password' = 'your_password',
  'database' = 'sunyf',
  'collection' = 'flink_use',
  -- initial: snapshot + incremental (default)
  -- latest-offset: do not read the snapshot
  'scan.startup.mode' = 'initial'
  -- this option also appears to be optional
  -- ,'connection.options' = 'replicaSet=mgset-69737988'
);

CREATE TEMPORARY TABLE print_sink (
  operation_ts TIMESTAMP_LTZ(3),
  _id STRING, -- must be declared
  `name` STRING,
  age INT,
  PRIMARY KEY (_id) NOT ENFORCED
) COMMENT '__comment'
WITH (
  'connector' = 'print',
  'print-identifier' = 'sunyf666->'
);

-- read snapshot and change events from the collection
INSERT INTO print_sink
SELECT * FROM products;

3.1.5.1. initial (default)

Snapshot (3 rows) + incremental (1 row)

image.png

3.1.5.2. latest-offset

Incremental only (1 row)

image.png

4. Flink Java on macOS (IntelliJ IDEA)

4.1. Sample code 1

The source is based on SourceFunction.

4.1.1. pom

<!--
Licensed to the Apache Software Foundation (ASF) under one or more contributor
license agreements.  See the NOTICE file distributed with this work for additional
information regarding copyright ownership.  The ASF licenses this file to you under
the Apache License, Version 2.0 (the "License"); you may not use this file except
in compliance with the License.  You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed
under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
CONDITIONS OF ANY KIND, either express or implied.  See the License for the
specific language governing permissions and limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.example</groupId>
  <artifactId>FlinkQuickStart</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>Flink Quickstart</name>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.13.1</flink.version>
    <target.java.version>1.8</target.java.version>
    <scala.binary.version>2.11</scala.binary.version>
    <maven.compiler.source>${target.java.version}</maven.compiler.source>
    <maven.compiler.target>${target.java.version}</maven.compiler.target>
    <log4j.version>2.12.1</log4j.version>
  </properties>

  <repositories>
    <repository>
      <id>apache.snapshots</id>
      <name>Apache Development Snapshot Repository</name>
      <url>https://repository.apache.org/content/repositories/snapshots/</url>
      <releases><enabled>false</enabled></releases>
      <snapshots><enabled>true</enabled></snapshots>
    </repository>
  </repositories>

  <dependencies>
    <!-- Apache Flink dependencies -->
    <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <!--
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    -->

    <!-- Add connector dependencies here. They must be in the default scope (compile). -->
    <!-- Example:
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    -->
    <dependency><groupId>com.alibaba.ververica</groupId><artifactId>ververica-connector-continuous-odps</artifactId><version>1.13-vvr-4.0.15</version></dependency>
    <dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.1.1</version></dependency>
    <dependency><groupId>com.alibaba.ververica</groupId><artifactId>ververica-connector-sls</artifactId><version>1.15-vvr-6.0.2-3</version></dependency>
    <!-- The dependency is available only for stable releases, SNAPSHOT dependency need build by yourself. -->
    <dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mongodb-cdc</artifactId><version>2.3.0</version></dependency>

    <!-- Add logging framework, to produce console output when running in the IDE. -->
    <!-- These dependencies are excluded from the application JAR by default. -->
    <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency>
    <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency>
    <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency>
  </dependencies>

  <build>
    <plugins>
      <!-- Java Compiler -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>${target.java.version}</source>
          <target>${target.java.version}</target>
        </configuration>
      </plugin>
      <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
      <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <executions>
          <!-- Run shade goal on package phase -->
          <execution>
            <phase>package</phase>
            <goals><goal>shade</goal></goals>
            <configuration>
              <artifactSet>
                <excludes>
                  <exclude>org.apache.flink:force-shading</exclude>
                  <exclude>com.google.code.findbugs:jsr305</exclude>
                  <exclude>org.slf4j:*</exclude>
                  <exclude>org.apache.logging.log4j:*</exclude>
                </excludes>
              </artifactSet>
              <filters>
                <filter>
                  <!-- Do not copy the signatures in the META-INF folder.
                       Otherwise, this might cause SecurityExceptions when using the JAR. -->
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <!--
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>org.example.StreamingJob</mainClass>
                </transformer>
              </transformers>
              -->
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>

    <pluginManagement>
      <plugins>
        <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
        <plugin>
          <groupId>org.eclipse.m2e</groupId>
          <artifactId>lifecycle-mapping</artifactId>
          <version>1.0.0</version>
          <configuration>
            <lifecycleMappingMetadata>
              <pluginExecutions>
                <pluginExecution>
                  <pluginExecutionFilter>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <versionRange>[3.1.1,)</versionRange>
                    <goals><goal>shade</goal></goals>
                  </pluginExecutionFilter>
                  <action><ignore/></action>
                </pluginExecution>
                <pluginExecution>
                  <pluginExecutionFilter>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <versionRange>[3.1,)</versionRange>
                    <goals><goal>testCompile</goal><goal>compile</goal></goals>
                  </pluginExecutionFilter>
                  <action><ignore/></action>
                </pluginExecution>
              </pluginExecutions>
            </lifecycleMappingMetadata>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>


4.1.2. java

package org.example;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mongodb.MongoDBSource;

// 2023-07-20
// Both of these MongoDB jobs currently fail to run; they get stuck at the log line below:
// 2023-07-20 11:56:43,167 [debezium-engine] INFO  com.mongodb.kafka.connect.source.MongoSourceTask [] - Resuming the change stream after the previous offset: {_data=BsonString{value='8264B8B079000000012B0229296E04'}}
// See https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mongodb-cdc%28ZH%29.html#id10
// The behaviour is the same whether the Flink jar runs locally or is uploaded to the VVP platform.
public class MongoDBSourceExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        SourceFunction<String> sourceFunction = MongoDBSource.<String>builder()
                .hosts("s-xx-pub.mongodb.rds.aliyuncs.com:3717")
                .username("root")
                .password("your_password")
                .databaseList("sunyf")             // databases to capture, regular expressions are supported
                .collectionList("sunyf.flink_use") // collections to capture, regular expressions are supported
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        // one checkpoint every 5 minutes
        env.enableCheckpointing(30000 * 2 * 5);
        env.addSource(sourceFunction)
                .print().setParallelism(1); // use parallelism 1 for the sink to keep message ordering
        env.execute();
    }
}

4.1.3. Sample data and logs

image.png

4.2. Sample code 2

4.2.1. pom

<!--
Licensed to the Apache Software Foundation (ASF) under one or more contributor
license agreements.  See the NOTICE file distributed with this work for additional
information regarding copyright ownership.  The ASF licenses this file to you under
the Apache License, Version 2.0 (the "License"); you may not use this file except
in compliance with the License.  You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed
under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
CONDITIONS OF ANY KIND, either express or implied.  See the License for the
specific language governing permissions and limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.example</groupId>
  <artifactId>FlinkMongoDBCDC</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>Flink Quickstart Job</name>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.16.1</flink.version>
    <target.java.version>1.8</target.java.version>
    <scala.binary.version>2.11</scala.binary.version>
    <maven.compiler.source>${target.java.version}</maven.compiler.source>
    <maven.compiler.target>${target.java.version}</maven.compiler.target>
    <log4j.version>2.12.1</log4j.version>
  </properties>

  <repositories>
    <repository>
      <id>apache.snapshots</id>
      <name>Apache Development Snapshot Repository</name>
      <url>https://repository.apache.org/content/repositories/snapshots/</url>
      <releases><enabled>false</enabled></releases>
      <snapshots><enabled>true</enabled></snapshots>
    </repository>
  </repositories>

  <dependencies>
    <!-- Apache Flink dependencies -->
    <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <!--
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    -->
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <!--
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    -->
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>1.16.1</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>${flink.version}</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web</artifactId><version>${flink.version}</version></dependency>
    <!--
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-shaded-guava</artifactId>
      <version>30.1.1-jre-16.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    -->

    <!-- Add connector dependencies here. They must be in the default scope (compile). -->
    <!-- Example:
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    -->
    <!-- The dependency is available only for stable releases, SNAPSHOT dependency need build by yourself. -->
    <dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mongodb-cdc</artifactId><version>2.3.0</version></dependency>
    <dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.3.0</version></dependency>

    <!-- Add logging framework, to produce console output when running in the IDE. -->
    <!-- These dependencies are excluded from the application JAR by default. -->
    <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency>
    <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency>
    <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency>
  </dependencies>

  <build>
    <plugins>
      <!-- Java Compiler -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>${target.java.version}</source>
          <target>${target.java.version}</target>
        </configuration>
      </plugin>
      <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
      <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <executions>
          <!-- Run shade goal on package phase -->
          <execution>
            <phase>package</phase>
            <goals><goal>shade</goal></goals>
            <configuration>
              <artifactSet>
                <excludes>
                  <exclude>org.apache.flink:force-shading</exclude>
                  <exclude>com.google.code.findbugs:jsr305</exclude>
                  <exclude>org.slf4j:*</exclude>
                  <exclude>org.apache.logging.log4j:*</exclude>
                </excludes>
              </artifactSet>
              <filters>
                <filter>
                  <!-- Do not copy the signatures in the META-INF folder.
                       Otherwise, this might cause SecurityExceptions when using the JAR. -->
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>org.example.StreamingJob</mainClass>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>

    <pluginManagement>
      <plugins>
        <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
        <plugin>
          <groupId>org.eclipse.m2e</groupId>
          <artifactId>lifecycle-mapping</artifactId>
          <version>1.0.0</version>
          <configuration>
            <lifecycleMappingMetadata>
              <pluginExecutions>
                <pluginExecution>
                  <pluginExecutionFilter>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <versionRange>[3.1.1,)</versionRange>
                    <goals><goal>shade</goal></goals>
                  </pluginExecutionFilter>
                  <action><ignore/></action>
                </pluginExecution>
                <pluginExecution>
                  <pluginExecutionFilter>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <versionRange>[3.1,)</versionRange>
                    <goals><goal>testCompile</goal><goal>compile</goal></goals>
                  </pluginExecutionFilter>
                  <action><ignore/></action>
                </pluginExecution>
              </pluginExecutions>
            </lifecycleMappingMetadata>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>


4.2.2. java

package org.example;

import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MongoDBIncrementalSourceExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString(RestOptions.BIND_PORT, "8081");
        conf.setString(WebOptions.LOG_PATH, "tmp/log/job.log");
        conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "tmp/log/job.log");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

        MongoDBSource<String> mongoSource = MongoDBSource.<String>builder()
                .hosts("s-xxx-pub.mongodb.rds.aliyuncs.com:3717")
                .username("root")
                .password("your_password")
                .databaseList("sunyf")
                .collectionList("sunyf.flink_use")
                .deserializer(new JsonDebeziumDeserializationSchema())
//                .connectionOptions("replicaSet=mgset-69849026")
                .build();

        // enable checkpointing, one checkpoint every 5 minutes
        env.enableCheckpointing(30000 * 2 * 5);
        env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MongoDBIncrementalSource")
                .setParallelism(1)
                .disableChaining()
                .print("sunyf666")
                .setParallelism(1);
        env.execute("Print MongoDB Snapshot + Change Stream");
    }
}


4.2.3. Sample data, logs, and web UI

image.png

image.png

4.3. Pitfalls and errors

4.3.1. Prematurely reached end of stream

See the error below. Root cause: the network IP allowlist.

15:58:56.414 [cluster-ClusterId{value='64be2f26dd9b4604202b4460', description='null'}-s-2ze7067a7f9ed974-pub.mongodb.rds.aliyuncs.com:3717] INFO org.mongodb.driver.cluster - Exception in monitor thread while connecting to server s-2ze7067a7f9ed974-pub.mongodb.rds.aliyuncs.com:3717
com.mongodb.MongoSocketReadException: Prematurely reached end of stream

4.3.2. java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava30/com/google/common/util/concurrent/ThreadFactoryBuilder

See the error below. Root cause: a dependency conflict. The pom's dependency analyzer shows that Flink 1.13.1 pulls in Guava 18 while flink-cdc-mongodb uses 30.1.1.

Upgrading Flink to 1.16.1 resolves it, since both sets of jars then use the same Guava version.

java.lang.RuntimeException: One or more fetchers have encountered exception
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava30/com/google/common/util/concurrent/ThreadFactoryBuilder
Caused by: java.lang.ClassNotFoundException: org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder

image.png

