hi: 情景: 我在用flink通过FlinkKafkaConsumer消费kafka的数据并写入到mysql的时候,在sink端我继承 RichSinkFunction ,并重写了open(),close()方法,以及实现了invoke(),方法 个人理解: 1. open()方法在程序启动的时候只走一次,我在该方法中初始化了数据库连接 2. close()方法在程序结束的时候也是只走一次 3. invoke()方法在获取到每一条数据走一次这个方法 实际情况及问题(env.setParallelism(1)): 1. open()在程序启动的时候运行了两次 2. invoke()方法在每条消息过来也会被处理两次
code: reader端:
public class FlinkKafkaReader<B> extends DataKafkaConnect<StreamExecutionEnvironment, DataStream> {
@Override
protected DataStream reader(StreamExecutionEnvironment env, KafkaConfig cfg) throws JobException {
DataStream<B> stream = null;
try {
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", cfg.getBootstrapServers());
kafkaProps.setProperty("group.id", cfg.getGroupId());
kafkaProps.setProperty("auto.offset.reset", cfg.getOffsetReset());
kafkaProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaProps.setProperty("enable.auto.commit", cfg.getAutoCommit());
kafkaProps.setProperty("max.poll.interval.ms", cfg.getIntervalMs());
KafkaDeserializationSchema deserializationKdl = null;
// 根据不同的配置进行选择不同的消息解析器
switch (cfg.getMessageType()) {
case "mysql":
deserializationKdl = new KafkaDataDeserialiaztionBinlogSchemal();
break;
case "mongodb":
deserializationKdl = new KafkaDataDeserialiaztionOplogSchema();
break;
}
FlinkKafkaConsumer flinkKafkaConsumer = new FlinkKafkaConsumer(Arrays.asList(cfg.getTopics().split(",")), deserializationKdl, kafkaProps);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 如果设置了消费的开始offset时间,则从指定的时间点开始会消费,否则从当前时间点开始消费
String consumptionStartOffset = cfg.getConsumptionStartTime();
if (StringUtils.isBlank(consumptionStartOffset)) {
flinkKafkaConsumer.setStartFromGroupOffsets();
} else {
flinkKafkaConsumer.setStartFromTimestamp(
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
.parse(cfg.getConsumptionStartTime())
.getTime()
);
}
// 设置并行度
env.setParallelism(1);
// env.getCheckpointConfig().setCheckpointInterval(1000 * 30);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 设置可容忍checkpoint失败的次数
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);
stream = env.addSource(flinkKafkaConsumer)
.filter(new FilterFunction() {
@Override
public boolean filter(Object value) throws Exception {
return null != value;
}
});
} catch (Exception e) {
throw new JobException(e.getMessage());
}
return stream;
}
}
sink端:
public class MysqlSink<T> extends RichSinkFunction<T> {
@Override
public void open(Configuration config) throw Exception{
...
}
@Override
public void close(){
...
}
@Override
public void invoke(Object obj,Context context){
//业务逻辑, 这里的逻辑每一条数据过来会运行两次,这里也是我的问题
...
}
}
还请知悉原因的道友给点指引,万分感谢*来自志愿者整理的flink
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。