Author: 周志湖
WeChat ID: zhouzhihubyond
Topics covered in this section
- IndexedRowMatrix
- BlockMatrix
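1. Using IndexedRowMatrix
As the name suggests, an IndexedRowMatrix is a RowMatrix whose rows carry an index. Each row is represented by the case class IndexedRow(index: Long, vector: Vector), where index is the row index and vector holds the row's contents. It is used as follows: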
package cn.ml.datastruct
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.distributed.IndexedRow
import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix
object IndexRowMatrixDemo extends App {
  val sparkConf = new SparkConf().setAppName("IndexRowMatrixDemo").setMaster("spark://sparkmaster:7077")
  val sc = new SparkContext(sparkConf)
  // Define an implicit conversion from Double to Long
  implicit def double2long(x: Double): Long = x.toLong
  // The first element of each array becomes the index of the IndexedRow; the remaining elements become its vector
  // f.take(1)(0) returns the first element, which the implicit conversion turns into a Long
  val rdd1 = sc.parallelize(
    Array(
      Array(1.0, 2.0, 3.0, 4.0),
      Array(2.0, 3.0, 4.0, 5.0),
      Array(3.0, 4.0, 5.0, 6.0)
    )
  ).map(f => IndexedRow(f.take(1)(0), Vectors.dense(f.drop(1))))
  val indexRowMatrix = new IndexedRowMatrix(rdd1)
  // Compute the Gramian matrix (A^T * A)
  val gramianMatrix: Matrix = indexRowMatrix.computeGramianMatrix()
  // Convert to a RowMatrix (the row indices are dropped)
  val rowMatrix: RowMatrix = indexRowMatrix.toRowMatrix()
  // Other methods, such as computeSVD (singular value decomposition) and multiply (matrix multiplication), are used in the same way as on RowMatrix
}
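To make it concrete what computeGramianMatrix returns, the following is a minimal plain-Scala sketch (no Spark required) that computes A^T * A for the same three vectors used above. The object GramianSketch and its gramian helper are hypothetical names introduced here for illustration; they are not part of MLlib.

```scala
object GramianSketch {
  /** Computes the Gramian A^T * A of a matrix given as a sequence of rows. */
  def gramian(rows: Seq[Array[Double]]): Array[Array[Double]] = {
    val n = rows.head.length
    val g = Array.ofDim[Double](n, n)
    // Entry (i, j) of A^T * A is the sum over all rows of row(i) * row(j)
    for (row <- rows; i <- 0 until n; j <- 0 until n) {
      g(i)(j) += row(i) * row(j)
    }
    g
  }

  def main(args: Array[String]): Unit = {
    // The vector parts of the three IndexedRows from the demo (indices dropped)
    val rows = Seq(
      Array(2.0, 3.0, 4.0),
      Array(3.0, 4.0, 5.0),
      Array(4.0, 5.0, 6.0)
    )
    gramian(rows).foreach(r => println(r.mkString(" ")))
  }
}
```

The result is a small n x n local matrix (n being the number of columns), which is why the Spark method returns a local Matrix rather than a distributed one.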
2. Using BlockMatrix
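A block matrix partitions a matrix into several blocks (submatrices). For example, a matrix P can be partitioned into four blocks, so that P is written as a 2 x 2 arrangement of those blocks.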
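To make the partitioning concrete, here is a small worked example. The 4 x 4 matrix below is an illustrative choice made here, not a reproduction of the figures in the original post.

```latex
P = \begin{bmatrix}
1 & 2 & 2 & 7 \\
1 & 5 & 6 & 2 \\
3 & 3 & 4 & 5 \\
3 & 3 & 6 & 7
\end{bmatrix},
\qquad
P_{11} = \begin{bmatrix} 1 & 2 \\ 1 & 5 \end{bmatrix},\;
P_{12} = \begin{bmatrix} 2 & 7 \\ 6 & 2 \end{bmatrix},\;
P_{21} = \begin{bmatrix} 3 & 3 \\ 3 & 3 \end{bmatrix},\;
P_{22} = \begin{bmatrix} 4 & 5 \\ 6 & 7 \end{bmatrix}

P = \begin{bmatrix}
P_{11} & P_{12} \\
P_{21} & P_{22}
\end{bmatrix}
```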
For more on block matrices, including the transpose and multiplication of block matrices, see https://en.wikipedia.org/wiki/Block_matrix
package cn.ml.datastruct
import org.apache.spark.mllib.linalg.distributed.BlockMatrix
import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.distributed.IndexedRow
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.SparkConf
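object BlockMatrixDemo extends App {
  // Connects to the standalone master at sparkmaster:7077; use setMaster("local[2]") instead to run locally with 2 threads
  val sparkConf = new SparkConf().setAppName("BlockMatrixDemo").setMaster("spark://sparkmaster:7077")
  val sc = new SparkContext(sparkConf)
  // Implicit conversion from Double to Long, used for the row index below
  implicit def double2long(x: Double): Long = x.toLong
  val rdd1 = sc.parallelize(
    Array(
      Array(1.0, 20.0, 30.0, 40.0),
      Array(2.0, 50.0, 60.0, 70.0),
      Array(3.0, 80.0, 90.0, 100.0)
    )
  ).map(f => IndexedRow(f.take(1)(0), Vectors.dense(f.drop(1))))
  val indexRowMatrix = new IndexedRowMatrix(rdd1)
  // Convert the IndexedRowMatrix to a BlockMatrix, specifying the number of rows and columns per block
  val blockMatrix: BlockMatrix = indexRowMatrix.toBlockMatrix(2, 2)
  // Output printed by the statement below (the order of the blocks may vary):
  //Index:(0,0)MatrixContent:2 x 2 CSCMatrix
  //(1,0) 20.0
  //(1,1) 30.0
  //Index:(1,1)MatrixContent:2 x 1 CSCMatrix
  //(0,0) 70.0
  //(1,0) 100.0
  //Index:(1,0)MatrixContent:2 x 2 CSCMatrix
  //(0,0) 50.0
  //(1,0) 80.0
  //(0,1) 60.0
  //(1,1) 90.0
  //Index:(0,1)MatrixContent:2 x 1 CSCMatrix
  //(1,0) 40.0
  // As the output shows, each block is stored as a sparse matrix in CSC format
  blockMatrix.blocks.foreach(f => println("Index:" + f._1 + "MatrixContent:" + f._2))
  // Convert to a local matrix:
  //0.0 0.0 0.0
  //20.0 30.0 40.0
  //50.0 60.0 70.0
  //80.0 90.0 100.0
  // The all-zero first row appears because the row indices start at 1: an IndexedRowMatrix
  // has (max index + 1) rows, so the missing row 0 is filled with zeros.
  // The block size passed to toBlockMatrix(2, 2) does not cause padding; blocks on the edge are simply smaller (2 x 1 above).
  blockMatrix.toLocalMatrix()
  // Add two block matrices
  blockMatrix.add(blockMatrix)
  // Multiply block matrices: blockMatrix * blockMatrix^T (T denotes the transpose)
  blockMatrix.multiply(blockMatrix.transpose)
  // Convert to a CoordinateMatrix
  blockMatrix.toCoordinateMatrix()
  // Convert to an IndexedRowMatrix
  blockMatrix.toIndexedRowMatrix()
  // Check that the block matrix is well formed; throws an exception if it is not
  blockMatrix.validate()
}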
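The block layout printed above can be reproduced without Spark. The following plain-Scala sketch places each row at its index and then cuts the resulting 4 x 3 matrix into blocks of at most 2 x 2. It assumes, as in MLlib's IndexedRowMatrix, that the number of rows is the largest index plus one; BlockLayoutSketch, toDense and toBlocks are hypothetical names introduced here for illustration and do not mirror Spark's actual implementation.

```scala
object BlockLayoutSketch {
  /** Places each (index, values) row at its row index; rows with no entry stay all zero. */
  def toDense(rows: Seq[(Long, Array[Double])]): Array[Array[Double]] = {
    val numRows = (rows.map(_._1).max + 1).toInt // assumption: row count = max index + 1
    val numCols = rows.head._2.length
    val dense = Array.ofDim[Double](numRows, numCols)
    for ((idx, values) <- rows) dense(idx.toInt) = values.clone()
    dense
  }

  /** Cuts a dense matrix into blocks keyed by (blockRow, blockCol); edge blocks are smaller, not padded. */
  def toBlocks(m: Array[Array[Double]], rowsPerBlock: Int, colsPerBlock: Int): Map[(Int, Int), Array[Array[Double]]] = {
    val numRows = m.length
    val numCols = m.head.length
    val blockRows = (numRows + rowsPerBlock - 1) / rowsPerBlock
    val blockCols = (numCols + colsPerBlock - 1) / colsPerBlock
    val entries = for (bi <- 0 until blockRows; bj <- 0 until blockCols) yield {
      val rs = (bi * rowsPerBlock) until math.min((bi + 1) * rowsPerBlock, numRows)
      val cs = (bj * colsPerBlock) until math.min((bj + 1) * colsPerBlock, numCols)
      (bi, bj) -> rs.map(r => cs.map(c => m(r)(c)).toArray).toArray
    }
    entries.toMap
  }

  def main(args: Array[String]): Unit = {
    // Same data as the demo: row indices 1, 2, 3 (there is no row 0)
    val rows = Seq(
      (1L, Array(20.0, 30.0, 40.0)),
      (2L, Array(50.0, 60.0, 70.0)),
      (3L, Array(80.0, 90.0, 100.0))
    )
    val blocks = toBlocks(toDense(rows), 2, 2)
    for (((bi, bj), b) <- blocks.toSeq.sortBy(_._1)) {
      println(s"Index:($bi,$bj) size: ${b.length} x ${b.head.length}")
    }
  }
}
```

With this data the sketch yields four blocks: (0,0) and (1,0) are 2 x 2, while (0,1) and (1,1) are 2 x 1, matching the sizes in the printed output. The zero row comes from the absent index 0, not from the block size.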