Flink CDC 有没有DEMO看下依赖包?

Flink CDC 有没有DEMO看下依赖包?

展开
收起
真的很搞笑 2023-09-19 20:29:27 60 分享 版权
阿里云 AI 助理回答

Flink CDC 提供了多种使用场景的 DEMO 和依赖包配置方法,以下为具体的参考内容和操作步骤:


1. Flink CDC 的依赖包配置

在使用 Flink CDC 时,您需要根据作业类型(SQL 或 DataStream)选择合适的依赖包。以下是两种常见的依赖包形式:

  • flink-sql-connector-xxx.jar

    • 这是一个 FAT JAR,包含了连接器的所有依赖,并将第三方库通过 Shade 打包。
    • 适用于 SQL 作业,只需将该 JAR 包放入 lib 目录即可。
  • flink-connector-xxx.jar

    • 仅包含连接器的核心代码,不包含其依赖。
    • 适用于 DataStream 作业,需要手动管理依赖并处理冲突。

示例:MySQL CDC 的 Maven 依赖

如果您使用的是 DataStream 作业,可以在 pom.xml 文件中添加以下依赖:

<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>${vvr.version}</version>
</dependency>

如果需要排除冲突依赖,可以使用 exclusions 标签:

<dependency>
    <groupId>foo</groupId>
    <artifactId>bar</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
        </exclusion>
    </exclusions>
</dependency>

2. Flink CDC 的 DEMO 示例

以下是一些常见的 Flink CDC 使用场景及其 DEMO 配置:

(1) MySQL 到 Hologres 的实时同步

这是一个典型的 Flink CDC 数据同步案例,展示了如何将 MySQL 数据实时同步到 Hologres。

操作步骤: 1. 准备测试数据: - 下载 tpc_ds.sqluser_db1.sql 等文件并导入 RDS MySQL。 - 在 Hologres 中创建目标数据库 my_user。 2. 配置白名单: - 获取 Flink 工作空间的 VPC 网段信息。 - 将网段信息添加到 MySQL 和 Hologres 的 IP 白名单中。 3. 编写 SQL 作业

CREATE TABLE mysql_source (
    id INT,
    name STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '<yourHostname>',
    'port' = '<yourPort>',
    'username' = '<yourUserName>',
    'password' = '<yourPassWord>',
    'database-name' = 'tpc_ds',
    'table-name' = 'orders'
);

CREATE TABLE hologres_sink (
    id INT,
    name STRING
) WITH (
    'connector' = 'hologres',
    'url' = '<yourHologresUrl>',
    'tablename' = 'public.orders',
    'username' = '<yourUserName>',
    'password' = '<yourPassWord>'
);

INSERT INTO hologres_sink SELECT * FROM mysql_source;

(2) PolarDB PostgreSQL 的 CDC 同步

PolarDB PostgreSQL 的 Flink CDC 连接器支持全量快照和增量变更数据的读取。

操作步骤: 1. 前提准备: - 购买 PolarDB PostgreSQL 集群并创建高权限账户。 - 创建源表 shipments 和目标表 shipments_sink。 2. 编写 SQL 作业

CREATE TEMPORARY TABLE shipments (
    shipment_id INT,
    order_id INT,
    origin STRING,
    destination STRING,
    is_arrived BOOLEAN,
    order_time TIMESTAMP,
    PRIMARY KEY (shipment_id) NOT ENFORCED
) WITH (
    'connector' = 'polardbo-cdc',
    'hostname' = '<yourHostname>',
    'port' = '<yourPort>',
    'username' = '<yourUserName>',
    'password' = '<yourPassWord>',
    'database-name' = 'flink_source',
    'schema-name' = 'public',
    'table-name' = 'shipments',
    'decoding.plugin.name' = 'pgoutput',
    'slot.name' = 'flink'
);

CREATE TEMPORARY TABLE shipments_sink (
    shipment_id INT,
    order_id INT,
    origin STRING,
    destination STRING,
    is_arrived BOOLEAN,
    order_time TIMESTAMP
) WITH (
    'connector' = 'jdbc',
    'url' = '<yourJdbcUrl>',
    'table-name' = 'shipments_sink'
);

INSERT INTO shipments_sink SELECT * FROM shipments;

3. 社区版 CDC 的使用

阿里云实时计算 Flink 版支持使用社区版 CDC 连接器,但需要注意以下事项: - 社区版 CDC 连接器遵循 Apache 2.0 开源协议,与商业版在服务支持及 SLA 方面存在差异。 - 使用过程中遇到的问题需在开源社区寻求解决方案。

操作步骤: 1. 访问 Apache Flink CDC 页面,下载目标 CDC 连接器的 JAR 包。 2. 登录实时计算控制台,上传 JAR 包并创建自定义连接器。 3. 编写 SQL 作业时,将连接器的名字作为 connector 参数传入。


4. 重要提醒

  • 依赖冲突问题:确保作业 JAR 包中不包含不必要的依赖(如 Flink、Hadoop 和 log4j),否则可能导致冲突。
  • 版本兼容性:本地开发依赖的 Flink 版本需与部署环境保持一致。
  • SLA 保障:社区版 CDC 连接器不提供 SLA 保障,请谨慎使用。

通过以上 DEMO 和依赖包配置,您可以快速上手 Flink CDC 并实现数据的实时同步。

有帮助
无帮助
AI 助理回答生成答案可能存在不准确,仅供参考
0 条回答
写回答
取消 提交回答

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

还有其他疑问?
咨询AI助理