机器学习 spark.mllib 数据类型学习

简介: 机器学习 spark.mllib 数据类型学习MLLib提供了一序列基本数据类型以支持底层的机器学习算法。主要的数据内心包括:本地向量、标注点(Labeled Point)、本地矩阵、分布式矩阵等。单机模式存储的本地向量与矩阵,以及基于一个或多个RDD的分布式矩阵。其中本地向量与本地矩阵作为公共接口提供简单数据模型,底层的线性代数操作由Breeze库和jblas库提供。标注点类型用来表示监督学习(Supervised Learning)中的一个训练样本。

机器学习 spark.mllib 数据类型学习


MLLib提供了一序列基本数据类型以支持底层的机器学习算法。主要的数据内心包括:本地向量、标注点(Labeled Point)、本地矩阵、分布式矩阵等。单机模式存储的本地向量与矩阵,以及基于一个或多个RDD的分布式矩阵。其中本地向量与本地矩阵作为公共接口提供简单数据模型,底层的线性代数操作由Breeze库和jblas库提供。标注点类型用来表示监督学习(Supervised Learning)中的一个训练样本。

在正式学习机器学习算法之前,让我们先了解下这些数据类型的用法。

MLlib基本数据类型

向量
package datatype
import org.apache.spark.ml.feature.LabeledPoint
import org.apache.spark.ml.linalg._
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
object vectorDemo1 {
  def main(args: Array[String]): Unit = {
    /**
     * 局部变量
     *
     * 单词
     *
     *  dense: 稠密
     *  sparse: 稀疏
     *
     *  indices: index array, must be strictly increasing. 索引 索引数组,必须严格递增
     *  elements: vector elements in (index, value) pairs  元素(索引,值)对中的向量元素
     *
     *  稠密向量
     *      由表示其输入值的双精度数组支持,而稀疏变量有两个并行数组支持:索引和值。
     *          例如:一个向量(1.0,0.0,3.0)可以用稠密格式表示为[1.0,0.0,3.0],或者以(3,[0,2],[1.0,3.0])的稀疏格式表示
     *  稀疏变量
     *      一个向量(1.0,0.0,3.0)表示为(3,[0,2],[1.0,3.0])的稀疏格式表示,其中第一个值3为向量的大小,第二个值表示向量中有值数据的索引,第三个值表示向量的值
     *
     *  局部变量的基类是Vector,提供了两个实现:DenseVector SparseVector 。 建议使用Vectors中实现的工厂方法创建局部变量。org.apache.spark.ml.linalg.Vectors
     *
     */
    //创建稠密向量(1.0,0.0,3.0)
    println("创建稠密向量(1.0,0.0,3.0)Vectors.dense(1.0,0.0,3.0)")
    //                    元素
    Vectors.dense(1.0,0.0,3.0).foreachActive((i,j)=>println(i+"  "+j))
    println("创建稀疏向量(1.0,0.0,3.0) Vectors.sparse(3,Array(0,2),Array(1.0,3.0))")
    //通过指定非零条目的索引和值,创建一个稀疏变量(1.0,0.0,3.0)
    //                       索引         值
    Vectors.sparse(3,Array(0,2),Array(1.0,3.0)).foreachActive((i,j)=>println(i+"  "+j))
    println("创建稀疏向量(1.0,0.0,3.0) Vectors.sparse(3,Seq((0,1.0),(2,3.0)))")
    //通过指定非零条目创建一个 稀疏变量 (1.0,0.0,3.0)
    //                       元素
    Vectors.sparse(3,Seq((0,1.0),(2,3.0))).foreachActive((i,j)=>println(i+"  "+j))
    println()
    /**
     * 标签向量
     *    标签向量是一个稠密或稀疏的局部变量,而且关联了标签.在MLlib中,标签向量用于有监督学习算法.
     *    因为使用双精度存储标签,所以可以在回归和分类中使用标签向量.
     *    对于二元分类,标签应该是0(负)或1(正).对于多雷分类,标签应该是从零开始的类索引:0,1,2,3。。。.标签向量由案例类Labeledoint表示
     *
     * 单词
     * LabeledPoint: 标记点
     * vector: 向量
     * Features: 特征
     */
    //使用正标签和稠密特征创建标签向量
    LabeledPoint(1.0,Vectors.dense(1.0,0.0,3.0))
    //使用负标签和稀疏特征向量创建标签向量
    LabeledPoint(0.0,Vectors.sparse(3,Array(0,2),Array(1.0,3.0)))
    /**
     * LIBSVM
     *
     * 在实践中,使用稀疏的训练数据是很常见的。saprkML支持以LIBSVM格式存储的阅读训练样本,
     * 这是LIBSVM和LIBLINEAR使用的默认格式,是一种文本格式,其中每行代表使用以下格式标记的稀疏特征向量:
     * label index1:value1 index2:value2...
     * 索引一开始就按升序排列。加载后,特征索引将转换为基于零的索引。libsvm包用于奖LIBSVM数据加载为DataFrame的数据元API.
     * 载的DataFrame有两列:包含作为双精度存储的标签和包含作为向量存储的特征。要使用LIBSVM格式的数据原,需要在DataFrameReader中将格式设置为libsvm,并可以指定option例如:
     * SparkSession.builder().getOrCreate().read.format("libsvm")
        *.option("numFeatures","780")
        *.load("/home/rjxy/IdeaProjects/spark/spark_mllib_course/src/main/resources/data/sample_libsvm_data.txt")
     *
     * numFeatures 特征数量
        *指定特征向量数量,如果为指定或不是正数,特征向量的数量将自动确定,但额外需要计算的代价
       *vectorType 特征向量类型 稀疏或稠密
        *特征向量类型,稀疏(默认)或稠密。
     *
     *    LIBSVM是台湾大学林智仁(Lin Chih-Jen)教授等开发设计的一个简单、易于使用和快速有效的SVM模式识别与回归的软件包,
     * 他不但提供了编译好的可在Windows系列系统的执行文件,还提供了源代码,方便改进、修改以及在其它操作系统上应用;
     * 该软件对SVM所涉及的参数调节相对比较少,提供了很多的默认参数,利用这些默认参数可以解决很多问题;并提供了交互检验(Cross Validation)
     * 的功能。该软件可以解决C-SVM、ν-SVM、ε-SVR和ν-SVR等问题,包括基于一对一算法的多类模式识别问题。
     */
    val session: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
      session.read.format("libsvm")
      .option("numFeatures","780")
      .load("/home/rjxy/IdeaProjects/spark/spark_mllib_course/src/main/resources/data/sample_libsvm_data.txt")
    //numFeatures 特征数量
      //指定特征向量数量,如果为指定或不是正数,特征向量的数量将自动确定,但额外需要计算的代价
    //vectorType 特征向量类型 稀疏或稠密
      //特征向量类型,稀疏(默认)或稠密。
  }
}
}
矩阵
package datatype
import breeze.linalg
import breeze.linalg.{DenseMatrix, diag}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.linalg.{DenseVector, Matrices, Matrix, SparseMatrix, Vector, Vectors}
import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, CoordinateMatrix, IndexedRow, IndexedRowMatrix, MatrixEntry, RowMatrix}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
object matrixDemo1 {
  def main(args: Array[String]): Unit = {
    val value: linalg.DenseVector[Double] = breeze.linalg.DenseVector.ones[Double](10)
    val value1: DenseMatrix[Double] = diag(value)
    println(value1)
    SetLogger
    val session: SparkSession = SparkSession.builder().master("local[1]").getOrCreate()
    /**
     * 局部矩阵
     *
     * 单词
     *
     *  Matrix: 矩阵
     *
     *
     *
     *  局部矩阵具有整数类型的行和列索引以及双精度类型的值,他们存储在单个计算机上。
     *  Mllib支持稠密矩阵(其条目值以列优先顺序存储在单个双精度数组中)和稀疏矩阵(其非零条目值以列优先顺序和压缩稀疏列格式存储)
     *
     *
     * 以矩阵大小(3,2)存储在一维数组[1.0,3.0,5.0,2.0,4.0,6.0]中.
     * 局部矩阵的基类是Matrix,提供了两个实现:DenseMatrix和SparseMatrix.
     * 建议使用在矩阵中实现的工厂方法创建局部矩阵。 (mllib 中的局部矩阵以列优先顺序存储)
     *
     */
    //创建稠密矩阵
    println("创建稠密矩阵")
    println(Matrices.dense(3, 2, Array(1.0, 4.0, 5.0, 6.0, 2.0, 3.0)).toString())
    //创建稀疏矩阵
    //Matrices.sparse
    //   * @param numRows number of rows  行数
    //   * @param numCols number of columns  列数
    //   * @param colPtrs the index corresponding to the start of a new column  对应于新列开头的索引
    //   * @param rowIndices the row index of the entry  行索引
    //   * @param values non-zero matrix entries in column major  非零矩阵在主列上
    val matris: Matrix = Matrices.sparse(3, 3, Array(0, 2, 1, 3), Array(0, 1, 2), Array(9, 6, 8))
    //强制转换
    val s: SparseMatrix = matris.asInstanceOf[SparseMatrix]
    //打印稀疏矩阵
    println("创建稀疏矩阵")
    println(s.toString())
    //打印稠密矩阵
    println("创建稀疏矩阵 转换打印稠密矩阵")
    println(s.toDense.toString())
    println()
    /**
     * 分布矩阵
     *
     * 单词
     *
     * RowMatrix 行矩阵
       IndexedRowMatrix 索引行矩阵
       CoordinateMatrix 坐标矩阵
       BlockMatrix 块矩阵
     *
     *    分布式矩阵具有长整形行和列索引以及双精度,他们以分布式方式存储在一个或多个RDD中。
     * 选择正确的格式存储大型的分布式矩阵非常重要。将分布式矩阵转换为不同的格式可能需要全局洗牌,相当耗费资源.
     * 到目前为止,已经实现了四种类型的分布矩阵。
     *    基本类型称为RowMatrix,是面向行的分布式矩阵,例如特征向量的集合,行索引不具有意义。
     * RowMatrix条目的保存格式为RDD,每行是一个局部变量。假设RowMatrix的列数并不是很大,
     * 因此如果单个局部向量可以合理的传递给驱动程序,也可以使用单个节点进行存储和操作。
     *    IndexedRowMatrix 与 RowMatrix类似,但具有行索引,可用于识别行和连接操作。
     *    CoordinateMatrix是以坐标列表格式存储的分布式矩阵,其条目的保存格式为RDD。
     *    BlockMatrix是分布式矩阵,其由包含 MatrixBlock的RDD组成。MatrixBlock是(Int,Int,Matrix)的元组
     *
     */
    /**
     * 1.RowMatrix 行矩阵
     * parallelize 并行化
     *
     * RowMatrix 就能将每行对应一个RDD,将矩阵的每行分布式存储,
     * 矩阵的每行是一个局部向量。由于每一行均由局部向量表示,
     * 因此列数受正数范围限制,但实际上应小得多
     *
     */
    //创建RDD[Vector]
    val rddVector: RDD[Vector] = session.sparkContext.parallelize(Seq(
      Vectors.dense(2.0, 3.0, 4.0),
      Vectors.dense(5.0, 5.0, 5.0),
      Vectors.dense(2.0, 3.0, 4.0)
    ))
    //从RDD[Vector]创建RDDMatrix
    val rowMatrix: RowMatrix = new RowMatrix(rddVector)
    println("创建RDDMatrix")
    println(rowMatrix.rows.foreach(println))
    println(rowMatrix.numRows()+" RowMatrix行数")
    println(rowMatrix.numCols()+" RowMatrix列数")
    println()
    /**
     * 2.IndexedRowMatrix 索引行矩阵
     * IndexedRow 索引的行
     *
     *    IndexedRowMatrix 类似于 RowMatrix,但但行索引有意义。它由带索引行的RDD存储,因此每行都由长整形索引和局部变量表示。
     *    IndexRowMatrix 可以用 RDD[IndexRow]实例创建,其中IndexRow是一个基于(long,Vector)的包装器。
     *    IndexRowMatrix 可以通过删除行索引转换为RowMatrix
     */
    //创建RDD[IndexedRow]
    val rddIndexedRow: RDD[IndexedRow] = session.sparkContext.parallelize(Seq(
      IndexedRow(0, Vectors.dense(1, 3)),
      IndexedRow(1, Vectors.dense(4, 5))
    ))
    //用RDD[IndexedRow]创建 IndexedRowMatrix
    val indexedRowMatrix = new IndexedRowMatrix(rddIndexedRow)
    //行数
    println("IndexedRowMatrix矩阵")
    println(indexedRowMatrix.rows.foreach(println))
    println(indexedRowMatrix.numRows()+" indexedRowMatrix行数")
    //indexedRowMatrix去掉行索引
    println("indexedRowMatrix去掉行索引")
    indexedRowMatrix.toRowMatrix().rows.foreach(println)
    println()
    /**
     * 3.CoordinateMatrix 坐标矩阵
     *
     *    CoordinateMatrix 也是分布式矩阵,每个条目由RDD保存。每个条目是(i:Long,j:Long,value:Double)
     * 的一个元组,其中i是行索引,j是列索引,value是条目值。
     * CoordinateMatrix 只有在矩阵的两个维度都很大且矩阵非常稀疏时才能使用。CoordinateMatrix 可以由RDD[MaxtrixEntry]
     * 实例创建,其中MatrixEntry是基于(Long,Long,Double)的包装器。可以通过调用 toIndexRowMatrix 将
     * CoordinateMatrix 转换为具有稀疏行的 IndexRowMatrix。目前还不支持 CoordinateMatrix 的其他计算
     */
    //创建RDD[MatrixEntry]
    val rddMatrixEntry: RDD[MatrixEntry] = session.sparkContext.parallelize(Seq(
        MatrixEntry(0, 1, 1), MatrixEntry(0, 2, 2), MatrixEntry(0, 3, 3),
        MatrixEntry(0, 4, 4), MatrixEntry(2, 3, 5), MatrixEntry(2, 4, 6),
        MatrixEntry(3, 4, 7), MatrixEntry(4, 5, 8)
    ))
    //用RDD[MatrixEntry]创建 CoordinateMatrix
    val coordinateMatrix: IndexedRowMatrix = new CoordinateMatrix(rddMatrixEntry).toIndexedRowMatrix()
    //转换成IndexRowmatrix,其中的行 为稀疏向量
    println("用RDD[MatrixEntry]创建  CoordinateMatrix  稀疏 >>\n")
    coordinateMatrix.rows.foreach(println)
    println("CoordinateMatrix  稠密矩阵 ")
    plintMaxtrix(coordinateMatrix)
    /**
     * 4.BlockMatrix 块矩阵
     *
     * 单词
     *
     *
     */
    //创建 RDD[MaxEntry]
    val rddblockMatrixEntry: RDD[MatrixEntry] = session.sparkContext.parallelize(Seq(
      MatrixEntry(0, 0, 1.2),MatrixEntry(0, 1, 1.3),MatrixEntry(0, 2, 1.4),
      MatrixEntry(1, 0, 2.1),MatrixEntry(1, 1, 1.5),MatrixEntry(1, 2, 1.6),
      MatrixEntry(6, 10, 3.7),MatrixEntry(2, 0, 1.7)
    ))
    //用RDD[MaxEntry]创建 CoordinateMatrix
    val blockCoordinateMatrix: CoordinateMatrix = new CoordinateMatrix(rddblockMatrixEntry)
    //将CoordinateMatrix 转换为 BlockMatrix
    val blockMatrix: BlockMatrix = blockCoordinateMatrix.toBlockMatrix().cache()
    //验证BlockMatrix的设置是否正确,当它是无效的,则抛出一个异常
    println(blockMatrix.validate())
    println("BlockMatrix toIndexedRowMatrix() 稀疏矩阵  打印")
    blockMatrix.toIndexedRowMatrix().rows.sortBy(_.index).foreach((item: IndexedRow) => {
      println(item.index +""+ item.vector)
    })
    println("BlockMatrix toIndexedRowMatrix() 稠密矩阵  打印")
    val denseBlockMatrix: IndexedRowMatrix = blockMatrix.toIndexedRowMatrix()
    plintMaxtrix(denseBlockMatrix)
    println()
    println(" 计算A^T A ")
    val multiplyBlockMatrix: IndexedRowMatrix = blockMatrix.transpose.multiply(blockMatrix).toIndexedRowMatrix()
    plintMaxtrix(multiplyBlockMatrix)
  }
  /**
   * 打印机 IndexedRowMatrix   "[ \t \t ]"
   *
   * @param IndexedRowMatrix
   */
  def plintMaxtrix(IndexedRowMatrix:IndexedRowMatrix){
    IndexedRowMatrix.rows.sortBy(_.index).foreach((item: IndexedRow) => {
      println(item.index + item.vector.toDense.values.mkString("[","\t\t","]"))
    })
  }
  def SetLogger = {
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("com").setLevel(Level.OFF)
    System.setProperty("spark.ui.showConsoleProgress", "false")
    Logger.getRootLogger().setLevel(Level.OFF);
  }
}
目录
相关文章
|
1月前
|
机器学习/深度学习 分布式计算 算法
联邦学习是保障数据隐私的分布式机器学习方法
【6月更文挑战第13天】联邦学习是保障数据隐私的分布式机器学习方法,它在不暴露数据的情况下,通过在各设备上本地训练并由中心服务器协调,实现全局模型构建。联邦学习的优势在于保护隐私、提高训练效率和增强模型泛化。已应用于医疗、金融和物联网等领域。未来趋势包括更高效的数据隐私保护、提升可解释性和可靠性,以及与其他技术融合,有望在更多场景发挥潜力,推动机器学习发展。
31 4
|
2天前
|
机器学习/深度学习 算法 前端开发
集成学习(Ensemble Learning)是一种机器学习技术,它通过将多个学习器(或称为“基学习器”、“弱学习器”)的预测结果结合起来,以提高整体预测性能。
集成学习(Ensemble Learning)是一种机器学习技术,它通过将多个学习器(或称为“基学习器”、“弱学习器”)的预测结果结合起来,以提高整体预测性能。
7 1
|
23天前
|
机器学习/深度学习 算法 前端开发
机器学习中的集成学习(二)
**集成学习概述** 集成学习通过结合多个弱学习器创建强学习器,如Bagging(Bootstrap Aggregating)和Boosting。Bagging通过随机采样产生训练集,训练多个弱模型,然后平均(回归)或投票(分类)得出结果,减少方差和过拟合。Boosting则是迭代过程,每个弱学习器专注于难分类样本,逐步调整样本权重,形成加权平均的强学习器。典型算法有AdaBoost、GBDT、XGBoost等。两者区别在于,Bagging模型并行训练且独立,而Boosting模型间有依赖,重视错误分类。
|
23天前
|
机器学习/深度学习 人工智能 自然语言处理
机器学习中的集成学习(一)
集成学习是一种将多个弱学习器组合成强学习器的方法,通过投票法、平均法或加权平均等策略减少错误率。它分为弱分类器集成、模型融合和混合专家模型三个研究领域。简单集成技术包括投票法(用于分类,少数服从多数)、平均法(回归问题,预测值取平均)和加权平均法(调整模型权重以优化结果)。在实际应用中,集成学习如Bagging和Boosting是与深度学习并驾齐驱的重要算法,常用于数据竞赛和工业标准。
|
2天前
|
机器学习/深度学习 算法 Python
强化学习(Reinforcement Learning, RL)** 是一种机器学习技术,其中智能体(Agent)通过与环境(Environment)交互来学习如何执行决策以最大化累积奖励。
强化学习(Reinforcement Learning, RL)** 是一种机器学习技术,其中智能体(Agent)通过与环境(Environment)交互来学习如何执行决策以最大化累积奖励。
9 0
|
26天前
|
机器学习/深度学习 算法 Python
【机器学习】集成学习在信用评分领域实例
【机器学习】集成学习在信用评分领域实例
44 1
|
28天前
|
机器学习/深度学习 前端开发 算法
【机器学习】集成学习方法:Bagging与Boosting的应用与优势
【机器学习】集成学习方法:Bagging与Boosting的应用与优势
37 2
|
28天前
|
机器学习/深度学习 算法 TensorFlow
强化学习是一种通过与环境交互来学习最优行为策略的机器学习方法。
强化学习是一种通过与环境交互来学习最优行为策略的机器学习方法。
|
18天前
|
机器学习/深度学习 分布式计算 API
技术好文:Spark机器学习笔记一
技术好文:Spark机器学习笔记一
14 0
|
19天前
|
人工智能 自然语言处理 机器人
大模型训练的艺术:从预训练到增强学习的四阶段之旅
大模型训练的艺术:从预训练到增强学习的四阶段之旅