Spark案例实
在之前的学习中,我们已经学习了Spark的基础编程方式,接下来,我们看看在实际的工作中如何使用这些API实现具体的需求。这些需求是电商网站的真实需求,所以在实现功能前,咱们必须先将数据准备好。
上面的数据图是从数据文件中截取的一部分内容,表示为电商网站的用户行为数据,主要包含用户的4种行为:搜索,点击,下单,支付。数据规则如下:
数据文件中每行数据采用下划线分隔数据
每一行数据表示用户的一次行为,这个行为只能是4种行为的一种
如果搜索关键字为null,表示数据不是搜索数据
如果点击的品类ID和产品ID为-1,表示数据不是点击数据
针对于下单行为,一次可以下单多个商品,所以品类ID和产品ID可以是多个,id之间采用逗号分隔,如果本次不是下单行为,则数据采用null表示
支付行为和下单行为类似
详细字段说明:
编号 |
字段名称 |
字段类型 |
字段含义 |
1 |
date |
String |
用户点击行为的日期 |
2 |
user_id |
Long |
用户的ID |
3 |
session_id |
String |
Session的ID |
4 |
page_id |
Long |
某个页面的ID |
5 |
action_time |
String |
动作的时间点 |
6 |
search_keyword |
String |
用户搜索的关键词 |
7 |
click_category_id |
Long |
某一个商品品类的ID |
8 |
click_product_id |
Long |
某一个商品的ID |
9 |
order_category_ids |
String |
一次订单中所有品类的ID集合 |
10 |
order_product_ids |
String |
一次订单中所有商品的ID集合 |
11 |
pay_category_ids |
String |
一次支付中所有品类的ID集合 |
12 |
pay_product_ids |
String |
一次支付中所有商品的ID集合 |
13 |
city_id |
Long |
城市 id |
样例类: //用户访问动作表 case class UserVisitAction( date: String,//用户点击行为的日期 user_id: Long,//用户的ID session_id: String,//Session的ID page_id: Long,//某个页面的ID action_time: String,//动作的时间点 search_keyword: String,//用户搜索的关键词 click_category_id: Long,//某一个商品品类的ID click_product_id: Long,//某一个商品的ID order_category_ids: String,//一次订单中所有品类的ID集合 order_product_ids: String,//一次订单中所有商品的ID集合 pay_category_ids: String,//一次支付中所有品类的ID集合 pay_product_ids: String,//一次支付中所有商品的ID集合 city_id: Long//城市 id ) 6.1 需求1:Top10热门品类
需求1:统计Top10热门品类
品类是指产品的分类,大型电商网站品类分多级,咱们的项目中品类只有一级,不同的公司可能对热门的定义不一样。我们按照每个品类的点击、下单、支付的量来统计热门品类。
鞋 点击数 下单数 支付数
衣服 点击数 下单数 支付数
电脑 点击数 下单数 支付数
例如,综合排名 = 点击数*20%+下单数*30%+支付数*50%
本项目需求优化为:先按照点击数排名,靠前的就排名高;如果点击数相同,再比较下单数;下单数再相同,就比较支付数。
方法一
package com.atguigu.rdd.builder import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} //将品类进行排序,并且取前10名 //点击数量排序,下单数量排序,支付数量排序 //元祖排序:先比较第一个,再比较第二个,再比较第三个,依次类推 object HotCategoryTop1 { def main(args: Array[String]): Unit = { // val sparkConf: SparkConf = new SparkConf().setAppName("rdd").setMaster("local[*]") // val sc: SparkContext = new SparkContext(sparkConf) // // 热门品类Top10 // //1. 读数据 // val logRDD: RDD[String] = sc.textFile("F:\\SparkCore代码\\Spark-core\\input\\user_visit_action.txt") // // 点击数 -> 下单数 -> 支付数 // // 统计品类的点击数量:(品类ID, 点击数) // val clickCountRDD: RDD[(String, Int)] = logRDD.filter( // line => { // val datas: Array[String] = line.split("_") // datas(6) != "-1" // } // ).map(action => { // val datas: Array[String] = action.split("_") // (datas(6), 1) // }).reduceByKey(_ + _) // //orderid=>1,2,3 // //[(1,1),(2,1),(3,1)] // // (品类ID, 下单数) // val ordderDataRDD: RDD[(String, Int)] = logRDD.filter( // line => { // val datas: Array[String] = line.split("_") // datas(8) != "null" // } // ).flatMap(line => { // val datas: Array[String] = line.split("_") // val ids: Array[String] = datas(8).split(",") // ids.map( // id => (id, 1) // ) // }).reduceByKey(_ + _) // // (品类ID, 支付数) // val payDataRDD: RDD[(String, Int)] = logRDD.filter( // line => { // val datas: Array[String] = line.split("_") // datas(10) != "null" // } // ).flatMap( // line => { // val datas: Array[String] = line.split("_") // val ids: Array[String] = datas(10).split(",") // ids.map( // id => (id, 1) // ) // } // ).reduceByKey(_ + _) // //(品类,(点击数量,下单数量,支付数量)) // // (品类ID, 点击数) (品类ID, 下单数) (品类ID, 支付数) => (品类ID, (点击数, 下单数, 支付数)) // //join zip leftOuterJoin cogroup // val copRDD: RDD[(String, (Iterable[Int], Iterable[Int], Iterable[Int]))] = clickCountRDD.cogroup(ordderDataRDD, payDataRDD) // //偏函数的作用就是filter+map // //case // val analysisRDD = copRDD.mapValues { // case (clickIter, orderIter, payIter) => { // var clickCnt = 0 // val iter1: Iterator[Int] = clickIter.iterator // if (iter1.hasNext) { // clickCnt = iter1.next() // } // var orderCnt = 0 // val iter2: Iterator[Int] = orderIter.iterator // if (iter2.hasNext) { // orderCnt = iter2.next() // } // var payCnt = 0 // val iter3: Iterator[Int] = payIter.iterator // if (iter3.hasNext) { // payCnt = iter3.next() // } // (clickCnt, orderCnt, payCnt) // } // // } // val resultRDD: Array[(String, (Int, Int, Int))] = analysisRDD.sortBy(_._2, false).take(10) // resultRDD.collect(println) // sc.stop() val sparkConf: SparkConf = new SparkConf().setAppName("rdd").setMaster("local[*]") val sc: SparkContext = new SparkContext(sparkConf) // 热门品类Top10 //1. 读数据 val logRDD: RDD[String] = sc.textFile("F:\\SparkCore代码\\Spark-core\\input\\user_visit_action.txt") // 点击数 -> 下单数 -> 支付数 // (品类ID, 点击数) val clickDataRDD: RDD[(String, Int)] = logRDD.filter( line => { val datas: Array[String] = line.split("_") datas(6) != "-1" } ).map( line => { val datas: Array[String] = line.split("_") (datas(6), 1) } ).reduceByKey(_ + _) // (品类ID, 下单数) val orderDataRDD: RDD[(String, Int)] = logRDD.filter( line => { val datas: Array[String] = line.split("_") datas(8) != "null" } ).flatMap( line => { val datas: Array[String] = line.split("_") val ids: Array[String] = datas(8).split(",") ids.map( id => (id, 1) ) } ).reduceByKey(_ + _) // (品类ID, 支付数) val payDataRDD: RDD[(String, Int)] = logRDD.filter( line => { val datas: Array[String] = line.split("_") datas(10) != "null" } ).flatMap( line => { val datas: Array[String] = line.split("_") val ids: Array[String] = datas(10).split(",") ids.map( id => (id, 1) ) } ).reduceByKey(_ + _) // (品类ID, 点击数) (品类ID, 下单数) (品类ID, 支付数) => (品类ID, (点击数, 下单数, 支付数)) //cogroup val copRDD: RDD[(String, (Iterable[Int], Iterable[Int], Iterable[Int]))] = clickDataRDD.cogroup(orderDataRDD,payDataRDD) // 转换结构 val copToSumRDD: RDD[(String, (Int, Int, Int))] = copRDD.mapValues { case (iter1, iter2, iter3) => { //var clickCount: Int = 0 //var orderCount: Int = 0 //var payCount: Int = 0 (iter1.sum, iter2.sum, iter3.sum) } } val result: Array[(String, (Int, Int, Int))] = copToSumRDD.sortBy(_._2,false).take(10) result.foreach(println) sc.stop() } }
方法二
package com.atguigu.rdd.builder import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object HotCategoryTop2 { def main(args: Array[String]): Unit = { val sparkConf: SparkConf = new SparkConf().setAppName("rdd").setMaster("local[*]") val sc: SparkContext = new SparkContext(sparkConf) // 热门品类Top10 //1. 读数据 val logRDD: RDD[String] = sc.textFile("F:\\SparkCore代码\\Spark-core\\input\\user_visit_action.txt") logRDD.cache() // 点击数 -> 下单数 -> 支付数 // (品类ID, 点击数) val clickDataRDD: RDD[(String, Int)] = logRDD.filter( line => { val datas: Array[String] = line.split("_") datas(6) != "-1" } ).map( line => { val datas: Array[String] = line.split("_") (datas(6), 1) } ).reduceByKey(_ + _) // (品类ID, 下单数) val orderDataRDD: RDD[(String, Int)] = logRDD.filter( line => { val datas: Array[String] = line.split("_") datas(8) != "null" } ).flatMap( line => { val datas: Array[String] = line.split("_") val ids: Array[String] = datas(8).split(",") ids.map( id => (id, 1) ) } ).reduceByKey(_ + _) // (品类ID, 支付数) val payDataRDD: RDD[(String, Int)] = logRDD.filter( line => { val datas: Array[String] = line.split("_") datas(10) != "null" } ).flatMap( line => { val datas: Array[String] = line.split("_") val ids: Array[String] = datas(10).split(",") ids.map( id => (id, 1) ) } ).reduceByKey(_ + _) // (品类ID, 点击数) => (品类ID , (点击数 , 0 , 0) ) // (品类ID, 下单数) => (品类ID , (0,下单数 , 0) ) // (品类ID, 支付数) => (品类ID , (0, 0, 支付数) ) val clickToCop: RDD[(String, (Int, Int, Int))] = clickDataRDD.map { case (cid, clickCount) => (cid, (clickCount, 0, 0)) } val orderToCop: RDD[(String, (Int, Int, Int))] = orderDataRDD.map { case (cid, orderCount) => (cid, (0, orderCount, 0)) } val payToCop: RDD[(String, (Int, Int, Int))] = payDataRDD.map { case (cid, payCount) => (cid, (0, 0, payCount)) } //union:将三个数据源合并在一起,统一进行聚合计算 val cidCop: RDD[(String, (Int, Int, Int))] = clickToCop.union(orderToCop).union(payToCop) //cidCop.collect().foreach(println) val cidCopCount: RDD[(String, (Int, Int, Int))] = cidCop.reduceByKey( (t1, t2) => { (t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3) } ) val result: Array[(String, (Int, Int, Int))] = cidCopCount.sortBy(_._2, false).take(10) result.foreach(println) sc.stop() } }
方法三
package com.atguigu.rdd.builder import org.apache.spark.rdd.RDD import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} //前面一种存在大量的shuffle操作(reduceByKey) //reduceByKey聚合算子,spark会提供优化,缓存 object HotCategoryTop3 { def main(args: Array[String]): Unit = { val sparkConf: SparkConf = new SparkConf().setAppName("rdd").setMaster("local[*]") val sc: SparkContext = new SparkContext(sparkConf) // 热门品类Top10 //1. 读数据 val logRDD: RDD[String] = sc.textFile("F:\\SparkCore代码\\Spark-core\\input\\user_visit_action.txt") //2.将数据转换结构 // 点击数 -> 下单数 -> 支付数 // 点击的一条数据line => (品类ID, (1,0,0 )) // 下单的一条数据line => (品类ID, (0,1,0 )) // 支付的一条数据line => (品类ID, (0,0,1 )) val lineToCop: RDD[(String, (Int, Int, Int))] = logRDD.flatMap( line => { //判断数据的类型 val datas: Array[String] = line.split("_") if (datas(6) != "-1") { //点击数据 List((datas(6), (1, 0, 0))) } else if (datas(8) != "null") { //下单数据 val ids: Array[String] = datas(8).split(",") ids.map( id => (id, (0, 1, 0)) ) } else if (datas(10) != "null") { //支付数据 val ids: Array[String] = datas(10).split(",") ids.map( id => (id, (0, 0, 1)) ) } else { Nil } } ) //将相同的品类ID的数据进行分组聚合 //(品类ID,(点击数量,下单数量,支付数量)) val copCount: RDD[(String, (Int, Int, Int))] = lineToCop.reduceByKey( (t1, t2) => { (t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3) } ) val result: Array[(String, (Int, Int, Int))] = copCount.sortBy(_._2, false).take(10) result.foreach(println) sc.stop() } }
方法四
package com.atguigu.rdd.builder import org.apache.spark.rdd.RDD import org.apache.spark.util.AccumulatorV2 import org.apache.spark.{SparkConf, SparkContext} import scala.collection.mutable object HotCategoryTop4 { def main(args: Array[String]): Unit = { val sparkConf: SparkConf = new SparkConf().setAppName("rdd").setMaster("local[*]") val sc: SparkContext = new SparkContext(sparkConf) val logRDD: RDD[String] = sc.textFile("F:\\SparkCore代码\\Spark-core\\input\\user_visit_action.txt") // TODO 分别统计每个品类点击的次数,下单的次数和支付的次数: (品类,点击总数)(品类,下单总数)(品类,支付总数) val accumulator = new MyAccumulator sc.register(accumulator, "demand") // 第二种:使用累加器来避免shuffle logRDD.foreach( value => { val values: Array[String] = value.split("_") if (values(6) != "-1") { accumulator.add(values(6), "click") } else if (values(8) != "null") { val ids: Array[String] = values(8).split(",") ids.foreach( id => accumulator.add(id, "order") ) } else if (values(10) != "null") { val ids: Array[String] = values(10).split(",") ids.foreach( id => accumulator.add(id, "pay") ) } } ) // 要自定义比较规则 accumulator.value.map(_._2).toList.sortWith( (l, r) => { if (l.clickCnt > r.clickCnt) { true } else if (l.clickCnt == r.clickCnt) { if (l.orderCnt > r.orderCnt) { true } else if (l.orderCnt == r.orderCnt) { if (l.payCnt > r.payCnt) { true } else false } else false } else false } ).take(10).foreach(println) sc.stop() } // 辅助类 case class UserVisitAction(cid: String, var clickCnt: Int, var orderCnt: Int, var payCnt: Int) // 自定义累加器 /** * 自定义累加器 * 1. 继承 AccumulatorV2 * 2. 确定泛型: * IN: CategoryInfo * * OUT:mutable.Map[String, CategoryInfo]() * 3. 重写方法 */ class MyAccumulator extends AccumulatorV2[(String, String), mutable.Map[String, UserVisitAction]] { private var values: mutable.Map[String, UserVisitAction] = mutable.Map[String, UserVisitAction]() // 累加器是否为初始状态 override def isZero: Boolean = values.isEmpty // 复制累加器 override def copy(): AccumulatorV2[(String, String), mutable.Map[String, UserVisitAction]] = { new MyAccumulator() } // 重置累加器 override def reset(): Unit = values.clear() // 向累加器中添加数据 override def add(v: (String, String)): Unit = { val cid: String = v._1 val action: String = v._2 // 获取品类对应的UserVisitAction数据,从而来实现对相应操作的累加赋值 val action1: UserVisitAction = values.getOrElse(cid, UserVisitAction(cid, 0, 0, 0)) if (action == "click") { action1.clickCnt += 1 } else if (action == "order") { action1.orderCnt += 1 } else if (action == "pay") { action1.payCnt += 1 } // 数据一定要进行更新 values.update(cid, action1) } // 合并累加器 override def merge(other: AccumulatorV2[(String, String), mutable.Map[String, UserVisitAction]]): Unit = { // 你将要输出的map val map1 = this.values // 相对于输出的map以外的来自其他task任务返回的map val map2 = other.value // 对结果进行merge合并 map2.foreach { case (cid, use) => { val action: UserVisitAction = map1.getOrElse(cid, UserVisitAction(cid, 0, 0, 0)) action.clickCnt += use.clickCnt action.orderCnt += use.orderCnt action.payCnt += use.payCnt // 一定要记得对数据更新到你要输出的map中去 map1.update(cid, action) } } } // 返回累加器的结果 override def value: mutable.Map[String, UserVisitAction] = values } }
需求2:Top10热门品类中每个品类的Top10活跃Session统计
需求说明
在需求一的基础上,增加每个品类用户session的点击统计
package com.atguigu.rdd.builder import org.apache.spark.rdd.RDD import org.apache.spark.util.AccumulatorV2 import org.apache.spark.{SparkConf, SparkContext} object HotCategoryTop10Session { def main(args: Array[String]): Unit = { val sparkConf: SparkConf = new SparkConf().setAppName("rdd").setMaster("local[*]") val sc: SparkContext = new SparkContext(sparkConf) val logRDD: RDD[String] = sc.textFile("F:\\SparkCore代码\\Spark-core\\input\\user_visit_action.txt") logRDD.cache() val top10ds: Array[String] = top10Category(logRDD) //1.过滤原始数据,保留点击和前10品类ID val filterActionRDD = logRDD.filter( action => { val datas: Array[String] = action.split("_") if (datas(6) != "-1") { top10ds.contains(datas(6)) } else { false } } ) //2.根据品类ID和Sessionid进行点击量的统计 val reduceRDD: RDD[((String, String), Int)] = filterActionRDD.map( action => { val datas: Array[String] = action.split("_") ((datas(6), datas(2)), 1) } ).reduceByKey(_ + _) //3.将统计的结果进行结构转化 //((品类ID,sessionId),sum)=>(品类ID,(sessionId,sum)) val mapRDD: RDD[(String, (String, Int))] = reduceRDD.map { case ((cid, sid), sum) => { (cid, (sid, sum)) } } //相同的品类进行分组 val grouupRDD: RDD[(String, Iterable[(String, Int)])] = mapRDD.groupByKey() //5.将分组后的数据进行点击量的排序,取前十名 val resultRDD: RDD[(String, List[(String, Int)])] = grouupRDD.mapValues( iter => { iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(10) } )1 resultRDD.collect().foreach(println) sc.stop() } def top10Category(logRDD: RDD[String]): Array[String] = { val lineToCop: RDD[(String, (Int, Int, Int))] = logRDD.flatMap( line => { val datas: Array[String] = line.split("_") if (datas(6) != "-1") { List((datas(6), (1, 0, 0))) } else if (datas(8) != "null") { val ids: Array[String] = datas(8).split(",") ids.map( id => (id, (0, 1, 0)) ) } else if (datas(10) != "null") { val ids: Array[String] = datas(10).split(",") ids.map( id => (id, (0, 0, 1)) ) } else { Nil } } ) val copCount: RDD[(String, (Int, Int, Int))] = lineToCop.reduceByKey( (t1, t2) => { (t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3) } ) copCount.sortBy(_._2, false).take(10).map(_._1) } }
需求3:页面单跳转换率统计
需求说明
1)页面单跳转化率
计算页面单跳转化率,什么是页面单跳转换率,比如一个用户在一次 Session 过程中访问的页面路径 3,5,7,9,10,21,那么页面 3 跳到页面 5 叫一次单跳,7-9 也叫一次单跳,那么单跳转化率就是要统计页面点击的概率。
比如:计算 3-5 的单跳转化率,先获取符合条件的 Session 对于页面 3 的访问次数(PV)为 A,然后获取符合条件的 Session 中访问了页面 3 又紧接着访问了页面 5 的次数为 B,那么 B/A 就是 3-5 的页面单跳转化率。
2)统计页面单跳转化率意义
产品经理和运营总监,可以根据这个指标,去尝试分析,整个网站,产品,各个页面的表现怎么样,是不是需要去优化产品的布局;吸引用户最终可以进入最后的支付页面。
数据分析师,可以此数据做更深一步的计算和分析。
企业管理层,可以看到整个公司的网站,各个页面的之间的跳转的表现如何,可以适当调整公司的经营战略
或策略。
图解
package com.atguigu.rdd.builder import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.util.AccumulatorV2 import org.apache.spark.{SparkConf, SparkContext} import scala.collection.mutable object Spark_Req_pageFlow { def main(args: Array[String]): Unit = { val sparkConf: SparkConf = new SparkConf().setAppName("rdd").setMaster("local[*]") val sc: SparkContext = new SparkContext(sparkConf) // 热门品类Top10 //1. 读数据 val logRDD: RDD[String] = sc.textFile("F:\\SparkCore代码\\Spark-core\\input\\user_visit_action.txt") // 将每条数据封装成一个对象 val userVisitRDD: RDD[UserVisitAction] = logRDD.map( line => { UserVisitAction.lineToUserVisitAction(line) } ) //求分母 val pageCount: RDD[(String, Int)] = userVisitRDD.map(uservisit => (uservisit.page_id, 1)).reduceByKey(_ + _) val pageSum: Map[String, Int] = pageCount.collect().toMap val pageSumBC: Broadcast[Map[String, Int]] = sc.broadcast(pageSum) //求分子 //根据session进行分组 val sessionGroup: RDD[(String, Iterable[UserVisitAction])] = userVisitRDD.groupBy(_.session_id) //分组后,根据访问时间进行排序(升序) val pageToPageToOneRDD: RDD[(String, List[((String, String), Int)])] = sessionGroup.mapValues( iter => { val actions: List[UserVisitAction] = iter.toList.sortBy(_.action_time) //[1,2,3,4] //Sliding:滑窗 //[1,2],[2,3],[3,4] //[1-2,2-3,3-4] //zip:拉链 //[1,2,3,4] //[2,3,4] pages.tail val pages: List[String] = actions.map(_.page_id) val pageToPage: List[(String, String)] = pages.zip(pages.tail) pageToPage.map { case (prevPage, nextPage) => ((prevPage, nextPage), 1) } } ) //((1,2),1)=>((1,2),sum) val pageToPageCount: RDD[((String, String), Int)] = pageToPageToOneRDD.map(_._2).flatMap(list => list).reduceByKey(_ + _) //求单跳转换率 pageToPageCount.foreach { case ((prevPage, nextPage), count) => { val pageView: Int = pageSumBC.value.getOrElse(prevPage, 1) println(s"[$prevPage - $nextPage] 的单跳转换率为: " + count.toDouble / pageView * 100 + " %") } } } //用户访问动作表 case class UserVisitAction( date: String, //用户点击行为的日期 user_id: String, //用户的ID session_id: String, //Session的ID page_id: String, //某个页面的ID action_time: String, //动作的时间点 search_keyword: String, //用户搜索的关键词 click_category_id: String, //某一个商品品类的ID click_product_id: String, //某一个商品的ID order_category_ids: String, //一次订单中所有品类的ID集合 order_product_ids: String, //一次订单中所有商品的ID集合 pay_category_ids: String, //一次支付中所有品类的ID集合 pay_product_ids: String, //一次支付中所有商品的ID集合 city_id: String //城市 id ) object UserVisitAction { def lineToUserVisitAction(line: String): UserVisitAction = { val datas: Array[String] = line.split("_") UserVisitAction( datas(0), datas(1), datas(2), datas(3), datas(4), datas(5), datas(6), datas(7), datas(8), datas(9), datas(10), datas(11), datas(12) ) } } }
SparkCore-工程代码
WordCountApplication
package faramework.application import faramework.common.TApplication import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} import faramework.controller.WordCountController import org.apache.spark.SparkConf object WordCountApplication extends App with TApplication{ //启动应用程序 start(){ val controller = new WordCountController() controller.dispatch() } }
package faramework.common import faramework.controller.WordCountController import faramework.util.EnvUtil import org.apache.spark.{SparkConf, SparkContext} trait TApplication { def start(master:String="local[*]",app:String="Application")(op : =>Unit):Unit={ val sparkConf: SparkConf = new SparkConf().setMaster(master).setAppName(app) // 1.创建Spark上下文环境对象(连接对象) // val context = new SparkContext(wordCount) val sc: SparkContext = new SparkContext(sparkConf) EnvUtil.put(sc) try { op }catch { case ex=>println(ex.getMessage) } sc.stop() EnvUtil.clear() } }
package faramework.common trait TController { def dispatch():Unit }
package faramework.common import faramework.util.EnvUtil trait TDao { def readFile(path: String) = { EnvUtil.take().textFile(path) } }
package faramework.common trait TService { def dataAnalysis():Any }
package faramework.controller import faramework.common.TController import faramework.service.WordCountService class WordCountController extends TController{ private val WordCountService=new WordCountService() def dispatch():Unit={ val array=WordCountService.dataAnalysis() array.foreach(println) } }
package faramework.dao import faramework.common.TDao import org.apache.spark.rdd.RDD //持久层 class WordCountDao extends TDao { }
package faramework.service import faramework.common.TService import faramework.dao.WordCountDao import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} //服务层 class WordCountService extends TService{ private val wordCountDao=new WordCountDao(); def dataAnalysis()={ // 读取文件数据,获取一行一行数据 val lines: RDD[String] = wordCountDao.readFile("F:\\SparkCore代码\\Spark-core\\input\\1.txt") // 将文件中的数据进行分词,扁平化 // val word: RDD[String] = lines.flatMap(words => words.split(" ")) // val word: RDD[String] = lines.flatMap(words => words.split(" ")) // val words: RDD[String] = lines.flatMap(_.split(" ")) val words: RDD[String] = lines.flatMap(_.split(" ")) //3.将数据根据单词进行分组,便于统计 //(hello,hello,hello),(world,world) val wordGroup: RDD[(String, Iterable[String])] = words.groupBy(word => word) //4.对分组后的数据进行转化 //(hello,3) (world,2) // val wordTocout: RDD[(String, Int)] = wordGroup.map(kv => { // (kv._1, kv._2.size) // }) var wordTocout = wordGroup.map { case (word, list) => { (word, list.size) } } // var wordToCount=wordGroup.map{ // case(word,list)=>{ // (word,list.size) // } // } //5.将转化结果采集到控制台打印 val array: Array[(String, Int)] = wordTocout.collect() array } }
package faramework.util import org.apache.spark.SparkContext object EnvUtil { private val scLocal=new ThreadLocal[SparkContext]() def put(sc:SparkContext):Unit={ scLocal.set(sc) } def take():SparkContext={ scLocal.get() } def clear():Unit={ scLocal.remove() } }