mapreduce辅助排序和序列化的实例练习

简介: mapreduce辅助排序和序列化的实例练习

需求


数据


0000001 Pdt_01  222.8
0000002 Pdt_06  722.4
0000001 Pdt_05  25.8
0000003 Pdt_01  222.8
0000003 Pdt_01  33.8
0000002 Pdt_03  522.8
0000002 Pdt_04  122.4


订单号 x 价格

希望的到的数据: 根据订单号得到分区,每个分区清洗出最大的价格

分区1

0000001 222.8

分区2

0000002 722.4

分区3

0000003 222.8


思路


这个每行数据可以定义一个orderbean,实现序列化接口,自定义比较器,先根据id排序,然后根据价格倒序,重写写tostring方法(这里定义输出格式)map的key就是一个orderben,value可以为空分组; 根据orderbean的id进行分组,因为上面一步价格倒序了,所以分组得到的第一条数据都是最大的价格,满足要求分区,根据订单号id进行分区

reduce合并,输出key orderbean,value可以为空自定义main函数,将map, 分组比较器,分区,reduce等写在job中,执行job任务,指定到文件夹输出即可码代码


bean


@Setter
@Getter
public class OrderBean implements WritableComparable<OrderBean> {
    private int id;
    private double price;
    @Override
    public int compareTo(OrderBean o) {
        if (this.id > o.id) {
            return 1;
        } else if (this.id < o.id) {
            return -1;
        } else {
            return this.price - o.price >  0 ? -1 : 1;
        }
    }
    @Override
    public void write(DataOutput output) throws IOException {
        output.writeInt(this.id);
        output.writeDouble(this.price);
    }
    @Override
    public void readFields(DataInput input) throws IOException {
        this.id = input.readInt();
        this.price = input.readDouble();
    }
    @Override
    public String toString() {
        return this.id + "\t" + this.price;
    }
}


map


//0000002 Pdt_04  122.4
//mapreduce根据key进行排序,所以k2,v2的类型应该为 FlowBean, NullWritable
public class OrderMap extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
    OrderBean bean = new OrderBean();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String s = value.toString();
        String[] split = s.split("\t");
        bean.setId(Integer.parseInt(split[0]));
        bean.setPrice(Double.parseDouble(split[2]));
        context.write(bean, NullWritable.get());
    }
}


分组比较器(辅助排序)、


import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class GroupComparator extends WritableComparator {
    public GroupComparator() {
        //按照自定义的orderbean排序,第二个参数是辅助排序
        super(OrderBean.class, true);
    }
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean  oa = (OrderBean)a;
        OrderBean  ob = (OrderBean)b;
         if(oa.getId() > ob.getId()){
             return 1;
         }else if(oa.getId() < ob.getId()){
             return -1;
         }else{
             return 0;
         }
    }
}


分区


import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;
public class OrderPartition extends Partitioner<OrderBean, NullWritable> {
    @Override
    public int getPartition(OrderBean orderBean, NullWritable nullWritable, int i) {
        return (orderBean.getId() & Integer.MAX_VALUE) % 3;
    }
}


reduce


import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class OrderReduce extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
    @Override
    protected void reduce(OrderBean bean, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.write(bean, NullWritable.get());
    }
}


测试job任务


public class MainOrder {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        args = new String[]{"F:\\input\\GroupingComparator.txt", "F:\\output\\GroupingComparator4"};
        //获取配置文件
        Configuration conf = new Configuration();
        //创建job任务
        Job job = Job.getInstance(conf);
        // 加载jar包
        job.setJarByClass(MainOrder.class);
        //指定Map类和map的输出类型 Text, NullWritable
        //关联map和reduce
        job.setMapperClass(OrderMap.class);
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        // 设置最终输出类型
        job.setReducerClass(OrderReduce.class);
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);
        //指定辅助排序
        job.setGroupingComparatorClass(GroupComparator.class);
        //设置分区
        job.setPartitionerClass(OrderPartition.class);
        job.setNumReduceTasks(3);
        //指定数据输入的路径和输出的路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}


查看结果


1dc618a0ed9580ce8bfa6facb208c08f.png5d4c6812c8535adbb050f4ddf2e1bce8.png


完美!!大功告成!!


相关文章
|
1月前
|
消息中间件 存储 负载均衡
Kafka【付诸实践 01】生产者发送消息的过程描述及设计+创建生产者并发送消息(同步、异步)+自定义分区器+自定义序列化器+生产者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka生产者】
【2月更文挑战第21天】Kafka【付诸实践 01】生产者发送消息的过程描述及设计+创建生产者并发送消息(同步、异步)+自定义分区器+自定义序列化器+生产者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka生产者】
131 4
|
1月前
|
XML 存储 BI
如何把一个 ABAP 类的实例,序列化成 XML 字符串试读版
如何把一个 ABAP 类的实例,序列化成 XML 字符串试读版
12 0
|
4月前
|
网络安全 PHP
[网络安全/CTF] BUUCTF极客大挑战2019PHP解题详析(Dirsearch使用实例+php反序列化)
[网络安全/CTF] BUUCTF极客大挑战2019PHP解题详析(Dirsearch使用实例+php反序列化)
39 0
|
4月前
|
存储 分布式计算 Hadoop
MapReduce序列化【用户流量使用统计】
MapReduce序列化【用户流量使用统计】
|
5月前
|
分布式计算 Java Hadoop
26 MAPREDUCE中的序列化
26 MAPREDUCE中的序列化
34 1
|
7月前
|
分布式计算 资源调度 Java
大数据MapReduce统计单词实例
大数据MapReduce统计单词实例
181 0
|
8月前
|
分布式计算 资源调度 Hadoop
Hadoop基础学习---5、MapReduce概述和WordCount实操(本地运行和集群运行)、Hadoop序列化
Hadoop基础学习---5、MapReduce概述和WordCount实操(本地运行和集群运行)、Hadoop序列化
|
11月前
|
存储 安全 PHP
【反序列化漏洞】phar反序列化原理&实例分析
简单来说phar就是php压缩文档。它可以把多个文件归档到同一个文件中,而且不经过解压就能被 php 访问并执行,与file:// php://等类似,也是一种流包装器。
171 0
|
11月前
|
存储 分布式计算 资源调度
|
11月前
|
存储 分布式计算 Hadoop
MapReduce排序
MapTask和ReduceTask均会对数据按照key进行排序。该操作属于Hadoop的默认行为。任何应用程序中的数