import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.util.Bytes; import org.mortbay.log.Log; public class HbaseUtil implements IOperator { private static Configuration conf = null; private static String configFile = "hbase-site-test_bj.xml"; private Map<String, String> aMap = null; private String mapTable = null; private String[] tableFamily = null; public HbaseUtil() { } public HbaseUtil( String mapAppTable , String[] appTableFamily ) { this.aMap = new HashMap<String, String>(); this.mapTable = mapAppTable; this.tableFamily = appTableFamily; } static { Configuration HBASE_CONFIG = new Configuration(); HBASE_CONFIG.addResource(configFile); conf = HBaseConfiguration.create(HBASE_CONFIG); System.err.println(conf.get("hbase.zookeeper.property.dataDir")); } /** * 创建表操作 * * @throws IOException */ public void createTable(String tablename, String[] cfs) throws IOException { HBaseAdmin admin = new HBaseAdmin(conf); if (admin.tableExists(tablename)) { System.out.println("表已经存在!"); } else { HTableDescriptor tableDesc = new HTableDescriptor(tablename); for (int i = 0; i < cfs.length; i++) { tableDesc.addFamily(new HColumnDescriptor(cfs[i])); } admin.createTable(tableDesc); System.out.println("表创建成功!"); } admin.close(); } /** * 
删除表操作 * * @param tablename * @throws IOException */ public void deleteTable(String tablename) throws IOException { HBaseAdmin admin = new HBaseAdmin(conf); if (!admin.tableExists(tablename)) { System.out.println("table(" + tablename + ") not exists, won't delete"); } else { admin.disableTable(tablename); admin.deleteTable(tablename); System.out.println("table(" + tablename + ") delete success"); } admin.close(); } public void insertRow() throws IOException { HTable table = new HTable(conf, "test"); Put put = new Put(Bytes.toBytes("row3")); put.add(Bytes.toBytes("cf"), Bytes.toBytes("444"), Bytes.toBytes("value444")); table.put(put); table.close(); } /** * 插入一行记录 * * @param tablename * @param cfs * @throws IOException */ public void writeRow(String tablename, String[] cfs) throws IOException { HTable table = new HTable(conf, tablename); Put put = new Put(Bytes.toBytes(cfs[0])); put.add(Bytes.toBytes(cfs[1]), Bytes.toBytes(cfs[2]), Bytes.toBytes(cfs[3])); table.put(put); System.out.println("写入成功!"); table.close(); } // 写多条记录 public void writeMultRow(String tablename, String[][] cfs) throws IOException { List<Put> lists = new ArrayList<Put>(); HTable table = new HTable(conf, tablename); for (int i = 0; i < cfs.length; i++) { Put put = new Put(Bytes.toBytes(cfs[i][0])); put.add(Bytes.toBytes(cfs[i][1]), Bytes.toBytes(cfs[i][2]), Bytes.toBytes(cfs[i][3])); lists.add(put); } table.put(lists); table.close(); } // 写多条记录 public void writeMultRowByDevice(HTable table, String tablename, String[][] cfs) throws IOException { List<Put> lists = new ArrayList<Put>(); // HTable table = new HTable(conf, tablename); for (int i = 0; i < cfs.length; i++) { Put put = new Put(Bytes.toBytes(cfs[i][0])); Log.info("writeMultRowByDevice "+Bytes.toBytes(cfs[i][1])+"="+Bytes.toBytes(cfs[i][2])+"="+Bytes.toBytes(cfs[i][3])); put.add(Bytes.toBytes(cfs[i][1]), Bytes.toBytes(cfs[i][2]), Bytes.toBytes(cfs[i][3])); lists.add(put); } Log.info("push start"); table.put(lists); Log.info("push end"); } 
/** * 删除一行记录 * * @param tablename * @param rowkey * @throws IOException */ public void deleteRow(String tablename, String rowkey) throws IOException { HTable table = new HTable(conf, tablename); List<Delete> list = new ArrayList<Delete>(); Delete d1 = new Delete(rowkey.getBytes()); list.add(d1); table.delete(list); System.out.println("delete row(" + rowkey + ") sucess"); table.close(); } /** * 查找一行记录 * * @param tablename * @param rowkey */ public void selectRow(String tablename, String rowKey) throws IOException { HTable table = new HTable(conf, tablename); Get g = new Get(rowKey.getBytes()); // g.addColumn(Bytes.toBytes("cf:1")); Result rs = table.get(g); for (KeyValue kv : rs.raw()) { System.out.print(new String(kv.getRow()) + " "); System.out.print(new String(kv.getFamily()) + ":"); System.out.print(new String(kv.getQualifier()) + " "); System.out.print(kv.getTimestamp() + " "); System.out.println(new String(kv.getValue())); } table.close(); } /** * 查询表中所有行 * * @param tablename * @throws IOException */ public void scaner(String tablename) throws IOException { HTable table = new HTable(conf, tablename); Scan s = new Scan(); ResultScanner rs = table.getScanner(s); for (Result r : rs) { KeyValue[] kv = r.raw(); // for (int i = 0; i < kv.length; i++) { /* * System.out.print(new String(kv[i].getRow()) + " "); * System.out.print(new String(kv[i].getFamily()) + ":"); * System.out.print(new String(kv[i].getQualifier()) + " "); * System.out.print(kv[i].getTimestamp() + " "); * System.out.println(new String(kv[i].getValue())); */ System.out.println(new String(kv[1].getValue()) + "==" + new String(kv[0].getValue())); // } } rs.close(); table.close(); } public void scanByTimestamp(String tablename, long maxtime) throws IOException { HTable table = new HTable(conf, tablename); Scan s = new Scan(); // TODO 存放所有的结果 FilterList allInfo = new FilterList(); // allInfo.addFilter(); s.setFilter(allInfo); } public Map<String, String> getMap() { Map<String, String> map = new 
HashMap<String, String>(); try { HTable table = new HTable(conf, mapTable); Scan s = new Scan(); ResultScanner rs = table.getScanner(s); for (Result r : rs) { KeyValue[] kv = r.raw(); map.put(new String(kv[0].getRow()), new String(kv[0].getValue())); } } catch (IOException e) { e.printStackTrace(); } return map; } }
import java.io.IOException;
import java.util.Map;

/**
 * Table-level and row-level storage operations used by the file-processing
 * runnables. Row cells are passed as string arrays; the implementation in
 * this project reads them as {@code {rowKey, family, qualifier, value}}.
 */
public interface IOperator {

    /**
     * Creates a table with the given column families.
     *
     * @param tablename name of the table
     * @param cfs       column family names
     * @throws IOException if the operation fails
     */
    void createTable(String tablename, String[] cfs) throws IOException;

    /**
     * Deletes a table.
     *
     * @param tablename name of the table
     * @throws IOException if the operation fails
     */
    void deleteTable(String tablename) throws IOException;

    /**
     * Inserts a row whose content is decided by the implementation.
     *
     * @throws IOException if the operation fails
     */
    void insertRow() throws IOException;

    /**
     * Writes a single cell.
     *
     * @param tablename target table
     * @param cfs       the cell to write
     * @throws IOException if the operation fails
     */
    void writeRow(String tablename, String[] cfs) throws IOException;

    /**
     * Writes several cells in one call.
     *
     * @param tablename target table
     * @param cfs       the cells to write, one array per cell
     * @throws IOException if the operation fails
     */
    void writeMultRow(String tablename, String[][] cfs) throws IOException;

    /**
     * Deletes one row.
     *
     * @param tablename table holding the row
     * @param rowkey    key of the row
     * @throws IOException if the operation fails
     */
    void deleteRow(String tablename, String rowkey) throws IOException;

    /**
     * Looks up one row.
     *
     * @param tablename table holding the row
     * @param rowKey    key of the row
     * @throws IOException if the operation fails
     */
    void selectRow(String tablename, String rowKey) throws IOException;

    /**
     * Scans all rows of a table.
     *
     * @param tablename table to scan
     * @throws IOException if the operation fails
     */
    void scaner(String tablename) throws IOException;

    /**
     * Scans a table with respect to a timestamp bound.
     *
     * @param tablename table to scan
     * @param maxtime   timestamp bound
     * @throws IOException if the operation fails
     */
    void scanByTimestamp(String tablename, long maxtime) throws IOException;

    /**
     * Returns a string-to-string mapping read from storage.
     *
     * @return the mapping
     * @throws IOException if the operation fails
     */
    Map<String, String> getMap() throws IOException;
}
public abstract class BaseRunnabler implements Runnable{ String sourceFile=""; // 读取文件路径 String numberFile=""; String hbaseTable=""; // hbase 表名 String [] hbaseFamily=null; // 行列簇名 String keywords =""; public BaseRunnabler(String sourceFile,String hbaseTable,String [] hbaseFamily,String numberFile ,String keywords ){ this.sourceFile=sourceFile; this.numberFile=numberFile; this.hbaseTable=hbaseTable; this.hbaseFamily = hbaseFamily; this.keywords = keywords; } @Override public void run() { try{ IOperator hu = new HbaseUtil( hbaseTable,hbaseFamily); hu.createTable(hbaseTable,hbaseFamily ); processFile(hu ); }catch (Exception e) { e.printStackTrace(); } } public abstract void processFile(IOperator hu) throws Exception; }
import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import java.io.IOException; import java.util.Date; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import Model.Device; import com.alibaba.fastjson.JSON; public class DeviceReadThread extends BaseRunnabler { static Logger logger = LoggerFactory.getLogger(DeviceReadThread.class); public DeviceReadThread(String sourceFile, String hbaseTable, String[] hbaseFamily, String numberFile, String keywords) { super(sourceFile, hbaseTable, hbaseFamily, numberFile, keywords); } @Override public void processFile(IOperator hu) { FileReader logReader = null; BufferedReader logBufferedReader = null; try { File logFile = new File(sourceFile); logReader = new FileReader(logFile); logBufferedReader = new BufferedReader(logReader); String temp = logBufferedReader.readLine(); //logger.error(" temp is " + temp ); while ( temp != null) { Device device = JSON.parseObject(temp, Device.class); //logger.error(" device is null ? " + ( device == null ) ); String[][] s = new String[][] { { device.getLid(), hbaseFamily[0], "lid" , device.getLid() } , { device.getLid(), hbaseFamily[1], "date", (new Date()).toString() }, { device.getLid(), hbaseFamily[2], "os", "2" }, { device.getLid(), hbaseFamily[2], "osv", "3" } }; hu.writeMultRow(hbaseTable, s); logger.info(" hbase util end " ); temp = logBufferedReader.readLine(); } } catch (Exception e) { logger.error(" DeviceReadThread error " ); e.printStackTrace(); } finally { try { logBufferedReader.close(); } catch (IOException e) { e.printStackTrace(); } try { logReader.close(); } catch (IOException e) { e.printStackTrace(); } } } }
import java.io.FileInputStream; import java.io.FileNotFoundException; import java.util.Properties; public class HbaseStarter { public static void main(String[] args) throws Exception { Properties properties=new Properties(); //String config = "D:/work/util/aoi-hbase/trunk/src/main/resources/testua.properties"; String config = "/home/aoi/aoi-hbase/conf/config.properties"; FileInputStream fis = new FileInputStream(config); properties.load(fis); fis.close(); String sourceFile=properties.getProperty("sourceFile")+"device2.log"+","+properties.getProperty("sourceFile")+"applist.log"; String hbaseTable = properties.getProperty("hbaseTable"); String hbaseFamily = properties.getProperty("hbaseFamily"); String numFile=properties.getProperty("sourceFile")+"num.txt"; String[] sourceFileName=sourceFile.split(","); // file String[] hbaseTableName=hbaseTable.split(","); // table String[] hbaseFamilyName=hbaseFamily.split("&"); // family DeviceReadThread device = new DeviceReadThread(sourceFileName[0],hbaseTableName[0],hbaseFamilyName[0].split(","),"",""); new Thread(device).start(); AppReadThread app = new AppReadThread(sourceFileName[1],hbaseTableName[1],hbaseFamilyName[1].split(","),numFile,""); new Thread(app).start(); } }
config.properties
sourceFile=//data//logs// hbaseTable=device-ua,app-ua hbaseFamily="device","history","Description"&"app", "history", "Description"
hbase-site-test_bj.xml
<?xml version="1.0" encoding="UTF-8"?> <configuration> <property> <name>hbase.rootdir</name> <value>hdfs://xxx.com:9000/hbase</value> </property> <property> <name>hbase.cluster.distributed</name> <value>true</value> </property> <property> <name>hfile.block.cache.size</name> <value>0.4</value> </property> <property> <name>hbase.regionserver.handler.count</name> <value>150</value> </property> <property> <name>hbase.zookeeper.property.dataDir</name> <value>/var/lib/zookeeper</value> </property> <property> <name>hbase.zookeeper.property.clientPort</name> <value>2181</value> </property> <property> <name>hbase.zookeeper.quorum</name> <value>xxx.com,xxx.com,rabbitmq1</value> </property> <property> <name>zookeeper.session.timeout</name> <value>60000</value> </property> <property> <name>hbase.master.maxclockskew</name> <value>180000</value> <description>Time difference of regionserver from master</description> </property> <property> <name>hbase.hregion.memstore.flush.size</name> <value>512</value> </property> <property> <name>hbase.zookeeper.property.maxClientCnxns</name> <value>1000</value> </property> <property> <name>hbase.hregion.max.filesize</name> <value>1024</value> </property> </configuration>
device2.log
结果:
捐助开发者
在兴趣的驱动下,写一个免费的东西,有欣喜,也有汗水,希望你喜欢我的作品,同时也能支持一下。 当然,有钱捧个钱场(右上角的爱心标志,支持支付宝和PayPal捐助),没钱捧个人场,谢谢各位。
谢谢您的赞助,我会做得更好!