小文件问题
针对HDFS而言,每一个小文件在namenode中都会占用150字节的内存空间,最终会导致集群中虽然储了很多个文件,但是文件的体积并不大,这样就没有意义了。
针对MapReduce而言,每一个小文件都是一个Block,都会产生一个InputSplit,最终每一个小文件都产生一个map任务,这样会导致同时启动太多的Map任务,Map任务的启动是非常消耗性能的,但是启动了以后执行了很短时间就停止了,因为小文件的数据量太小了,这样就会造成任务执行消耗的时间还没有有启动任务消耗的时间多,这样也会影响MapReduce执行的效率。
HDFS提供了两种类型的容器,分别是SequenceFile 和 MapFile
SequenceFile
SequeceFile是Hadoop 提供的一种二进制文件,这种二进制文件直接将<key, value>对序列化到文件中。
一般对小文件可以使用这种文件合并,即将小文件的文件名作为key,文件内容作为value序列化到大文件中。
但是这个文件有一个缺点,就是它需要一个合并文件的过程,最终合并的文件会比较大,并且合并后的文件查看起来不方便,必须通过遍历才能查看里面的每一个小文件
所以这个SequenceFile 其实可以理解为把很多小文件压缩成一个大的压缩包了。
生成SequenceFile的java代码
public class SmallFileSeq { public static void main(String[] args) throws Exception{ //生成SequenceFile文件 write("D:\\smallFile","/seqFile"); //读取SequenceFile文件 read("/seqFile"); } /** * 生成SequenceFile文件 * @param inputDir 输入目录-windows目录 * @param outputFile 输出文件-hdfs文件 * @throws Exception */ private static void write(String inputDir,String outputFile) throws Exception{ //创建一个配置对象 Configuration conf = new Configuration(); //指定HDFS的地址 conf.set("fs.defaultFS","hdfs://node01:9000"); //获取操作HDFD的对象 FileSystem fileSystem = FileSystem.get(conf); //删除HDFS上的输出文件 fileSystem.delete(new Path(outputFile),true); //构造opts数组,有三个元素 /* 第一个是输出路径【文件】 第二个是key的类型 第三个是value的类型 */ SequenceFile.Writer.Option[] opts = new SequenceFile.Writer.Option[]{ SequenceFile.Writer.file(new Path(outputFile)), SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(Text.class) }; //创建了一个writer实例 SequenceFile.Writer writer = SequenceFile.createWriter(conf, opts); //指定需要压缩的文件的目录 File inputDirPath = new File(inputDir); if(inputDirPath.isDirectory()){ //获取目录中的文件 File[] files = inputDirPath.listFiles(); //迭代文件 for (File file: files) { //获取文件的全部内容 String content = FileUtils.readFileToString(file, "UTF-8"); //获取文件名 String fileName = file.getName(); Text key = new Text(fileName); Text value = new Text(content); //向SequenceFile中写入数据 writer.append(key,value); } } writer.close(); } /** * 读取SequenceFile文件 * @param inputFile SequenceFile文件路径 * @throws Exception */ private static void read(String inputFile) throws Exception{ //创建一个配置对象 Configuration conf = new Configuration(); //指定HDFS的地址 conf.set("fs.defaultFS","hdfs://node01:9000"); //创建阅读器 SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(new Path(inputFile))); Text key = new Text(); Text value = new Text(); //循环读取数据 while(reader.next(key,value)){ //输出文件名称 System.out.print("文件名:"+key.toString()+","); //输出文件内容 System.out.println("文件内容:"+value.toString()+""); } reader.close(); } }
MapFile
MapFile是排序后的SequenceFile,MapFile由两部分组成,分别是index和data,index作为文件的数据索引,主要记录了每个Record的key值,以及该Record在文件中的偏移位置。
在MapFile被访问的时候,索引文件会被加载到内存,通过索引映射关系可迅速定位到指定Record所在文件的位置。
因此,相对SequenceFile而言,MapFile的检索效率是高效的,缺点是会消耗一部分内存来存index数据。
MapFile的java代码
public class SmallFileMap { public static void main(String[] args) throws Exception{ //生成MapFile文件 write("D:\\smallFile","/mapFile"); //读取MapFile文件 read("/mapFile"); } /** * 生成MapFile文件 * @param inputDir 输入目录-windows目录 * @param outputDir 输出目录-hdfs目录 * @throws Exception */ private static void write(String inputDir,String outputDir) throws Exception{ //创建一个配置对象 Configuration conf = new Configuration(); //指定HDFS的地址 conf.set("fs.defaultFS","hdfs://bigdata01:9000"); //获取操作HDFD的对象 FileSystem fileSystem = FileSystem.get(conf); //删除HDFS上的输出文件 fileSystem.delete(new Path(outputDir),true); //构造opts数组,有两个元素 /* 第一个是key的类型 第二个是value的类型 */ SequenceFile.Writer.Option[] opts = new SequenceFile.Writer.Option[]{ MapFile.Writer.keyClass(Text.class), MapFile.Writer.valueClass(Text.class) }; //创建了一个writer实例 MapFile.Writer writer = new MapFile.Writer(conf, new Path(outputDir), opts); //指定需要压缩的文件的目录 File inputDirPath = new File(inputDir); if(inputDirPath.isDirectory()){ //获取目录中的文件 File[] files = inputDirPath.listFiles(); //迭代文件 for (File file: files) { //获取文件的全部内容 String content = FileUtils.readFileToString(file, "UTF-8"); //获取文件名 String fileName = file.getName(); Text key = new Text(fileName); Text value = new Text(content); //向SequenceFile中写入数据 writer.append(key,value); } } writer.close(); } /** * 读取MapFile文件 * @param inputDir MapFile文件路径 * @throws Exception */ private static void read(String inputDir)throws Exception{ //创建一个配置对象 Configuration conf = new Configuration(); //指定HDFS的地址 conf.set("fs.defaultFS","hdfs://bigdata01:9000"); //创建阅读器 MapFile.Reader reader = new MapFile.Reader(new Path(inputDir),conf); Text key = new Text(); Text value = new Text(); //循环读取数据 while(reader.next(key,value)){ //输出文件名称 System.out.print("文件名:"+key.toString()+","); //输出文件内容 System.out.println("文件内容:"+value.toString()+""); } reader.close(); } }
案例: SequenceFile实现小文件的存储和计算
有十个小文件,使用sequenceFile,写一个mapreduce程序观察提交到yarn上的map任务有几个?
public class WordCountJobSeq { /** * Map阶段 */ public static class MyMapper extends Mapper<Text, Text,Text,LongWritable>{ Logger logger = LoggerFactory.getLogger(MyMapper.class); /** * 需要实现map函数 * 这个map函数就是可以接收<k1,v1>,产生<k2,v2> * @param k1 * @param v1 * @param context * @throws IOException * @throws InterruptedException */ @Override protected void map(Text k1, Text v1, Context context) throws IOException, InterruptedException { //输出k1,v1的值 System.out.println("<k1,v1>=<"+k1.toString()+","+v1.toString()+">"); //logger.info("<k1,v1>=<"+k1.get()+","+v1.toString()+">"); //k1 代表的是每一行数据的行首偏移量,v1代表的是每一行内容 //对获取到的每一行数据进行切割,把单词切割出来 String[] words = v1.toString().split(" "); //迭代切割出来的单词数据 for (String word : words) { //把迭代出来的单词封装成<k2,v2>的形式 Text k2 = new Text(word); LongWritable v2 = new LongWritable(1L); //把<k2,v2>写出去 context.write(k2,v2); } } } /** * Reduce阶段 */ public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{ Logger logger = LoggerFactory.getLogger(MyReducer.class); /** * 针对<k2,{v2...}>的数据进行累加求和,并且最终把数据转化为k3,v3写出去 * @param k2 * @param v2s * @param context * @throws IOException * @throws InterruptedException */ @Override protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException { //创建一个sum变量,保存v2s的和 long sum = 0L; //对v2s中的数据进行累加求和 for(LongWritable v2: v2s){ //输出k2,v2的值 //System.out.println("<k2,v2>=<"+k2.toString()+","+v2.get()+">"); //logger.info("<k2,v2>=<"+k2.toString()+","+v2.get()+">"); sum += v2.get(); } //组装k3,v3 Text k3 = k2; LongWritable v3 = new LongWritable(sum); //输出k3,v3的值 //System.out.println("<k3,v3>=<"+k3.toString()+","+v3.get()+">"); //logger.info("<k3,v3>=<"+k3.toString()+","+v3.get()+">"); // 把结果写出去 context.write(k3,v3); } } /** * 组装Job=Map+Reduce */ public static void main(String[] args) { try{ if(args.length!=2){ //如果传递的参数不够,程序直接退出 System.exit(100); } //指定Job需要的配置参数 Configuration conf = new Configuration(); //创建一个Job Job job = Job.getInstance(conf); //注意了:这一行必须设置,否则在集群中执行的时候是找不到WordCountJob这个类的 job.setJarByClass(WordCountJobSeq.class); //指定输入路径(可以是文件,也可以是目录) FileInputFormat.setInputPaths(job,new Path(args[0])); //指定输出路径(只能指定一个不存在的目录) FileOutputFormat.setOutputPath(job,new Path(args[1])); //指定map相关的代码 job.setMapperClass(MyMapper.class); //指定k2的类型 job.setMapOutputKeyClass(Text.class); //指定v2的类型 job.setMapOutputValueClass(LongWritable.class); //设置输入数据处理类 job.setInputFormatClass(SequenceFileInputFormat.class); //指定reduce相关的代码 job.setReducerClass(MyReducer.class); //指定k3的类型 job.setOutputKeyClass(Text.class); //指定v3的类型 job.setOutputValueClass(LongWritable.class); //提交job job.waitForCompletion(true); }catch(Exception e){ e.printStackTrace(); } } }