1、使用scala实现wordCount
scala> import scala.io.Source scala> val lines = Source.fromFile("/usr/local/src/badou_code/mr/mr_wc/The_Man_of_Property.txt").getLines lines: Iterator[String] = non-empty iterator Iterator it 这是一个迭代器 it.next(): 获取迭代器中下一个元素 it.hasNext():判断集合中是否还有元素 最简单采用while循环进行遍历 scala> val lines = Source.fromFile("/usr/local/src/badou_code/mr/mr_wc/The_Man_of_Property.txt").getLines.toList toList: 将上面迭代器中放入列表中进行返回 scala> lines.length 和 wc -l The_Man_of_Property.txt 返回的数据结果一致 res0: Int = 2866 需要对每一行的数据进行单词的切割(提取单词) // select split(sentence,' ') re // from tmp
理解Range:
定义:可以理解为一个序列
Range 就是区间类型
scala> val a = Range(0,5) [0,5) 步长是1 a: scala.collection.immutable.Range = Range(0, 1, 2, 3, 4)
等价于
val b = 0 until 5
包含起始
scala> val c = 1 to 5 <==> val d = 1.to(5)
Range转换为List:
a.toList val list1 = (1 to 10).toList
理解map:
scala> a.map(x=>x*2) <==> a.map(_*2) 对每个元素进行遍历操作 *2
理解Vector: 可以认为是保存数据的容器,也称为集合
1、创建Vector 对象
scala> val v1 =Vector(1,2,3)
获取 Vector元素 索引下标从0 开始
scala> println(v1(0))
2、Vector 遍历
scala> for(i<- v1) print(i+" ") 1 2 3
理解_:
作用是通配符
(1)集合中每一个元素
a.map(_*2)
(2)获取tuple中的元素
scala> val s = ("hello","badou") s: (String, String) = (hello,badou) s._1 s._2
(3) 导入所有包
import scala.collection.immutable.xxx 指定具体包
import scala.collection.immutable._
(4)初始化变量
val a=1 定义的变量不能被修改 , var可以修改
scala> var name:String=_ name: String = null scala> var score:Int=_ score: Int = 0
理解split:
scala> val s = "The Man of Property" scala> s.split(" ") res18: Array[String] = Array(The, Man, of, Property)
结合:
scala> lines.map(x=>x.split(" ")) scala> lines.map(_.split(" "))
返回的是List (Array(), Array*()...) ?
目标:将Array进行打平
理解flatten函数
scala> val s1 = List(Range(0,5), Range(0,5), Range(0,5)) scala> val s2 = s1.flatten s2: List[Int] = List(0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 0, 1, 2, 3, 4)
每个元素遍历操作
s2.map(x=>x+2) s2.map(_+2)
直接针对s1进行处理
scala> s1.map(x=>x.map(x=>x*2)) scala> s1.map(_.map(_*2))
将Vector进行打散
scala> s1.flatMap(_.map(_*2))
等价于
scala> s1.map(_.map(_*2)).flatten res30: List[Int] = List(0, 2, 4, 6, 8, 0, 2, 4, 6, 8, 0, 2, 4, 6, 8)
映射到lines
map + flatten <==> flatMap scala> lines.map(x=>x.split(" ")).flatten scala> lines.flatMap(_.split(" ")) MR Map: scala> lines.flatMap(x=>x.split(" ")).map(x=>(x,1)) scala> lines.flatMap(_.split(" ")).map(x=>(x,1)) scala> lines.flatMap(_.split(" ")).map((_,1)) scala> lines.flatMap(_.split(" ")).map((_,1)).groupBy(_._1) res36: scala.collection.immutable.Map[String,List[(String, Int)]] = Map(forgotten -> List((forgotten,1), (forgotten,1), (forgotten,1))) 从tuple 中(forgotten,1) 获取第一个单词 forgotten 作为key
将整个tuple作为value,收集到一个List中
这样对应的value
_1: forgotten _2: List((forgotten,1), (forgotten,1), (forgotten,1))
整个list的大小,就是forgotten 出现的次数
scala> lines.flatMap(_.split(" ")).map((_,1)).groupBy(_._1).map(x=>(x._1, x._2.length))
等价于
scala> lines.flatMap(_.split(" ")).map((_,1)).groupBy(_._1).map(x=>(x._1, x._2.size))
理解数组求和的方式:
scala> val a1 = List((1,2), (3,4),(5,6)) scala> a1.map(_._2).sum scala> a1.map(_._2).reduce(_+_)
reduce(_+_)计算原理:
List(1,1,1) => ((1+1)+1) => sum +=x scala> lines.flatMap(_.split(" ")).map((_,1)) .groupBy(_._1) .map(x=>(x._1,x._2.map(_._2).sum))
等价于
scala> lines.flatMap(_.split(" ")).map((_,1)) .groupBy(_._1) .map(x=>(x._1,x._2.map(_._2) .reduce(_+_)))
需求点:
1、统计单词中出现的前N个,top N 获取数据,这里的N=10
slice(0,10) : 返回数组中有多少个数 sortBy():从小到大的排序,升序 scala> val a1=List((3,2),(1,0)) scala> a1.sortBy(_._2) scala> val a = Array((3,4),(5,0)) a: Array[(Int, Int)] = Array((3,4), (5,0))
降序:
scala> a1.sortBy(_._2).reverse <==> a1.sortWith(_._2 > _._2)
_._2 表示按照tuple中第二个元素进行排序
方式一:
scala> lines.flatMap(_.split(" ")).map((_,1)).groupBy(_._1) .map(x=>(x._1,x._2.size)) .toList.sortBy(_._2).reverse.slice(0,10)
方式二:
scala> lines.flatMap(_.split(" ")).map((_,1)).groupBy(_._1) .map(x=>(x._1,x._2.size)) .toList.sortWith(_._2 > _._2).slice(0,10)
方式三:
scala> lines.flatMap(_.split(" ")).map((_,1)).groupBy(_._1) .mapValues(_.size) .toArray.sortWith(_._2 > _._2).slice(0,10)
2、需求: 过滤掉标点符号, 只获取数字或者字符(去除特殊字符(正则))
python: import re p = re.compile(r'\w+') s = p.findall(s)[0] scala: scala> val p = "[0-9]+".r scala> val s = "avbdd123099" scala> p.findAllIn(s).toArray res68: Array[String] = Array(123099) scala> p.findAllIn(s).foreach(x=>println(x)) scala> p.findAllIn(s).foreach(println(_)) mkString 类似于 concat concat_ws scala> val p = "[0-9a-zA-Z]+".r scala> p.findAllIn(s).mkString("[","","]") scala> lines.flatMap(_.split(" ")).map(x=>(p.findAllIn(x).mkString(""),1)) .groupBy(_._1) .mapValues(_.size) .toArray .sortWith(_._2 > _._2) .slice(0,5)
需求List:
(1) 统计orders中produce的数量统计
-- 商品被购买的数据量
hive : group by count
spark:
scala> val orders=sql("select * from badou.orders") scala> val products=sql("select * from badou.products") scala> val priors=sql("select * from badou.priors") scala> priors.groupBy("product_id").count() res6: org.apache.spark.sql.DataFrame = [product_id: string, count: bigint]
方式一:
scala> res6.show(10)
方式二:
scala> priors.groupBy("product_id").count().show(10)
方式三:
scala> val proCnt = priors.groupBy("product_id").count() proCnt.show(10)
show(): 默认显示20条
show(10): 显示指定的条数
show(1,false) 显示的记录数 和针对字符过长进行格式化显示
方式四:
scala> priors.groupByKey(row=>{row.getString(1)}).count().show(10)
多个分组: groupBy("age","name")
常使用: groupBy
row : 就是一行数据
cache:
scala> val proCnt = priors.groupBy("product_id").count().cache 没有执行,只是加载到内存中
scala> proCnt.show(10) 原计划运行
proCnt.show(10) 直接内存中读取
scala> proCnt.unpersist 内存中直接移除
proCnt.show(10)
(2) 统计produce被 reordered的数量(再次购买)
product_id 进行group by , sum(reordered) 的值
场景:当一个商品被重复购买,重复购买的比率越高(这类商品可以理解为消耗品,抽纸,洗发水等等),那下一次购买的可能性很高
预测:购买这些商品的用户,下一次最容易购买哪些商品
filter: 针对集合中的元素进行过滤, <==> where
scala> orders.filter(col("eval_set")==="test").show(5) scala> orders.where(col("eval_set")==="test").show(5) scala> orders.filter(col("eval_set")==="test").filter(col("order_dow")==="1").show(10)
select: 进行列的方式处理
selectExpr: 处理字符串表达式,直接写SQL语句
scala> orders.select("*").show(10) scala> orders.select(col("order_id"), col("order_number")).show(10)
方式一:
scala> priors.selectExpr("product_id","cast(reordered as int)") .filter(col("reordered")===1) .groupBy("product_id").count()
方式二:
priors.selectExpr("product_id","cast(reordered as int)").filter(col("reordered")===1).groupBy("product_id").sum()
方式三:
scala> priors.selectExpr("product_id","cast(reordered as int)") .groupBy("product_id").sum("reordered")
方式四:
scala> priors.selectExpr("product_id","cast(reordered as int)") .groupBy("product_id").agg(sum("reordered"))
agg 一般搭配group by 这种聚合函数使用和sum区别,在一次聚合中可以统计多个值,sum, avg ,max,min
priors.selectExpr("product_id","cast(reordered as int)").groupBy("product_id").agg(sum("reordered"),avg("reordered")).show(5)
(3) 结合上面的 统计被重复购买的比率 avg("reordered")
公式: 重复购买的商品量 / 总的商品量 => sum / count
字段重命名 hive as spark
scala> priors.selectExpr("product_id","cast(reordered as int)").groupBy("product_id") .agg(sum("reordered"),avg("reordered")) .withColumnRenamed("sum(reordered)","sum_re") .show(5)
a、重复购买的商品量
val productSumRe = priors.selectExpr("product_id","cast(reordered as int)").groupBy("product_id").agg(sum("reordered"),avg("reordered")).withColumnRenamed("sum(reordered)","sum_re")
b、总的商品量
val proCnt = priors.groupBy("product_id").count()
方式一: scala
priors.selectExpr("product_id","cast(reordered as int)").groupBy("product_id").agg(sum("reordered"),avg("reordered")).show(5)
方式二: SQL
scala> val jCnt = proCnt.join(productSumRe, "product_id") jCnt.selectExpr("*", "sum_re / count as mean_re").show(5)
方式三: udf
import org.apache.spark.sql.functions._ scala> val avg_udf = udf((sm:Long,cnt:Long)=>sm.toDouble/cnt.toDouble) scala> jCnt.withColumn("mean_re", avg_udf(col("sum_re"),col("count"))).show(5)