【Spark】一个例子带你了解Spark运算流程

简介: 【Spark】一个例子带你了解Spark运算流程

一、前期准备

1. 运行环境

对于Spark来说,WordCount程序同样是经典的Hello World案例。Spark本身的部署十分简单,因为是基于内存进行计算的,所以只要简单配置一下运行环境、端口、要分配的资源大小以及工作节点即可,如需搭建可以参考:Spark 3.x各模式部署 - Ubuntu

如果只是快速的测试程序可以不需要搭建Spark环境,只需要在项目中构建需要的依赖,以本地模式运行即可。在这种模式下,可以将本地文件作为input,output也可以直接输出到控制台。

2. 项目新建

  • 首先在IDEA中新建一个Maven项目:

  • 修改pom.xml,添加Spark相关的依赖:
<dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.3.2</version>
        </dependency>
</dependencies>

3. 输入数据准备

在项目下新建一个input文件夹,再新建一个data.txt文件,输入一些纯文本的单词作为样例数据:

二、从WordCount开始

程序的目标是计算出文本文件中每个单词各出现了多少次,目前先使用比较单一和简单的空格分隔符。

1. 完整程序实现

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.List;
public class WordCount {
    public static void main(String[] args) {
        // 创建SparkConf对象,配置Spark运行参数,声明本地运行
        SparkConf sparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]");
        // 创建JavaSparkContext对象,是Spark应用的入口
        JavaSparkContext context = new JavaSparkContext(sparkConf);
        // 读取文件内容到JavaRDD,一个较为通用的分布式集合类型
        JavaRDD<String> lines = context.textFile("input/data.txt");
        System.err.println(lines.collect());
        // 对每一行文本进行拆分,生成一个新的单词RDD
        JavaRDD<String> words = lines.flatMap(
                (FlatMapFunction<String, String>) line -> Arrays.asList(line.split(" ")).iterator());
        System.err.println(words.collect());
        // 将每个单词转换为(key, value)格式,生成新的JavaPairRDD
        JavaPairRDD<String, Integer> wordOne = words.mapToPair(
                (PairFunction<String, String, Integer>) word -> new Tuple2<>(word, 1));
        System.err.println(wordOne.collect());
        // 使用reduceByKey操作,计算每个单词的出现次数
        JavaPairRDD<String, Integer> wordCounts = wordOne.reduceByKey(
                (Function2<Integer, Integer, Integer>) Integer::sum);
        System.err.println(wordCounts.collect());
        // 使用collect操作,获取RDD中的所有元素
        List<Tuple2<String, Integer>> output = wordCounts.collect();
        // 在控制台打印出每个单词的出现次数
        for (Tuple2<?, ?> tuple : output) {
            System.err.println(tuple._1() + ": " + tuple._2());
        }
        // 关闭SparkContext,释放资源
        context.stop();
    }
}

2. 程序执行流程

  • 从文件中读取数据

使用textFile方法时会逐行读取数据,构建出集合。

// 读取文件内容到JavaRDD,一个较为通用的分布式集合类型
        JavaRDD<String> lines = context.textFile("input/data.txt");
        System.err.println(lines.collect());

运行结果:【what day is today, today is a good day, good good study, day day up】

  • 将文本拆分为单词

使用FlatMapFunction定义对每个集合元素的处理规则,泛型部分代表了输入和输出类型,处理完成后flatMap会将产生的列表连接到一起,形成一个新的列表,即不会出现嵌套结构。

flatMap前:【(what, day, is, today), (today, is, a, good, day)】。

flatMap后:【what, day, is, today, today, is, a, good, day】

// 对每一行文本进行拆分,生成一个新的单词RDD
        JavaRDD<String> words = lines.flatMap(
                (FlatMapFunction<String, String>) line -> Arrays.asList(line.split(" ")).iterator());
        System.err.println(words.collect());

运行结果:【what, day, is, today, today, is, a, good, day, good, good, study, day, day, up】

  • 对出现的单词标记

对得到的每个单词如何处理取决于我们的需求,由于现在要进行单词计数,所以按照MapReduce的思想先构建出Map结构,然后在Reduce阶段来实现计算逻辑。

在Java中需要使用JavaPairRDD,元素的结构都是键值对,因此额外提供了reduceByKey等方法,首先通过mapToPair进行一个类型转换,因此输入类型为String,输出类型为String,Integer。

// 将每个单词转换为(key, value)格式,生成新的JavaPairRDD
        JavaPairRDD<String, Integer> wordOne = words.mapToPair(
                (PairFunction<String, String, Integer>) word -> new Tuple2<>(word, 1));
        System.err.println(wordOne.collect());

运行结果:【(what,1), (day,1), (is,1), (today,1), (today,1), (is,1), (a,1), (good,1), (day,1), (good,1), (good,1), (study,1), (day,1), (day,1), (up,1)】

  • 执行按词累加计算

现在我们需要传入两个参数执行一个相加的逻辑Integer::sum,等价于(a,b) -> a + b,因此使用Function2,代表传入两个参数返回一个结果。

// 使用reduceByKey操作,计算每个单词的出现次数
        JavaPairRDD<String, Integer> wordCounts = wordOne.reduceByKey(
                (Function2<Integer, Integer, Integer>) Integer::sum);
        System.err.println(wordCounts.collect());

运行结果:【(is,2), (day,4), (what,1), (up,1), (a,1), (today,2), (good,3), (study,1)】

3. 计算机制介绍

Spark的运算机制非常值得深入学习,这里只借助简单例子稍微的扩展一下。Spark的所有基于RDD的方法调用都可以看作一个个算子【小编习惯性的程序】,因为Spark是基于Scala开发的,当我们使用Scala语言进行程序开发时更能深刻到这一点。也就是我们基本上从一个集合开始,用一连串的方法调用就可以得到最终想要的结果,这也与Spark的延迟计算机制有关。

  • 转换算子:Transformation

在Spark中,转换算子用于从一个数据集创建一个新的数据集。例如,map、filter和reduceByKey等操作都是转换算子。转换算子的结果是一个新的RDD,它通常是通过对输入RDD应用某种函数得到的。需要注意的是,转换操作是惰性的(lazy),也就是说,它们并不会立即计算结果,而是在行动操作调用时才真正执行。

  • 行动算子:Action

行动算子是那些触发实际计算的操作。例如,count、collect、first、take等操作都是行动算子。当一个行动操作被调用时,Spark就会执行计算,并返回一个具体的值。

  • 延迟计算:Lazy Evaluation

Spark使用延迟计算模型,也就是说,当转换操作被调用时,它们并不会立即执行,而是记录下这些操作。只有当一个行动操作被调用时,这些转换操作才会真正执行。这使得Spark可以优化整个计算过程,例如,通过合并多个转换操作,减少数据的读写等。

  • 分布式计算:Distributed Evaluation

Spark通过分布式计算实现高效的大数据处理。数据被分割成多个分区(partition),每个分区可以在集群中的一个节点上单独处理。通过这种方式,Spark可以在多个节点上并行处理大量数据。另外,Spark还提供了弹性调度和容错机制,使其能够在节点失败时继续运行,并根据负载情况动态调整资源使用。

目录
相关文章
|
3月前
|
消息中间件 分布式计算 Kafka
大数据-98 Spark 集群 Spark Streaming 基础概述 架构概念 执行流程 优缺点
大数据-98 Spark 集群 Spark Streaming 基础概述 架构概念 执行流程 优缺点
51 0
|
8月前
|
分布式计算 监控 Java
Spark学习---day06、Spark内核(源码提交流程、任务执行)
Spark学习---day06、Spark内核(源码提交流程、任务执行)
115 2
|
8月前
|
机器学习/深度学习 分布式计算 算法
Spark MLlib简介与机器学习流程
Spark MLlib简介与机器学习流程
|
8月前
|
存储 分布式计算 监控
Spark作业的调度与执行流程
Spark作业的调度与执行流程
|
8月前
|
存储 分布式计算 大数据
【大数据技术Hadoop+Spark】Spark RDD设计、运行原理、运行流程、容错机制讲解(图文解释)
【大数据技术Hadoop+Spark】Spark RDD设计、运行原理、运行流程、容错机制讲解(图文解释)
500 0
|
SQL JSON 分布式计算
Spark SQL底层执行流程详解(一)
Spark SQL底层执行流程详解
1091 0
Spark SQL底层执行流程详解(一)
|
分布式计算 监控 Java
Spark学习---7、Spark内核(源码提交流程、任务执行、Shuffle、内存管理)(一)
Spark学习---7、Spark内核(源码提交流程、任务执行、Shuffle、内存管理)(一)
|
SQL 分布式计算 资源调度
Spark on Yarn Job的执行流程简介
2017-12-19-Hadoop2.0架构及HA集群配置(1) 2017-12-24-Hadoop2.0架构及HA集群配置(2) 2017-12-25-Spark集群搭建 2017-12-29-Hadoop和Spark的异同 2017-12-28-Spark-HelloWorld(Spark开发环境搭建)
|
分布式计算 大数据 调度
Spark 原理_运行过程_总结和流程 | 学习笔记
快速学习 Spark 原理_运行过程_总结和流程
117 0
Spark 原理_运行过程_总结和流程 | 学习笔记
|
分布式计算 资源调度 监控
【Spark】(六)Spark 运行流程
【Spark】(六)Spark 运行流程
621 0
【Spark】(六)Spark 运行流程