Flink CDC 如何配置可以实现 仅执行全量同步一次后 结束任务。 意思是当作批任务执行一次。

  1. 采用dataStream方式 而非sql方式
  2. 版本 2.1.1+1.13.6

展开
收起
云璟 2023-08-31 16:08:10 176 分享 版权
阿里云 AI 助理回答

在使用 DataStream 方式而非 SQL 方式进行开发时,您需要确保以下关键步骤和配置正确无误。以下是针对 Flink 1.13.6Scala 2.11 版本的详细操作指南:


1. 前提条件

在开始开发之前,请确保满足以下条件: - 已安装并配置好本地开发环境(如 IntelliJ IDEA)。 - 已准备好 Flink 集群,并确认其版本为 1.13.6。 - 已下载并引入所需的依赖包,包括 Flink DataStream Connector 和其他相关依赖。


2. 引入依赖

在 Maven 项目中,您需要在 pom.xml 文件中添加以下依赖项,以支持 DataStream API 的开发:

<dependencies>
    <!-- Flink Core Dependency -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.13.6</version>
    </dependency>

    <!-- Flink DataStream Connector for Hologres -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>ververica-connector-hologres</artifactId>
        <version>1.13-vvr-6.0.7</version>
    </dependency>

    <!-- MySQL CDC Connector (如果需要读取 MySQL 数据) -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>ververica-connector-mysql-cdc</artifactId>
        <version>1.13-vvr-6.0.7</version>
    </dependency>

    <!-- JDBC Connector (如果需要写入 ClickHouse 或其他数据库) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_2.11</artifactId>
        <version>1.13.6</version>
    </dependency>
</dependencies>

注意:请根据实际使用的连接器类型调整依赖版本号,并确保与 Flink 引擎版本兼容。


3. 配置 DataStream 作业

3.1 读取数据源

使用 DataStream API 读取数据时,可以通过以下方式实现:

  • Hologres 源表: 使用 HologresBulkreadInputFormat 实现类读取 Hologres 表数据。示例代码如下:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    HologresBulkreadInputFormat inputFormat = new HologresBulkreadInputFormat(
      "jdbc:hologres://<endpoint>:<port>/<dbname>",
      "<username>",
      "<password>",
      "<tablename>"
    );
    
    DataStream<Row> sourceStream = env.createInput(inputFormat);
    
  • MySQL CDC 数据源: 如果需要从 MySQL 数据库读取 Binlog 数据,可以使用 MySqlSource,并显式配置唯一的 server-id

    MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
      .hostname("<hostname>")
      .port(3306)
      .databaseList("<database>")
      .tableList("<table>")
      .username("<username>")
      .password("<password>")
      .deserializer(new JsonDebeziumDeserializationSchema())
      .serverId("56000") // 必须为每个作业配置唯一值
      .build();
    
    DataStream<String> mysqlStream = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
    

3.2 处理数据

在 DataStream 中,您可以使用 KeyedProcessFunctionProcessFunction 对数据进行自定义处理。例如:

sourceStream
    .keyBy(row -> row.getField(0)) // 按照指定字段分组
    .process(new KeyedProcessFunction<Object, Row, String>() {
        @Override
        public void processElement(Row value, Context ctx, Collector<String> out) {
            // 自定义处理逻辑
            out.collect(value.toString());
        }
    });

重要提示:避免在 processElementonTimer 方法中重复注册 Timer,否则可能导致 Timer 数量急剧膨胀。


4. 写入目标表

4.1 写入 Hologres

通过 DataStream API 写入 Hologres 时,可以使用 HologresSink

HologresSink<Row> hologresSink = HologresSink.builder()
    .setEndpoint("<endpoint>")
    .setDbName("<dbname>")
    .setTableName("<tablename>")
    .setUsername("<username>")
    .setPassword("<password>")
    .build();

sourceStream.addSink(hologresSink);

4.2 写入 ClickHouse

如果需要将数据写入 ClickHouse,可以使用 JdbcSink,并通过批量插入提高性能:

class CkSinkBuilder implements JdbcStatementBuilder<Tuple3<String, Long, Float>> {
    @Override
    public void accept(PreparedStatement ps, Tuple3<String, Long, Float> value) throws SQLException {
        ps.setString(1, value.f0);
        ps.setLong(2, value.f1);
        ps.setFloat(3, value.f2);
    }
}

JdbcSink.sink(
    "INSERT INTO clickhouse_table (col1, col2, col3) VALUES (?, ?, ?)",
    new CkSinkBuilder(),
    JdbcExecutionOptions.builder().withBatchSize(1000).build(),
    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl("jdbc:clickhouse://<host>:8123/<database>")
        .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
        .withUsername("<username>")
        .withPassword("<password>")
        .build()
);

5. 打包与部署

5.1 本地调试

在本地调试时,请确保使用正确的 Uber JAR 文件(如 ververica-connector-hologres-1.13-vvr-6.0.7-uber.jar),并将其添加到项目的 classpath 中。

5.2 上传与运行

  • 使用 Maven 打包应用程序:
    mvn clean package -DskipTests
    
  • 在 Flink 控制台上传 JAR 包,并配置资源信息后启动作业。

6. 常见问题与解决方案

  • 问题1:运行时找不到连接器相关类。

    • 原因:未正确使用 Uber JAR。
    • 解决方案:确保使用对应版本的 Uber JAR 进行调试。
  • 问题2:缺少常见的 Flink 类。

    • 原因:缺少依赖或未加载 provided 依赖。
    • 解决方案:在 pom.xml 中添加相关依赖,或在 IntelliJ IDEA 中勾选“Add dependencies with provided scope to classpath”。
  • 问题3:运行中报错“Incompatible magic value”。

    • 原因:Uber JAR 与 Connector 版本不一致。
    • 解决方案:选择对应版本的 Connector 和 Uber JAR。

通过以上步骤,您可以成功使用 DataStream 方式开发并运行 Flink 作业。

有帮助
无帮助
AI 助理回答生成答案可能存在不准确,仅供参考
0 条回答
写回答
取消 提交回答

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

还有其他疑问?
咨询AI助理