开发者学堂课程【Hadoop 分布式计算框架 MapReduce:ReduceJoin 案例 Reduce】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/94/detail/1548
ReduceJoin 案例 Reduce
代码示例
package com.liun.mr.reducejoin;
import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.util.ArrayList;
import org.apache.commons.beanutils.BeanUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;
public class TableReducer<E> extends Reducer<Text, TableBean, TableBean, NullWritable> {
@Override protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {
// 存储所有订单集合 ArrayList<TableBean> orderBeans = new ArrayList<>(); // 存储产品信息 TableBean pdBean = new TableBean();
for (TableBean tableBean : values) {
if ("order".equals(tableBean.getFlag())) {// 订单表
TableBean tmpBean = new TableBean();
try { // 拷贝传递过来的每条订单数据到集合中 BeanUtils.copyProperties(tmpBean, tableBean);
orderBeans.add(tmpBean);
} catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } } else {// 产品表 try { // 拷贝传递过来的产品表到内存中 BeanUtils.copyProperties(pdBean, tableBean);
} catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } } }
for (TableBean tableBean : orderBeans) { tableBean.setPname(pdBean.getPname());
// 写出 context.write(tableBean, NullWritable.get()); } } } |