首先编写 Reduce 端编程框架,自定义的 FlowSumReducer 需要继承父类 Reducer,输入数据和输出数据都是**KV 对**的形式。具体框架代码如下:
publicclassFlowSumReducerextendsReducer<Text, FlowBean, Text, FlowBean> { }
- **KEYIN**:对应 Mapper 端输出的 KEYOUT,即每个手机号,所以是 String,对应 Hadoop 中的 Text
- **VALUEIN**:对应 Mapper 端输出的 VALUEOUT,即封装的流量信息类,所以是 FlowBean
- **KEYOUT**:用户自定义逻辑方法返回数据中key的类型,由用户业务逻辑决定,在此程序中,我们输出的key是手机号,所以是String,对应 Hadoop 中的 Text
- **VALUEOUT**:用户自定义逻辑方法返回数据中value的类型,由用户业务逻辑决定,在此程序中,我们输出的value是累加求和后封装的流量信息类,所以是 FlowBean
已知 Reducer 中的业务逻辑写在 **reduce() 方法**中,在此 reduce()方法中我们需要接收 MapTask 的输出结果,然后按照 key(手机号) 对 value(上行流量、下行流量) 做**汇总操作**。具体代码如下所示:
protectedvoidreduce(Textkey, Iterable<FlowBean>values, Reducer<Text, FlowBean, Text, FlowBean>.Contextcontext) throwsIOException, InterruptedException { longsumUpFlow=0L; // 总上行流量longsumDownFlow=0L; // 总下行流量for (FlowBeanfb : values) { sumUpFlow+=fb.getUpFlow(); sumDownFlow+=fb.getDownFlow(); } // 获取总上行流量和总下行流量,对其进行封装FlowBeanresultsum=newFlowBean(sumUpFlow, sumDownFlow); // 将手机号作为key,将封装的流量信息类作为value,写出最终结果context.write(key, resultsum); }
FlowSumReducer.java 的完整代码如下所示:
publicclassFlowSumReducerextendsReducer<Text, FlowBean, Text, FlowBean> { /** * 框架每传递一组数据<13502468823,{FlowBean,FlowBean,FlowBean...}>调用一次reduce方法* reduce中的业务逻辑是遍历values,然后进行累加求和后输出*/protectedvoidreduce(Textkey, Iterable<FlowBean>values, Reducer<Text, FlowBean, Text, FlowBean>.Contextcontext) throwsIOException, InterruptedException { longsumUpFlow=0L; // 总上行流量longsumDownFlow=0L; // 总下行流量for (FlowBeanfb : values) { sumUpFlow+=fb.getUpFlow(); sumDownFlow+=fb.getDownFlow(); } // 获取总上行流量和总下行流量,对其进行封装FlowBeanresultsum=newFlowBean(sumUpFlow, sumDownFlow); // 将手机号作为key,将封装的流量信息类作为value,写出最终结果context.write(key, resultsum); } }