【Spark 3.0-JavaAPI-pom】体验JavaRDD函数封装变化

简介: 【Spark 3.0-JavaAPI-pom】体验JavaRDD函数封装变化

一、pom


    <properties>
      <maven.compiler.source>1.8</maven.compiler.source>
      <maven.compiler.target>1.8</maven.compiler.target>
        <scala.version>2.12.10</scala.version>
        <spark.version>3.0.0</spark.version>
        <hadoop.version>3.2.1</hadoop.version>
        <encoding>UTF-8</encoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.2.1</version>
        </dependency>
    </dependencies>
        <build>
        <pLuginManagement>
          <plugins>
            <!--编译scala的插件-- ->
            <pLugin>
              <groupId>net.alchim31.maven</groupId>
              <artifactId>scala-maven-plugin</artifactId>
              <version>3.2.2</version>
            </plugin>
            <!--编译java的插件-->
            <pLugin>
              <groupId>org.apache.maven.plugins</groupId>
              <artifactId>maven-compiler-plugin</artifactId>
              <version>3.5.1</version>
            </plugin>
          </plugins>
        </pLuginManagement>
      <plugins>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <executions>
          <execution>
            <id>scala-compile-first</id>
            <phase>process-resources</phase>
            <goals>
              <goal>add-source</goal>
              <goal>compile</goal>
            </goals>
          </execution>
          <execution>
            <id>scala-test-compile</id>
            <phase>process-test-resources</phase>
            <goals>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>


二、Spark3.0-JavaAPI程序


实现Spark读取HDFS中的文本文件,实现单词计数,并2将结果输出到HDFS中。


2.1 java匿名实现类

// 1.创建配置
SparkConf conf = SparkConf().setAppName("JavaWordCount");
// 2.封装了SparkContext
JavaSparkContext jsc = new JavaSparkContext(conf);
// 3.jsc 创建 java RDD
JavaRDD<String> lines = jsc.textFile(args[0]);
// 4.切分压平
JavaRDD<String> words = lines.flatMap(new FlatMapFuntion<String,String>(){
  @Override
  public Iterator<String> call(String line) throws Expection{
    return Arrays.asList(line.split(" ")).iterator;
  }
});
// 5.单词和1组合
JavaPairRDD<String,Integer> wordAndOne = words.maoToPair(new PairFunction<String,String,Integer>(){
    @Override
  public Tuple2<String,Integer> call(String words) throws Exception{
    return Tuple2.apply(word,1);
  }
});
// 6.分组聚合
JavaPairRDD<String,Integer> reduced = wordAndOne.reduceByKey(new Function2<Integer,Integer,Integer>(){
  @Override
  public Integer call(Integer v1,Integer v2) throws Exception{
    return v1+v2;
  }
});
// 7.交换kv顺序
JavaPairRDD<Integer,String> swapped = reduced.mapToPair(new PairFunction<Tuple2<String,Integer>,Integer,String>(){
  @Override
  public Tuple2<Integer,String> call(Tuple2<String,Integer> tp) throws Exception{
    return tp.swap();
  }
});
// 8.排序
JavaPairRDD<Integer,String> sorted = swapped.sortedByKey(false);
// 9.交换kv顺序
JavaPairRDD<String,Integer> result = sorted.mapToPair(new PairFunction<Tuple2<String,Integer>,String,Integer>(){
  @Override
  public Tuple2<String,Integer> call(Tuple2<Integer,String> tp) throws Exception{
    return tp.swap();
  }
});
// 10.触发action保存到HDFS
result.saveAsTextFile(args[1]);
// 11.释放资源
jsc.stop();


2.2 Lambda表达式实现

// 1.创建配置
SparkConf conf = SparkConf().setAppName("LambdaJavaWordCount");
// 2.封装了SparkContext
JavaSparkContext jsc = new JavaSparkContext(conf);
// 3.jsc 创建 java RDD
JavaRDD<String> lines = jsc.textFile(args[0]);
// 4.切分压平
JavaRDD<String> words = lines.flatMap(line -> Arrays.stream(line.split(" ")).iterator());
// 5.单词 1
JavaPairRDD<String,Integer> wordAndOne = words.mapToPair(w -> Tuple2.apply(w,1));
// 6.聚合
JavaPairRDD<String,Integer> reduced = wordAndOne.reduceByKey((i,j) -> i+j);
// 7.排序
JavaPairRDD<String,Integer> sorted = reduced.mapToPair(tp -> tp.swap()).sortByKey(false)
  .mapToPair(tp -> tp.swap());
// 8.保存hdfs
sorted.saveAsTextFile(args[1]);
// 9.释放资源
jsc.stop();

2.3 程序打包

20201005095757615.png


2.4 上传到Linux

20201005095846361.png

2.5 启动HDFS


hadoop-daemon.sh start namenode

hadoop-daemon.sh start datanode


2.6 Spark执行jar包

bin/spark-3.0.0-bin-hadoop3.2/bin/spark-submit  --master  spark://hadoop1:7077,hadoop2:7077,hadoop3:7077  --executor-memory 1g  --total-executor-cores 5  --class com.wang.spark.LambdaJavaWordCount  /root/spark-in-active-1.0-SNAPSHOT.jar  hdfs://hadoop1:9000/wc  hdfs://hadoop1:9000/out

image.png

2.7 查看结果


hdfs -dfs -cat /out/*

image.png


三、本机执行


本地测试,不会建立集群链接,再本地的一个进程运行。

// 1.创建配置【本地测试】
SparkConf conf = SparkConf().setAppName("LambdaJavaWordCount").setMaster("local[*]");
// 2.封装了SparkContext
JavaSparkContext jsc = new JavaSparkContext(conf);
// 3.jsc 创建 java RDD
JavaRDD<String> lines = jsc.textFile(args[0]);
// 4.切分压平
JavaRDD<String> words = lines.flatMap(line -> Arrays.stream(line.split(" ")).iterator());
// 5.单词 1
JavaPairRDD<String,Integer> wordAndOne = words.mapToPair(w -> Tuple2.apply(w,1));
// 6.聚合
JavaPairRDD<String,Integer> reduced = wordAndOne.reduceByKey((i,j) -> i+j);
// 7.排序
JavaPairRDD<String,Integer> sorted = reduced.mapToPair(tp -> tp.swap()).sortByKey(false)
  .mapToPair(tp -> tp.swap());
// 8.保存hdfs
sorted.saveAsTextFile(args[1]);
// 9.释放资源
jsc.stop();

20201005103410165.png

20201005103117145.png

运行时候,传入参数本地数据或者hdfs的数据。

如果出现这个错误,需要将pom中的scala的****放开

或者全部读取本机的文件。

执行结果如下:

20201005103533527.png

目录
相关文章
|
存储 分布式计算 并行计算
大数据Spark RDD 函数 1
大数据Spark RDD 函数
105 0
spark3.5.1中内置函数大全
spark3.5.1中内置函数大全
|
4月前
|
SQL 分布式计算 数据处理
MaxCompute操作报错合集之使用Spark查询时函数找不到的原因是什么
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
分布式计算 Java Spark
图解Spark Graphx实现顶点关联邻接顶点的collectNeighbors函数原理
图解Spark Graphx实现顶点关联邻接顶点的collectNeighbors函数原理
51 0
|
6月前
|
SQL 分布式计算 Spark
Spark【Spark SQL(四)UDF函数和UDAF函数】
Spark【Spark SQL(四)UDF函数和UDAF函数】
|
分布式计算 大数据 数据挖掘
大数据Spark RDD 函数 2
大数据Spark RDD 函数
96 0
|
SQL JSON 分布式计算
spark2 sql读取数据源编程学习样例2:函数实现详解
spark2 sql读取数据源编程学习样例2:函数实现详解
90 0
spark2 sql读取数据源编程学习样例2:函数实现详解
|
SQL 存储 分布式计算
Spark强大的函数扩展功能
Spark强大的函数扩展功能
|
分布式计算 Java Scala
一天学完spark的Scala基础语法教程四、方法与函数(idea版本)
一天学完spark的Scala基础语法教程四、方法与函数(idea版本)
95 0
一天学完spark的Scala基础语法教程四、方法与函数(idea版本)
|
SQL 缓存 分布式计算
spark2的SparkSession思考与总结2:SparkSession有哪些函数及作用是什么
spark2的SparkSession思考与总结2:SparkSession有哪些函数及作用是什么
285 0