hbase使用

本文涉及的产品
云原生多模数据库 Lindorm,多引擎 多规格 0-4节点
云数据库 MongoDB,独享型 2核8GB
推荐场景:
构建全方位客户视图
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介:

Hbase 常用shell操作

list

# List all user tables
     list
     # List all namespaces
     list_namespace
     # List the tables that belong to the 'hbase' namespace
     list_namespace_tables 'hbase'

create

# Create table 't5' with a single column family 'cf1'
    create 't5','cf1'
    # Create the namespace 'beh'
    create_namespace 'beh'
    # Create table 't5' inside the 'beh' namespace
    create 'beh:t5','cf'
    # Create a pre-split table from explicit split points
    create 't1','f1', SPLITS => ['10','20','30']
    # Create a pre-split table from a region count and a split algorithm
    # (uses a different table name: 't1' already exists after the previous command)
    create 't2','f1',{NUMREGIONS => 15, SPLITALGO => 'HexStringSplit'}

put

  # Insert a cell into a table in the default namespace
    put 't1','r1','cf1:c1','value1'
    # Insert a cell into a table in the 'beh' namespace
    put 'beh:t1','r2','cf1:c2','value2'
    # Insert a cell with an explicit timestamp
    put 'beh:t1','r3','cf1:c2','value2',1234567
    # Insert a cell with extra attributes attached to the mutation
    put 'beh:t1','r100','cf1:c2','value2',1234567 , {ATTRIBUTES=>{'mykey'=>'myvalue'}}

get

# Fetch a whole row
    get 't1','r1'
    # Fetch only the cells of column family cf2
    get 't1','r1','cf2'
    # Fetch only the two columns cf2:c1 and cf2:c2
    get 't1','r1',['cf2:c1','cf2:c2']
    # Restrict the result to a timestamp (columns are written as family:qualifier)
    get 't1', 'r1', {COLUMN => 'cf2:c1', TIMESTAMP => 123456}
    # Restrict the result to a timestamp and a number of versions
    get 't1', 'r1', {COLUMN => 'cf2:c1', TIMESTAMP => 12345, VERSIONS => 4}

scan

# Scan the whole table
    scan 't1'
    # Scan only the listed columns
    scan 't1',{COLUMN => ['cf1:c1','cf2:c2']}
    # Return at most 10 rows
    scan 't1',{LIMIT => 10}
    # Start scanning at row key 'r20'
    scan 't1',{STARTROW => 'r20'}
    # Apply filters: column qualifier prefix 'c2' AND value containing the substring '8'
    scan 't1', FILTER=>"ColumnPrefixFilter('c2') AND ValueFilter(=,'substring:8')"

drop 和delete

 # Delete an entire row
    deleteall 't1','r1'
    # Delete a single column of a row (the column is written as family:qualifier)
    delete 't1','r2','cf1:c1'
    # Empty the table; this has to run while the table still exists
    truncate 't1'
    # Drop the table; it must be disabled first
    disable 't1'
    drop 't1'

hbase起停

# Start HBase using the bundled start script
 start-hbase.sh
 # Start a single daemon: the master, or a region server
 hbase-daemon.sh start master
 hbase-daemon.sh start regionserver

hbase常用java api

package com.bonc.hbase;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Operation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.ColumnPaginationFilter;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FamilyFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.ValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Walk-through of the basic HBase client operations (list, create, put, get,
 * scan, filter, delete, drop) using the deprecated {@code HBaseAdmin} /
 * {@code HTable} API.
 *
 * <p>
 * All data methods operate on table {@code a1} with the column families
 * {@code cf1} and {@code cf2}. Each method opens its own table handle and
 * closes it again, even when an operation fails.
 */
@SuppressWarnings("deprecation")
public class ApiSample {
	private static final byte[] tableName = Bytes.toBytes("a1");
	private static final byte[] cf1 = Bytes.toBytes("cf1");
	private static final byte[] cf2 = Bytes.toBytes("cf2");
	Configuration conf = HBaseConfiguration.create();
	private final HBaseAdmin admin;

	/**
	 * Creates the sample and connects the admin client.
	 *
	 * @throws IllegalStateException
	 *             if the admin client cannot be created; failing here avoids a
	 *             later NullPointerException on first use
	 */
	public ApiSample() {
		try {
			admin = new HBaseAdmin(conf);
		} catch (IOException e) {
			throw new IllegalStateException("Unable to create HBaseAdmin", e);
		}
	}

	/**
	 * Runs the enabled sample steps in order. Commented-out calls are further
	 * steps that can be enabled individually.
	 */
	public static void main(String[] args) throws Exception {
		ApiSample sample = new ApiSample();
		// sample.listTable();
		// sample.listNameSpace();
		// sample.listTableWithNamespace();
		// // sample.createTable();
		// // sample.createTableWithSplit();
		// sample.editVersion();
		// sample.putData();
		// sample.getData();
		// sample.scanData();
		sample.scanDataWithFilter();
		sample.deleteData();
		sample.dropTable();
	}

	/** Prints the names of all tables known to the admin client. */
	public void listTable() throws Exception {
		System.out.println("---------table------------------");
		TableName[] ts = admin.listTableNames();
		for (TableName tb : ts) {
			System.out.println(tb.getNameWithNamespaceInclAsString());
		}
	}

	/** Prints the names of all namespaces. */
	public void listNameSpace() throws Exception {
		System.out.println("---------namespace------------------");
		NamespaceDescriptor[] nss = admin.listNamespaceDescriptors();
		for (NamespaceDescriptor ns : nss) {
			System.out.println(ns.getName());
		}
	}

	/** Prints the names of the tables in the {@code beh} namespace. */
	public void listTableWithNamespace() throws Exception {
		System.out.println("---------table on beh------------------");
		TableName[] ts = admin.listTableNamesByNamespace("beh");
		for (TableName tb : ts) {
			System.out.println(tb.getNameWithNamespaceInclAsString());
		}
	}

	/** Creates table {@code a1} with the column families cf1 and cf2. */
	public void createTable() throws Exception {
		System.out.println("---------create table------------------");
		HTableDescriptor t = new HTableDescriptor(tableName);
		t.addFamily(new HColumnDescriptor(cf1));
		t.addFamily(new HColumnDescriptor(cf2));
		admin.createTable(t);
	}

	/**
	 * Creates the pre-split table {@code a2} (families cf1 and cf2) with 10
	 * regions.
	 */
	public void createTableWithSplit() throws Exception {
		System.out.println("---------create table with 10 regin------------------");
		HTableDescriptor t = new HTableDescriptor(Bytes.toBytes("a2"));
		t.addFamily(new HColumnDescriptor(cf1));
		t.addFamily(new HColumnDescriptor(cf2));
		// NOTE(review): the boundaries are the byte encodings of the ints 0 and
		// 9, not of the strings "0"/"9"; confirm this matches the row-key
		// format the table is meant to hold.
		admin.createTable(t, Bytes.toBytes(0), Bytes.toBytes(9), 10);
	}

	/** Writes rows r1 and r3 (columns cf1:c1, cf2:c2, cf1:c3) into a1. */
	public void putData() throws Exception {
		System.out.println("---------put data to a1------------------");
		Put p1 = new Put(Bytes.toBytes("r1"));
		p1.add(cf1, Bytes.toBytes("c1"), Bytes.toBytes("v1"));
		p1.add(cf2, Bytes.toBytes("c2"), Bytes.toBytes("v666"));
		p1.add(cf1, Bytes.toBytes("c3"), Bytes.toBytes("v3"));
		Put p2 = new Put(Bytes.toBytes("r3"));
		p2.add(cf1, Bytes.toBytes("c1"), Bytes.toBytes("v4"));
		p2.add(cf2, Bytes.toBytes("c2"), Bytes.toBytes("v5"));
		p2.add(cf1, Bytes.toBytes("c3"), Bytes.toBytes("v6"));
		try (HTable table = new HTable(conf, tableName)) {
			table.put(p1);
			table.put(p2);
		}
	}

	/** Changes family cf2 of a1 to keep up to 3 versions per cell. */
	public void editVersion() throws Exception {
		System.out.println("---------修改表存储多版本------------------");
		HColumnDescriptor cf = new HColumnDescriptor(cf2);
		cf.setMaxVersions(3);
		admin.modifyColumn(tableName, cf);
	}

	/**
	 * Reads row r1 three times: the whole row, then only family cf2, then
	 * family cf2 with all versions. The same {@code Get} is narrowed step by
	 * step.
	 */
	public void getData() throws Exception {
		System.out.println("---------get data from a1------------------");
		try (HTable table = new HTable(conf, tableName)) {
			Get get = new Get(Bytes.toBytes("r1"));
			printResult(table.get(get));

			System.out.println("---------get r1 only cf2 family-----------------");
			get.addFamily(cf2);
			printResult(table.get(get));

			System.out.println("----------get with max version----------------");
			get.setMaxVersions();
			printResult(table.get(get));
		}
	}

	/**
	 * Scans a1 three times: the full table, then from start row r2, then from
	 * r2 restricted to column cf1:c1. The same {@code Scan} is narrowed step by
	 * step.
	 */
	public void scanData() throws Exception {
		System.out.println("---------scan data from a1------------------");
		try (HTable table = new HTable(conf, tableName)) {
			Scan scan = new Scan();
			printResultScan(table.getScanner(scan));

			System.out.println("---------scan data from a1 start with row key r2------------------");
			scan.setStartRow(Bytes.toBytes("r2"));
			printResultScan(table.getScanner(scan));

			System.out.println(
					"---------scan data from a1 start with row key r2 only get cf1 :c1 data-----------------");
			scan.addColumn(cf1, Bytes.toBytes("c1"));
			printResultScan(table.getScanner(scan));
		}
	}

	/**
	 * Demonstrates the filter types that can be applied to a scan of a1:
	 * <ul>
	 * <li>row: {@code RowFilter}</li>
	 * <li>family: {@code addFamily()} and {@code FamilyFilter}</li>
	 * <li>column: {@code ColumnPrefixFilter}, {@code ColumnPaginationFilter}</li>
	 * <li>value: {@code ValueFilter}, {@code SingleColumnValueFilter},
	 * {@code SingleColumnValueExcludeFilter}</li>
	 * </ul>
	 * From the column-prefix step onwards the filters are appended to one
	 * {@code MUST_PASS_ALL} list, so every step also applies the filters of the
	 * steps before it.
	 *
	 * @throws Exception
	 *             if the table cannot be opened or a scan fails
	 */
	public void scanDataWithFilter() throws Exception {
		System.out.println("---------scan data with row filter from a1------------------");
		try (HTable table = new HTable(conf, tableName)) {
			Scan scan = new Scan();
			scan.setFilter(new RowFilter(CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("r1"))));
			printResultScan(table.getScanner(scan));

			System.out.println("---------scan data with add family from a1------------------");
			// Separate Scan so that restricting the families here does not hide
			// cf2 from the later steps.
			Scan familyScan = new Scan();
			familyScan.addFamily(cf1);
			printResultScan(table.getScanner(familyScan));

			System.out.println("---------scan data with cf Filter filter from a1------------------");
			scan.setFilter(new FamilyFilter(CompareOp.EQUAL, new BinaryComparator(cf1)));
			printResultScan(table.getScanner(scan));

			System.out.println("---------scan data with column prefix filter from a1------------------");
			FilterList list = new FilterList(Operator.MUST_PASS_ALL);
			list.addFilter(new ColumnPrefixFilter(Bytes.toBytes("c")));
			scan.setFilter(list);
			printResultScan(table.getScanner(scan));

			System.out.println("---------scan data with column pagination filter from a1------------------");
			// Pages through the columns of each row: (limit, offset).
			list.addFilter(new ColumnPaginationFilter(2, 2));
			scan.setFilter(list);
			printResultScan(table.getScanner(scan));

			System.out.println("---------scan data with value  filter from a1------------------");
			list.addFilter(new ValueFilter(CompareOp.GREATER_OR_EQUAL, new BinaryComparator(Bytes.toBytes("v5"))));
			scan.setFilter(list);
			printResultScan(table.getScanner(scan));

			System.out.println("---------scan data with column  value  filter from a1------------------");
			// putData() writes qualifier c2 only under family cf2, so the value
			// test has to address cf2:c2 to match any stored cell.
			list.addFilter(
					new SingleColumnValueFilter(cf2, Bytes.toBytes("c2"), CompareOp.GREATER, Bytes.toBytes("v5")));
			scan.setFilter(list);
			printResultScan(table.getScanner(scan));

			System.out.println("---------scan data with column  value exclude filter from a1------------------");
			list.addFilter(new SingleColumnValueExcludeFilter(cf2, Bytes.toBytes("c2"), CompareOp.GREATER,
					Bytes.toBytes("v5")));
			scan.setFilter(list);
			printResultScan(table.getScanner(scan));
		}
	}

	/** Deletes row r1 from a1 and reads it back to show that it is gone. */
	public void deleteData() throws Exception {
		System.out.println("---------delete data from a1------------------");
		try (HTable table = new HTable(conf, tableName)) {
			table.delete(new Delete(Bytes.toBytes("r1")));

			System.out.println("---------get  r1 from a1------------------");
			printResult(table.get(new Get(Bytes.toBytes("r1"))));
		}
	}

	/**
	 * Drops the sample tables a1 and a2 if they exist, then closes the admin
	 * client. The admin client is closed even when dropping fails.
	 */
	public void dropTable() throws Exception {
		System.out.println("---------delete table a1------------------");
		try {
			dropIfExists(tableName);
			dropIfExists(Bytes.toBytes("a2"));
		} finally {
			admin.close();
		}
	}

	/**
	 * Disables (when still enabled) and deletes the given table; does nothing
	 * when the table does not exist.
	 */
	private void dropIfExists(byte[] name) throws IOException {
		if (!admin.tableExists(name)) {
			return;
		}
		if (admin.isTableEnabled(name)) {
			admin.disableTable(name);
		}
		admin.deleteTable(name);
	}

	/** Prints every row of the scanner and always closes the scanner. */
	private void printResultScan(ResultScanner rs) throws Exception {
		try {
			for (Result r = rs.next(); r != null; r = rs.next()) {
				printResult(r);
			}
		} finally {
			rs.close();
		}
	}

	/** Prints one line per cell of the result; prints nothing when it is empty. */
	private void printResult(Result rs) {
		if (rs == null || rs.isEmpty()) {
			return;
		}
		for (KeyValue kv : rs.list()) {
			System.out.println(String.format("row:%s\t family:%s\t qualifier:%s\t value:%s\t timestamp:%s",
					Bytes.toString(kv.getRow()), Bytes.toString(kv.getFamily()), Bytes.toString(kv.getQualifier()),
					Bytes.toString(kv.getValue()), kv.getTimestamp()));
		}
	}
}

hbase 常见的二级索引

Coprocessor

思路

利用 coprocessor 可以在数据入库、查询等操作的前后插入自定义逻辑,从而在主表写入、删除数据的同时同步维护索引表。

实现

package com.bonc.hbase.coprocessor;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.BaseMasterAndRegionObserver;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.master.TableNamespaceManager;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MD5Hash;

public class SecondaryIndexByComprocessorForGlobal extends BaseMasterAndRegionObserver {

	private static String INDEX_TABLE_NAME = "index.table.name";
	private static String INDEX_FAMILY_NAME = "index.family.name";
	private static String TABLE_CELL = "table.cell";
	private byte[] index_name;
	private byte[] index_family;
	private byte[] main_family;
	private byte[] main_qualifier;
	private Configuration conf;
	private Connection conn;

	/**
	 * 需要参数: 索引表名称,索引表列簇名称,主表需要索引列
	 */
	@Override
	public void start(CoprocessorEnvironment ctx) throws IOException {

		conf = ctx.getConfiguration();
		index_name = Bytes.toBytes(conf.get(INDEX_TABLE_NAME));
		index_family = Bytes.toBytes(conf.get(INDEX_FAMILY_NAME));
		String cell = conf.get(TABLE_CELL);
		if (cell != null) {
			String[] s = cell.split(":");
			main_family = Bytes.toBytes(s[0]);
			main_qualifier = Bytes.toBytes(s[1]);
		}
		System.out.println("添加协处理器,建立索引的表:" + Bytes.toString(index_name) + " ,被索引的列 : " + cell);
		super.start(ctx);
	}

	private Connection getConnection() {
		if (conn == null || conn.isClosed()) {
			try {
				conn = ConnectionFactory.createConnection(conf);
			} catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
		return conn;
	}

	private Table getTable() throws IllegalArgumentException, IOException {
		return getConnection().getTable(TableName.valueOf(index_name));
	}

	@Override
	// 定义创建表之前的动作,因为表创建表之前无法确定有那些列,索引没办法同步创建索引表
	public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> ctx, HTableDescriptor desc,
			HRegionInfo[] regions) throws IOException {
		// TODO Auto-generated method stub
		super.preCreateTable(ctx, desc, regions);
	}

	@Override
	// put数据之后执行,但是很难保证事务性
	public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability)
			throws IOException {
		// put
		List<Cell> list = put.get(main_family, main_qualifier);
		if (list != null && !list.isEmpty()) {
			for (Cell cell : list) {
				putDataToIndex(cell);
			}
		}
		super.prePut(e, put, edit, durability);
	}

	@Override
	// 主表删除数据后,索引表删除数据
	public void preDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit,
			Durability durability) throws IOException {
		byte[] row = delete.getRow();
		Result result = e.getEnvironment().getRegion().get(new Get(row));
		KeyValue kv = result.getColumnLatest(main_family, main_qualifier);
		byte[] value = kv.getValue();
		deleteDataFromIndex(Bytes.add(Bytes.toBytes(MD5Hash.getMD5AsHex(value).substring(0, 8)), value, row));
		super.postDelete(e, delete, edit, durability);
	}

	@Override
	// 删除表时同时删除索引表
	public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> ctx, TableName tableName)
			throws IOException {
		Admin admin = getConnection().getAdmin();
		admin.deleteTable(TableName.valueOf(INDEX_TABLE_NAME));
		super.postDeleteTable(ctx, tableName);
	}

	private void deleteDataFromIndex(byte[] row) {
		Delete d = new Delete(row);
		try {
			Table table = getTable();
			table.delete(d);
			table.close();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} 
	}

	private void putDataToIndex(Cell cell) throws IOException {
		byte[] row = cell.getRow();
		byte[] value = cell.getValue();
		Put put = new Put(Bytes.add(Bytes.toBytes(MD5Hash.getMD5AsHex(value).substring(0, 8)), value, row));
		put.add(index_family, Bytes.toBytes("c"), row);
		Table table = getTable();
		table.put(put);
		table.close();
	}
}


HBase技术交流社区 - 阿里官方“HBase生态+Spark社区大群”点击加入:https://dwz.cn/Fvqv066s

相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情: https://cn.aliyun.com/product/hbase ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
7月前
|
Java 分布式数据库 Hbase
hbase系列
hbase系列
44 0
|
4月前
|
负载均衡 分布式数据库 Hbase
HBase
【8月更文挑战第8天】
37 5
|
5月前
|
分布式计算 Hadoop 大数据
Hbase一:Hbase介绍及特点
Hbase一:Hbase介绍及特点
55 0
|
7月前
|
存储 Java 分布式数据库
什么是HBase?它的特点是什么?
什么是HBase?它的特点是什么?
536 0
|
NoSQL 大数据 分布式数据库
【HBase】(1)-HBase的安装
【HBase】(1)-HBase的安装
155 0
【HBase】(1)-HBase的安装
|
存储 分布式计算 安全
第8章 HBase应用
第8章 HBase应用
422 0
|
SQL 缓存 NoSQL
HBase问题答疑汇总(2021)
HBase问题答疑汇总(2021)
|
存储 负载均衡 监控
从一无所知到5分钟快速了解HBase
从一无所知到5分钟快速了解HBase
260 0
从一无所知到5分钟快速了解HBase
|
存储 NoSQL Java
HBase特点
HBase特点
169 0
|
Web App开发 存储 大数据
Hbase问题汇总与解答
今天早上分享了下HBase,分享的时候同事提出了一些问题,可能大部分人都有这样的困惑,汇总下来: HBase问题汇总与解答 两个独立的服务器,一台用HDFS,一台不用HDFS可以吗?HDFS和Hbase必须要装在同一台服务器上吗? 答: As HBase runs on HDFS (and eac...