开发者社区> 问答> 正文

Flink 状态使用问题咨询

你好,我有一个keyed state,当我启动flink程序时,我需要根据数据库中的数据给每个key加载不同的数值状态,我应该怎样来操作呢,在文档上找不到例子,我觉得应该是在open函数中设置初始状态,但是在这里获取不到key。

class LineItemStat extends RichFlatMapFunction<ObjectNode, JSONObject> {

/** * The ValueState handle. The first field is the count, the second field a running sum. */ private transient MapState<String, Long> stat_value;

@Override public void flatMap(ObjectNode input, Collector out) throws Exception {

// access the state value

}

@Override public void open(Configuration config) { MapStateDescriptor<String, Long> descriptor = new MapStateDescriptor<String, Long>( "stat_value",String.class, Long.class); // default value of the state, if nothing was set stat_value = getRuntimeContext().getMapState(descriptor); } } *来自志愿者整理的flink邮件归档

展开
收起
毛毛虫雨 2021-12-07 14:06:56 583 0
1 条回答
写回答
取消 提交回答
  • 有做过类似的事情,不用侵入flink的源码。记录几个关键信息:

    1. Function open的时间,openTime;
    2. 数据初始化的时间,initTime,可以用State保存;
    3. 真正的业务State

    当有数据过来时,iff (null == initTime) || (initTime < openTime) 进行初始数据加载动作。

    Sent from Mailhttps://go.microsoft.com/fwlink/?LinkId=550986 for Windows 10*来自志愿者整理的flink

    2021-12-07 15:25:15
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载