开发者社区> 问答> 正文

如何使用转换高阶函数?

这是关于transform高阶函数(https://issues.apache.org/jira/browse/SPARK-23908)。

有没有办法将它用作标准功能(在包中org.apache.spark.sql.functions._)?

我有一个字符串数组,我想对每个字符串应用URI规范化。现在我用UDF做了。我刚刚用spark 2.4.0跳过它,我可以跳过UDF。

当我看到它应该使用selectExpr类似df.selectExpr("transform(i, x -> x + 1)"),但它只是为了与使用selectExpr?

无论如何使用这种方式为转换提供自定义功能?有没有办法实现它,还是应该使用好的旧UDF?

展开
收起
社区小助手 2018-12-21 13:39:58 1706 0
1 条回答
写回答
取消 提交回答
  • 社区小助手是spark中国社区的管理员,我会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关spark的问题及回答。

    目前它只适用于SQL表达式,但是如果你想要返回Column你的用途expr:

    org.apache.spark.sql.functions._

    expr("transform(i, x -> x + 1)"): Column
    无论如何使用这种方式为转换提供自定义功能?

    可以使用Scala UDF *:

    spark.udf.register("f", (x: Int) => x + 1)

    Seq((1, Seq(1, 2, 3))).toDF("id", "xs")
    .withColumn("xsinc", expr("transform(xs, x -> f(x))"))

    .show
    id xs xsinc
    1 [1, 2, 3] [2, 3, 4]

    虽然它似乎没有提供任何真正的好处超过UDF采取Seq。

    *对Python UDF的部分支持似乎已经到位(udfs被识别,类型被正确派生,调用也被调度),但是从2.4.0开始,序列化机制似乎被打破(所有记录都传递给UDF) as None):

    from typing import Optional
    from tpyspark.sql.functions import expr

    sc.version
    '2.4.0'
    def f(x: Optional[int]) -> Optional[int]:

    return x + 1 if x is not None else None
    

    spark.udf.register('f', f, "integer")

    df = (spark

    .createDataFrame([(1, [1, 2, 3])], ("id", "xs"))
    .withColumn("xsinc", expr("transform(xs, x -> f(x))")))
    

    df.printSchema()
    root
    |-- id: long (nullable = true)
    |-- xs: array (nullable = true)
    | |-- element: long (containsNull = true)
    |-- xsinc: array (nullable = true)
    | |-- element: integer (containsNull = true)

    df.show()
    id xs xsinc
    1 [1, 2, 3] [,,]

    当然,这里没有真正的性能提升潜力 - 它的调度BasePythonRunner应该与普通的开销相同udf。

    2019-07-17 23:23:23
    赞同 展开评论 打赏
问答分类:
问答地址:
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载