Unit Testing MapReduce with MRUnit

1. Introduction to MRUnit

    Official site: https://mrunit.apache.org/

    Apache MRUnit ™ is a Java library that helps developers unit test Apache Hadoop map reduce jobs.

    In other words, MRUnit is a unit-testing library that lets developers test MapReduce jobs without running them on a cluster.

2. Code Example

    Using a Maven project as an example, this section demonstrates how to unit test a MapReduce job with MRUnit.

    For a detailed walkthrough of this example, see: https://cwiki.apache.org/confluence/display/MRUNIT/MRUnit+Tutorial

    In the project's pom.xml (listed in full below), pay particular attention to the mrunit, mockito-all, and junit dependencies: MRUnit builds on Mockito and JUnit to simulate the MapReduce framework around your mapper and reducer.

The MR unit test class

package mrunit;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import mrunit.SMSCDRMapper.CDRCounter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Before;
import org.junit.Test;

/**
 * Test data format: CDRID;CDRType;Phone1;Phone2;SMS Status Code
 * 655209;1;796764372490213;804422938115889;6
 * 353415;0;356857119806206;287572231184798;4
 * 835699;1;252280313968413;889717902341635;0
 */
public class SMSCDRMapperReducerTest {
	Configuration conf = new Configuration();
	MapDriver<LongWritable, Text, Text, IntWritable> mapDriver;
	ReduceDriver<Text, IntWritable, Text, IntWritable> reduceDriver;
	MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, IntWritable> mapReduceDriver;

	@Before
	public void setUp() {

		// Drivers for the mapper, the reducer, and the combined map->reduce pipeline
		SMSCDRMapper mapper = new SMSCDRMapper();
		SMSCDRReducer reducer = new SMSCDRReducer();
		mapDriver = MapDriver.newMapDriver(mapper);
		reduceDriver = ReduceDriver.newReduceDriver(reducer);
		mapReduceDriver = MapReduceDriver.newMapReduceDriver(mapper, reducer);

		// Configuration parameters under test; set them before handing conf to the driver
		conf.set("myParameter1", "20");
		conf.set("myParameter2", "23");
		mapDriver.setConfiguration(conf);
	}

	@Test
	public void testMapper() throws IOException {
		mapDriver.withInput(new LongWritable(), new Text(
				"655209;1;796764372490213;804422938115889;6"));
		mapDriver.withOutput(new Text("6"), new IntWritable(1));
		mapDriver.runTest();
	}

	@Test
	public void testReducer() throws IOException {
		List<IntWritable> values = new ArrayList<IntWritable>();
		values.add(new IntWritable(1));
		values.add(new IntWritable(1));
		reduceDriver.withInput(new Text("6"), values);
		reduceDriver.withOutput(new Text("6"), new IntWritable(2));
		reduceDriver.runTest();
	}
	
	@Test
	public void testMapperReducer() throws IOException {
		mapReduceDriver.withInput(new LongWritable(), new Text(
				"655209;1;796764372490213;804422938115889;6"));
		mapReduceDriver.withOutput(new Text("6"), new IntWritable(1));
		mapReduceDriver.runTest();
	}

	@Test
	public void testMapperCount() throws IOException {
		mapDriver.withInput(new LongWritable(), new Text(
				"655209;0;796764372490213;804422938115889;6"));
		// A non-SMS record (CDRType = 0) produces no key/value output,
		// only a counter increment, so no withOutput() expectation is set.
		mapDriver.runTest();
		assertEquals("Expected 1 counter increment", 1, mapDriver.getCounters()
				.findCounter(CDRCounter.NonSMSCDR).getValue());
	}
}
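
In addition to runTest(), every MRUnit driver offers run(), which executes the code under test and returns the actual output pairs so you can make arbitrary assertions on them. Below is a minimal sketch of this style, written as an extra test method for the class above (it additionally needs import org.apache.hadoop.mrunit.types.Pair; the method name is illustrative):

	@Test
	public void testMapperWithRun() throws IOException {
		mapDriver.withInput(new LongWritable(), new Text(
				"655209;1;796764372490213;804422938115889;6"));
		// run() hands back the output instead of comparing it internally
		List<Pair<Text, IntWritable>> output = mapDriver.run();
		assertEquals("Expected a single output record", 1, output.size());
		assertEquals(new Text("6"), output.get(0).getFirst());
		assertEquals(new IntWritable(1), output.get(0).getSecond());
	}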

The Mapper class

package mrunit;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SMSCDRMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

	private Text status = new Text();
	private final static IntWritable addOne = new IntWritable(1);

	static enum CDRCounter {
		NonSMSCDR
	}

	/**
	 * Emits the SMS status code with a count of one.
	 */
	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws java.io.IOException, InterruptedException {

		// Sample record format: 655209;1;796764372490213;804422938115889;6
		String[] line = value.toString().split(";");
		// Only SMS CDRs (CDRType == 1) produce key/value output
		if (Integer.parseInt(line[1]) == 1) {
			status.set(line[4]);
			context.write(status, addOne);
		} else {
			// CDR record is not of type SMS, so only increment the counter
			context.getCounter(CDRCounter.NonSMSCDR).increment(1);
		}
	}
}
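
Note that setUp() sets myParameter1 and myParameter2 on the Configuration, but SMSCDRMapper never reads them. For completeness, here is a hedged sketch of how a mapper could pick up such parameters in setup(); ConfigAwareMapper and its threshold rule are hypothetical and not part of the tutorial code:

package mrunit;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ConfigAwareMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

	private int threshold;
	private Text status = new Text();
	private final static IntWritable addOne = new IntWritable(1);

	@Override
	protected void setup(Context context) {
		// Values set via mapDriver.setConfiguration(conf) are visible here
		threshold = context.getConfiguration().getInt("myParameter1", 0);
	}

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws java.io.IOException, InterruptedException {
		String[] line = value.toString().split(";");
		// Hypothetical rule: only emit status codes below the configured threshold
		if (Integer.parseInt(line[4]) < threshold) {
			status.set(line[4]);
			context.write(status, addOne);
		}
	}
}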

The Reducer class

package mrunit;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SMSCDRReducer extends
		Reducer<Text, IntWritable, Text, IntWritable> {

	@Override
	protected void reduce(Text key, Iterable<IntWritable> values, Context context)
			throws java.io.IOException, InterruptedException {
		// Sum the counts emitted by the mapper for each status code
		int sum = 0;
		for (IntWritable value : values) {
			sum += value.get();
		}
		context.write(key, new IntWritable(sum));
	}
}
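
For context, this is roughly how the tested classes would be wired into a real job. The sketch below is illustrative only (the driver class name, job name, and argument handling are assumptions; the tutorial itself stops at the unit tests):

package mrunit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SMSCDRDriver {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "sms-cdr-status-count");
		job.setJarByClass(SMSCDRDriver.class);

		// The same classes exercised by the MRUnit tests above
		job.setMapperClass(SMSCDRMapper.class);
		job.setReducerClass(SMSCDRReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}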

The project's pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.cdh</groupId>
	<artifactId>cdh-test</artifactId>
	<version>1.0.0-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>cdh-test</name>
	<url>http://maven.apache.org</url>

	<properties>
		<hadoop.version>2.0.0-mr1-cdh4.4.0</hadoop.version>
		<hbase.version>0.94.6-cdh4.4.0</hbase.version>
		<project.build.sourceEncoding>utf-8</project.build.sourceEncoding>
		<maven.compiler.encoding>utf-8</maven.compiler.encoding>
	</properties>

	<build>
		<pluginManagement>
			<plugins>
				<plugin>
					<groupId>org.apache.maven.plugins</groupId>
					<artifactId>maven-compiler-plugin</artifactId>
					<version>3.1</version>
					<configuration>
						<encoding>utf-8</encoding>
						<source>1.6</source>
						<target>1.6</target>
					</configuration>
				</plugin>
			</plugins>
		</pluginManagement>

		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-shade-plugin</artifactId>
				<version>2.1</version>
				<executions>
					<execution>
						<phase>package</phase>
						<goals>
							<goal>shade</goal>
						</goals>
					</execution>
				</executions>
			</plugin>

			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-eclipse-plugin</artifactId>
				<version>2.9</version>
				<configuration>
					<buildOutputDirectory>eclipse-classes</buildOutputDirectory>
					<downloadSources>true</downloadSources>
					<downloadJavadocs>false</downloadJavadocs>
				</configuration>
			</plugin>
		</plugins>
	</build>

	<dependencies>
		<dependency>
			<groupId>jdk.tools</groupId>
			<artifactId>jdk.tools</artifactId>
			<version>1.6</version>
			<scope>system</scope>
			<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
		</dependency>
		
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-client</artifactId>
			<version>${hadoop.version}</version>
			<scope>provided</scope>
			<exclusions>
				<exclusion>
					<artifactId>mockito-all</artifactId>
					<groupId>org.mockito</groupId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-mapreduce-client-core</artifactId>
			<version>2.0.0-cdh4.4.0</version>
			<exclusions>
				<exclusion>
					<artifactId>
						jersey-test-framework-grizzly2
					</artifactId>
					<groupId>
						com.sun.jersey.jersey-test-framework
					</groupId>
				</exclusion>
				<exclusion>
					<artifactId>netty</artifactId>
					<groupId>org.jboss.netty</groupId>
				</exclusion>
			</exclusions>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.hbase</groupId>
			<artifactId>hbase</artifactId>
			<version>${hbase.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>com.hadoop.gplcompression</groupId>
			<artifactId>hadoop-lzo-cdh4</artifactId>
			<version>0.4.15-gplextras</version>
		</dependency>

		<dependency>
			<groupId>org.hsqldb</groupId>
			<artifactId>hsqldb</artifactId>
			<version>2.2.9</version>
		</dependency>

		<dependency>
			<groupId>redis.clients</groupId>
			<artifactId>jedis</artifactId>
			<version>2.5.1</version>
		</dependency>
		
		<!-- junit test -->
		<dependency>
			<groupId>org.apache.mrunit</groupId>
			<artifactId>mrunit</artifactId>
			<version>1.1.0</version>
			<classifier>hadoop2</classifier>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.mockito</groupId>
			<artifactId>mockito-all</artifactId>
			<version>1.9.5</version>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.10</version>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<repositories>
		<repository>
			<id>cloudera</id>
			<url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
			<releases>
				<enabled>true</enabled>
			</releases>
			<snapshots>
				<enabled>false</enabled>
			</snapshots>
		</repository>
	</repositories>
</project>