HBase结合MapReduce批量导入

简介:
  • Hbase是Hadoop生态体系配置的数据库,我们可以通过HTable api中的put方法向Hbase数据库中插入数据,但是由于put效率太低,不能批量插入大量的数据,文本将详细介绍如何通过MapReduce运算框架向Hbase数据库中导入数据。
    开篇先介绍业务场景:将电信手机上网日志中的数据导入到Hbase数据库中,将部分数据以及相应字段描述列出:
    这里写图片描述
    图片格式描述:
    这里写图片描述
    先介绍一个日期格式的转换:

    ?
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    public  class  TestDate
    {
          public  static  void  main(String[] args)
          {
                Date d =  new  Date();
                SimpleDateFormat df =  new  SimpleDateFormat( "yyyy-MM-dd HH:mm:ss" );  
                String time = df.format(d);
                System.out.println(time);
          }
    }
    /*2016-05-14 13:32:24*/
    ?
    1
    2
    3
    4
    5
    6
    7
    8
    9
      在Java当中,我们经常利用SimpledateFormat这个类将给定的日期转化成指定的格式。
      接下来在归纳一下Hbase结合MapReduce批量导入数据的时候,在代码当中应该注意的事项:
      ①MyReducer类继承的是TableReduce类,而不在是MapReduce中常用的Reducer类
      ②的数值类型没有什么用,通常将k3的数值类型设置为NullWritable即可
      ③只设置map函数的输出类型,不在设置reduce函数的输出类型,因为②的原因
      ④指定对输出文件格式化处理的类改为TableOutputFormat,而不在是TextOutputFormat
      ⑤输出文件的路径改为指定的表名,在Configuration中进行设定,而不在是path的方式
      ⑥如果想过jar包的方式运行程序,貌似还需要加入什么jar包,我没有整出来。
      接下来将贴出我在编程的时候第一次写出的业务代码:当然遇到了很多的问题。
    ?
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    package  IT01;
    import  java.io.IOException;
    import  java.text.SimpleDateFormat;
    import  java.util.Date;
    import  org.apache.hadoop.conf.Configuration;
    import  org.apache.hadoop.fs.Path;
    import  org.apache.hadoop.hbase.client.Put;
    import  org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
    import  org.apache.hadoop.hbase.mapreduce.TableReducer;
    import  org.apache.hadoop.io.LongWritable;
    import  org.apache.hadoop.io.NullWritable;
    import  org.apache.hadoop.io.Text;
    import  org.apache.hadoop.mapreduce.Job;
    import  org.apache.hadoop.mapreduce.Mapper;
    import  org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import  org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import  org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
    public  class  HbaseApp
    {
          public  static  String path1 =  "hdfs://hadoop80:9000/FlowData.txt" ;
          public  static  void  main(String[] args)  throws  Exception
          {
                Configuration conf =  new  Configuration();
                conf.set( "hbaser.rootdir" , "hdfs://hadoop80:9000/hbase" );
                conf.set( "hbase.zookeeper.quorum" , "hadoop80" );
                conf.set(TableOutputFormat.OUTPUT_TABLE, "wlan_log" ); //在这里需要指定表的名字:相当于输出文件的路径
                conf.set( "dfs.socket.timeout" , "2000" );
     
                Job job =  new  Job(conf, "HbaseApp" );
                FileInputFormat.setInputPaths(job,  new  Path(path1));
                job.setInputFormatClass(TextInputFormat. class );
                job.setMapperClass(MyMapper. class );
                job.setMapOutputKeyClass(Text. class );
                job.setMapOutputValueClass(Text. class );
     
                job.setNumReduceTasks( 1 );
                job.setPartitionerClass(HashPartitioner. class );
     
                job.setReducerClass(MyReducer. class );
    //         job.setOutputKeyClass(Text.class);
    //         job.setOutputValueClass(NullWritable.class);
                job.setOutputFormatClass(TableOutputFormat. class );
    //         FileOutputFormat.setOutputPath(job, new Path(path2));
                job.waitForCompletion( true );
          }
          public  static  class  MyMapper  extends  Mapper{
                 protected  void  map(LongWritable k1, Text v1,Context context) throws  IOException, InterruptedException
                 {
                       String[] splited = v1.toString().split( "\t" );
                       String reportTime = splited[ 0 ];
                       String msisdn = splited[ 1 ];
                       Date date =  new  Date(Long.parseLong(reportTime));
                       String time = DateConvert.dateParse(date);
                       String rowkey = msisdn+ ":" +time; //获取到行健
                       context.write( new  Text(rowkey), new  Text(v1.toString()));      
                 }
          }
          public  static  class  MyReducer  extends  TableReducer{
                 protected  void  reduce(Text k2, Iterablev2s,Context context) throws  IOException, InterruptedException
                 {
                       for  (Text v2 : v2s)
                      {
                          String[] splited = v2.toString().split( "\t" );
                          /**添加记录的时候需要指定行健、列族、列名、数值***/
                          Put put =  new  Put(k2.toString().getBytes());
                          put.add( "cf" .getBytes(), "reportTime" .getBytes(), splited[ 0 ].getBytes());
                          put.add( "cf" .getBytes(), "msisdn" .getBytes(), splited[ 1 ].getBytes());
                          put.add( "cf" .getBytes(), "apmac1" .getBytes(), splited[ 2 ].getBytes());
                          put.add( "cf" .getBytes(), "apmac2" .getBytes(), splited[ 3 ].getBytes());
                          put.add( "cf" .getBytes(), "host" .getBytes(), splited[ 4 ].getBytes());
                          put.add( "cf" .getBytes(), "sitetype" .getBytes(), splited[ 5 ].getBytes());
                          put.add( "cf" .getBytes(), "upPackNum" .getBytes(), splited[ 6 ].getBytes());
                          put.add( "cf" .getBytes(), "downPackNum" .getBytes(), splited[ 7 ].getBytes());
                          put.add( "cf" .getBytes(), "upPayLoad" .getBytes(), splited[ 8 ].getBytes());
                          put.add( "cf" .getBytes(), "downPayLoad" .getBytes(), splited[ 9 ].getBytes());
                          put.add( "cf" .getBytes(), "httpstatus" .getBytes(), splited[ 10 ].getBytes());
                          context.write(NullWritable.get(),put);
                      }       
                 }
          }
    }
    class  DateConvert
    {
         public  static  String dateParse(Date  date)
         {
              SimpleDateFormat df =  new  SimpleDateFormat( "yyyyMMddhhmmss" ); //构造一个日期解析器
              return  df.format(date); 
         }
    }

    程序运行完之后:显示如下异常NumberFormatException
    这里写图片描述
    显示的是数字格式异常, 于是我在map函数当中又加了一个throws NumberFormatException

    ?
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
                 protected  void  map(LongWritable k1, Text v1,Context context) throws  IOException, InterruptedException,NumberFormatException
                 {
                       String[] splited = v1.toString().split( "\t" );
                       String reportTime = splited[ 0 ];
                       String msisdn = splited[ 1 ];
                       Date date =  new  Date(Long.parseLong(reportTime));
                       String time = DateConvert.dateParse(date);
                       String rowkey = msisdn+ ":" +time; //获取到行健
                       context.write( new  Text(rowkey), new  Text(v1.toString()));      
                 }

    但是这样我发现也不对,因为当我追踪Mapp这个类的源代码时,我发现父类的map方法并没有抛出NumberFormatException这个异常,根据子类重写方法抛出异常的范围不能大于父类被重写方法抛出异常的范围,我又将上面这段代码用try——catch这种异常处理方式进行处理:

    ?
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    protected  void  map(LongWritable k1, Text v1,Context context) throws  IOException, InterruptedException
                 {
                       try
                       {
                           String[] splited = v1.toString().split( "\t" );
                           String reportTime = splited[ 0 ];
                           String msisdn = splited[ 1 ];
                           Date date =  new  Date(Long.parseLong(reportTime));
                           String time = DateConvert.dateParse(date);
                           String rowkey = msisdn+ ":" +time; //获取到行健
                           context.write( new  Text(rowkey), new  Text(v1.toString()));      
                       } catch (Exception e)
                       {
                           Counter counter = context.getCounter( "NumberExceptionNum" "num" );
                           counter.increment(1L);
                       }
                 }

    当我将代码改成这样的时候,此时程序并没有显示抛出NumberFormatException这个异常,说明异常得到了处理,但是当我去查看Hbase数据的时候,发现HDFS中的日志数据并没有导入到Hbase数据库中,于是我又查看了一下MapReduce的运行日志:
    这里写图片描述
    也就是我的22行数据在map函数中当中并没有输出,这个问题就匪夷所思了,为什么22行数据都会抛出数字格式异常呢,而且都没有输出,于是我想到可能是SimpleDateFZ喎"/kf/ware/vc/" target="_blank" class="keylink">vcm1hdNXiuPbA4LXEzsrM4qOs09rKx87S09a/qsq8uPfW1rDZtsijrLeiz9bN+MnPuty24M7E1cK2vMrHxfrF0NXiuPbA4LXEo6zX7tbV1tXT2tXStb3By87KzOK1xL3ivva3vbC4o6zTw3RyaW0oKdXiuPa3vbeoyKWz/dfWt/u0rsewuvO1xL/VuPG8tL/JoaM8L3A+DQo8cHJlIGNsYXNzPQ=="brush:java;">protected void map(LongWritable k1, Text v1,Context context)throws IOException, InterruptedException { try { String[] splited = v1.toString().split("\t"); String reportTime = splited[0].trim(); String msisdn = splited[1].trim(); Date date = new Date(Long.parseLong(reportTime)); String time = DateConvert.dateParse(date); String rowkey = msisdn+":"+time;//获取到行健 context.write(new Text(rowkey),new Text(v1.toString())); }catch(Exception e) { Counter counter = context.getCounter("NumberExceptionNum", "num"); counter.increment(1L); } }

    于是我又开始运行程序,但是当我运行完之后,从MapReduce的计数器当中,我发现第一条数据文本并没有导入:因为数字格式异常的这个原因估计在运行过程中被终止了。下面是计数器的显示:
    这里写图片描述
    于是我又想到了一个解决方案,将第一条数据多复制一条即可,然后重写将数据上传到HDFS中。
    此时在一次 运行程序,显示正确,此时数据也全部导入到Hbase数据库中。
    这里写图片描述
    Hbase中数据查看核实:
    这里写图片描述
    将HDFS中的数据通过MapReduce导入到Hbase数据库时,总结如下:
    核心步骤:先将数据文件上传到HDFS,然后用MapReduce进行处理,将处理后的数据插入到 hbase中
    注意事项:
    1>子类重写方法抛出异常的范围不能大于父类被重写方法抛出异常的范围
    2>用trim()这个方法可以去除字符串前后的空格,换行符。
    3>既然第一条数据总是显示数字格式异常,将第一条数据复制为2份即可。



本文转自 SimplePoint 51CTO博客,原文链接:http://blog.51cto.com/2226894115/1896517,如需转载请自行联系原作者
相关实践学习
云数据库HBase版使用教程
  相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情: https://cn.aliyun.com/product/hbase   ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
1月前
|
存储 分布式计算 分布式数据库
对给定的数据利用MapReduce编程实现数据的清洗和预处理,编程实现数据存储到HBase数据库,实现数据的增删改查操作接口
对给定的数据利用MapReduce编程实现数据的清洗和预处理,编程实现数据存储到HBase数据库,实现数据的增删改查操作接口
32 0
|
7月前
|
分布式计算 分布式数据库 Hbase
99 MapReduce操作Hbase
99 MapReduce操作Hbase
49 0
|
10月前
|
资源调度 Java Linux
Hbase实践将所有info列簇下的name列导入到另一张表中
Hbase实践将所有info列簇下的name列导入到另一张表中
|
分布式计算 分布式数据库 Hbase
当HBase遇上MapReduce头歌答案
当HBase遇上MapReduce头歌答案
548 0
|
存储 缓存 分布式计算
HBase MapReduce_4 | 学习笔记
快速学习 HBase MapReduce_4
82 0
|
存储 分布式计算 Hadoop
HBase MapReduce_3 | 学习笔记
快速学习 HBase MapReduce_3
101 0
HBase MapReduce_3 | 学习笔记
|
Java 分布式数据库 开发者
HBase MapReduce_2 | 学习笔记
快速学习 HBase MapReduce_2
93 0
HBase MapReduce_2 | 学习笔记
|
分布式计算 分布式数据库 开发者
HBase MapReduce_1| 学习笔记
快速学习 HBase MapReduce_1
115 0
|
存储 分布式计算 分布式数据库
【HBase】(九)MapReduce 操作 Hbase
【HBase】(九)MapReduce 操作 Hbase
380 0
|
分布式计算 Hadoop 分布式数据库
通过Datax将CSV文件导入Hbase,导入之前的CSV文件大小和导入之后的Hadoop分布式文件大小对比引入的思考
由于项目需要做系统之间的离线数据同步,因为实时性要求不高,因此考虑采用了阿里的datax来进行同步。在同步之前,将数据导出未csv文件,因为需要估算将来的hbase运行的hadoop的分布式文件系统需要占用多少磁盘空间,因此想到了需要做几组测试。
2195 0