flinksql写kafka,开启了Checkpoint,并且设置了CheckpointingMode.EXACTLY_ONCE,写入kafka时没有设置kafka事务,为啥有时出现连续写入数据到kafka(如图1),有时又是周期性写入kafka(如图2)(周期性为Checkpoint的时间间隔)?
图1
图2
sink kafka 表是:
create table dwd_spyw_alert_info_stat_kaf_sk(
comb_type string
,period_type string
,user_id STRING
,alert_type STRING
,region_id STRING
,device_code STRING
,shop_id STRING
,alert_time STRING
,insert_time STRING
,primary key(comb_type, period_type, device_code, user_id, region_id, alert_type, shop_id) not enforced
)with(
'connector' = 'upsert-kafka',
'topic' = 'topic-name',
'properties.bootstrap.servers' = 'localhost:9092',
'key.json.ignore-parse-errors' = 'true',
'value.json.fail-on-missing-field' = 'false',
'value.json.ignore-parse-errors' = 'true',
'key.format' = 'json',
'value.format' = 'json',
'sink.buffer-flush.max-rows' = '5000',
'sink.buffer-flush.interval' = '1 s',
'properties.compression.type' = 'gzip',
'properties.compression.codec' = 'gzip',
'properties.linger.ms' = '5'
);
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。