大数据MapReduce常用操作

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据MapReduce常用操作

1 MapReduce之任务日志查看

如果想要查看mapreduce任务执行过程产生的日志信息怎么办呢?

是不是在提交任务的时候直接在这个控制台上就能看到了?先不要着急,我们先在代码中增加一些日志信息,在实际工作中做调试的时候这个也是很有必要的

在自定义mapper类的map函数中增加一个输出,将k1,v1的值打印出来

  @Override
        protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
            // k1代表的是每一行的行首偏移量,v1代表的是每一行内容
            // 对获取到的每一行数据进行切割,把单词切割出来
            String[] words = v1.toString().split(" ");
            System.out.println("<k1,v1>=<"+k1.get()+","+v1.toString()+">");
            for (String word : words) {
                // 迭代切割出来的单词数据
                Text k2 = new Text(word);
                LongWritable v2 = new LongWritable(1L);
                System.out.println("k2:"+word+"...v2:1");
                // 把<k2,v2>写出去;
                context.write(k2, v2);
            }
        }

在自定义reducer类中的reduce方法中增加一个输出,将k2,v2和k3,v3的值打印出来

        @Override
        protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException {
            long sum = 0L;
            for (LongWritable v2 : v2s) {
                System.out.println("<k2,v2>=<"+k2.toString()+","+v2.get()+">");
                sum += v2.get();
            }
            Text k3 = k2;
            LongWritable v3 = new LongWritable(sum);
            System.out.println("<k3,v3>=<"+k3.toString()+","+v3.get()+">");
            context.write(k3, v3);
        }

重新在windows机器上打jar包,并把新的jar包上传到bigdata01机器的/usr/local/hadoop-3.2.0目录中 重新向集群提交任务,注意,针对输出目录,要么换一个新的不存在的目录,要么把之前的out目录删掉

hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.cjt.mr.WordCountJob /test/hello.txt /out

等待任务执行结束,我们发现在控制台上是看不到任务中的日志信息的,为什么呢?因为我们在这相当于是通过一个客户端把任务提交到集群里面去执行了,所以日志是存在在集群里面的。想要查看需要需要到一个特殊的地方查看这些日志信息

先进入到yarn的web界面,访问8088端口,点击对应任务的history链接

http://bigdata01:8088/



20210712172029211.png

注意了,在这里我们发现这个链接是打不来的,

这里有两个原因,第一个原因是没有windows的hosts文件中没有配置bigdata02和bigdata03这两个主机名和ip的映射关系,先去把这两个主机名配置到hosts文件里面,之前的bigdata01已经配置进去了。

192.168.182.100 bigdata01 
192.168.182.101 bigdata02 
192.168.182.102 bigdata03

第二个原因就是这里必须要启动historyserver进程才可以,并且还要开启日志聚合功能,才能在web界面上直接查看任务对应的日志信息,因为默认情况下任务的日志是散落在nodemanager节点上的,想要查看需要找到对应的nodemanager节点上去查看,这样就很不方便,通过日志聚合功能我们可以把之前本来散落在nodemanager节点上的日志统一收集到hdfs上的指定目录中,这样就可以在yarn的web界面中直接查看了


那我们就来开启日志聚合功能。开启日志聚合功能需要修改yarn-site.xml的配置,增加 yarn.log-aggregation-enable和yarn.log.server.url这两个参数

<property> 
<name>yarn.log-aggregation-enable</name> <value>true</value> 
</property> 
<property>
<name>yarn.log.server.url</name> <value>http://bigdata01:19888/jobhistory/logs/</value> </property>

注意:修改这个配置想要生效需要重启集群。

[root@bigdata01 hadoop-3.2.0]# sbin/stop-all.sh 
[root@bigdata01 hadoop-3.2.0]# cd etc/hadoop/ 
[root@bigdata01 hadoop]# vi yarn-site.xml

启动historyserver进程,需要在集群的所有节点上都启动这个进程

[root@bigdata01 hadoop-3.2.0]# bin/mapred --daemon start historyserver 
[root@bigdata01 hadoop-3.2.0]# jps 
4232 SecondaryNameNode 
5192 JobHistoryServer 
4473 ResourceManager 
3966 NameNode 
5231 Jps 
[root@bigdata02 hadoop-3.2.0]# bin/mapred --daemon start historyserver 
[root@bigdata02 hadoop-3.2.0]# jps 
2904 Jps 
2523 NodeManager 
2844 JobHistoryServer 
2415 DataNode 
[root@bigdata03 hadoop-3.2.0]# bin/mapred --daemon start historyserver 
[root@bigdata03 hadoop-3.2.0]# jps 
3138 JobHistoryServer
2678 NodeManager 
2570 DataNode 
3198 Jps

重新再提交mapreduce任务

此时再进入yarn的8088界面,点击任务对应的history链接就可以打开了。

20210712172218696.png

此时,点击对应map和reduce后面的链接就可以点进去查看日志信息了,点击map后面的数字1,可以进入如下界面

20210712172255693.png


点击这个界面中的logs文字链接,可以查看详细的日志信息。

20210712172316169.png

最终可以在界面中看到很多日志信息,我们刚才使用sout输出的日志信息需要到Log Type: stdout这里来查看,在这里可以看到,k1和v1的值

Log Type: stdout Log 
Upload Time: Fri Apr 24 15:33:58 +0800 2020 
Log Length: 103 
<k1,v1>=<0,hello you>
<k1,v1>=<10,hello me>

想要查看reduce输出的日志信息需要到reduce里面查看,操作流程是一样的,可以看到k2,v2和k3,v3的值

咱们刚才的输出是使用syout输出的,这个其实是不正规的,标准的日志写法是需要使用logger进行输出的

public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        Logger logger = LoggerFactory.getLogger(MyMapper.class);/**
         * 需要实现map函数
         * 这个map函数就是可以接收k1,v1, 产生k2,v2
         *
         * @param k1
         * @param v1
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */@Overrideprotected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {// k1代表的是每一行的行首偏移量,v1代表的是每一行内容// 对获取到的每一行数据进行切割,把单词切割出来String[] words = v1.toString().split(" ");
            logger.info("<k1,v1>=<"+k1.get()+","+v1.toString()+">");           // System.out.println("<k1,v1>=<"+k1.get()+","+v1.toString()+">");for (String word : words) {// 迭代切割出来的单词数据Text k2 = new Text(word);
                LongWritable v2 = new LongWritable(1L);
                logger.info("k2:"+word+"...v2:1");               // System.out.println("k2:"+word+"...v2:1");// 把<k2,v2>写出去 context.write(k2,v2);context.write(k2, v2);
            }
        }
    }/**
     * 创建自定义reducer类
     */public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        Logger logger = LoggerFactory.getLogger(MyReducer.class);/**
         * 针对<k2,{v2……}>的数据进行累加求和,并且最终把数据转化为k3,v3写出去
         *
         * @param k2
         * @param v2s
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */@Overrideprotected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException {long sum = 0L;for (LongWritable v2 : v2s) {
                logger.info("<k2,v2>=<"+k2.toString()+","+v2.get()+">");// System.out.println("<k2,v2>=<"+k2.toString()+","+v2.get()+">");sum += v2.get();
            }
            Text k3 = k2;
            LongWritable v3 = new LongWritable(sum);
            logger.info("<k3,v3>=<"+k3.toString()+","+v3.get()+">");           // System.out.println("<k3,v3>=<"+k3.toString()+","+v3.get()+">");context.write(k3, v3);
        }
    }

重新编译打包上传,重新提交最新的jar包,这个时候再查看日志就需要到Log Type: syslog中查看日志了。


这是工作中比较常用的查看日志的方式,但是还有一种使用命令查看的方式,这种方式面试的时候一般喜欢问

[root@bigdata01 hadoop-3.2.0]# yarn logs -applicationId application_158771356

注意:后面指定的是任务id,任务id可以到yarn的web界面上查看。

执行这个命令可以看到很多的日志信息,我们通过grep筛选一下日志

[root@bigdata01 hadoop-3.2.0]# yarn logs -applicationId application_158771356 | grep k1,v1
<k1,v1>=<0,hello you> 
<k1,v1>=<10,hello me>

2 停止Hadoop集群中的任务

如果一个mapreduce任务处理的数据量比较大的话,这个任务会执行很长时间,可能几十分钟或者几个小时都有可能,假设一个场景,任务执行了一半了我们发现我们的代码写的有问题,需要修改代码重新提交执行,这个时候之前的任务就没有必要再执行了,没有任何意义了,最终的结果肯定是错误的,所以我们就想把它停掉,要不然会额外浪费集群的资源,如何停止呢?

我在提交任务的窗口中按ctrl+c是不是就可以停止?

注意了,不是这样的,我们前面说过,这个任务是提交到集群执行的,你在提交任务的窗口中执行ctrl+c对已经提交到集群中的任务是没有任何影响的。

我们可以验证一下,执行ctrl+c之后你再到yarn的8088界面查看,会发现任务依然存在。

所以需要使用hadoop集群的命令去停止正在运行的任务

使用yarn application -kill命令,后面指定任务id即可

[root@bigdata01 hadoop-3.2.0]# yarn application -kill application_15877135678

3 MapReduce程序扩展

咱们前面说过MapReduce任务是由map阶段和reduce阶段组成的

但是我们也说过,reduce阶段不是必须的,那也就意味着MapReduce程序可以只包含map阶段。

什么场景下会只需要map阶段呢?

当数据只需要进行普通的过滤、解析等操作,不需要进行聚合,这个时候就不需要使用reduce阶段了,

在代码层面该如何设置呢?

很简单,在组装Job的时候设置reduce的task数目为0就可以了。并且Reduce代码也不需要写了。

public static void main(String[] args) { 
  try { if(args.length!=2){ 
    // 如果传递的参数不够,程序直接退出 
       System.exit(100); 
      }
    // job需要的配置参数 
    Configuration conf = new Configuration(); 
    // 创建一个job 
    Job job = Job.getInstance(conf); 
    // 注意:这一行必须设置,否则在集群中执行的是找不到WordCountJob这个类    
    job.setJarByClass(WordCountJobNoReduce.class); 
    // 指定输入路径(可以是文件,也可以是目录) 
    FileInputFormat.setInputPaths(job,new Path(args[0])); 
    // 指定输出路径(只能指定一个不存在的目录) 
    FileOutputFormat.setOutputPath(job,new Path(args[1])); 
    // 指定map相关的代码 
    job.setMapperClass(MyMapper.class);
    // 指定k2的类型 
    job.setMapOutputKeyClass(Text.class); 
    // 指定v2的类型 
    job.setMapOutputValueClass(LongWritable.class); 
    //禁用reduce阶段 
    job.setNumReduceTasks(0); 
    // 提交job job.waitForCompletion(true); 
    }catch (Exception e){   
      e.printStackTrace(); 
    } 
  }
}

重新编译,打包,上传到bigdata01机器上

然后将最新的任务提交到集群上面,注意修改入口类全类名

这里发现map执行到100%以后任务就执行成功了,reduce还是0%,因为就没有reduce阶段了。

查看输出结果,注意,这里的文件名就是part-m-00000了

[root@bigdata01 hadoop-3.2.0]# hdfs dfs -cat /out5/part-m-00000 
hello 1 
you 1 
hello 1
me 1
相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
2月前
|
分布式计算 资源调度 Hadoop
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
83 2
|
4月前
|
分布式计算 大数据 分布式数据库
"揭秘HBase MapReduce高效数据处理秘诀:四步实战攻略,让你轻松玩转大数据分析!"
【8月更文挑战第17天】大数据时代,HBase以高性能、可扩展性成为关键的数据存储解决方案。结合MapReduce分布式计算框架,能高效处理HBase中的大规模数据。本文通过实例展示如何配置HBase集群、编写Map和Reduce函数,以及运行MapReduce作业来计算HBase某列的平均值。此过程不仅限于简单的统计分析,还可扩展至更复杂的数据处理任务,为企业提供强有力的大数据技术支持。
83 1
|
4月前
|
分布式计算 大数据 Hadoop
MapReduce:大数据处理的基石
【8月更文挑战第31天】
170 0
|
4月前
|
机器学习/深度学习 分布式计算 算法
MaxCompute 的 MapReduce 与机器学习
【8月更文第31天】随着大数据时代的到来,如何有效地处理和分析海量数据成为了一个重要的课题。MapReduce 是一种编程模型,用于处理和生成大型数据集,其核心思想是将计算任务分解为可以并行处理的小任务。阿里云的 MaxCompute 是一个面向离线数据仓库的计算服务,提供了 MapReduce 接口来处理大规模数据集。本文将探讨如何利用 MaxCompute 的 MapReduce 功能来执行复杂的计算任务,特别是应用于机器学习场景。
108 0
|
4月前
|
存储 分布式计算 算法
"揭秘!MapReduce如何玩转压缩文件,让大数据处理秒变‘瘦身达人’,效率飙升,存储不再是烦恼!"
【8月更文挑战第17天】MapReduce作为Hadoop的核心组件,在处理大规模数据集时展现出卓越效能。通过压缩技术减少I/O操作和网络传输的数据量,不仅提升数据处理速度,还节省存储空间。支持Gzip等多种压缩算法,可根据需求选择。示例代码展示了如何配置Map输出压缩,并使用GzipCodec进行压缩。尽管压缩带来CPU负担,但在多数情况下收益大于成本,特别是Hadoop能够自动处理压缩文件,简化开发流程。
77 0
|
6月前
|
分布式计算 自然语言处理 大数据
【大数据】MapReduce JAVA API编程实践及适用场景介绍
【大数据】MapReduce JAVA API编程实践及适用场景介绍
173 0
|
6月前
|
存储 缓存 分布式计算
【大数据】计算引擎MapReduce
【大数据】计算引擎MapReduce
142 0
|
7月前
|
分布式计算 数据可视化 Hadoop
大数据实战——基于Hadoop的Mapreduce编程实践案例的设计与实现
大数据实战——基于Hadoop的Mapreduce编程实践案例的设计与实现
|
1月前
|
存储 分布式计算 数据挖掘
数据架构 ODPS 是什么?
数据架构 ODPS 是什么?
347 7
|
1月前
|
存储 分布式计算 大数据
大数据 优化数据读取
【11月更文挑战第4天】
52 2

热门文章

最新文章