ES数据量大概2亿条,定制CustomElasticsearchSource extends RichParallelSourceFunction作为数据源,用于翻页读取ES数据,然后通过flink的api输出到paimon+hadoop中,通过IDEA环境调试,现在发现程序一直在读取数据,hadoop中直到任务执行完成,才能被写入文件数据?能否读取1w条后即输出到hadoop?
另外,我发现table.executeInsert(tableName);和env.excute()会导致程序运行2次,那么到底应不应该加env.excute(),到底什么情况下才能加?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
你可以试试在Flink的处理逻辑中设置一个计数器,在读取到1万条数据时手动触发checkpoint并将数据输出到Hadoop,这样可以避免数据累积。至于table.executeInsert(tableName);
和 env.execute()
,通常只需要调用一次env.execute()
来启动整个Flink作业,executeInsert
是在Table API中用来直接将结果插入到外部存储中的方法,两者功能不同,一般不需要同时使用。如果还是不理解的话,可以直接询问阿里云客服,或者提交工单给阿里云的技术顾问,回答不易,麻烦大佬给个采纳,谢谢。