HBase结合MapReduce批量导入

简介:
  • Hbase是Hadoop生态体系配置的数据库,我们可以通过HTable api中的put方法向Hbase数据库中插入数据,但是由于put效率太低,不能批量插入大量的数据,文本将详细介绍如何通过MapReduce运算框架向Hbase数据库中导入数据。
    开篇先介绍业务场景:将电信手机上网日志中的数据导入到Hbase数据库中,将部分数据以及相应字段描述列出:
    这里写图片描述
    图片格式描述:
    这里写图片描述
    先介绍一个日期格式的转换:

    ?
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    public  class  TestDate
    {
          public  static  void  main(String[] args)
          {
                Date d =  new  Date();
                SimpleDateFormat df =  new  SimpleDateFormat( "yyyy-MM-dd HH:mm:ss" );  
                String time = df.format(d);
                System.out.println(time);
          }
    }
    /*2016-05-14 13:32:24*/
    ?
    1
    2
    3
    4
    5
    6
    7
    8
    9
      在Java当中,我们经常利用SimpledateFormat这个类将给定的日期转化成指定的格式。
      接下来在归纳一下Hbase结合MapReduce批量导入数据的时候,在代码当中应该注意的事项:
      ①MyReducer类继承的是TableReduce类,而不在是MapReduce中常用的Reducer类
      ②的数值类型没有什么用,通常将k3的数值类型设置为NullWritable即可
      ③只设置map函数的输出类型,不在设置reduce函数的输出类型,因为②的原因
      ④指定对输出文件格式化处理的类改为TableOutputFormat,而不在是TextOutputFormat
      ⑤输出文件的路径改为指定的表名,在Configuration中进行设定,而不在是path的方式
      ⑥如果想过jar包的方式运行程序,貌似还需要加入什么jar包,我没有整出来。
      接下来将贴出我在编程的时候第一次写出的业务代码:当然遇到了很多的问题。
    ?
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    package  IT01;
    import  java.io.IOException;
    import  java.text.SimpleDateFormat;
    import  java.util.Date;
    import  org.apache.hadoop.conf.Configuration;
    import  org.apache.hadoop.fs.Path;
    import  org.apache.hadoop.hbase.client.Put;
    import  org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
    import  org.apache.hadoop.hbase.mapreduce.TableReducer;
    import  org.apache.hadoop.io.LongWritable;
    import  org.apache.hadoop.io.NullWritable;
    import  org.apache.hadoop.io.Text;
    import  org.apache.hadoop.mapreduce.Job;
    import  org.apache.hadoop.mapreduce.Mapper;
    import  org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import  org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import  org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
    public  class  HbaseApp
    {
          public  static  String path1 =  "hdfs://hadoop80:9000/FlowData.txt" ;
          public  static  void  main(String[] args)  throws  Exception
          {
                Configuration conf =  new  Configuration();
                conf.set( "hbaser.rootdir" , "hdfs://hadoop80:9000/hbase" );
                conf.set( "hbase.zookeeper.quorum" , "hadoop80" );
                conf.set(TableOutputFormat.OUTPUT_TABLE, "wlan_log" ); //在这里需要指定表的名字:相当于输出文件的路径
                conf.set( "dfs.socket.timeout" , "2000" );
     
                Job job =  new  Job(conf, "HbaseApp" );
                FileInputFormat.setInputPaths(job,  new  Path(path1));
                job.setInputFormatClass(TextInputFormat. class );
                job.setMapperClass(MyMapper. class );
                job.setMapOutputKeyClass(Text. class );
                job.setMapOutputValueClass(Text. class );
     
                job.setNumReduceTasks( 1 );
                job.setPartitionerClass(HashPartitioner. class );
     
                job.setReducerClass(MyReducer. class );
    //         job.setOutputKeyClass(Text.class);
    //         job.setOutputValueClass(NullWritable.class);
                job.setOutputFormatClass(TableOutputFormat. class );
    //         FileOutputFormat.setOutputPath(job, new Path(path2));
                job.waitForCompletion( true );
          }
          public  static  class  MyMapper  extends  Mapper{
                 protected  void  map(LongWritable k1, Text v1,Context context) throws  IOException, InterruptedException
                 {
                       String[] splited = v1.toString().split( "\t" );
                       String reportTime = splited[ 0 ];
                       String msisdn = splited[ 1 ];
                       Date date =  new  Date(Long.parseLong(reportTime));
                       String time = DateConvert.dateParse(date);
                       String rowkey = msisdn+ ":" +time; //获取到行健
                       context.write( new  Text(rowkey), new  Text(v1.toString()));      
                 }
          }
          public  static  class  MyReducer  extends  TableReducer{
                 protected  void  reduce(Text k2, Iterablev2s,Context context) throws  IOException, InterruptedException
                 {
                       for  (Text v2 : v2s)
                      {
                          String[] splited = v2.toString().split( "\t" );
                          /**添加记录的时候需要指定行健、列族、列名、数值***/
                          Put put =  new  Put(k2.toString().getBytes());
                          put.add( "cf" .getBytes(), "reportTime" .getBytes(), splited[ 0 ].getBytes());
                          put.add( "cf" .getBytes(), "msisdn" .getBytes(), splited[ 1 ].getBytes());
                          put.add( "cf" .getBytes(), "apmac1" .getBytes(), splited[ 2 ].getBytes());
                          put.add( "cf" .getBytes(), "apmac2" .getBytes(), splited[ 3 ].getBytes());
                          put.add( "cf" .getBytes(), "host" .getBytes(), splited[ 4 ].getBytes());
                          put.add( "cf" .getBytes(), "sitetype" .getBytes(), splited[ 5 ].getBytes());
                          put.add( "cf" .getBytes(), "upPackNum" .getBytes(), splited[ 6 ].getBytes());
                          put.add( "cf" .getBytes(), "downPackNum" .getBytes(), splited[ 7 ].getBytes());
                          put.add( "cf" .getBytes(), "upPayLoad" .getBytes(), splited[ 8 ].getBytes());
                          put.add( "cf" .getBytes(), "downPayLoad" .getBytes(), splited[ 9 ].getBytes());
                          put.add( "cf" .getBytes(), "httpstatus" .getBytes(), splited[ 10 ].getBytes());
                          context.write(NullWritable.get(),put);
                      }       
                 }
          }
    }
    class  DateConvert
    {
         public  static  String dateParse(Date  date)
         {
              SimpleDateFormat df =  new  SimpleDateFormat( "yyyyMMddhhmmss" ); //构造一个日期解析器
              return  df.format(date); 
         }
    }

    程序运行完之后:显示如下异常NumberFormatException
    这里写图片描述
    显示的是数字格式异常, 于是我在map函数当中又加了一个throws NumberFormatException

    ?
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
                 protected  void  map(LongWritable k1, Text v1,Context context) throws  IOException, InterruptedException,NumberFormatException
                 {
                       String[] splited = v1.toString().split( "\t" );
                       String reportTime = splited[ 0 ];
                       String msisdn = splited[ 1 ];
                       Date date =  new  Date(Long.parseLong(reportTime));
                       String time = DateConvert.dateParse(date);
                       String rowkey = msisdn+ ":" +time; //获取到行健
                       context.write( new  Text(rowkey), new  Text(v1.toString()));      
                 }

    但是这样我发现也不对,因为当我追踪Mapp这个类的源代码时,我发现父类的map方法并没有抛出NumberFormatException这个异常,根据子类重写方法抛出异常的范围不能大于父类被重写方法抛出异常的范围,我又将上面这段代码用try——catch这种异常处理方式进行处理:

    ?
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    protected  void  map(LongWritable k1, Text v1,Context context) throws  IOException, InterruptedException
                 {
                       try
                       {
                           String[] splited = v1.toString().split( "\t" );
                           String reportTime = splited[ 0 ];
                           String msisdn = splited[ 1 ];
                           Date date =  new  Date(Long.parseLong(reportTime));
                           String time = DateConvert.dateParse(date);
                           String rowkey = msisdn+ ":" +time; //获取到行健
                           context.write( new  Text(rowkey), new  Text(v1.toString()));      
                       } catch (Exception e)
                       {
                           Counter counter = context.getCounter( "NumberExceptionNum" "num" );
                           counter.increment(1L);
                       }
                 }

    当我将代码改成这样的时候,此时程序并没有显示抛出NumberFormatException这个异常,说明异常得到了处理,但是当我去查看Hbase数据的时候,发现HDFS中的日志数据并没有导入到Hbase数据库中,于是我又查看了一下MapReduce的运行日志:
    这里写图片描述
    也就是我的22行数据在map函数中当中并没有输出,这个问题就匪夷所思了,为什么22行数据都会抛出数字格式异常呢,而且都没有输出,于是我想到可能是SimpleDateFZ喎"/kf/ware/vc/" target="_blank" class="keylink">vcm1hdNXiuPbA4LXEzsrM4qOs09rKx87S09a/qsq8uPfW1rDZtsijrLeiz9bN+MnPuty24M7E1cK2vMrHxfrF0NXiuPbA4LXEo6zX7tbV1tXT2tXStb3By87KzOK1xL3ivva3vbC4o6zTw3RyaW0oKdXiuPa3vbeoyKWz/dfWt/u0rsewuvO1xL/VuPG8tL/JoaM8L3A+DQo8cHJlIGNsYXNzPQ=="brush:java;">protected void map(LongWritable k1, Text v1,Context context)throws IOException, InterruptedException { try { String[] splited = v1.toString().split("\t"); String reportTime = splited[0].trim(); String msisdn = splited[1].trim(); Date date = new Date(Long.parseLong(reportTime)); String time = DateConvert.dateParse(date); String rowkey = msisdn+":"+time;//获取到行健 context.write(new Text(rowkey),new Text(v1.toString())); }catch(Exception e) { Counter counter = context.getCounter("NumberExceptionNum", "num"); counter.increment(1L); } }

    于是我又开始运行程序,但是当我运行完之后,从MapReduce的计数器当中,我发现第一条数据文本并没有导入:因为数字格式异常的这个原因估计在运行过程中被终止了。下面是计数器的显示:
    这里写图片描述
    于是我又想到了一个解决方案,将第一条数据多复制一条即可,然后重写将数据上传到HDFS中。
    此时在一次 运行程序,显示正确,此时数据也全部导入到Hbase数据库中。
    这里写图片描述
    Hbase中数据查看核实:
    这里写图片描述
    将HDFS中的数据通过MapReduce导入到Hbase数据库时,总结如下:
    核心步骤:先将数据文件上传到HDFS,然后用MapReduce进行处理,将处理后的数据插入到 hbase中
    注意事项:
    1>子类重写方法抛出异常的范围不能大于父类被重写方法抛出异常的范围
    2>用trim()这个方法可以去除字符串前后的空格,换行符。
    3>既然第一条数据总是显示数字格式异常,将第一条数据复制为2份即可。



本文转自 SimplePoint 51CTO博客,原文链接:http://blog.51cto.com/2226894115/1896517,如需转载请自行联系原作者
相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
  相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情: https://cn.aliyun.com/product/hbase   ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
14天前
|
分布式计算 大数据 分布式数据库
"揭秘HBase MapReduce高效数据处理秘诀:四步实战攻略,让你轻松玩转大数据分析!"
【8月更文挑战第17天】大数据时代,HBase以高性能、可扩展性成为关键的数据存储解决方案。结合MapReduce分布式计算框架,能高效处理HBase中的大规模数据。本文通过实例展示如何配置HBase集群、编写Map和Reduce函数,以及运行MapReduce作业来计算HBase某列的平均值。此过程不仅限于简单的统计分析,还可扩展至更复杂的数据处理任务,为企业提供强有力的大数据技术支持。
26 1
|
14天前
|
存储 分布式计算 分布式数据库
《HBase MapReduce之旅:我的学习笔记与心得》——跟随我的步伐,一同探索HBase世界,揭开MapReduce的神秘面纱,分享那些挑战与收获,让你在数据的海洋里畅游无阻!
【8月更文挑战第17天】HBase是Apache顶级项目,作为Bigtable的开源版,它是一个非关系型、分布式数据库,具备高可扩展性和性能。结合HDFS存储和MapReduce计算框架,以及Zookeeper协同服务,HBase支持海量数据高效管理。MapReduce通过将任务拆解并在集群上并行执行,极大提升处理速度。学习HBase MapReduce涉及理解其数据模型、编程模型及应用实践,虽然充满挑战,但收获颇丰,对职业发展大有裨益。
27 0
|
2月前
|
Shell 分布式数据库 Hbase
使用 HBase Shell 进行数据的批量导入和导出
使用 HBase Shell 进行数据的批量导入和导出
235 6
|
4月前
|
存储 分布式计算 分布式数据库
对给定的数据利用MapReduce编程实现数据的清洗和预处理,编程实现数据存储到HBase数据库,实现数据的增删改查操作接口
对给定的数据利用MapReduce编程实现数据的清洗和预处理,编程实现数据存储到HBase数据库,实现数据的增删改查操作接口
43 0
|
10月前
|
分布式计算 分布式数据库 Hbase
99 MapReduce操作Hbase
99 MapReduce操作Hbase
72 0
|
12月前
|
SQL 分布式计算 分布式数据库
大数据Sqoop借助Hive将Mysql数据导入至Hbase
大数据Sqoop借助Hive将Mysql数据导入至Hbase
207 0
|
资源调度 Java Linux
Hbase实践将所有info列簇下的name列导入到另一张表中
Hbase实践将所有info列簇下的name列导入到另一张表中
|
分布式计算 分布式数据库 Hbase
当HBase遇上MapReduce头歌答案
当HBase遇上MapReduce头歌答案
589 0
|
4月前
|
Java Shell 分布式数据库
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
128 0
|
3月前
|
存储 分布式计算 Hadoop
Hadoop节点文件存储HBase设计目的
【6月更文挑战第2天】
45 6
下一篇
云函数