我有一个数据框如下:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
import spark.implicits._
// some data...
val df = Seq(
(1, "AA", "BB", ("AA", "BB")),
(2, "AA", "BB", ("AA", "BB")),
(3, "AB", "BB", ("AB", "BB"))
).toDF("id","name", "surname", "array")
df.show()
我期待计算连续行中“数组”列之间的编辑距离。作为示例,我想计算第1列(“AA”,“BB”)中的“数组”实体与第2列(“AA”,“BB”)中的“数组”实体之间的编辑距离。这是我正在使用的编辑距离函数:
def editDist2A: Int = {
val startRow = (0 to b.size).toList
a.foldLeft(startRow) { (prevRow, aElem) =>
(prevRow.zip(prevRow.tail).zip(b)).scanLeft(prevRow.head + 1) {
case (left, ((diag, up), bElem)) => {
val aGapScore = up + 1
val bGapScore = left + 1
val matchScore = diag + (if (aElem == bElem) 0 else 1)
List(aGapScore, bGapScore, matchScore).min
}
}
}.last
}
我知道我需要为这个函数创建一个UDF,但似乎无法做到。如果我按原样使用该函数并使用Spark Windowing来获取前一行:
// creating window - ordered by ID
val window = Window.orderBy("id")
// using the window with lag function to compare to previous value in each column
df.withColumn("edit-d", editDist2(($"array"), lag("array", 1).over(window))).show()
我收到以下错误:
:245: error: type mismatch;
found : org.apache.spark.sql.ColumnName
required: Iterable[?]
df.withColumn("edit-d", editDist2(($"array"), lag("array", 1).over(window))).show()
可以使用Spark自己的levenshtein功能。此函数需要两个字符串进行比较,因此不能与数组一起使用。
// creating window - ordered by ID
val window = Window.orderBy("id")
// using the window with lag function to compare to previous value in each column
df.withColumn("edit-d", levenshtein(($"name"), lag("name", 1).over(window)) + levenshtein(($"surname"), lag("surname", 1).over(window))).show()
给出所需的输出:
id | name | surname | array | edit-d |
---|---|---|---|---|
1 | AA | BB | [AA, BB] | null |
2 | AA | BB | [AA, BB] | 0 |
3 | AB | BB | [AB, BB] | 1 |
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。