Machine Learning On Spark——第二节:基础数据结构(二)

简介: 作者:周志湖 微信号:zhouzhihubyond本节主要内容IndexedRowMatrixBlockMatrix1. IndexedRowMatrix的使用IndexedRowMatrix,顾名思义就是带索引的RowMatrix,它采用case class IndexedRow(index: Long, vector: Vector)类来表示矩阵的一

作者:周志湖
微信号:zhouzhihubyond

本节主要内容

  1. IndexedRowMatrix
  2. BlockMatrix

1. IndexedRowMatrix的使用

IndexedRowMatrix,顾名思义就是带索引的RowMatrix,它采用case class IndexedRow(index: Long, vector: Vector)类来表示矩阵的一行,index表示的就是它的索引,vector表示其要存储的内容。其使用方式如下:

package cn.ml.datastruct

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix
import org.apache.spark.mllib.stat.MultivariateStatisticalSummary
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.SingularValueDecomposition
import org.apache.spark.mllib.linalg.Matrices
import org.apache.spark.mllib.linalg.distributed.IndexedRow
import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix

object IndexRowMatrixDemo extends App {
  val sparkConf = new SparkConf().setAppName("IndexRowMatrixDemo ").setMaster("spark://sparkmaster:7077")  
  val sc = new SparkContext(sparkConf)

  //定义一个隐式转换函数
  implicit def double2long(x:Double)=x.toLong
  //数据中的第一个元素为IndexedRow中的index,剩余的映射到vector
  //f.take(1)(0)获取到第一个元素并自动进行隐式转换,转换成Long类型
  val rdd1= sc.parallelize(
      Array(
          Array(1.0,2.0,3.0,4.0),
          Array(2.0,3.0,4.0,5.0),
          Array(3.0,4.0,5.0,6.0)
          )
      ).map(f => IndexedRow(f.take(1)(0),Vectors.dense(f.drop(1))))
   val indexRowMatrix = new IndexedRowMatrix(rdd1)
   //计算拉姆矩阵
   var gramianMatrix:Matrix=indexRowMatrix.computeGramianMatrix()
   //转换成行矩阵RowMatrix
   var rowMatrix:RowMatrix=indexRowMatrix.toRowMatrix()
   //其它方法例如computeSVD计算奇异值、multiply矩阵相乘等操作,方法使用与RowMaxtrix相同

}

2. BlockMatrix的使用

分块矩阵将一个矩阵分成若干块,例如:
这里写图片描述
可以将其分成四块
这里写图片描述
从而矩阵P有如下形式
这里写图片描述
更多分块矩阵的相关内容包括分块矩阵的转置、分块矩阵的相乘操作可以参见https://en.wikipedia.org/wiki/Block_matrix

package cn.ml.datastruct

import org.apache.spark.mllib.linalg.distributed.BlockMatrix
import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix
import org.apache.spark.mllib.linalg.distributed.MatrixEntry
import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.distributed.IndexedRow
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.SparkConf

object BlockMatrixDemo extends App {
  val sparkConf = new SparkConf().setAppName("BlockMatrixDemo").setMaster("spark://sparkmaster:7077") //这里指在本地运行,2个线程  
  val sc = new SparkContext(sparkConf)

  implicit def double2long(x:Double)=x.toLong
  val rdd1= sc.parallelize(
      Array(
          Array(1.0,20.0,30.0,40.0),
          Array(2.0,50.0,60.0,70.0),
          Array(3.0,80.0,90.0,100.0)
          )
      ).map(f => IndexedRow(f.take(1)(0),Vectors.dense(f.drop(1))))
   val indexRowMatrix = new IndexedRowMatrix(rdd1)
   //将IndexedRowMatrix转换成BlockMatrix,指定每块的行列数
   val blockMatrix:BlockMatrix=indexRowMatrix.toBlockMatrix(2, 2)

   //执行后的打印内容:
  //Index:(0,0)MatrixContent:2 x 2 CSCMatrix
  //(1,0) 20.0
  //(1,1) 30.0
  //Index:(1,1)MatrixContent:2 x 1 CSCMatrix
  //(0,0) 70.0
  //(1,0) 100.0
  //Index:(1,0)MatrixContent:2 x 2 CSCMatrix
  //(0,0) 50.0
  //(1,0) 80.0
  //(0,1) 60.0
  //(1,1) 90.0
  //Index:(0,1)MatrixContent:2 x 1 CSCMatrix
  //(1,0) 40.0
  //从打印内容可以看出:各分块矩阵采用的是稀疏矩阵CSC格式存储
   blockMatrix.blocks.foreach(f=>println("Index:"+f._1+"MatrixContent:"+f._2))

   //转换成本地矩阵
   //0.0   0.0   0.0    
   //20.0  30.0  40.0   
   //50.0  60.0  70.0   
   //80.0  90.0  100.0 
   //从转换后的内容可以看出,在indexRowMatrix.toBlockMatrix(2, 2)
   //操作时,指定行列数与实际矩阵内容不匹配时,会进行相应的零值填充
   blockMatrix.toLocalMatrix()

   //块矩阵相加
   blockMatrix.add(blockMatrix)

   //块矩阵相乘blockMatrix*blockMatrix^T(T表示转置)
   blockMatrix.multiply(blockMatrix.transpose)

   //转换成CoordinateMatrix
   blockMatrix.toCoordinateMatrix()

   //转换成IndexedRowMatrix
   blockMatrix.toIndexedRowMatrix()

   //验证分块矩阵的合法性
   blockMatrix.validate()
}
目录
相关文章
|
分布式计算 Kubernetes Java
spark on k8s 镜像构建
spark on k8s 镜像构建
716 0
|
分布式计算 资源调度 Hadoop
十二、Spark的安装与部署详情(Local模式,Standalone模式,Spank on YARN模式)
十二、Spark的安装与部署详情(Local模式,Standalone模式,Spank on YARN模式)
十二、Spark的安装与部署详情(Local模式,Standalone模式,Spank on YARN模式)
|
4月前
|
分布式计算 Kubernetes Java
spark on k8s native
spark on k8s native
|
10月前
|
分布式计算 Kubernetes Serverless
Hago 的 Spark on ACK 实践
Hago 的 Spark on ACK 实践
|
资源调度 分布式计算 Hadoop
大数据平台搭建(容器环境)——Spark3.X on Yarn安装配置
大数据平台搭建(容器环境)——Spark3.X on Yarn安装配置
大数据平台搭建(容器环境)——Spark3.X on Yarn安装配置
|
11月前
|
分布式计算 资源调度 Hadoop
Spark on Yarn集群模式搭建及测试
Spark on Yarn集群模式搭建及测试
267 0
|
12月前
|
资源调度 分布式计算 大数据
大数据Spark on YARN
大数据Spark on YARN
107 0
|
分布式计算 资源调度 Kubernetes
[翻译]Spark on MR3——运行 Apache Spark 的新方式
> 此文是对 Spark on MR3 资料的翻译 原文链接:https://www.datamonad.com/post/2021-08-18-spark-mr3/ 代码链接:https://github.com/mr3project/spark-mr3 MR3 是一个通用的执行引擎,原生支持 Hadoop 和 Kubernetes。虽然 Hive on MR3 是主要应用,但 MR3 也可以
523 0
[翻译]Spark on MR3——运行 Apache Spark 的新方式
|
SQL 分布式计算 资源调度
Spark on Yarn Job的执行流程简介
2017-12-19-Hadoop2.0架构及HA集群配置(1) 2017-12-24-Hadoop2.0架构及HA集群配置(2) 2017-12-25-Spark集群搭建 2017-12-29-Hadoop和Spark的异同 2017-12-28-Spark-HelloWorld(Spark开发环境搭建)
|
存储 分布式计算 资源调度
Spark on k8s 在阿里云 EMR 的优化实践
本文整理自阿里云技术专家范佚伦在7月17日阿里云数据湖技术专场交流会的分享。
Spark on k8s 在阿里云 EMR 的优化实践