Spark Test

简介: 练习关于讲list分为奇 偶 并求出占比 练习 关于需求 表合并 reduce filter 等操作 Spark 中 RDD 过程 transformation 和 Action 大多数操作基于transformation 所以 可以链式写法 package com.

练习关于讲list分为奇 偶 并求出占比

练习 关于需求 表合并 reduce filter 等操作

Spark 中 RDD 过程 transformation 和 Action 大多数操作基于transformation 所以 可以链式写法


package com.zhiyou100

import org.apache.spark.{SparkConf, SparkContext}

object HomeWork {

  val conf =new  SparkConf().setMaster("local[*]").setAppName("home work")
  val sc= SparkContext.getOrCreate(conf)

//作业一
  def ListsortTest()={

    val list =List(1,2,43,5,6,7,76,8,9,0,3)
    val rdd=sc.parallelize(list)

    val allnum =rdd.count()
    val qiAccmulator=sc.longAccumulator("qishu")
    val ouAccmulator=sc.longAccumulator("oushu")
    rdd.foreach(x=>{if (x%2==0)  qiAccmulator.add(1) else ouAccmulator.add(1) })
    val qibi=(qiAccmulator.value/allnum.toDouble)
    val oubi =(ouAccmulator.value/allnum.toDouble)
    println(s"总数:$allnum:奇数:$qiAccmulator 占比:$qibi:偶数:$ouAccmulator:占比:$oubi")
  }

  //作业二

  def getorderResult()={
    val rdd =sc.textFile("/orderdata/customers/")
    val userId = rdd.map(x=>{
      val infos =x.split("\\|")
        (infos(0).toInt,s"${infos(1)}.${infos(2)}")
    })
    userId.foreach(println)

    val rdd1 =sc.textFile("/orderdata/orders")
    val orderId =rdd1.map(x=>{
      val infos1=x.split("\\|")
        (infos1(2).toInt, infos1(0))
//        (infos1(1), infos1(0))
    })
    val orderId2=rdd1.filter(x=>{
      x.split("\\|")(3).equals("COMPLETE")
    })map(x=>{
     val infos11=x.split("\\|")
      (infos11(0),infos11(2))
    })
    val getJoin=orderId.join(userId).map(x=>(x._1,1)).reduceByKey(_+_).join(userId)
  orderId.foreach(println)
    val kuanbiao=orderId.join(userId)
    //userId(订单数,username)
   getJoin.foreach(println)

    val rdd2=sc.textFile("/orderdata/products/")
    val productId =rdd2.map(x=>{
      val infos2=x.split("\\|")
      (infos2(0).toInt,(infos2(2),infos2(4)))
    })
//    productId.foreach(println)

//    getJoin.foreach(println)
    kuanbiao.foreach(println)
    //(productId ,(userId ,username))
    val kuanbiao1=kuanbiao.map(x=>{
      (x._2._1.toInt,(x._1,x._2._2))
    }).join(productId).map(x=>{
      (x._2._1._1.toInt,x._2._2._2.toDouble)
    }).reduceByKey(_+_).join(userId)
//    //计算出个人消费总金额
//    kuanbiao.foreach(println)

    val kuanbiao2=kuanbiao.map(x=>{
      (x._2._1.toInt,(x._1,x._2._2))
    }).join(productId).map(x=>{
      (x._2._1._1,x._2._2._1)
    }).countByKey()
    //计算user的购买产品个数
      kuanbiao2.foreach(println)
  }

  def thirdHome()={
      val rdd =sc.textFile("/orderdata/categories/")
    val categories=rdd.map(x=>{
     val infos = x.split("\\|")
      (infos(1).toInt,(infos(0),(infos(2))))
    })
    val rdd1=sc.textFile("/orderdata/departments")
    val department=rdd1.map(x=>{
    val infos1 =  x.split("\\|")
      (infos1(0).toInt,infos1(1))
    })
    val rdd2 =sc.textFile("/orderdata/products/")
    val productId =rdd2.map(x=>{
      val infos2=x.split("\\|")
      (infos2(1).toInt,(infos2(2),infos2(4)))
    })
    val q1 =department.join(categories).map(x=>(x._1,x._2._2._1.toInt)).reduceByKey(_+_)
    val q2 =department.join(categories).map(x=>(x._1,x._2._2._1)).countByKey()
    val q3 =department.join(categories).map(x=>())
    //商品数量
    q1.foreach(println)
    q2.foreach(println)
    q3.foreach(println)
  }


  def main(args: Array[String]): Unit = {
//    ListsortTest()
//    getorderResult()
    thirdHome()
  }



}


相关文章
|
6月前
|
存储 缓存 分布式计算
【Spark】Spark Core Day04
【Spark】Spark Core Day04
51 1
|
6月前
test114514
test114514
44 0
|
存储 SQL 缓存
test1
1340009911118900001111
339 0
|
分布式计算 Java API
Spark 2.4.0编程指南--spark dataSet action
## 技能标签 - Spark session 创建 - 在Spark 2.0之后,RDD被数据集(Dataset)取代 ,保留RDD旧api - 数据集数据集介绍 - 读取本地文件(txt,json),HDFS文件 - 对txt格式文件数据遍历(行数据转成对象) - 对json格式文件...
1621 0
|
机器学习/深度学习 存储 分布式计算
【译】Apache spark 2.4:内置 Image Data Source的介绍
主要介绍Apache Spark 2.4版本内置Image Data Source数据源
1699 0