我在使用flink1.12,现在有一个job,数据最后需要同时进入Kafka和数据库,所以在最后一步操作里加了side output,代码如下
.process(new ProcessFunction<RatioValue, RatioValue>() {
@Override
public void processElement(RatioValuevalue, Context ctx, Collector out) throws Exception {
out.collect(value);
ctx.output(ratioOutputTag, value);
}
});
sideStream.addSink(new FlinkKafkaProducer<>(
"ratio_value",
new RatioValueSerializationSchema(suffix),
PropertiesUtil.getDefaultKafkaProperties(tool.get(KAFKA_URL), tool.get(SCHEMA_REGISTRY_URL)),
FlinkKafkaProducer.Semantic.EXACTLY_ONCE));
DataStream ratioSideStream = sideStream.getSideOutput(ratioOutputTag);
ratioSideStream.addSink(JdbcSinkUtil.getRatioValueJdbcSinkFunction(tool));
在实际运行中,数据生成后能正确落入kafka,但是jdbc sink有些重启job后可用,有时重启后还是不可用。
用local environment模式运行时,断点断在JdbcSink的sink方法里,发现没法断点进行,感觉时没执行到JdbcSink。
想问下这种情况是否有什么排查手段?*来自志愿者整理的flink邮件归档
一个朴素的思路,数据量是多少,有没有考虑到数据库的写入压力呢?
去掉kafka sink ,看下 写入效果。
再对比下 加入kafka 后的效果。
一个通道,连接了两个sink,一个落水快,一个落水慢。落水快的很快消化了,落水慢 可能无法控速,就跪了, 进而导致整个通道 跪了*来自志愿者整理的FLINK邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。