转自:http://pieux.github.io/blog/2013-05-08-learn-hadoop-the-definitive-guide.html
1 前言
Hadoop的内部工作机制: 分布式系统理论, 实际工程和常识于一体的系统.
但是,Hadoop提供的用于构建分布式系统的工具–数据存储, 数据分析,和协调处理–都非常简单.
本书的结构: 1章介绍历史, 2章介绍MapReduce,3章剖析Hadoop文件系统, 特别是HDFS,4章包含Hadoop的基本I/O操作:数据完整性,压缩,序列化及基于文件的数据结构.
接下来的5~8章深入剖析MapReduce.
之后的省略说明.
本书的阅读方式和时间安排
大致看了下目录, 第2章的入门必须看, 第3章HDFS看, 第4章I/O看, 今天大致看100多页, 然后, 准备在Ubuntu上安装Hadoop假集群. 尝试开发一个简单的应用.
毕业设计的计划步骤
- 入门Hadoop
- 搭建集群
- 设计应用程序
- 逐步完成
以上每一步都需记录.
毕业设计论文, 请教完格式后使用LaTex书写.
2 关于MapReduce
MapReduce是一种可用于数据处理的编程模型.
MaoReduce程序本质上是 并行 的.
优势, 处理大规模数据集. 举例:
2.1 一个气象数据集
半结构化数据, 且按照记录方式存储.
2.1.1 数据的格式
整个数据集是由大量的小容量文件组成.
通常情况下, 处理少量的大型文件显得更容易且有效, 因此, 这些数据集需要经过预处理.
2.2 使用Unix工具进行数据分析
先不使用Hadoop, 因为只有提供性能基准和结果检查工具, 才能和hadoop进行有效对比.
传统处理按行存储数据的工具是awk.
#! /usr/bin/env bash for year in all/* do echo -ne `basename $year .gz`"\t" gunzip -c $year | \ awk'{ temp =substr($0, 88, 5) + 0; q = substr($0, 93, 1); if( temp!=9999 && q~/[01459]/ && temp>max) max = temp} END { print max }' done
2.3 使用Hadoop分析数据
为了充分发挥Hadoop提供的并行处理优势, 我们需要将查询表示成MapReduce作业. 经过一些本地的小规模测试, 我们将能够在集群设备上运行Hadoop.
2.3.1 map阶段和reduce阶段
每个阶段都以key/value对作为输入和输出, 类型由程序员选择.
程序员需要定义两个函数: map函数和reduce函数.
对于map阶段, 输入的是原始的NCDC(国家气候数据中心)数据, 键: 该行起始位置相对于文件起始位置的偏移量(因为数据实际上是一行, 没有分隔符的).
设计map函数: 我们只对年份和气温这两个属性感兴趣, 只取出这两个. 在本例中, map函数只是一个数据准备阶段, 使reduce函数能在准备数据上继续处理.
map函数还是一个比较适合去除已损记录的地方(筛选掉缺失的, 可疑的).
map函数的输出发送到reduce函数, 处理过程是根据键对键/值对进行排序和分组.
整个数据流如图, 并使用Unix的pipe来模拟:
略
2.3.2 Java MapReduce
通过代码来实现.
我们需要3样东西: 一个map函数, 一个reduce函数和一些用来运行作业的代码.
map函数由Mapper接口实现来表示, 后者声明了一个map()方法.
import java.io.IOException; // 是hadoop针对流处理优化的类型 import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; // 会继承这个基类 import org.apache.hadoop.mapred.MapReduceBase; // 会实现这个接口 import org.apache.hadoop.mapred.Mapper; // 处理后数据由它来收集 import org.apache.hadoop.mapred.OoutputCollector; import org.apache.hadoop.mapred.Reporter; // 虽然还没有开始系统学习java语法, 我猜测, extends是继承基类, // implements 是实现接口, java把它们语法上分开了 public class MaxTemperatureMapper extends MapReduceBase // Mapper是一个泛型接口 implements Mapper<LongWritable, Text, Text, IntWritable> { private static final int MISSING = 9999; public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); String year = line.substring(15, 19); int airTemperature; if( line.charAt(87) == '+') {// parseInt doesn't like leading plus signs airTemperature = Integer.parseInt(line.substring(88, 92)); } else { airTemperature = Integer.parseInt(line.substring(87, 92)); } String quality = line.substring(92, 93); if(airTemperature != MISSING && quality.matches("01459]")) { output.collect(new Text(year), new IntWritable(airtemperature)); } } }
Mapper是一个泛型接口:
Mapper<LongWritable, Text, Text, IntWritable>
它有4个形参类型, 分别是map函数的输入键, 输入值, 输出键和输出值的类型.
就目前来说, 输入键是长整数偏移量, 输入值是一行文本, 输出键是年份, 输出值是气温(整数).
Hadoop提供了一套可优化网络序列化传输的基本类型, 不直接使用java内嵌的类型. 在这里, LongWritable
相当于 Long
, IntWritable
相当于 Int
, Text
相当于 String
.
map()
方法的输入是一个键和一个值.
map()
还提供了 OutputCollector
实例用于输出内容的写入.
reduce函数通过Reducer进行类似的定义.
import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; public class MaxTemperatureReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, Intwritable> output, Reporter reporter) throws IOException { int maxValue = Integer.MIN_VALUE; while (values.hasNext()) { maxValue = Math.max(maxValue, values.next().get()); } output.collect(key, new IntWritable(maxValue)); } }
reduce函数的输入键值对必须与map函数的输出键值对匹配.
reduce函数的输出键值对类型为Text/IntWritable, 分别表示年份和最高气温.
第三部分的代码为负责运行MapReduce的作业.
import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; public class MaxTemperature { public static void main(String[] args) throws IOException { if(args.length !=2 ) { System.err.println("Usage: MaxTemperature <input path> <output path>"); System.exit(-1); } JobConf conf = new JobConf(MaxTemperatuer.class); conf.setJobName("Max temperature"); FileInputFormat.addInputPath(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1])); conf.setMapperClass(MaxTemperatuerMapper.class); conf.setReducerClass(MaxTemperatuerReducer.class); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); JobClient.runJob(conf); } }
JobConf
对象制定了作业的执行规范. 构造函数的参数为作业所在的类, Hadoop会通过该类来查找包含给类的JAR文件.
构造 JobConf
对象后, 制定输入和输出数据的路径. 这里是通过 FileInputFormat
的静态方法 addInputPath()
来定义输入数据的路径, 路径可以是单个文件, 也可以是目录(即目录下的所有文件)或符合特定模式的一组文件. 可以多次调用(从名称可以看出, addInputPath()
).
同理, FileOutputFormat.setOutputPath()
指定输出路径. 即写入目录. 运行作业前, 写入目录不应该存在, Hadoop会拒绝并报错. 这样设计, 主要是防止数据被覆盖, 数据丢失. 毕竟Hadoop运行的时间是很长的, 丢失了非常恼人.
FileOutputFormat.setOutputPath()
和 conf.setMapperClass()
指定map和reduce类型.
接着, setOutputKeyClass
和 setOutputValueClass
指定map和reduce函数的 输出 类型, 这两个函数的输出类型往往相同. 如果不同, map的输出函数类型通过 setMapOutputKeyClass
和 setMapOutputValueClass
指定.
输入的类型用 InputFormat
设置, 本例中没有指定, 使用的是默认的 TextInputFormat
(文本输入格式);
最后, JobClient.runJob()
会提交作业并等待完成, 将结果写到控制台.
2.3.3 运行测试
略过, 本节讲了下如何使用小型的数据集运行来排错. 并且讲解了如何看控制台输出.
2.3.4 新增的java MapReduce API
Hadoop 0.20.0之后新增了一个Java MapReduce API, 称为”上下文对象”(context object), 旨在使API更容易扩展.
新API在类型上不兼容旧的.
区别:
- 新API倾向于使用基类而不是接口, 因为更容易扩展.
- 新API放在
org.apache.hadoop.mapreduce
包中, 旧的在org.apache.hadoop.mapred
中. - 新API充分使用context object, 使用户代码能与MapReduce系统进行通信. ex, MapContext基本具备了JobConf, OutputCollector和Reporter的功能.
- 新API支持”推”(push)和”拉”(pull)式的迭代. 这两类API, 均可以K/V pair把记录推给mapper, 亦可以从map()方法中pull.pull的好处是, 可以实现数据的批量处理, 而非逐条记录的处理.
- 新API实现了配置的统一. 不在通过JobConf对象(Hadoop配置的对象的一个扩展)配置, 而是通过Configuration配置.
- 新API中作业由Job类控制, 而非JobClient类, 它被删除了.
- 输出文件的命名方式稍有不同. map为part-m-nnnnn, reduce为part-r-nnnnn(nnnnn为分块序列号, 整数, 从0开始).
我上官网看键Hadoop在2011年已经发布了1.0版. 所以, 新API的写法暂时就不在笔记上写了. 执行逻辑上没有区别.
2.4 横向扩展
前面介绍的少量数据, 我们现在开始鸟瞰整个系统, 以及有大量数据时的处理.
之前的是本地文件系统中, 为了实现 横向扩展 (scaling out), 数据需要存储在分布式文件系统中, 一般为HDFS.
2.4.1 数据流
首先是一些术语:
- MapReduce作业(job): 输入数据, 程序, 配置. Hadoop将作业分为若干个小task来执行, 分为两类, map和reduce任务.
- 两类节点控制着作业的执行过程: jobtracker(一个)和tasktracker(一系列). 前者调度后者, 后者返回结果给前者.
- 输入分片 (input split), 输入数据, 等长的小数据块. 简称分片. 一个分片一个map task.
- 负载均衡. 但是, split足够多, 可能会增加管理split的时间和构建map task的时间. 合理的是HDFS的一个块的大小(默认64MB).
- 数据本地化优化 (data locality optimization).即在HDFS的节点上运行map task, 性能最优. 节省了网络传输资源.
- map任务写入本地硬盘, 而不是HDFS. 为何? map的输出是中间结果, 完成后可被删除. 如果map失败, 将重起一个map task, 再次构建.
- reduce任务不具备数据本地化的优势. 因为它的输入是多个map的输出. 需要网络.
- reduce输出放在HDFS中, 可靠存储.即, 第一个副本在本地节点, 其他副本在其他机架节点上. 输出需要占网络带宽.
图:一个Reduce任务的MapReduce数据流
reduce的任务数量不是由输入数据的大小决定, 而是指定的.
如果有多个reduce任务, 每个map会对其输出 分区 (partition), 即为每个reduce任务创建一个分区.
分区由用户自定义的分区函数控制, 但是一般使用默认的分区器(partitioner)通过哈希函数来分区, 很高效.
图:多个Reduce任务的MapReduce数据流
这里map任务和reduce任务之间的数据流称为 混洗 (shuffle). 一般比此图要更复杂, 并且调整混洗参数对作业执行总时间有非常大的影响.
当然, 也会出现没有reduce任务的场景.
2.4.2 combiner
combiner, 合并函数. 优化方案, 对map任务的输出进行合并, 以减少map和reduce任务间的数据传输(占用带宽资源).
并非所有函数都可以合并, 需要具有某个属性(被称为”分布式的”函数). 比如算平均数, 需要K/V pair的个数.
使用combiner, 是需要慎重考虑的.
2.4.3 指定一个合并函数
下面的代码是旧的API的实现方法, 本书中都是使用旧的API实现的.
public class MaxTemperatureWithCombiner { public static void main(String[] args) throws IOException { if(args.length !=2 ) { System.err.println("Usage: MaxTemperatureWithCombiner <input path> <output path>"); System.exit(-1); } JobConf conf = new JobConf(MaxTemperatuerWithCombiner.class); conf.setJobName("Max temperature"); FileInputFormat.addInputPath(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1])); conf.setMapperClass(MaxTemperatuerMapper.class); conf.setCombinerClass(MaxTemperatuerCombiner.class); conf.setReducerClass(MaxTemperatuerReducer.class); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); JobClient.runJob(conf); }
2.4.4 运行分布式的MapReduce作业
无需修改, 直接运行, 这是Hadoop的优势.
2.5 Hadoop的Streaming
使用的Unix标准流, 作为应用程序接口.
在文本模式下, 输入数据以标准输入流传递给map函数, 按行传输, 最终结果写到标准输出, K/V pair是一个制表符分割的行.
reduce接收制表符分割的行, 即K/V pair, 并通过标准输入流进行传输. 并且该输入已由hadoop根据键排过序. 最终结果写到标准输出.
下面学习Ruby版本, Python版本, 我不会Python.
2.5.1 Ruby版本
map函数:
#! /usr/bin/env ruby STDIN.each_line do |line| var = line year, temp, q = val[15,4], val[87,5], val[92,1] puts "#{year}\t#{temp}" if (temp != "+9999" && q =~ /[01459]/) end
果真是解释性语言, 真是太简洁, 太美了.
测试, 使用Unix pipe.
% cat input/ncdc/sample.txt | /path/to/ruby_scripts/max_temperature_map.rb
reduce函数:
#! /usr/bin/env ruby last_key, max_val = nil, 0 STDIN.each_line do |line| key, val = line.split("\t") if last_key && last_key != key puts "#{last_key}\t#{max_val}" last_key, max_val = key, val.to_i else last_key, max_val = key, [max_val, val.to_i].max end end puts "#{last_key}\t#{max_val}" if last_key
稍微按代码理解下, ruby的可读性是非常高的. 听说go的可读性也非常高.
last_key
暂时不知道作用. 但是, 首先要知道, reduce的输入是map的输出, 并且已经被Hadoop按照key排过序了.
所以, 如果 last_key
不为空, 可以代码还没有到最后; last_key!=key
, 就是说这个key是一个新key. 打印之前key一条记录, 因为之前的key所有值都处理完了.
并且更新last_key, 最大值就是val.
如果是重复的key, 就和存储的最大值进行比较.
最后再打印最后一个K/V pair.
后面的调用hadoop测试, 先略过.
2.6 Hadoop的Pipes
C++, 套接字而不是标准流. 略过.
3 Hadoop 分布式文件系统
数据集的分区(partion)存储到若干个计算机节点.
管理 网络中跨多台计算机存储的文件系统称作 分布式文件系统 (distributed filesystem).
Hadoop->HDFS, 但是Hadoop还继承了其他文件系统, 比如本地文件系统.
3.1 HDFS的设计
流式数据访问模式来存储超大文件.
- 超大文件:
- 流式数据访问: 一次写入, 多次读取是最高效的访问模式.
- 商用硬件:
- 低时间延迟的数据访问: HDFS为高数据吞吐量优化, 高时间延迟. 对于低延迟, HBase是更好的选择.
- 大量的小文件: namenode存储文件系统的元数据, 在内存中.
- 多用户写入, 任意修改文件: HDFS中的文件可能只有一个writer, 且写操作总是append到末尾.
3.2 HDFS的概念
3.2.1 数据块(block)
默认为64MB, 很多情况下设置成128MB.
同样文件也被划分成块大小的分块(chunck), 作为独立的存储单元.
之所以块如此之大, 是为了最小化寻址开销.
分块的好处也很多, 可以分布式存储, 提高数据容错能力和可用性. 块只是存储数据. 没有元数据信息, 所以元数据可以单独管理.
显示块信息:
% hadoop fsck / -files -blocks
3.2.2 namenode和datanode
HDFS集群上两类节.
管理者-工作者模式, 一个namenode(管理者)多个datanode(工作者).
namenode管理文件系统的命名空间, 维护文件系统树及整棵树内的所有文件和目录: 命名空间镜像文件和编辑日志文件, 永久保存在本地磁盘.
namenode也记录每个文件中各个块所在的数据节点信息, 但不永久保存块的位置信息, 会在系统启动是由数据节点重建.
客户端(client)代表用户通过于namenode和datanode交互访问整个文件系统.
datanode是工作节点. 存储/检索数据块(受client和namenode调度), 定期像namenode发送它们所存储块的列表.
namenod的容错非常重要.
- 备份组成文件系统元数据持久状态的文件. 原子操作.
- 辅助namenode, 不是namenode. 定义通过编辑日志合并命名空间镜像, 防止编辑日志过大. 保存状态滞后于主节点. 所以, 若主节点全部失效, 一般把namenode元数据全部复制到辅助namenode上, 作为新的namenode运行.
3.2.3 命令行接口
伪分布配置.
fs.default.name
, 设置为 hdfs://localhost/
, 为Hadoop的默认文件系统. HDFS的daemon会通过此确定HDFS namenode的主机及端口. 默认8020.
dfs.replication
, 此时设置为1, 一般为3. 因为是单机模式. 不然为警告块副本不足.
3.2.4 基础文件系统操作
帮助:
% hadoop fs -help
复制:
% hadoop fs -copyFromLocal input/docs/quangle.txt hdfs://localhost/user/tom/quangle.txt
检查复制是否成功:
% hadoop fs -copyFromLocal quangle.txt quangle.copy.txt % md5 input/docs/quangle.txt quangle.copy.txt
创建文件夹:
% hadoop fs -mkdir books % hadoop fs -ls .
3.2.5 Hadoop文件系统
Hadoop有一个抽象的文件系统, HDFS只是它的一种实现.
org.apache.hadoop.fs.FileSystem
, 该抽象类定义了Hadoop中的一个文件系统接口. 具体实现有: local, HDFS, HFTP, HSFTP, HAR, hfs, FTP S3, S3.
3.2.6 接口
文件系统的命令行解释器就是一个java
Thrift, C语言, FUSE, WebDAV.
3.2.7 其他HDFS接口
HTTP(只读), FTP.
3.3 Java 接口
深入探索Hadoop的 Filesystem
类: 与Hadoop某一文件系统交互的API.
由于本书使用的是0.20.0版本, 并且成文方式类似于字典. 这一块的学习暂时仅浏览一遍. 不需记录.
不过, 还是记录下涉及到的类的名字.
3.3.1 从Hadoop URL中读取数据
类名/方法名 | 简单描述 |
---|---|
java.net.URL | 从Hadoop文件系统读取对象时, 打开数据流 |
URL().openStream() | 方法 |
IOUtils.closeStream() | Hadoop简洁的IOUtils类 |
URL.setURLStreamHandlerFactory() | |
FsUrlStreamHandlerFactory() | |
3.3.2 通过FileSystem API读取数据
类名/方法名 | 简单描述 |
---|---|
static FileSystem get(Configuration conf) throws IOException | 获取FileSystem实例的两种静态工厂方法 |
static FileSystem get(URI uri, Configuration conf) throws IOException | |
Configuration | 封装客户端或服务端配置 |
FSDataInputStream open(Path f) throws IOException | 调用open()获取文件的输入流 |
abstract FSDataInputStream open(Path f, int bufferSize) throws IOException | 调用open()获取文件的输入流 |
Hadoop Path | 不用java.io.File对象表示文件 |
- FSDataInputStream
FSDataInputStream对象, 而不是标准的java.io类对象.
public class FSDataInputStream extends DataInputStream implements Seekable, PositionedReadable { // implementation elided }
Seekable
接口支持在文件中找到任意位置.public interface Seekable { void seek(long pos) throws IOException; //定位到绝对位置, 于java.io.InputStream#skip()不同 long getPos() throws IOException; boolean seekToNewSource(long targetPos) throws IOException; }
注意:
seek()
,高开销, 建议用流数据构建应用的访问模式, 比如使用MapReduce.PositionedReadable
接口, 从一个偏移量处读取文件的一部分.public interface Positionedable { // int read() // void readFully() // void readFully() }
3.3.3 写入数据
3.3.4 查询文件系统
3.3.5 删除数据
3.4 数据流
3.4.2 文件写入分析
我们考虑的情况是, 如何创建一个新文件, 并把数据写入该文件, 最后关闭该文件.
- 客户端通过
DistributedFileSystem
对象调用create()
创建文件; DistributedFileSystem
对 namenode 创建一个 RPC 调用. 此时文件还没有相对应的数据块. namenode执行各种检查确保文件不存在, 且客户端有创建的权限. 如果通过, 创建一条新文件记录, 否则, 抛出 IOException;DistributedFileSystem
对象向客户端返回一个FSDataOutputStream
对象, 由此, 客户端开始写入数据. 同时,FSDataOutputStream
对象封装一个DFSoutPutstream
对象, 该对象负责处理 datanode 和 namenode 之间的通信;- 客户端如何写入数据呢?
FSDataOutputStream
将数据分成一个个数据包, 并写入内部队列, 称为”数据队列”(data queue).DataStreamer
处理队列, 根据 datanode 列表来要求 namenode 分配适合的新块储存数据备份. 这一组 datanode 构成一个 pipe line – 我们假设副本为3.DataStreamer
将数据包流式传输到管线中的第一个 datanode, 该 datanode 存储数据包并将它发送到管线中的第2个 datanode. 同样第2个存储并发送到第3个(最后一个). DFSOutputStream
也维护着一个内部数据包队列来等待 datanode 的收到确认回执, 称为确认队列(ack queue). 当收到所有 datanode 的确认信息, 该队列会从确认队列中删除;- 完成后, 调用
close()
; - 并等待 namenode 确认已经知道文件由哪些块组成(通过
DataStreamer
询问数据块的分配);
3.5 一致模型
文件系统的 一致模型 (coherency model) 描述了对文件读/写的数据可见性.
4 MapReduce 应用开发
5 实例分析
5.1 Hadoop 在 Last.fm 的应用
存储, 处理和管理这些用户数据成为挑战.
为什么使用 Hadoop:
- 分布式系统(ex: 网志, 用户收听音乐的数据)提供冗余备份而不增加额外的费用.
- 可以方便的增添便宜, 普通的硬件来满足可扩展性需求.
- Hadoop 免费.
- Hadoop 开源.
在这些集群中, 运行着数百种执行各种操作的日常作业, ex, 日志分析, A/B测试评测, 即时处理和图标生成. 本节重点介绍产生图表.
5.2 用 Hadoop 产生图表
针对每个国家或个人音轨数据产生一周汇总图表. 以天, 周或月为单位执行.
在 Last.fm 排行上, 有两类, 一类是 listeners(在网络电台上, 流技术), 一类是 plays(用户播放自己的音乐, 比如通过PC客户端, 第三方应用啥的, 就像虾米网曾经推过的一个虾小米).
处理数据时, 也分为两类. 对数据源进行划分, 避免 Last.fm 推荐系统出现信息反馈循环的问题. 而且, 很明显, Last.fm 的推荐系统只是用第二类(scrobble). Last.fm 的 Hadoop 一项重要任务是, 接收这些收听数据, 做统计并形成能够在 Last.fm 网站上进行显示和作为其他 Hadoop 程序输入的数据格式. 由 Track Statistics(音轨统计)程序实现.
5.3 Track Statistics 程序
乐收听信息被发送到 Last.fm 时, 首先经过验证和转换阶段, 最终结果是一系列由空格分隔的文本文件, 包含用户ID(UserId), 音乐(磁道)ID(TrackId), 这首音乐被收藏的次数(Scrobble, 这个奇怪了, 一个用户只能收藏一次吧, 所以是 0/1), 这首音乐在电台收听的次数(Radio)以及被跳过的次数(Skip).
表16-1 收听数据
UserId | TrackId | Scrobble | Radio | Skip |
---|---|---|---|---|
111115 | 222 | 0 | 1 | 0 |
111113 | 225 | 1 | 0 | 0 |
111117 | 223 | 0 | 1 | 1 |
11115 | 225 | 1 | 0 | 0 |
这些作为初始数据提供给 Track Statics 程序, 它包括利用这个输入数据计算各种数据值的两个作业和一个用来合并结果的作业.
Unique Listeners 作业模块统计收听同一首音频的不同用户数(第一次访问, 忽略同一用户对该首音频的多次访问).Sum 作业模块通过对所有用户的所有收听信息进行计数来为每个音频统计收听总数, 收藏总数, 电台总数以及被跳过的总数.
这两个作业的输入格式相同, 但 Unique Listeners 作业模块负责为每个用户对每个音频产生统计值, Sum 作业模块为每个音频产生统计值. 最后 Merger 作业模块负责合并两个作业产生的中间输出数据得到最终统计结果. 产生下列几项数值:
- 不同的听众数.
- 音频的收藏次数.
- 音频在电台中的点播(播放)次数.
- 音频在电台被收听的总次数.
- 音频在电台被跳过的次数.