开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

sink 消费 到 MySQL, 数据库表里面已经设置了 自增主键, flink 里面,如何 操作?

如题

展开
收起
游客3oewgrzrf6o5c 2022-06-30 18:29:44 810 0
1 条回答
写回答
取消 提交回答
  • 十分耕耘,一定会有一分收获!

    楼主你好,如果MySQL数据库表中已经设置了自增主键,那么在Flink中操作应该遵循以下步骤:

    1. 将MySQL中的自增主键字段设置为Flink数据流中的字段之一,通常选择作为流的第一个字段。

    2. 在Flink中使用JDBC Sink将数据写入MySQL表中。在Sink的构造函数中,将MySQL表中的自增主键设置为primary key并启用{@code ignoreUpdates}选项。这意味着Sink将忽略来自Flink数据流的主键冲突并将其视为重复数据。

    例如,以下代码片段演示了如何将Flink数据流中的数据插入到MySQL表中,同时忽略来自流中的主键冲突。

    DataStream<MyData> stream = ...;
    
    stream.addSink(
        JdbcSink.sink(
            "INSERT INTO my_table (id, name, value) VALUES (?, ?, ?)",
            (ps, data) -> {
                ps.setLong(1, data.getId()); // 使用MySQL表中的自增主键
                ps.setString(2, data.getName());
                ps.setInt(3, data.getValue());
            },
            JdbcExecutionOptions.builder()
                .withBatchSize(1000)
                .withMaxRetries(3)
                .withTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED)
                .build(),
            JdbcConnectionOptions.builder()
                .withUrl("jdbc:mysql://localhost:3306/my_db")
                .withDriverName("com.mysql.jdbc.Driver")
                .withUsername("user")
                .withPassword("password")
                .build(),
            JdbcStatementBuilderOptions.builder()
                .withKeyFields("id") // 指定主键字段
                .withIgnoreParseErrors()
                .withParameterTypes(Types.BIGINT, Types.VARCHAR, Types.INTEGER)
                .build()
        )
    );
    

    在这个例子中,假设MySQL表定义如下:

    CREATE TABLE my_table (
      id BIGINT AUTO_INCREMENT PRIMARY KEY,
      name VARCHAR(50),
      value INT
    );
    

    注意这个例子中的withKeyFields("id")选项指定主键字段为"id"。这将防止来自流的重复数据插入到表中。

    2023-07-31 09:17:33
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    DTCC 2022大会集锦《云原生一站式数据库技术与实践》 立即下载
    阿里云瑶池数据库精要2022版 立即下载
    2022 DTCC-阿里云一站式数据库上云最佳实践 立即下载

    相关镜像