I'm new to HBase, so please bear with me. We need to store roughly 19 billion rows. Previously I used the old (now deprecated) API:
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", connect);
HTable table=new HTable(conf, "GatewayDetailsUrlTable");
// Disable auto-flush so puts are buffered client-side
table.setAutoFlush(false);
// Set the client-side write buffer to 128 KB
table.setWriteBufferSize(128 * 1024);
table.put(put);
table.flushCommits();
table.close();
Recently I've been looking at the newer approach. First, obtain a Connection:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class HbaseOperationUtil {
    private static Connection connection = null;
    private static final String ZKconnect = "cmiot03.com:2181,cmiot04.com:2181,cmiot05.com:2181";
    private static volatile HbaseOperationUtil instance;

    private HbaseOperationUtil() {
        try {
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", ZKconnect);
            conf.set("hbase.client.write.buffer", "12582912"); // 12 MB client write buffer
            connection = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Public static method that returns the singleton instance.
    public static HbaseOperationUtil getIstance() {
        // Fast path: once instance is non-null, return it without entering
        // the synchronized block, which improves throughput.
        if (instance == null) {
            // Slow path (double-checked locking): the synchronized block
            // guarantees the instance is created exactly once under
            // concurrent access.
            synchronized (HbaseOperationUtil.class) {
                if (instance == null) {
                    instance = new HbaseOperationUtil();
                }
            }
        }
        return instance;
    }

    public Connection getConnection() {
        return connection;
    }
}
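The double-checked locking used in `getIstance()` can be exercised in plain Java with no HBase dependency (the `Holder` class below is a hypothetical stand-in for the connection holder, not part of the original code), to confirm that concurrent callers all end up with one shared instance and the constructor runs exactly once:

```java
public class DclSingletonDemo {
    static class Holder {
        private static volatile Holder instance;
        private static int constructions = 0; // how many times the constructor ran

        private Holder() {
            constructions++;
        }

        static int getConstructions() {
            return constructions;
        }

        static Holder getInstance() {
            if (instance == null) {                  // fast path, no lock
                synchronized (Holder.class) {        // slow path, first call(s) only
                    if (instance == null) {
                        instance = new Holder();
                    }
                }
            }
            return instance;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Hammer getInstance() from several threads at once.
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(Holder::getInstance);
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        // The constructor is only ever invoked inside the synchronized block,
        // so it runs once no matter how many threads raced.
        System.out.println(Holder.getConstructions()); // 1
        System.out.println(Holder.getInstance() == Holder.getInstance()); // true
    }
}
```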
Then insert data like this:
import java.io.IOException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;

public class testi {
    public static void main(String[] args) throws IOException, ParseException {
        HbaseOperationUtil hy = HbaseOperationUtil.getIstance();
        Connection connection = hy.getConnection();
        BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf("SizeTest"));
        int bestBatchPutSize = 3177;
        try (BufferedMutator mutator = connection.getBufferedMutator(params)) {
            List<Put> putLists = new ArrayList<>();
            for (int count = 0; count < 10; count++) {
                // Build a random rowkey: 4-digit url + random timestamp + device SN.
                int max1 = 9999, min1 = 1000;
                String url = String.valueOf((int) Math.round(Math.random() * (max1 - min1) + min1));
                RondomDateTest rdt = new RondomDateTest(); // helper that generates a random date string
                String tim = rdt.testRondomDate();
                String area = String.valueOf((int) (Math.random() * 23));
                String seller = String.valueOf((int) (Math.random() * 9));
                int max = 99999, min = 10000;
                String sn = "CIOT00B" + (int) Math.round(Math.random() * (max - min) + min);
                String mac = "EC:8A:C7:20:21:0" + (int) (Math.random() * 5);
                String rowkey = url + tim + sn;

                Put put = new Put(rowkey.getBytes());
                put.addColumn("info".getBytes(), "area".getBytes(), area.getBytes());
                put.addColumn("info".getBytes(), "sel".getBytes(), seller.getBytes());
                put.addColumn("info".getBytes(), "mac".getBytes(), mac.getBytes());
                put.addColumn("info".getBytes(), "num".getBytes(), "3".getBytes());
                put.addColumn("info".getBytes(), "tl".getBytes(), "1.2".getBytes());
                put.addColumn("info".getBytes(), "us".getBytes(), "456/1234".getBytes());
                put.addColumn("info".getBytes(), "ds".getBytes(), "541/1387".getBytes());
                put.addColumn("info".getBytes(), "ms".getBytes(), "20".getBytes());
                put.addColumn("info".getBytes(), "ct".getBytes(), "30".getBytes());
                put.addColumn("info".getBytes(), "los".getBytes(), "23".getBytes());
                // Skip the WAL for maximum throughput (data written since the
                // last memstore flush is lost if a region server crashes).
                put.setDurability(Durability.SKIP_WAL);
                putLists.add(put);
                if (putLists.size() == bestBatchPutSize) {
                    // Batch has reached the target size: submit it right away.
                    mutator.mutate(putLists);
                    mutator.flush();
                    putLists.clear();
                }
            }
            // Submit any remaining unsubmitted data in one final batch.
            mutator.mutate(putLists);
            mutator.flush();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
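One note on the batch size of 3177: a common heuristic is to pick the batch size so that one batch roughly fills the client write buffer, i.e. buffer size divided by the average serialized `Put` size. A minimal sketch in plain Java; the 3960-byte average is an assumption back-derived from 12582912 / 3177, not a number from the original code:

```java
public class BatchSizeCalc {
    /** Batch size that roughly fills the client write buffer. */
    static int bestBatchSize(long writeBufferBytes, long avgPutBytes) {
        return (int) (writeBufferBytes / avgPutBytes);
    }

    public static void main(String[] args) {
        long writeBuffer = 12 * 1024 * 1024; // hbase.client.write.buffer = 12582912
        long avgPut = 3960;                  // assumed average serialized Put size
        System.out.println(bestBatchSize(writeBuffer, avgPut)); // 3177
    }
}
```

The point of matching batch size to buffer size is that each `mutate`/`flush` cycle then ships one full buffer over the wire instead of many small RPCs.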
Could anyone advise: are there any other configurations that can speed this up further?
Copyright notice: this content was contributed by a registered Alibaba Cloud user; copyright remains with the original author.
For batch-writing data with HBase BulkLoad, see the community article "HBase BulkLoad批量写入数据实战".
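Besides BulkLoad, one lever that often matters as much as any client setting is avoiding region hotspots: pre-split the table and prefix the rowkey with a hash-derived salt so writes spread across region servers instead of piling onto one. A plain-Java sketch of the salting idea; the bucket count of 16 and the `%02d_` prefix format are illustrative assumptions, not part of the original post:

```java
public class RowkeySalt {
    static final int BUCKETS = 16; // should match the number of pre-split regions

    /** Prefix the original rowkey with a two-digit hash-based salt. */
    static String salt(String rowkey) {
        // Mask off the sign bit so the bucket index is always non-negative.
        int bucket = (rowkey.hashCode() & Integer.MAX_VALUE) % BUCKETS;
        return String.format("%02d_%s", bucket, rowkey);
    }

    public static void main(String[] args) {
        // Keys that would otherwise be written sequentially to one region
        // now start with different salt prefixes.
        System.out.println(salt("45602019010112CIOT00B12345"));
        System.out.println(salt("45612019010112CIOT00B12346"));
    }
}
```

The trade-off: scans over the original key order now have to fan out across all buckets, so salting suits write-heavy tables where reads are point lookups or can be parallelized.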