接上篇:https://developer.aliyun.com/article/1622574?spm=a2c6h.13148508.setting.28.27ab4f0ehhuqRu
select相关
- 列的多种表示
- select
- selectExpr
启动 Spark-Shell 继续进行测试
// 这里注意 option("header", "true") 自动解析一下表头 val df1 = spark.read.option("header", "true").csv("/opt/wzk/data/people1.csv") // $ col() 等等 不可以混用!!!(有解决方法,但是建议不混用!!!) // 可以多种形式获取到列 df1.select($"name", $"age", $"job").show
执行结果如下图所示:
继续进行测试
df1.select("name", "age", "job").show(3) df1.select(col("name"), col("age"), col("job")).show(3) df1.select($"name", $"age"+1000, $"job").show(5)
运行结果如下图所示:
where相关
接着对上述内容进行测试:
df1.filter("age > 25").show df1.filter("age > 25 and name == 'wzk18'").show df1.where("age > 25").show df1.where("age > 25 and name == 'wzk19'").show
运行测试结果如下图:
groupBy相关
- groupBy
- agg
- max
- min
- avg
- sum
- count
进行测试:
// 由于我的字段中没有数值类型的,就不做测试了 df1.groupBy("Job").sum("sal").show df1.groupBy("Job").max("sal").show df1.groupBy("Job").min("sal").show df1.groupBy("Job").avg("sal").show df1.groupBy("Job").count.show df1.groupBy("Job").avg("sal").where("avg(sal) > 2000").show df1.groupBy("Job").avg("sal").where($"avg(sal)" > 2000).show df1.groupBy("Job").agg("sal"->"max", "sal"->"min", "sal"- >"avg", "sal"->"sum", "sal"->"count").show df1.groupBy("deptno").agg("sal"->"max", "sal"->"min", "sal"- >"avg", "sal"->"sum", "sal"->"count").show
orderBy相关
orderBy == sort
df1.orderBy("name").show(5) df1.orderBy($"name".asc).show(5) df1.orderBy(-$"age").show(5)
运行测试的结果如下图所示:
继续进行测试:
df1.sort("age").show(3) df1.sort($"age".asc).show(3) df1.sort(col("age")).show(3)
测试结果如下图所示:
JOIN相关
// 笛卡尔积 df1.crossJoin(df1).count // 等值连接(单字段) df1.join(df1, "name").count // 等值连接(多字段) df1.join(df1, Seq("name", "age")).show
运行的测试结果如下图所示:
这里编写两个case:
// 第一个数据集 case class StudentAge(sno: Int, name: String, age: Int) val lst = List(StudentAge(1,"Alice", 18), StudentAge(2,"Andy", 19), StudentAge(3,"Bob", 17), StudentAge(4,"Justin", 21), StudentAge(5,"Cindy", 20)) val ds1 = spark.createDataset(lst) // 第二个数据集 case class StudentHeight(sname: String, height: Int) val rdd = sc.makeRDD(List(StudentHeight("Alice", 160), StudentHeight("Andy", 159), StudentHeight("Bob", 170), StudentHeight("Cindy", 165), StudentHeight("Rose", 160))) val ds2 = rdd.toDS
运行测试的结果如下图所示:
接下来我们进行连表操作:
// 连表操作 不可以使用 "name"==="sname" !!! ds1.join(ds2, 'name==='sname).show ds1.join(ds2, ds1("name")===ds2("sname")).show ds1.join(ds2, $"name"===$"sname").show ds1.join(ds2, $"name"===$"sname", "inner").show
测试的运行结果如下图所示:
集合相关
val ds3 = ds1.select("name") val ds4 = ds2.select("sname") // union 求并集、不去重 ds3.union(ds4).show // unionAll(过时了)与union等价 // intersect 求交 ds3.intersect(ds4).show // except 求差 ds3.except(ds4).show
运行结果如下图所示:
空值处理
math.sqrt(-1.0) math.sqrt(-1.0).inNaN() df1.show // 删除所有列的空值和NaN df1.na.drop.show // 删除某列的空值和NaN df1.na.drop(Array("xxx")).show // 对列进行填充 df1.na.fill(1000).show df1.na.fill(1000, Array("xxx")).show