1 MapReduce之任务日志查看
如果想要查看mapreduce任务执行过程产生的日志信息怎么办呢?
是不是在提交任务的时候直接在这个控制台上就能看到了?先不要着急,我们先在代码中增加一些日志信息,在实际工作中做调试的时候这个也是很有必要的
在自定义mapper类的map函数中增加一个输出,将k1,v1的值打印出来
@Override protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { // k1代表的是每一行的行首偏移量,v1代表的是每一行内容 // 对获取到的每一行数据进行切割,把单词切割出来 String[] words = v1.toString().split(" "); System.out.println("<k1,v1>=<"+k1.get()+","+v1.toString()+">"); for (String word : words) { // 迭代切割出来的单词数据 Text k2 = new Text(word); LongWritable v2 = new LongWritable(1L); System.out.println("k2:"+word+"...v2:1"); // 把<k2,v2>写出去; context.write(k2, v2); } }
在自定义reducer类中的reduce方法中增加一个输出,将k2,v2和k3,v3的值打印出来
@Override protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException { long sum = 0L; for (LongWritable v2 : v2s) { System.out.println("<k2,v2>=<"+k2.toString()+","+v2.get()+">"); sum += v2.get(); } Text k3 = k2; LongWritable v3 = new LongWritable(sum); System.out.println("<k3,v3>=<"+k3.toString()+","+v3.get()+">"); context.write(k3, v3); }
重新在windows机器上打jar包,并把新的jar包上传到bigdata01机器的/usr/local/hadoop-3.2.0目录中 重新向集群提交任务,注意,针对输出目录,要么换一个新的不存在的目录,要么把之前的out目录删掉
hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.cjt.mr.WordCountJob /test/hello.txt /out
等待任务执行结束,我们发现在控制台上是看不到任务中的日志信息的,为什么呢?因为我们在这相当于是通过一个客户端把任务提交到集群里面去执行了,所以日志是存在在集群里面的。想要查看需要需要到一个特殊的地方查看这些日志信息
先进入到yarn的web界面,访问8088端口,点击对应任务的history链接
http://bigdata01:8088/
注意了,在这里我们发现这个链接是打不来的,
这里有两个原因,第一个原因是没有windows的hosts文件中没有配置bigdata02和bigdata03这两个主机名和ip的映射关系,先去把这两个主机名配置到hosts文件里面,之前的bigdata01已经配置进去了。
192.168.182.100 bigdata01 192.168.182.101 bigdata02 192.168.182.102 bigdata03
第二个原因就是这里必须要启动historyserver进程才可以,并且还要开启日志聚合功能,才能在web界面上直接查看任务对应的日志信息,因为默认情况下任务的日志是散落在nodemanager节点上的,想要查看需要找到对应的nodemanager节点上去查看,这样就很不方便,通过日志聚合功能我们可以把之前本来散落在nodemanager节点上的日志统一收集到hdfs上的指定目录中,这样就可以在yarn的web界面中直接查看了
那我们就来开启日志聚合功能。开启日志聚合功能需要修改yarn-site.xml的配置,增加 yarn.log-aggregation-enable和yarn.log.server.url这两个参数
<property> <name>yarn.log-aggregation-enable</name> <value>true</value> </property> <property> <name>yarn.log.server.url</name> <value>http://bigdata01:19888/jobhistory/logs/</value> </property>
注意:修改这个配置想要生效需要重启集群。
[root@bigdata01 hadoop-3.2.0]# sbin/stop-all.sh [root@bigdata01 hadoop-3.2.0]# cd etc/hadoop/ [root@bigdata01 hadoop]# vi yarn-site.xml
启动historyserver进程,需要在集群的所有节点上都启动这个进程
[root@bigdata01 hadoop-3.2.0]# bin/mapred --daemon start historyserver [root@bigdata01 hadoop-3.2.0]# jps 4232 SecondaryNameNode 5192 JobHistoryServer 4473 ResourceManager 3966 NameNode 5231 Jps [root@bigdata02 hadoop-3.2.0]# bin/mapred --daemon start historyserver [root@bigdata02 hadoop-3.2.0]# jps 2904 Jps 2523 NodeManager 2844 JobHistoryServer 2415 DataNode [root@bigdata03 hadoop-3.2.0]# bin/mapred --daemon start historyserver [root@bigdata03 hadoop-3.2.0]# jps 3138 JobHistoryServer 2678 NodeManager 2570 DataNode 3198 Jps
重新再提交mapreduce任务
此时再进入yarn的8088界面,点击任务对应的history链接就可以打开了。
此时,点击对应map和reduce后面的链接就可以点进去查看日志信息了,点击map后面的数字1,可以进入如下界面
点击这个界面中的logs文字链接,可以查看详细的日志信息。
最终可以在界面中看到很多日志信息,我们刚才使用sout输出的日志信息需要到Log Type: stdout这里来查看,在这里可以看到,k1和v1的值
Log Type: stdout Log Upload Time: Fri Apr 24 15:33:58 +0800 2020 Log Length: 103 <k1,v1>=<0,hello you> <k1,v1>=<10,hello me>
想要查看reduce输出的日志信息需要到reduce里面查看,操作流程是一样的,可以看到k2,v2和k3,v3的值
咱们刚才的输出是使用syout输出的,这个其实是不正规的,标准的日志写法是需要使用logger进行输出的
public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> { Logger logger = LoggerFactory.getLogger(MyMapper.class);/** * 需要实现map函数 * 这个map函数就是可以接收k1,v1, 产生k2,v2 * * @param k1 * @param v1 * @param context * @throws IOException * @throws InterruptedException */@Overrideprotected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {// k1代表的是每一行的行首偏移量,v1代表的是每一行内容// 对获取到的每一行数据进行切割,把单词切割出来String[] words = v1.toString().split(" "); logger.info("<k1,v1>=<"+k1.get()+","+v1.toString()+">"); // System.out.println("<k1,v1>=<"+k1.get()+","+v1.toString()+">");for (String word : words) {// 迭代切割出来的单词数据Text k2 = new Text(word); LongWritable v2 = new LongWritable(1L); logger.info("k2:"+word+"...v2:1"); // System.out.println("k2:"+word+"...v2:1");// 把<k2,v2>写出去 context.write(k2,v2);context.write(k2, v2); } } }/** * 创建自定义reducer类 */public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> { Logger logger = LoggerFactory.getLogger(MyReducer.class);/** * 针对<k2,{v2……}>的数据进行累加求和,并且最终把数据转化为k3,v3写出去 * * @param k2 * @param v2s * @param context * @throws IOException * @throws InterruptedException */@Overrideprotected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException {long sum = 0L;for (LongWritable v2 : v2s) { logger.info("<k2,v2>=<"+k2.toString()+","+v2.get()+">");// System.out.println("<k2,v2>=<"+k2.toString()+","+v2.get()+">");sum += v2.get(); } Text k3 = k2; LongWritable v3 = new LongWritable(sum); logger.info("<k3,v3>=<"+k3.toString()+","+v3.get()+">"); // System.out.println("<k3,v3>=<"+k3.toString()+","+v3.get()+">");context.write(k3, v3); } }
重新编译打包上传,重新提交最新的jar包,这个时候再查看日志就需要到Log Type: syslog中查看日志了。
这是工作中比较常用的查看日志的方式,但是还有一种使用命令查看的方式,这种方式面试的时候一般喜欢问
[root@bigdata01 hadoop-3.2.0]# yarn logs -applicationId application_158771356
注意:后面指定的是任务id,任务id可以到yarn的web界面上查看。
执行这个命令可以看到很多的日志信息,我们通过grep筛选一下日志
[root@bigdata01 hadoop-3.2.0]# yarn logs -applicationId application_158771356 | grep k1,v1 <k1,v1>=<0,hello you> <k1,v1>=<10,hello me>
2 停止Hadoop集群中的任务
如果一个mapreduce任务处理的数据量比较大的话,这个任务会执行很长时间,可能几十分钟或者几个小时都有可能,假设一个场景,任务执行了一半了我们发现我们的代码写的有问题,需要修改代码重新提交执行,这个时候之前的任务就没有必要再执行了,没有任何意义了,最终的结果肯定是错误的,所以我们就想把它停掉,要不然会额外浪费集群的资源,如何停止呢?
我在提交任务的窗口中按ctrl+c是不是就可以停止?
注意了,不是这样的,我们前面说过,这个任务是提交到集群执行的,你在提交任务的窗口中执行ctrl+c对已经提交到集群中的任务是没有任何影响的。
我们可以验证一下,执行ctrl+c之后你再到yarn的8088界面查看,会发现任务依然存在。
所以需要使用hadoop集群的命令去停止正在运行的任务
使用yarn application -kill命令,后面指定任务id即可
[root@bigdata01 hadoop-3.2.0]# yarn application -kill application_15877135678
3 MapReduce程序扩展
咱们前面说过MapReduce任务是由map阶段和reduce阶段组成的
但是我们也说过,reduce阶段不是必须的,那也就意味着MapReduce程序可以只包含map阶段。
什么场景下会只需要map阶段呢?
当数据只需要进行普通的过滤、解析等操作,不需要进行聚合,这个时候就不需要使用reduce阶段了,
在代码层面该如何设置呢?
很简单,在组装Job的时候设置reduce的task数目为0就可以了。并且Reduce代码也不需要写了。
public static void main(String[] args) { try { if(args.length!=2){ // 如果传递的参数不够,程序直接退出 System.exit(100); } // job需要的配置参数 Configuration conf = new Configuration(); // 创建一个job Job job = Job.getInstance(conf); // 注意:这一行必须设置,否则在集群中执行的是找不到WordCountJob这个类 job.setJarByClass(WordCountJobNoReduce.class); // 指定输入路径(可以是文件,也可以是目录) FileInputFormat.setInputPaths(job,new Path(args[0])); // 指定输出路径(只能指定一个不存在的目录) FileOutputFormat.setOutputPath(job,new Path(args[1])); // 指定map相关的代码 job.setMapperClass(MyMapper.class); // 指定k2的类型 job.setMapOutputKeyClass(Text.class); // 指定v2的类型 job.setMapOutputValueClass(LongWritable.class); //禁用reduce阶段 job.setNumReduceTasks(0); // 提交job job.waitForCompletion(true); }catch (Exception e){ e.printStackTrace(); } } }
重新编译,打包,上传到bigdata01机器上
然后将最新的任务提交到集群上面,注意修改入口类全类名
这里发现map执行到100%以后任务就执行成功了,reduce还是0%,因为就没有reduce阶段了。
查看输出结果,注意,这里的文件名就是part-m-00000了
[root@bigdata01 hadoop-3.2.0]# hdfs dfs -cat /out5/part-m-00000 hello 1 you 1 hello 1 me 1