MapReduce的方式进行HBase向HDFS导入和导出

简介:

附录代码:

HBase---->HDFS

复制代码
 1 import java.io.IOException;
 2 
 3 import org.apache.hadoop.conf.Configuration;
 4 import org.apache.hadoop.fs.Path;
 5 import org.apache.hadoop.hbase.HBaseConfiguration;
 6 import org.apache.hadoop.hbase.client.Result;
 7 import org.apache.hadoop.hbase.client.Scan;
 8 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 9 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
10 import org.apache.hadoop.hbase.mapreduce.TableMapper;
11 import org.apache.hadoop.io.Text;
12 import org.apache.hadoop.mapreduce.Job;
13 import org.apache.hadoop.mapreduce.Mapper;
14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
15 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
16 
17 public class HBase2HDFS {
18 
19     public static void main(String[] args) throws Exception {
20         Configuration conf = HBaseConfiguration.create();
21         Job job = Job.getInstance(conf, HBase2HDFS.class.getSimpleName());
22         job.setJarByClass(HBase2HDFS.class);
23         //MR有输入和输出,输入一般是FileInputFormat等...但是在HBase中需要用到一个特殊的工具类是TableMapReduceUtil
24         TableMapReduceUtil.initTableMapperJob(args[0], new Scan(), HBase2HDFSMapper.class,
25                                             Text.class, Text.class, job);
26         //HBase中的具体操作打到MR的job中.
27         TableMapReduceUtil.addDependencyJars(job);
28         job.setMapperClass(HBase2HDFSMapper.class);
29         job.setMapOutputKeyClass(Text.class);
30         job.setMapOutputValueClass(Text.class);
31         job.setOutputFormatClass(TextOutputFormat.class);
32         FileOutputFormat.setOutputPath(job, new Path(args[1]));
33         //FileOutputFormat.setOutputPath(job, new Path("/t1-out"));
34         job.setNumReduceTasks(0);
35         job.waitForCompletion(true);
36         
37         
38     }
39     static class HBase2HDFSMapper extends TableMapper<Text, Text>{
40         private Text rowKeyText = new Text();
41         private Text value = new Text();
42         
43         //这个TableMapper中的两个泛型是Map阶段的输出..HBase中的数据要想进入HBase,几乎都用引号引起来.
44         //TableMapper是Mapper类的一个子类.这个类用来定义前面的两个泛型参数.
45         @Override
46         protected void map(
47                 ImmutableBytesWritable key,
48                 Result result,
49                 Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context)
50                 throws IOException, InterruptedException {
51             //结果都在result对象,用raw方法从result对象中找到数据. 这个raw()方法已经过时了.
52             /*
53             KeyValue[] raw = result.raw();
54             for (KeyValue keyValue : raw) {
55                 keyValue.getValue();
56             }
57             */
58             /*
59              * 想输出的数据格式如下: 1 zhangsan 13  (行键,name,age)
60              *                     2 lisi 14
61              */
62             
63             //要想精确的获得某一列的值,要根据行键,列族,列的时间戳.
64             //getColumnLatestCell 是获得最新的时间戳的值 相当于时间戳已经定义好了.
65             byte[] nameBytes = result.getColumnLatestCell("cf".getBytes(), "name".getBytes()).getValue();
66             byte[] ageBytes = result.getColumnLatestCell("cf".getBytes(), "age".getBytes()).getValue();
67             
68             rowKeyText.set(key.get());
69             value.set(new String(nameBytes) + "\t" + new String(ageBytes));
70             context.write(new Text(key.get()), value);
71             //这里已经把数据搞成了 1 name age 的形式....就不需要写Reduce
72         }
73     }
74 }
复制代码

 

HDFS---->HBase 通过MR导入到HBase

复制代码
 1 import java.io.IOException;
 2 
 3 import org.apache.hadoop.conf.Configuration;
 4 import org.apache.hadoop.hbase.HBaseConfiguration;
 5 import org.apache.hadoop.hbase.client.Mutation;
 6 import org.apache.hadoop.hbase.client.Put;
 7 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 8 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
 9 import org.apache.hadoop.hbase.mapreduce.TableReducer;
10 import org.apache.hadoop.io.LongWritable;
11 import org.apache.hadoop.io.NullWritable;
12 import org.apache.hadoop.io.Text;
13 import org.apache.hadoop.mapreduce.Job;
14 import org.apache.hadoop.mapreduce.Mapper;
15 import org.apache.hadoop.mapreduce.Reducer;
16 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
17 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
18 
19 public class HDFS2HBaseImport {
20 
21     public static void main(String[] args) throws Exception {
22         Configuration conf = HBaseConfiguration.create();
23         conf.set(TableOutputFormat.OUTPUT_TABLE, args[0]);
24         
25         Job job = Job.getInstance(conf, HDFS2HBaseImport.class.getSimpleName());
26         job.setJarByClass(HDFS2HBaseImport.class);
27         
28         //数据到底放到哪一张表中,还是要用到TableMapReduceUtil类.
29         TableMapReduceUtil.addDependencyJars(job);
30         job.setMapperClass(HDFS2HBaseMapper.class);
31         job.setMapOutputKeyClass(Text.class);
32         job.setMapOutputValueClass(Text.class);
33         job.setOutputFormatClass(TextOutputFormat.class);
34         job.setReducerClass(HDFS2HBaseReducer.class);
35         job.setOutputFormatClass(TableOutputFormat.class);
36         FileInputFormat.setInputPaths(job, args[1]);
37         job.waitForCompletion(true);        
38     }
39     
40     static class HDFS2HBaseMapper extends Mapper<LongWritable, Text, Text, Text>{
41         private Text rowKeyText = new Text();
42         private Text value = new Text();
43         
44         @Override
45         protected void map(LongWritable key, Text text,
46                 Mapper<LongWritable, Text, Text, Text>.Context context)
47                 throws IOException, InterruptedException {
48             String[] splits = text.toString().split("\t");
49             rowKeyText.set(splits[0]);
50             value.set(splits[1] + "\t" + splits[2]);//name\tage
51             context.write(rowKeyText, value);
52         }
53     }
54     //Reduce继承的是和在导出的时候Map extends TableMapper 对应的  因为导入的是HBase中,所以后面的参数用NullWritable代替
55     static class HDFS2HBaseReducer extends TableReducer<Text, Text, NullWritable> {
56         @Override
57         protected void reduce(Text k2, Iterable<Text> v2s,
58                 Reducer<Text, Text, NullWritable, Mutation>.Context context)
59                 throws IOException, InterruptedException {
60             //向HBase中插入数据一定要用到Put对象.
61             Put put = new Put(k2.getBytes());
62             
63             for (Text text : v2s) {
64                 String[] splits = text.toString().split("\t");
65                 //加载列和对应的值
66                 put.add("cf".getBytes(), "name".getBytes(), splits[0].getBytes());
67                 put.add("cf".getBytes(), "age".getBytes(), splits[1].getBytes());
68                 context.write(NullWritable.get(), put);//一个参数是key,一个是对应的value.
69                 //导入HBase不需要key...直接用NullWritable对象和封装好数据的put对象.
70             }
71         }
72     }
73 }
复制代码

 


本文转自SummerChill博客园博客,原文链接:http://www.cnblogs.com/DreamDrive/p/5583135.html,如需转载请自行联系原作者

相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
14天前
|
分布式计算 资源调度 Hadoop
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
61 3
|
14天前
|
分布式计算 资源调度 Hadoop
Hadoop-05-Hadoop集群 集群WordCount 超详细 真正的分布式计算 上传HDFS MapReduce计算 YRAN查看任务 上传计算下载查看
Hadoop-05-Hadoop集群 集群WordCount 超详细 真正的分布式计算 上传HDFS MapReduce计算 YRAN查看任务 上传计算下载查看
34 1
|
14天前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
54 0
|
14天前
|
SQL 分布式计算 关系型数据库
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
27 0
|
14天前
|
SQL 分布式计算 关系型数据库
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
36 0
|
2月前
|
分布式计算 大数据 分布式数据库
"揭秘HBase MapReduce高效数据处理秘诀:四步实战攻略,让你轻松玩转大数据分析!"
【8月更文挑战第17天】大数据时代,HBase以高性能、可扩展性成为关键的数据存储解决方案。结合MapReduce分布式计算框架,能高效处理HBase中的大规模数据。本文通过实例展示如何配置HBase集群、编写Map和Reduce函数,以及运行MapReduce作业来计算HBase某列的平均值。此过程不仅限于简单的统计分析,还可扩展至更复杂的数据处理任务,为企业提供强有力的大数据技术支持。
49 1
|
3月前
|
Shell 分布式数据库 Hbase
使用 HBase Shell 进行数据的批量导入和导出
使用 HBase Shell 进行数据的批量导入和导出
499 6
|
2月前
|
存储 分布式计算 分布式数据库
《HBase MapReduce之旅:我的学习笔记与心得》——跟随我的步伐,一同探索HBase世界,揭开MapReduce的神秘面纱,分享那些挑战与收获,让你在数据的海洋里畅游无阻!
【8月更文挑战第17天】HBase是Apache顶级项目,作为Bigtable的开源版,它是一个非关系型、分布式数据库,具备高可扩展性和性能。结合HDFS存储和MapReduce计算框架,以及Zookeeper协同服务,HBase支持海量数据高效管理。MapReduce通过将任务拆解并在集群上并行执行,极大提升处理速度。学习HBase MapReduce涉及理解其数据模型、编程模型及应用实践,虽然充满挑战,但收获颇丰,对职业发展大有裨益。
37 0
|
13天前
|
分布式计算 Kubernetes Hadoop
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
78 6
|
14天前
|
SQL 分布式计算 监控
Hadoop-20 Flume 采集数据双写至本地+HDFS中 监控目录变化 3个Agent MemoryChannel Source对比
Hadoop-20 Flume 采集数据双写至本地+HDFS中 监控目录变化 3个Agent MemoryChannel Source对比
40 3