Scala写Spark笔记

简介: Scala写Spark笔记

学习感悟


(1)配置环境最费劲

(2)动手写,动手写,动手写


WordCount


package wordcount
import org.apache.spark.{SparkConf, SparkContext}
/**
  * @author CBeann
  * @create 2019-08-10 18:02
  */
object WordCount {
  /**
    * Counts the words of a text file with Spark, prints the (word, count) pairs and saves them.
    *
    * @param args optional: args(0) = input file, args(1) = output directory;
    *             when absent, the original hard-coded local paths are used.
    */
  def main(args: Array[String]): Unit = {
    val input = args.lift(0).getOrElse("C:\\Users\\Lenovo\\Desktop\\leetcode.txt")
    val output = args.lift(1).getOrElse("F:\\temp\\aa")
    // Local mode with 8 worker threads
    val conf = new SparkConf().setMaster("local[8]").setAppName("cbeann")
    val sc = new SparkContext(conf)
    try {
      val data = sc.textFile(input)
      // Split on runs of whitespace and drop empty tokens, so "a  b" or a leading space
      // does not produce an empty-string "word".
      // The second argument of reduceByKey (1) is the number of result partitions.
      val result = data
        .flatMap(_.split("\\s+"))
        .filter(_.nonEmpty)
        .map((_, 1))
        .reduceByKey(_ + _, 1)
      result.collect().foreach(println)
      result.saveAsTextFile(output)
    } finally {
      // Always release the SparkContext, even if the job fails
      sc.stop()
    }
    println("-----over-----")
  }
}


排序


第一种方式:按照某一字段排序
val result = data.sortBy(_._2, false)
第二种方式:定义一个继承 Ordered 的样例类(Boy),把每条记录映射为该类的实例作为排序键
val result =data.sortBy(x => Boy(x._1,x._2,x._3),false)


package mysort
import org.apache.spark.{SparkConf, SparkContext}
/**
  * @author CBeann
  * @create 2019-08-10 18:26
  */
object MysortDemo {
  /**
    * Sorts (name, faceValue, age) tuples in descending order, using a Boy built from each
    * tuple as the sort key, and prints the result.
    */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[8]").setAppName("cbeann")
    val sc = new SparkContext(conf)
    try {
      val data = sc.makeRDD(List(("张三", 10, 14), ("张三", 9, 9), ("张三", 13, 15)))
      // Approach 1: sort by a single field
      //   val result = data.sortBy(_._2, false)
      // Approach 2: map each tuple to a Boy (which extends Ordered) and sort by that key
      val result = data.sortBy(x => Boy(x._1, x._2, x._3), ascending = false)
      result.collect().foreach(println)
    } finally {
      // The original never stopped the SparkContext
      sc.stop()
    }
  }
}
/**
  * Sort key used by MysortDemo: ordered by faceVale, then by age (both ascending).
  *
  * @param name     not used in the comparison
  * @param faceVale primary sort field
  * @param age      tie-breaker when faceVale is equal
  */
case class Boy(name: String, faceVale: Int, age: Int) extends Ordered[Boy] {
  override def compare(that: Boy): Int = {
    // Integer.compare instead of a subtraction: `a - b` overflows (and flips sign)
    // when the operands have opposite signs near Int.MinValue / Int.MaxValue.
    val byFace = Integer.compare(this.faceVale, that.faceVale)
    if (byFace != 0) byFace else Integer.compare(this.age, that.age)
  }
}


自定义分区


自定义分区器


package mypartition
import org.apache.spark.Partitioner
import scala.collection.mutable
/**
  * @author CBeann
  * @create 2019-08-10 18:36
  *         自定义分区器,继承Partitioner
  */
class MyPartitioner extends Partitioner {
  /** Language name -> partition index; keys that are not listed go to partition 0. */
  val map = mutable.HashMap[String, Int]("Java" -> 0, "Scala" -> 1, "Go" -> 2)

  /** Total number of partitions: one per mapped language. */
  override def numPartitions: Int = map.size

  /** Partition index for `key`; null or unknown keys fall back to partition 0. */
  override def getPartition(key: Any): Int = key match {
    // The original called key.toString unconditionally and threw NPE on a null key
    case null => 0
    case k    => map.getOrElse(k.toString, 0)
  }

  // Spark compares partitioners with equals to decide whether an RDD is already
  // partitioned the same way (and can skip a shuffle); instances with the same mapping
  // produce identical partitions, so they compare equal.
  override def equals(other: Any): Boolean = other match {
    case p: MyPartitioner => p.map == map
    case _                => false
  }

  override def hashCode(): Int = map.hashCode()
}


测试类


package mypartition
import org.apache.spark.{SparkConf, SparkContext}
/**
  * @author CBeann
  * @create 2019-08-10 18:59
  */
object PartitionDemo {
  /**
    * Repartitions a small (language, count) pair RDD with MyPartitioner and saves the result.
    *
    * @param args optional: args(0) = output directory (defaults to the original local path)
    */
  def main(args: Array[String]): Unit = {
    val output = args.lift(0).getOrElse("F:\\temp\\aaa")
    val conf = new SparkConf().setMaster("local[8]").setAppName("cbeann")
    val sc = new SparkContext(conf)
    try {
      val data = sc.makeRDD(List(("Java", 11), ("Java", 9), ("Scala", 13), ("Go", 11)))
      // Records are routed by key according to MyPartitioner's language -> index mapping
      val result = data.partitionBy(new MyPartitioner)
      result.saveAsTextFile(output)
    } finally {
      // The original never stopped the SparkContext
      sc.stop()
    }
    println("--------------OVER------------")
  }
}


SparkSQL


person.json


 {  "name": "王小二",   "age": 15}
 {  "name": "王小三",   "age": 25}
 {  "name": "王小四",   "age": 35}


测试类


package sparksql
import org.apache.spark.sql.SparkSession
/**
  * @author CBeann
  * @create 2019-08-10 18:20
  */
object SparkSqlDemo {
  /**
    * Loads a JSON file of people into a DataFrame and selects those with age > 21,
    * once with the DataFrame DSL and once with SQL over a temporary view.
    *
    * @param args optional: args(0) = path of the JSON file (defaults to the project's person.json)
    */
  def main(args: Array[String]): Unit = {
    val path = args.lift(0).getOrElse(
      "E:\\IntelliJ IDEA 2019.1.3Workspace\\ScalaSpark\\SparkDemo\\src\\main\\resources\\json\\person.json")
    // SparkSession is the Spark SQL entry point (not a bare SparkConf)
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .master("local[8]")
      .getOrCreate()
    try {
      // Enables the $"column" syntax used below
      import spark.implicits._
      val df = spark.read.json(path)
      // Print the whole DataFrame to stdout
      df.show()
      df.filter($"age" > 21).show()
      df.createOrReplaceTempView("persons")
      spark.sql("SELECT * FROM persons where age > 21").show()
    } finally {
      // Always release the session, even if reading or a query fails
      spark.stop()
    }
    println("-----over---------")
  }
}


Spark Streaming


无状态wordcount


package stream
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
  * @author CBeann
  * @create 2019-08-10 18:38
  */
object StreamDemo {
  /**
    * Stateless streaming word count: for every 5-second batch, counts the words received
    * on a TCP socket during that batch only and prints the counts.
    *
    * @param args optional: args(0) = host, args(1) = port
    *             (defaults: iZm5ea99qngm2v98asii1aZ, 9999)
    */
  def main(args: Array[String]): Unit = {
    val host = args.lift(0).getOrElse("iZm5ea99qngm2v98asii1aZ")
    val port = args.lift(1).map(_.toInt).getOrElse(9999)
    val conf = new SparkConf().setAppName("SparkStreamDemo").setMaster("local[8]")
    // Streaming entry point with a 5-second batch interval
    val ssc = new StreamingContext(conf, Seconds(5))
    // Continuously read text lines from port `port` on `host`
    val lines = ssc.socketTextStream(host, port)
    // Alternative: use the custom receiver instead
    // val lines = ssc.receiverStream(new MyRecever(host, port))
    // Split on runs of whitespace and drop empty tokens so "" is never counted as a word
    val result = lines
      .flatMap(_.split("\\s+"))
      .filter(_.nonEmpty)
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    result.print()
    ssc.start()
    // Block until the streaming context is stopped
    ssc.awaitTermination()
    println("--------OVER-------------")
  }
}


有状态wordcount


updateStateByKey 方法是关键:它接收一个参数类型固定的状态更新函数,形如 (Seq[V], Option[S]) => Option[S],即(当前批次该 key 的所有值, 该 key 之前的状态)=> 新状态。


package stream.withstatus
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import stream.MyRecever
/**
  * @author CBeann
  * @create 2019-08-10 19:24
  */
object UpdateStateByKeyTest {
  /**
    * Stateful streaming word count: keeps a running total per word across all batches
    * using updateStateByKey, and prints the totals every 5 seconds.
    *
    * @param args optional: args(0) = host, args(1) = port, args(2) = checkpoint directory
    *             (defaults: iZm5ea99qngm2v98asii1aZ, 9999, F:\temp\aaa)
    */
  def main(args: Array[String]): Unit = {
    val host = args.lift(0).getOrElse("iZm5ea99qngm2v98asii1aZ")
    val port = args.lift(1).map(_.toInt).getOrElse(9999)
    val checkpointDir = args.lift(2).getOrElse("F:\\temp\\aaa")
    val conf = new SparkConf().setAppName("SparkStreamDemo").setMaster("local[8]")
    // Streaming entry point with a 5-second batch interval
    val ssc = new StreamingContext(conf, Seconds(5))
    // Stateful operations such as updateStateByKey need checkpointing enabled
    ssc.checkpoint(checkpointDir)
    // Continuously read text lines from port `port` on `host`
    val lines = ssc.socketTextStream(host, port)
    // Per-batch counts; whitespace runs are collapsed so "" is never counted as a word
    val result = lines
      .flatMap(_.split("\\s+"))
      .filter(_.nonEmpty)
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    // Key step: fold each batch's counts into the running totals
    val allresult = result.updateStateByKey(updateFunction)
    allresult.print()
    ssc.start()
    // Block until the streaming context is stopped
    ssc.awaitTermination()
    println("--------OVER-------------")
  }

  /**
    * State update function for updateStateByKey. The parameter types are fixed by the API;
    * the parameter names are free.
    *
    * @param currValues the values for one key in the current batch
    * @param preValue   that key's total from previous batches (None if there is none yet)
    * @return the new total: this batch's sum plus the previous total; this code never returns None
    */
  def updateFunction(currValues: Seq[Int], preValue: Option[Int]): Option[Int] =
    Some(currValues.sum + preValue.getOrElse(0))
}


自定义接收器


package stream
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
/**
  * @author CBeann
  * @create 2019-08-10 18:39
  */
class MyRecever (host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
  /** Socket of the current connection, or null; volatile because onStop() runs on another thread. */
  @volatile private var socket: Socket = _

  /** Called when the receiver starts: reads on a background thread so onStart() returns immediately. */
  override def onStart(): Unit = {
    val worker = new Thread("MyRecever socket reader") {
      override def run(): Unit = receive()
    }
    worker.setDaemon(true)
    worker.start()
  }

  /**
    * Called when the receiver stops. The original was `???`, which threw NotImplementedError.
    * Closing the socket also unblocks a readLine() that is waiting in receive().
    */
  override def onStop(): Unit = closeSocket()

  /** Create a socket connection and receive data until receiver is stopped */
  private def receive(): Unit = {
    try {
      val s = new Socket(host, port)
      socket = s
      val reader = new BufferedReader(new InputStreamReader(s.getInputStream, StandardCharsets.UTF_8))
      var userInput = reader.readLine()
      while (!isStopped() && userInput != null) {
        // Hand the line over to Spark
        store(userInput)
        userInput = reader.readLine()
      }
      // The server closed the connection: reconnect, unless we were asked to stop
      if (!isStopped()) restart("Trying to connect again")
    } catch {
      case e: java.net.ConnectException =>
        // Could not connect to the server
        if (!isStopped()) restart("Error connecting to " + host + ":" + port, e)
      case scala.util.control.NonFatal(t) =>
        // Any other recoverable error (e.g. the socket closed by onStop while reading)
        if (!isStopped()) restart("Error receiving data", t)
    } finally {
      // Release the connection on every exit path (the original leaked it on exceptions)
      closeSocket()
    }
  }

  /** Best-effort close of the current socket (which also closes its streams); safe to call repeatedly. */
  private def closeSocket(): Unit = {
    val s = socket
    socket = null
    if (s != null) {
      try s.close()
      catch {
        // Deliberately ignored: the connection is being discarded anyway
        case _: java.io.IOException =>
      }
    }
  }
}


val lines = ssc.receiverStream(new MyRecever("iZm5ea99qngm2v98asii1aZ",9999));


pom.xml


 <properties>
        <mysql.version>6.0.5</mysql.version>
        <spring.version>4.3.6.RELEASE</spring.version>
        <spring.data.jpa.version>1.11.0.RELEASE</spring.data.jpa.version>
        <log4j.version>1.2.17</log4j.version>
        <quartz.version>2.2.3</quartz.version>
        <slf4j.version>1.7.22</slf4j.version>
        <hibernate.version>5.2.6.Final</hibernate.version>
        <camel.version>2.18.2</camel.version>
        <config.version>1.10</config.version>
        <jackson.version>2.8.6</jackson.version>
        <servlet.version>3.0.1</servlet.version>
        <net.sf.json.version>2.4</net.sf.json.version>
        <activemq.version>5.14.3</activemq.version>
        <spark.version>2.1.1</spark.version>
        <scala.version>2.11.11</scala.version>
        <hadoop.version>2.7.3</hadoop.version>
    </properties>
<dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>jcl-over-slf4j</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
</dependencies>
目录
相关文章
|
6月前
|
SQL 分布式计算 HIVE
pyspark笔记(RDD,DataFrame和Spark SQL)1
pyspark笔记(RDD,DataFrame和Spark SQL)
60 1
|
8天前
|
SQL 存储 分布式计算
在scala中使用spark
在scala中使用spark
7 0
|
8天前
|
分布式计算 Java Scala
spark 与 scala 的对应版本查看、在idea中maven版本不要选择17,弄了好久,换成11就可以啦
spark 与 scala 的对应版本查看、.在idea中maven版本不要选择17,弄了好久,换成11就可以啦
121 2
|
8天前
|
分布式计算 数据处理 Scala
Spark 集群和 Scala 编程语言的关系
Spark 集群和 Scala 编程语言的关系
38 0
|
8天前
|
分布式计算 Java Scala
Spark编程语言选择:Scala、Java和Python
Spark编程语言选择:Scala、Java和Python
Spark编程语言选择:Scala、Java和Python
|
6月前
|
SQL 存储 分布式计算
pyspark笔记(RDD,DataFrame和Spark SQL)2
pyspark笔记(RDD,DataFrame和Spark SQL)
58 2
|
6月前
|
分布式计算 资源调度 Java
Spark笔记(pyspark)2
Spark笔记(pyspark)
72 0
|
6月前
|
存储 分布式计算 资源调度
Spark笔记(pyspark)1
Spark笔记(pyspark)
72 0
|
12月前
|
存储 分布式计算 Scala
Spark-RDD 键值对的操作(Scala版)
Spark-RDD 键值对的操作(Scala版)