大数据应用之HBase数据插入性能优化之多线程并行插入测试案例

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 一、引言:   上篇文章提起关于HBase插入性能优化设计到的五个参数,从参数配置的角度给大家提供了一个性能测试环境的实验代码。根据网友的反馈,基于单线程的模式实现的数据插入毕竟有限。通过个人实测,在我的虚拟机环境下,单线程插入数据的值约为4w/s。

一、引言:

  上篇文章提起关于HBase插入性能优化设计到的五个参数,从参数配置的角度给大家提供了一个性能测试环境的实验代码。根据网友的反馈,基于单线程的模式实现的数据插入毕竟有限。通过个人实测,在我的虚拟机环境下,单线程插入数据的值约为4w/s。集群指标是:CPU双核1.83,虚拟机512M内存,集群部署单点模式。本文给出了基于多线程并发模式的,测试代码案例和实测结果,希望能给大家一些启示:

二、源程序:

  1 import org.apache.hadoop.conf.Configuration;
  2 import org.apache.hadoop.hbase.HBaseConfiguration;
  3 import java.io.BufferedReader;
  4 import java.io.File;
  5 import java.io.FileNotFoundException;
  6 import java.io.FileReader;
  7 import java.io.IOException;
  8 import java.util.ArrayList;
  9 import java.util.List;
 10 import java.util.Random;
 11 
 12 import org.apache.hadoop.conf.Configuration;
 13 import org.apache.hadoop.hbase.HBaseConfiguration;
 14 import org.apache.hadoop.hbase.client.HBaseAdmin;
 15 import org.apache.hadoop.hbase.client.HTable;
 16 import org.apache.hadoop.hbase.client.HTableInterface;
 17 import org.apache.hadoop.hbase.client.HTablePool;
 18 import org.apache.hadoop.hbase.client.Put;
 19 
 20 public class HBaseImportEx {
 21     static Configuration hbaseConfig = null;
 22     public static HTablePool pool = null;
 23     public static String tableName = "T_TEST_1";
 24     static{
 25          //conf = HBaseConfiguration.create();
 26          Configuration HBASE_CONFIG = new Configuration();
 27          HBASE_CONFIG.set("hbase.master", "192.168.230.133:60000");
 28          HBASE_CONFIG.set("hbase.zookeeper.quorum", "192.168.230.133");
 29          HBASE_CONFIG.set("hbase.zookeeper.property.clientPort", "2181");
 30          hbaseConfig = HBaseConfiguration.create(HBASE_CONFIG);
 31          
 32          pool = new HTablePool(hbaseConfig, 1000); 
 33     }
 34     /*
 35      * Insert Test single thread
 36      * */
 37     public static void SingleThreadInsert()throws IOException
 38     {
 39         System.out.println("---------开始SingleThreadInsert测试----------");
 40         long start = System.currentTimeMillis();
 41         //HTableInterface table = null;
 42         HTable table = null;
 43         table = (HTable)pool.getTable(tableName);
 44         table.setAutoFlush(false);
 45         table.setWriteBufferSize(24*1024*1024);
 46         //构造测试数据
 47         List<Put> list = new ArrayList<Put>();
 48         int count = 10000;
 49         byte[] buffer = new byte[350];
 50         Random rand = new Random();
 51         for(int i=0;i<count;i++)
 52         {
 53             Put put = new Put(String.format("row %d",i).getBytes());
 54             rand.nextBytes(buffer);
 55             put.add("f1".getBytes(), null, buffer);
 56             //wal=false
 57             put.setWriteToWAL(false);
 58             list.add(put);    
 59             if(i%10000 == 0)
 60             {
 61                 table.put(list);
 62                 list.clear();    
 63                 table.flushCommits();
 64             }            
 65         }
 66         long stop = System.currentTimeMillis();
 67         //System.out.println("WAL="+wal+",autoFlush="+autoFlush+",buffer="+writeBuffer+",count="+count);
 68           
 69         System.out.println("插入数据:"+count+"共耗时:"+ (stop - start)*1.0/1000+"s");
 70         
 71         System.out.println("---------结束SingleThreadInsert测试----------");
 72     }
 73     /*
 74      * 多线程环境下线程插入函数 
 75      * 
 76      * */
 77     public static void InsertProcess()throws IOException
 78     {
 79         long start = System.currentTimeMillis();
 80         //HTableInterface table = null;
 81         HTable table = null;
 82         table = (HTable)pool.getTable(tableName);
 83         table.setAutoFlush(false);
 84         table.setWriteBufferSize(24*1024*1024);
 85         //构造测试数据
 86         List<Put> list = new ArrayList<Put>();
 87         int count = 10000;
 88         byte[] buffer = new byte[256];
 89         Random rand = new Random();
 90         for(int i=0;i<count;i++)
 91         {
 92             Put put = new Put(String.format("row %d",i).getBytes());
 93             rand.nextBytes(buffer);
 94             put.add("f1".getBytes(), null, buffer);
 95             //wal=false
 96             put.setWriteToWAL(false);
 97             list.add(put);    
 98             if(i%10000 == 0)
 99             {
100                 table.put(list);
101                 list.clear();    
102                 table.flushCommits();
103             }            
104         }
105         long stop = System.currentTimeMillis();
106         //System.out.println("WAL="+wal+",autoFlush="+autoFlush+",buffer="+writeBuffer+",count="+count);
107           
108         System.out.println("线程:"+Thread.currentThread().getId()+"插入数据:"+count+"共耗时:"+ (stop - start)*1.0/1000+"s");
109     }
110     
111     
112     /*
113      * Mutil thread insert test
114      * */
115     public static void MultThreadInsert() throws InterruptedException
116     {
117         System.out.println("---------开始MultThreadInsert测试----------");
118         long start = System.currentTimeMillis();
119         int threadNumber = 10;
120         Thread[] threads=new Thread[threadNumber];
121         for(int i=0;i<threads.length;i++)
122         {
123             threads[i]= new ImportThread();
124             threads[i].start();            
125         }
126         for(int j=0;j< threads.length;j++)
127         {
128              (threads[j]).join();
129         }
130         long stop = System.currentTimeMillis();
131           
132         System.out.println("MultThreadInsert:"+threadNumber*10000+"共耗时:"+ (stop - start)*1.0/1000+"s");        
133         System.out.println("---------结束MultThreadInsert测试----------");
134     }    
135 
136     /**
137      * @param args
138      */
139     public static void main(String[] args)  throws Exception{
140         // TODO Auto-generated method stub
141         //SingleThreadInsert();        
142         MultThreadInsert();
143         
144         
145     }
146     
147     public static class ImportThread extends Thread{
148         public void HandleThread()
149         {                        
150             //this.TableName = "T_TEST_1";
151         
152             
153         }
154         //
155         public void run(){
156             try{
157                 InsertProcess();            
158             }
159             catch(IOException e){
160                 e.printStackTrace();                
161             }finally{
162                 System.gc();
163                 }
164             }            
165         }
166 
167 }

三、说明

1.线程数设置需要根据本集群硬件参数,实际测试得出。否则线程过多的情况下,总耗时反而是下降的。

2.单笔提交数对性能的影响非常明显,需要在自己的环境下,找到最理想的数值,这个需要与单条记录的字节数相关。

四、测试结果

---------开始MultThreadInsert测试----------

线程:8插入数据:10000共耗时:1.328s
线程:16插入数据:10000共耗时:1.562s
线程:11插入数据:10000共耗时:1.562s
线程:10插入数据:10000共耗时:1.812s
线程:13插入数据:10000共耗时:2.0s
线程:17插入数据:10000共耗时:2.14s
线程:14插入数据:10000共耗时:2.265s
线程:9插入数据:10000共耗时:2.468s
线程:15插入数据:10000共耗时:2.562s
线程:12插入数据:10000共耗时:2.671s
MultThreadInsert:100000共耗时:2.703s
---------结束MultThreadInsert测试----------

 备注:该技术专题讨论正在群Hadoop高级交流群:293503507同步直播中,敬请关注。


作者:张子良
出处:http://www.cnblogs.com/hadoopdev
本文版权归作者所有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
4月前
|
存储 消息中间件 缓存
读Flink源码谈设计:有效管理内存之道
在最初接触到Flink时,是来自于业界里一些头部玩家的分享——大家会用其来处理海量数据。在这种场景下,`如何避免JVM GC带来StopTheWorld带来的副作用`这样的问题一直盘绕在我心头。直到用了Flink以后,阅读了相关的源码(以1.14.0为基准),终于有了一些答案。在这篇文章里也是会分享给大家。
563 1
|
4天前
|
关系型数据库 MySQL 数据处理
针对MySQL亿级数据的高效插入策略与性能优化技巧
在处理MySQL亿级数据的高效插入和性能优化时,以上提到的策略和技巧可以显著提升数据处理速度,减少系统负担,并保持数据的稳定性和一致性。正确实施这些策略需要深入理解MySQL的工作原理和业务需求,以便做出最适合的配置调整。
29 6
|
28天前
|
JSON NoSQL MongoDB
MongoDB Schema设计实战指南:优化数据结构,提升查询性能与数据一致性
【8月更文挑战第24天】MongoDB是一款领先的NoSQL数据库,其灵活的文档模型突破了传统关系型数据库的限制。它允许自定义数据结构,适应多样化的数据需求。设计MongoDB的Schema时需考虑数据访问模式、一致性需求及性能因素。设计原则强调简洁性、查询优化与合理使用索引。例如,在构建博客系统时,可以通过精心设计文章和用户的集合结构来提高查询效率并确保数据一致性。正确设计能够充分发挥MongoDB的优势,实现高效的数据管理。
37 3
|
3月前
|
存储 分布式计算 Hadoop
Hadoop性能优化存储效率
【6月更文挑战第5天】
75 7
|
3月前
|
存储 SQL 分布式计算
技术心得记录:深入学习HBase架构原理
技术心得记录:深入学习HBase架构原理
|
4月前
|
NoSQL Java 关系型数据库
实时计算 Flink版产品使用合集之实现存量读取时采用多线程、增量读取时采用单线程如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
缓存 分布式计算 分布式数据库
巧用ChatGPT 解决 Hbase 快照方式读性能优化问题
巧用ChatGPT 解决 Hbase 快照方式读性能优化问题
78 0
|
4月前
|
存储 SQL Java
jvm性能调优实战 - 27亿级数据量的实时分析引擎,为啥频繁发生Full GC
jvm性能调优实战 - 27亿级数据量的实时分析引擎,为啥频繁发生Full GC
80 0
|
4月前
|
存储 关系型数据库 MySQL
史上最全MySQL剖析:优化+存储+查询+索引+复制+可扩展+高可用
在互联网行业,MySQL数据库毫无疑问已经是最常用的数据库,LAMP (Linux +Apache + MySQL + PHP)甚至已经成为专有名词,也是很多中小网站建站的首选技术架构。
|
存储 分布式计算 关系型数据库