I want to find duplicate records in a Spark Scala DataFrame. For example, I want to find duplicates based on three columns, say "id", "name", and "age". The condition part can contain any number of columns (it is dynamic input), and I want to pull the duplicate records based on those column values.
I have tried the code below, but only with a single attribute. I don't know how to do it when there is more than one column.
My code:
var s = "age|id|name" // Note: this is dynamic input, so the number of columns can grow or shrink
var columnNames = s.replace('|', ',')
val findDuplicateRecordsDF = spark.sql("SELECT * FROM " + dbname + "." + tablename)
findDuplicateRecordsDF.show()
findDuplicateRecordsDF.withColumn("count", count("*")
  .over(Window.partitionBy($"id"))) // how do I partition by more than one column here? (dynamic input)
  .where($"count" > 1)
  .show()
Input DataFrame (findDuplicateRecordsDF.show()):
--------------------------------------------------------
| id | name | age | phone | email_id |
|-------------------------------------------------------|
| 3 | sam | 23 | 9876543210 | sam@yahoo.com |
| 7 | ram | 27 | 8765432190 | ram@gmail.com |
| 3 | sam | 28 | 9876543210 | sam@yahoo.com |
| 6 | haris | 30 | 6543210777 | haris@gmail.com |
| 9 | ram | 27 | 8765432130 | ram94@gmail.com |
| 6 | haris | 24 | 6543210777 | haris@gmail.com |
| 4 | karthi| 26 | 4321066666 | karthi@gmail.com |
--------------------------------------------------------
Here I want to get the duplicate records based on four columns (id, name, phone, email_id). The above is a sample DataFrame; the real DataFrame can contain any number of columns.
The output DataFrames should be:
Duplicate records output:
--------------------------------------------------------
| id | name | age | phone | email_id |
|-------------------------------------------------------|
| 3 | sam | 23 | 9876543210 | sam@yahoo.com |
| 3 | sam | 28 | 9876543210 | sam@yahoo.com |
| 6 | haris | 30 | 6543210777 | haris@gmail.com |
| 6 | haris | 24 | 6543210777 | haris@gmail.com |
--------------------------------------------------------
Unique records output:
--------------------------------------------------------
| id | name | age | phone | email_id |
|-------------------------------------------------------|
| 7 | ram | 27 | 8765432190 | ram@gmail.com |
| 9 | ram | 27 | 8765432130 | ram94@gmail.com |
| 4 | karthi| 26 | 4321066666 | karthi@gmail.com |
--------------------------------------------------------
You can use window functions. Have a look at this:
scala> val df = Seq((3,"sam",23,"9876543210","sam@yahoo.com"),(7,"ram",27,"8765432190","ram@gmail.com"),(3,"sam",28,"9876543210","sam@yahoo.com"),(6,"haris",30,"6543210777","haris@gmail.com"),(9,"ram",27,"8765432130","ram94@gmail.com"),(6,"haris",24,"6543210777","haris@gmail.com"),(4,"karthi",26,"4321066666","karthi@gmail.com")).toDF("id","name","age","phone","email_id")
df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 3 more fields]
scala> val dup_cols = List("id","name","phone","email_id");
dup_cols: List[String] = List(id, name, phone, email_id)
scala> df.createOrReplaceTempView("contact")
scala> val dup_cols_qry = dup_cols.mkString(" count(*) over(partition by ", "," , " ) as cnt ")
dup_cols_qry: String = " count(*) over(partition by id,name,phone,email_id ) as cnt "
scala> val df2 = spark.sql("select *,"+ dup_cols_qry + " from contact ")
df2: org.apache.spark.sql.DataFrame = [id: int, name: string ... 4 more fields]
scala> df2.show(false)
+---+------+---+----------+----------------+---+
|id |name  |age|phone     |email_id        |cnt|
+---+------+---+----------+----------------+---+
|4  |karthi|26 |4321066666|karthi@gmail.com|1  |
|7  |ram   |27 |8765432190|ram@gmail.com   |1  |
|9  |ram   |27 |8765432130|ram94@gmail.com |1  |
|3  |sam   |23 |9876543210|sam@yahoo.com   |2  |
|3  |sam   |28 |9876543210|sam@yahoo.com   |2  |
|6  |haris |30 |6543210777|haris@gmail.com |2  |
|6  |haris |24 |6543210777|haris@gmail.com |2  |
+---+------+---+----------+----------------+---+
scala> df2.createOrReplaceTempView("contact2")
// duplicates
scala> spark.sql("select " + dup_cols.mkString(",") + " from contact2 where cnt = 2").show
+---+-----+----------+---------------+
| id| name|     phone|       email_id|
+---+-----+----------+---------------+
|  3|  sam|9876543210|  sam@yahoo.com|
|  3|  sam|9876543210|  sam@yahoo.com|
|  6|haris|6543210777|haris@gmail.com|
|  6|haris|6543210777|haris@gmail.com|
+---+-----+----------+---------------+
// unique
scala> spark.sql("select " + dup_cols.mkString(",") + " from contact2 where cnt = 1").show
+---+------+----------+----------------+
| id|  name|     phone|        email_id|
+---+------+----------+----------------+
|  4|karthi|4321066666|karthi@gmail.com|
|  7|   ram|8765432190|   ram@gmail.com|
|  9|   ram|8765432130| ram94@gmail.com|
+---+------+----------+----------------+