Spark入门:实现WordCount的3种方式

简介:

WordCount作为Spark的入门任务,可以很简单,也可以做到比较复杂。 本文从实现功能的角度提出了3种实现方式,至于性能影响,会在后文继续讨论。

注意: 本文使用的Spark版本还是1.6.1.如果读者您已经切换到2.0+版本,请参考GitHub spark的官方例子进行学习。 因为2.0版本的API与1.X 并不能完全兼容,特别是2.0开始使用了SparkSession的概念,而不是SparkContext!

第一种方式:mapToPair + reduceByKey

这是官方提供的实现方式,应该也是网上能找到的最多的例子。

官网地址: http://spark.apache.org/examples.html

核心代码:

 
  1. JavaRDD<String> textFile = sc.textFile("hdfs://..."); 
  2.  
  3. JavaRDD<String> words = textFile.flatMap(new FlatMapFunction<String, String>() { 
  4.  
  5. public Iterable<String> call(String s) { return Arrays.asList(s.split(" ")); } 
  6.  
  7. }); 
  8.  
  9. JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() { 
  10.  
  11. public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); } 
  12.  
  13. }); 
  14.  
  15. JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<IntegerIntegerInteger>() { 
  16.  
  17. public Integer call(Integer a, Integer b) { return a + b; } 
  18.  
  19. }); 
  20.  
  21. counts.saveAsTextFile("hdfs://..."); 

总结上面的步骤:

  1. flatmap : 将一整段文字映射成一个字符串数组
  2. mapToPair: 将word 映射成 (word, 1)
  3. reduceByKey: 按照key进行group and plus的操作, 得到最终结果
  4. collect: 这是Action,上面3个都是Transformation

第二种方式:使用countByValue代替mapToPair + reduceByKey

核心代码:

 
  1. JavaRDD<String> textFile = sc.textFile("hdfs://..."); 
  2.  
  3. JavaRDD<String> words = textFile.flatMap(new FlatMapFunction<String, String>() { 
  4.  
  5. public Iterable<String> call(String s) { return Arrays.asList(s.split(" ")); } 
  6.  
  7. }); 
  8.  
  9. Map<String, Long> counts = words.countByValue(); 

读文件、flatmap这两步都是完全一样的,但是后面直接一个countByValue就搞定了,并且还直接collect到本地了,是不是感觉这一种实现方式更简洁了呢?

至于性能,一般来说这种方式还不错,但是这种方式有一些缺点,参考StackOverFlow的描述:

网址: http://stackoverflow.com/questions/25318153/spark-rdd-aggregate-vs-rdd-reducebykey

countByValue would be the fastest way to do this, however its implementation uses hash maps and merges them so if you have a large amount of data this approach may not scale well (especially when you consider how many issues spark already has with memory). You may want to use the standard way of counting in map reduce which would be to map the line and 1 as pairs then reduceBykey like this:

简单的说,这种方式是使用hash的方式进行merge。 如果处理的数据量比较大的时候,效果可能不怎么好。

注意: 这种方式的性能笔者确实还没有亲自实践过!

第三种方式:AggregateByKey

AggregateByKey 这个方法,可以看做是reduceByKey的增强版,因为reduceByKey的输出类型与输入类型要求是完全一致的。比如wordcount 之中的输入是Tuple2<String, Integer> 输出也同样要求是Tuple2<String,Integer>. 但是AggregateByKey的输出类型可以是不一样的数据类型。 参考下面的代码:

 
  1. val keysWithValuesList = Array("foo=A""foo=A""foo=A""foo=A""foo=B""bar=C""bar=D""bar=D"
  2.  
  3. val data = sc.parallelize(keysWithValuesList) 
  4.  
  5. //Create key value pairs 
  6.  
  7. val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache() 
  8.  
  9. val initialCount = 0; 
  10.  
  11. val addToCounts = (n: Int, v: String) => n + 1 
  12.  
  13. val sumPartitionCounts = (p1: Int, p2: Int) => p1 + p2 
  14.  
  15. val countByKey = kv.aggregateByKey(initialCount)(addToCounts, sumPartitionCounts) 

输出:

 
  1. Aggregate By Key sum Results 
  2.  
  3. bar -> 3 
  4.  
  5. foo -> 5 

可以看到,输入是<String, String> 而输出变成了<String, Integer>

注意: 这种方法,并不是处理WordCount的最好的选择,只是说明我们可以使用AggregateByKey这种方式来实现相同的功能

其实还有另外一种实现方式: 使用DataFrame。 但是这种方式需要前期的准备比较多,即如何将数据处理并喂给DataFrame。

一般来说,DataFrame的效率相比其他的RDD的实现方式要高不少,如果在前期准备工作上面难度不是太大的话,非常推荐使用DataFrame的方式。


本文作者:rangerwolf

来源:51CTO

相关文章
|
5月前
|
SQL 分布式计算 调度
Spark入门(一篇就够了)(三)
Spark入门(一篇就够了)(三)
121 0
|
5月前
|
分布式计算 Java Scala
181 Spark IDEA中编写WordCount程序
181 Spark IDEA中编写WordCount程序
31 0
|
6月前
|
SQL 分布式计算 Java
Spark入门指南:从基础概念到实践应用全解析
在这个数据驱动的时代,信息的处理和分析变得越来越重要。而在众多的大数据处理框架中, Apache Spark 以其独特的优势脱颖而出。
67 0
|
1天前
|
分布式计算 大数据 数据处理
[AIGC大数据基础] Spark 入门
[AIGC大数据基础] Spark 入门
|
5月前
|
存储 缓存 分布式计算
Spark入门(一篇就够了)(一)
Spark入门(一篇就够了)(一)
132 0
|
5月前
|
分布式计算 Hadoop 大数据
178 Spark入门
178 Spark入门
30 0
|
2月前
|
分布式计算 资源调度 监控
Spark学习--1、Spark入门(Spark概述、Spark部署、Local模式、Standalone模式、Yarn模式)(一)
Spark学习--1、Spark入门(Spark概述、Spark部署、Local模式、Standalone模式、Yarn模式)(一)
93 1
|
2月前
|
数据采集 分布式计算 Linux
Spark实时(数据采集)项目小知识点--sed -i命令详解及入门攻略
Spark实时(数据采集)项目小知识点--sed -i命令详解及入门攻略
111 0
|
4月前
|
SQL 分布式计算 Java
Spark 基础教程:wordcount+Spark SQL
Spark 基础教程:wordcount+Spark SQL
34 0
|
5月前
|
SQL JSON 分布式计算
Spark入门(一篇就够了)(二)
Spark入门(一篇就够了)(二)
67 0