Spark教程

简介: Spark官方网站http://spark.apache.orgSpark通用性解释Spark的特点官方网站都有。

Spark官方网站

http://spark.apache.org

Spark通用性解释

Spark通用性

Spark的特点

官方网站都有。目前支持Java,Scala, Python,R。推荐使用Java和Scala,spark2中对python的支持不够好。

Spark的数据源

  • HDFS,
  • HBase,
  • Cassandra(类似于hbase数据库,国外用的多)
  • Hive,
  • Tachyon(基于内存的分布式的文件系统, 阿里出的http://www.alluxio.org/,比较重要)

Spark的四种部署模式

  • hadoop 模式(spark on yarn)。用yarn来管理spark的资源,也是国内用的最多的模式。
  • Mesos模式。Mesos一个类似于yarn的资源管理器,国内用的少,国外多。
  • Standalone模式。上面两种都不是自己管理资源,使用第三方来管理,而standalone模式是spark自己来管理资源,多内用的比较多
  • 部署到云端

验证集群是否好用

验证spark集群是否可用

./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn --num-executors 1 --driver-memory 500m --executor-memory 500m --executor-cores 1 lib/spark-examples-1.5.2-hadoop2.5*.jar 10

验证结果如下:
Pi is roughly 3.1411
scala编写完简单的运算后,可以在localhost:4040上查看

运行spark有两种:

  1. 本地运行
bin/spark-submit --master local --class org.apache.spark.examples.SparkPi lib/spark-examples-1.5.2-hadoop2.6.0-cdh5.4.4.jar 5

本地运行会直接输出结果

  1. 分布式运行
bin/spark-submit --master yarn-cluster --class org.apache.spark.examples.SparkPi lib/spark-examples-1.5.2-hadoop2.6.0-cdh5.4.4.jar 5

而分布式运行的结果在

cd $HADOOP_HOME/logs/userlogs/

scala 运行例子

val lines = sc.textFile("hdfs://master:9000/input0917/qqFriend.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)

输出:
totalLength: Int = 188

RDD

sparkRDD

Spark架构

这里写图片描述

Spark安装前准备

  1. 安装好JAVA
  2. 安装好Hadoop
  3. 安装好scala
  4. 并配置好上述的的环境变量

Spark任务提交

http://spark.apache.org/docs/2.1.0/submitting-applications.html

SparkHistoryServer配置

关于sparkshell的使用

http://blog.csdn.net/sunflower_cao/article/details/26708797

创见RDD文件

第一种方式是通过读取本地或者hdfs上的文件创建RDD

sc.textFile("hdfs://")

第二种方式是通过并行化的方式创建RDD. 其实就是通过我们自己取模拟数据

val str=Array("you jump","I jump")
val list = Array(1,2,3,4,5,6)
val listadd = sc.parallelize(list) 可以看到返回值就是RDD,当然可以调用RDD中的函数,比如reduce算子等
listadd.reduce(_+_)

大多数方式是使用第一种

transformation 和action 原理

Spark支持两种RDD操作: transformation and action.
transformation 会针对已有的RDD创建一个新的RDD,而action则主要对RDD进行最后的操作。transformation只是记录了对RDD的操作,并不会触发spark程序的执行,只有当transform之后接着一个action操作,那么所有的transformation才会执行。比如

val file=sc.textFile("hdfs://hadoop1:9000/hello.txt").flatMap(line => line.split("\t"))
回车之后并没有触发spark的执行,因为flatMap等属于transformation操作
等到file.collect()后会看到spark的执行,collect是action操作

具体可以参考编程指南:http://spark.apache.org/docs/2.1.0/programming-guide.html

那为什么要有transformation和action呢?Spark可以通过这种lazy属性,来进行底层的spark应用程序的优化,避免过多的中间结果。
这里写图片描述

这里写图片描述

Spark算子

http://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations

练习如下,注意eclipse的使用。


 - 将鼠标放在方法上,ctrl+1 来选择返回值类型
 - 鼠标选中区域, ctrl+shift+/ 来进行多行注释
 - 当实现某个方法,查看错误信息,提示添加内部方法,添加即可

map和flatmap的区别:

map与mappartitions的区别:

map迭代的是RDD中的每一个元素,而mappartiontions迭代的一个分区。如果在映射过程中频繁创建额外的对象,那么mappartitions比map高效。但也需要注意内存,因为内存可能不够用,比分区小。

比如在迭代元素的时候要将10000条结果存到mysql数据库,如果用map需要创建10000个额外的connection,用map partitions可能只需要创建10个, 对每个分区建立连接。

下面是不同算子做实验的代码示例:

repartition coalesce 重新进行分区 宽以来 窄依赖 shuffle
那什么时候用repartition呢?比如: filter之后,partition的量会减少,比如之前是100 partitions,对应的需要起100 个tasks,现在filter后变成了50个,此时要是100个tasks就浪费了,可以repartition到50。repartition分区会进行shuffle操作

package com.ecaoyng.spark;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.cassandra.cli.CliParser.newColumnFamily_return;
import org.apache.cassandra.thrift.Cassandra.system_add_column_family_args;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import org.stringtemplate.v4.compiler.STParser.mapExpr_return;

import scala.Tuple2;

public class Trans {

    /**
     * repartition will do shuffle 
     */

    public static void repartition(){
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("repartition");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<Integer> list1 = Arrays.asList(1,2,3,4,5,6);
        JavaRDD<Integer> list1RDD = sc.parallelize(list1);
        JavaRDD<Integer> repartition = list1RDD.repartition(2);
        repartition.foreach(new VoidFunction<Integer>() {

            public void call(Integer arg0) throws Exception {
                System.out.println(arg0);
            }

        });

    }




    /**
     * output: 
     * Hello1
Hello2
Hello3

Hello4
Hello5
Hello6

     */


    public static void mapPartitions(){
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("mapParititons");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<Integer> list1 = Arrays.asList(1,2,3,4,5,6);
        JavaRDD<Integer> list1RDD = sc.parallelize(list1, 2);
        /**
         * return value is string
         */
        JavaRDD<String> mapPartitions = list1RDD.mapPartitions(new FlatMapFunction<Iterator<Integer>, String>() {

            /*
             * process each partition each time
             * partition1: 123
             * partition2: 456
             * */

            public Iterable<String> call(Iterator<Integer> t)
                    throws Exception {
                ArrayList<String> arrayList = new ArrayList<String>();
                while (t.hasNext()) {
                    Integer i = (Integer) t.next();
                    arrayList.add("Hello" + i);

                }   

                return arrayList;
            }

        });
        mapPartitions.foreach(new VoidFunction<String>() {

            public void call(String arg0) throws Exception {
                System.out.println(arg0);
            }


        });
    }


    /*
     * di ka er ji
     * output:1 a
1 b
1 c
2 a
2 b
2 c
3 a
3 b
3 c
    */
    public static void cartesian(){

        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("cartesian");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<Integer> list1 = Arrays.asList(1,2,3);
        List<String> list2 = Arrays.asList("a","b","c");
        JavaRDD<Integer> list1RDD = sc.parallelize(list1);
        JavaRDD<String> list2RDD = sc.parallelize(list2);
        JavaPairRDD<Integer, String> cartesian = list1RDD.cartesian(list2RDD);
        cartesian.foreach(new VoidFunction<Tuple2<Integer,String>>() {

            public void call(Tuple2<Integer, String> t) throws Exception {
                System.out.println(t._1 + " " + t._2);
            }
        });


    }


/*  qu chong
 * output:4
1
6
3
5
2
    */

    public static void distinct(){
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("distinct");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<Integer> list1 = Arrays.asList(1,2,3,4,4,5,5,6,6);
        JavaRDD<Integer> listRDD = sc.parallelize(list1);
        JavaRDD<Integer> distinct = listRDD.distinct();
        distinct.foreach(new VoidFunction<Integer>() {

            public void call(Integer t) throws Exception {
                System.out.println(t);
            }

        });


    }


    /*
     * jiao ji
     * output: 4
3
    */
    public static void interSection() {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("union");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<Integer> list1 = Arrays.asList(1,2,3,4);
        List<Integer> list2 = Arrays.asList(3,4,5,6);
        JavaRDD<Integer> list1RDD = sc.parallelize(list1);
        JavaRDD<Integer> list2RDD = sc.parallelize(list2);
        JavaRDD<Integer> intersection = list1RDD.intersection(list2RDD);

        intersection.foreach(new VoidFunction<Integer>() {

            public void call(Integer t) throws Exception {
                System.out.println(t);
            }
        });

    }


    /*
     * bingji, bu qu chong
    */
    public static void union() {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("union");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<Integer> list1 = Arrays.asList(1,2,3,4);
        List<Integer> list2 = Arrays.asList(3,4,5,6);
        JavaRDD<Integer> list1RDD = sc.parallelize(list1);
        JavaRDD<Integer> list2RDD = sc.parallelize(list2);
        JavaRDD<Integer> union = list1RDD.union(list2RDD);
        union.foreach(new VoidFunction<Integer>() {

            public void call(Integer t) throws Exception {
                System.out.println(t);
            }
        });

    }




    /*
     * ID: 1
Name: [Tom, Tom]
Score: [30, 70]
ID: 3
Name: [Chirs]
Score: [60, 90]
ID: 2
Name: [Jerry]
Score: [40, 60]
     * 
    */
    public static void cogroup() {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("cogroup");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<Tuple2<Integer, String>> stu = Arrays.asList(
                    new Tuple2<Integer, String>(1, "Tom"),
                    new Tuple2<Integer, String>(2, "Jerry"),
                    new Tuple2<Integer, String>(3, "Chirs"),
                    new Tuple2<Integer, String>(1, "Tom")
                    );

        List<Tuple2<Integer, Integer>> score = Arrays.asList(
                new Tuple2<Integer, Integer>(1, 30),
                new Tuple2<Integer, Integer>(2, 40),
                new Tuple2<Integer, Integer>(3, 60),
                new Tuple2<Integer, Integer>(1, 70),
                new Tuple2<Integer, Integer>(2, 60),
                new Tuple2<Integer, Integer>(3, 90)
                );

        JavaPairRDD<Integer, String> stuRDD = sc.parallelizePairs(stu);
        JavaPairRDD<Integer, Integer> scoreRDD = sc.parallelizePairs(score);
//      <2,tuple<Jerry,{40,60}>>
        JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> cogroupRDD = stuRDD.cogroup(scoreRDD);
        cogroupRDD.foreach(new VoidFunction<Tuple2<Integer,Tuple2<Iterable<String>,Iterable<Integer>>>>() {

            public void call(
                    Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> t)
                    throws Exception {
                System.out.println("ID: " + t._1);
                System.out.println("Name: " + t._2._1);
                System.out.println("Score: " + t._2._2);
            }
        });
    }



    /*
     * output:
ID: 1
Name: Tom
Score: 30
==================>
ID: 3
Name: Chirs
Score: 60
==================>
ID: 2
Name: Jerry
Score: 40
==================>
    */

    public static void join(){
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("join");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<Tuple2<Integer, String>> stu = Arrays.asList(
                    new Tuple2<Integer, String>(1, "Tom"),
                    new Tuple2<Integer, String>(2, "Jerry"),
                    new Tuple2<Integer, String>(3, "Chirs")
                    );

        List<Tuple2<Integer, Integer>> score = Arrays.asList(
                new Tuple2<Integer, Integer>(1, 30),
                new Tuple2<Integer, Integer>(2, 40),
                new Tuple2<Integer, Integer>(3, 60)
                );

        JavaPairRDD<Integer, String> stuRDD = sc.parallelizePairs(stu);
        JavaPairRDD<Integer, Integer> scoreRDD = sc.parallelizePairs(score);
//      Integer:id, Tuple2<String, Integer>: name,score
        JavaPairRDD<Integer, Tuple2<String, Integer>> joined = stuRDD.join(scoreRDD);
        joined.foreach(new VoidFunction<Tuple2<Integer,Tuple2<String,Integer>>>() {

            public void call(Tuple2<Integer, Tuple2<String, Integer>> t)
                    throws Exception {
                System.out.println("ID: " + t._1);
                System.out.println("Name: " + t._2._1);
                System.out.println("Score: " + t._2._2);
                System.out.println("==================>");
            }

        });




    }




    public static void sortByKey(){

    }

    /* calculate sum by each key
     * output:
PBCS:95
TDCS:120
    */
    public static void reduceByKey(){
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("reduceByKey");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<Tuple2<String, Integer>> asList = Arrays.asList(
                    new Tuple2<String, Integer>("TDCS", 30),
                    new Tuple2<String, Integer>("TDCS", 40),
                    new Tuple2<String, Integer>("TDCS", 50),
                    new Tuple2<String, Integer>("PBCS", 40),
                    new Tuple2<String, Integer>("PBCS", 55)             
                    );

        JavaPairRDD<String, Integer> parallelizePairsRDD = sc.parallelizePairs(asList);
        /*
         * input: Integer, Integer
         * output: sum:Integer
         * scala.reduceByKey(_+_)
        */
        parallelizePairsRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {

            public Integer call(Integer v1, Integer v2) throws Exception {

                return v1 + v2;
            }
        }).foreach(new VoidFunction<Tuple2<String,Integer>>() {

            public void call(Tuple2<String, Integer> sum) throws Exception {
                System.out.println(sum._1 + ":" + sum._2);
            }
        });

    }



    /*
     * src: key-value   
     * Tuple2 in scala is similar with map in Java  
     * output:
PBCS
Terry
Shawn
======================================>
TDCS
Tom
Jerry
Chris
     * 
    */
    public static void groupByKey(){
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("groupByKey");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<Tuple2<String, String>> list = Arrays.asList(
                new Tuple2<String, String>("TDCS", "Tom"),
                new Tuple2<String, String>("TDCS", "Jerry"),
                new Tuple2<String, String>("TDCS", "Chris"),
                new Tuple2<String, String>("PBCS", "Terry"),
                new Tuple2<String, String>("PBCS", "Shawn")             
                );
//      due to key-value, it should be use parallelizePairs 
        JavaPairRDD<String, String> parallelizePairsRDD = sc.parallelizePairs(list);
        JavaPairRDD<String, Iterable<String>> groupByKey = parallelizePairsRDD.groupByKey();
        groupByKey.foreach(new VoidFunction<Tuple2<String,Iterable<String>>>() {

            public void call(Tuple2<String, Iterable<String>> t)
                    throws Exception {
                System.out.println(t._1);
                Iterator<String> iterator = t._2.iterator();
                while (iterator.hasNext()) {
                    System.out.println(iterator.next());

                }
                System.out.println("======================================>");

            }

        });
    }


    /*
     * flatmap: {You jump I jump}
     * split each word into a sigle line 
    */
    public static void flatMap(){
        SparkConf conf = new SparkConf();
//      if no set on conf.setMaster, the default setting will use distributed mode. 
        conf.setMaster("local");
        conf.setAppName("flatmap");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<String> list = Arrays.asList("you jump","I jump");
        JavaRDD<String> listRDD = sc.parallelize(list);
//      U refers to the return value of method: FlatMapFunction
        JavaRDD<String> flatMap = listRDD.flatMap(new FlatMapFunction<String, String>() {

            public Iterable<String> call(String t) throws Exception {
                return Arrays.asList(t.split(" "));
            }
        });

        flatMap.foreach(new VoidFunction<String>() {

            public void call(String line) throws Exception {
                System.out.println(line);
            }
        });




    }


    /*
     * Map() test: say hello to each items within a array
    */
    public static void map() {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("map");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<String> list =  Arrays.asList("Tom","Jerry","USA","China");
        JavaRDD<String> listRDD = sc.parallelize(list);
//      R refers to the retuen value of new Function
        JavaRDD<String> map = listRDD.map(new Function<String, String>() {

            public String call(String str) throws Exception {
                return "hello " + str;
            }
        });

//      action opts
        map.foreach(new VoidFunction<String>() {

            public void call(String str) throws Exception {
                System.out.println(str);
            }
        });


    }

    /*
     * fliter(): get odds from dataset
    */
    public static void fliter(){
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("map");
        JavaSparkContext sc_fliter = new JavaSparkContext(conf);
        List<Integer> fliter_list = Arrays.asList(1,2,3,4,5,6,7,8,9);
        JavaRDD<Integer> fliterRDD = sc_fliter.parallelize(fliter_list);
        JavaRDD<Integer> filter = fliterRDD.filter(new Function<Integer, Boolean>() {

            public Boolean call(Integer num) throws Exception {
                return num %2 ==0;
            }
        });
        filter.foreach(new VoidFunction<Integer>() {

            public void call(Integer num) throws Exception {
                System.out.println(num);
            }

        });









    }




    public static void main(String[] args) {
//      map();
//      fliter();
//      flatMap();
//      groupByKey();
//      reduceByKey();
//      join();
//      cogroup();
//      union();
//      interSection();
//      distinct();
//      cartesian();
//      mapPartitions();
        repartition();


    }
}

RDD持久化(RDD Persistence)

http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence

当你缓存了一个RDD,每个节点都缓存了RDD的所有分区,这样就可以在内存中进行计算,速度更快(提高10倍)。
可以对希望缓存的RDD进行cache或者persist方法进行标记。她通过动作操作第一次在Rdd上进行计算后,它会被缓存在节点的内存中。Spark的缓存具有容错性,如果RDD的某个分区丢失,它会自动使用最初创建RDD的转换操作进行重新计算。

建议:

  1. 优先选择MEMORY_ONLY(=cache),纯内存计算,速度快
  2. MEMORY_ONLY 缓存不了所有的数据,MEMORY_ONLY_SER 将数据序列化后进行存储,需要耗费一些CPU资源,并需要反序列化。
  3. 可以选在带2的方式。恢复的时候可以使用备份,不需要重新计算
  4. 能不是用disk的就不使用disk,有时候从磁盘读还不如重新计算

之外,OFF_HEAP (experimental) 是将RDD存到tachyon上。

那什么是tachyon

一个开源的基于jvm的内存分布式文件系统,介于计算层和存储层之间,简单的理解为存储层在内存内的缓存系统。

为何会出现tachyon?以内存替换磁盘,就能明显的减少延时,所以涌现出很多基于内存的计算工具,比如spark计算框架。

  1. spark运行在jvm中,spark的任务会将数据存入jvm的堆中,随着计算的迭代,jvm堆中存放的数据量迅速增大,对于spark而言,spark的计算引擎和存储引擎处在同一个jvm中,所以会有重复的gc方面的开销,增大了系统的延时

    1. 当jvm崩溃时,缓存在jvm堆中的数据也会消失,这个时候spark不得不根据rdd的血缘关系重新计算数据。
    2. 如果spark需要和其他的计算框架,比如mapreduce,此时就需要通过第三方共享,比如hdfs。需要额外的开销,比如磁盘的IO开销

因基于内存的分布式计算框架有以上的问题,那么就促使了内存分布式文件系统的诞生, 比如tachyon。

共享变量(广播变量,累加变量)

广播变量

这里写图片描述

举例: 之前 val a的时候需要将a拷贝好多份到每个task上,如果a大道1个G,就浪费好多空间。现在有了broadcast var 就将变量拷到

累加变量

这里写图片描述

按照之前的习惯写sum会出错,一种新的声明方式

Spark on Yarn

只需要在spark下的conf目录下的spark-env.sh 中配置Hadoop_conf_dir(hadoop的位置)即可。让spark通过hadoop的配置文件找到yarn.

之前用standalone模式需要启动master和worker,现在用yarn模式不需要启动master和worker,只需要启动hdfs和yarn即可

start-dfs.sh, jps -> 3个服务
start-yarn.sh

之后spark的开发和之前一样,只需要在提交代码的时候指定运行的模式
具体请参考官方文档:
http://spark.apache.org/docs/latest/submitting-applications.html

# Run on a YARN cluster
export HADOOP_CONF_DIR=XXX
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master yarn \
  --deploy-mode cluster \  # can be client for client mode
  --executor-memory 20G \
  --num-executors 50 \
  /path/to/examples.jar \
  1000

yarn提交也分为client和cluster模式

cluster

这里写图片描述

注意executor会向driver(application master)

client:

建议:

  1. 调试的时候使用client模式。使用client模式的时候打印出来的信息非常详细,有利于我们调试程序。
  2. 调试完成以后,建议使用cluster模式提交任务。

举例,先在本地运行如下代码,成功在本地生成union的并集:

package com.ecaoyng.spark;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class SaveAsText {

    public static void Save(){
        SparkConf conf = new SparkConf();
        conf.setMaster("local[3]"); // 因为是本地运行
        conf.setAppName("SaveAsTextFile");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<Integer> list1 = Arrays.asList(1,2,3,4,5);
        List<Integer> list2 = Arrays.asList(4,5,6,7,8);
        JavaRDD<Integer> list1RDD = sc.parallelize(list1);
        JavaRDD<Integer> list2RDD = sc.parallelize(list2);
        JavaRDD<Integer> unionRDD = list1RDD.union(list2RDD);
        unionRDD.repartition(1).saveAsTextFile("/home/zkpk/union");
        # 注意是在本地磁盘的目录下生成union文件夹

    }


    public static void main(String[] args) {
        Save();
    }


}

测试通过后,因为要放到cluster上运行,所以将conf.setMaster注释掉。之后用eclipse将jar包导出
下面是提交的代码:

/home/zkpk/spark-1.5.2-bin-2.5.2/bin/spark-submit \
  --class com.ecaoyng.spark.SaveAsText \
  --master yarn \
  --deploy-mode cluster \
  --executor-memory 100M \
  --num-executors 1 \
  /home/zkpk/output/SaveUnion.jar \

注意里面不要出现注释,否则可能会出错。如果发现参数错误可以spark-submit –help 查看哪些是可以运行在cluster上的参数。 输出的结果放在hdfs上。当然事先需要设置export SPARK_CONF_DIR=/home/zkpk/hadoop-2.5.2 或者在spark的配置中的spark-env.sh中配置
也可以在web中查看http://master:18088/ 提交的进度

宽依赖和窄依赖

窄依赖,父RDD的每个分区只被一个子RDD的分区依赖。
宽依赖,父RDD的分区被多个子RDD的分区所依赖。
这里写图片描述

shuffle的原理

这里写图片描述

本例中前面三个RDD都不涉及到其他的节点,但是真实环境是需要其他节点的参与的。shuffle阶段需要大量的磁盘IO,序列化与反序列化,网络数据的传输,所以spark很大的性能损耗都在shuffle上,所以有必要进行调优。

shuffle的发展经历了三个阶段:1.2,1.3, 1.5之后。分别取名:

  • 未经优化的HashshuffleManager
  • 优化后的HashshuffleManager
  • SortShuffleManager
    • 普通机制
    • bypass机制

[未经优化的HashshuffleManager]

这里写图片描述

每一个map task生成的buffer和file和 reduce的数量有关,可以联想MR中的partition,可以看到生成的碎文件太多,产生的中间文件数量如图

[经过优化的HashshuffleManager]

这里写图片描述

[现在在使用的是sort-based shuffle manage ]

shuffle 配置参数地址:http://spark.apache.org/docs/2.0.0/configuration.html

正常情况使用的是普通运行机制,但是当shuffle read task小于200(默认值)时就启用bypass机制

这里写图片描述

因为合并成了一个磁盘文件,所以中间文件将被删除

bypass模式
这里写图片描述


未完待续

目录
相关文章
|
8月前
|
SQL 分布式计算 Spark
Spark 教程系列
Spark 教程系列
65 0
|
6月前
|
分布式计算 Java Serverless
EMR Serverless Spark 实践教程 | 通过 spark-submit 命令行工具提交 Spark 任务
本文以 ECS 连接 EMR Serverless Spark 为例,介绍如何通过 EMR Serverless spark-submit 命令行工具进行 Spark 任务开发。
441 7
EMR Serverless Spark 实践教程 | 通过 spark-submit 命令行工具提交 Spark 任务
|
6月前
|
分布式计算 运维 Serverless
EMR Serverless Spark 实践教程 | 通过 EMR Serverless Spark 提交 PySpark 流任务
在大数据快速发展的时代,流式处理技术对于实时数据分析至关重要。EMR Serverless Spark提供了一个强大而可扩展的平台,它不仅简化了实时数据处理流程,还免去了服务器管理的烦恼,提升了效率。本文将指导您使用EMR Serverless Spark提交PySpark流式任务,展示其在流处理方面的易用性和可运维性。
297 7
EMR Serverless Spark 实践教程 | 通过 EMR Serverless Spark 提交 PySpark 流任务
|
5月前
|
分布式计算 Serverless 数据处理
EMR Serverless Spark 实践教程 | 通过 Apache Airflow 使用 Livy Operator 提交任务
Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。
242 0
|
5月前
|
分布式计算 Java Linux
【Deepin 20系统】Linux 系统安装Spark教程及使用
在Deepin 20系统上安装和使用Apache Spark的详细教程,包括安装Java JDK、下载和解压Spark安装包、配置环境变量和Spark配置文件、启动和关闭Spark集群的步骤,以及使用Spark Shell和PySpark进行简单操作的示例。
88 0
|
8月前
|
SQL 分布式计算 Hadoop
【Spark】Spark基础教程知识点
【Spark】Spark基础教程知识点
|
8月前
|
SQL 分布式计算 Java
Spark 基础教程:wordcount+Spark SQL
Spark 基础教程:wordcount+Spark SQL
71 0
|
消息中间件 分布式计算 Kafka
Flink教程(30)- Flink VS Spark(下)
Flink教程(30)- Flink VS Spark(下)
95 0
|
分布式计算 API 调度
Flink教程(30)- Flink VS Spark(上)
Flink教程(30)- Flink VS Spark(上)
171 0
|
SQL 机器学习/深度学习 分布式计算
spark与pyspark教程(一)
spark与pyspark教程(一)
427 0