通过avro合并大文件 并计算文件词频

简介:

通过avro合并大文件 并计算文件词频

public class AvroFilemr {

// 读取avro文件 每读取一条记录 其实是一个小文件,对其进行wordcount解析
// 并以单词,1 的形式发送到reducer

public static class AvroFilemrMap extends Mapper<AvroKey<SmallFile>, NullWritable, Text, IntWritable> {
    private Text outKey = new Text();
    private final IntWritable ONE = new IntWritable(1);
    private String[] infos;
    private ByteBuffer content;

    @Override
    protected void map(AvroKey<SmallFile> key, NullWritable value,
            Mapper<AvroKey<SmallFile>, NullWritable, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {

        content = key.datum().getContent();
        infos = new String(content.array()).split("\\s");
        for (String string : infos) {
            outKey.set(string);
            context.write(outKey, ONE);
        }
    }
}

// 把wordcount的计算结果 以word_count.avsc的模式输出成avro文件
public static class AvroFilemrReduce extends Reducer<Text, IntWritable, AvroKey<GenericRecord>, NullWritable> {
    private int sum;
    private Schema writeSchema;
    private GenericRecord record;
    private AvroKey<GenericRecord> outKey = new AvroKey<GenericRecord>();
    private NullWritable outValue = NullWritable.get();

    @Override
    protected void setup(Reducer<Text, IntWritable, AvroKey<GenericRecord>, NullWritable>.Context context)
            throws IOException, InterruptedException {
        Parser parser = new Parser();
        writeSchema = parser.parse(new File("src/main/avro/wordCount.avsc"));
        record = new GenericData.Record(writeSchema);

    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Reducer<Text, IntWritable, AvroKey<GenericRecord>, NullWritable>.Context context)
            throws IOException, InterruptedException {

        sum = 0;
        for (IntWritable value : values) {
            sum += value.get();

        }
        record.put("word", key.toString());
        record.put("count", sum);
        outKey.datum(record);
        context.write(outKey, outValue);
    }
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    Configuration configuration =new Configuration();
    Job job=Job.getInstance(configuration);
    job.setJarByClass(AvroFilemr.class);
    job.setJobName("读avro文件计算并把结果写入到新的avro文件");
    
    job.setMapperClass(AvroFilemrMap.class);
    job.setReducerClass(AvroFilemrReduce.class);
    
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(AvroKey.class);
    job.setOutputValueClass(NullWritable.class);
    
    job.setInputFormatClass(AvroKeyInputFormat.class);
    job.setOutputFormatClass(AvroKeyOutputFormat.class);
    
    AvroJob.setInputKeySchema(job, SmallFile.getClassSchema());
    Parser parser =new Parser();
    AvroJob.setOutputKeySchema(job, parser.parse(new File("src/main/avro/wordCount.avsc")));
    
    FileInputFormat.addInputPath(job, new Path("/AvroMErgeSmallFile"));
    Path outputPath =new Path("/AvroFilemr");
    outputPath.getFileSystem(configuration).delete(outputPath, true);
    FileOutputFormat.setOutputPath(job, outputPath);
    
    System.exit(job.waitForCompletion(true)?0:1);
}

}

相关文章
|
消息中间件 Java Kafka
kafka 客户端使用Avro序列化
kafka 客户端使用Avro序列化
223 0
|
分布式计算 Java Hadoop
JAVA—其他—Avro序列化
Avro是hadoop的一个用于序列化的组件 理解特点: 1. 高效 2. 序列化后体积小 3. 动态 动态指的是数据的结构一旦定义,可以在多处语言生成实体类
298 0
|
存储 分布式计算 Java
深入对比Java与Hadoop大数据序列化机制Avro
Java有自己提供的序列化机制,而我们的Hadoop也提供了自己的序列化机制,二者究竟有什么差异呢?为什么Hadoop要重新设计自己的序列化体系?序列化大数据对象的过程,Writable接口底层源码实现。
2158 0
|
Java Maven 网络协议
Avro序列化和RPC实现
序列化和反序列化 Maven:Pom.xml <dependencies> <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> <version>1.
4287 0
|
SQL 存储 分布式计算
Avro数据序列化
序列化:把结构化的对象转换成字节流,使得能够在系统中或网络中通信 需要把数据存储到hadoop的hbase 常用序列化系统 thrift   (hive,hbase) Protocol Buffer (google) avro 本文出自 “点滴积累” 博客,请务必保留此出处http://tianxingzhe.blog.51cto.com/3390077
1457 0
|
存储 Java Apache
rpc框架之 avro 学习 2 - 高效的序列化
同一类框架,后出现的总会吸收之前框架的优点,然后加以改进,avro在序列化方面相对thrift就是一个很好的例子。借用Apache Avro 与 Thrift 比较 一文中的几张图来说明一下,avro在序列化方面的改进: 1、无需强制生成目标语言代码 avro提供了二种使用方式,一种称之为Sepcific方式,这跟thrift基本一致,都是写定义IDL文件,然后用编译器(或插件)生成目标class,另一种方式是Generic,这种方式下,不用生成目标代码,而是采用动态加载定义文件的方式,将 FieldName - FieldValue,以Map的方式存储。
1261 0
|
4月前
|
文字识别 C# Python
使用C#将几个Excel文件合并去重分类
使用C#将几个Excel文件合并去重分类
41 3