我有一个用例维护Flink中的配置,我真的不知道该如何处理。
假设我有一些配置存储在某个地方,我需要它来进行处理。在Flink作业的初始化时,我想加载所有配置。
还可以在Flink作业运行期间修改此配置,因此我必须在内存中保留此配置的状态,并在需要时进行更新。可从KafkaSource访问配置更新。
所以这是我所拥有的:
我有一个功能可以加载整个配置,保持其状态并将其与我的数据流关联:
public class MyConfiguration extends RichFlatMapFunction<Row, Row>{ private transient MapState<String, MyConfObject> configuration;
@Override
public void open(MyConfiguration config) throws Exception{
MapStateDescriptor<String,MyConfObject> descriptor = new MapStateDescriptor<String,MyConfObject>(
"configuration",
BasicTypeInfo.STRING_TYPE_INFO,
...
);
configuration = getRuntimeContext().getMapState(descriptor);
configuration.putAll(...); // Load configuration from somewhere
}
@Override
public void flatMap(Row value, Collector<Row> out) throws Exception {
MyConfObject conf = configuration.get(...);
... // Associate conf with data
out.collect(value);
}
} 我的pipeline如下所示:
DataStream dataStream = ...; // My data stream DataStream<Map<String, MyConfObject> streamConf = env.addSource(new FlinkKafkaConsumer (..., ..., ...)) // The stream of configuration updates .map(...);
return dataStream .assignTimestampsAndWatermarks(...) .flatMap(new MyConfiguration())
... //Do some processing
.map(m -> {
ObjectMapper objectMapper = new ObjectMapper();
String json = objectMapper.writeValueAsString(m);
return json.getBytes();
});
我想要的是使用配置更新流streamConf来更新MyConfiguration平面映射函数内的State变量。我怎样才能做到这一点 ?
我建议编写一个从Kafka读取配置信息的源,然后通过广播流将更改广播到映射功能。映射功能将以其持久状态存储完整的当前配置,并且广播流意味着映射功能的所有实例将获取所有配置更改。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。