[Hadoop]Hadoop单元测试MRUnit

简介: 版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/SunnyYoona/article/details/53523652 在MapReduce中,map函数和reduce函数的独立测试是非常方便的,这是由函数风格决定的 。
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/SunnyYoona/article/details/53523652

在MapReduce中,map函数和reduce函数的独立测试是非常方便的,这是由函数风格决定的 。MRUnit是一个测试库,它便于将已知的输入传递给mapper或者检查reducer的输出是否符合预期。MRUnit与标准的执行框架(JUnit)一起使用。

1. 设置开发环境

从(https://repository.apache.org/content/repositories/releases/org/apache/mrunit/mrunit/ )下载最新版本的MRUnit jar,例如如果你使用的hadoop版本为1.0.3,则需要下载

mrunit-x.x.x-incubating-hadoop2.jar。同时还需要下载JUnit最新版本jar。

如果使用Maven方式则使用如下方式:

<junit.version>4.12</junit.version>
<mrunit.version>1.1.0</mrunit.version>
<!-- junit -->
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>${junit.version}</version>
    <scope>test</scope>
</dependency>
<!-- mrunit -->
<dependency>
   <groupId>org.apache.mrunit</groupId>
   <artifactId>mrunit</artifactId>
   <version>${mrunit.version}</version>
   <classifier>hadoop2</classifier>
   <scope>test</scope>
</dependency>

备注

如果你使用的是hadoop 2.x版本,classifier设置为hadoop2

2. MRUnit 测试用例

MRUnit测试框架基于Junit,可以测试hadoop版本为0.20,0.23.x,1.0.x,2.x的map reduce程序。

下面是一个使用MRUnit对统计一年最高气温的Map Reduce程序进行单元测试。

测试数据如下:

0096007026999992016062218244+00000+000000FM-15+702699999V0209999C000019999999N999999999+03401+01801999999ADDMA1101731999999REMMET069MOBOB0 METAR 7026 //008 000000 221824Z AUTO 00000KT //// 34/18 A3004=

这只有一天的数据,气温是340,Mapper输出为该天气温340

下面是相应的Mapper和Reducer:

MaxTemperatureMapper:

package com.sjf.open.maxTemperature;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import com.google.common.base.Objects;
/**
 * Created by xiaosi on 16-7-27.
 */
public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final int MISSING = 9999;
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        // 年份
        String year = line.substring(15, 19);
        // 温度
        int airTemperature;
        if(Objects.equal(line.charAt(87),"+")){
            airTemperature = Integer.parseInt(line.substring(88,92));
        }
        else{
            airTemperature = Integer.parseInt(line.substring(87,92));
        }
        // 空气质量
        String quality = line.substring(92, 93);
        if(!Objects.equal(airTemperature, MISSING) && quality.matches("[01459]")){
            context.write(new Text(year), new IntWritable(airTemperature));
        }
    }
}

MaxTemperatureReducer:

package com.sjf.open.maxTemperature;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Created by xiaosi on 16-7-27.
 */
public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // 一年最高气温
        int maxValue = Integer.MIN_VALUE;
        for(IntWritable value : values){
            maxValue = Math.max(maxValue, value.get());
        }//for
        // 输出
        context.write(key, new IntWritable(maxValue));
    }
}

下面是MRUnit测试类:

package com.sjf.open.maxTemperature;
import com.google.common.collect.Lists;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.apache.hadoop.mrunit.types.Pair;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.List;
/**
 * Created by xiaosi on 16-12-8.
 */
public class MaxTemperatureTest {
    private MapDriver mapDriver;
    private ReduceDriver reduceDriver;
    private MapReduceDriver mapReduceDriver;
    @Before
    public void setUp(){
        MaxTemperatureMapper mapper = new MaxTemperatureMapper();
        mapDriver = MapDriver.newMapDriver(mapper);
        MaxTemperatureReducer reducer = new MaxTemperatureReducer();
        reduceDriver = ReduceDriver.newReduceDriver();
        reduceDriver.withReducer(reducer);
        mapReduceDriver = MapReduceDriver.newMapReduceDriver(mapper, reducer);
    }
    @Test
    public void testMapper() throws IOException {
        Text text = new Text("0096007026999992016062218244+00000+000000FM-15+702699999V0209999C000019999999N999999999+03401+01801999999ADDMA1101731999999REMMET069MOBOB0 METAR 7026 //008 000000 221824Z AUTO 00000KT //// 34/18 A3004=");
        mapDriver.withInput(new LongWritable(), text);
        mapDriver.withOutput(new Text("2016"), new IntWritable(340));
        mapDriver.runTest();
        // 输出
        List<Pair> expectedOutputList = mapDriver.getExpectedOutputs();
        for(Pair pair : expectedOutputList){
            System.out.println(pair.getFirst() + " --- " + pair.getSecond()); // 2016 --- 340
        }
    }
    @Test
    public void testReducer() throws IOException {
        List<IntWritable> IntWritableList = Lists.newArrayList();
        IntWritableList.add(new IntWritable(340));
        IntWritableList.add(new IntWritable(240));
        IntWritableList.add(new IntWritable(320));
        IntWritableList.add(new IntWritable(330));
        IntWritableList.add(new IntWritable(310));
        reduceDriver.withInput(new Text("2016"), IntWritableList);
        reduceDriver.withOutput(new Text("2016"), new IntWritable(340));
        reduceDriver.runTest();
        // 输出
        List<Pair> expectedOutputList = reduceDriver.getExpectedOutputs();
        for(Pair pair : expectedOutputList){
            System.out.println(pair.getFirst() + " --- " + pair.getSecond());
        }
    }
    @Test
    public void testMapperAndReducer() throws IOException {
        Text text = new Text("0089010010999992014010114004+70933-008667FM-12+000999999V0201201N006019999999N999999999+00121-00361100681ADDMA1999990100561MD1810171+9990REMSYN04801001 46/// /1206 10012 21036 30056 40068 58017=");
        mapReduceDriver.withInput(new LongWritable(), text);
        mapReduceDriver.withOutput(new Text("2014"), new IntWritable(12));
        mapReduceDriver.runTest();
        // 输出
        List<Pair> expectedOutputList = mapReduceDriver.getExpectedOutputs();
        for(Pair pair : expectedOutputList){
            System.out.println(pair.getFirst() + " --- " + pair.getSecond()); // 2014 --- 12
        }
    }
}

如果测试的是Mapper,使用MRUnit的MapDiver,如果测试Reducer,使用ReduceDriver,如果测试整个MapReduce程序,则需要使用MapReduceDriver。在调用runTest()方法之前,需要配置mapper(或者Reducer),输入值,期望的输出key,期望的输出值等。如果与期望的输出值不匹配,MRUnit测试失败。根据withOutput()被调用的次数,MapDiver(ReduceDriver,MapReduceDriver)能来检查0,1,或者多个输出记录。

备注

注意 MapDriver,ReduceDriver,MapReduceDriver 引入的jar包版本:

import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;

而不是:

import org.apache.hadoop.mrunit.MapDriver;
import org.apache.hadoop.mrunit.MapReduceDriver;
import org.apache.hadoop.mrunit.ReduceDriver;

这分别对应Hadoop新老版本API,第一类对应新版本的Mapper和Reducer:

import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

第二类对应老版本的Mapper和Reducer:

import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.Reducer;

参考:https://cwiki.apache.org/confluence/display/MRUNIT/MRUnit+Tutorial

数据来源:ftp://ftp.ncdc.noaa.gov/pub/data/noaa


目录
相关文章
|
6月前
|
分布式计算 Hadoop 测试技术
Hadoop【环境搭建 05】【hadoop-3.1.3 单机版基准测试 TestDFSIO + mrbench + nnbench + Terasort + sort 举例】
【4月更文挑战第1天】Hadoop【环境搭建 05】【hadoop-3.1.3 单机版基准测试 TestDFSIO + mrbench + nnbench + Terasort + sort 举例】
179 3
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-13-Hive 启动Hive 修改启动参数命令行启动测试 几句简单的HQL了解Hive
Hadoop-13-Hive 启动Hive 修改启动参数命令行启动测试 几句简单的HQL了解Hive
59 2
|
1月前
|
分布式计算 Hadoop 大数据
大数据体系知识学习(一):PySpark和Hadoop环境的搭建与测试
这篇文章是关于大数据体系知识学习的,主要介绍了Apache Spark的基本概念、特点、组件,以及如何安装配置Java、PySpark和Hadoop环境。文章还提供了详细的安装步骤和测试代码,帮助读者搭建和测试大数据环境。
61 1
|
1月前
|
分布式计算 Hadoop Shell
Hadoop-35 HBase 集群配置和启动 3节点云服务器 集群效果测试 Shell测试
Hadoop-35 HBase 集群配置和启动 3节点云服务器 集群效果测试 Shell测试
71 4
|
1月前
|
SQL 分布式计算 Hadoop
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(一)
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(一)
46 4
|
1月前
|
SQL
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(二)
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(二)
37 2
|
1月前
|
分布式计算 监控 Hadoop
Hadoop-29 ZooKeeper集群 Watcher机制 工作原理 与 ZK基本命令 测试集群效果 3台公网云服务器
Hadoop-29 ZooKeeper集群 Watcher机制 工作原理 与 ZK基本命令 测试集群效果 3台公网云服务器
42 1
|
1月前
|
分布式计算 Hadoop Unix
Hadoop-28 ZooKeeper集群 ZNode简介概念和测试 数据结构与监听机制 持久性节点 持久顺序节点 事务ID Watcher机制
Hadoop-28 ZooKeeper集群 ZNode简介概念和测试 数据结构与监听机制 持久性节点 持久顺序节点 事务ID Watcher机制
42 1
|
1月前
|
存储 数据采集 分布式计算
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
44 1
|
6月前
|
分布式计算 Hadoop 测试技术
下一篇
无影云桌面