Practicing a custom mongodb-cdc-connector on fully managed Flink (VVP)

Summary: This article walks through source and sink practice against a MongoDB database, based on Alibaba Cloud fully managed Flink (VVP) and the open-source mongodb-cdc-connector.

1. MongoDB database operations

1.1. CRUD basics

Coming to MongoDB as a Flink beginner, I note down some basic operations here, along with the statements used to write and change the test data; experienced readers can skip this section.

// List all collections under the current database
show tables;
// -------------
// Queries
// -------------
// Return all documents
db.test_sink02.find()
// Return documents matching a filter
db.test_sink02.find({_id: "40"})
db.test_sink02.find({v: 320771142})
// -------------
// Insert
// -------------
// Insert a document directly
db.test_sink02.insert({ p: 121, v: 232 })
// -------------
// Update
// -------------
// Match v = 320771142 and replace p with "666"; $set works on fields of any type, $inc only on numeric fields
db.test_sink02.update({ v: 320771142 }, { $set: { "p": "666" } });
// save() behaves like an upsert: if no document with _id "40" exists it inserts, otherwise it updates
db.test_sink02.save({_id: "40", count: 40, test1: "OK", p: 999})
// -------------
// Delete
// -------------
db.test_sink02.remove({_id: "40"})

1.2. mongodb shell

Download the MongoDB shell for a local Mac environment.

Reference: https://www.mongodb.com/docs/mongodb-shell/

Download: https://www.mongodb.com/try/download/shell

Installation and usage: https://www.mongodb.com/docs/mongodb-shell/install/#std-label-macos-install-archive

mongosh --host dds-xx-pub.mongodb.rds.aliyuncs.com --port 3717 -u root -p

Connections over the public network may fail; this is caused by the IP address whitelist, see: https://help.aliyun.com/document_detail/114304.html?spm=a2c4g.54957.0.0.448929e0YP8QTj#section-6vm-is9-ani

First open the whitelist to all IP addresses. Once the local MongoDB shell connects successfully, use the following commands to see which clients are connected, then add them to the whitelist:

use admin
db.runCommand({currentOp: 1, "active" : true})

Put the relevant IPs from the output into the whitelist, and it is then locked down to the local IP address.
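For readers who want to run the same check from Java instead of mongosh, below is a minimal sketch based on the MongoDB Java driver (the mongodb-driver-sync dependency, the class name, and the URI/password placeholders are my own assumptions, not part of the original setup); it runs the same currentOp command and prints the client address of each active operation:

package org.example;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import org.bson.Document;

// Hypothetical helper: list the client addresses currently connected to the instance,
// so that they can be added to the IP whitelist.
public class ListActiveClients {
    public static void main(String[] args) {
        // Assumption: same instance and account as used elsewhere in this article
        String uri = "mongodb://root:<password>@dds-xx-pub.mongodb.rds.aliyuncs.com:3717/admin";
        try (MongoClient client = MongoClients.create(uri)) {
            Document result = client.getDatabase("admin")
                    .runCommand(new Document("currentOp", 1).append("active", true));
            // currentOp reports active operations in the "inprog" array;
            // each entry carries a "client" field of the form "ip:port"
            for (Document op : result.getList("inprog", Document.class)) {
                Object clientAddr = op.get("client");
                if (clientAddr != null) {
                    System.out.println(clientAddr);
                }
            }
        }
    }
}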

1.2.1. IP whitelist issue (seeking help)

!!! For now, when debugging Flink locally with Java, adding the IP whitelist entries as described above still does not work and the whitelist has to be opened to all IPs. I am not sure whether this is because flink-mongodb-cdc reads data as snapshot + oplog rather than through a client session, so the traffic comes from another IP that this method cannot detect. If anyone who knows can advise, I would be very grateful.

image.png

1.3. MongoDB role authorization

use admin;
db.createRole(
    {
        role: "flinkrole",
        privileges: [{
            // Grant these actions on all non-system collections in every database
            resource: { db: "", collection: "" },
            actions: [
                "splitVector",
                "listDatabases",
                "listCollections",
                "collStats",
                "find",
                "changeStream" ]
        }],
        roles: [
           // Read config.collections and config.chunks,
           // used for sharded-cluster snapshot splitting.
            { role: 'read', db: 'config' }
        ]
    }
);
db.createUser(
  {
      user: '<username>',
      pwd: '<password>',
      roles: [
         { role: 'flinkrole', db: 'admin' }
      ]
  }
);

Confirm the versions required by Flink CDC; the connector relies on change streams, so the documentation calls for MongoDB 3.6 or later and a replica set or sharded cluster with protocolVersion 1 (pv1).

db.version() // check the server version
rs.conf()    // check protocolVersion
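The same prerequisite check can be scripted; here is a short Java-driver sketch in the same spirit (class name and URI/password placeholders are again assumptions): buildInfo returns the server version, and replSetGetConfig returns the replica-set configuration, including protocolVersion.

package org.example;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import org.bson.Document;

// Hypothetical helper: print the server version and replica-set protocolVersion
// that the MongoDB CDC connector depends on.
public class CheckCdcPrerequisites {
    public static void main(String[] args) {
        String uri = "mongodb://root:<password>@dds-xx-pub.mongodb.rds.aliyuncs.com:3717/admin";
        try (MongoClient client = MongoClients.create(uri)) {
            Document buildInfo = client.getDatabase("admin").runCommand(new Document("buildInfo", 1));
            System.out.println("server version : " + buildInfo.getString("version"));
            Document rsConfig = client.getDatabase("admin").runCommand(new Document("replSetGetConfig", 1));
            System.out.println("protocolVersion: " + rsConfig.get("config", Document.class).get("protocolVersion"));
        }
    }
}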

2. mongodb flink sink

The MongoDB connector built into Alibaba Cloud fully managed VVP supports sink only, so here is a test job that writes into MongoDB; once it has been running for a while, the written documents can be verified with db.test_sink02.find() from mongosh.

--********************************************************************--
-- Author:         sunyf
-- Created Time:   2023-07-19 10:22:59
-- Description:    Write your description here
-- Hints:          You can use SET statements to modify the configuration
--********************************************************************--
CREATE TEMPORARY TABLE datagen_source (
  v INT,
  p INT
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '2'
);

CREATE TEMPORARY TABLE mongodb_sink (
  v INT,
  p INT
) WITH (
  'connector' = 'mongodb',
  'database' = 'sunyf',
  -- if a database is specified both here and in the uri, the database option takes precedence
  'collection' = 'test_sink02',
  -- effectively the table name; MongoDB calls it a collection
  'uri' = 'mongodb://root:<password>@dds-xx.mongodb.rds.aliyuncs.com:3717,dds-xx.mongodb.rds.aliyuncs.com:3717/admin?replicaSet=mgset-xx'
  -- uri sample from the Flink documentation: mongodb://root:xxxxx@dds-xxxxxx.mongodb.rds.aliyuncs.com:3717/admin?replicaSet=mgset-xxxxxxx
  -- uri provided by the MongoDB console: mongodb://root:****@dds-xx.mongodb.rds.aliyuncs.com:3717,dds-xx.mongodb.rds.aliyuncs.com:3717/admin?replicaSet=mgset-xx
);

INSERT INTO mongodb_sink
SELECT v, p FROM datagen_source;
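As a side note, the same DDL and INSERT can also be submitted from a Java program through the Table API. The sketch below is only a local variant under assumptions: it presumes the Flink Table API dependencies and a jar providing the 'mongodb' connector are on the classpath (on VVP the connector is built in), and it reuses the placeholder URI from above.

package org.example;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Hypothetical local variant of the SQL job above, not part of the original VVP deployment.
public class MongoSinkSqlJob {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        tEnv.executeSql("CREATE TEMPORARY TABLE datagen_source (v INT, p INT) "
                + "WITH ('connector' = 'datagen', 'rows-per-second' = '2')");
        tEnv.executeSql("CREATE TEMPORARY TABLE mongodb_sink (v INT, p INT) "
                + "WITH ('connector' = 'mongodb', 'database' = 'sunyf', 'collection' = 'test_sink02', "
                + "'uri' = 'mongodb://root:<password>@dds-xx.mongodb.rds.aliyuncs.com:3717/admin?replicaSet=mgset-xx')");
        // executeSql submits the INSERT as a job and returns a TableResult without blocking
        tEnv.executeSql("INSERT INTO mongodb_sink SELECT v, p FROM datagen_source");
    }
}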

3. mongodb-cdc source

3.1. flink sql on vvp

3.1.1. Download & upload the custom connector

  1. Refer to the Ververica open-source documentation: https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mongodb-cdc%28ZH%29.html
  2. This time the 2.3.0 jar (release build) shown in figure 1 below was downloaded.
  3. Some customers notice parameters such as scan.startup.mode and scan.startup.timestamp-millis in the documentation; these are new features that only appear in version 2.5 (as of July 24, 2023), so using them requires building the 2.5.0 version manually (the snapshot version mentioned in the docs). See figures 2 and 3.

image.png

image.png

image.png

3.1.2. Flink SQL code

--********************************************************************--
-- Author:         sunyf
-- Created Time:   2023-07-20 11:53:46
-- Description:    Write your description here
-- Hints:          You can use SET statements to modify the configuration
--********************************************************************--
-- Register the MongoDB table `products` in Flink SQL
CREATE TEMPORARY TABLE products (
  _id STRING, -- must be declared
  p INT,
  PRIMARY KEY (_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  -- this cannot be the high-availability uri; a single node has to be configured
  'hosts' = 'dds-xx.mongodb.rds.aliyuncs.com:3717',
  'username' = 'root',
  'password' = '<password>',
  'database' = 'sunyf',
  'collection' = 'test_sink02'
  -- whether this option is present or not does not seem to matter
  -- ,'connection.options' = 'replicaSet=mgset-69737988'
);

CREATE TEMPORARY TABLE print_sink (
  _id STRING, -- must be declared
  p INT,
  PRIMARY KEY (_id) NOT ENFORCED
) COMMENT '__comment'
WITH (
  'connector' = 'print',
  'print-identifier' = 'sunyf666->'
);

-- Read snapshot and change events from the `products` collection
INSERT INTO print_sink
SELECT * FROM products;

3.1.3. Sample CDC log

2023-07-21 13:54:19,875 INFO  org.apache.flink.api.common.functions.util.PrintSinkOutputWriter [] - sunyf666->> -D[40,999]
2023-07-21 13:28:02,844 INFO  org.apache.flink.api.common.functions.util.PrintSinkOutputWriter [] - sunyf666->> -U[40,555]
2023-07-21 13:28:02,944 INFO  org.apache.flink.api.common.functions.util.PrintSinkOutputWriter [] - sunyf666->> +U[40,999]
2023-07-21 13:26:59,660 INFO  org.apache.flink.api.common.functions.util.PrintSinkOutputWriter [] - sunyf666->> +I[40,555]

3.1.4. Building 2.5.0-SNAPSHOT (July 24, 2023)

Download the source code from GitHub and build it:

cd ~
git init
git lfs install
git clone https://github.com/ververica/flink-cdc-connectors.git
cd flink-cdc-connectors
mvn clean install -DskipTests

image.png

Upload the newly built jar; it is produced at the following path:

~/flink-cdc-connectors/flink-sql-connector-mongodb-cdc/target/flink-sql-connector-mongodb-cdc-2.5-SNAPSHOT.jar

Upload it to the VVP platform as a custom connector and select the Flink version matching the one it was built against. The connector now exposes as many as three pages of properties, including the scan.startup.mode option that this customer requirement calls for.

image.png

3.1.5. Exploring the scan.startup.mode options

Note: the MongoDB collection holds only 3 documents; after both jobs were started, one more record was added with the following change statement.

-- MongoDB change statement
db.flink_use.save({_id: 41, name: "lisi", age: 16})

--********************************************************************--
-- Author:         sunyf
-- Created Time:   2023-07-20 11:53:46
-- Description:    Write your description here
-- Hints:          You can use SET statements to modify the configuration
--********************************************************************--
-- Register the MongoDB table `products` in Flink SQL
CREATE TEMPORARY TABLE products (
  operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
  _id STRING, -- must be declared
  `name` STRING,
  age INT,
  PRIMARY KEY (_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  -- this cannot be the high-availability uri
  'hosts' = 's-xx.mongodb.rds.aliyuncs.com:3717',
  'username' = 'root',
  'password' = '<password>',
  'database' = 'sunyf',
  'collection' = 'flink_use',
  -- initial: snapshot + incremental (default)
  -- latest-offset: skip the snapshot
  'scan.startup.mode' = 'initial'
  -- this option can also be omitted
  -- ,'connection.options' = 'replicaSet=mgset-69737988'
);

CREATE TEMPORARY TABLE print_sink (
  operation_ts TIMESTAMP_LTZ(3),
  _id STRING, -- must be declared
  `name` STRING,
  age INT,
  PRIMARY KEY (_id) NOT ENFORCED
) COMMENT '__comment'
WITH (
  'connector' = 'print',
  'print-identifier' = 'sunyf666->'
);

-- Read snapshot and change events from the `products` collection
INSERT INTO print_sink
SELECT * FROM products;

3.1.5.1. initial (default)

Snapshot (3 rows) + incremental (1 row)

image.png

3.1.5.2. latest-offset

Incremental only (1 row)

image.png

4. flink java on mac IDEA

4.1. Sample code 1

The source is based on SourceFunction.

4.1.1. pom

<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.example</groupId>
  <artifactId>FlinkQuickStart</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>Flink Quickstart</name>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.13.1</flink.version>
    <target.java.version>1.8</target.java.version>
    <scala.binary.version>2.11</scala.binary.version>
    <maven.compiler.source>${target.java.version}</maven.compiler.source>
    <maven.compiler.target>${target.java.version}</maven.compiler.target>
    <log4j.version>2.12.1</log4j.version>
  </properties>

  <repositories>
    <repository>
      <id>apache.snapshots</id>
      <name>Apache Development Snapshot Repository</name>
      <url>https://repository.apache.org/content/repositories/snapshots/</url>
      <releases><enabled>false</enabled></releases>
      <snapshots><enabled>true</enabled></snapshots>
    </repository>
  </repositories>

  <dependencies>
    <!-- Apache Flink dependencies -->
    <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <!--
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
    -->
    <!-- Add connector dependencies here. They must be in the default scope (compile). -->
    <!-- Example:
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
    -->
    <dependency><groupId>com.alibaba.ververica</groupId><artifactId>ververica-connector-continuous-odps</artifactId><version>1.13-vvr-4.0.15</version></dependency>
    <dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.1.1</version></dependency>
    <dependency><groupId>com.alibaba.ververica</groupId><artifactId>ververica-connector-sls</artifactId><version>1.15-vvr-6.0.2-3</version></dependency>
    <!-- The dependency is available only for stable releases, SNAPSHOT dependency need build by yourself. -->
    <dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mongodb-cdc</artifactId><version>2.3.0</version></dependency>
    <!-- Add logging framework, to produce console output when running in the IDE. -->
    <!-- These dependencies are excluded from the application JAR by default. -->
    <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency>
    <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency>
    <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency>
  </dependencies>

  <build>
    <plugins>
      <!-- Java Compiler -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>${target.java.version}</source>
          <target>${target.java.version}</target>
        </configuration>
      </plugin>
      <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
      <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <executions>
          <!-- Run shade goal on package phase -->
          <execution>
            <phase>package</phase>
            <goals><goal>shade</goal></goals>
            <configuration>
              <artifactSet>
                <excludes>
                  <exclude>org.apache.flink:force-shading</exclude>
                  <exclude>com.google.code.findbugs:jsr305</exclude>
                  <exclude>org.slf4j:*</exclude>
                  <exclude>org.apache.logging.log4j:*</exclude>
                </excludes>
              </artifactSet>
              <filters>
                <filter>
                  <!-- Do not copy the signatures in the META-INF folder.
                       Otherwise, this might cause SecurityExceptions when using the JAR. -->
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <!--
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>org.example.StreamingJob</mainClass>
                </transformer>
              </transformers>
              -->
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
    <pluginManagement>
      <plugins>
        <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
        <plugin>
          <groupId>org.eclipse.m2e</groupId>
          <artifactId>lifecycle-mapping</artifactId>
          <version>1.0.0</version>
          <configuration>
            <lifecycleMappingMetadata>
              <pluginExecutions>
                <pluginExecution>
                  <pluginExecutionFilter>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <versionRange>[3.1.1,)</versionRange>
                    <goals><goal>shade</goal></goals>
                  </pluginExecutionFilter>
                  <action><ignore/></action>
                </pluginExecution>
                <pluginExecution>
                  <pluginExecutionFilter>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <versionRange>[3.1,)</versionRange>
                    <goals><goal>testCompile</goal><goal>compile</goal></goals>
                  </pluginExecutionFilter>
                  <action><ignore/></action>
                </pluginExecution>
              </pluginExecutions>
            </lifecycleMappingMetadata>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>


4.1.2. java

package org.example;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mongodb.MongoDBSource;

// 2023-07-20
// These two MongoDB examples cannot run at the moment;
// they get stuck at the following log line:
// 2023-07-20 11:56:43,167 [debezium-engine] INFO  com.mongodb.kafka.connect.source.MongoSourceTask [] - Resuming the change stream after the previous offset: {_data=BsonString{value='8264B8B079000000012B0229296E04'}}
// See https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mongodb-cdc%28ZH%29.html#id10
// The behaviour is the same for a local run and for the jar uploaded to VVP.
public class MongoDBSourceExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        SourceFunction<String> sourceFunction = MongoDBSource.<String>builder()
                .hosts("s-xx-pub.mongodb.rds.aliyuncs.com:3717")
                .username("root")
                .password("<password>")
                .databaseList("sunyf")              // databases to capture, regular expressions are supported
                .collectionList("sunyf.flink_use")  // collections to capture, regular expressions are supported
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        // one checkpoint every 5 minutes
        env.enableCheckpointing(30000 * 2 * 5);
        env.addSource(sourceFunction)
                .print().setParallelism(1); // use parallelism 1 for the sink to keep the message order
        env.execute();
    }
}

4.1.3. Sample data and logs

image.png

4.2. Sample code 2

The source uses the incremental MongoDBSource (the new Source API via env.fromSource).

4.2.1. pom

<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.example</groupId>
  <artifactId>FlinkMongoDBCDC</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>Flink Quickstart Job</name>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.16.1</flink.version>
    <target.java.version>1.8</target.java.version>
    <scala.binary.version>2.11</scala.binary.version>
    <maven.compiler.source>${target.java.version}</maven.compiler.source>
    <maven.compiler.target>${target.java.version}</maven.compiler.target>
    <log4j.version>2.12.1</log4j.version>
  </properties>

  <repositories>
    <repository>
      <id>apache.snapshots</id>
      <name>Apache Development Snapshot Repository</name>
      <url>https://repository.apache.org/content/repositories/snapshots/</url>
      <releases><enabled>false</enabled></releases>
      <snapshots><enabled>true</enabled></snapshots>
    </repository>
  </repositories>

  <dependencies>
    <!-- Apache Flink dependencies -->
    <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <!--
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    -->
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <!--
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    -->
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>1.16.1</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>${flink.version}</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web</artifactId><version>${flink.version}</version></dependency>
    <!--
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-guava</artifactId><version>30.1.1-jre-16.1</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    -->
    <!-- Add connector dependencies here. They must be in the default scope (compile). -->
    <!-- Example:
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
    -->
    <!-- The dependency is available only for stable releases, SNAPSHOT dependency need build by yourself. -->
    <dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mongodb-cdc</artifactId><version>2.3.0</version></dependency>
    <dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.3.0</version></dependency>
    <!-- Add logging framework, to produce console output when running in the IDE. -->
    <!-- These dependencies are excluded from the application JAR by default. -->
    <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency>
    <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency>
    <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency>
  </dependencies>

  <build>
    <plugins>
      <!-- Java Compiler -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>${target.java.version}</source>
          <target>${target.java.version}</target>
        </configuration>
      </plugin>
      <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
      <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <executions>
          <!-- Run shade goal on package phase -->
          <execution>
            <phase>package</phase>
            <goals><goal>shade</goal></goals>
            <configuration>
              <artifactSet>
                <excludes>
                  <exclude>org.apache.flink:force-shading</exclude>
                  <exclude>com.google.code.findbugs:jsr305</exclude>
                  <exclude>org.slf4j:*</exclude>
                  <exclude>org.apache.logging.log4j:*</exclude>
                </excludes>
              </artifactSet>
              <filters>
                <filter>
                  <!-- Do not copy the signatures in the META-INF folder.
                       Otherwise, this might cause SecurityExceptions when using the JAR. -->
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>org.example.StreamingJob</mainClass>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
    <pluginManagement>
      <plugins>
        <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
        <plugin>
          <groupId>org.eclipse.m2e</groupId>
          <artifactId>lifecycle-mapping</artifactId>
          <version>1.0.0</version>
          <configuration>
            <lifecycleMappingMetadata>
              <pluginExecutions>
                <pluginExecution>
                  <pluginExecutionFilter>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <versionRange>[3.1.1,)</versionRange>
                    <goals><goal>shade</goal></goals>
                  </pluginExecutionFilter>
                  <action><ignore/></action>
                </pluginExecution>
                <pluginExecution>
                  <pluginExecutionFilter>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <versionRange>[3.1,)</versionRange>
                    <goals><goal>testCompile</goal><goal>compile</goal></goals>
                  </pluginExecutionFilter>
                  <action><ignore/></action>
                </pluginExecution>
              </pluginExecutions>
            </lifecycleMappingMetadata>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>


4.2.2. java

package org.example;

import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MongoDBIncrementalSourceExample {
    public static void main(String[] args) throws Exception {
        // expose the local web UI on port 8081 and write task logs to a local file
        Configuration conf = new Configuration();
        conf.setString(RestOptions.BIND_PORT, "8081");
        conf.setString(WebOptions.LOG_PATH, "tmp/log/job.log");
        conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "tmp/log/job.log");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

        MongoDBSource<String> mongoSource = MongoDBSource.<String>builder()
                .hosts("s-xxx-pub.mongodb.rds.aliyuncs.com:3717")
                .username("root")
                .password("<password>")
                .databaseList("sunyf")
                .collectionList("sunyf.flink_use")
                .deserializer(new JsonDebeziumDeserializationSchema())
//                .connectionOptions("replicaSet=mgset-69849026")
                .build();

        // enable checkpointing (one checkpoint every 5 minutes)
        env.enableCheckpointing(30000 * 2 * 5);
        env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MongoDBIncrementalSource")
                .setParallelism(1)
                .disableChaining()
                .print("sunyf666")
                .setParallelism(1);
        env.execute("Print MongoDB Snapshot + Change Stream");
    }
}


4.2.3. Sample data, logs, and UI

image.png

image.png

4.3. Pitfalls and errors

4.3.1. Prematurely reached end of stream

See the error below; the root cause is the network whitelist.

15:58:56.414 [cluster-ClusterId{value='64be2f26dd9b4604202b4460', description='null'}-s-2ze7067a7f9ed974-pub.mongodb.rds.aliyuncs.com:3717] INFO org.mongodb.driver.cluster - Exception in monitor thread while connecting to server s-2ze7067a7f9ed974-pub.mongodb.rds.aliyuncs.com:3717
com.mongodb.MongoSocketReadException: Prematurely reached end of stream

4.3.2. java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava30/com/google/common/util/concurrent/ThreadFactoryBuilder

See the error below; the root cause is a dependency conflict: the POM dependency analyzer shows that Flink 1.13.1 uses shaded guava 18 while flink-connector-mongodb-cdc uses 30.1.1.

Upgrading Flink to 1.16.1 (setting <flink.version>1.16.1</flink.version>, as in the POM above) resolves it, because both kinds of jars then rely on the same shaded guava version; mvn dependency:tree can be used to double-check which versions are pulled in.

java.lang.RuntimeException: One or more fetchers have encountered exception
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava30/com/google/common/util/concurrent/ThreadFactoryBuilder
Caused by: java.lang.ClassNotFoundException: org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder

image.png

