我现在从ES读取数据初始化了一个MemSourceStreamOp,然后把他转成了StreamOperator,然后我通过滑动窗口获取一些特征,但是我随时需要读取新的ES数据,需要加到StreamOperator中,让这个滑动窗口再加入数据之后继续滑动。但是我没看到这种类似的方式,这个机器学习PAI需求可以实现吗?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
阿里云机器学习PAI是支持动态添加数据到StreamOperator的。在你的场景中,你需要通过DataStreamSourceFunction来不断地从ES读取新的数据,然后将其加入到已有的DataStream中。
具体实现方法如下:
具体代码示例如下:
# 从ES中读取数据的DataStreamSourceFunction
class ESDataGenerator(SourceFunction):
def run(self, ctx: SourceContext) -> None:
# 从ES中读取数据,并将其转换成DataStreamElement类型加入到DataStream中
while True:
data = read_from_ES()
ctx.collect(DataStreamElement(data, timestamp))
def cancel(self):
pass
# 初始化MemSourceStreamOp
mem_source = MemSourceStreamOp([("col1", "int"), ("col2", "string")], [[1, "a"], [2, "b"]])
# 将MemSourceStreamOp转换成DataStream,并保存其状态
data_stream = mem_source.link(StreamOperator())
# 加入新数据的DataStreamSourceFunction
es_generator = ESDataGenerator()
data_stream.add_source(es_generator)
# 创建滑动窗口,设置allowedLateness参数
windowed_stream = data_stream.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.allowedLateness(Time.seconds(5)))
# 对滑动窗口进行一些操作,如获取特征等
...
通过上述方法,你可以实现随时向已有的StreamOperator中添加数据,并保证滑动窗口能够随时获取新的数据并进行计算。
是的,您可以使用DataStreamSourceStreamOp组件从Elasticsearch中读取数据,并将其转换为DataStreamOperator,然后使用DataStreamOperator.addSink()方法将数据发送到您的滑动窗口操作中。
当新的数据到达时,您可以使用DataStreamOperator.addSink()方法将其发送到StreamOperator中进行处理。这样,当新的数据到达时,它会被添加到StreamOperator中,并随着滑动窗口一起被滑动处理。
人工智能平台 PAI(Platform for AI,原机器学习平台PAI)是面向开发者和企业的机器学习/深度学习工程平台,提供包含数据标注、模型构建、模型训练、模型部署、推理优化在内的AI开发全链路服务,内置140+种优化算法,具备丰富的行业场景插件,为用户提供低门槛、高性能的云原生AI工程化能力。