分布式NoSQL列存储数据库Hbase_MR集成Hbase:读写Hbase规则(九)

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 分布式NoSQL列存储数据库Hbase_MR集成Hbase:读写Hbase规则(九)

分布式NoSQL列存储数据库Hbase(九)

知识点01:课程回顾

  1. 简述Hbase中hbase:meta表的功能及存储内容
  • 功能:记录表的元数据信息
  • 内容
  • rowkey:Hbase中每张表的每个Region的名称
  • Region名称
  • Region范围:startKey,stopKey
  • Region所在的RegionServer地址
  1. 简述Hbase中数据写入流程
  • step1:客户端连接ZK,获取meta表所在的地址,读取meta表数据
  • step2:根据表名,获取当前要操作的表的所有region的信息
region名称前缀:表名,startKey
  • step3:根据Rowkey,判断具体操作哪个Region
  • step4:获取对应Region的地址,请求对应的RegionServer
  • step5:RegionServer接受请求,将数据写入Region,先写入WAL
  • step6:根据列族来判断写入哪个Store中
  • 写入Store的memstore中
  1. 简述Hbase中数据读取流程
  • step1:客户端连接ZK,获取meta表所在的地址,读取meta表数据
  • step2:根据表名,获取当前要操作的表的所有region的信息
region名称前缀:表名,startKey
  • step3:根据Rowkey,判断具体操作哪个Region
  • step4:获取对应Region的地址,请求对应的RegionServer
  • step5::RegionServer接受请求,从Region中读取数据
  • 先读memstore
  • 判断查询数据是否做了缓存,如果做了缓存:就读BlockCache
  • 最后读StoreFile
  • 如果开启了缓存,查询结果会放入BlockCache
  1. 简述LSM模型的流程设计
  • step1:不论什么数据操作:增删改,都只对内存进行操作
  • 删除和修改都是写入操作来代替的
  • 内存写入成功,就返回
  • 顺序读写内存
  • 顺序读写磁盘
  • 随机读写内存:memStore,BlockCache
  • 随机读写磁盘:StoreFile
  • step2:数据写入内存,达到一定阈值,会将内存的数据写入磁盘
  • step3:定期将所有小文件和并为大文件,加快检索的效率
  1. 简述Hbase中的Flush、Compaction、Split的功能
  • Flush:将memstore中的数据刷写到HDFS,变成StoreFile文件
  • 2.0之前
  • memstore:单个memstore达到128M,就会Flush
  • 所有的memstore总存储达到95%,就会触发整个RS的Flush
  • 2.0之后
  • 设置一个水位线:max(128 / 列族个数,16)
  • 高于水位线的memstore:就会flush
  • 低于水位线的memstore:不会flush
  • 所有都低于,都flush
  • Compaction:用于将storefile文件进行合并,并且删除过期数据【被标记为更新和删除的数据】
  • minor compact:轻量级合并,将最早的几个小的storefile文件进行合并,不会删除过期数据
  • major compact:重量级合并,将所有的storefile合并为一个storefile,会删除过期数据
  • 默认每7天执行一次
  • 2.0版本开始:in-memory-compact:在memstore中将数据提前进行合并
  • none:不开启
  • basic:只合并,不删除过期数据
  • eager:合并并且删除过期数据
  • adapter:合并,根据数据量来判断是否自动删除过期数据
  • Split:为了避免一个Region存储的数据量过大,导致负载过高,通过Split将一个region分为两个region,分摊负载
  • 0.94之前:判断region中存储的文件大小是否达到10GB
  • 2.0之前:根据Region个数,来计算划分的条件,达到4个以后,都是按照10GB来分
min(10GB,256 * region个数3次方)
  • 2.0之后:根据region的个数做了判断
  • region的个数为1个:256M来划分
  • region的个数超过1个:10GB来划分

知识点02:课程目标

  1. MapReduce读写Hbase
  • 重点:记住读写的规则
  • Spark中读写Hbase规则与MapReduce的规则是一模一样的
  • 应用:一般在工作中都是使用Spark来读写Hbase,如果是MapReduce可以使用Hive来实现
  1. BulkLoad的实现【了解】
  • 问题:大量的数据并发往Hbase中写入,会导致内存和磁盘的利用率非常高,会影响其他程序的性能
  • Hbase中提供两种写入数据的方式
  • Put:直接写入memstore
  • BulkLoad:先将数据转换为storefile文件,将storefile文件直接放入Hbase表的目录中
  • 实现方式
  • 自己开发代码
  • 使用Hbase中的工具类来实现
  1. 协处理的介绍【了解】
  • 什么是协处理器,分类
  • 怎么开发协处理器:自己开发协处理器,实现索引表与原表数据同步
  1. Hbase中的优化方案【重点:记住】
  • 对于Hbase做了哪些性能的优化?
  • 内存优化
  • 压缩优化
  • 参数优化
  • ……

知识点03:MR集成Hbase:读Hbase规则

  • 目标
  • 掌握MapReduce中读取Hbase的开发规则
  • 分析
  • 读取由InputFormat决定
  • TextInputFormat:读取文件中的内容,每一行返回一个KV
  • K:行的偏移量:LongWritable
  • V:行的内容值:Text
  • TableInputFormat:负责实现读取Hbase的数据,将每个Rowkey的数据转换为一个KV对象
  • K:Rowkey的字节对象:ImmutableBytesWritable
  • V:Rowkey的数据内容:Result
  • 实现
  • step1:调用工具类方法,初始化Input和Map
  • MapReduce中封装了工具类,实现读取Hbase数据
TableMapReduceUtil.initTableMapperJob
public static void initTableMapperJob(
      String table, 
      Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, 
      Job job
);
  • step2:构建Map类继承TableMapper类
/**
 * Extends the base <code>Mapper</code> class to add the required input key
 * and value classes.
 *
 * @param <KEYOUT>  The type of the key.
 * @param <VALUEOUT>  The type of the value.
 * @see org.apache.hadoop.mapreduce.Mapper
 */
@InterfaceAudience.Public
public abstract class TableMapper<KEYOUT, VALUEOUT>
extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT> {
}
  • 总结
  • MapReduce读取Hbase数据的API已经封装好了,只需要调用工具类实现即可

知识点04:MR集成Hbase:读Hbase实现

  • 目标
  • 实现从Hbase读取数据,将数据写入文件中
  • 分析
  • step1:使用TableInputFormat读取Hbase数据
  • step2:使用TextOutputFormat写入文件
  • 实现
package bigdata.itcast.cn.hbase.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
/**
 * @ClassName ReadHbaseTable
 * @Description TODO 通过MapReduce读取Hbase表中的数据
 * @Create By     Frank
 */
public class ReadHbaseTable extends Configured implements Tool {
    public int run(String[] args) throws Exception {
        //todo:1-创建
        Job job =  Job.getInstance(this.getConf(),"read");
        job.setJarByClass(ReadHbaseTable.class);
        //todo:2-配置
        //input&map
//        job.setInputFormatClass(TextInputFormat.class);
//        TextInputFormat.setInputPaths(job,new Path(""));
//        job.setMapperClass(null);
//        job.setMapOutputKeyClass(null);
//        job.setMapOutputValueClass(null);
        //input&map
        /**
         * public static void initTableMapperJob(
         *       String table,                              指定从哪张表读取
         *       Scan scan,                                 读取Hbase数据使用的Scan对象,自定义过滤器
         *       Class<? extends TableMapper> mapper,       Mapper类
         *       Class<?> outputKeyClass,                   Map输出的Key类型
         *       Class<?> outputValueClass,                 Map输出的Value类型
         *       Job job                                    当前的job
         *  )
         */
        //构建TableInputFormat用于读取Hbase的scan对象
        Scan scan = new Scan();//为了方便让你使用过滤器,提前过滤数据,再传递到MapReduce中,所以让你自定义一个scan对象
        //可以为scan设置过滤器,将过滤后的数据加载到MapReduce程序中
        TableMapReduceUtil.initTableMapperJob(
                "itcast:t1",
                scan,
                ReadHbaseMap.class,
                Text.class,
                Text.class,
                job
        );
        //reduce
        job.setNumReduceTasks(0);
        //output
        TextOutputFormat.setOutputPath(job,new Path("datas/output/hbase"));
        //todo:3-提交
        return job.waitForCompletion(true) ? 0:-1;
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        //指定Hbase服务端地址
        conf.set("hbase.zookeeper.quorum", "node1:2181,node2:2181,node3:2181");
        int status = ToolRunner.run(conf, new ReadHbaseTable(), args);
        System.exit(status);
    }
    /**
     * TableMapper<KEYOUT, VALUEOUT>
     * extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT>
     */
    public static class ReadHbaseMap extends TableMapper<Text, Text>{
        //rowkey
        Text outputKey = new Text();
        //每一列的数据
        Text outputValue = new Text();
        /**
         * 每个KV【一个Rowkey】调用一次map方法
         * @param key:rowkey
         * @param value:这个rowkey的数据
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            //给key进行赋值
            String rowkey = Bytes.toString(key.get());
            this.outputKey.set(rowkey);
            //给value赋值
            for(Cell cell : value.rawCells()){
                //得到每一列的数据
                String family = Bytes.toString(CellUtil.cloneFamily(cell));
                String column = Bytes.toString(CellUtil.cloneQualifier(cell));
                String val  = Bytes.toString(CellUtil.cloneValue(cell));
                long ts = cell.getTimestamp();
                this.outputValue.set(family+"\t"+column+"\t"+val+"\t"+ts);
                //输出每一列的数据
                context.write(this.outputKey,this.outputValue);
            }
        }
    }
}
  • 总结
  • 最终也是调用了Hbase Java API
  • 通过Scan来读取表的数据,返回到MapReduce程序汇总

知识点05:MR集成Hbase:写Hbase规则

  • 目标
  • 掌握MapReduce写入Hbase的开发规则
  • 分析
  • 输出由OutputFormat决定
  • TextOutputFormat:将KV输出写入文件中
  • TableOutputFormat:负责实现将上一步的KV数据写入Hbase表中
/**
 * Convert Map/Reduce output and write it to an HBase table. The KEY is ignored
 * while the output value <u>must</u> be either a {@link Put} or a
 * {@link Delete} instance.
 */
@InterfaceAudience.Public
public class TableOutputFormat<KEY> extends OutputFormat<KEY, Mutation>
  • 要求输出的Value类型必须为Mutation类型:Put / Delete
  • Key是什么类型,不重要,在写入过程中,Key会被丢弃
  • 实现
  • step1:调用工具类初始化Reduce和Output
  • MapReduce中封装了工具类,实现读取Hbase数据
TableMapReduceUtil.initTableReducerJob
/**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job to adjust.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(
      String table,
      Class<? extends TableReducer> reducer,  指定Reduce类,不用传递Key和Value类型,因为Key不重要,Value定死了
      Job job
  );
  • step2:构建Reduce类继承TableReducer
/**
 * Extends the basic <code>Reducer</code> class to add the required key and
 * value input/output classes. 
 *
 * @param <KEYIN>  The type of the input key.
 * @param <VALUEIN>  The type of the input value.
 * @param <KEYOUT>  The type of the output key.
 * @see org.apache.hadoop.mapreduce.Reducer
 */
@InterfaceAudience.Public
public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>
  extends Reducer<KEYIN, VALUEIN, KEYOUT, Mutation> {
}
  • 总结
  • MapReduce写入Hbase数据的API已经封装好了,只需要调用工具类实现即可

知识点06:MR集成Hbase:写Hbase实现

  • 目标
  • 实现从文件读取数据,将数据写入Hbase中
  • 分析
  • step1:使用TextInputFormat读取文件中的数据
  • step2:构建Put对象,封装Rowkey以及列
  • step3:使用TableOutputFormat将数据写入Hbase表中
  • 实现
  • Hbase中建表
create 'itcast:mrwrite','info'
  • 实现
package bigdata.itcast.cn.hbase.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
/**
 * @ClassName WriteHbaseTable
 * @Description TODO 通过MapReduce将数据写入Hbase
 * @Create By     Frank
 */
public class WriteHbaseTable extends Configured implements Tool {
    public int run(String[] args) throws Exception {
        //todo:1-创建
        Job job =  Job.getInstance(this.getConf(),"write");
        job.setJarByClass(WriteHbaseTable.class);
        //todo:2-配置
        //input
        TextInputFormat.setInputPaths(job,new Path("datas/hbase/writeHbase.txt"));
        //map
        job.setMapperClass(WriteToHbaseMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Put.class);
        //shuffle
        //reduce&output
        /**
         *  public static void initTableReducerJob(
         *     String table,                                将数据写入Hbase的哪张表
         *     Class<? extends TableReducer> reducer,       reducer的类
         *     Job job)                                     当前的job
         *
         *     以前输出的写法:
         *      job.setoutputKey:因为Key可以任意的,这里根本用不到
         *      job.setoutputValue:在TableReduce中将outputValue定死了,所以不用写
         *
         */
        TableMapReduceUtil.initTableReducerJob(
            "itcast:mrwrite",
            WriteToHbaseReduce.class,
            job
        );
        //output & reduce
//        job.setReducerClass(null);
//        job.setOutputKeyClass(null);
//        job.setOutputValueClass(null);
//        job.setOutputFormatClass(TextOutputFormat.class);
//        TextOutputFormat.setOutputPath(job,new Path(""));
        //todo:3-提交
        return job.waitForCompletion(true) ? 0:-1;
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node1:2181,node2:2181,node3:2181");
        int status = ToolRunner.run(conf, new WriteHbaseTable(), args);
        System.exit(status);
    }
    /**
     * 读取文件,将文件中的内容,id作为key,其他的每一列作为一个Put对象
     */
    public static class WriteToHbaseMap extends Mapper<LongWritable,Text,Text, Put>{
        Text rowkey = new Text();
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //value:1 liudehua  18  male
            String[] split = value.toString().split("\t");
            String row = split[0];
            String name = split[1];
            String age = split[2];
            String sex = split[3];
            //将id作为rowkey,放在key中输出
            this.rowkey.set(row);
            //构造输出的Value
            Put putname = new Put(Bytes.toBytes(row));
            putname.addColumn(Bytes.toBytes("info"),Bytes.toBytes("name"),Bytes.toBytes(name));
            context.write(rowkey,putname);
            Put putage = new Put(Bytes.toBytes(row));
            putage.addColumn(Bytes.toBytes("info"),Bytes.toBytes("age"),Bytes.toBytes(age));
            context.write(rowkey,putage);
            Put putsex = new Put(Bytes.toBytes(row));
            putsex.addColumn(Bytes.toBytes("info"),Bytes.toBytes("sex"),Bytes.toBytes(sex));
            context.write(rowkey,putsex);
        }
    }
    /**
     * public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>
     * extends Reducer<KEYIN, VALUEIN, KEYOUT, Mutation>
     *     最后Reduce输出的Value类型必须为Put类型,才能将数据写入Hbase
     */
    public static class WriteToHbaseReduce extends TableReducer<Text,Put,Text>{
        /**
         * 相同rowkey的所有Put都在一个迭代器中
         * @param key
         * @param values
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void reduce(Text key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
            //直接遍历每个put对象,输出即可
            for (Put value : values) {
                context.write(key,value);
            }
        }
    }
}
  • 总结
  • 最终还是调用了Hbase Java API来实现的
  • 通过构建Table对象,执行所有的Put对象实现将数据写入Hbase

知识点07:BulkLoad的介绍

  • 目标
  • 了解BulkLoad的功能及应用场景
  • 分析
  • 问题:有一批大数据量的数据,要写入Hbase中,如果按照传统的方案来写入Hbase,必须先写入内存,然后内存溢写到HDFS,导致Hbase的内存负载和HDFS的磁盘负载过高,影响业务
  • 解决
  • 写入Hbase方式
  • 方式一:构建Put对象,先写内存
  • 方式二:BulkLoad,直接将数据变成StoreFile文件,放入Hbase对应的HDFS目录中
  • 数据不经过内存,读取数据时可以直接读取到
  • 实现
  • step1:先将要写入的数据转换为HFILE文件
  • step2:将HFILE文件加载到Hbase的表中【直接将文件放入了Hbase表对应的HDFS目录中】
  • 总结
  • 应用场景:Hbase提供BulkLoad来实现大数据量不经过内存直接写入Hbase
  • 特点
  • 优点:不经过内存,降低了内存和磁盘的IO吞吐
  • 缺点:性能上相对来说要慢一些,所有数据都不会在内存中被读取

知识点08:BulkLoad的实现

  • 目标
  • 实现BulkLoad方式加载数据到Hbase的表中
  • 分析
  • step1:先将要写入的数据转换为HFILE文件
  • step2:将HFILE文件加载到Hbase的表中【直接将文件放入了Hbase表对应的HDFS目录中】
  • 实现
  • 开发代码
  • 创建表
create 'mrhbase','info'
  • 上传测试文件
hdfs dfs -mkdir -p  /bulkload/input
hdfs dfs -put writeHbase.txt /bulkload/input/
  • 上传jar包到Linux上
    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-dQV6Ylwx-1616741489711)(20210326_分布式NoSQL列存储数据库Hbase(九).assets/image-20210326103006352.png)]
  • step1:转换为HFILE
yarn jar bulkload.jar bigdata.itcast.cn.hbase.bulkload.TransHfileMR /bulkload/input/ /bulkload/output
  • 运行找不到Hbase的jar包,手动申明HADOOP的环境变量即可,只在当前窗口有效
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/export/server/hbase-2.1.0/lib/shaded-clients/hbase-shaded-mapreduce-2.1.0.jar:/export/server/hbase-2.1.0/lib/client-facing-thirdparty/audience-annotations-0.5.0.jar:/export/server/hbase-2.1.0/lib/client-facing-thirdparty/commons-logging-1.2.jar:/export/server/hbase-2.1.0/lib/client-facing-thirdparty/findbugs-annotations-1.3.9-1.jar:/export/server/hbase-2.1.0/lib/client-facing-thirdparty/htrace-core4-4.2.0-incubating.jar:/export/server/hbase-2.1.0/lib/client-facing-thirdparty/log4j-1.2.17.jar:/export/server/hbase-2.1.0/lib/client-facing-thirdparty/slf4j-api-1.7.25.jar
  • step2:加载到Hbase表中
yarn jar bulkload.jar bigdata.itcast.cn.hbase.bulkload.BulkLoadToHbase /bulkload/output
  • 总结
  • step1:先将数据转换为HFILE文件
  • step2:将HFILE加载到Hbase表中

知识点09:ImportTSV的使用

  • 目标
  • 了解ImportTSV工具的功能及使用
  • 字面意思:导入tsv格式的数据文件
  • tsv:以制表符分隔每一列的文件
  • csv:以逗号分隔每一列的文件
  • 分析
  • importtsv功能:将可以将任何一种结构化的文件导入Hbase的表中,【默认是使用Put方式来导入的】
  • 默认导入tsv格式的文件
  • 实现
  • 使用方式一:直接使用Put方式导入
  • 使用
yarn jar /export/server/hbase-2.1.0/lib/hbase-mapreduce-2.1.0.jar importtsv 
-Dimporttsv.columns=HBASE_ROW_KEY,cf1:name,cf1:age,cf2:sex <你要写入哪张表> <读取文件的文件地址>
  • 手动指定分隔符
'-Dimporttsv.separator=,',自己指定分隔符,默认分隔符为\t
  • 举例
  • 现在是数据

         
  • 1 zhangsan 18 male
    2 lisi 20 female
- 导入Hbase中
```shell
yarn jar /export/server/hbase-2.1.0/lib/hbase-mapreduce-2.1.0.jar  \
importtsv  \
-Dimporttsv.columns=a,b,c \ --指定表中的每一列与文件中的每一列的对应关系
<tablename>   \--指定导入哪张表
<inputdir>      \--指定导入哪个文件
yarn jar /export/server/hbase-2.1.0/lib/hbase-mapreduce-2.1.0.jar  \
importtsv  \
-Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:age,info:sex
mrhbase \
/bulkload/input
  • 使用方式二:结合BulkLoad的方式来实现
  • step1:将普通文件转换为HFILE文件
yarn jar /export/server/hbase-2.1.0/lib/hbase-mapreduce-2.1.0.jar importtsv 
-Dimporttsv.columns=HBASE_ROW_KEY,cf1:name,cf1:age,cf2:sex 
-Dimporttsv.bulk.output=HFILE文件的存储地址
<你要写入哪张表> <读取文件的文件地址>

         
yarn jar /export/server/hbase-2.1.0/lib/hbase-mapreduce-2.1.0.jar
importtsv
-Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:age,info:sex
-Dimporttsv.bulk.output=/bulkload/output
mrhbase
/bulkload/input
- step2:使用bulkload加载数据
  • yarn jar /export/server/hbase-2.1.0/lib/hbase-mapreduce-2.1.0.jar completebulkload HFILE文件的地址 表的名称
```shell
yarn jar /export/server/hbase-2.1.0/lib/hbase-mapreduce-2.1.0.jar completebulkload  /bulkload/output mrhbase
  • 总结
  • importTSV:默认读取tsv格式
  • 可以将数据直接写入Hbase
  • 也可以将数据转换为HFILE文件
  • 注意:importTSV可以读取别的格式:如果要读取csv格式
'-Dimporttsv.separator=,'

知识点10:协处理器的介绍

  • 目标
  • 了解协处理器的功能、分类和应用场景
  • 分析
  • 什么是协处理器?
  • 协处理器指的是可以自定义开发一些功能集成到Hbase中
  • 类似于Hive中的UDF,当没有这个功能时,可以使用协处理器来自定义开发,让Hbase支持对应的功能
  • 实现
  • observer类:观察者类,类似于监听机制,MySQL中的触发器、Zookeeper中的监听
  • 实现:监听A,如果A触发了,就执行B
  • 监听对象
  • Region
  • Table
  • RegionServer
  • Master
  • 触发:监听A,如果A触发了,执行B
  • pre:阻塞A,先执行B,再执行A
  • post:A先执行,B在A执行完成之后再执行
  • endpoint类:终端者类,类似于MySQL中的存储过程,Java中的方法
  • 实现:固定一个代码逻辑,可以随时根据需求调用代码逻辑
  • 总结
  • Hbase通过协处理器来弥补一些用户自定义功能的实现,例如二级索引等,但开发难度较高,一般通过第三方工具来实现

知识点11:协处理器的实现

  • 目标
  • 利用协处理器模拟实现二级索引同步原表与索引表数据
  • 分析
  • step1:开发协处理器,监听原表的put请求
  • step2:拦截原表put请求,获取put操作,获取rowkey以及值
  • step3:构建索引表的rowkey,往索引表写入数据
  • step4:释放原表请求,往原表写入数据
  • 实现
  • 创建两张表
#rowkey:time_id
create 'proc1','info'
#rowkey:id_time
create 'proc2','info'
  • 将开发好的协处理器jar包上传到hdfs上
hdfs dfs -mkdir -p /coprocessor/jar
mv bulkload.jar cop.jar
hdfs dfs -put cop.jar /coprocessor/jar/
  • 添加协处理器到proc1中,用于监听proc1的操作
disable 'proc1'
alter 'proc1',METHOD => 'table_att','Coprocessor'=>'hdfs://node1:8020/coprocessor/jar/cop.jar|bigdata.itcast.cn.hbase.coprocessor.SyncCoprocessor|1001|'
enable 'proc1'
  • 测试
put 'proc1','20191211_001','info:name','zhangsan'
scan 'proc1'
scan 'proc2'
  • 卸载协处理器
disable 'proc1'
alter 'proc1',METHOD=>'table_att_unset',NAME=>'coprocessor$1'
enable 'proc1'
  • 总结
  • 协处理器API过于繁琐,基于不同的需求需要开发多个协处理器共同实现,不建议使用
  • 建议使用Phoenix

知识点12:Hbase优化:内存分配

  • 目标
  • 了解Hbase中内存的管理及分配
  • 分析
  • 写缓存:Memstore
  • 读缓存:BlockCache
  • 使用的RegionServer的JVM堆内存
  • 注意:所有使用JVM堆内存工具,都会有一个共同的问题:GC停顿
  • 合理设计垃圾回收的机制来回收内存,避免GC停顿
  • 实现
  • MemStore:写缓存
hbase.regionserver.global.memstore.size = 0.4
  • 如果存多了,Flush到HDFS
  • BlockCache:读缓存
hfile.block.cache.size = 0.4
  • LRU淘汰算法,将最近最少被使用的数据从缓存中剔除
  • 读多写少,降低MEMStore比例
  • 读少写多,降低BlockCache比例
  • 总结
  • 可以根据实际的工作场景的需求,调整内存比例分配,提高性能

知识点13:Hbase优化:压缩机制

  • 目标
  • 了解Hbase中支持的压缩类型及配置实现
  • 分析
  • Hbase的压缩源自于Hadoop对于压缩的支持
  • 检查Hadoop支持的压缩类型
hadoop checknative
  • 需要将Hadoop的本地库配置到Hbase中
  • 实现
  • 关闭Hbase的服务
    配置Hbase的压缩本地库: lib/native/Linux-amd64-64
cd /export/server/hbase-2.1.0/
mkdir lib/native
  • 将Hadoop的压缩本地库创建一个软链接到Hbase的lib/native目录下
ln -s /export/server/hadoop-2.7.5/lib/native /export/server/hbase-2.1.0/lib/native/Linux-amd64-64
  • 启动Hbase服务
start-hbase.sh
hbase shell
  • 创建表
create 'testcompress',{NAME=>'cf1',COMPRESSION => 'SNAPPY'}
put 'testcompress','001','cf1:name','laoda'
  • 总结
  • Hbase提供了多种压缩机制实现对于大量数据的压缩存储,提高性能
  • 压缩属于列族的属性:基于列族设计压缩

知识点14:Hbase优化:布隆过滤

  • 目标
  • 了解布隆过滤器的功能及使用
  • 分析
  • 什么是布隆过滤器?
  • 是列族的一个属性,用于数据查询时对数据的过滤,类似于ORC文件中的布隆索引
  • 实现
  • 列族属性:BLOOMFILTER => NONE | ‘ROW’ | ROWCOL
  • NONE :不开启布隆过滤器
  • ROW:开启行级布隆过滤
  • 生成StoreFile文件时,会将这个文件中有哪些Rowkey的数据记录在文件的头部
  • 当读取StoreFile文件时,会从文件头部或者这个StoreFile中的所有rowkey,自动判断是否包含需要的rowkey,如果包含就读取这个文件,如果不包含就不读这个文件
  • ROWCOL:行列级布隆过滤
  • 生成StoreFile文件时,会将这个文件中有哪些Rowkey的以及对应的列族和列的信息数据记录在文件的头部
  • 当读取StoreFile文件时,会从文件头部或者这个StoreFile中的所有rowkey以及列的信息,自动判断是否包含需要的rowkey以及列,如果包含就读取这个文件,如果不包含就不读这个文件
  • 总结
  • Hbase通过布隆过滤器,在写入数据时,建立布隆索引,读取数据时,根据布隆索引加快数据的检索

知识点15:Hbase优化:列族属性

  • 目标
  • 了解其他常用列族属性
  • 分析
{NAME => 'cf1', VERSIONS => '1', EVICT_BLOCKS_ON_CLOSE => 'false', NEW_VERSION_BEHAVIOR => 'false', KEEP_DELETED_CELLS => 'FALSE', CACHE_DATA_ON_WRITE => 'false', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', MIN_VERSIONS => '0', REPLICATION_SCOPE => '0', BLOOMFILTER => 'ROW', CACHE_INDEX_ON_WRITE => 'false', IN_MEMORY => 'false', CACHE_BLOOMS_ON_WRITE => 'false', PREFETCH_BLOCKS_ON_OPEN => 'false', COMPRESSION => 'SNAPPY', BLOCKCACHE => 'true', BLOCKSIZE => '65536'}   
  • 实现
  • NAME:表示列族的名称
  • VERSIONS:最大版本数
  • 表示这个列族中的列最多可以存储几个版本的值
  • TTL:设置版本的自动过期时间,默认永不过期的,修改单位为秒
  • VERSIONS = 5
  • MIN_VERSIONS = 2
  • 一旦到达TTL时间,会自动清理过期的版本,只保留2个版本
  • MIN_VERSIONS:最小版本数
  • BLOCKCACHE:开启缓存,如果列族开启了缓存,这个列族从HDFS的查询就会放入缓存中
  • 默认就开启的
  • 工作中要将不是经常读的列族的缓存关闭
  • 使用LRU算法淘汰过期的数据
  • IN_MOMERY:最高缓存级别,meta表就是这个级别,一般情况下不建议开启
  • 不会被优先淘汰
  • BLOCKSIZE:存储文件的块的大小
  • 块越小,索引越多,查询越快,占用内存越多
  • 块越大,索引越少,查询相对较慢,占用内存越少
  • 一般不建议调整

知识点16:Hbase优化:其他优化

  • 目标
  • 了解Linux、HDFS、Zookeeper、Hbase其他属性优化
  • 实现
  • Linux系统优化
  • 开启文件系统的预读缓存可以提高读取速度
sudo blockdev --setra 32768 /dev/sda
  • 最大限度使用物理内存
sudo sysctl -w vm.swappiness=0
  • 调整文件及进程句柄数
sudo vi /etc/security/limits.conf 修改打开文件数限制
末尾添加:
*                soft    nofile          1024000
*                hard    nofile          1024000
Hive             -       nofile          1024000
hive             -       nproc           1024000 
$ sudo vi /etc/security/limits.d/20-nproc.conf 修改用户打开进程数限制
修改为:
#*          soft    nproc     4096
#root       soft    nproc     unlimited
*          soft    nproc     40960
root       soft    nproc     unlimited
  • HDFS优化
  • 保证RPC调用会有较多的线程数
dfs.namenode.handler.count = 20
dfs.datanode.handler.count = 20
  • 文件块大小的调整
dfs.blocksize = 256M
  • 文件句柄数
dfs.datanode.max.transfer.threads = 4096
  • 超时时间
dfs.image.transfer.timeout = 60000
  • 避免DN错误宕机
dfs.datanode.failed.volumes.tolerated = 1
  • Zookeeper优化
  • 优化Zookeeper会话超时时间
zookeeper.session.timeout = 90000
  • Hbase属性优化
  • 设置RPC监听数量
hbase.regionserver.handler.count = 50
  • 优化hbase客户端缓存
hbase.client.write.buffer = 2097152
  • 指定scan.next扫描HBase所获取的行数
hbase.client.scanner.caching = 2147483647

附录一:Maven依赖

<repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
    </repositories>
    <properties>
        <hadoop.version>2.7.3</hadoop.version>
        <hbase.version>2.1.2</hbase.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-mapreduce</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-auth</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.6</version>
        </dependency>
    </dependencies>
sion>
org.apache.hadoop
hadoop-mapreduce-client-jobclient
h a d o o p . v e r s i o n < / v e r s i o n > < / d e p e n d e n c y > < d e p e n d e n c y > < g r o u p I d > o r g . a p a c h e . h a d o o p < / g r o u p I d > < a r t i f a c t I d > h a d o o p − c o m m o n < / a r t i f a c t I d > < v e r s i o n > {hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>hadoop.version</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop−common</artifactId><version>{hadoop.version}
org.apache.hadoop
hadoop-mapreduce-client-core
h a d o o p . v e r s i o n < / v e r s i o n > < / d e p e n d e n c y > < d e p e n d e n c y > < g r o u p I d > o r g . a p a c h e . h a d o o p < / g r o u p I d > < a r t i f a c t I d > h a d o o p − a u t h < / a r t i f a c t I d > < v e r s i o n > {hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-auth</artifactId> <version>hadoop.version</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop−auth</artifactId><version>{hadoop.version}
org.apache.hadoop
hadoop-hdfs
${hadoop.version}


commons-io

commons-io

2.6



         


相关实践学习
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
8天前
|
关系型数据库 MySQL 分布式数据库
《MySQL 简易速速上手小册》第6章:MySQL 复制和分布式数据库(2024 最新版)
《MySQL 简易速速上手小册》第6章:MySQL 复制和分布式数据库(2024 最新版)
44 2
|
3天前
|
存储 分布式计算 Hadoop
基于Hadoop分布式数据库HBase1.0部署及使用
基于Hadoop分布式数据库HBase1.0部署及使用
|
9天前
|
存储 关系型数据库 MySQL
如何处理爬取到的数据,例如存储到数据库或文件中?
处理爬取的数据,可存储为txt、csv(适合表格数据)或json(适合结构化数据)文件。若需存储大量数据并执行复杂查询,可选择关系型(如MySQL)或非关系型(如MongoDB)数据库。以MySQL为例,需安装数据库和Python的pymysql库,创建数据库和表,然后编写Python代码进行数据操作。选择存储方式应考虑数据类型、数量及后续处理需求。
17 1
|
15天前
|
存储 SQL Oracle
【Oracle】玩转Oracle数据库(二):体系结构、存储结构与各类参数
【Oracle】玩转Oracle数据库(二):体系结构、存储结构与各类参数
36 7
|
23天前
|
数据库 存储 BI
SAP ABAP CDS View 源代码存储的数据库表揭秘和其他相关数据库表介绍试读版
SAP ABAP CDS View 源代码存储的数据库表揭秘和其他相关数据库表介绍试读版
12 0
SAP ABAP CDS View 源代码存储的数据库表揭秘和其他相关数据库表介绍试读版
|
存储 传感器 数据管理
【软件设计师备考 专题 】面向对象数据库和分布式对象:理解新的数据管理概念
【软件设计师备考 专题 】面向对象数据库和分布式对象:理解新的数据管理概念
53 0
|
1月前
|
存储 数据采集 数据挖掘
【软件设计师备考 专题 】数据仓库和分布式数据库基础知识
【软件设计师备考 专题 】数据仓库和分布式数据库基础知识
198 0
|
1月前
|
存储 SQL 数据库
C# 将 Word 转文本存储到数据库并进行管理
C# 将 Word 转文本存储到数据库并进行管理
|
18天前
|
消息中间件 Java Kafka
Springboot集成高低版本kafka
Springboot集成高低版本kafka
|
25天前
|
NoSQL Java Redis
SpringBoot集成Redis解决表单重复提交接口幂等(亲测可用)
SpringBoot集成Redis解决表单重复提交接口幂等(亲测可用)
283 0