你好,我有一个keyed state,当我启动flink程序时,我需要根据数据库中的数据给每个key加载不同的数值状态,我应该怎样来操作呢,在文档上找不到例子,我觉得应该是在open函数中设置初始状态,但是在这里获取不到key。
class LineItemStat extends RichFlatMapFunction<ObjectNode, JSONObject> {
/** * The ValueState handle. The first field is the count, the second field a running sum. */ private transient MapState<String, Long> stat_value;
@Override public void flatMap(ObjectNode input, Collector out) throws Exception {
// access the state value
}
@Override public void open(Configuration config) { MapStateDescriptor<String, Long> descriptor = new MapStateDescriptor<String, Long>( "stat_value",String.class, Long.class); // default value of the state, if nothing was set stat_value = getRuntimeContext().getMapState(descriptor); } } *来自志愿者整理的flink邮件归档
有做过类似的事情,不用侵入flink的源码。记录几个关键信息:
当有数据过来时,iff (null == initTime) || (initTime < openTime) 进行初始数据加载动作。
Sent from Mailhttps://go.microsoft.com/fwlink/?LinkId=550986 for Windows 10*来自志愿者整理的flink
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。