这个机器学习PAI需求可以实现吗?

我现在从ES读取数据初始化了一个MemSourceStreamOp,然后把他转成了StreamOperator,然后我通过滑动窗口获取一些特征,但是我随时需要读取新的ES数据,需要加到StreamOperator中,让这个滑动窗口再加入数据之后继续滑动。但是我没看到这种类似的方式,这个机器学习PAI需求可以实现吗?f2f540a1700359dc8b7055be1f88a5c4.png

5764eecdc5ce879ab7dd8722a586764d.png

展开
收起
三分钟热度的鱼 2023-06-05 15:35:42 140 分享 版权
3 条回答
写回答
取消 提交回答
  • 公众号:网络技术联盟站,InfoQ签约作者,阿里云社区签约作者,华为云 云享专家,BOSS直聘 创作王者,腾讯课堂创作领航员,博客+论坛:https://www.wljslmz.cn,工程师导航:https://www.wljslmz.com

    阿里云机器学习PAI是支持动态添加数据到StreamOperator的。在你的场景中,你需要通过DataStreamSourceFunction来不断地从ES读取新的数据,然后将其加入到已有的DataStream中。

    具体实现方法如下:

    1. 创建一个DataStreamSourceFunction,在其中实现从ES中读取数据的逻辑。
    2. 在初始化MemSourceStreamOp时,使用fromDataStream()方法将其转换为DataStream,并保存其状态。
    3. 在创建DataStreamSourceFunction后,使用addSource()方法将其加入到已有的DataStream中。
    4. 在创建滑动窗口时,设置允许延迟结果输出的时间,即allowedLateness方法,以确保当新的数据加入时仍能够继续滑动窗口。

    具体代码示例如下:

    # 从ES中读取数据的DataStreamSourceFunction
    class ESDataGenerator(SourceFunction):
        def run(self, ctx: SourceContext) -> None:
            # 从ES中读取数据,并将其转换成DataStreamElement类型加入到DataStream中
            while True:
                data = read_from_ES()
                ctx.collect(DataStreamElement(data, timestamp))
    
        def cancel(self):
            pass
    
    # 初始化MemSourceStreamOp
    mem_source = MemSourceStreamOp([("col1", "int"), ("col2", "string")], [[1, "a"], [2, "b"]])
    
    # 将MemSourceStreamOp转换成DataStream,并保存其状态
    data_stream = mem_source.link(StreamOperator())
    
    # 加入新数据的DataStreamSourceFunction
    es_generator = ESDataGenerator()
    data_stream.add_source(es_generator)
    
    # 创建滑动窗口,设置allowedLateness参数
    windowed_stream = data_stream.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
            .allowedLateness(Time.seconds(5)))
    
    # 对滑动窗口进行一些操作,如获取特征等
    ...
    

    通过上述方法,你可以实现随时向已有的StreamOperator中添加数据,并保证滑动窗口能够随时获取新的数据并进行计算。

    2023-06-05 22:08:59
    赞同 展开评论
  • 北京阿里云ACE会长

    是的,您可以使用DataStreamSourceStreamOp组件从Elasticsearch中读取数据,并将其转换为DataStreamOperator,然后使用DataStreamOperator.addSink()方法将数据发送到您的滑动窗口操作中。

    当新的数据到达时,您可以使用DataStreamOperator.addSink()方法将其发送到StreamOperator中进行处理。这样,当新的数据到达时,它会被添加到StreamOperator中,并随着滑动窗口一起被滑动处理。

    2023-06-05 21:06:00
    赞同 展开评论
  • 现在还没有es的数据源,简单的,你可以弄一个Kafka,做同样的逻辑。此回答整理自钉群“Alink开源--用户群”

    2023-06-05 15:55:11
    赞同 展开评论

人工智能平台 PAI(Platform for AI,原机器学习平台PAI)是面向开发者和企业的机器学习/深度学习工程平台,提供包含数据标注、模型构建、模型训练、模型部署、推理优化在内的AI开发全链路服务,内置140+种优化算法,具备丰富的行业场景插件,为用户提供低门槛、高性能的云原生AI工程化能力。

收录在圈子:
还有其他疑问?
咨询AI助理