第一个spark应用开发详解(java版)

简介: java版的spark应用开发

欢迎访问我的GitHub

这里分类和汇总了欣宸的全部原创(含配套源码): https://github.com/zq2599/blog_demos
  • WordCount是大数据学习最好的入门demo,今天就一起开发java版本的WordCount,然后提交到Spark2.3.2环境运行;

版本信息

  1. 操作系统:CentOS7;
  2. JDK:1.8.0_191;
  3. Spark:2.3.3;
  4. Scala:2.11.12;
  5. Hadoop:2.7.7;
  6. Maven:3.5.0;

关于hadoop环境

关于spark环境

请注意,由于2.3.3版本的spark-core的jar包不支持scala2.12,所以在部署spark的时候,scala版本请使用2.11;

关于本次实战开发的应用

  • 本次实战开发的应用是经典的WorkCount,也就是指定一个文本文件,统计其中每个单词出现的次数,再取出现次数最多的10个,打印出来,并保存在hdfs文件中;

本次统计单词数用到的文本

  • 本次用到的txt文件,是我在网上找到的pdf版本的《乱世佳人》英文版,然后导出为txt,读者您可以自行选择适合的txt文件来测试;
  • 在hdfs服务所在的机器上执行以下命令,创建input文件夹:
~/hadoop-2.7.7/bin/hdfs dfs -mkdir /input
  • 在hdfs服务所在的机器上执行以下命令,创建output文件夹:
~/hadoop-2.7.7/bin/hdfs dfs -mkdir /output
  • 把本次用到的text文件上传到hdfs服务所在的机器,再执行以下命令将文本文件上传到hdfs的/input文件夹下:
~/hadoop-2.7.7/bin/hdfs dfs -put ~/GoneWiththeWind.txt /input

源码下载

  • 接下来详细讲述应用的编码过程,如果您不想自己写代码,也可以在GitHub下载完整的应用源码,地址和链接信息如下表所示:
名称 链接 备注
项目主页 https://github.com/zq2599/blog_demos 该项目在GitHub上的主页
git仓库地址(https) https://github.com/zq2599/blog_demos.git 该项目源码的仓库地址,https协议
git仓库地址(ssh) git@github.com:zq2599/blog_demos.git 该项目源码的仓库地址,ssh协议
  • 这个git项目中有多个文件夹,本章源码在sparkwordcount这个文件夹下,如下图红框所示:

在这里插入图片描述

开发应用

  • 基于maven创建一个java应用sparkwordcount,pom.xml的内容如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.bolingcavalry</groupId>
    <artifactId>sparkwordcount</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.3.2</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/java</sourceDirectory>
        <testSourceDirectory>src/test/java</testSourceDirectory>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>1.2.1</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>exec</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <executable>java</executable>
                    <includeProjectDependencies>false</includeProjectDependencies>
                    <includePluginDependencies>false</includePluginDependencies>
                    <classpathScope>compile</classpathScope>
                    <mainClass>com.bolingcavalry.sparkwordcount.WordCount</mainClass>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
  • 创建WrodCount类,关键代码位置都有注释,就不再赘述了:
package com.bolingcavalry.sparkwordcount;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.List;

/**
 * @Description: spark的WordCount实战
 * @author: willzhao E-mail: zq2599@gmail.com
 * @date: 2019/2/8 17:21
 */
public class WordCount {

    public static void main(String[] args) {
        String hdfsHost = args[0];
        String hdfsPort = args[1];
        String textFileName = args[2];

        SparkConf sparkConf = new SparkConf().setAppName("Spark WordCount Application (java)");

        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

        String hdfsBasePath = "hdfs://" + hdfsHost + ":" + hdfsPort;
        //文本文件的hdfs路径
        String inputPath = hdfsBasePath + "/input/" + textFileName;

        //输出结果文件的hdfs路径
        String outputPath = hdfsBasePath + "/output/"
                       + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());

        System.out.println("input path : " + inputPath);

        System.out.println("output path : " + outputPath);

        //导入文件
        JavaRDD<String> textFile = javaSparkContext.textFile(inputPath);

        JavaPairRDD<String, Integer> counts = textFile
                //每一行都分割成单词,返回后组成一个大集合
                .flatMap(s -> Arrays.asList(s.split(" ")).iterator())
                //key是单词,value是1
                .mapToPair(word -> new Tuple2<>(word, 1))
                //基于key进行reduce,逻辑是将value累加
                .reduceByKey((a, b) -> a + b);

        //先将key和value倒过来,再按照key排序
        JavaPairRDD<Integer, String> sorts = counts
                //key和value颠倒,生成新的map
                .mapToPair(tuple2 -> new Tuple2<>(tuple2._2(), tuple2._1()))
                //按照key倒排序
                .sortByKey(false);

        //取前10个
        List<Tuple2<Integer, String>> top10 = sorts.take(10);

        //打印出来
        for(Tuple2<Integer, String> tuple2 : top10){
            System.out.println(tuple2._2() + "\t" + tuple2._1());
        }

        //分区合并成一个,再导出为一个txt保存在hdfs
        javaSparkContext.parallelize(top10).coalesce(1).saveAsTextFile(outputPath);

        //关闭context
        javaSparkContext.close();
    }
}
  • 在pom.xml目录下执行以下命令,编译构建jar包:
mvn clean package -Dmaven.test.skip=true
  • 构建成功后,在target目录下生成文件sparkwordcount-1.0-SNAPSHOT.jar,上传到spark服务器的~/jars/目录下;
  • 假设spark服务器的IP地址为192.168.119.163,在spark服务器执行以下命令即可提交任务:
~/spark-2.3.2-bin-hadoop2.7/bin/spark-submit \
--master spark://192.168.119.163:7077 \
--class com.bolingcavalry.sparkwordcount.WordCount \
--executor-memory 512m \
--total-executor-cores 2 \
~/jars/sparkwordcount-1.0-SNAPSHOT.jar \
192.168.119.163 \
8020 \
GoneWiththeWind.txt
  • 上述命令的最后三个参数,是java的main方法的入参,具体的使用请参照WordCount类的源码;
  • 提交成功后立即开始执行任务,看到类似如下信息表示任务完成:
2019-02-08 21:26:04 INFO  BlockManagerMaster:54 - BlockManagerMaster stopped
2019-02-08 21:26:04 INFO  OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:54 - OutputCommitCoordinator stopped!
2019-02-08 21:26:04 INFO  SparkContext:54 - Successfully stopped SparkContext
2019-02-08 21:26:04 INFO  ShutdownHookManager:54 - Shutdown hook called
2019-02-08 21:26:04 INFO  ShutdownHookManager:54 - Deleting directory /tmp/spark-c3e2ea9e-7daf-4cab-a207-26f0a0394017
2019-02-08 21:26:04 INFO  ShutdownHookManager:54 - Deleting directory /tmp/spark-d60e4d75-4189-4f33-a5e2-fbe9b06bdae7
  • 往前翻滚一下控制台输出的信息,如下所示,可以见到单词统计的前十名已经输出在控制台了:
2019-02-08 21:36:15 INFO  DAGScheduler:54 - Job 1 finished: take at WordCount.java:61, took 0.313008 s
the    18264
and    14150
to    10020
of    8615
a    7571
her    7086
she    6217
was    5912
in    5751
had    4502
2019-02-08 21:36:15 INFO  deprecation:1173 - mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
2019-02-08 21:36:15 INFO  FileOutputCommitter:108 - File Output Committer Algorithm version is 1
  • 在hdfs服务器执行查看文件的命令,可见/output下新建了子目录20190208213610:
[hadoop@node0 ~]$ ~/hadoop-2.7.7/bin/hdfs dfs -ls /output
Found 1 items
drwxr-xr-x   - hadoop supergroup          0 2019-02-08 21:36 /output/20190208213610
  • 查看子目录,发现里面有两个文件:
[hadoop@node0 ~]$ ~/hadoop-2.7.7/bin/hdfs dfs -ls /output/20190208213610
Found 2 items
-rw-r--r--   3 hadoop supergroup          0 2019-02-08 21:36 /output/20190208213610/_SUCCESS
-rw-r--r--   3 hadoop supergroup        108 2019-02-08 21:36 /output/20190208213610/part-00000
  • 上面看到的/output/20190208213610/part-00000就是输出结果,用cat命令查看其内容:
[hadoop@node0 ~]$ ~/hadoop-2.7.7/bin/hdfs dfs -cat /output/20190208213610/part-00000
(18264,the)
(14150,and)
(10020,to)
(8615,of)
(7571,a)
(7086,her)
(6217,she)
(5912,was)
(5751,in)
(4502,had)
  • 可见与前面控制台输出的一致;
  • 在spark的web页面,可见刚刚执行的任务信息:

在这里插入图片描述

  • 至此,第一个spark应用的开发和运行就完成了,接下来的文章中,咱们一起来完成更多的spark实战;

欢迎关注阿里云开发者社区博客:程序员欣宸

学习路上,你不孤单,欣宸原创一路相伴...
相关文章
|
5月前
|
前端开发 Oracle Java
Java中的GUI应用开发技术选型
Java中的GUI应用开发技术选型
|
2月前
|
分布式计算 大数据 Java
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
37 1
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
|
2月前
|
SQL IDE Java
入门Cloud Toolkit:简化你的Java应用开发与部署流程
【10月更文挑战第19天】作为一名长期从事Java开发的程序员,我一直致力于寻找能够简化日常开发工作的工具。在众多工具中,阿里巴巴推出的Cloud Toolkit引起了我的注意。这款免费的插件旨在帮助开发者更轻松地进行开发、测试及部署工作,尤其是在与云服务交互时表现尤为出色。本文将从个人的角度出发,介绍Cloud Toolkit的基本功能及其使用技巧,希望能帮助初学者快速上手这款实用工具。
31 1
|
3月前
|
消息中间件 分布式计算 Java
Linux环境下 java程序提交spark任务到Yarn报错
Linux环境下 java程序提交spark任务到Yarn报错
49 5
|
6月前
|
分布式计算 资源调度 Hadoop
Java大数据处理:Spark与Hadoop整合
Java大数据处理:Spark与Hadoop整合
|
6月前
|
机器学习/深度学习 人工智能 自然语言处理
基于Java的人工智能应用开发
基于Java的人工智能应用开发
|
6月前
|
SQL JSON 分布式计算
|
6月前
|
SQL 分布式计算 Java
|
分布式计算 Java 关系型数据库
云栖社区2019年1月技术活动:PG, Java,Spark等30+场预告【持续更新】
云栖社区月度技术活动预告:技术直播、系列公开课、Meetup、峰会、钉群分享等。欢迎分享给更多开发者。
9315 0
云栖社区2019年1月技术活动:PG, Java,Spark等30+场预告【持续更新】
|
1月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
129 2
ClickHouse与大数据生态集成:Spark & Flink 实战