1 Common Operations: Selecting and Filtering
1.1 Reading Data Sources
1.1.1 Reading JSON
Use spark.read. Note: paths are resolved against HDFS by default; to read a local file, add the file:// prefix, as shown below:
scala> val people = spark.read.format("json").load("file:///opt/software/data/people.json")
people: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> people.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
spark.read.format("json").load("file:///opt/software/data/people.json")
is equivalent to spark.read.json("file:///opt/software/data/people.json").
To read a file in another format, just change the format("json") argument, e.g. format("parquet").
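As a sketch of the same pattern for other formats (the file paths here are hypothetical, and a Spark session is assumed to be available as in spark-shell):

```scala
// Parquet: same reader, different format string
val parquetDF = spark.read.format("parquet").load("file:///opt/software/data/people.parquet")
// equivalent shorthand
val parquetDF2 = spark.read.parquet("file:///opt/software/data/people.parquet")

// CSV readers usually need extra options
val csvDF = spark.read.format("csv")
  .option("header", "true")        // first line contains column names
  .option("inferSchema", "true")   // sample the data to infer column types
  .load("file:///opt/software/data/people.csv")
```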
1.1.2 Reading a Hive Table
Use spark.sql. Here the Hive database is default (the default database name can be omitted) and the table is people:
scala> val peopleDF = spark.sql("select * from default.people")
peopleDF: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]

scala> peopleDF.show
+--------+---+--------+
|    name|age| address|
+--------+---+--------+
|zhangsan| 22| chengdu|
|  wangwu| 33| beijing|
|    lisi| 28|shanghai|
+--------+---+--------+

scala> peopleDF.printSchema
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- address: string (nullable = true)
1.2 Selecting Columns
There are three ways to select columns:
scala> peopleDF.select("name","age").show
+--------+---+
|    name|age|
+--------+---+
|zhangsan| 22|
|  wangwu| 33|
|    lisi| 28|
+--------+---+

scala> peopleDF.select($"name",$"age").show
+--------+---+
|    name|age|
+--------+---+
|zhangsan| 22|
|  wangwu| 33|
|    lisi| 28|
+--------+---+

scala> peopleDF.select(peopleDF.col("name"),peopleDF.col("age")).show
+--------+---+
|    name|age|
+--------+---+
|zhangsan| 22|
|  wangwu| 33|
|    lisi| 28|
+--------+---+
Note: if you write this code in IDEA and use the $ syntax, you must add the statement import spark.implicits._, otherwise the $ expression will not compile. spark-shell imports it by default.
$"columnName" is syntactic sugar that returns a Column object.
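In an IDEA project, the same logic might look like the following minimal application skeleton (assuming the Spark SQL dependency is on the classpath; the object name is arbitrary):

```scala
import org.apache.spark.sql.SparkSession

object SelectExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SelectExample")
      .master("local[*]")   // local mode for testing
      .getOrCreate()

    // Required for the $"columnName" syntax; spark-shell does this automatically
    import spark.implicits._

    val people = spark.read.json("file:///opt/software/data/people.json")
    people.select($"name", $"age").show()

    spark.stop()
  }
}
```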
1.3 The filter Operator (filter is equivalent to the where operator)
DF.col("id") is equivalent to $"id"; both return the Column for that column name.

DF.filter("name=''")
filters the rows where name is the empty string.

DF.filter($"age" > 21).show()
filters the rows where age is greater than 21. The statement import spark.implicits._ is required, otherwise the $ expression will not compile.

DF.filter($"age" === 21)
An equality test must use ===, otherwise it will not compile; the corresponding inequality operator is =!=. This is equivalent to DF.filter("age=21").

DF.filter("substring(name,0,1) = 'M'").show
displays the rows whose name starts with 'M'. substring is defined in functions.scala, which contains many such functions; this is equivalent to DF.filter("substr(name,0,1) = 'M'").show.
scala> peopleDF.printSchema
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- address: string (nullable = true)

scala> peopleDF.show
+--------+---+--------+
|    name|age| address|
+--------+---+--------+
|zhangsan| 22| chengdu|
|  wangwu| 33| beijing|
|    lisi| 28|shanghai|
+--------+---+--------+

scala> peopleDF.filter($"name" === "wangwu").show
+------+---+-------+
|  name|age|address|
+------+---+-------+
|wangwu| 33|beijing|
+------+---+-------+

scala> peopleDF.filter($"name" =!= "wangwu").show
+--------+---+--------+
|    name|age| address|
+--------+---+--------+
|zhangsan| 22| chengdu|
|    lisi| 28|shanghai|
+--------+---+--------+

scala> peopleDF.filter("age > 30").show
+------+---+-------+
|  name|age|address|
+------+---+-------+
|wangwu| 33|beijing|
+------+---+-------+

scala> peopleDF.filter($"age" > 30).show
+------+---+-------+
|  name|age|address|
+------+---+-------+
|wangwu| 33|beijing|
+------+---+-------+
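The substring filter above can also be written against Column objects rather than a SQL string, using the functions from functions.scala. A sketch (assuming the peopleDF from the examples above and import spark.implicits._):

```scala
import org.apache.spark.sql.functions.substring

// same "name starts with M" filter, expressed with Column functions
peopleDF.filter(substring($"name", 0, 1) === "M").show

// Column.startsWith is another equivalent spelling
peopleDF.filter($"name".startsWith("M")).show
```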