MapReduce中一次reduce方法的调用中key的值不断变化分析及源码解析

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
简介:   摘要:mapreduce中执行reduce(KEYIN key, Iterable values, Context context),调用一次reduce方法,迭代value集合时,发现key的值也是在不断变化的,这是因为key的地址在内部会随着value的迭代而不断变化。

  摘要:mapreduce中执行reduce(KEYIN key, Iterable<VALUEIN> values, Context context),调用一次reduce方法,迭代value集合时,发现key的值也是在不断变化的,这是因为key的地址在内部会随着value的迭代而不断变化。

  序:我们知道reduce方法每执行一次,里面我们会通过for循环迭代value的迭代器。如果key是bean的时候,for循环里面value值变化的同时我们的bean值也是会跟随着变化,调用reduce方法时传参数就传了一次key的值,但是在方法内部迭代的时候,key值在变化,那他怎么变动的?

  误区:在map处理完成之后,将所有kv对缓存起来,进行分组,然后传递一个组<key,valus{}>,调用一次reduce方法传入的key和value的迭代器如<hello,{1,1,1,1,1,1.....}>。

 

给一个需求来观察现象

  对日志数据中的上下行流量信息汇总,并输出按照总流量倒序排序的结果,且该需求日志中手机号是不会重复的——即不会存在多条数据,手机号相同,且流量不同,还需要进行多条数据的汇总。

数据如下:

13888888801,1,9,10
13888888802,5,5,10
13888888803,2,7,9
13888888804,4,6,10
13888888805,6,4,10
13888888806,1,0,1

分析

  基本思路:实现自定义的bean来封装流量信息,并将bean作为map输出的key来传输。

  MR程序在处理数据的过程中会对数据排序(map输出的kv对传输到reduce之前,会排序),排序的依据是map输出的key,所以,我们如果要实现自己需要的排序规则,则可以考虑将排序因素放到key中,让key实现接口:WritableComparable,然后重写key的compareTo方法。

package cn.intsmaze.flowsum.SortBean;
public class FlowBeanOne implements WritableComparable<FlowBeanOne> {

    private long upFlow;
    private long dFlow;
    private long sumFlow;
    private long phone;
    
    // 序列化框架在反序列化操作创建对象实例时会调用无参构造
    public FlowBeanOne() {
    }

    // 序列化方法
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(dFlow);
        out.writeLong(sumFlow);
        out.writeLong(phone);
    }

    // 反序列化方法,注意: 字段的反序列化顺序与序列化时的顺序保持一致
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.dFlow = in.readLong();
        this.sumFlow = in.readLong();
        this.phone = in.readLong();
    }
    
    public void set(long phone,long upFlow, long dFlow) {
        this.phone=phone;
        this.upFlow = upFlow;
        this.dFlow = dFlow;
        this.sumFlow = upFlow + dFlow;
    }

    @Override
    public String toString() {
        return upFlow + "\t" + dFlow + "\t" + sumFlow+ "\t" + phone;
    }
  
//自定义倒序比较规则,总流量相同视为同一个key. @Override
public int compareTo(FlowBeanOne o) { return (int)(o.getSumFlow() - this.sumFlow); } get,set...... }

代码实现如下:

 
 
package cn.intsmaze.flowsum.SortBean;
/**
* 实现流量汇总并且按照流量大小倒序排序 * 前提:处理的数据是已经汇总过的结果文件,然后再次对该文件进行排序 * @author */ public class FlowSumSort { public static class FlowSumSortMapperOne extends Mapper<LongWritable, Text, FlowBeanOne, Text> { FlowBeanOne k = new FlowBeanOne(); Text v = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fields = line.split(","); long phoneNbr = Long.parseLong(fields[0]); long upFlowSum = Long.parseLong(fields[1]); long dFlowSum = Long.parseLong(fields[2]); k.set(phoneNbr,upFlowSum, dFlowSum);//这里对bean作为key。 context.write(k, v); } } public static class FlowSumSortReducerOne extends Reducer<FlowBeanOne, Text, Text, FlowBeanOne> {
@Override
protected void reduce(FlowBeanOne bean, Iterable<Text> phoneNbrs, Context context) throws IOException, InterruptedException { System.out.println("-------------------"); for (Text text : phoneNbrs) { System.out.println(bean); context.write(text, bean); } } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(FlowSumSort.class); // 告诉框架,我们的程序所用的mapper类和reducer类 job.setMapperClass(FlowSumSortMapperOne.class); job.setReducerClass(FlowSumSortReducerOne.class); job.setMapOutputKeyClass(FlowBeanOne.class); job.setMapOutputValueClass(Text.class); // 告诉框架,我们的mapperreducer输出的数据类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBeanOne.class); // 告诉框架,我们要处理的文件在哪个路径下 FileInputFormat.setInputPaths(job, new Path("d:/intsmaze/input/")); // 告诉框架,我们的处理结果要输出到哪里去 FileOutputFormat.setOutputPath(job, new Path("d:/intsmaze/output/")); boolean res = job.waitForCompletion(true); System.exit(res ? 0 : 1); } }

  这里要注意,因为是汇总排序,所以reduce的并行度必须为1,。除了使用框架的组件外,我们还可以通过使用reduce的cleanup方法,自己在reduce端对收集到的数据进行汇总排序。

输出的结果确实是我们想要的结果: 
    6    4    10    13888888805
    4    6    10    13888888804
    5    5    10    13888888802
    1    9    10    13888888801
    2    7    9    13888888803
    1    0    1    13888888806
但是观察我们在控制台打印的信息:
-------------------
6    4    10    13888888805
4    6    10    13888888804
5    5    10    13888888802
1    9    10    13888888801
-------------------
2    7    9    13888888803
-------------------
1    0    1    13888888806

灵异现象

  执行job代码后,我们发现reduce任务中的reduce()方法只被调用了三次,参数key只被传入了三次,但是观察发现,key在一次reduce方法的调用中值是不断变化的,这有是怎么回事?
  我们重写的reduce方法如下:看参数确实是传入一个key以及key对应的value的迭代器集合,其实这个方法的参数只是一个误导,key值会随着value的迭代而不断的变化。reduce端的reduce方法接到map传来的数据并不是我们根据参数类型而认为的<hello,{1,1,1,1,1,1.....}>而是<hello,1>,<hello,1>,<hello,1>,<hello,1>......。
 protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
                        ) throws IOException, InterruptedException {
    for(VALUEIN value: values) {
      context.write((KEYOUT) key, (VALUEOUT) value);
    }
  }

来看看hadoop2.6.4源码解析吧:

因为这个问题是一年前遇到的,看完源码搞明白后,并没有时间去整理,所以再次解析有所不足。

Reducer源码解析

public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
  public abstract class Context 
    implements ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
  }

  /**
   * 这个方法我们不需要管,因为我们实现的类重写了该方法。
   */
  protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
                        ) throws IOException, InterruptedException {
    for(VALUEIN value: values) {
      context.write((KEYOUT) key, (VALUEOUT) value);
    }
  }

  //通过debug我们可以看到,数据在结束map任务执行reduce任务的时候,reduce端会先调用这个方法,而调用这个
  //方法的类是我们实现的reduce类,通过继承调用该方法,然后在该方法里面调用我们实现类重写的reduce方法。
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKey()) {//这个地方调用ReduceContextImpl的方法进行判断
        reduce(context.getCurrentKey(), context.getValues(), context);//这个地方调用我们的实现类的reduce方法走我们的逻辑代码了
        // If a back up store is used, reset it
        Iterator<VALUEIN> iter = context.getValues().iterator();
        if(iter instanceof ReduceContext.ValueIterator) {
          ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();        
        }
      }
    } finally {
      cleanup(context);
    }
  }
}

ReduceContextImpl源码解析

(由于代码太多,我只截取了部分主要的代码)

public class ReduceContextImpl {
  private RawKeyValueIterator input;//这个迭代器里面存储的key-value对元素。
  private KEYIN key;                                  // current key
  private VALUEIN value;                              // current value
  private boolean firstValue = false;                 // first value in key
  private boolean nextKeyIsSame = false;              // more w/ this key
  private boolean hasMore;                            // more in file
  private ValueIterable iterable = new ValueIterable();//访问自己的内部类 
  
  public ReduceContextImpl() throws InterruptedException, IOException{
    hasMore = input.next();//对象创建的时候,就先判断reduce接收的key-value迭代器是否有元素,并获取下一个元素
  }
  /** 创建完成就调用该方法 ,开始处理下一个唯一的key*/
  public boolean nextKey() throws IOException,InterruptedException {
    while (hasMore && nextKeyIsSame) {
    //判断迭代器是否还有下一个元素已经下一个元素是否和上一个已经遍历出来的key-value元素的key是不是一样
      nextKeyValue();
    }
    if (hasMore) {
      if (inputKeyCounter != null) {
        inputKeyCounter.increment(1);
      }
      return nextKeyValue();
    } else {
      return false;
    }
  }
  /**
   * Advance to the next key/value pair.
   */
  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    if (!hasMore) {
      key = null;
      value = null;
      return false;
    }
    firstValue = !nextKeyIsSame;
    
    //获取迭代器下一个元素的key
    DataInputBuffer nextKey = input.getKey();
    //设置当前key的坐标
    currentRawKey.set(nextKey.getData(), nextKey.getPosition(), 
                      nextKey.getLength() - nextKey.getPosition());
    buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
    
    //反序列化得到当前key对象
    key = keyDeserializer.deserialize(key);
    //获取迭代器下一个元素的value
    DataInputBuffer nextVal = input.getValue();
    buffer.reset(nextVal.getData(), nextVal.getPosition(), nextVal.getLength()
        - nextVal.getPosition());
        
    //反序列化value
    value = valueDeserializer.deserialize(value);
    currentKeyLength = nextKey.getLength() - nextKey.getPosition();
    currentValueLength = nextVal.getLength() - nextVal.getPosition();
    if (isMarked) {
        //存储下一个key和value
      backupStore.write(nextKey, nextVal);
    }
    
    //迭代器向下迭代一次
    hasMore = input.next();
    //如果还有元素,则进行比较,判断key是否相同
    if (hasMore) {
      nextKey = input.getKey();
      //这个地方也是比较关键的:
      nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0, 
                                     currentRawKey.getLength(),
                                     nextKey.getData(),
                                     nextKey.getPosition(),
                                     nextKey.getLength() - nextKey.getPosition()
                                         ) == 0;
    } else {
      nextKeyIsSame = false;
    }
    
    inputValueCounter.increment(1);
    return true;
  }
  
  //一个迭代器模式的内部类
  protected class ValueIterator implements ReduceContext.ValueIterator<VALUEIN> {
    private boolean inReset = false;
    private boolean clearMarkFlag = false;
    @Override//它并不仅仅是判断迭代器是否还有下一个元素,而且还要判断下一个元素和上一个元素是不是相同的key
    public boolean hasNext() {
        if (inReset && backupStore.hasNext()) {
          return true;
        } 
      return firstValue || nextKeyIsSame;
    }
    @Override
    //这个地方要注意了,其实在获取下一个元素的时候主要调用的是nextKeyValue();
    public VALUEIN next() {
      if (inReset) {
          if (backupStore.hasNext()) {
            backupStore.next();
            DataInputBuffer next = backupStore.nextValue();
            buffer.reset(next.getData(), next.getPosition(), next.getLength()
                - next.getPosition());
            value = valueDeserializer.deserialize(value);
            return value;
          } else {
            inReset = false;
            backupStore.exitResetMode();
            if (clearMarkFlag) {
              clearMarkFlag = false;
              isMarked = false;
            }
          }
      } 
      // if this is the first record, we don't need to advance
      if (firstValue) {
        firstValue = false;
        return value;
      }
      // otherwise, go to the next key/value pair
    nextKeyValue();//该方法就是获取下一个key,value对,key值的变化也就在这里表现出来了。
    return value;
    }
  }
  
  //内部类,实现迭代器,具备迭代器功能
  protected class ValueIterable implements Iterable<VALUEIN> {
    private ValueIterator iterator = new ValueIterator();
    @Override
    public Iterator<VALUEIN> iterator() {
      return iterator;
    } 
  }
  public  Iterable<VALUEIN> getValues() throws IOException, InterruptedException {
    return iterable;
  }
}

  简单一句话总结就是:ReduceContextImpl类的RawKeyValueIterator input迭代器对象里面存储中着key-value对的元素, 以及一个只存储value的迭代器,然后每调一次我们实现的reduce方法,就是传入ValueIterable迭代器对象和当前的key。但是我们在方法里面调用迭代器的next方法时,其实调用了nextKeyValue,来获取下一个key和value,并判断下一个key是否和 上一个key是否相同,然后决定hashNext方法是否结束,同时对key进行了一次重新赋值。

  这个方法获取KV的迭代器的下一个KV值,然后把K值和V值放到之前传入我们自己写的Reduce类的方法中哪个输入参数的地址上,白话说:框架调用我们写的reduce方法时,传入了三个参数,然后我们方法内部调用phoneNbrs.hashNext方法就是调用的ReduceContextImpl的内部类ValueIterator的hashNext方法,这个方法里面调用了ReduceContextImpl内的nextKeyValue方法,该方法内部又清除了之前调用用户自定义reduce方法时传入的k,v参数的内存地址的数据,然后获取了RawKeyValueIterator input迭代器的下一个KV值,然后把k值和V值放入该数据。这就是原因了。

 再看我们的reduce实现类

    public static class FlowSumSortReducerOne extends Reducer<FlowBeanOne, Text, Text, FlowBeanOne> {
        
        @Override
        protected void reduce(FlowBeanOne bean, Iterable<Text> phoneNbrs, Context context) throws IOException, InterruptedException {
            System.out.println("-------------------");
            for (Text text : phoneNbrs) {//这里就是迭代器,相当于调用ValueIterable.hashNext
                System.out.println(bean);
                context.write(text, bean);
            }
        }
    }

   最近实在是不知道学点什么了呦,就把hadoop回顾一下,当初学时,为了快速上手,都是记各种理论以及结论,没有时间去看源码验证,也不知道人家说的结论是否正确,这次回滚就是看源码验证当初结论的正确性。这也快一年没有用了,最近一直从事分布式实时计算的研究。

                           

作者: intsmaze(刘洋)
老铁,你的--->推荐,--->关注,--->评论--->是我继续写作的动力。
微信公众号号:Apache技术研究院
由于博主能力有限,文中可能存在描述不正确,欢迎指正、补充!
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
相关文章
|
3月前
|
机器学习/深度学习 数据采集 存储
时间序列预测新突破:深入解析循环神经网络(RNN)在金融数据分析中的应用
【10月更文挑战第7天】时间序列预测是数据科学领域的一个重要课题,特别是在金融行业中。准确的时间序列预测能够帮助投资者做出更明智的决策,比如股票价格预测、汇率变动预测等。近年来,随着深度学习技术的发展,尤其是循环神经网络(Recurrent Neural Networks, RNNs)及其变体如长短期记忆网络(LSTM)和门控循环单元(GRU),在处理时间序列数据方面展现出了巨大的潜力。本文将探讨RNN的基本概念,并通过具体的代码示例展示如何使用这些模型来进行金融数据分析。
444 2
|
2月前
|
数据采集 自然语言处理 搜索推荐
基于qwen2.5的长文本解析、数据预测与趋势分析、代码生成能力赋能esg报告分析
Qwen2.5是一款强大的生成式预训练语言模型,擅长自然语言理解和生成,支持长文本解析、数据预测、代码生成等复杂任务。Qwen-Long作为其变体,专为长上下文场景优化,适用于大型文档处理、知识图谱构建等。Qwen2.5在ESG报告解析、多Agent协作、数学模型生成等方面表现出色,提供灵活且高效的解决方案。
203 49
|
2月前
|
测试技术 开发者 Python
使用Python解析和分析源代码
本文介绍了如何使用Python的`ast`模块解析和分析Python源代码,包括安装准备、解析源代码、分析抽象语法树(AST)等步骤,展示了通过自定义`NodeVisitor`类遍历AST并提取信息的方法,为代码质量提升和自动化工具开发提供基础。
73 8
|
1月前
|
调度 开发者
核心概念解析:进程与线程的对比分析
在操作系统和计算机编程领域,进程和线程是两个基本而核心的概念。它们是程序执行和资源管理的基础,但它们之间存在显著的差异。本文将深入探讨进程与线程的区别,并分析它们在现代软件开发中的应用和重要性。
59 4
|
2月前
|
数据采集 存储 自然语言处理
基于Qwen2.5的大规模ESG数据解析与趋势分析多Agent系统设计
2022年中国上市企业ESG报告数据集,涵盖制造、能源、金融、科技等行业,通过Qwen2.5大模型实现报告自动收集、解析、清洗及可视化生成,支持单/多Agent场景,大幅提升ESG数据分析效率与自动化水平。
129 0
|
3月前
|
存储 SQL 分布式计算
湖仓一体架构深度解析:构建企业级数据管理与分析的新基石
【10月更文挑战第7天】湖仓一体架构深度解析:构建企业级数据管理与分析的新基石
186 1
|
4月前
|
存储 缓存 自然语言处理
深度解析ElasticSearch:构建高效搜索与分析的基石
【9月更文挑战第8天】在数据爆炸的时代,如何快速、准确地从海量数据中检索出有价值的信息成为了企业面临的重要挑战。ElasticSearch,作为一款基于Lucene的开源分布式搜索和分析引擎,凭借其强大的实时搜索、分析和扩展能力,成为了众多企业的首选。本文将深入解析ElasticSearch的核心原理、架构设计及优化实践,帮助读者全面理解这一强大的工具。
321 7
|
5月前
|
分布式计算 大数据 分布式数据库
"揭秘HBase MapReduce高效数据处理秘诀:四步实战攻略,让你轻松玩转大数据分析!"
【8月更文挑战第17天】大数据时代,HBase以高性能、可扩展性成为关键的数据存储解决方案。结合MapReduce分布式计算框架,能高效处理HBase中的大规模数据。本文通过实例展示如何配置HBase集群、编写Map和Reduce函数,以及运行MapReduce作业来计算HBase某列的平均值。此过程不仅限于简单的统计分析,还可扩展至更复杂的数据处理任务,为企业提供强有力的大数据技术支持。
93 1
|
4月前
|
监控 安全 网络安全
恶意软件分析:解析与实践指南
【8月更文挑战第31天】
258 0
|
5月前
|
网络协议 NoSQL 网络安全
【Azure 应用服务】由Web App“无法连接数据库”而逐步分析到解析内网地址的办法(SQL和Redis开启private endpoint,只能通过内网访问,无法从公网访问的情况下)
【Azure 应用服务】由Web App“无法连接数据库”而逐步分析到解析内网地址的办法(SQL和Redis开启private endpoint,只能通过内网访问,无法从公网访问的情况下)

热门文章

最新文章

推荐镜像

更多