尝试了sink.buffer-flush.max-rows,感觉没生效。
阿里云实时计算 Flink 支持使用 flink-connector-jdbc 连接器将数据从 Flink 程序写入到关系型数据库中,通过设置批量写入可以提高写入性能,降低写入延迟。在 flink-connector-jdbc 中,可以通过以下方式设置批量写入:
JdbcSink.sink(
"INSERT INTO table (column1, column2) VALUES (?, ?)",
new BatchPreparedStatementSetter<Tuple2<String, Integer>>() {
@Override
public void setValues(PreparedStatement ps, Tuple2<String, Integer> tuple) throws SQLException {
ps.setString(1, tuple.f0);
ps.setInt(2, tuple.f1);
ps.addBatch();
}
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
// do nothing
}
},
// other config
)
JdbcSink.sink(
"INSERT INTO table (column1, column2) VALUES (?, ?)",
new JdbcStatementBuilderImpl<>(),
new JdbcExecutionOptions.Builder()
.withBatchSize(1000)
.build(),
// other config
)
设置批量写入可以提高写入性能,但也可能会导致数据不一致的问题。因此,在设置批量写入时,需要根据具体业务场景进行权衡和测试。
在使用 Flink 提供的 JDBC Connector 时,可以通过设置 batchSize
属性来实现批量写入。具体操作如下:
在项目的 pom.xml
文件中添加 JDBC Connector 的依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
在创建 JDBC Sink 时,可以通过 JDBCOutputFormatBuilder
的 setBatchIntervalMillis
方法设置批量写入的时间间隔,单位是毫秒。可以根据实际场景调整时间间隔和批量大小:
JDBCOutputFormatBuilder builder = new JDBCOutputFormatBuilder()
.setDrivername(driverName)
.setDBUrl(dbURL)
.setUsername(userName)
.setPassword(password)
.setQuery(insertQuery)
.setBatchIntervalMillis(batchIntervalMillis) // 设置批量写入的时间间隔
.setBatchSize(batchSize); // 设置批量大小
JDBCOutputFormat outputFormat = builder.finish();
JDBCTableSink jdbcSink = JDBCTableSink.builder()
.setOutputFormat(outputFormat)
.build();
其中,batchSize
属性可以设置批量写入的大小,即将多个数据行打包成一个批次进行写入。注意,这个值需要根据你的数据量和实际场景进行调整,过小可能会影响写入性能,过大可能会导致内存溢出。
将 JDBC Sink 应用到 Flink 流计算中,例如:
DataStream<MyPojo> inputStream = ...;
inputStream.addSink(jdbcSink);
在应用到 Flink 流计算中时,可以通过 setFlushOnCheckpoint(true)
方法设置在Checkpoint时也要进行数据写入,保证数据的完整性和可靠性。
在使用flink-connector-jdbc进行数据sink时,可以通过设置JDBCOutputFormat的batchSize参数来控制批量写入的大小。batchSize参数表示每次写入的记录数,可以根据需要进行调整。例如:
JDBCOutputFormat jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
.setDrivername(driver)
.setDBUrl(dbUrl)
.setUsername(username)
.setPassword(password)
.setQuery(insertQuery)
.setBatchInterval(batchInterval)
.setBatchSize(batchSize)
.finish();
其中,batchSize参数可以设置为一个大于1的整数,表示每次写入的记录数。需要注意的是,设置的batchSize过大可能会导致内存溢出,需要根据实际情况进行调整。
在使用 Flink 的 JDBC sink 时,可以通过配置 batchSize
来设置批量写入的大小。batchSize
表示一次性向 JDBC 批量提交多少条记录,可以设置为整数值。
例如,以下代码片段使用 JDBC sink 向 MySQL 数据库中批量写入数据,每次提交 100 条记录:
DataStream<MyData> stream = ...;
FlinkJdbcSink<MyData> sink = JdbcSink.<MyData>batch(
"INSERT INTO myTable (col1, col2, col3) VALUES (?, ?, ?)",
(ps, data) -> {
ps.setString(1, data.getField1());
ps.setInt(2, data.getField2());
ps.setLong(3, data.getField3());
},
JdbcExecutionOptions.builder()
.withBatchSize(100)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://localhost:3306/myDb")
.withDriverName("com.mysql.jdbc.Driver")
.withUsername("myUser")
.withPassword("myPassword")
.build());
stream.addSink(sink);
在上述示例中,使用了 FlinkJdbcSink 的静态方法 batch
,并传入了 batchSize
参数。该参数被设置为 100,因此会每次向数据库批量提交 100 条记录。
注意,此时需要保证配置的 batchSize
不超过 JDBC 驱动的最大批量提交大小,否则可能会出现异常。通常情况下,可以根据系统性能和数据量大小综合考虑一个适当的 batchSize
值。
官方文档提供了2个方式。
1、BatchIntervalChecker
BatchIntervalChecker是一个用于检查批量写入时间间隔的工具类。您可以使用 BatchIntervalChecker来设置批量写入的时间间隔。BatchIntervalChecker会在每个批次写入之前检查时间间隔是否已经达到,如果达到了,就会触发批量写入操作。
2、BatchSizeTrigger
BatchSizeTrigger是一个用于检查批量写入大小的工具类。您可以使用BatchSizeTrigger来设置批量写入的大小。BatchSizeTrigger会在每个批次写入之前检查写入的数据量是否已经达到,如果达到了,就会触发批量写入操作。
具体使用哪种方式取决于您的需求和场景。如果您的数据量较小,可以使用BatchIntervalChecker来设置时间间隔;如果您的数据量较大,可以使用 BatchSizeTrigger来设置批量写入大小。
Flink JDBC连接器提供了对MySQL、PostgreSQL和Oracle等常见的数据库读写支持,如果需要配置批量写入可以设置sink.buffer-flush.max-rows 大于0 ,flush数据前,缓存记录的最大值。以及设置sink.buffer-flush.interval 大于0,flush数据的时间间隔。数据在Flink中缓存的时间超过该参数指定的时间后,异步线程将flush数据到数据库中。同时需要注意字段类型映射,参考文档:文档
在Flink SQL连接MySQL等关系型数据库时,通常会使用Flink官方提供的JDBC connector。当想要在sink阶段进行批量写入时,可以考虑以下几个方面。
配置Sink 除了 sink.buffer-flush.max-rows,还需要配置sink.buffer-flush.interval和sink.buffer-flush.max-size等参数,这些参数共同控制了一个批次的多少和大小。具体参数含义可以参考官方文档(https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html#jdbc-sink)。
数据类型和批量写入 如果sink数据列的类型与表中的数据类型不匹配,可能也会导致写入失败,因此需要确保被写入的数据类型一致。此外,对于大数据量的写入,Flink仅使用JDBC批量编译器API,其中大批量数据的处理是在JDBC驱动程序内部完成的,但该功能仅在JDBC驱动程序支持时才会生效,因此需要确保JDBC驱动程序支持批量处理。
确认实时更新 最后,确保你正在连接到手动维护的活动数据库,而不是连接到备用副本,否则写入可能会延迟,因为备用副本可能不会实时更新。
希望这些信息有所帮助,并解决您的问题。
设置最大缓冲行数 设置最大缓冲时间 默认情况下,JDBC Sink 发送完成后会自动关闭连接,并提交事务。
还有一些与性能相关的调整方面也可以考虑做出:比如,修改输入并行度(即 max.parallelism),或是调整容器资源限制等都可能受益于增加批量处理参数来保证Flink Job稳定运行。
Flink 中的 JDBC Connector 可以用于读取和写入关系型数据库。在使用 Flink 的 JDBC Sink 进行批量写入时,需要设置 sink.buffer-flush.max-rows 和 sink.buffer-flush.interval 两个参数,分别表示最大缓冲行数和最大缓冲时间。当满足其中一个条件时,就会将数据批量写入 Jdbc。
设置最大缓冲行数
在你的 Flink SQL 程序里找到对应的 JDBC sink 并进行如下配置:
sink.buffer-flush.max-rows = 5000
这个例子中表示每插入 5000 行触发一次写操作。
同样地,在你的 Flink SQL 程序配置中添加:
sink.buffer-flush.interval = 2000ms
这个例子中则是代表了每隔 2 秒钟写入一次缓存区内的所有内容。
默认情况下,JDBC Sink 发送完成后会自动关闭连接,并提交事务。
还有一些与性能相关的调整方面也可以考虑做出:比如,修改输入并行度(即 max.parallelism),或是调整容器资源限制等都可能受益于增加批量处理参数来保证Flink Job稳定运行。
注意 JKBC Sink 默认不启用 Batch Api, 如需将 JDBC Sink 扩展成可同时支持 batch insert 和 streaming insert 需要额外开发。
在使用Flink JDBC Connector的Sink写入数据库时,设置批量flush参数是实现高吞吐量的关键。主要有两个相关参数: 1. sink.buffer-flush.max-rows:触发flush并写入数据库的最大行数阈值。默认为1,即每插入1行就flush。 2. sink.buffer-flush.interval:触发flush的最大时间间隔。默认为0,即不按时间隔断flush。 要实现批量写入数据库,需要同时调大这两个参数。例如:
properties
# 每5000行或1秒钟触发一次flush
sink.buffer-flush.max-rows=5000
sink.buffer-flush.interval=1000
这样,连接器会将插入的数据缓存至内存Buffer,当数据量达到5000行或者超过1秒钟时,会触发flush并批量写入数据库。 adjust这两个参数对吞吐量的影响主要有: 1. sink.buffer-flush.max-rows越大,触发flush的频率越低,插入数据库的批量数据越多,数据库的IO压力越大,从而达到更高的吞吐量。 2. sink.buffer-flush.interval允许在达不到max-rows阈值时,也定期触发flush。这可以避免数据在Buffer中停留时间过长,同时也为数据库提供更均匀的压力,有利于吞吐量的提高。 3. 两个参数都不宜设置过大,否则会给数据库带来过高的压力和较高的延迟。需要根据数据库的处理能力进行评估和调优。 4. interval参数对event time语义的支持很关键。它可以确保无论 max-rows是否达到,数据至少以该频率超越水位线并刷新到数据库。 所以,想实现Flink JDBC Sink的批量写入和高吞吐量,调优这两个flush相关的参数是关键。需要结合具体数据库的性能,选择适当的max-rows和interval值,既能发挥批量写入的优势,又不会过分压榨数据库资源。
在使用 Flink JDBC Connector 时,可以通过配置 sink.buffer-flush.max-rows
和 sink.buffer-flush.interval
参数来控制批量写入。其中,sink.buffer-flush.max-rows
参数表示可以缓存的最大记录数,sink.buffer-flush.interval
参数表示缓存的最大时间间隔。只要满足其中一个条件,就会触发将缓存中的数据批量写入 JDBC 数据库。
如果您设置了 sink.buffer-flush.max-rows
参数但是没有生效,可能需要检查以下问题:
是否设置了正确的参数值。确保 sink.buffer-flush.max-rows
参数的值大于 0。
是否开启了批量写入。在 Flink JDBC Connector 中,默认是开启了批量写入的,如果您没有手动关闭,那么就应该可以正常批量写入。可以检查一下代码中是否有关闭批量写入的操作。
是否达到了批量写入的条件。如果您设置了 sink.buffer-flush.max-rows
参数,但是没有达到最大记录数,那么就不会触发批量写入。可以检查一下是否有足够的数据达到了最大记录数。
如果您仍然无法解决问题,请提供更多详细的信息,例如代码片段、日志信息等,以便更好地定位问题。
可以使用 Flink 官方提供的 BulkFormatBuilder 工具类,来实现 flink-connector-jdbc 的批量写入。
// 这里假设已经定义好了 POJO 类型和 JDBC 配置信息
DataStream records = ...; JdbcConnectionOptions connectionOptions = ...; JdbcExecutionOptions executionOptions = ...;
// 使用 BulkFormatBuilder 构建 JdbcOutputFormatBuilder
JdbcOutputFormatBuilder outputFormatBuilder = JdbcOutputFormat.buildJdbcOutputFormat() .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") .setDBUrl("jdbc:derby:memory:test") .setUsername("") .setPassword("") .setQuery("INSERT INTO test (id, name) VALUES (?, ?);") .setSqlTypes(new int[] { Types.INTEGER, Types.VARCHAR }) .setBatchIntervalMs(1000) .setBatchSize(500) .setJdbcStatementBuilder(new JdbcStatementBuilder() { public void accept(java.sql.PreparedStatement statement, MyRecord record) { statement.setInt(1, record.id); statement.setString(2, record.name); } });
// 构建 SinkFunction
SinkFunction sink = JdbcSink.sink( outputFormatBuilder, new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withUrl("jdbc:derby:memory:test") .withDriverName("org.apache.derby.jdbc.EmbeddedDriver") .build(), new JdbcExecutionOptions.JdbcExecutionOptionsBuilder() .withBatchSize(500) .withBatchIntervalMs(1000) .withMaxRetries(3) .build());
// 将 Source 写入 Sink
records.addSink(sink);
Flink 官方提供的 flink-connector-jdbc 在 Sink 时支持批量写入,可以通过设置以下参数来实现:
jdbc.max-retries:设置在失败情况下允许的最大重试次数。默认值为 3。
jdbc.batch-size:设置 JDBC Sink 在写入数据时使用的批量大小。默认值为 1000。
jdbc.batch-interval-millis:设置 JDBC Sink 在写入数据时使用的批量时间间隔。默认值为 0,表示禁用批量时间间隔。
sink.buffer-flush.max-rows:设置在写入到 JDBC Sink 之前缓冲区中允许的最大记录数。默认值为 -1,表示禁用缓冲区。设置为正整数时,表示启用缓冲区,并设置缓冲区中允许的最大记录数。
需要注意的是,sink.buffer-flush.max-rows 参数主要用于控制缓冲区的大小,并不能直接控制批量写入的大小。实际上,在使用 JDBC Sink 时,批量写入的大小由 jdbc.batch-size 和 jdbc.batch-interval-millis 参数共同决定。
如果您想要实现更高效的批量写入,建议将 jdbc.batch-size 参数设置为较大的值,并设置合理的 jdbc.batch-interval-millis 参数以控制写入的频率。同时,还可以考虑通过调整并发度、优化 SQL 语句、优化数据库连接等方式来进一步提高写入性能。
Regenerate response
在使用 Flink JDBC Sink 时,可以通过以下参数控制批量写入:
1、sink.buffer-flush.max-rows:设置缓冲区大小,即达到该缓冲区大小后批量写入到数据库,默认为 5000; 2、sink.buffer-flush.interval:设置缓冲区时间间隔,即达到该时间间隔后批量写入到数据库,默认为 1s; 3、sink.max-retries:设置写入失败时的最大重试次数,默认为 3; 4、sink.max-parallelism:设置写入的并行度,默认为 1。
如果你想要自定义批量写入的大小,可以通过在参数中添加如下配置实现:
.withParameters(JdbcExecutionOptions.builder().withBatchSize(1000).build()) 其中 withBatchSize() 方法可以设置批量写入的大小,即一次性写入多少条数据。例如上述示例中的 1000 表示一次性写入 1000 条数据。
另外,如果你在使用 Flink JDBC Sink 时出现了 sink.buffer-flush.max-rows 参数不生效的情况,可以尝试将该参数的值设置为较小的值,如 100,看看是否能够生效。如果仍然不生效,可能是因为数据库本身的限制导致无法批量写入。
在 Flink 中使用 flink-connector-jdbc 作为 Sink 时,可以通过sink.buffer-flush.max-rows
来设置批量写入的最大行数。但是需要注意的是,这个参数仅仅是一个提示,Flink 会尽可能地将数据批量写入 JDBC 数据库,并且根据数据库支持的特性进行优化,以达到更高的吞吐量。
在设置 sink.buffer-flush.max-rows
参数时,需要同时设置 sink.buffer-flush.interval
参数,用于控制数据写入间隔时间,保证不会因为等待写入数据库而阻塞整个任务。例如:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Row> stream = ...
stream.addSink(JdbcSink.sink(
"insert into myTable values (?, ?, ?)",
(ps, t) -> {
ps.setInt(1, t.getField(0));
ps.setString(2, t.getField(1));
ps.setDouble(3, t.getField(2));
},
JdbcExecutionOptions.builder()
.withBatchSize(5000)
.withBatchIntervalMs(1000L)
.withMaxRetries(5)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://localhost/test")
.withDriverName("com.mysql.jdbc.Driver")
.withUsername("root")
.withPassword("")
.build()));
env.execute();
在上面的示例中,我们设置了JdbcExecutionOptions
对象的withBatchSize
方法为5000,表示每次写入5000行数据;同时,withBatchIntervalMs
方法也被设置为1000毫秒,表示每隔1秒写入一次数据。这样就可以达到批量写入的效果了。
需要注意的是,在使用flink-connector-jdbc
时,还可以通过其他参数来进一步优化写入性能,例如设置 JDBC 连接池、调整并发度等等。
在使用Flink的JDBC连接器时,可以通过以下两种方式来设置批量写入:
使用BatchingSinkFunction BatchingSinkFunction是一种可选的SinkFunction,它允许将多个记录作为一个批次进行写入。在Flink的JDBC连接器中,您可以通过继承或实现这个类来创建自定义的批量写入SinkFunction。
例如,下面的代码使用BatchingSinkFunction来将数据按照批量大小(100)写入MySQL:
public class MySQLBatchWriter extends BatchingJdbcSinkFunction {
public MySQLBatchWriter() {
super(JdbcExecutionOptions.builder()
.withBatchSize(100)
.build());
}
@Override
protected void prepareStatement(PreparedStatement statement, MyData data) throws SQLException {
statement.setInt(1, data.getId());
statement.setString(2, data.getName());
statement.setDouble(3, data.getValue());
}
} 其中,prepareStatement方法用于准备SQL查询语句,而构造函数则使用JdbcExecutionOptions.Builder来设置批量大小。
使用JdbcExecutionOptions JdbcExecutionOptions提供了多个配置选项,其中包括设置批量大小的选项。在使用Flink的JDBC连接器时,您可以通过以下代码来设置批量大小:
JdbcExecutionOptions executionOptions = JdbcExecutionOptions.builder() .withBatchSize(100) .build();
JdbcSink sink = JdbcSink.sink( "INSERT INTO my_table (id, name, value) VALUES (?, ?, ?)", (statement, data) -> { statement.setInt(1, data.getId()); statement.setString(2, data.getName()); statement.setDouble(3, data.getValue()); }, JdbcExecutionOptions.builder() .withBatchSize(100) .build(), new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withUrl("jdbc:mysql://localhost:3306/my_database") .withDriverName("com.mysql.jdbc.Driver") .withUsername("root") .withPassword("root") .build());
dataStream.addSink(sink); 其中,withBatchSize方法用于设置批量大小,并将其传递给JdbcExecutionOptions.Builder。
可能有以下几个原因导致该参数没有生效:
数据源本身的限制:如果数据源本身对批量写入有限制,那么设置 sink.buffer-flush.max-rows 参数也不会生效。你可以尝试查看数据源的文档或者源码,确认数据源是否支持批量写入,以及支持的批量写入的大小。
配置参数生效范围:sink.buffer-flush.max-rows 参数只在配置文件中的 flink-conf.yaml 或者执行作业时的 -yD 参数中生效,在代码中通过 env.getConfig().setGlobalJobParameters() 方式设置的参数并不会生效。
代码中的批量写入限制:在源码中,如果对 sink.buffer-flush.max-rows 参数进行了手动覆盖限制,那么该限制也会覆盖配置文件中的同名限制,你可以去查看代码中是否有这方面的限制。
需要注意的是,由于 JDBC Sink 是一个异步写入的 Sink,批量写入的大小可能会受到网络带宽、数据源状态等因素的影响。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。