开发者社区> 问答> 正文

flink消费kafka数据open()方法初始要换多次,怎么解决?

情景: 我在用flink通过FlinkKafkaConsumer消费kafka的数据并写入到mysql的时候,在sink端我继承 RichSinkFunction ,并重写了open(),close()方法,以及实现了invoke(),方法 个人理解: 1. open()方法在程序启动的时候只走一次,我在该方法中初始化了数据库连接 2. close()方法在程序结束的时候也是只走一次 3. invoke()方法在获取到每一条数据走一次这个方法 实际情况及问题(env.setParallelism(1)): 1. open()在程序启动的时候运行了两次 2. invoke()方法在每条消息过来也会被处理两次

code: reader端:

public class FlinkKafkaReader<B> extends DataKafkaConnect<StreamExecutionEnvironment, DataStream> {

@Override
protected DataStream reader(StreamExecutionEnvironment env, KafkaConfig cfg) throws JobException {

DataStream<B> stream = null;
try {
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", cfg.getBootstrapServers());
kafkaProps.setProperty("group.id", cfg.getGroupId());
kafkaProps.setProperty("auto.offset.reset", cfg.getOffsetReset());
kafkaProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaProps.setProperty("enable.auto.commit", cfg.getAutoCommit());
kafkaProps.setProperty("max.poll.interval.ms", cfg.getIntervalMs());

KafkaDeserializationSchema deserializationKdl = null;
// 根据不同的配置进行选择不同的消息解析器
switch (cfg.getMessageType()) {
case "mysql":
deserializationKdl = new KafkaDataDeserialiaztionBinlogSchemal();
break;
case "mongodb":
deserializationKdl = new KafkaDataDeserialiaztionOplogSchema();
break;
}
FlinkKafkaConsumer flinkKafkaConsumer = new FlinkKafkaConsumer(Arrays.asList(cfg.getTopics().split(",")), deserializationKdl, kafkaProps);

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 如果设置了消费的开始offset时间,则从指定的时间点开始会消费,否则从当前时间点开始消费
String consumptionStartOffset = cfg.getConsumptionStartTime();
if (StringUtils.isBlank(consumptionStartOffset)) {
flinkKafkaConsumer.setStartFromGroupOffsets();
} else {
flinkKafkaConsumer.setStartFromTimestamp(
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
.parse(cfg.getConsumptionStartTime())
.getTime()
);
}
// 设置并行度
env.setParallelism(1);
// env.getCheckpointConfig().setCheckpointInterval(1000 * 30);

env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 设置可容忍checkpoint失败的次数
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);
stream = env.addSource(flinkKafkaConsumer)
.filter(new FilterFunction() {
@Override
public boolean filter(Object value) throws Exception {
return null != value;
}
});
} catch (Exception e) {
throw new JobException(e.getMessage());
}
return stream;
}
}

sink端:

public class MysqlSink<T> extends RichSinkFunction<T> {
@Override
public void open(Configuration config) throw Exception{
...
}
@Override
public void close(){
...
}
@Override
public void invoke(Object obj,Context context){
//业务逻辑, 这里的逻辑每一条数据过来会运行两次,这里也是我的问题
...
}
}

还请知悉原因的道友给点指引,万分感谢*来自志愿者整理的FLINK邮件归档

展开
收起
又出bug了-- 2021-12-03 16:58:38 1031 0
1 条回答
写回答
取消 提交回答
  • 没有全部代码,我猜你 addSink() 走了两次,调试看下。*来自志愿者整理的FLINK邮件归档

    2021-12-03 17:55:10
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Java Spring Boot开发实战系列课程【第16讲】:Spring Boot 2.0 实战Apache Kafka百万级高并发消息中间件与原理解析 立即下载
MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
消息队列kafka介绍 立即下载