如何在CDH5上运行Spark应用

简介:

这篇文章参考 How-to: Run a Simple Apache Spark App in CDH 5 编写而成,没有完全参照原文翻译,而是重新进行了整理,例如:spark 版本改为 1.3.0,添加了 Python 版的程序。

创建 maven 工程

使用下面命令创建一个普通的 maven 工程:

$ mvn archetype:generate -DgroupId=com.cloudera.sparkwordcount -DartifactId=sparkwordcount -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false

在 sparkwordcount 目录下添加 scala 源文件目录和相应的包目录:

$ mkdir -p sparkwordcount/src/main/scala/com/cloudera/sparkwordcount

修改 pom.xml 添加 scala 和 spark 依赖:

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>2.10.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.3.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.10</artifactId>
      <version>1.3.0</version>
    </dependency>
  </dependencies>

添加编译 scala 的插件:

 <plugin>
  <groupId>org.scala-tools</groupId>
      <artifactId>maven-scala-plugin</artifactId>
      <executions>
        <execution>
          <goals>
            <goal>compile</goal>
            <goal>testCompile</goal>
          </goals>
        </execution>
      </executions>
</plugin>

添加 scala 编译插件需要的仓库:

<pluginRepositories>
  <pluginRepository>
    <id>scala-tools.org</id>
    <name>Scala-tools Maven2 Repository</name>
    <url>http://scala-tools.org/repo-releases</url>
  </pluginRepository>
</pluginRepositories>

另外,添加 cdh hadoop 的仓库:

  <repositories>
    <repository>
      <id>scala-tools.org</id>
      <name>Scala-tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </repository>
    <repository>
      <id>maven-hadoop</id>
      <name>Hadoop Releases</name>
      <url>https://repository.cloudera.com/content/repositories/releases/</url>
    </repository>
    <repository>
      <id>cloudera-repos</id>
      <name>Cloudera Repos</name>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
  </repositories>

运行下面命令检查工程是否能够成功编译:

mvn package

编写示例代码

 WordCount 为例,该程序需要完成以下逻辑:

  • 读一个输入文件
  • 统计每个单词出现次数
  • 过滤少于一定次数的单词
  • 对剩下的单词统计每个字母出现次数

在 MapReduce 中,上面的逻辑需要两个 MapReduce 任务,而在 Spark 中,只需要一个简单的任务,并且代码量会少 90%。

编写Scala 程序如下:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
 
object SparkWordCount {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("Spark Count"))
    val threshold = args(1).toInt
 
    // split each document into words
    val tokenized = sc.textFile(args(0)).flatMap(_.split(" "))
 
    // count the occurrence of each word
    val wordCounts = tokenized.map((_, 1)).reduceByKey(_ + _)
 
    // filter out words with less than threshold occurrences
    val filtered = wordCounts.filter(_._2 &gt;= threshold)
 
    // count characters
    val charCounts = filtered.flatMap(_._1.toCharArray).map((_, 1)).reduceByKey(_ + _)
 
    System.out.println(charCounts.collect().mkString(", "))
  }
}

Spark 使用懒执行的策略,意味着只有当动作执行的时候,转换才会运行。上面例子中的动作操作是 collect  saveAsTextFile,前者是将数据推送给客户端,后者是将数据保存到 HDFS。

作为对比,Java 版的程序如下:

import java.util.ArrayList;
import java.util.Arrays;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.SparkConf;
import scala.Tuple2;
 
public class JavaWordCount {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark Count"));
    final int threshold = Integer.parseInt(args[1]);
 
    // split each document into words
    JavaRDD tokenized = sc.textFile(args[0]).flatMap(
      new FlatMapFunction() {
        public Iterable call(String s) {
          return Arrays.asList(s.split(" "));
        }
      }
    );
 
    // count the occurrence of each word
    JavaPairRDD counts = tokenized.mapToPair(
      new PairFunction() {
        public Tuple2 call(String s) {
          return new Tuple2(s, 1);
        }
      }
    ).reduceByKey(
      new Function2() {
        public Integer call(Integer i1, Integer i2) {
          return i1 + i2;
        }
      }
    );
 
    // filter out words with less than threshold occurrences
    JavaPairRDD filtered = counts.filter(
      new Function, Boolean>() {
        public Boolean call(Tuple2 tup) {
          return tup._2 >= threshold;
        }
      }
    );
 
    // count characters
    JavaPairRDD charCounts = filtered.flatMap(
      new FlatMapFunction, Character>() {
        public Iterable call(Tuple2 s) {
          ArrayList chars = new ArrayList(s._1.length());
          for (char c : s._1.toCharArray()) {
            chars.add(c);
          }
          return chars;
        }
      }
    ).mapToPair(
      new PairFunction() {
        public Tuple2 call(Character c) {
          return new Tuple2(c, 1);
        }
      }
    ).reduceByKey(
      new Function2() {
        public Integer call(Integer i1, Integer i2) {
          return i1 + i2;
        }
      }
    );
 
    System.out.println(charCounts.collect());
  }
}

另外,Python 版的程序如下:

import sys

from pyspark import SparkContext

file="inputfile.txt"
count=2

if __name__ == "__main__":
    sc = SparkContext(appName="PythonWordCount")
    lines = sc.textFile(file, 1)
    counts = lines.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1))  \
                  .reduceByKey(lambda a, b: a + b)  \
                  .filter(lambda (a, b) : b >= count)  \
                  .flatMap(lambda (a, b): list(a))  \
                  .map(lambda x: (x, 1))  \
                  .reduceByKey(lambda a, b: a + b)

    print ",".join(str(t) for t in counts.collect())
    sc.stop()

编译

运行下面命令生成 jar:

$ mvn package

运行成功之后,会在 target 目录生成 sparkwordcount-0.0.1-SNAPSHOT.jar 文件。

运行

首先,将测试文件 inputfile.txt 上传到 HDFS 上;

$ wget https://github.com/sryza/simplesparkapp/blob/master/data/inputfile.txt
$ hadoop fs -put inputfile.txt

其次,将 sparkwordcount-0.0.1-SNAPSHOT.jar 上传到集群中的一个节点;然后,使用 spark-submit 脚本运行 Scala 版的程序:

$ spark-submit --class com.cloudera.sparkwordcount.SparkWordCount --master local sparkwordcount-0.0.1-SNAPSHOT.jar inputfile.txt 2

或者,运行 Java 版本的程序:

$ spark-submit --class com.cloudera.sparkwordcount.JavaWordCount --master local sparkwordcount-0.0.1-SNAPSHOT.jar inputfile.txt 2

对于 Python 版的程序,运行脚本为:

$ spark-submit  --master local PythonWordCount.py

如果,你的集群部署的是 standalone 模式,则你可以替换 master 参数的值为 spark://<master host>:<master port>,也可以以 Yarn 的模式运行。

最后的 Python 版的程序运行输出结果如下:

(u'a', 4),(u'c', 1),(u'b', 1),(u'e', 6),(u'f', 1),(u'i', 1),(u'h', 1),(u'l', 1),(u'o', 2),(u'n', 4),(u'p', 2),(u'r', 2),(u'u', 1),(u't', 2),(u'v', 1)

完整的代码在:https://github.com/sryza/simplesparkapp

目录
相关文章
|
3月前
|
分布式计算 数据处理 Apache
Spark和Flink的区别是什么?如何选择?都应用在哪些行业?
【10月更文挑战第10天】Spark和Flink的区别是什么?如何选择?都应用在哪些行业?
353 1
|
8月前
|
机器学习/深度学习 SQL 分布式计算
Spark核心原理与应用场景解析:面试经验与必备知识点解析
本文深入探讨Spark核心原理(RDD、DAG、内存计算、容错机制)和生态系统(Spark SQL、MLlib、Streaming),并分析其在大规模数据处理、机器学习及实时流处理中的应用。通过代码示例展示DataFrame操作,帮助读者准备面试,同时强调结合个人经验、行业趋势和技术发展以展现全面的技术实力。
723 0
|
8月前
|
机器学习/深度学习 SQL 分布式计算
Apache Spark 的基本概念和在大数据分析中的应用
介绍 Apache Spark 的基本概念和在大数据分析中的应用
273 0
|
3月前
|
分布式计算 大数据 Java
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
49 1
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
|
5月前
|
分布式计算 大数据 数据处理
Apache Spark的应用与优势:解锁大数据处理的无限潜能
【8月更文挑战第23天】Apache Spark以其卓越的性能、易用性、通用性、弹性与可扩展性以及丰富的生态系统,在大数据处理领域展现出了强大的竞争力和广泛的应用前景。随着大数据技术的不断发展和普及,Spark必将成为企业实现数字化转型和业务创新的重要工具。未来,我们有理由相信,Spark将继续引领大数据处理技术的发展潮流,为企业创造更大的价值。
|
5月前
|
分布式计算 资源调度 测试技术
“Spark Streaming异常处理秘籍:揭秘如何驯服实时数据流的猛兽,守护你的应用稳如泰山,不容错过!”
【8月更文挑战第7天】Spark Streaming 是 Apache Spark 中的关键组件,用于实时数据流处理。部署时可能遭遇数据问题、资源限制或逻辑错误等异常。合理处理这些异常对于保持应用稳定性至关重要。基础在于理解其异常处理机制,通过 DSC 将数据流切分为 RDD。对于数据异常,可采用 try-catch 结构捕获并处理;资源层面异常需优化 Spark 配置,如调整内存分配;逻辑异常则需加强单元测试及集成测试。结合监控工具,可全面提升应用的健壮性和可靠性。
88 3
|
6月前
|
SQL 弹性计算 资源调度
云服务器 ECS产品使用问题之bin/spark-sql --master yarn如何进行集群模式运行
云服务器ECS(Elastic Compute Service)是各大云服务商阿里云提供的一种基础云计算服务,它允许用户租用云端计算资源来部署和运行各种应用程序。以下是一个关于如何使用ECS产品的综合指南。
|
6月前
|
分布式计算 大数据 Spark
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
《Spark大数据处理:技术、应用与性能优化》深入浅出介绍Spark核心,涵盖部署、实战与性能调优,适合初学者。作者基于微软和IBM经验,解析Spark工作机制,探讨BDAS生态,提供实践案例,助力快速掌握。书中亦讨论性能优化策略。[PDF下载链接](https://zhangfeidezhu.com/?p=347)。![Spark Web UI](https://img-blog.csdnimg.cn/direct/16aaadbb4e13410f8cb2727c3786cc9e.png#pic_center)
171 1
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
|
5月前
|
分布式计算 Hadoop 大数据
大数据处理框架在零售业的应用:Apache Hadoop与Apache Spark
【8月更文挑战第20天】Apache Hadoop和Apache Spark为处理海量零售户数据提供了强大的支持
87 0
|
8月前
|
SQL 分布式计算 监控
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用
本文演示了使用 EMR Serverless Spark 产品搭建一个日志分析应用的全流程,包括数据开发和生产调度以及交互式查询等场景。
56615 7
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用