开发者社区 > 大数据与机器学习 > 人工智能平台PAI > 正文

机器学习PAI怎么用Alink的相关方法解析取到metrics里的值?

机器学习PAI怎么用Alink的相关方法解析取到metrics里的值?
LSTNetPredictBatchOp类型的变量存的是下面的数据

pred1

{"data":{"ts":["2021-12-04 00:00:00.0"],"metrics":["0.26510030031204224 0.7800503969192505"]},"schema":"ts TIMESTAMP,metrics VECTOR"}

展开
收起
你鞋带开了~ 2024-03-11 18:08:24 18 0
2 条回答
写回答
取消 提交回答
  • 可以使用这个组件 https://alinklab.cn/manual/jsonvaluebatchop.html
    --此回答整理自钉群“Alink开源--用户群”

    2024-03-13 22:51:28
    赞同 展开评论 打赏
  • 面对过去,不要迷离;面对未来,不必彷徨;活在今天,你只要把自己完全展示给别人看。

    要解析LSTNetPredictBatchOp类型的变量中的metrics值,可以使用Alink的TableSinkStreamOpCollectTableSinkFunction方法。

    首先,需要将LSTNetPredictBatchOp类型的变量转换为DataStream类型,然后使用TableSinkStreamOp将其转换为Table类型。接下来,可以使用CollectTableSinkFunctionTable类型的数据收集到一个列表中,最后从列表中提取metrics值。

    以下是示例代码:

    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment, DataTypes
    from pyflink.table.descriptors import Schema, OldCsv, FileSystem
    from pyflink.table.udf import udf
    from alink.ml.operator import LSTNetPredictBatchOp
    
    # 初始化Flink环境
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    
    # 定义schema
    schema = Schema() \n    .field("ts", DataTypes.TIMESTAMP()) \n    .field("metrics", DataTypes.VECTOR())
    
    # 注册源表
    t_env.connect(FileSystem().path("/path/to/input")) \n    .with_format(OldCsv()
                  .field("ts", DataTypes.TIMESTAMP())
                  .field("metrics", DataTypes.STRING())) \n    .with_schema(schema) \n    .create_temporary_table("input_table")
    
    # 注册结果表
    t_env.connect(FileSystem().path("/path/to/output")) \n    .with_format(OldCsv()
                  .field("ts", DataTypes.TIMESTAMP())
                  .field("metrics", DataTypes.STRING())) \n    .with_schema(schema) \n    .create_temporary_table("output_table")
    
    # 创建LSTNetPredictBatchOp实例
    lstnet_predict_batch_op = LSTNetPredictBatchOp()
    
    # 将输入表转换为DataStream
    input_stream = t_env.from_path("input_table")
    
    # 应用LSTNetPredictBatchOp
    output_stream = lstnet_predict_batch_op.transform(input_stream)
    
    # 将DataStream转换为Table
    output_table = t_env.from_data_stream(output_stream, schema)
    
    # 将Table写入结果表
    t_env.insert_into("output_table", output_table)
    
    # 执行任务
    t_env.execute("LSTNetPredictBatchOp Example")
    

    在这个示例中,我们首先创建了一个Flink环境,并定义了输入和输出表的schema。然后,我们创建了一个LSTNetPredictBatchOp实例,并将其应用于输入表。最后,我们将输出表写入到指定的路径。

    2024-03-12 13:29:05
    赞同 展开评论 打赏

相关产品

  • 人工智能平台 PAI
  • 热门讨论

    热门文章

    相关电子书

    更多
    神龙云服务器产品及技术深度解析 立即下载
    弹性创造价值:基于ECS的最佳性价比实践解析 立即下载
    又快又稳:阿里云下一代虚拟交换机解析 立即下载

    相关镜像