Hbase java 常见操作

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 本文给出使用 Java 操作 HBase 的常见示例代码,包括建表、删表、单行与多行读写、按行键查询以及全表扫描等操作。

 

 

 

import java.io.IOException; 
import java.util.ArrayList; 
import java.util.HashMap;
import java.util.List;
import java.util.Map; 

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.util.Bytes;
import org.mortbay.log.Log; 




/**
 * Utility implementing {@link IOperator} over the (legacy 0.9x-era) HBase
 * client API: table DDL, single/batch puts, deletes, point gets and scans.
 *
 * <p>The shared {@link Configuration} is loaded once from the classpath
 * resource {@code hbase-site-test_bj.xml} in the static initializer.
 */
public class HbaseUtil implements IOperator
{
	private static Configuration conf = null;
	private static String configFile = "hbase-site-test_bj.xml";
	// Reserved cache of rowkey -> value pairs (currently only allocated, see getMap()).
	private   Map<String, String> aMap = null;
	// Table scanned by getMap().
	private   String mapTable = null;
	// Column families of that table.
	private   String[] tableFamily = null;

	public HbaseUtil() {

	}

	/**
	 * @param mapAppTable    table name later scanned by {@link #getMap()}
	 * @param appTableFamily column families of that table
	 */
	public HbaseUtil( String mapAppTable , String[] appTableFamily ) {
		this.aMap =  new HashMap<String, String>();
		this.mapTable = mapAppTable;
		this.tableFamily = appTableFamily;
	}

	static
	{
		Configuration HBASE_CONFIG = new Configuration();
		HBASE_CONFIG.addResource(configFile);
		conf = HBaseConfiguration.create(HBASE_CONFIG);
		System.err.println(conf.get("hbase.zookeeper.property.dataDir"));
	}

	/**
	 * 创建表操作 — creates the table with the given column families;
	 * prints a notice and does nothing if it already exists.
	 *
	 * @param tablename table to create
	 * @param cfs       column family names
	 * @throws IOException on HBase/ZooKeeper communication failure
	 */
	public void createTable(String tablename, String[] cfs) throws IOException
	{
		HBaseAdmin admin = new HBaseAdmin(conf);
		try
		{
			if (admin.tableExists(tablename))
			{
				System.out.println("表已经存在!");
			}
			else
			{
				HTableDescriptor tableDesc = new HTableDescriptor(tablename);
				for (int i = 0; i < cfs.length; i++)
				{
					tableDesc.addFamily(new HColumnDescriptor(cfs[i]));
				}
				admin.createTable(tableDesc);
				System.out.println("表创建成功!");
			}
		}
		finally
		{
			// close in finally so a failed DDL call cannot leak the admin connection
			admin.close();
		}
	}

	/**
	 * 删除表操作 — disables then deletes the table; no-op (with a notice)
	 * when the table does not exist.
	 *
	 * @param tablename table to drop
	 * @throws IOException on HBase/ZooKeeper communication failure
	 */
	public void deleteTable(String tablename) throws IOException
	{
		HBaseAdmin admin = new HBaseAdmin(conf);
		try
		{
			if (!admin.tableExists(tablename))
			{
				System.out.println("table(" + tablename + ") not exists, won't delete");
			}
			else
			{
				// a table must be disabled before it can be deleted
				admin.disableTable(tablename);
				admin.deleteTable(tablename);
				System.out.println("table(" + tablename + ") delete success");
			}
		}
		finally
		{
			admin.close();
		}
	}

	/** Demo insert: writes one hard-coded cell (row3/cf:444) into table "test". */
	public void insertRow() throws IOException
	{
		HTable table = new HTable(conf, "test");
		try
		{
			Put put = new Put(Bytes.toBytes("row3"));
			put.add(Bytes.toBytes("cf"), Bytes.toBytes("444"), Bytes.toBytes("value444"));
			table.put(put);
		}
		finally
		{
			table.close();
		}
	}

	/**
	 * 插入一行记录 — writes a single cell.
	 *
	 * @param tablename target table
	 * @param cfs       {rowkey, family, qualifier, value}
	 * @throws IOException on write failure
	 */
	public void writeRow(String tablename, String[] cfs) throws IOException
	{
		HTable table = new HTable(conf, tablename);
		try
		{
			Put put = new Put(Bytes.toBytes(cfs[0]));
			put.add(Bytes.toBytes(cfs[1]), Bytes.toBytes(cfs[2]), Bytes.toBytes(cfs[3]));
			table.put(put);
			System.out.println("写入成功!");
		}
		finally
		{
			table.close();
		}
	}

	/**
	 * 写多条记录 — batch put; each inner array is {rowkey, family, qualifier, value}.
	 */
	public void writeMultRow(String tablename, String[][] cfs) throws IOException
	{
		List<Put> lists = new ArrayList<Put>();
		HTable table = new HTable(conf, tablename);
		try
		{
			for (int i = 0; i < cfs.length; i++)
			{
				Put put = new Put(Bytes.toBytes(cfs[i][0]));
				put.add(Bytes.toBytes(cfs[i][1]), Bytes.toBytes(cfs[i][2]), Bytes.toBytes(cfs[i][3]));
				lists.add(put);
			}
			table.put(lists);
		}
		finally
		{
			table.close();
		}
	}

	/**
	 * 写多条记录 — batch put on a caller-owned table handle; the caller is
	 * responsible for closing {@code table}.
	 */
	public void writeMultRowByDevice(HTable table, String tablename, String[][] cfs) throws IOException
	{
		List<Put> lists = new ArrayList<Put>();
		for (int i = 0; i < cfs.length; i++)
		{
			Put put = new Put(Bytes.toBytes(cfs[i][0]));
			// log the string values, not Bytes.toBytes(...) — a byte[] only
			// prints its array reference ("[B@..."), which is useless in a log
			Log.info("writeMultRowByDevice  " + cfs[i][1] + "=" + cfs[i][2] + "=" + cfs[i][3]);
			put.add(Bytes.toBytes(cfs[i][1]), Bytes.toBytes(cfs[i][2]), Bytes.toBytes(cfs[i][3]));
			lists.add(put);
		}
		Log.info("push start");
		table.put(lists);
		Log.info("push end");
	}

	/**
	 * 删除一行记录 — deletes the whole row identified by {@code rowkey}.
	 *
	 * @param tablename target table
	 * @param rowkey    row key to delete
	 * @throws IOException on delete failure
	 */
	public void deleteRow(String tablename, String rowkey) throws IOException
	{
		HTable table = new HTable(conf, tablename);
		try
		{
			List<Delete> list = new ArrayList<Delete>();
			// Bytes.toBytes gives a fixed (UTF-8) encoding, unlike the
			// platform-dependent String#getBytes()
			Delete d1 = new Delete(Bytes.toBytes(rowkey));
			list.add(d1);
			table.delete(list);
			System.out.println("delete row(" + rowkey + ") sucess");
		}
		finally
		{
			table.close();
		}
	}

	/**
	 * 查找一行记录 — point get by row key; prints every cell of the row.
	 *
	 * @param tablename target table
	 * @param rowKey    row key to fetch
	 */
	public   void selectRow(String tablename, String rowKey) throws IOException
	{
		HTable table = new HTable(conf, tablename);
		try
		{
			Get g = new Get(Bytes.toBytes(rowKey));
			Result rs = table.get(g);
			for (KeyValue kv : rs.raw())
			{
				System.out.print(Bytes.toString(kv.getRow()) + "  ");
				System.out.print(Bytes.toString(kv.getFamily()) + ":");
				System.out.print(Bytes.toString(kv.getQualifier()) + "  ");
				System.out.print(kv.getTimestamp() + "  ");
				System.out.println(Bytes.toString(kv.getValue()));
			}
		}
		finally
		{
			table.close();
		}
	}

	/**
	 * 查询表中所有行 — full scan; prints the second and first cell values of
	 * each row (rows with fewer than two cells are skipped to avoid an
	 * ArrayIndexOutOfBoundsException).
	 */
	public void scaner(String tablename) throws IOException
	{
		HTable table = new HTable(conf, tablename);
		ResultScanner rs = null;
		try
		{
			Scan s = new Scan();
			rs = table.getScanner(s);
			for (Result r : rs)
			{
				KeyValue[] kv = r.raw();
				if (kv.length >= 2)
				{
					System.out.println(Bytes.toString(kv[1].getValue()) + "==" + Bytes.toString(kv[0].getValue()));
				}
			}
		}
		finally
		{
			if (rs != null)
			{
				rs.close();
			}
			table.close();
		}
	}


	/**
	 * Scan filtered by timestamp — still a stub: the filter list is empty
	 * and no scan is executed yet.
	 */
	public void scanByTimestamp(String tablename, long maxtime) throws IOException
	{
		HTable table = new HTable(conf, tablename);
		try
		{
			Scan s = new Scan();
			// TODO 存放所有的结果 — populate the filter list (e.g. a timestamp filter)
			FilterList allInfo = new FilterList();
			// allInfo.addFilter();
			s.setFilter(allInfo);
		}
		finally
		{
			// the original leaked this handle; close it even while the method is a stub
			table.close();
		}
	}

	/**
	 * Scans {@code mapTable} and returns a map of each row's key to the value
	 * of its first cell. Returns an empty (never null) map on I/O failure.
	 */
	public   Map<String, String> getMap()
	{
		Map<String, String> map = new HashMap<String, String>();
		HTable table = null;
		ResultScanner rs = null;
		try
		{
			table = new HTable(conf, mapTable);
			Scan s = new Scan();
			rs = table.getScanner(s);
			for (Result r : rs)
			{
				KeyValue[] kv = r.raw();
				if (kv.length > 0)
				{
					map.put(Bytes.toString(kv[0].getRow()), Bytes.toString(kv[0].getValue()));
				}
			}
		}
		catch (IOException e)
		{
			e.printStackTrace();
		}
		finally
		{
			// the original never released either resource — a scanner/connection leak
			if (rs != null)
			{
				rs.close();
			}
			if (table != null)
			{
				try
				{
					table.close();
				}
				catch (IOException ignored)
				{
					// best-effort close; the result map is already built
				}
			}
		}
		return map;
	}

}

 

 

import java.io.IOException;
import java.util.Map;

/**
 * Contract for a key/value store operator (implemented over HBase by
 * {@code HbaseUtil}): table DDL plus row-level read/write/delete and scans.
 */
public interface IOperator {

	/** Creates {@code tablename} with the given column families; no-op if it exists. */
	public void createTable(String tablename, String[] cfs) throws IOException ;
	/** Disables and drops {@code tablename}; no-op if it does not exist. */
	public void deleteTable(String tablename) throws IOException;
	/** Demo insert of one hard-coded cell (implementation-defined target). */
	public void insertRow() throws IOException;
	/** Writes one cell; {@code cfs} is {rowkey, family, qualifier, value}. */
	public void writeRow(String tablename, String[] cfs) throws IOException;
	/** Batch write; each inner array is {rowkey, family, qualifier, value}. */
	public void writeMultRow(String tablename, String[][] cfs) throws IOException;
	/** Deletes the entire row identified by {@code rowkey}. */
	public void deleteRow(String tablename, String rowkey) throws IOException;
	/** Point lookup by row key; results are reported implementation-specifically. */
	public void selectRow(String tablename, String rowKey) throws IOException;
	/** Full-table scan. */
	public void scaner(String tablename) throws IOException;
	/** Scan bounded by timestamp {@code maxtime}. */
	public void scanByTimestamp(String tablename, long maxtime) throws IOException;
	/** Returns a rowkey-to-first-cell-value snapshot of the configured table. */
	public Map<String, String> getMap() throws IOException; 
}

 

/**
 * Template base for file-ingestion worker threads: {@link #run()} ensures the
 * target HBase table exists, then hands off to the subclass-specific
 * {@link #processFile(IOperator)}.
 */
public abstract class BaseRunnabler implements Runnable{

	String sourceFile="";          // path of the input file to ingest
	String numberFile="";          // auxiliary number/counter file path
	String hbaseTable="";          // target HBase table name
	String [] hbaseFamily=null;    // column family names of that table
	String keywords ="";           // optional filter keywords

	/**
	 * @param sourceFile  input file to read
	 * @param hbaseTable  destination table
	 * @param hbaseFamily column families of the destination table
	 * @param numberFile  auxiliary counter file (may be empty)
	 * @param keywords    optional filter keywords (may be empty)
	 */
	public BaseRunnabler(String sourceFile,String hbaseTable,String [] hbaseFamily,String numberFile ,String keywords ){
		this.sourceFile = sourceFile;
		this.numberFile = numberFile;
		this.hbaseTable = hbaseTable;
		this.hbaseFamily = hbaseFamily;
		this.keywords = keywords;
	}

	@Override
	public void run() {
		try {
			// create-if-absent, then delegate the actual parsing to the subclass
			IOperator operator = new HbaseUtil(hbaseTable, hbaseFamily);
			operator.createTable(hbaseTable, hbaseFamily);
			processFile(operator);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	/** Parses {@code sourceFile} and writes its records through {@code hu}. */
	public abstract void processFile(IOperator hu) throws Exception; 	 
}

 

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader; 
import java.io.IOException;
import java.util.Date;  
import org.slf4j.Logger;
import org.slf4j.LoggerFactory; 
import Model.Device; 
import com.alibaba.fastjson.JSON; 

/**
 * Worker that reads a device log file line by line; each line is a
 * JSON-encoded {@code Device} which is flattened into one multi-cell HBase
 * row keyed by the device's lid.
 */
public class DeviceReadThread extends BaseRunnabler {

	  static Logger logger = LoggerFactory.getLogger(DeviceReadThread.class);
	
	public DeviceReadThread(String sourceFile, String hbaseTable,
			String[] hbaseFamily, String numberFile, String keywords) {
		super(sourceFile, hbaseTable, hbaseFamily, numberFile, keywords);
	}

	/**
	 * Streams {@code sourceFile}, writing four cells per device row:
	 * lid, ingest date, and hard-coded os/osv markers.
	 *
	 * @param hu operator used for the batch writes
	 */
	@Override
	public void processFile(IOperator hu) {
		// try-with-resources replaces the old finally block, which threw a
		// NullPointerException whenever new FileReader(...) itself failed
		// (both reader references were still null when close() was called).
		try (FileReader logReader = new FileReader(new File(sourceFile));
				BufferedReader logBufferedReader = new BufferedReader(logReader)) {
			String temp;
			while ((temp = logBufferedReader.readLine()) != null) {
				Device device = JSON.parseObject(temp, Device.class);
				String[][] s = new String[][] {
						{ device.getLid(), hbaseFamily[0], "lid" , device.getLid() } ,
						{ device.getLid(), hbaseFamily[1], "date", (new Date()).toString() }, 
						{ device.getLid(), hbaseFamily[2], "os", "2" },
						{ device.getLid(), hbaseFamily[2], "osv", "3" } };
				hu.writeMultRow(hbaseTable, s);
				logger.info(" hbase util end "   );
			}
		} catch (Exception e) {
			// route the stack trace through the logger instead of stderr
			logger.error(" DeviceReadThread error ", e);
		}
	}

}

 

 

 

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.util.Properties;

/**
 * Entry point: loads the ingest configuration and launches one reader thread
 * per source file (device log and app log).
 */
public class HbaseStarter {

	public static void main(String[] args) throws  Exception {
		Properties properties=new Properties();
		//String config = "D:/work/util/aoi-hbase/trunk/src/main/resources/testua.properties";
		String config = "/home/aoi/aoi-hbase/conf/config.properties"; 
		// try-with-resources: the original leaked the stream if load() threw
		try (FileInputStream fis = new FileInputStream(config)) {
			properties.load(fis);
		}

		String sourceFile=properties.getProperty("sourceFile")+"device2.log"+","+properties.getProperty("sourceFile")+"applist.log";
		String hbaseTable = properties.getProperty("hbaseTable");
		String hbaseFamily = properties.getProperty("hbaseFamily");
		String numFile=properties.getProperty("sourceFile")+"num.txt";

		String[] sourceFileName=sourceFile.split(",");  // file 
		String[] hbaseTableName=hbaseTable.split(",");  // table
		// families for the two tables are '&'-separated groups of ','-separated names
		String[] hbaseFamilyName=hbaseFamily.split("&");     // family  

		DeviceReadThread device = new DeviceReadThread(sourceFileName[0],hbaseTableName[0],hbaseFamilyName[0].split(","),"","");
		new Thread(device).start();

		AppReadThread app = new AppReadThread(sourceFileName[1],hbaseTableName[1],hbaseFamilyName[1].split(","),numFile,"");
		new Thread(app).start();

	}
}
 

 

 

config.properties
sourceFile=//data//logs//
hbaseTable=device-ua,app-ua
hbaseFamily="device","history","Description"&"app", "history", "Description"
 

 

hbase-site-test_bj.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
        <property>
                <name>hbase.rootdir</name>
                <value>hdfs://xxx.com:9000/hbase</value>
        </property>
        <property>
                <name>hbase.cluster.distributed</name>
                <value>true</value>
        </property>
        <property>
                <name>hfile.block.cache.size</name>
                <value>0.4</value>
        </property>
        <property>
                <name>hbase.regionserver.handler.count</name>
                <value>150</value>
        </property>

        <property>
                <name>hbase.zookeeper.property.dataDir</name>
                <value>/var/lib/zookeeper</value>
        </property>

        <property>
                <name>hbase.zookeeper.property.clientPort</name>
                <value>2181</value>
        </property>
        <property>
                <name>hbase.zookeeper.quorum</name>
                <value>xxx.com,xxx.com,rabbitmq1</value>
        </property>

        <property>
                <name>zookeeper.session.timeout</name>
                <value>60000</value>
        </property>

        <property>
                <name>hbase.master.maxclockskew</name>
                <value>180000</value>
                <description>Time difference of regionserver from master</description>
        </property>
        <property>
                <name>hbase.hregion.memstore.flush.size</name>
                <value>512</value>
        </property>
        <property>
                <name>hbase.zookeeper.property.maxClientCnxns</name>
                <value>1000</value>
        </property>
        <property>
                <name>hbase.hregion.max.filesize</name>
                <value>1024</value>
        </property>
</configuration>

 

 

 

device2.log



 

 

 

 结果:

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

捐助开发者

在兴趣的驱动下,写一个免费的东西,有欣喜,也还有汗水,希望你喜欢我的作品,同时也能支持一下。 当然,有钱捧个钱场(右上角的爱心标志,支持支付宝和PayPal捐助),没钱捧个人场,谢谢各位。



 
 
 谢谢您的赞助,我会做的更好!

 

 

相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
2月前
|
分布式计算 Java Hadoop
java使用hbase、hadoop报错举例
java使用hbase、hadoop报错举例
97 4
|
4月前
|
Java BI 数据处理
如何在Java中实现Excel操作
如何在Java中实现Excel操作
|
2月前
|
IDE Java 分布式数据库
Apache HBase 落地JAVA 实战
Apache HBase 落地 Java 实战主要涉及使用 Java API 来操作 HBase 数据库,包括表的创建、删除、数据的插入、查询等操作。以下是一个基于 Java 的 HBase 实战指南,包括关键步骤和示例代码。
197 23
|
2月前
|
缓存 Java Linux
java操作hbase报错:KeeperErrorCode=NoNode for /hbase-unsecure/master
java操作hbase报错:KeeperErrorCode=NoNode for /hbase-unsecure/master
148 2
|
3月前
|
缓存 监控 Java
"Java垃圾回收太耗时?阿里HBase GC优化秘籍大公开,让你的应用性能飙升90%!"
【8月更文挑战第17天】阿里巴巴在HBase实践中成功将Java垃圾回收(GC)时间降低90%。通过选用G1垃圾回收器、精细调整JVM参数(如设置堆大小、目标停顿时间等)、优化代码减少内存分配(如使用对象池和缓存),并利用监控工具分析GC行为,有效缓解了高并发大数据场景下的性能瓶颈,极大提升了系统运行效率。
81 4
|
4月前
|
存储 Java 索引
Java ArrayList操作指南:如何移除并返回第一个元素
通过上述方法,你可以方便地从Java的 `ArrayList` 中移除并返回第一个元素。这种操作在日常编程中非常常见,是处理列表时的基本技能之一。希望这篇指南能帮助你更好地理解和运用Java的 `ArrayList`。
53 4
|
4月前
|
DataWorks 数据管理 大数据
DataWorks操作报错合集之在连接HBase时出现超时问题,该怎么解决
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
4月前
|
分布式计算 DataWorks Java
DataWorks操作报错合集之使用ODPS Tunnel Upload功能时,遇到报错:Java 堆内存不足,该如何解决
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
4月前
|
SQL 缓存 Java
使用MyBatis优化Java持久层操作
使用MyBatis优化Java持久层操作
|
4月前
|
Java API 开发者
Java中的文件I/O操作详解
Java中的文件I/O操作详解
下一篇
无影云桌面