开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

flinksql写kafka,有时输出数据连续,有时却不连续??

flinksql写kafka,开启了Checkpoint,并且设置了CheckpointingMode.EXACTLY_ONCE,写入kafka时没有设置kafka事务,为啥有时出现连续写入数据到kafka(如图1),有时又是周期性写入kafka(如图2)(周期性为Checkpoint的时间间隔)?

图1
1727689162200.png

图2
1727689114013.png

sink kafka 表是:

create table dwd_spyw_alert_info_stat_kaf_sk(
comb_type string
,period_type string
,user_id STRING
,alert_type STRING
,region_id STRING
,device_code STRING
,shop_id STRING
,alert_time STRING
,insert_time STRING
,primary key(comb_type, period_type, device_code, user_id, region_id, alert_type, shop_id) not enforced
)with(
'connector' = 'upsert-kafka',
'topic' = 'topic-name',
'properties.bootstrap.servers' = 'localhost:9092',
'key.json.ignore-parse-errors' = 'true',
'value.json.fail-on-missing-field' = 'false',
'value.json.ignore-parse-errors' = 'true',
'key.format' = 'json',
'value.format' = 'json',
'sink.buffer-flush.max-rows' = '5000',
'sink.buffer-flush.interval' = '1 s',
'properties.compression.type' = 'gzip',
'properties.compression.codec' = 'gzip',
'properties.linger.ms' = '5'
);

展开
收起
游客glctgieaqedss 2024-09-30 18:01:47 8 0
0 条回答
写回答
取消 提交回答

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关电子书

更多
Java Spring Boot开发实战系列课程【第16讲】:Spring Boot 2.0 实战Apache Kafka百万级高并发消息中间件与原理解析 立即下载
MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
消息队列kafka介绍 立即下载