Hadoop MapReduce编程 API入门系列之压缩和计数器(三十)

简介:

  生成的结果,作为输入源。

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

代码

package zhouls.bigdata.myMapReduce.ParseTVDataCompressAndCounter;

import java.net.URI;

import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**

* @function 统计无效数据和对输出结果进行压缩
* @author 小讲

*/
public class CompressAndCounter extends Configured implements Tool 
{
// 定义枚举对象
public static enum LOG_PROCESSOR_COUNTER
{
BAD_RECORDS
};
/**

* @function Mapper 解析数据,统计无效数据,并输出有效数据
*
*/
public static class CompressAndCounterMap extends Mapper<LongWritable, Text, Text, Text>
{
protected void map(LongWritable key, Text value, Context context) throws java.io.IOException, InterruptedException 
{
// 解析每条机顶盒记录,返回list集合
List<String> list = ParseTVData.transData(value.toString()); //调用ParseTVData.java下的transData方法
int length = list.size();
// 无效记录
if (length == 0) 
{
// 动态自定义计数器
context.getCounter("ErrorRecordCounter", "ERROR_Record_TVData").increment(1);
// 枚举声明计数器
context.getCounter(LOG_PROCESSOR_COUNTER.BAD_RECORDS).increment(1);
} else
{
for (String validateRecord : list) 
{
//输出解析数据
context.write(new Text(validateRecord), new Text(""));
}
}

}
}
/**
* @function 任务驱动方法

*/
@Override
public int run(String[] args) throws Exception 
{
// TODO Auto-generated method stub
//读取配置文件
Configuration conf = new Configuration();
//文件系统接口
URI uri = new URI("hdfs://HadoopMaster:9000");
//输出路径
Path mypath = new Path(args[1]);
// 创建FileSystem对象
FileSystem hdfs = FileSystem.get(uri, conf);
if (hdfs.isDirectory(mypath)) 
{
//删除已经存在的文件路径
hdfs.delete(mypath, true);
}
Job job = new Job(conf, "CompressAndCounter");//新建一个任务
job.setJarByClass(CompressAndCounter.class);//设置主类

job.setMapperClass(CompressAndCounterMap.class);//只有 Mapper
job.setOutputKeyClass(Text.class);//输出 key 类型
job.setOutputValueClass(Text.class);//输出 value 类型

FileInputFormat.addInputPath(job, new Path(args[0]));//输入路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));//输出路径


FileOutputFormat.setCompressOutput(job, true);//对输出结果设置压缩
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);//设置压缩类型

job.waitForCompletion(true);//提交任务
return 0;
}
/**
* @function main 方法
* @param args 输入 输出路径
* @throws Exception
*/
public static void main(String[] args) throws Exception
{
String[] date = {"20120917","20120918","20120919","20120920","20120921","20120922","20120923"};
int ec = 1;
for(String dt:date)
{
String[] args0 = { "hdfs://HadoopMaster:9000/middle/tv/"+dt+".txt",
"hdfs://HadoopMaster:9000/junior/tvCompressResult/"+dt };

// String[] args0 = { "./data/compressAndCounter/"+dt+".txt",
// "hdfs://HadoopMaster:9000/junior/tvCompressResult/"+dt };

ec = ToolRunner.run(new Configuration(), new CompressAndCounter(), args0);
}
System.exit(ec);
}
}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

package zhouls.bigdata.myMapReduce.ParseTVDataCompressAndCounter;

import java.util.ArrayList;


import java.util.List;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;

/**

* @function 解析数据


*/
public class ParseTVData
{
/**
* @function 使用 Jsoup 工具,解析输入数据,
* @param text
* @return list
*/
public static List<String> transData(String text) 
{
List<String> list = new ArrayList<String>();
Document doc;
String rec = "";
try 
{
doc = Jsoup.parse(text);// jsoup解析数据
Elements content = doc.getElementsByTag("WIC");
String num = content.get(0).attr("cardNum");// 记录编号
if (num == null || num.equals("")) 
{
num = " ";
}

String stbNum = content.get(0).attr("stbNum");// 机顶盒号
if (stbNum.equals("")) 
{
return list;
}

String date = content.get(0).attr("date");// 日期

Elements els = doc.getElementsByTag("A");
if (els.isEmpty()) 
{
return list;
}

for (Element el : els)
{
String e = el.attr("e");// 结束时间

String s = el.attr("s");// 开始时间

String sn = el.attr("sn");// 频道名称

rec = stbNum + "@" + date + "@" + sn + "@" + s + "@" + e;
list.add(rec);
}
} catch (Exception e) 
{
System.out.println(e.getMessage());
return list;
}
return list;
}
}

 


本文转自大数据躺过的坑博客园博客,原文链接:http://www.cnblogs.com/zlslch/p/6171823.html,如需转载请自行联系原作者

相关文章
|
3月前
|
分布式计算 Hadoop Java
MapReduce编程:自定义分区和自定义计数器
MapReduce编程:自定义分区和自定义计数器
29 0
|
4月前
|
存储 分布式计算 负载均衡
【大数据技术Hadoop+Spark】MapReduce概要、思想、编程模型组件、工作原理详解(超详细)
【大数据技术Hadoop+Spark】MapReduce概要、思想、编程模型组件、工作原理详解(超详细)
60 0
|
1月前
|
API 开发工具 开发者
抖音商品详情API入门:为开发者和商家打造增长工具箱
抖音商品详情API入门:为开发者和商家打造增长工具箱
52 0
|
13天前
|
机器学习/深度学习 分布式计算 监控
面经:MapReduce编程模型与优化策略详解
【4月更文挑战第10天】本文是关于MapReduce在大数据处理中的关键作用的博客摘要。作者分享了面试经验,强调了MapReduce的基本原理、Hadoop API、优化策略和应用场景。MapReduce包含Map和Reduce两个主要阶段,Map阶段处理输入数据生成中间键值对,Reduce阶段进行聚合计算。面试重点包括理解MapReduce工作流程、使用Hadoop API编写Map/Reduce函数、选择优化策略(如分区、Combiner和序列化)以及应用场景,如日志分析和机器学习。
19 2
|
2月前
|
前端开发 JavaScript API
前端秘法番外篇----学完Web API,前端才能算真正的入门
前端秘法番外篇----学完Web API,前端才能算真正的入门
|
6月前
|
JSON JavaScript 前端开发
前端JavaScript入门到精通,javascript核心进阶ES6语法、API、js高级等基础知识和实战 —— JS进阶(四)完结撒花✿✿ヽ(°▽°)ノ✿
前端JavaScript入门到精通,javascript核心进阶ES6语法、API、js高级等基础知识和实战 —— JS进阶(四)完结撒花✿✿ヽ(°▽°)ノ✿
535 0
|
6月前
|
JavaScript 前端开发 API
前端JavaScript入门到精通,javascript核心进阶ES6语法、API、js高级等基础知识和实战 —— JS进阶(三)
前端JavaScript入门到精通,javascript核心进阶ES6语法、API、js高级等基础知识和实战 —— JS进阶(三)
516 1
|
6月前
|
JavaScript 前端开发 API
前端JavaScript入门到精通,javascript核心进阶ES6语法、API、js高级等基础知识和实战 —— JS进阶(二)
前端JavaScript入门到精通,javascript核心进阶ES6语法、API、js高级等基础知识和实战 —— JS进阶(二)
470 0
|
3月前
|
JSON 安全 数据挖掘
从入门到精通:淘宝API接口调用全攻略
概述: 在当今电子商务的繁荣发展下,淘宝作为中国领先的电商平台,不仅为消费者提供了便捷的购物环境,也为商家们提供了强大的数据支持和服务能力。淘宝开放平台提供的API接口使得商家能够高效地获取店铺和商品的实时数据,从而更好地分析市场趋势、优化店铺运营、提升用户体验。本文将详细介绍如何从入门到精通地调用淘宝API接口,使商家能够充分利用这一强大工具推动业务增长。
|
3月前
|
JavaScript 前端开发 IDE
Vue3【为什么选择Vue框架、Vue简介 、Vue API 风格 、Vue开发前的准备 、Vue项目目录结构 、模板语法、属性绑定 、 】(一)-全面详解(学习总结---从入门到深化)
Vue3【为什么选择Vue框架、Vue简介 、Vue API 风格 、Vue开发前的准备 、Vue项目目录结构 、模板语法、属性绑定 、 】(一)-全面详解(学习总结---从入门到深化)
50 1

热门文章

最新文章