Machine Learning on Spark——第四节 统计基础（二)

本节主要内容

1. Correlation 相关性分析
2. 分层采样（Stratified sampling）
3. 随机数据生成（Random data generation）

1. Correlation 相关性分析

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.mllib.stat._
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.{Matrix, Vector}

object CorrelationDemo extends App {
val sparkConf = new SparkConf().setAppName("StatisticsDemo").setMaster("spark://sparkmaster:7077")
val sc = new SparkContext(sparkConf)

val rdd1:RDD[Double] = sc.parallelize(Array(11.0, 21.0, 13.0, 14.0))
val rdd2:RDD[Double] = sc.parallelize(Array(11.0, 20.0, 13.0, 16.0))
//两个rdd间的相关性
//返回值：correlation: Double = 0.959034501397483
//[-1, 1]，值越接近于1，其相关度越高
val correlation:Double = Statistics.corr(rdd1, rdd2, "pearson")

val rdd3:RDD[Vector]= sc.parallelize(
Array(
Array(1.0,2.0,3.0,4.0),
Array(2.0,3.0,4.0,5.0),
Array(3.0,4.0,5.0,6.0)
)
).map(f => Vectors.dense(f))
//correlation3: org.apache.spark.mllib.linalg.Matrix =
//1.0  1.0  1.0  1.0
//1.0  1.0  1.0  1.0
//1.0  1.0  1.0  1.0
//1.0  1.0  1.0  1.0
val correlation3:Matrix = Statistics.corr(rdd3, "pearson")
}



 val rdd4:RDD[Double] = sc.parallelize(Array(50.0, 60.0, 70.0, 80.0,90.0,95.0))
val rdd5:RDD[Double] = sc.parallelize(Array(500.0, 510.0, 530.0, 580.0,560,1000))
//执行结果为:
//correlation4: Double = 0.6915716600436548
val correlation4:Double = Statistics.corr(rdd4, rdd5, "pearson")

//采用spearman相关系数
//执行结果：
//correlation5: Double = 0.9428571428571412
val correlation5:Double = Statistics.corr(rdd4, rdd5, "spearman")

2. 分层采样（Stratified sampling）

package cn.ml.stat

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.PairRDDFunctions
import org.apache.spark.SparkConf

object StratifiedSampleDemo extends App {

val sparkConf = new SparkConf().setAppName("StatisticsDemo").setMaster("spark://sparkmaster:7077")
val sc = new SparkContext(sparkConf)
//wordCount操作，返回（K,V)汇总结果
val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)

//定义key为spark,采样比率为0.5
val fractions: Map[String, Double] = Map("Spark"->0.5)

//使用sampleByKey方法进行采样
val approxSample = wordCounts.sampleByKey(false, fractions)
//使用sampleByKeyExact方法进行采样，该方法资源消耗较sampleByKey更大
//但采样后的大小与预期大小更接近，可信度达到99.99%
val exactSample = wordCounts.sampleByKeyExact(false, fractions)
}

3. 随机数据生成（Random data generation）

scala> import org.apache.spark.SparkContext
import org.apache.spark.SparkContext

scala> import org.apache.spark.mllib.random.RandomRDDs._
import org.apache.spark.mllib.random.RandomRDDs._

//生成100个服从标准正态分面N(0,1)的随机RDD数据，10为指定的分区数
scala> val u = normalRDD(sc, 100L, 10)
u: org.apache.spark.rdd.RDD[Double] = RandomRDD[26] at RDD at RandomRDD.scala:38

//转换使其服从N(1,4)的正太分布
scala> val v = u.map(x => 1.0 + 2.0 * x)
v: org.apache.spark.rdd.RDD[Double] = MapPartitionsRDD[27] at map at <console>:27


|
10月前
|

195 Spark Streaming整合Kafka完成网站点击流实时统计
195 Spark Streaming整合Kafka完成网站点击流实时统计
61 0
|

Spark Streaming实时流处理项目实战笔记——将统计结果写入到MySQL数据库中
Spark Streaming实时流处理项目实战笔记——将统计结果写入到MySQL数据库中
435 0
|
3月前
|

73 0
|
4月前
|

spark on k8s native
spark on k8s native
114 1
|
4月前
|

【大数据技术】Spark MLlib机器学习特征抽取 TF-IDF统计词频实战（附源码和数据集）
【大数据技术】Spark MLlib机器学习特征抽取 TF-IDF统计词频实战（附源码和数据集）
54 0
|
10月前
|

Hago 的 Spark on ACK 实践
Hago 的 Spark on ACK 实践
497 0
|

912 1
|
11月前
|

Spark on Yarn集群模式搭建及测试
Spark on Yarn集群模式搭建及测试
267 0
|
12月前
|

107 0
|
SQL JSON 分布式计算
【大数据学习篇10】Spark项目实战~网站转化率统计
【大数据学习篇10】Spark项目实战~网站转化率统计
460 0