Spark学习--day05、SparkCore电商网站实操、SparkCore-工程代码

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: Spark学习--day05、SparkCore电商网站实操、SparkCore-工程代码

Spark案例实

在之前的学习中,我们已经学习了Spark的基础编程方式,接下来,我们看看在实际的工作中如何使用这些API实现具体的需求。这些需求是电商网站的真实需求,所以在实现功能前,咱们必须先将数据准备好。

image.png

上面的数据图是从数据文件中截取的一部分内容,表示为电商网站的用户行为数据,主要包含用户的4种行为:搜索,点击,下单,支付。数据规则如下:

 数据文件中每行数据采用下划线分隔数据

 每一行数据表示用户的一次行为,这个行为只能是4种行为的一种

 如果搜索关键字为null,表示数据不是搜索数据

 如果点击的品类ID和产品ID为-1,表示数据不是点击数据

 针对于下单行为,一次可以下单多个商品,所以品类ID和产品ID可以是多个,id之间采用逗号分隔,如果本次不是下单行为,则数据采用null表示

 支付行为和下单行为类似

详细字段说明:

编号

字段名称

字段类型

字段含义

1

date

String

用户点击行为的日期

2

user_id

Long

用户的ID

3

session_id

String

Session的ID

4

page_id

Long

某个页面的ID

5

action_time

String

动作的时间点

6

search_keyword

String

用户搜索的关键词

7

click_category_id

Long

某一个商品品类的ID

8

click_product_id

Long

某一个商品的ID

9

order_category_ids

String

一次订单中所有品类的ID集合

10

order_product_ids

String

一次订单中所有商品的ID集合

11

pay_category_ids

String

一次支付中所有品类的ID集合

12

pay_product_ids

String

一次支付中所有商品的ID集合

13

city_id

Long

城市 id

样例类:
//用户访问动作表
case class UserVisitAction(
    date: String,//用户点击行为的日期
    user_id: Long,//用户的ID
    session_id: String,//Session的ID
    page_id: Long,//某个页面的ID
    action_time: String,//动作的时间点
    search_keyword: String,//用户搜索的关键词
    click_category_id: Long,//某一个商品品类的ID
    click_product_id: Long,//某一个商品的ID
    order_category_ids: String,//一次订单中所有品类的ID集合
    order_product_ids: String,//一次订单中所有商品的ID集合
    pay_category_ids: String,//一次支付中所有品类的ID集合
    pay_product_ids: String,//一次支付中所有商品的ID集合
    city_id: Long//城市 id
)
6.1 需求1:Top10热门品类

需求1:统计Top10热门品类

品类是指产品的分类,大型电商网站品类分多级,咱们的项目中品类只有一级,不同的公司可能对热门的定义不一样。我们按照每个品类的点击、下单、支付的量来统计热门品类。

点击数 下单数  支付数

衣服 点击数 下单数  支付数

电脑 点击数 下单数  支付数

例如,综合排名 = 点击数*20%+下单数*30%+支付数*50%

本项目需求优化为:先按照点击数排名,靠前的就排名高;如果点击数相同,再比较下单数;下单数再相同,就比较支付数。

方法一

package com.atguigu.rdd.builder

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
//将品类进行排序,并且取前10名
//点击数量排序,下单数量排序,支付数量排序
//元祖排序:先比较第一个,再比较第二个,再比较第三个,依次类推
object HotCategoryTop1 {
  def main(args: Array[String]): Unit = {
//    val sparkConf: SparkConf = new SparkConf().setAppName("rdd").setMaster("local[*]")
//    val sc: SparkContext = new SparkContext(sparkConf)
//    // 热门品类Top10
//    //1. 读数据
//    val logRDD: RDD[String] = sc.textFile("F:\\SparkCore代码\\Spark-core\\input\\user_visit_action.txt")
//    // 点击数 -> 下单数  -> 支付数
//    // 统计品类的点击数量:(品类ID, 点击数)
//    val clickCountRDD: RDD[(String, Int)] = logRDD.filter(
//      line => {
//        val datas: Array[String] = line.split("_")
//        datas(6) != "-1"
//      }
//    ).map(action => {
//      val datas: Array[String] = action.split("_")
//      (datas(6), 1)
//    }).reduceByKey(_ + _)
//    //orderid=>1,2,3
//    //[(1,1),(2,1),(3,1)]
//    // (品类ID, 下单数)
//    val ordderDataRDD: RDD[(String, Int)] = logRDD.filter(
//      line => {
//        val datas: Array[String] = line.split("_")
//        datas(8) != "null"
//      }
//    ).flatMap(line => {
//      val datas: Array[String] = line.split("_")
//      val ids: Array[String] = datas(8).split(",")
//      ids.map(
//        id => (id, 1)
//      )
//    }).reduceByKey(_ + _)
//    // (品类ID, 支付数)
//    val payDataRDD: RDD[(String, Int)] = logRDD.filter(
//      line => {
//        val datas: Array[String] = line.split("_")
//        datas(10) != "null"
//      }
//    ).flatMap(
//      line => {
//        val datas: Array[String] = line.split("_")
//        val ids: Array[String] = datas(10).split(",")
//        ids.map(
//          id => (id, 1)
//        )
//      }
//    ).reduceByKey(_ + _)
//    //(品类,(点击数量,下单数量,支付数量))
//    // (品类ID, 点击数)  (品类ID, 下单数)  (品类ID, 支付数)  => (品类ID, (点击数,  下单数, 支付数))
//    //join zip leftOuterJoin cogroup
//    val copRDD: RDD[(String, (Iterable[Int], Iterable[Int], Iterable[Int]))] = clickCountRDD.cogroup(ordderDataRDD, payDataRDD)
//    //偏函数的作用就是filter+map
//    //case
//    val analysisRDD = copRDD.mapValues {
//      case (clickIter, orderIter, payIter) => {
//        var clickCnt = 0
//        val iter1: Iterator[Int] = clickIter.iterator
//        if (iter1.hasNext) {
//          clickCnt = iter1.next()
//        }
//        var orderCnt = 0
//        val iter2: Iterator[Int] = orderIter.iterator
//        if (iter2.hasNext) {
//          orderCnt = iter2.next()
//        }
//        var payCnt = 0
//        val iter3: Iterator[Int] = payIter.iterator
//        if (iter3.hasNext) {
//          payCnt = iter3.next()
//        }
//        (clickCnt, orderCnt, payCnt)
//      }
//
//    }
//    val resultRDD: Array[(String, (Int, Int, Int))] = analysisRDD.sortBy(_._2, false).take(10)
//    resultRDD.collect(println)
//    sc.stop()

    val sparkConf: SparkConf = new SparkConf().setAppName("rdd").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(sparkConf)

    // 热门品类Top10
    //1. 读数据
    val logRDD: RDD[String] = sc.textFile("F:\\SparkCore代码\\Spark-core\\input\\user_visit_action.txt")

    // 点击数 -> 下单数  -> 支付数
    // (品类ID, 点击数)
    val clickDataRDD: RDD[(String, Int)] = logRDD.filter(
      line => {
        val datas: Array[String] = line.split("_")
        datas(6) != "-1"
      }
    ).map(
      line => {
        val datas: Array[String] = line.split("_")
        (datas(6), 1)
      }
    ).reduceByKey(_ + _)

    // (品类ID, 下单数)
    val orderDataRDD: RDD[(String, Int)] = logRDD.filter(
      line => {
        val datas: Array[String] = line.split("_")
        datas(8) != "null"
      }
    ).flatMap(
      line => {
        val datas: Array[String] = line.split("_")
        val ids: Array[String] = datas(8).split(",")
        ids.map(
          id => (id, 1)
        )
      }
    ).reduceByKey(_ + _)

    // (品类ID, 支付数)
    val payDataRDD: RDD[(String, Int)] = logRDD.filter(
      line => {
        val datas: Array[String] = line.split("_")
        datas(10) != "null"
      }
    ).flatMap(
      line => {
        val datas: Array[String] = line.split("_")
        val ids: Array[String] = datas(10).split(",")
        ids.map(
          id => (id, 1)
        )
      }
    ).reduceByKey(_ + _)

    // (品类ID, 点击数)  (品类ID, 下单数)  (品类ID, 支付数)  => (品类ID, (点击数,  下单数, 支付数))

    //cogroup
    val copRDD: RDD[(String, (Iterable[Int], Iterable[Int], Iterable[Int]))] =
      clickDataRDD.cogroup(orderDataRDD,payDataRDD)

    // 转换结构
    val copToSumRDD: RDD[(String, (Int, Int, Int))] = copRDD.mapValues {
      case (iter1, iter2, iter3) => {
        //var clickCount: Int = 0
        //var orderCount: Int = 0
        //var payCount: Int = 0
        (iter1.sum, iter2.sum, iter3.sum)
      }
    }

    val result: Array[(String, (Int, Int, Int))] = copToSumRDD.sortBy(_._2,false).take(10)
    result.foreach(println)

    sc.stop()

  }

}

image.png

方法二

package com.atguigu.rdd.builder
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object HotCategoryTop2 {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setAppName("rdd").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(sparkConf)

    // 热门品类Top10
    //1. 读数据
    val logRDD: RDD[String] = sc.textFile("F:\\SparkCore代码\\Spark-core\\input\\user_visit_action.txt")
    logRDD.cache()

    // 点击数 -> 下单数  -> 支付数
    // (品类ID, 点击数)
    val clickDataRDD: RDD[(String, Int)] = logRDD.filter(
      line => {
        val datas: Array[String] = line.split("_")
        datas(6) != "-1"
      }
    ).map(
      line => {
        val datas: Array[String] = line.split("_")
        (datas(6), 1)
      }
    ).reduceByKey(_ + _)

    // (品类ID, 下单数)
    val orderDataRDD: RDD[(String, Int)] = logRDD.filter(
      line => {
        val datas: Array[String] = line.split("_")
        datas(8) != "null"
      }
    ).flatMap(
      line => {
        val datas: Array[String] = line.split("_")
        val ids: Array[String] = datas(8).split(",")
        ids.map(
          id => (id, 1)
        )
      }
    ).reduceByKey(_ + _)

    // (品类ID, 支付数)
    val payDataRDD: RDD[(String, Int)] = logRDD.filter(
      line => {
        val datas: Array[String] = line.split("_")
        datas(10) != "null"
      }
    ).flatMap(
      line => {
        val datas: Array[String] = line.split("_")
        val ids: Array[String] = datas(10).split(",")
        ids.map(
          id => (id, 1)
        )
      }
    ).reduceByKey(_ + _)


    // (品类ID, 点击数)  => (品类ID , (点击数 , 0 , 0) )
    // (品类ID, 下单数)  => (品类ID , (0,下单数 , 0)  )
    // (品类ID, 支付数)  => (品类ID , (0, 0, 支付数)  )

    val clickToCop: RDD[(String, (Int, Int, Int))] = clickDataRDD.map {
      case (cid, clickCount) => (cid, (clickCount, 0, 0))
    }

    val orderToCop: RDD[(String, (Int, Int, Int))] = orderDataRDD.map {
      case (cid, orderCount) => (cid, (0, orderCount, 0))
    }

    val payToCop: RDD[(String, (Int, Int, Int))] = payDataRDD.map {
      case (cid, payCount) => (cid, (0, 0, payCount))
    }

    //union:将三个数据源合并在一起,统一进行聚合计算
    val cidCop: RDD[(String, (Int, Int, Int))] = clickToCop.union(orderToCop).union(payToCop)

    //cidCop.collect().foreach(println)

    val cidCopCount: RDD[(String, (Int, Int, Int))] = cidCop.reduceByKey(
      (t1, t2) => {
        (t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
      }
    )
    val result: Array[(String, (Int, Int, Int))] = cidCopCount.sortBy(_._2, false).take(10)

    result.foreach(println)


    sc.stop()
  }

}

image.png

方法三

package com.atguigu.rdd.builder

import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
//前面一种存在大量的shuffle操作(reduceByKey)
//reduceByKey聚合算子,spark会提供优化,缓存
object HotCategoryTop3 {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setAppName("rdd").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(sparkConf)

    // 热门品类Top10
    //1. 读数据
    val logRDD: RDD[String] = sc.textFile("F:\\SparkCore代码\\Spark-core\\input\\user_visit_action.txt")
    //2.将数据转换结构
    // 点击数 -> 下单数  -> 支付数
    // 点击的一条数据line =>  (品类ID, (1,0,0 ))
    // 下单的一条数据line =>  (品类ID, (0,1,0 ))
    // 支付的一条数据line =>  (品类ID, (0,0,1 ))

    val lineToCop: RDD[(String, (Int, Int, Int))] = logRDD.flatMap(
      line => {
        //判断数据的类型
        val datas: Array[String] = line.split("_")
        if (datas(6) != "-1") {
          //点击数据
          List((datas(6), (1, 0, 0)))

        } else if (datas(8) != "null") {
          //下单数据
          val ids: Array[String] = datas(8).split(",")
          ids.map(
            id => (id, (0, 1, 0))
          )

        } else if (datas(10) != "null") {
          //支付数据
          val ids: Array[String] = datas(10).split(",")
          ids.map(
            id => (id, (0, 0, 1))
          )
        } else {
          Nil
        }

      }
    )
    //将相同的品类ID的数据进行分组聚合
    //(品类ID,(点击数量,下单数量,支付数量))

    val copCount: RDD[(String, (Int, Int, Int))] = lineToCop.reduceByKey(
      (t1, t2) => {
        (t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
      }
    )
    val result: Array[(String, (Int, Int, Int))] = copCount.sortBy(_._2, false).take(10)
    result.foreach(println)

    sc.stop()


  }

}

image.png

方法四

package com.atguigu.rdd.builder

import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

object HotCategoryTop4 {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setAppName("rdd").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(sparkConf)
    val logRDD: RDD[String] = sc.textFile("F:\\SparkCore代码\\Spark-core\\input\\user_visit_action.txt")
    // TODO 分别统计每个品类点击的次数,下单的次数和支付的次数: (品类,点击总数)(品类,下单总数)(品类,支付总数)
    val accumulator = new MyAccumulator
    sc.register(accumulator, "demand")
    
    // 第二种:使用累加器来避免shuffle
    logRDD.foreach(
      value => {
        val values: Array[String] = value.split("_")
        if (values(6) != "-1") {
          accumulator.add(values(6), "click")
        } else if (values(8) != "null") {
          val ids: Array[String] = values(8).split(",")
          ids.foreach(
            id => accumulator.add(id, "order")
          )
        } else if (values(10) != "null") {
          val ids: Array[String] = values(10).split(",")
          ids.foreach(
            id => accumulator.add(id, "pay")
          )
        }
      }
    )

    // 要自定义比较规则
    accumulator.value.map(_._2).toList.sortWith(
      (l, r) => {
        if (l.clickCnt > r.clickCnt) {
          true
        } else if (l.clickCnt == r.clickCnt) {
          if (l.orderCnt > r.orderCnt) {
            true
          } else if (l.orderCnt == r.orderCnt) {
            if (l.payCnt > r.payCnt) {
              true
            } else false
          } else false
        } else false
      }
    ).take(10).foreach(println)


    sc.stop()
  }

  // 辅助类
  case class UserVisitAction(cid: String, var clickCnt: Int, var orderCnt: Int, var payCnt: Int)

  // 自定义累加器

  /**
   * 自定义累加器
   * 1. 继承 AccumulatorV2
   * 2. 确定泛型:
   * IN: CategoryInfo
   *
   * OUT:mutable.Map[String, CategoryInfo]()
   * 3. 重写方法
   */
  class MyAccumulator extends AccumulatorV2[(String, String), mutable.Map[String, UserVisitAction]] {

    private var values: mutable.Map[String, UserVisitAction] = mutable.Map[String, UserVisitAction]()

    // 累加器是否为初始状态
    override def isZero: Boolean = values.isEmpty

    // 复制累加器
    override def copy(): AccumulatorV2[(String, String), mutable.Map[String, UserVisitAction]] = {
      new MyAccumulator()
    }

    // 重置累加器
    override def reset(): Unit = values.clear()

    // 向累加器中添加数据
    override def add(v: (String, String)): Unit = {

      val cid: String = v._1
      val action: String = v._2

      // 获取品类对应的UserVisitAction数据,从而来实现对相应操作的累加赋值
      val action1: UserVisitAction = values.getOrElse(cid, UserVisitAction(cid, 0, 0, 0))
      if (action == "click") {
        action1.clickCnt += 1
      } else if (action == "order") {
        action1.orderCnt += 1
      } else if (action == "pay") {
        action1.payCnt += 1
      }
      // 数据一定要进行更新
      values.update(cid, action1)
    }

    // 合并累加器
    override def merge(other: AccumulatorV2[(String, String), mutable.Map[String, UserVisitAction]]): Unit = {

      // 你将要输出的map
      val map1 = this.values
      // 相对于输出的map以外的来自其他task任务返回的map
      val map2 = other.value

      // 对结果进行merge合并
      map2.foreach {
        case (cid, use) => {
          val action: UserVisitAction = map1.getOrElse(cid, UserVisitAction(cid, 0, 0, 0))
          action.clickCnt += use.clickCnt
          action.orderCnt += use.orderCnt
          action.payCnt += use.payCnt

          // 一定要记得对数据更新到你要输出的map中去
          map1.update(cid, action)
        }
      }
    }
    // 返回累加器的结果
    override def value: mutable.Map[String, UserVisitAction] = values
  }
}

需求2:Top10热门品类中每个品类的Top10活跃Session统计

需求说明

在需求一的基础上,增加每个品类用户session点击统计

package com.atguigu.rdd.builder

import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}

object HotCategoryTop10Session {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setAppName("rdd").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(sparkConf)
    val logRDD: RDD[String] = sc.textFile("F:\\SparkCore代码\\Spark-core\\input\\user_visit_action.txt")
    logRDD.cache()
    val top10ds: Array[String] = top10Category(logRDD)
    //1.过滤原始数据,保留点击和前10品类ID
    val filterActionRDD = logRDD.filter(
      action => {
        val datas: Array[String] = action.split("_")
          if (datas(6) != "-1") {
            top10ds.contains(datas(6))
          } else {
            false
          }
        }
        )
        //2.根据品类ID和Sessionid进行点击量的统计
        val reduceRDD: RDD[((String, String), Int)] = filterActionRDD.map(
          action => {
            val datas: Array[String] = action.split("_")
            ((datas(6), datas(2)), 1)
          }

        ).reduceByKey(_ + _)
     //3.将统计的结果进行结构转化
    //((品类ID,sessionId),sum)=>(品类ID,(sessionId,sum))
    val mapRDD: RDD[(String, (String, Int))] = reduceRDD.map {
      case ((cid, sid), sum) => {
        (cid, (sid, sum))
      }
    }
    //相同的品类进行分组
    val grouupRDD: RDD[(String, Iterable[(String, Int)])] = mapRDD.groupByKey()
    //5.将分组后的数据进行点击量的排序,取前十名
    val resultRDD: RDD[(String, List[(String, Int)])] = grouupRDD.mapValues(
      iter => {
        iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(10)
      }
    )1
    resultRDD.collect().foreach(println)

    sc.stop()



  }

    def top10Category(logRDD: RDD[String]): Array[String] = {
      val lineToCop: RDD[(String, (Int, Int, Int))] = logRDD.flatMap(
        line => {
          val datas: Array[String] = line.split("_")
          if (datas(6) != "-1") {
            List((datas(6), (1, 0, 0)))
          } else if (datas(8) != "null") {
            val ids: Array[String] = datas(8).split(",")
            ids.map(
              id => (id, (0, 1, 0))
            )
          } else if (datas(10) != "null") {
            val ids: Array[String] = datas(10).split(",")
            ids.map(
              id => (id, (0, 0, 1))
            )
          } else {
            Nil
          }
        }
      )
      val copCount: RDD[(String, (Int, Int, Int))] = lineToCop.reduceByKey(
        (t1, t2) => {
          (t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
        }
      )
      copCount.sortBy(_._2, false).take(10).map(_._1)



  }
}

image.png

需求3:页面单跳转换率统计

 需求说明

1)页面单跳转化率

计算页面单跳转化率,什么是页面单跳转换率,比如一个用户在一次 Session 过程中访问的页面路径 3,5,7,9,10,21,那么页面 3 跳到页面 5 叫一次单跳,7-9 也叫一次单跳,那么单跳转化率就是要统计页面点击的概率。

比如:计算 3-5 的单跳转化率,先获取符合条件的 Session 对于页面 3 的访问次数(PV)为 A,然后获取符合条件的 Session 中访问了页面 3 又紧接着访问了页面 5 的次数为 B,那么 B/A 就是 3-5 的页面单跳转化率。

image.png

2)统计页面单跳转化率意义

产品经理和运营总监,可以根据这个指标,去尝试分析,整个网站,产品,各个页面的表现怎么样,是不是需要去优化产品的布局;吸引用户最终可以进入最后的支付页面。

数据分析师,可以此数据做更深一步的计算和分析。

企业管理层,可以看到整个公司的网站,各个页面的之间的跳转的表现如何,可以适当调整公司的经营战略

或策略。

图解

image.png

package com.atguigu.rdd.builder
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable
object Spark_Req_pageFlow {
  def main(args: Array[String]): Unit = {

    val sparkConf: SparkConf = new SparkConf().setAppName("rdd").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(sparkConf)

    // 热门品类Top10
    //1. 读数据
    val logRDD: RDD[String] = sc.textFile("F:\\SparkCore代码\\Spark-core\\input\\user_visit_action.txt")
    // 将每条数据封装成一个对象
    val userVisitRDD: RDD[UserVisitAction] = logRDD.map(
      line => {
        UserVisitAction.lineToUserVisitAction(line)
      }
    )

    //求分母
    val pageCount: RDD[(String, Int)] =
      userVisitRDD.map(uservisit => (uservisit.page_id, 1)).reduceByKey(_ + _)
    val pageSum: Map[String, Int] = pageCount.collect().toMap
    val pageSumBC: Broadcast[Map[String, Int]] = sc.broadcast(pageSum)

    //求分子
    //根据session进行分组
    val sessionGroup: RDD[(String, Iterable[UserVisitAction])] = userVisitRDD.groupBy(_.session_id)
    //分组后,根据访问时间进行排序(升序)
    val pageToPageToOneRDD: RDD[(String, List[((String, String), Int)])] = sessionGroup.mapValues(
      iter => {
        val actions: List[UserVisitAction] = iter.toList.sortBy(_.action_time)
        //[1,2,3,4]
        //Sliding:滑窗
        //[1,2],[2,3],[3,4]
        //[1-2,2-3,3-4]
        //zip:拉链
        //[1,2,3,4]
        //[2,3,4]  pages.tail


        val pages: List[String] = actions.map(_.page_id)
        val pageToPage: List[(String, String)] = pages.zip(pages.tail)
        pageToPage.map {
          case (prevPage, nextPage) => ((prevPage, nextPage), 1)
        }
      }
    )
    //((1,2),1)=>((1,2),sum)
    val pageToPageCount: RDD[((String, String), Int)] =
      pageToPageToOneRDD.map(_._2).flatMap(list => list).reduceByKey(_ + _)

    //求单跳转换率

    pageToPageCount.foreach {
      case ((prevPage, nextPage), count) => {
        val pageView: Int = pageSumBC.value.getOrElse(prevPage, 1)

        println(s"[$prevPage - $nextPage] 的单跳转换率为: " + count.toDouble / pageView * 100 + " %")
      }
    }


  }

  //用户访问动作表
  case class UserVisitAction(
                              date: String, //用户点击行为的日期
                              user_id: String, //用户的ID
                              session_id: String, //Session的ID
                              page_id: String, //某个页面的ID
                              action_time: String, //动作的时间点
                              search_keyword: String, //用户搜索的关键词
                              click_category_id: String, //某一个商品品类的ID
                              click_product_id: String, //某一个商品的ID
                              order_category_ids: String, //一次订单中所有品类的ID集合
                              order_product_ids: String, //一次订单中所有商品的ID集合
                              pay_category_ids: String, //一次支付中所有品类的ID集合
                              pay_product_ids: String, //一次支付中所有商品的ID集合
                              city_id: String //城市 id
                            )

  object UserVisitAction {
    def lineToUserVisitAction(line: String): UserVisitAction = {
      val datas: Array[String] = line.split("_")
      UserVisitAction(
        datas(0),
        datas(1),
        datas(2),
        datas(3),
        datas(4),
        datas(5),
        datas(6),
        datas(7),
        datas(8),
        datas(9),
        datas(10),
        datas(11),
        datas(12)
      )
    }

  }

}

image.png

SparkCore-工程代码

image.png

image.png

image.png

WordCountApplication

package faramework.application
import faramework.common.TApplication
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import faramework.controller.WordCountController
import org.apache.spark.SparkConf

object WordCountApplication extends App with TApplication{
 //启动应用程序
 start(){
  val controller = new WordCountController()
  controller.dispatch()
 }



}
package faramework.common

import faramework.controller.WordCountController
import faramework.util.EnvUtil
import org.apache.spark.{SparkConf, SparkContext}

trait TApplication {
  def start(master:String="local[*]",app:String="Application")(op : =>Unit):Unit={
    val sparkConf: SparkConf = new SparkConf().setMaster(master).setAppName(app)
    // 1.创建Spark上下文环境对象(连接对象)
    //    val context = new SparkContext(wordCount)
    val sc: SparkContext = new SparkContext(sparkConf)
    EnvUtil.put(sc)
     try

       {
         op
       }catch {
       case ex=>println(ex.getMessage)
     }


    sc.stop()
    EnvUtil.clear()
  }

}
package faramework.common

trait TController {
  def dispatch():Unit

}
package faramework.common

import faramework.util.EnvUtil

trait TDao {
  def readFile(path: String) = {
    EnvUtil.take().textFile(path)

  }

}
package faramework.common

trait TService {
  def dataAnalysis():Any

}
package faramework.controller

import faramework.common.TController
import faramework.service.WordCountService

class WordCountController extends TController{
  private val WordCountService=new WordCountService()
  def dispatch():Unit={
    val array=WordCountService.dataAnalysis()
    array.foreach(println)

  }

}
package faramework.dao


import faramework.common.TDao
import org.apache.spark.rdd.RDD

//持久层
class WordCountDao extends TDao {


}
package faramework.service

import faramework.common.TService
import faramework.dao.WordCountDao
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

//服务层
class WordCountService extends TService{
  private val wordCountDao=new WordCountDao();
  def dataAnalysis()={
    // 读取文件数据,获取一行一行数据
    val lines: RDD[String] = wordCountDao.readFile("F:\\SparkCore代码\\Spark-core\\input\\1.txt")
    // 将文件中的数据进行分词,扁平化
    //    val word: RDD[String] = lines.flatMap(words => words.split(" "))
    //    val word: RDD[String] = lines.flatMap(words => words.split(" "))
    //    val words: RDD[String] = lines.flatMap(_.split(" "))
    val words: RDD[String] = lines.flatMap(_.split(" "))
    //3.将数据根据单词进行分组,便于统计
    //(hello,hello,hello),(world,world)
    val wordGroup: RDD[(String, Iterable[String])] = words.groupBy(word => word)
    //4.对分组后的数据进行转化
    //(hello,3) (world,2)
    //    val wordTocout: RDD[(String, Int)] = wordGroup.map(kv => {
    //      (kv._1, kv._2.size)
    //    })
    var wordTocout = wordGroup.map {
      case (word, list) => {
        (word, list.size)
      }

    }
    //    var wordToCount=wordGroup.map{
    //      case(word,list)=>{
    //        (word,list.size)
    //      }
    //    }

    //5.将转化结果采集到控制台打印
    val array: Array[(String, Int)] = wordTocout.collect()

    array
  }

}
package faramework.util

import org.apache.spark.SparkContext

object EnvUtil {
  private val scLocal=new ThreadLocal[SparkContext]()
  def put(sc:SparkContext):Unit={
    scLocal.set(sc)
  }
  def take():SparkContext={
    scLocal.get()
  }
  def clear():Unit={
    scLocal.remove()
  }

}

image.png

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
相关文章
|
1月前
|
分布式计算 大数据 Java
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
49 5
|
1月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
48 3
|
1月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
60 0
|
16天前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
1月前
|
存储 缓存 分布式计算
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
42 4
|
1月前
|
分布式计算 算法 Spark
spark学习之 GraphX—预测社交圈子
spark学习之 GraphX—预测社交圈子
27 0
|
1月前
|
分布式计算 Scala Spark
educoder的spark算子学习
educoder的spark算子学习
16 0
|
1月前
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
55 0
|
2月前
|
分布式计算 Shell Scala
学习使用Spark
学习使用Spark
101 3
|
3月前
|
分布式计算 Shell Scala
如何开始学习使用Spark?
【8月更文挑战第31天】如何开始学习使用Spark?
90 2