这是关于transform高阶函数(https://issues.apache.org/jira/browse/SPARK-23908)。
有没有办法将它用作标准功能(在包中org.apache.spark.sql.functions._)?
我有一个字符串数组,我想对每个字符串应用URI规范化。现在我用UDF做了。我刚刚用spark 2.4.0跳过它,我可以跳过UDF。
当我看到它应该使用selectExpr类似df.selectExpr("transform(i, x -> x + 1)"),但它只是为了与使用selectExpr?
无论如何使用这种方式为转换提供自定义功能?有没有办法实现它,还是应该使用好的旧UDF?
目前它只适用于SQL表达式,但是如果你想要返回Column你的用途expr:
org.apache.spark.sql.functions._
expr("transform(i, x -> x + 1)"): Column
无论如何使用这种方式为转换提供自定义功能?
可以使用Scala UDF *:
spark.udf.register("f", (x: Int) => x + 1)
Seq((1, Seq(1, 2, 3))).toDF("id", "xs")
.withColumn("xsinc", expr("transform(xs, x -> f(x))"))
.show | ||
---|---|---|
id | xs | xsinc |
1 | [1, 2, 3] | [2, 3, 4] |
虽然它似乎没有提供任何真正的好处超过UDF采取Seq。
*对Python UDF的部分支持似乎已经到位(udfs被识别,类型被正确派生,调用也被调度),但是从2.4.0开始,序列化机制似乎被打破(所有记录都传递给UDF) as None):
from typing import Optional
from tpyspark.sql.functions import expr
sc.version
'2.4.0'
def f(x: Optional[int]) -> Optional[int]:
return x + 1 if x is not None else None
spark.udf.register('f', f, "integer")
df = (spark
.createDataFrame([(1, [1, 2, 3])], ("id", "xs"))
.withColumn("xsinc", expr("transform(xs, x -> f(x))")))
df.printSchema()
root
|-- id: long (nullable = true)
|-- xs: array (nullable = true)
| |-- element: long (containsNull = true)
|-- xsinc: array (nullable = true)
| |-- element: integer (containsNull = true)
df.show() | ||
---|---|---|
id | xs | xsinc |
1 | [1, 2, 3] | [,,] |
当然,这里没有真正的性能提升潜力 - 它的调度BasePythonRunner应该与普通的开销相同udf。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。