Flink CDC 有没有DEMO看下依赖包?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
Flink CDC 提供了多种使用场景的 DEMO 和依赖包配置方法,以下为具体的参考内容和操作步骤:
在使用 Flink CDC 时,您需要根据作业类型(SQL 或 DataStream)选择合适的依赖包。以下是两种常见的依赖包形式:
flink-sql-connector-xxx.jar
lib
目录即可。flink-connector-xxx.jar
如果您使用的是 DataStream 作业,可以在 pom.xml
文件中添加以下依赖:
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${vvr.version}</version>
</dependency>
如果需要排除冲突依赖,可以使用 exclusions
标签:
<dependency>
<groupId>foo</groupId>
<artifactId>bar</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
</exclusions>
</dependency>
以下是一些常见的 Flink CDC 使用场景及其 DEMO 配置:
这是一个典型的 Flink CDC 数据同步案例,展示了如何将 MySQL 数据实时同步到 Hologres。
操作步骤: 1. 准备测试数据: - 下载 tpc_ds.sql
、user_db1.sql
等文件并导入 RDS MySQL。 - 在 Hologres 中创建目标数据库 my_user
。 2. 配置白名单: - 获取 Flink 工作空间的 VPC 网段信息。 - 将网段信息添加到 MySQL 和 Hologres 的 IP 白名单中。 3. 编写 SQL 作业:
CREATE TABLE mysql_source (
id INT,
name STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '<yourHostname>',
'port' = '<yourPort>',
'username' = '<yourUserName>',
'password' = '<yourPassWord>',
'database-name' = 'tpc_ds',
'table-name' = 'orders'
);
CREATE TABLE hologres_sink (
id INT,
name STRING
) WITH (
'connector' = 'hologres',
'url' = '<yourHologresUrl>',
'tablename' = 'public.orders',
'username' = '<yourUserName>',
'password' = '<yourPassWord>'
);
INSERT INTO hologres_sink SELECT * FROM mysql_source;
PolarDB PostgreSQL 的 Flink CDC 连接器支持全量快照和增量变更数据的读取。
操作步骤: 1. 前提准备: - 购买 PolarDB PostgreSQL 集群并创建高权限账户。 - 创建源表 shipments
和目标表 shipments_sink
。 2. 编写 SQL 作业:
CREATE TEMPORARY TABLE shipments (
shipment_id INT,
order_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN,
order_time TIMESTAMP,
PRIMARY KEY (shipment_id) NOT ENFORCED
) WITH (
'connector' = 'polardbo-cdc',
'hostname' = '<yourHostname>',
'port' = '<yourPort>',
'username' = '<yourUserName>',
'password' = '<yourPassWord>',
'database-name' = 'flink_source',
'schema-name' = 'public',
'table-name' = 'shipments',
'decoding.plugin.name' = 'pgoutput',
'slot.name' = 'flink'
);
CREATE TEMPORARY TABLE shipments_sink (
shipment_id INT,
order_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN,
order_time TIMESTAMP
) WITH (
'connector' = 'jdbc',
'url' = '<yourJdbcUrl>',
'table-name' = 'shipments_sink'
);
INSERT INTO shipments_sink SELECT * FROM shipments;
阿里云实时计算 Flink 版支持使用社区版 CDC 连接器,但需要注意以下事项: - 社区版 CDC 连接器遵循 Apache 2.0 开源协议,与商业版在服务支持及 SLA 方面存在差异。 - 使用过程中遇到的问题需在开源社区寻求解决方案。
操作步骤: 1. 访问 Apache Flink CDC 页面,下载目标 CDC 连接器的 JAR 包。 2. 登录实时计算控制台,上传 JAR 包并创建自定义连接器。 3. 编写 SQL 作业时,将连接器的名字作为 connector
参数传入。
通过以上 DEMO 和依赖包配置,您可以快速上手 Flink CDC 并实现数据的实时同步。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。