1 梳理单词计数的执行流程
上面的是单个文件的执行流程,有一些现象看起来还是不明显 下面我们来看一个两个文件的执行流程
2 实战WordCount
前面我们通过理论层面详细分析了单词计数的执行流程,下面我们就来实际上手操作一下。
大致流程如下:
第一步:开发Map阶段代码
第二步:开发Reduce阶段代码
第三步:组装Job
在idea中创建WordCountJob类
添加注释,梳理一下需求:
需求:读取hdfs上的hello.txt文件,计算文件中每个单词出现的总次数
hello.txt文件内容如下:
hello you
hello me
最终需要的结果形式如下:
hello 2
me 1
you 1
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class WordCountJob { /** * 创建自定义mapper类 */ public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> { /** * 需要实现map函数 * 这个map函数就是可以接收k1,v1, 产生k2,v2 * * * @param k1 * @param v1 * @param context * * @throws IOException * @throws InterruptedException * */ @Override protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { // k1代表的是每一行的行首偏移量,v1代表的是每一行内容 // 对获取到的每一行数据进行切割,把单词切割出来 String[] words = v1.toString().split(" "); for (String word : words) { // 迭代切割出来的单词数据 Text k2 = new Text(word); LongWritable v2 = new LongWritable(1L); // 把<k2,v2>写出去 context.write(k2,v2); context.write(k2, v2); } } } /** * 创建自定义reducer类 */ public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> { /** * 针对<k2,{v2……}>的数据进行累加求和,并且最终把数据转化为k3,v3写出去 * * @param k2 * * @param v2s * * @param context * * @throws IOException * * @throws InterruptedException * */ @Override protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException { long sum = 0L; for (LongWritable v2 : v2s) { sum += v2.get(); } Text k3 = k2; LongWritable v3 = new LongWritable(sum); context.write(k3, v3); } } public static void main(String[] args) { try { if (args.length != 2) { // 如果传递的参数不够,程序直接退出 System.exit(100); } // job需要的配置参数 Configuration conf = new Configuration(); // 创建一个job Job job = Job.getInstance(conf); // 注意:这一行必须设置,否则在集群中执行的是找不到WordCountJob这个类 job.setJarByClass(WordCountJob.class); // 指定输入路径(可以是文件,也可以是目录) FileInputFormat.setInputPaths(job, new Path(args[0])); // 指定输出路径(只能指定一个不存在的目录) FileOutputFormat.setOutputPath(job, new Path(args[1])); // 指定map相关的代码 job.setMapperClass(MyMapper.class); // 指定k2的类型 job.setMapOutputKeyClass(Text.class); // 指定v2的类型 job.setMapOutputValueClass(LongWritable.class); // 指定reduce相关的代码 job.setReducerClass(MyReducer.class); // 指定k3的类型 job.setOutputKeyClass(Text.class); // 指定v3的类型 job.setOutputValueClass(LongWritable.class); job.waitForCompletion(true); } catch (Exception e) { e.printStackTrace(); } } }
现在代码开发完毕了,现在我们是把自定义的mapper类和reducer类都放到了这个WordCountJob类中,主要是为了在学习阶段看起来清晰一些,所有的代码都在一个类中,好找,其实我们完全可以把自定义的mapper类和reducer类单独提出去,定义为单独的类,是没有什么区别的。
ok,那代码开发好了以后想要执行,我们需要打jar包上传到集群上去执行,这个时候需要在pom文件中添加maven的编译打包插件。
<build> <!-- compiler插件, 设定JDK版本 --> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>2.3.2</version> <configuration> <encoding>UTF-8</encoding> <source>1.8</source> <target>1.8</target> <showWarnings>true</showWarnings> </configuration> </plugin> <!--打包插件--> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <mainClass></mainClass> //在运行的时候可以动态指定 </manifest> </archive> </configuration> <executions> <execution> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
注意了,这些添加完以后还有一个地方需要修改,需要在pom中的hadoop-client和log4j依赖中增加scope属性,值为provided,表示只在编译的时候使用这个依赖,在执行以及打包的时候都不使用,因为hadoop-client和log4j依赖在集群中都是有的,所以在打jar包的时候就不需要打进去了,如果我们使用到了集群中没有的第三方依赖包就不需要增加这个provided属性了,不增加provided就可以把对应的第三方依赖打进jar包里面了。
<dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>3.2.1</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.10</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.10</version> <scope>provided</scope> </dependency> </dependencies>
添加好了以后就可以打包了,建议在windows的cmd命令行下cd到项目根目录,然后执行mvn编译打包命令,看到最后输出的BUILD SUCCESS就说明执行成功了
命令执行成功之后,就可以到target目录下获取对应的jar包了,需要使用jar-with-dependencies结尾的那个jar包。
D:\IdeaProjects\db_hadoop\target\db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar
把这个jar包上传到集群的任意一台机器上面或者是hadoop客户端机器上都可以,只要这台机器可以和集群进行交互即可。
注意,这个jar包不能使用java -jar的方式执行,需要使用集群特有的执行方式
我把这个jar包上传到了bigdata01机器的/usr/local/hadoop-3.2.0目录下了,
在向集群中正式提交任务jar包之前需要先把测试数据准备好
在本地创建一个hello.txt文件,内容是
[root@bigdata01 hadoop-3.2.0]# vi hello.txt hello you hello me
单词中间用空格隔开,因为我们在MapReduce代码中是使用空格进行切割单词的。
然后把hello.txt上传到hdfs的test目录下
[root@bigdata01 hadoop-3.2.0]# hdfs dfs -mkdir /test [root@bigdata01 hadoop-3.2.0]# hdfs dfs -put hello.txt /test [root@bigdata01 hadoop-3.2.0]# hdfs dfs -ls /test Found 1 items -rw-r--r-- 2 root supergroup 19 2020-04-22 11:16 /test/hello.txt复制代码
接下来就可以向集群提交MapReduce任务了
具体的命令是这样的
hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.cjt.mr.WordCountJob /test/hello.txt /out
yarn jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.cjt.mr.WordCountJob /test/hello.txt /out
hadoop:表示使用hadoop脚本提交任务,其实在这里使用yarn脚本也是可以的,从hadoop2开始支持使用yarn,不过也兼容hadoop1,也继续支持使用hadoop脚本,所以在这里使用哪个都可以,具体就看你个人的喜好了,我是习惯于使用hadoop脚本
jar:表示执行jar包
db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar:指定具体的jar包路径信息
com.cjt.mr.WordCountJob:指定要执行的mapreduce代码的全路径
/test/hello.txt:指定mapreduce接收到的第一个参数,代表的是输入路径,这里的输入路径可以直接指定hello.txt的路径,也可以直接指定它的父目录,因为它的父目录里面也没有其它无关的文件,如果指定目录的话就意味着hdfs会读取这个目录下所有的文件,所以后期如果我们需要处理一批文件,那就可以把他们放到同一个目录里面,直接指定目录即可。
/out:指定mapreduce接收到的第二个参数,代表的是输出目录,这里的输出目录必须是不存在的,MapReduce程序在执行之前会检测这个输出目录,如果存在会报错,因为它每次执行任务都需要一个新的输出目录来存储结果数据
任务提交到集群上面之后,可以在shell窗口中看到如下日志信息,最终map执行到100%,reduce执行到100%,说明任务执行成功了
2020-04-22 15:12:59,887 INFO mapreduce.Job: map 0% reduce 0% 2020-04-22 15:13:08,050 INFO mapreduce.Job: map 100% reduce 0% 2020-04-22 15:13:16,261 INFO mapreduce.Job: map 100% reduce 100%
3 web界面中查看任务执行情况
访问 http://bigdata01:8088
那我们来查看一下任务输出的结果,
[root@bigdata01 hadoop-3.2.0]# hdfs dfs -ls /out Found 2 items -rw-r--r-- 2 root supergroup 0 2020-04-22 15:13 /out/_SUCCESS -rw-r--r-- 2 root supergroup 19 2020-04-22 15:13 /out/part-r-00000
还要一点需要注意的,part后面的r表示这个结果文件是reduce步骤产生的,如果一个mapreduce只有map阶段没有reduce阶段,那么产生的结果文件是part-m-00000这样的。