一、pom
<properties> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <scala.version>2.12.10</scala.version> <spark.version>3.0.0</spark.version> <hadoop.version>3.2.1</hadoop.version> <encoding>UTF-8</encoding> </properties> <dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>3.2.1</version> </dependency> </dependencies> <build> <pLuginManagement> <plugins> <!--编译scala的插件-- -> <pLugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.2</version> </plugin> <!--编译java的插件--> <pLugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.5.1</version> </plugin> </plugins> </pLuginManagement> <plugins> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <executions> <execution> <id>scala-compile-first</id> <phase>process-resources</phase> <goals> <goal>add-source</goal> <goal>compile</goal> </goals> </execution> <execution> <id>scala-test-compile</id> <phase>process-test-resources</phase> <goals> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.1.1</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass></mainClass> </transformer> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build>
二、Spark3.0-JavaAPI程序
实现Spark读取HDFS中的文本文件,实现单词计数,并2将结果输出到HDFS中。
2.1 java匿名实现类
// 1.创建配置 SparkConf conf = SparkConf().setAppName("JavaWordCount"); // 2.封装了SparkContext JavaSparkContext jsc = new JavaSparkContext(conf); // 3.jsc 创建 java RDD JavaRDD<String> lines = jsc.textFile(args[0]); // 4.切分压平 JavaRDD<String> words = lines.flatMap(new FlatMapFuntion<String,String>(){ @Override public Iterator<String> call(String line) throws Expection{ return Arrays.asList(line.split(" ")).iterator; } }); // 5.单词和1组合 JavaPairRDD<String,Integer> wordAndOne = words.maoToPair(new PairFunction<String,String,Integer>(){ @Override public Tuple2<String,Integer> call(String words) throws Exception{ return Tuple2.apply(word,1); } }); // 6.分组聚合 JavaPairRDD<String,Integer> reduced = wordAndOne.reduceByKey(new Function2<Integer,Integer,Integer>(){ @Override public Integer call(Integer v1,Integer v2) throws Exception{ return v1+v2; } }); // 7.交换kv顺序 JavaPairRDD<Integer,String> swapped = reduced.mapToPair(new PairFunction<Tuple2<String,Integer>,Integer,String>(){ @Override public Tuple2<Integer,String> call(Tuple2<String,Integer> tp) throws Exception{ return tp.swap(); } }); // 8.排序 JavaPairRDD<Integer,String> sorted = swapped.sortedByKey(false); // 9.交换kv顺序 JavaPairRDD<String,Integer> result = sorted.mapToPair(new PairFunction<Tuple2<String,Integer>,String,Integer>(){ @Override public Tuple2<String,Integer> call(Tuple2<Integer,String> tp) throws Exception{ return tp.swap(); } }); // 10.触发action保存到HDFS result.saveAsTextFile(args[1]); // 11.释放资源 jsc.stop();
2.2 Lambda表达式实现
// 1.创建配置 SparkConf conf = SparkConf().setAppName("LambdaJavaWordCount"); // 2.封装了SparkContext JavaSparkContext jsc = new JavaSparkContext(conf); // 3.jsc 创建 java RDD JavaRDD<String> lines = jsc.textFile(args[0]); // 4.切分压平 JavaRDD<String> words = lines.flatMap(line -> Arrays.stream(line.split(" ")).iterator()); // 5.单词 1 JavaPairRDD<String,Integer> wordAndOne = words.mapToPair(w -> Tuple2.apply(w,1)); // 6.聚合 JavaPairRDD<String,Integer> reduced = wordAndOne.reduceByKey((i,j) -> i+j); // 7.排序 JavaPairRDD<String,Integer> sorted = reduced.mapToPair(tp -> tp.swap()).sortByKey(false) .mapToPair(tp -> tp.swap()); // 8.保存hdfs sorted.saveAsTextFile(args[1]); // 9.释放资源 jsc.stop();
2.3 程序打包
2.4 上传到Linux
2.5 启动HDFS
hadoop-daemon.sh start namenode
hadoop-daemon.sh start datanode
2.6 Spark执行jar包
bin/spark-3.0.0-bin-hadoop3.2/bin/spark-submit --master spark://hadoop1:7077,hadoop2:7077,hadoop3:7077 --executor-memory 1g --total-executor-cores 5 --class com.wang.spark.LambdaJavaWordCount /root/spark-in-active-1.0-SNAPSHOT.jar hdfs://hadoop1:9000/wc hdfs://hadoop1:9000/out
2.7 查看结果
hdfs -dfs -cat /out/*
三、本机执行
本地测试,不会建立集群链接,再本地的一个进程运行。
// 1.创建配置【本地测试】 SparkConf conf = SparkConf().setAppName("LambdaJavaWordCount").setMaster("local[*]"); // 2.封装了SparkContext JavaSparkContext jsc = new JavaSparkContext(conf); // 3.jsc 创建 java RDD JavaRDD<String> lines = jsc.textFile(args[0]); // 4.切分压平 JavaRDD<String> words = lines.flatMap(line -> Arrays.stream(line.split(" ")).iterator()); // 5.单词 1 JavaPairRDD<String,Integer> wordAndOne = words.mapToPair(w -> Tuple2.apply(w,1)); // 6.聚合 JavaPairRDD<String,Integer> reduced = wordAndOne.reduceByKey((i,j) -> i+j); // 7.排序 JavaPairRDD<String,Integer> sorted = reduced.mapToPair(tp -> tp.swap()).sortByKey(false) .mapToPair(tp -> tp.swap()); // 8.保存hdfs sorted.saveAsTextFile(args[1]); // 9.释放资源 jsc.stop();
运行时候,传入参数本地数据或者hdfs的数据。
如果出现这个错误,需要将pom中的scala的****放开
或者全部读取本机的文件。
执行结果如下: