Fully managed Flink (VVP): hands-on with a custom mongodb-cdc-connector

Summary: This article walks through MongoDB source and sink practice based on Alibaba Cloud fully managed Flink (VVP) and the open-source mongodb-cdc-connector.

1. MongoDB database operations

1.1. CRUD basics

Coming to MongoDB as a Flink beginner, I recorded some basic operations here, together with the statements used to write and modify the test data; experienced readers can skip this section.

-- list all collections under a given database
show tables;
-------------
-- select
-------------
-- view all documents
db.test_sink02.find()
-- view documents matching a filter
db.test_sink02.find({_id:"40"})
db.test_sink02.find({v:320771142})
-------------
-- insert
-------------
-- insert a document directly
db.test_sink02.insert( { p : 121, v : 232 })
-------------
-- update
-------------
-- match v=320771142 and set p to 666; $set works on fields of any type, $inc only on numeric fields
db.test_sink02.update( { v : 320771142 } , { $set : { "p" : "666"} } );
-- behaves like an upsert: if no document with _id=40 exists it inserts, otherwise it updates
db.test_sink02.save({_id:"40",count:40,test1:"OK",p:999})
-------------
-- delete
-------------
db.test_sink02.remove({_id:"40"})

1.2. mongodb shell

Download the MongoDB Shell for a local macOS environment.

Reference: https://www.mongodb.com/docs/mongodb-shell/

Download: https://www.mongodb.com/try/download/shell

Install and usage: https://www.mongodb.com/docs/mongodb-shell/install/#std-label-macos-install-archive

mongosh --host dds-xx-pub.mongodb.rds.aliyuncs.com --port 3717 -u root -p

Connections over the public network may fail because of the IP allowlist; see: https://help.aliyun.com/document_detail/114304.html?spm=a2c4g.54957.0.0.448929e0YP8QTj#section-6vm-is9-ani

A workaround is to open the allowlist to all IPs first; once the local MongoDB shell connects successfully, run the command below to see which clients are connected, then add those addresses to the allowlist.

use admin
db.runCommand({currentOp: 1, "active" : true})

Put the relevant IPs from the command output into the allowlist and you have pinned down your local IP address.
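The same check can also be run from Java with the MongoDB driver (it ships transitively with flink-connector-mongodb-cdc). This is only a minimal sketch I am adding for illustration; the URI, user, and password are placeholders, not values from the original setup:

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import org.bson.Document;

public class CurrentOpCheck {
    public static void main(String[] args) {
        // placeholder connection string: replace host, user, and password with your own
        String uri = "mongodb://root:password@dds-xx-pub.mongodb.rds.aliyuncs.com:3717/admin";
        try (MongoClient client = MongoClients.create(uri)) {
            // same as `db.runCommand({currentOp: 1, active: true})` in mongosh
            Document result = client.getDatabase("admin")
                    .runCommand(new Document("currentOp", 1).append("active", true));
            // every active operation is listed under "inprog"; "client" holds the caller's ip:port
            for (Document op : result.getList("inprog", Document.class)) {
                System.out.println(op.getString("client"));
            }
        }
    }
}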

1.2.1. Open question on the IP allowlist

!!! As it stands, when debugging Flink locally in Java, adding the allowlist entries obtained with the method above still does not work; I have to open the allowlist to all IPs. I am not sure whether this is because flink-mongodb-cdc reads data via snapshot + oplog rather than a regular client session, so it connects from other IPs that this method cannot detect. Guidance from anyone who understands this would be greatly appreciated.


1.3. MongoDB role authorization

use admin;
db.createRole(
    {
        role: "flinkrole",
        privileges: [{
            // grant these actions on all non-system collections in all databases
            resource: { db: "", collection: "" },
            actions: [
                "splitVector",
                "listDatabases",
                "listCollections",
                "collStats",
                "find",
                "changeStream" ]
        }],
        roles: [
           // read access to config.collections and config.chunks,
           // used for snapshot chunk splitting on sharded clusters
            { role: 'read', db: 'config' }
        ]
    }
);
db.createUser(
  {
      user: 'your_username',
      pwd: 'your_password',
      roles: [
         { role: 'flinkrole', db: 'admin' }
      ]
  }
);
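As a quick sanity check (a sketch I am adding, not part of the original walkthrough), you can connect with the newly created account and list databases, which exercises the listDatabases action granted to flinkrole; the host and credentials below are placeholders:

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;

public class FlinkUserCheck {
    public static void main(String[] args) {
        // placeholder URI: substitute the username/password created above and your own host
        String uri = "mongodb://your_username:your_password@dds-xx.mongodb.rds.aliyuncs.com:3717/admin";
        try (MongoClient client = MongoClients.create(uri)) {
            // succeeds only if the account has the listDatabases action
            for (String name : client.listDatabaseNames()) {
                System.out.println(name);
            }
        }
    }
}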

Confirm the versions required by Flink CDC (per the connector docs, MongoDB CDC relies on change streams, so it needs MongoDB 3.6+ running as a replica set or sharded cluster):

db.version() # check the server version
rs.conf()    # check protocolVersion
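If you want to run the same confirmation from the Java side (for example from the machine where you will later debug the Flink job), a minimal driver-based sketch with placeholder credentials could look like this:

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import org.bson.Document;

public class VersionCheck {
    public static void main(String[] args) {
        // placeholder URI: replace host and credentials with your own
        String uri = "mongodb://root:password@dds-xx.mongodb.rds.aliyuncs.com:3717/admin";
        try (MongoClient client = MongoClients.create(uri)) {
            // buildInfo returns the same version string that db.version() prints in mongosh
            Document buildInfo = client.getDatabase("admin").runCommand(new Document("buildInfo", 1));
            System.out.println("server version: " + buildInfo.getString("version"));
        }
    }
}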

2. MongoDB Flink sink

The MongoDB connector built into Alibaba Cloud fully managed VVP only supports sink, so here is a test write.

--********************************************************************--
-- Author:         sunyf
-- Created Time:   2023-07-19 10:22:59
-- Description:    Write your description here
-- Hints:          You can use SET statements to modify the configuration
--********************************************************************--
CREATE TEMPORARY TABLE datagen_source (
  v INT,
  p INT
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '2'
);

CREATE TEMPORARY TABLE mongodb_sink (
  v INT,
  p INT
) WITH (
  'connector' = 'mongodb',
  'database' = 'sunyf',
  -- if a database is specified both here and in the uri, the database option wins
  'collection' = 'test_sink02',
  -- effectively the table name; MongoDB calls it a collection
  'uri' = 'mongodb://root:password@dds-xx.mongodb.rds.aliyuncs.com:3717,dds-xx.mongodb.rds.aliyuncs.com:3717/admin?replicaSet=mgset-xx'
  -- uri example from the Flink docs: mongodb://root:xxxxx@dds-xxxxxx.mongodb.rds.aliyuncs.com:3717/admin?replicaSet=mgset-xxxxxxx
  -- uri provided by the MongoDB console: mongodb://root:****@dds-xx.mongodb.rds.aliyuncs.com:3717,dds-xx.mongodb.rds.aliyuncs.com:3717/admin?replicaSet=mgset-xx
);

INSERT INTO mongodb_sink
SELECT v, p FROM datagen_source;

3. mongodb-cdc source

3.1. flink sql on vvp

3.1.1. Download & upload the custom connector

  1. Refer to the official open-source connector documentation: https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mongodb-cdc%28ZH%29.html
  2. This time I downloaded the 2.3.0 jar (release build) shown in Figure 1 below.
  3. Some customers notice parameters such as `scan.startup.mode` and `scan.startup.timestamp-millis` in the documentation; as of July 24, 2023 these are new features that only exist in version 2.5, so using them requires building the 2.5.0 version yourself (the snapshot version mentioned in the docs); see Figures 2 and 3.

(Figure 1: the 2.3.0 release jar download)

(Figures 2 and 3: documentation notes showing scan.startup.mode is only available in the 2.5 snapshot version)

3.1.2. Flink SQL code

--********************************************************************--
-- Author:         sunyf
-- Created Time:   2023-07-20 11:53:46
-- Description:    Write your description here
-- Hints:          You can use SET statements to modify the configuration
--********************************************************************--
-- register the MongoDB collection as the table `products` in Flink SQL
CREATE TEMPORARY TABLE products (
  _id STRING, -- must be declared
  p INT,
  PRIMARY KEY (_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  -- this must not be the high-availability uri; configure a single node
  'hosts' = 'dds-xx.mongodb.rds.aliyuncs.com:3717',
  'username' = 'root',
  'password' = 'password',
  'database' = 'sunyf',
  'collection' = 'test_sink02'
  -- this option appears to make no difference either way
  -- ,'connection.options' = 'replicaSet=mgset-69737988'
);

CREATE TEMPORARY TABLE print_sink (
  _id STRING, -- must be declared
  p INT,
  PRIMARY KEY (_id) NOT ENFORCED
) COMMENT '__comment'
WITH (
  'connector' = 'print',
  'print-identifier' = 'sunyf666->'
);

-- read snapshot and change events from the `products` collection
INSERT INTO print_sink
SELECT * FROM products;

3.1.3. Sample CDC logs

2023-07-21 13:54:19,875 INFO  org.apache.flink.api.common.functions.util.PrintSinkOutputWriter [] - sunyf666->>-D[40,999]
2023-07-21 13:28:02,844 INFO  org.apache.flink.api.common.functions.util.PrintSinkOutputWriter [] - sunyf666->>-U[40,555]
2023-07-21 13:28:02,944 INFO  org.apache.flink.api.common.functions.util.PrintSinkOutputWriter [] - sunyf666->>+U[40,999]
2023-07-21 13:26:59,660 INFO  org.apache.flink.api.common.functions.util.PrintSinkOutputWriter [] - sunyf666->>+I[40,555]

3.1.4. Building 2.5.0-SNAPSHOT (July 24, 2023)

Download the source from GitHub and build it:

cd ~
git init
git lfs install
git clone https://github.com/ververica/flink-cdc-connectors.git
cd flink-cdc-connectors
mvn clean install -DskipTests


Upload the freshly built jar; its path is as follows:

~/flink-cdc-connectors/flink-sql-connector-mongodb-cdc/target/flink-sql-connector-mongodb-cdc-2.5-SNAPSHOT.jar

When uploading the custom connector to the VVP platform, choose the Flink version matching the one used at build time. The connector now exposes as many as three pages of properties, including scan.startup.mode, which is what the customer needed this time.


3.1.5. Exploring the scan.startup.mode options

Note: the MongoDB collection contains only 3 documents; after both jobs were started, one more document was added with the change statement below.

-- MongoDB change statement
db.flink_use.save({_id:41,name:"lisi",age:16})

--********************************************************************--
-- Author:         sunyf
-- Created Time:   2023-07-20 11:53:46
-- Description:    Write your description here
-- Hints:          You can use SET statements to modify the configuration
--********************************************************************--
-- register the MongoDB collection as the table `products` in Flink SQL
CREATE TEMPORARY TABLE products (
  operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
  _id STRING, -- must be declared
  `name` STRING,
  age INT,
  PRIMARY KEY (_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  -- this must not be the high-availability uri
  'hosts' = 's-xx.mongodb.rds.aliyuncs.com:3717',
  'username' = 'root',
  'password' = 'password',
  'database' = 'sunyf',
  'collection' = 'flink_use',
  -- initial: snapshot + incremental (default)
  -- latest-offset: skip the snapshot
  'scan.startup.mode' = 'initial'
  -- this option also appears to be optional
  -- ,'connection.options' = 'replicaSet=mgset-69737988'
);

CREATE TEMPORARY TABLE print_sink (
  operation_ts TIMESTAMP_LTZ(3),
  _id STRING, -- must be declared
  `name` STRING,
  age INT,
  PRIMARY KEY (_id) NOT ENFORCED
) COMMENT '__comment'
WITH (
  'connector' = 'print',
  'print-identifier' = 'sunyf666->'
);

-- read snapshot and change events from the `products` collection
INSERT INTO print_sink
SELECT * FROM products;

3.1.5.1. initial (default)

Snapshot (3 records) + incremental (1 record)


3.1.5.2. latest-offset

Incremental only (1 record)


4. Flink Java on macOS (IDEA)

4.1. Sample code 1

The source extends SourceFunction.

4.1.1. pom

<!--Licensed to the Apache Software Foundation (ASF) under oneor more contributor license agreements.  See the NOTICE filedistributed with this work for additional informationregarding copyright ownership.  The ASF licenses this fileto you under the Apache License, Version 2.0 (the"License"); you may not use this file except in compliancewith the License.  You may obtain a copy of the License athttp://www.apache.org/licenses/LICENSE-2.0Unless required by applicable law or agreed to in writing,software distributed under the License is distributed on an"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANYKIND, either express or implied.  See the License for thespecific language governing permissions and limitationsunder the License.--><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>FlinkQuickStart</artifactId><version>1.0-SNAPSHOT</version><packaging>jar</packaging><name>Flink Quickstart</name><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.13.1</flink.version><target.java.version>1.8</target.java.version><scala.binary.version>2.11</scala.binary.version><maven.compiler.source>${target.java.version}</maven.compiler.source><maven.compiler.target>${target.java.version}</maven.compiler.target><log4j.version>2.12.1</log4j.version></properties><repositories><repository><id>apache.snapshots</id><name>Apache Development Snapshot Repository</name><url>https://repository.apache.org/content/repositories/snapshots/</url><releases><enabled>false</enabled></releases><snapshots><enabled>true</enabled></snapshots></repository></repositories><dependencies><!-- Apache Flink dependencies --><!-- These dependencies are provided, because they should not be packaged into the JAR file. --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!--    <dependency>--><!--      <groupId>org.apache.flink</groupId>--><!--      <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>--><!--      <version>${flink.version}</version>--><!--    </dependency>--><!-- Add connector dependencies here. They must be in the default scope (compile). 
--><!-- Example:<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>--><dependency><groupId>com.alibaba.ververica</groupId><artifactId>ververica-connector-continuous-odps</artifactId><version>1.13-vvr-4.0.15</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.1.1</version></dependency><dependency><groupId>com.alibaba.ververica</groupId><artifactId>ververica-connector-sls</artifactId><version>1.15-vvr-6.0.2-3</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mongodb-cdc</artifactId><!-- The dependency is available only for stable releases, SNAPSHOT dependency need build by yourself. --><version>2.3.0</version></dependency><!-- Add logging framework, to produce console output when running in the IDE. --><!-- These dependencies are excluded from the application JAR by default. --><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency></dependencies><build><plugins><!-- Java Compiler --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>${target.java.version}</source><target>${target.java.version}</target></configuration></plugin><!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. --><!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><!-- Run shade goal on package phase --><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>org.apache.flink:force-shading</exclude><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>org.apache.logging.log4j:*</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><!--              <transformers><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>org.example.StreamingJob</mainClass></transformer></transformers>--></configuration></execution></executions></plugin></plugins><pluginManagement><plugins><!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. 
--><plugin><groupId>org.eclipse.m2e</groupId><artifactId>lifecycle-mapping</artifactId><version>1.0.0</version><configuration><lifecycleMappingMetadata><pluginExecutions><pluginExecution><pluginExecutionFilter><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><versionRange>[3.1.1,)</versionRange><goals><goal>shade</goal></goals></pluginExecutionFilter><action><ignore/></action></pluginExecution><pluginExecution><pluginExecutionFilter><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><versionRange>[3.1,)</versionRange><goals><goal>testCompile</goal><goal>compile</goal></goals></pluginExecutionFilter><action><ignore/></action></pluginExecution></pluginExecutions></lifecycleMappingMetadata></configuration></plugin></plugins></pluginManagement></build></project>


4.1.2. java

package org.example;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mongodb.MongoDBSource;

// 2023-07-20
// At the moment these two MongoDB jobs cannot run; they hang at the log line below:
// 2023-07-20 11:56:43,167 [debezium-engine] INFO  com.mongodb.kafka.connect.source.MongoSourceTask [] - Resuming the change stream after the previous offset: {_data=BsonString{value='8264B8B079000000012B0229296E04'}}
// see https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mongodb-cdc%28ZH%29.html#id10
// the flink jar behaves the same locally and when uploaded to the VVP platform
public class MongoDBSourceExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        SourceFunction<String> sourceFunction = MongoDBSource.<String>builder()
                .hosts("s-xx-pub.mongodb.rds.aliyuncs.com:3717")
                .username("root")
                .password("password")
                .databaseList("sunyf")              // databases to capture, regex supported
                .collectionList("sunyf.flink_use")  // collections to capture, regex supported
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();
        // one checkpoint every 5 minutes
        env.enableCheckpointing(30000 * 2 * 5);
        env.addSource(sourceFunction)
                .print().setParallelism(1); // use parallelism 1 for the sink to preserve message order
        env.execute();
    }
}

4.1.3. Sample data and logs

(screenshot: sample data and job logs)

4.2. Sample code 2

4.2.1. pom

<!--Licensed to the Apache Software Foundation (ASF) under oneor more contributor license agreements.  See the NOTICE filedistributed with this work for additional informationregarding copyright ownership.  The ASF licenses this fileto you under the Apache License, Version 2.0 (the"License"); you may not use this file except in compliancewith the License.  You may obtain a copy of the License athttp://www.apache.org/licenses/LICENSE-2.0Unless required by applicable law or agreed to in writing,software distributed under the License is distributed on an"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANYKIND, either express or implied.  See the License for thespecific language governing permissions and limitationsunder the License.--><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>FlinkMongoDBCDC</artifactId><version>1.0-SNAPSHOT</version><packaging>jar</packaging><name>Flink Quickstart Job</name><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.16.1</flink.version><target.java.version>1.8</target.java.version><scala.binary.version>2.11</scala.binary.version><maven.compiler.source>${target.java.version}</maven.compiler.source><maven.compiler.target>${target.java.version}</maven.compiler.target><log4j.version>2.12.1</log4j.version></properties><repositories><repository><id>apache.snapshots</id><name>Apache Development Snapshot Repository</name><url>https://repository.apache.org/content/repositories/snapshots/</url><releases><enabled>false</enabled></releases><snapshots><enabled>true</enabled></snapshots></repository></repositories><dependencies><!-- Apache Flink dependencies --><!-- These dependencies are provided, because they should not be packaged into the JAR file. 
--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!--    <dependency>--><!--      <groupId>org.apache.flink</groupId>--><!--      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>--><!--      <version>${flink.version}</version>--><!--      <scope>provided</scope>--><!--    </dependency>--><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!--    <dependency>--><!--      <groupId>org.apache.flink</groupId>--><!--      <artifactId>flink-clients_${scala.binary.version}</artifactId>--><!--      <version>${flink.version}</version>--><!--      <scope>provided</scope>--><!--    </dependency>--><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>1.16.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web</artifactId><version>${flink.version}</version></dependency><!--    <dependency>--><!--      <groupId>org.apache.flink</groupId>--><!--      <artifactId>flink-shaded-guava</artifactId>--><!--      <version>30.1.1-jre-16.1</version>--><!--    </dependency>--><!--    <dependency>--><!--      <groupId>org.apache.flink</groupId>--><!--      <artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>--><!--      <version>${flink.version}</version>--><!--      <scope>provided</scope>--><!--    </dependency>--><!--    <dependency>--><!--      <groupId>org.apache.flink</groupId>--><!--      <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>--><!--      <version>${flink.version}</version>--><!--      <scope>provided</scope>--><!--    </dependency>--><!-- Add connector dependencies here. They must be in the default scope (compile). --><!-- Example:<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>--><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mongodb-cdc</artifactId><!-- The dependency is available only for stable releases, SNAPSHOT dependency need build by yourself. --><version>2.3.0</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.3.0</version></dependency><!-- Add logging framework, to produce console output when running in the IDE. --><!-- These dependencies are excluded from the application JAR by default. 
--><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency></dependencies><build><plugins><!-- Java Compiler --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>${target.java.version}</source><target>${target.java.version}</target></configuration></plugin><!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. --><!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><!-- Run shade goal on package phase --><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>org.apache.flink:force-shading</exclude><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>org.apache.logging.log4j:*</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>org.example.StreamingJob</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins><pluginManagement><plugins><!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. --><plugin><groupId>org.eclipse.m2e</groupId><artifactId>lifecycle-mapping</artifactId><version>1.0.0</version><configuration><lifecycleMappingMetadata><pluginExecutions><pluginExecution><pluginExecutionFilter><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><versionRange>[3.1.1,)</versionRange><goals><goal>shade</goal></goals></pluginExecutionFilter><action><ignore/></action></pluginExecution><pluginExecution><pluginExecutionFilter><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><versionRange>[3.1,)</versionRange><goals><goal>testCompile</goal><goal>compile</goal></goals></pluginExecutionFilter><action><ignore/></action></pluginExecution></pluginExecutions></lifecycleMappingMetadata></configuration></plugin></plugins></pluginManagement></build></project>


4.2.2. java

package org.example;

import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MongoDBIncrementalSourceExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString(RestOptions.BIND_PORT, "8081");
        conf.setString(WebOptions.LOG_PATH, "tmp/log/job.log");
        conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "tmp/log/job.log");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        MongoDBSource<String> mongoSource = MongoDBSource.<String>builder()
                .hosts("s-xxx-pub.mongodb.rds.aliyuncs.com:3717")
                .username("root")
                .password("password")
                .databaseList("sunyf")
                .collectionList("sunyf.flink_use")
                .deserializer(new JsonDebeziumDeserializationSchema())
//              .connectionOptions("replicaSet=mgset-69849026")
                .build();
        // enable checkpointing, one checkpoint every 5 minutes
        env.enableCheckpointing(30000 * 2 * 5);
        env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MongoDBIncrementalSource")
                .setParallelism(1)
                .disableChaining()
                .print("sunyf666")
                .setParallelism(1);
        env.execute("Print MongoDB Snapshot + Change Stream");
    }
}


4.2.3. Sample data, logs, and UI

(screenshots: sample data/log output and the local Flink web UI)

4.3. Pitfalls and errors

4.3.1. Prematurely reached end of stream

See the error below. Root cause: the network IP allowlist (see section 1.2 above).

15:58:56.414 [cluster-ClusterId{value='64be2f26dd9b4604202b4460', description='null'}-s-2ze7067a7f9ed974-pub.mongodb.rds.aliyuncs.com:3717] INFO org.mongodb.driver.cluster - Exception in monitor thread while connecting to server s-2ze7067a7f9ed974-pub.mongodb.rds.aliyuncs.com:3717
com.mongodb.MongoSocketReadException: Prematurely reached end of stream

4.3.2. java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava30/com/google/common/util/concurrent/ThreadFactoryBuilder

See the error below. Root cause: a dependency conflict. The pom's dependency analyzer shows that Flink 1.13.1 uses guava 18 while flink-cdc-mongodb uses 30.1.1.

Upgrading Flink to 1.16.1 resolves it, since both sets of jars then use the same guava version.

java.lang.RuntimeException: One or more fetchers have encountered exception
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava30/com/google/common/util/concurrent/ThreadFactoryBuilder
Caused by: java.lang.ClassNotFoundException: org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder

(screenshot: Maven dependency analyzer showing the conflicting guava versions)

