开发者社区> 问答> 正文

Flink维护配置状态

我有一个用例维护Flink中的配置,我真的不知道该如何处理。

假设我有一些配置存储在某个地方,我需要它来进行处理。在Flink作业的初始化时,我想加载所有配置。

还可以在Flink作业运行期间修改此配置,因此我必须在内存中保留此配置的状态,并在需要时进行更新。可从KafkaSource访问配置更新。

所以这是我所拥有的:

我有一个功能可以加载整个配置,保持其状态并将其与我的数据流关联:

public class MyConfiguration extends RichFlatMapFunction<Row, Row>{ private transient MapState<String, MyConfObject> configuration;

@Override
public void open(MyConfiguration config) throws Exception{
    MapStateDescriptor<String,MyConfObject> descriptor = new MapStateDescriptor<String,MyConfObject>(
            "configuration",
            BasicTypeInfo.STRING_TYPE_INFO,
            ...
    );
    configuration = getRuntimeContext().getMapState(descriptor);
    configuration.putAll(...);   // Load configuration from somewhere
}

@Override
public void flatMap(Row value, Collector<Row> out) throws Exception {
    MyConfObject conf = configuration.get(...);
    ...               // Associate conf with data
    out.collect(value);
}

} 我的pipeline如下所示:

DataStream dataStream = ...; // My data stream DataStream<Map<String, MyConfObject> streamConf = env.addSource(new FlinkKafkaConsumer (..., ..., ...)) // The stream of configuration updates .map(...);

return dataStream .assignTimestampsAndWatermarks(...) .flatMap(new MyConfiguration())

... //Do some processing

.map(m -> {
    ObjectMapper objectMapper = new ObjectMapper();
    String json = objectMapper.writeValueAsString(m);
    return json.getBytes();
});

我想要的是使用配置更新流streamConf来更新MyConfiguration平面映射函数内的State变量。我怎样才能做到这一点 ?

展开
收起
小六码奴 2019-10-09 19:22:34 764 0
1 条回答
写回答
取消 提交回答
  • 我建议编写一个从Kafka读取配置信息的源,然后通过广播流将更改广播到映射功能。映射功能将以其持久状态存储完整的当前配置,并且广播流意味着映射功能的所有实例将获取所有配置更改。

    2019-10-09 19:23:41
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载