开发者社区> 问答> 正文

jdbc sink无法插入数据怎么办?

我在使用flink1.12,现在有一个job,数据最后需要同时进入Kafka和数据库,所以在最后一步操作里加了side output,代码如下 

.process(new ProcessFunction<RatioValue, RatioValue>() { 

@Override 

public void processElement(RatioValuevalue, Context ctx, Collector out) throws Exception { 

out.collect(value); 

ctx.output(ratioOutputTag, value); 

}); 

sideStream.addSink(new FlinkKafkaProducer<>( 

"ratio_value", 

new RatioValueSerializationSchema(suffix), 

PropertiesUtil.getDefaultKafkaProperties(tool.get(KAFKA_URL), tool.get(SCHEMA_REGISTRY_URL)), 

FlinkKafkaProducer.Semantic.EXACTLY_ONCE)); 

DataStream ratioSideStream = sideStream.getSideOutput(ratioOutputTag); 

ratioSideStream.addSink(JdbcSinkUtil.getRatioValueJdbcSinkFunction(tool)); 

在实际运行中,数据生成后能正确落入kafka,但是jdbc sink有些重启job后可用,有时重启后还是不可用。 

用local environment模式运行时,断点断在JdbcSink的sink方法里,发现没法断点进行,感觉时没执行到JdbcSink。 

想问下这种情况是否有什么排查手段?*来自志愿者整理的flink邮件归档

展开
收起
CCCC 2021-12-02 14:59:14 687 0
1 条回答
写回答
取消 提交回答
  • 一个朴素的思路,数据量是多少,有没有考虑到数据库的写入压力呢? 

    去掉kafka sink ,看下 写入效果。 

    再对比下 加入kafka 后的效果。 

    一个通道,连接了两个sink,一个落水快,一个落水慢。落水快的很快消化了,落水慢 可能无法控速,就跪了, 进而导致整个通道 跪了*来自志愿者整理的FLINK邮件归档

    2021-12-02 15:58:29
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink SQL in 2020 立即下载
低代码开发师(初级)实战教程 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载