Spark Test

简介: 练习关于讲list分为奇 偶 并求出占比 练习 关于需求 表合并 reduce filter 等操作 Spark 中 RDD 过程 transformation 和 Action 大多数操作基于transformation 所以 可以链式写法 package com.

练习关于讲list分为奇 偶 并求出占比

练习 关于需求 表合并 reduce filter 等操作

Spark 中 RDD 过程 transformation 和 Action 大多数操作基于transformation 所以 可以链式写法


package com.zhiyou100

import org.apache.spark.{SparkConf, SparkContext}

object HomeWork {

  val conf =new  SparkConf().setMaster("local[*]").setAppName("home work")
  val sc= SparkContext.getOrCreate(conf)

//作业一
  def ListsortTest()={

    val list =List(1,2,43,5,6,7,76,8,9,0,3)
    val rdd=sc.parallelize(list)

    val allnum =rdd.count()
    val qiAccmulator=sc.longAccumulator("qishu")
    val ouAccmulator=sc.longAccumulator("oushu")
    rdd.foreach(x=>{if (x%2==0)  qiAccmulator.add(1) else ouAccmulator.add(1) })
    val qibi=(qiAccmulator.value/allnum.toDouble)
    val oubi =(ouAccmulator.value/allnum.toDouble)
    println(s"总数:$allnum:奇数:$qiAccmulator 占比:$qibi:偶数:$ouAccmulator:占比:$oubi")
  }

  //作业二

  def getorderResult()={
    val rdd =sc.textFile("/orderdata/customers/")
    val userId = rdd.map(x=>{
      val infos =x.split("\\|")
        (infos(0).toInt,s"${infos(1)}.${infos(2)}")
    })
    userId.foreach(println)

    val rdd1 =sc.textFile("/orderdata/orders")
    val orderId =rdd1.map(x=>{
      val infos1=x.split("\\|")
        (infos1(2).toInt, infos1(0))
//        (infos1(1), infos1(0))
    })
    val orderId2=rdd1.filter(x=>{
      x.split("\\|")(3).equals("COMPLETE")
    })map(x=>{
     val infos11=x.split("\\|")
      (infos11(0),infos11(2))
    })
    val getJoin=orderId.join(userId).map(x=>(x._1,1)).reduceByKey(_+_).join(userId)
  orderId.foreach(println)
    val kuanbiao=orderId.join(userId)
    //userId(订单数,username)
   getJoin.foreach(println)

    val rdd2=sc.textFile("/orderdata/products/")
    val productId =rdd2.map(x=>{
      val infos2=x.split("\\|")
      (infos2(0).toInt,(infos2(2),infos2(4)))
    })
//    productId.foreach(println)

//    getJoin.foreach(println)
    kuanbiao.foreach(println)
    //(productId ,(userId ,username))
    val kuanbiao1=kuanbiao.map(x=>{
      (x._2._1.toInt,(x._1,x._2._2))
    }).join(productId).map(x=>{
      (x._2._1._1.toInt,x._2._2._2.toDouble)
    }).reduceByKey(_+_).join(userId)
//    //计算出个人消费总金额
//    kuanbiao.foreach(println)

    val kuanbiao2=kuanbiao.map(x=>{
      (x._2._1.toInt,(x._1,x._2._2))
    }).join(productId).map(x=>{
      (x._2._1._1,x._2._2._1)
    }).countByKey()
    //计算user的购买产品个数
      kuanbiao2.foreach(println)
  }

  def thirdHome()={
      val rdd =sc.textFile("/orderdata/categories/")
    val categories=rdd.map(x=>{
     val infos = x.split("\\|")
      (infos(1).toInt,(infos(0),(infos(2))))
    })
    val rdd1=sc.textFile("/orderdata/departments")
    val department=rdd1.map(x=>{
    val infos1 =  x.split("\\|")
      (infos1(0).toInt,infos1(1))
    })
    val rdd2 =sc.textFile("/orderdata/products/")
    val productId =rdd2.map(x=>{
      val infos2=x.split("\\|")
      (infos2(1).toInt,(infos2(2),infos2(4)))
    })
    val q1 =department.join(categories).map(x=>(x._1,x._2._2._1.toInt)).reduceByKey(_+_)
    val q2 =department.join(categories).map(x=>(x._1,x._2._2._1)).countByKey()
    val q3 =department.join(categories).map(x=>())
    //商品数量
    q1.foreach(println)
    q2.foreach(println)
    q3.foreach(println)
  }


  def main(args: Array[String]): Unit = {
//    ListsortTest()
//    getorderResult()
    thirdHome()
  }



}


相关文章
|
7月前
|
存储 缓存 分布式计算
【Spark】Spark Core Day04
【Spark】Spark Core Day04
55 1
|
7月前
test114514
test114514
47 0
|
7月前
|
SQL JSON 分布式计算
Spark【Spark SQL(三)DataSet】
Spark【Spark SQL(三)DataSet】
|
存储 SQL 缓存
|
SQL 存储 JSON
Apache Spark,Parquet和麻烦的Null
  关于类型安全性的经验教训,并承担过多   介绍   在将SQL分析ETL管道迁移到客户端的新Apache Spark批处理ETL基础结构时,我注意到了一些奇特的东西。 开发的基础结构具有可为空的DataFrame列架构的概念。 乍看起来似乎并不奇怪。 大多数(如果不是全部)SQL数据库都允许列为可空或不可空,对吗? 让我们研究一下在创建Spark DataFrame时,这种看似明智的概念为什么会带来问题。   from pyspark.sql import types   schema=types.StructType([
898 0
just test
click me
1029 0
test1
1340009911118900001111
346 0