bufferTimeout 这个参数在flink-conf.yaml 是怎么配置啊
在 Flink 中,bufferTimeout 参数用于控制 JDBC Sink 在写入数据库时的缓冲数和时间间隔。如果每次发送数据时都要建立一个连接,在重复使用上下文、引擎以及网络资源方面会带来较大的开销,也可能导致性能瓶颈。因此,Flink 提供了 JDBC 批量操作来处理这样的情况。
在 flink-conf.yaml 配置文件中设置参数可以为整个应用程序定义全局值。以下是如何配置 bufferTimeout 参数
# flink-conf.yaml
...
execution:
planner:
...
# 统一的 Flush 触发器时间,默认为毫秒
sink.buffer-flush.max-events=10000
sink.buffer-flush.interval=1s
# 回滚时失败(设计成无限重试或放弃)或成功传递记录
sink.semantic.setFailOnTransactionalMismatch=false
#事件之间的最小交付超时,默认30ms(保证low-latency)
event-time: {
max-out-of-orderness: 14s
watermark:
generator:
classname: org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
parameter:
maxOutOfOrdernessSeconds: 13.5
}
env.java.opts: ''
taskmanager.memory.preallocate: true
# Disabled for a single node setup. For more information check ConfigOptions#LOCAL_NUMBER_TASK_MANAGER or the documentation.
cluster.evenly-spread-out-slots: false
# TextKinesis Test
sequence.generator.retry:
# maximum retry times before record declared as failed
max-retries: 10
# delay between two retries in milliseconds
delta-in-ms: 4000
# flink 基本属性
taskmanager:
# 一个TaskManager的可用的CPU核数, -1 = 默认值(所有核) (就是默认分配所有core)
numberOfTaskSlots: 2
...
# jdbc Configurations
jdbc.driver.class_name=com.mysql.jdbc.Driver
jdbc.url=jdbc:mysql://localhost:3306/[DB_NAME]?useSSL=false&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true&serverTimezone=UTC
jdbc.username=root
jdbc.password=root
# optional connection options keys.
jdbc.table-name=my_test_table
jdbc.write.mode=UPDATE
jdbc.batch.size=100
sink.buffer-flush.max-events=10000
sink.buffer-flush.interval=3s
在上述配置文件中,最后一行设置了 sink.buffer-flush.interval 参数为每3秒刷新缓冲池。这个参数也可以通过 Java 访问和调整:
JdbcSink jdbcSink = JdbcSink.sink( 'INSERT INTO table', new SimpleJdbcStatementBuilder<>(), preparedStatementSetter);
jdbcSink.setBatchSize(1024).setFlushIntervalMills(3000L);
DataStream> ds = env.fromElements(Tuple2.of('spam',42));
ds.addSink(jdbcSink);
使用上面提到的方式来更改此 “ sink.buffer-flush.interval” 属性以及其他 Flink 配置参数都是合法的。
以上是配置 bufferTimeout 参数的方法,希望它能够解决您的问题!
赞0
踩0