HBase与MR的集成

简介: 笔记
   MR   <--- 读取  HBase的数据
   MR   ---> 写入  HBase(使用场景:数据迁移)
   HBase ->  MR  -> HBase 读取和写入(使用场景:索引表的建立)

1.MR与HBase集成所需要的jar包

bin/hbase mapredcp

2.运行环境

export HADOOP_HOME=/opt/modules/hadoop
export HBASE_HOME=/opt/modules/hbase
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-server-1.2.0-cdh5.9.3.jar

3.编写MR程序

执行流程:将stu写入到stuMR表中

stu ->  mr  -> stuMR
package com.kfk.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/11/10
 * @time : 7:07 下午
 *
 * 需求分析,从数据表stu读取info到新表stuMR
 */
public class HBaseMR extends Configured implements Tool {
    /**
     * map
     * 一.Mapper class extends TableMapper<KEYOUT输出的Key的类型, VALUEOUT输出的Value的类型>
     * 原版的Mapper程序是有输入的KV类型,和输出的KV类型四个参数,源码:extends Mapper<ImmutableBytesWritable,
     * Result, KEYOUT, VALUEOUT>
     * Put类型为hbase中定义的类型,便于作为Reducer的输入类型,根据reducer输入类型可知
     */
    public static class MyMapper extends TableMapper<Text, Put> {
        private  Text mapOutputKey = new Text();
        public void map(ImmutableBytesWritable rowkey, Result value, Context context) throws InterruptedException, IOException {
            // set outputRowKey
            mapOutputKey.set(Bytes.toString(rowkey.get()));
            // 通过rowKey创建put对象
            Put put = new Put(rowkey.get());
            // 迭代以获取cell数据
            for (Cell cell:value.rawCells()){
                // 实际业务中不一定会是全表输出,可以选择性输出
                if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))){
                    if ("username".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){
                        put.add(cell);
                    }
                    else if ("age".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){
                        put.add(cell);
                    }
                }
            }
            context.write(mapOutputKey,put);
        }
    }
    /**
     * reduce
     * 二.Reducer calss extends TableReducer<KEYIN, VALUEIN, KEYOUT>
     * 输出key 类型为ImmutableBytesWritable 实现writeableComparable的字节数组
     * 输出 value 类型为 Mutation 是 delete put increment append 的父类
     */
    public static class MyReducer extends TableReducer<Text, Put, ImmutableBytesWritable> {
        public void reduce(Text key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
            // 从得到的put中得到数据
            for (Put put:values){
                // 往外写数据
                context.write(null,put);
            }
        }
    }
    /**
     * run
     * @param args
     * @return
     * @throws IOException
     * @throws ClassNotFoundException
     * @throws InterruptedException
     */
    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1) get conf
        Configuration configuration = this.getConf();
        // 2) create job
        Job job = Job.getInstance(configuration,this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());
        Scan scan = new Scan();
        scan.setCaching(500);
        scan.setCacheBlocks(false);
        // set input and set mapper
        TableMapReduceUtil.initTableMapperJob(
                "stu",        // input table
                scan,               // Scan instance to control CF and attribute selection
                MyMapper.class,     // mapper class
                Text.class,         // mapper output key
                Put.class,  // mapper output value
                job);
        // set reducer and output
        TableMapReduceUtil.initTableReducerJob(
                "stuMR",        // output table
                MyReducer.class,    // reducer class
                job);
        // 4) commit,执行job
        boolean isSuccess = job.waitForCompletion(true);
        // 如果正常执行返回0,否则返回1
        return (isSuccess) ? 0 : 1;
    }
    public static void main(String[] args) {
        Configuration configuration = HBaseConfiguration.create();
        try {
            // 调用run方法
            int status = ToolRunner.run(configuration,new HBaseMR(),args);
            // 退出程序
            System.exit(status);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

在本地运行的时候需要将core-site.xml文件的HDFS部分屏蔽掉

<configuration>
<!--      <property>-->
<!--        <name>fs.defaultFS</name>-->
<!--        <value>hdfs://bigdata-pro-m01:9000</value>-->
<!--        </property>-->
        <property>
        <name>hadoop.http.staticuser.user</name>
        <value>caizhengjie</value>
        </property>
</configuration>

原数据:

hbase(main):003:0> scan 'stu'
ROW                                                             COLUMN+CELL                                                                                                                                                                             
 001                                                            column=info:addres, timestamp=1604928754019, value=guangzhou                                                                                                                            
 001                                                            column=info:age, timestamp=1604928702206, value=20                                                                                                                                      
 001                                                            column=info:username, timestamp=1604928590845, value=alex                                                                                                                               
 002                                                            column=info:age, timestamp=1604955412618, value=30                                                                                                                                      
 002                                                            column=info:username, timestamp=1604955441414, value=ben                                                                                                                                
 003                                                            column=info:addres, timestamp=1604997979803, value=beijing                                                                                                                              
 003                                                            column=info:age, timestamp=1604997979803, value=32                                                                                                                                      
 003                                                            column=info:username, timestamp=1604997979803, value=jack                                                                                                                               
3 row(s) in 0.0360 seconds

运行结果:

hbase(main):002:0> scan 'stuMR'
ROW                                                             COLUMN+CELL                                                                                                                                                                             
 001                                                            column=info:age, timestamp=1604928702206, value=20                                                                                                                                      
 001                                                            column=info:username, timestamp=1604928590845, value=alex                                                                                                                               
 002                                                            column=info:age, timestamp=1604955412618, value=30                                                                                                                                      
 002                                                            column=info:username, timestamp=1604955441414, value=ben                                                                                                                                
 003                                                            column=info:age, timestamp=1604997979803, value=32                                                                                                                                      
 003                                                            column=info:username, timestamp=1604997979803, value=jack                                                                                                                               
3 row(s) in 0.1780 seconds

4.打包发布运行命令:

export HADOOP_HOME=/opt/modules/hadoop
export HBASE_HOME=/opt/modules/hbase
HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
${HADOOP_HOME}/bin/hadoop jar /opt/jars/hbasemr.jar

由于我本地maven加载的是hbase-1.2.0版本,集群用的是hbase-1.2.0-cdh5.9.3版本,所以在运行jar包的时候会出现版本冲突:

Exception in thread "main" java.lang.NoSuchMethodError: org.apache.hadoop.hbase.client.Scan.setCaching(I)Lorg/apache/hadoop/hbase/client/Scan;
        at com.kfk.hbase.HBaseMR.run(HBaseMR.java:105)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
        at com.kfk.hbase.HBaseMR.main(HBaseMR.java:136)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:136)

我的解决方案是将hbase-client-1.2.0.jar移到hbase/lib目录下,将hbase-client-1.2.0-cdh5.9.3.jar改名,或者maven直接加载hbase的cdh版本,和集群的一致。


相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
7月前
|
分布式计算 分布式数据库 API
Spark与HBase的集成与数据访问
Spark与HBase的集成与数据访问
|
7月前
|
缓存 分布式计算 NoSQL
分布式NoSQL列存储数据库Hbase_MR集成Hbase:读写Hbase规则(九)
分布式NoSQL列存储数据库Hbase_MR集成Hbase:读写Hbase规则(九)
66 0
|
XML 缓存 分布式计算
集成 Oozie 服务&amp;集成 Hbase 服务 | 学习笔记
快速学习 集成 Oozie 服务&amp;集成 Hbase 服务
106 0
集成 Oozie 服务&amp;集成 Hbase 服务 | 学习笔记
|
SQL 分布式计算 Java
Hbase入门(五)——客户端(Java,Shell,Thrift,Rest,MR,WebUI)
Hbase的客户端有原生java客户端,Hbase Shell,Thrift,Rest,Mapreduce,WebUI等等。 下面是这几种客户端的常见用法。
1624 0
Hbase入门(五)——客户端(Java,Shell,Thrift,Rest,MR,WebUI)
|
SQL 分布式计算 关系型数据库
|
SQL Java 分布式数据库
Hive与HBase的集成
Hive提供了与HBase的集成,使得能够在HBase表上使用HQL语句进行查询 插入操作以及进行Join和Union等复杂查询、同时也可以将hive表中的数据映射到Hbase中。
|
2月前
|
Java Maven Docker
gitlab-ci 集成 k3s 部署spring boot 应用
gitlab-ci 集成 k3s 部署spring boot 应用
|
26天前
|
消息中间件 监控 Java
您是否已集成 Spring Boot 与 ActiveMQ?
您是否已集成 Spring Boot 与 ActiveMQ?
49 0