我试图以流式和批量方式使用flink,将大量数据添加到Accumulo(每分钟几百万)。我想在将记录发送到Accumulo之前批量记录。我从目录或通过kafka摄取数据,使用flatmap转换数据,然后传递给RichSinkFunction,RichSinkFunction将数据添加到集合中。
使用流数据,批处理似乎没问题,因为我可以将记录添加到固定大小的集合中,一旦达到批处理阈值就会将其发送到accumulo。但是对于有限的批处理数据,我很难找到一种好的批处理方法,因为如果在指定时间内没有其他数据需要刷新超时。似乎没有与弹性搜索或其他替代接收器不同的Accumulo连接器。
我想过使用带有触发器的批处理大小和时间间隔的过程函数,但这需要一个键控窗口。我不想沿着关键路线走下去,因为数据看起来非常偏斜,因为有些钥匙会有一吨记录而有些钥匙会很少。如果我不使用窗口方法,那么我理解操作符不会是并行的。我希望分批处理,所以每个接收器只关心数字或时间间隔。
您可以通过实施访问接收器中的计时器ProcessingTimeCallback。举个例子,看看BucketingSink- 它的open和onProcessingTime方法。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
你好,我是AI助理
可以解答问题、推荐解决方案等
评论
全部评论 (0)