开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

请问flink-connector-jdbc在sink时,如何设置批量写入?

尝试了sink.buffer-flush.max-rows,感觉没生效。

展开
收起
游客6vdkhpqtie2h2 2022-10-01 10:34:33 3172 1
17 条回答
写回答
取消 提交回答
  • 阿里云实时计算 Flink 支持使用 flink-connector-jdbc 连接器将数据从 Flink 程序写入到关系型数据库中,通过设置批量写入可以提高写入性能,降低写入延迟。在 flink-connector-jdbc 中,可以通过以下方式设置批量写入:

    1. 在 JDBC sink 的构造函数中,使用 BatchPreparedStatementSetter 对象实现批量写入。BatchPreparedStatementSetter 可以实现在 PreparedStatement 中设置批量写入的参数。
    JdbcSink.sink(
        "INSERT INTO table (column1, column2) VALUES (?, ?)",
        new BatchPreparedStatementSetter<Tuple2<String, Integer>>() {
            @Override
            public void setValues(PreparedStatement ps, Tuple2<String, Integer> tuple) throws SQLException {
                ps.setString(1, tuple.f0);
                ps.setInt(2, tuple.f1);
                ps.addBatch();
            }
    
            @Override
            public void setValues(PreparedStatement ps, int i) throws SQLException {
                // do nothing
            }
        },
        // other config
    )
    
    1. 在 JDBC sink 的构造函数中,使用 BatchSize 参数设置批量写入的大小。BatchSize 指定每次写入的数据条数,默认值为 5000。
    JdbcSink.sink(
        "INSERT INTO table (column1, column2) VALUES (?, ?)",
        new JdbcStatementBuilderImpl<>(),
        new JdbcExecutionOptions.Builder()
            .withBatchSize(1000)
            .build(),
        // other config
    )
    

    设置批量写入可以提高写入性能,但也可能会导致数据不一致的问题。因此,在设置批量写入时,需要根据具体业务场景进行权衡和测试。

    2023-05-07 22:54:56
    赞同 展开评论 打赏
  • 公众号:网络技术联盟站,InfoQ签约作者,阿里云社区签约作者,华为云 云享专家,BOSS直聘 创作王者,腾讯课堂创作领航员,博客+论坛:https://www.wljslmz.cn,工程师导航:https://www.wljslmz.com

    在使用 Flink 提供的 JDBC Connector 时,可以通过设置 batchSize 属性来实现批量写入。具体操作如下:

    1. 引入依赖

    在项目的 pom.xml 文件中添加 JDBC Connector 的依赖:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    
    1. 创建 JDBC Sink

    在创建 JDBC Sink 时,可以通过 JDBCOutputFormatBuildersetBatchIntervalMillis 方法设置批量写入的时间间隔,单位是毫秒。可以根据实际场景调整时间间隔和批量大小:

    JDBCOutputFormatBuilder builder = new JDBCOutputFormatBuilder()
            .setDrivername(driverName)
            .setDBUrl(dbURL)
            .setUsername(userName)
            .setPassword(password)
            .setQuery(insertQuery)
            .setBatchIntervalMillis(batchIntervalMillis)    // 设置批量写入的时间间隔
            .setBatchSize(batchSize);                       // 设置批量大小
    
    JDBCOutputFormat outputFormat = builder.finish();
    
    JDBCTableSink jdbcSink = JDBCTableSink.builder()
            .setOutputFormat(outputFormat)
            .build();
    

    其中,batchSize 属性可以设置批量写入的大小,即将多个数据行打包成一个批次进行写入。注意,这个值需要根据你的数据量和实际场景进行调整,过小可能会影响写入性能,过大可能会导致内存溢出。

    1. 将 Sink 应用到 Flink 流计算中

    将 JDBC Sink 应用到 Flink 流计算中,例如:

    DataStream<MyPojo> inputStream = ...;
    
    inputStream.addSink(jdbcSink);
    

    在应用到 Flink 流计算中时,可以通过 setFlushOnCheckpoint(true) 方法设置在Checkpoint时也要进行数据写入,保证数据的完整性和可靠性。

    2023-05-05 20:16:17
    赞同 展开评论 打赏
  • 在使用flink-connector-jdbc进行数据sink时,可以通过设置JDBCOutputFormat的batchSize参数来控制批量写入的大小。batchSize参数表示每次写入的记录数,可以根据需要进行调整。例如:

    JDBCOutputFormat jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
        .setDrivername(driver)
        .setDBUrl(dbUrl)
        .setUsername(username)
        .setPassword(password)
        .setQuery(insertQuery)
        .setBatchInterval(batchInterval)
        .setBatchSize(batchSize)
        .finish();
    

    其中,batchSize参数可以设置为一个大于1的整数,表示每次写入的记录数。需要注意的是,设置的batchSize过大可能会导致内存溢出,需要根据实际情况进行调整。

    2023-05-02 07:46:40
    赞同 展开评论 打赏
  • 在使用 Flink 的 JDBC sink 时,可以通过配置 batchSize 来设置批量写入的大小。batchSize 表示一次性向 JDBC 批量提交多少条记录,可以设置为整数值。

    例如,以下代码片段使用 JDBC sink 向 MySQL 数据库中批量写入数据,每次提交 100 条记录:

    DataStream&lt;MyData&gt; stream = ...;
    
    FlinkJdbcSink&lt;MyData&gt; sink = JdbcSink.&lt;MyData&gt;batch(
            "INSERT INTO myTable (col1, col2, col3) VALUES (?, ?, ?)",
            (ps, data) -&gt; {
                ps.setString(1, data.getField1());
                ps.setInt(2, data.getField2());
                ps.setLong(3, data.getField3());
            },
            JdbcExecutionOptions.builder()
                    .withBatchSize(100)
                    .build(),
            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                    .withUrl("jdbc:mysql://localhost:3306/myDb")
                    .withDriverName("com.mysql.jdbc.Driver")
                    .withUsername("myUser")
                    .withPassword("myPassword")
                    .build());
    
    stream.addSink(sink);
    

    在上述示例中,使用了 FlinkJdbcSink 的静态方法 batch,并传入了 batchSize 参数。该参数被设置为 100,因此会每次向数据库批量提交 100 条记录。

    注意,此时需要保证配置的 batchSize 不超过 JDBC 驱动的最大批量提交大小,否则可能会出现异常。通常情况下,可以根据系统性能和数据量大小综合考虑一个适当的 batchSize 值。

    2023-04-27 21:37:33
    赞同 展开评论 打赏
  • 云端行者觅知音, 技术前沿我独行。 前言探索无边界, 阿里风光引我情。

    官方文档提供了2个方式。

    1、BatchIntervalChecker

    BatchIntervalChecker是一个用于检查批量写入时间间隔的工具类。您可以使用 BatchIntervalChecker来设置批量写入的时间间隔。BatchIntervalChecker会在每个批次写入之前检查时间间隔是否已经达到,如果达到了,就会触发批量写入操作。

    2、BatchSizeTrigger

    BatchSizeTrigger是一个用于检查批量写入大小的工具类。您可以使用BatchSizeTrigger来设置批量写入的大小。BatchSizeTrigger会在每个批次写入之前检查写入的数据量是否已经达到,如果达到了,就会触发批量写入操作。

    具体使用哪种方式取决于您的需求和场景。如果您的数据量较小,可以使用BatchIntervalChecker来设置时间间隔;如果您的数据量较大,可以使用 BatchSizeTrigger来设置批量写入大小。

    2023-04-27 13:02:07
    赞同 展开评论 打赏
  • 从事java行业9年至今,热爱技术,热爱以博文记录日常工作,csdn博主,座右铭是:让技术不再枯燥,让每一位技术人爱上技术

    Flink JDBC连接器提供了对MySQL、PostgreSQL和Oracle等常见的数据库读写支持,如果需要配置批量写入可以设置sink.buffer-flush.max-rows 大于0 ,flush数据前,缓存记录的最大值。以及设置sink.buffer-flush.interval 大于0,flush数据的时间间隔。数据在Flink中缓存的时间超过该参数指定的时间后,异步线程将flush数据到数据库中。同时需要注意字段类型映射,参考文档:文档

    2023-04-26 18:05:54
    赞同 展开评论 打赏
  • 值得去的地方都没有捷径

    在Flink SQL连接MySQL等关系型数据库时,通常会使用Flink官方提供的JDBC connector。当想要在sink阶段进行批量写入时,可以考虑以下几个方面。

    配置Sink 除了 sink.buffer-flush.max-rows,还需要配置sink.buffer-flush.interval和sink.buffer-flush.max-size等参数,这些参数共同控制了一个批次的多少和大小。具体参数含义可以参考官方文档(https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html#jdbc-sink)。

    数据类型和批量写入 如果sink数据列的类型与表中的数据类型不匹配,可能也会导致写入失败,因此需要确保被写入的数据类型一致。此外,对于大数据量的写入,Flink仅使用JDBC批量编译器API,其中大批量数据的处理是在JDBC驱动程序内部完成的,但该功能仅在JDBC驱动程序支持时才会生效,因此需要确保JDBC驱动程序支持批量处理。

    确认实时更新 最后,确保你正在连接到手动维护的活动数据库,而不是连接到备用副本,否则写入可能会延迟,因为备用副本可能不会实时更新。

    希望这些信息有所帮助,并解决您的问题。

    2023-04-26 12:30:45
    赞同 展开评论 打赏
  • 设置最大缓冲行数 设置最大缓冲时间 默认情况下,JDBC Sink 发送完成后会自动关闭连接,并提交事务。

    还有一些与性能相关的调整方面也可以考虑做出:比如,修改输入并行度(即 max.parallelism),或是调整容器资源限制等都可能受益于增加批量处理参数来保证Flink Job稳定运行。

    2023-04-25 14:27:37
    赞同 展开评论 打赏
  • Flink 中的 JDBC Connector 可以用于读取和写入关系型数据库。在使用 Flink 的 JDBC Sink 进行批量写入时,需要设置 sink.buffer-flush.max-rows 和 sink.buffer-flush.interval 两个参数,分别表示最大缓冲行数和最大缓冲时间。当满足其中一个条件时,就会将数据批量写入 Jdbc。

    1. 设置最大缓冲行数

      在你的 Flink SQL 程序里找到对应的 JDBC sink 并进行如下配置:

    sink.buffer-flush.max-rows = 5000
    

    这个例子中表示每插入 5000 行触发一次写操作。

    1. 设置最大缓冲时间

    同样地,在你的 Flink SQL 程序配置中添加:

    sink.buffer-flush.interval = 2000ms 
    

    这个例子中则是代表了每隔 2 秒钟写入一次缓存区内的所有内容。

    默认情况下,JDBC Sink 发送完成后会自动关闭连接,并提交事务。

    还有一些与性能相关的调整方面也可以考虑做出:比如,修改输入并行度(即 max.parallelism),或是调整容器资源限制等都可能受益于增加批量处理参数来保证Flink Job稳定运行。

    注意 JKBC Sink 默认不启用 Batch Api, 如需将 JDBC Sink 扩展成可同时支持 batch insert 和 streaming insert 需要额外开发。

    2023-04-24 18:21:16
    赞同 展开评论 打赏
  • 在使用Flink JDBC Connector的Sink写入数据库时,设置批量flush参数是实现高吞吐量的关键。主要有两个相关参数: 1. sink.buffer-flush.max-rows:触发flush并写入数据库的最大行数阈值。默认为1,即每插入1行就flush。 2. sink.buffer-flush.interval:触发flush的最大时间间隔。默认为0,即不按时间隔断flush。 要实现批量写入数据库,需要同时调大这两个参数。例如:

    properties
    # 每5000行或1秒钟触发一次flush
    sink.buffer-flush.max-rows=5000  
    sink.buffer-flush.interval=1000
    

    这样,连接器会将插入的数据缓存至内存Buffer,当数据量达到5000行或者超过1秒钟时,会触发flush并批量写入数据库。 adjust这两个参数对吞吐量的影响主要有: 1. sink.buffer-flush.max-rows越大,触发flush的频率越低,插入数据库的批量数据越多,数据库的IO压力越大,从而达到更高的吞吐量。 2. sink.buffer-flush.interval允许在达不到max-rows阈值时,也定期触发flush。这可以避免数据在Buffer中停留时间过长,同时也为数据库提供更均匀的压力,有利于吞吐量的提高。 3. 两个参数都不宜设置过大,否则会给数据库带来过高的压力和较高的延迟。需要根据数据库的处理能力进行评估和调优。 4. interval参数对event time语义的支持很关键。它可以确保无论 max-rows是否达到,数据至少以该频率超越水位线并刷新到数据库。 所以,想实现Flink JDBC Sink的批量写入和高吞吐量,调优这两个flush相关的参数是关键。需要结合具体数据库的性能,选择适当的max-rows和interval值,既能发挥批量写入的优势,又不会过分压榨数据库资源。

    2023-04-24 17:00:56
    赞同 展开评论 打赏
  • 在使用 Flink JDBC Connector 时,可以通过配置 sink.buffer-flush.max-rowssink.buffer-flush.interval 参数来控制批量写入。其中,sink.buffer-flush.max-rows 参数表示可以缓存的最大记录数,sink.buffer-flush.interval 参数表示缓存的最大时间间隔。只要满足其中一个条件,就会触发将缓存中的数据批量写入 JDBC 数据库。

    如果您设置了 sink.buffer-flush.max-rows 参数但是没有生效,可能需要检查以下问题:

    1. 是否设置了正确的参数值。确保 sink.buffer-flush.max-rows 参数的值大于 0。

    2. 是否开启了批量写入。在 Flink JDBC Connector 中,默认是开启了批量写入的,如果您没有手动关闭,那么就应该可以正常批量写入。可以检查一下代码中是否有关闭批量写入的操作。

    3. 是否达到了批量写入的条件。如果您设置了 sink.buffer-flush.max-rows 参数,但是没有达到最大记录数,那么就不会触发批量写入。可以检查一下是否有足够的数据达到了最大记录数。

    如果您仍然无法解决问题,请提供更多详细的信息,例如代码片段、日志信息等,以便更好地定位问题。

    2023-04-24 12:04:17
    赞同 展开评论 打赏
  • 天下风云出我辈,一入江湖岁月催,皇图霸业谈笑中,不胜人生一场醉。

    可以使用 Flink 官方提供的 BulkFormatBuilder 工具类,来实现 flink-connector-jdbc 的批量写入。

    // 这里假设已经定义好了 POJO 类型和 JDBC 配置信息

    DataStream records = ...; JdbcConnectionOptions connectionOptions = ...; JdbcExecutionOptions executionOptions = ...;

    // 使用 BulkFormatBuilder 构建 JdbcOutputFormatBuilder

    JdbcOutputFormatBuilder outputFormatBuilder = JdbcOutputFormat.buildJdbcOutputFormat() .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") .setDBUrl("jdbc:derby:memory:test") .setUsername("") .setPassword("") .setQuery("INSERT INTO test (id, name) VALUES (?, ?);") .setSqlTypes(new int[] { Types.INTEGER, Types.VARCHAR }) .setBatchIntervalMs(1000) .setBatchSize(500) .setJdbcStatementBuilder(new JdbcStatementBuilder() { public void accept(java.sql.PreparedStatement statement, MyRecord record) { statement.setInt(1, record.id); statement.setString(2, record.name); } });

    // 构建 SinkFunction

    SinkFunction sink = JdbcSink.sink( outputFormatBuilder, new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withUrl("jdbc:derby:memory:test") .withDriverName("org.apache.derby.jdbc.EmbeddedDriver") .build(), new JdbcExecutionOptions.JdbcExecutionOptionsBuilder() .withBatchSize(500) .withBatchIntervalMs(1000) .withMaxRetries(3) .build());

    // 将 Source 写入 Sink

    records.addSink(sink);

    2023-04-24 10:10:55
    赞同 展开评论 打赏
  • 全栈JAVA领域创作者

    Flink 官方提供的 flink-connector-jdbc 在 Sink 时支持批量写入,可以通过设置以下参数来实现:

    jdbc.max-retries:设置在失败情况下允许的最大重试次数。默认值为 3。

    jdbc.batch-size:设置 JDBC Sink 在写入数据时使用的批量大小。默认值为 1000。

    jdbc.batch-interval-millis:设置 JDBC Sink 在写入数据时使用的批量时间间隔。默认值为 0,表示禁用批量时间间隔。

    sink.buffer-flush.max-rows:设置在写入到 JDBC Sink 之前缓冲区中允许的最大记录数。默认值为 -1,表示禁用缓冲区。设置为正整数时,表示启用缓冲区,并设置缓冲区中允许的最大记录数。

    需要注意的是,sink.buffer-flush.max-rows 参数主要用于控制缓冲区的大小,并不能直接控制批量写入的大小。实际上,在使用 JDBC Sink 时,批量写入的大小由 jdbc.batch-size 和 jdbc.batch-interval-millis 参数共同决定。

    如果您想要实现更高效的批量写入,建议将 jdbc.batch-size 参数设置为较大的值,并设置合理的 jdbc.batch-interval-millis 参数以控制写入的频率。同时,还可以考虑通过调整并发度、优化 SQL 语句、优化数据库连接等方式来进一步提高写入性能。

    Regenerate response

    2023-04-23 20:24:12
    赞同 展开评论 打赏
  • 存在即是合理

    在使用 Flink JDBC Sink 时,可以通过以下参数控制批量写入:

    1、sink.buffer-flush.max-rows:设置缓冲区大小,即达到该缓冲区大小后批量写入到数据库,默认为 5000; 2、sink.buffer-flush.interval:设置缓冲区时间间隔,即达到该时间间隔后批量写入到数据库,默认为 1s; 3、sink.max-retries:设置写入失败时的最大重试次数,默认为 3; 4、sink.max-parallelism:设置写入的并行度,默认为 1。

    如果你想要自定义批量写入的大小,可以通过在参数中添加如下配置实现:

    .withParameters(JdbcExecutionOptions.builder().withBatchSize(1000).build()) 其中 withBatchSize() 方法可以设置批量写入的大小,即一次性写入多少条数据。例如上述示例中的 1000 表示一次性写入 1000 条数据。

    另外,如果你在使用 Flink JDBC Sink 时出现了 sink.buffer-flush.max-rows 参数不生效的情况,可以尝试将该参数的值设置为较小的值,如 100,看看是否能够生效。如果仍然不生效,可能是因为数据库本身的限制导致无法批量写入。

    2023-04-23 17:28:12
    赞同 展开评论 打赏
  • 技术架构师 阿里云开发者社区技术专家博主 CSDN签约专栏技术博主 掘金签约技术博主 云安全联盟专家 众多开源代码库Commiter

    在 Flink 中使用 flink-connector-jdbc 作为 Sink 时,可以通过sink.buffer-flush.max-rows来设置批量写入的最大行数。但是需要注意的是,这个参数仅仅是一个提示,Flink 会尽可能地将数据批量写入 JDBC 数据库,并且根据数据库支持的特性进行优化,以达到更高的吞吐量。

    在设置 sink.buffer-flush.max-rows 参数时,需要同时设置 sink.buffer-flush.interval 参数,用于控制数据写入间隔时间,保证不会因为等待写入数据库而阻塞整个任务。例如:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    DataStream<Row> stream = ...
    stream.addSink(JdbcSink.sink(
            "insert into myTable values (?, ?, ?)",
            (ps, t) -> {
                ps.setInt(1, t.getField(0));
                ps.setString(2, t.getField(1));
                ps.setDouble(3, t.getField(2));
            },
            JdbcExecutionOptions.builder()
                    .withBatchSize(5000)
                    .withBatchIntervalMs(1000L)
                    .withMaxRetries(5)
                    .build(),
            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                    .withUrl("jdbc:mysql://localhost/test")
                    .withDriverName("com.mysql.jdbc.Driver")
                    .withUsername("root")
                    .withPassword("")
                    .build()));
    env.execute();
    

    在上面的示例中,我们设置了JdbcExecutionOptions对象的withBatchSize方法为5000,表示每次写入5000行数据;同时,withBatchIntervalMs方法也被设置为1000毫秒,表示每隔1秒写入一次数据。这样就可以达到批量写入的效果了。

    需要注意的是,在使用flink-connector-jdbc时,还可以通过其他参数来进一步优化写入性能,例如设置 JDBC 连接池、调整并发度等等。

    2023-04-23 17:09:19
    赞同 展开评论 打赏
  • 热爱开发

    在使用Flink的JDBC连接器时,可以通过以下两种方式来设置批量写入:

    使用BatchingSinkFunction BatchingSinkFunction是一种可选的SinkFunction,它允许将多个记录作为一个批次进行写入。在Flink的JDBC连接器中,您可以通过继承或实现这个类来创建自定义的批量写入SinkFunction。

    例如,下面的代码使用BatchingSinkFunction来将数据按照批量大小(100)写入MySQL:

    public class MySQLBatchWriter extends BatchingJdbcSinkFunction {

    public MySQLBatchWriter() {
        super(JdbcExecutionOptions.builder()
                .withBatchSize(100)
                .build());
    }
    
    @Override
    protected void prepareStatement(PreparedStatement statement, MyData data) throws SQLException {
        statement.setInt(1, data.getId());
        statement.setString(2, data.getName());
        statement.setDouble(3, data.getValue());
    }
    

    } 其中,prepareStatement方法用于准备SQL查询语句,而构造函数则使用JdbcExecutionOptions.Builder来设置批量大小。

    使用JdbcExecutionOptions JdbcExecutionOptions提供了多个配置选项,其中包括设置批量大小的选项。在使用Flink的JDBC连接器时,您可以通过以下代码来设置批量大小:

    JdbcExecutionOptions executionOptions = JdbcExecutionOptions.builder() .withBatchSize(100) .build();

    JdbcSink sink = JdbcSink.sink( "INSERT INTO my_table (id, name, value) VALUES (?, ?, ?)", (statement, data) -> { statement.setInt(1, data.getId()); statement.setString(2, data.getName()); statement.setDouble(3, data.getValue()); }, JdbcExecutionOptions.builder() .withBatchSize(100) .build(), new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withUrl("jdbc:mysql://localhost:3306/my_database") .withDriverName("com.mysql.jdbc.Driver") .withUsername("root") .withPassword("root") .build());

    dataStream.addSink(sink); 其中,withBatchSize方法用于设置批量大小,并将其传递给JdbcExecutionOptions.Builder。

    2023-04-23 17:02:18
    赞同 展开评论 打赏
  • 可能有以下几个原因导致该参数没有生效:

    数据源本身的限制:如果数据源本身对批量写入有限制,那么设置 sink.buffer-flush.max-rows 参数也不会生效。你可以尝试查看数据源的文档或者源码,确认数据源是否支持批量写入,以及支持的批量写入的大小。

    配置参数生效范围:sink.buffer-flush.max-rows 参数只在配置文件中的 flink-conf.yaml 或者执行作业时的 -yD 参数中生效,在代码中通过 env.getConfig().setGlobalJobParameters() 方式设置的参数并不会生效。

    代码中的批量写入限制:在源码中,如果对 sink.buffer-flush.max-rows 参数进行了手动覆盖限制,那么该限制也会覆盖配置文件中的同名限制,你可以去查看代码中是否有这方面的限制。

    需要注意的是,由于 JDBC Sink 是一个异步写入的 Sink,批量写入的大小可能会受到网络带宽、数据源状态等因素的影响。

    2023-04-23 16:36:34
    赞同 展开评论 打赏
滑动查看更多

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载