大数据Spark MLlib机器学习

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 大数据Spark MLlib机器学习

1 什么是Spark MLlib?

MLlib是Spark的机器学习(ML)库。旨在简化机器学习的工程实践工作,并方便扩展到更大规模。MLlib由一些通用的学习算法和工具组成,包括分类、回归、聚类、协同过滤、降维等,同时还包括底层的优化原语和高层的管道API。


MLlib目前分为两个代码包:


spark.mllib 包含基于RDD的原始算法API。

spark.ml 则提供了基于DataFrames 高层次的API,可以用来构建机器学习管道。MLlib支持的算法库如下:

2 支持的数据类型

MLlib支持的数据类型是比较丰富的,从最基本的Spark数据集RDD到部署在集群中向量和矩阵,并且还支持部署在本地计算机中的本地化格式。

2.1 本地向量集

MLlib使用的本地化存储类型是向量,这里的向量主要由两类构成:稀疏型数据集(sparse)和密集型数据集(dense)。

导入MLlib的依赖:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-mllib_2.12</artifactId>
    <version>2.4.3</version>
</dependency>

密集向量由double类型的数组支持,而稀疏向量则由两个平行数组支持。


example:


向量(5.2,0.0,5.5)


密集向量表示:[5.2,0.0,5.5]


稀疏向量表示:(3,[0,2],[5.2,5.5]) # 3是向量(5.2,0.0,5.5)的长度,除去0值外,其他两个值的索引和值分别构成了数组[0,2]和数组[5.2,5.5]。

2.1.1、密集型数据集

package cn.itcast.spark;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.junit.Test;
public class SparkMLlib {
    @Test
    public void testDense(){
        Vector vd = Vectors.dense(9, 5, 2, 7); //定义密集型向量
        double v = vd.apply(2); //获取下标为2的值
        System.out.println(v); //2
    }
}

2.1.2 稀疏型数据集

    @Test
    public void testSparse() {
        int[] indexes = new int[]{0, 1, 2, 5};
        double[] values = new double[]{9, 5, 2, 7};
        Vector vd = Vectors.sparse(6, indexes, values); //定义稀疏型向量
        double v = vd.apply(2); //获取下标为2的值
        double v2 = vd.apply(4); //获取下标为4的值
        double v3 = vd.apply(5); //获取下标为5的值
        System.out.println(v); //2
        System.out.println(v2); //0
        System.out.println(v3); //7
    }

2.2 向量标签

向量标签用于MLLIB中机器学习算法做标记。在分类问题中,可以将不同的数据集分成若干份,以整数型0、1、2进行标记。例如:垃圾邮件标记为1,非垃圾邮件标记为0。

    @Test
    public void testLabeledPoint(){
        Vector vd = Vectors.dense(9, 5, 2, 7); //定义密集型向量
        LabeledPoint lp = new LabeledPoint(1, vd); //定义标签向量
        System.out.println(lp.features());
        System.out.println(lp.label());
        System.out.println(lp);
        System.out.println("------------");
        int[] indexes = new int[]{0, 1, 2, 3};
        double[] values = new double[]{9, 5, 2, 7};
        Vector vd2 = Vectors.sparse(4, indexes, values); //定义稀疏型向量
        LabeledPoint lp2 = new LabeledPoint(2, vd2);
        System.out.println(lp2.features());
        System.out.println(lp2.label());
        System.out.println(lp2);
    }

2.3 本地矩阵

大数据运算中,为了更好地提升计算效率,可以更多地使用矩阵运算进行数据处理。部署在单机中的本地矩阵就是一个很好的存储方法。举一个简单的例子,例如一个数组Array(1,2,3,4,5,6),将其分为2行3列的:

    @Test
    public void testMatrix() {
        double[] values = new double[]{1, 2, 3, 4, 5, 6};
        Matrix mx = Matrices.dense(2, 3, values);
        System.out.println(mx);
    }

运行结果:

1.0  3.0  5.0  
2.0  4.0  6.0

2.4 分布式矩阵

分布式矩阵由长整型行列索引和双精度浮点型值数据组成,分布式存储在一个或多个RDD中,对于巨大的分布式矩阵来说,选择正确的存储格式非常重要,将一个分布式矩阵转化为另外一个不同格式需要混洗(shuffle),其代价很高。在MLlib实现了四类分布式矩阵存储格式,分别是:


行矩阵(RowMatrix)

行索引矩阵(IndexedRowMatrix)

坐标矩阵(CoordinateMatrix)

分块矩阵(BlockMatrix)

2.4.1 行矩阵

行矩阵是最基本的一种矩阵类型。行矩阵是以行作为基本方向的矩阵存储格式,列的作用相对较小。可以将其理解为行矩阵是一个巨大的特征向量的集合。每一行就是一个具有相同格式的向量数据,且每一行的向量内容都可以单独取出来进行操作。

a.txt:
1 2 3
4 5 6
@Test
    public void testRowMatrix() {
        SparkConf sparkConf = new SparkConf()
                .setAppName("SparkMLlib")
                .setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        JavaRDD<String> rdd1 = jsc.textFile("F:\\code\\files\\a.txt");
        JavaRDD<Vector> rdd2 = rdd1.map(v1 -> {
            String[] ss = v1.split(" ");
            double[] ds = new double[ss.length];
            for (int i = 0; i < ss.length; i++) {
                ds[i] = Double.valueOf(ss[i]);
            }
            return Vectors.dense(ds);
        });
        RowMatrix rmx = new RowMatrix(rdd2.rdd());
        System.out.println(rmx.numRows()); //2
        System.out.println(rmx.numCols()); //3
    }

2.4.2 行索引矩阵

An IndexedRowMatrix类似于a RowMatrix但具有有意义的行索引。它由索引行的RDD支持,因此每行由其索引(long-typed)和本地向量表示。

@Test
    public void testIndexedRowMatrix() {
        SparkConf sparkConf = new SparkConf()
                .setAppName("SparkMLlib")
                .setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        JavaRDD<String> rdd1 = jsc.textFile("F:\\code\\files\\a.txt");
        JavaRDD<Vector> rdd2 = rdd1.map(v1 -> {
            String[] ss = v1.split(" ");
            double[] ds = new double[ss.length];
            for (int i = 0; i < ss.length; i++) {
                ds[i] = Double.valueOf(ss[i]);
            }
            return Vectors.dense(ds);
        });
        JavaRDD<IndexedRow> rdd3 = rdd2.map(v1 -> new IndexedRow(v1.size(), v1)); //建立待索引的行
        IndexedRowMatrix irmx = new IndexedRowMatrix(rdd3.rdd()); //行索引
        JavaRDD<IndexedRow> rdd4 = irmx.rows().toJavaRDD();
        List<IndexedRow> collect = rdd4.collect();
        collect.forEach(indexedRow -> System.out.println(indexedRow)); //打印行内容
    }

2.4.3 坐标矩阵

CoordinateMatrix是由其条目的RDD支持的分布式矩阵。每个条目都是一个元组(i: Long, j: Long, value: Double),其中i是行索引,j是列索引, value是条目值。只有当矩阵的两个维度都很大并且矩阵非常稀疏时,才应该使用它。

@Test
    public void testCoordinateMatrix() {
        SparkConf sparkConf = new SparkConf()
                .setAppName("SparkMLlib")
                .setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        JavaRDD<String> rdd1 = jsc.textFile("F:\\code\\files\\a.txt");
        JavaRDD<MatrixEntry> rdd2 = rdd1.map(v1 -> {
            String[] ss = v1.split(" ");
            return new MatrixEntry(Long.valueOf(ss[0]),Long.valueOf(ss[1]),Double.valueOf(ss[2]));
        });
        CoordinateMatrix matrix = new CoordinateMatrix(rdd2.rdd());
        List<MatrixEntry> collect = matrix.entries().toJavaRDD().collect();
        for (MatrixEntry matrixEntry : collect) {
            System.out.println(matrixEntry);
        }
    }

2.4.4 分块矩阵

BlockMatrix是支持矩阵分块RDD的分布式矩阵,其中矩阵分块由((int,int),matrix)元祖所构成(int,int)是该部分矩阵所处的矩阵的索引位置,Matrix表示该索引位置上的子矩阵

分块矩阵支持矩阵加法和乘法,并设有辅助函数验证用于检查矩阵是否设置正确。

@Test
    public void testCoordinateMatrix() {
        SparkConf sparkConf = new SparkConf()
                .setAppName("SparkMLlib")
                .setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        JavaRDD<String> rdd1 = jsc.textFile("F:\\code\\files\\a.txt");
        JavaRDD<MatrixEntry> rdd2 = rdd1.map(v1 -> {
            String[] ss = v1.split(" ");
            return new MatrixEntry(Long.valueOf(ss[0]),Long.valueOf(ss[1]),Double.valueOf(ss[2]));
        });
        CoordinateMatrix matrix = new CoordinateMatrix(rdd2.rdd());
        //转化成块矩阵
        BlockMatrix blockMatrix = matrix.toBlockMatrix();
        // 对该分块矩阵进行检验,确认该分块是否正确,如果不正确则抛出异常
        blockMatrix.validate(); 
        BlockMatrix addBlockMatrix = blockMatrix.add(blockMatrix); // 相加
        List<Tuple2<Tuple2<Object, Object>, Matrix>> collect = addBlockMatrix.blocks().toJavaRDD().collect();
        for (Tuple2<Tuple2<Object, Object>, Matrix> tuple2MatrixTuple2 : collect) {
            System.out.println(tuple2MatrixTuple2);
        }
//        ((0,0),5 x 6 CSCMatrix
//                (1,2) 6.0
//        (4,5) 12.0)
    }

结果说明:

不论是索引值还是坐标值都是从0开始,所以行数是索引值加1

3 RDD、DataSet、Dataframe区别及转化

RDD(Spark1.0)—>DataFrame(Spark1.3)---->DataSet(Spark1.6)


SparkSql提供了Dataframe和DataSet的数据抽象,DataFrame就是RDD+Schema,可以认为是一张二维表格。它的劣势是在编译器不对表格中的字段进行类型检查。在运行期间检查。DataSet是Spark最新数据抽象,Spark的发展会逐步将DataSet作为主要的数据抽象,弱化RDD和DataFrame。DataSet包含了DataFrame所有的优化机制。除此之外提供了以样例类为Schema模型的强类型。


个人理解:

JavaRDD<String[]> map = source.map(line -> line.split(","));

List<String[]> collect = source.map(line -> line.split(",")).collect();

rdd是一种特殊的分布式的列表List或数组Array,抽象的数据结构,RDD是一个抽象类Abstract,当collect转变为本质的存储类型.map进行以行为单位或者list进行遍历.每行的数据类型为String[].


示例:

package cn.itcast.spark;
import java.io.Serializable;
public class Student implements Serializable {
   private Long id;
   private String name;
   private Integer age;
    public Long getId() {
        return id;
    }
    public void setId(Long id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public Integer getAge() {
        return age;
    }
    public void setAge(Integer age) {
        this.age = age;
    }
    @Override
    public String toString() {
        return "Student{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", age=" + age +
                '}';
    }
}
users.txt:
1,zhangsan,20
2,lisi,21
3,wangwu,19
4,zhaoliu,18
 @Test
    public void testDataSet() {
        SparkSession sparkSession = SparkSession.builder().appName("SparkMLlib").master("local[*]").getOrCreate();
        // 读取文件
        JavaRDD<String> source = sparkSession.read().textFile("F:\\code\\users.txt").toJavaRDD();
        JavaRDD<Student> rowRDD = source.map(line -> {
            String parts[] = line.split(",");
            Student student = new Student();
            student.setId(Long.valueOf(parts[0]));
            student.setName(parts[1]);
            student.setAge(Integer.valueOf(parts[2]));
            return student;
        });
        Dataset<Row> df = sparkSession.createDataFrame(rowRDD, Student.class);
//        df.show();
        df.select("id", "name").orderBy(df.col("id").desc()).show();
    }

运行结果:

+---+--------+
| id|    name|
+---+--------+
|  4| zhaoliu|
|  3|  wangwu|
|  2|    lisi|
|  1|zhangsan|
+---+--------+


相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
9天前
|
机器学习/深度学习 数据可视化 大数据
机器学习与大数据分析的结合:智能决策的新引擎
机器学习与大数据分析的结合:智能决策的新引擎
85 15
|
4月前
|
机器学习/深度学习 自然语言处理 算法
【数据挖掘】金山办公2020校招大数据和机器学习算法笔试题
金山办公2020校招大数据和机器学习算法笔试题的解析,涵盖了编程、数据结构、正则表达式、机器学习等多个领域的题目和答案。
112 10
|
1月前
|
机器学习/深度学习 分布式计算 算法
【大数据分析&机器学习】分布式机器学习
本文主要介绍分布式机器学习基础知识,并介绍主流的分布式机器学习框架,结合实例介绍一些机器学习算法。
224 5
|
2月前
|
机器学习/深度学习 自然语言处理 算法
大数据与机器学习
大数据与机器学习紧密相关,前者指代海量、多样化且增长迅速的数据集,后者则是使计算机通过数据自动学习并优化的技术。大数据涵盖结构化、半结构化及非结构化的信息,其应用广泛,包括商业智能、金融和医疗保健等领域;而机器学习分为监督学习、无监督学习及强化学习,被应用于图像识别、自然语言处理和推荐系统等方面。二者相结合,能有效提升数据分析的准确性和效率,在智能交通、医疗及金融科技等多个领域创造巨大价值。
152 2
|
4月前
|
机器学习/深度学习 分布式计算 并行计算
性能优化视角:Python与R在大数据与高性能机器学习中的选择
【8月更文第6天】随着数据量的激增,传统的单机计算已经难以满足处理大规模数据集的需求。Python和R作为流行的数据科学语言,各自拥有独特的特性和生态系统来应对大数据和高性能计算的挑战。本文将从性能优化的角度出发,探讨这两种语言在处理大数据集和高性能计算时的不同表现,并提供具体的代码示例。
118 3
|
4月前
|
机器学习/深度学习 分布式计算 算法
MaxCompute 的 MapReduce 与机器学习
【8月更文第31天】随着大数据时代的到来,如何有效地处理和分析海量数据成为了一个重要的课题。MapReduce 是一种编程模型,用于处理和生成大型数据集,其核心思想是将计算任务分解为可以并行处理的小任务。阿里云的 MaxCompute 是一个面向离线数据仓库的计算服务,提供了 MapReduce 接口来处理大规模数据集。本文将探讨如何利用 MaxCompute 的 MapReduce 功能来执行复杂的计算任务,特别是应用于机器学习场景。
110 0
|
7月前
|
机器学习/深度学习 存储 搜索推荐
利用机器学习算法改善电商推荐系统的效率
电商行业日益竞争激烈,提升用户体验成为关键。本文将探讨如何利用机器学习算法优化电商推荐系统,通过分析用户行为数据和商品信息,实现个性化推荐,从而提高推荐效率和准确性。
256 14
|
7月前
|
机器学习/深度学习 算法 数据可视化
实现机器学习算法时,特征选择是非常重要的一步,你有哪些推荐的方法?
实现机器学习算法时,特征选择是非常重要的一步,你有哪些推荐的方法?
140 1
|
7月前
|
机器学习/深度学习 算法 搜索推荐
Machine Learning机器学习之决策树算法 Decision Tree(附Python代码)
Machine Learning机器学习之决策树算法 Decision Tree(附Python代码)
|
7月前
|
机器学习/深度学习 数据采集 算法
解码癌症预测的密码:可解释性机器学习算法SHAP揭示XGBoost模型的预测机制
解码癌症预测的密码:可解释性机器学习算法SHAP揭示XGBoost模型的预测机制
362 0