大数据应用之HBase数据插入性能优化之多线程并行插入测试案例

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介:

一、引言:

  上篇文章提起关于HBase插入性能优化设计到的五个参数,从参数配置的角度给大家提供了一个性能测试环境的实验代码。根据网友的反馈,基于单线程的模式实现的数据插入毕竟有限。通过个人实测,在我的虚拟机环境下,单线程插入数据的值约为4w/s。集群指标是:CPU双核1.83,虚拟机512M内存,集群部署单点模式。本文给出了基于多线程并发模式的,测试代码案例和实测结果,希望能给大家一些启示:

二、源程序:


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;

public class HBaseImportEx {
    static Configuration hbaseConfig = null;
    public static HTablePool pool = null;
    public static String tableName = "T_TEST_1";
    static{
         //conf = HBaseConfiguration.create();
         Configuration HBASE_CONFIG = new Configuration();
         HBASE_CONFIG.set("hbase.master", "192.168.230.133:60000");
         HBASE_CONFIG.set("hbase.zookeeper.quorum", "192.168.230.133");
         HBASE_CONFIG.set("hbase.zookeeper.property.clientPort", "2181");
         hbaseConfig = HBaseConfiguration.create(HBASE_CONFIG);
         
         pool = new HTablePool(hbaseConfig, 1000); 
    }
    /*
     * Insert Test single thread
     * */
    public static void SingleThreadInsert()throws IOException
    {
        System.out.println("---------开始SingleThreadInsert测试----------");
        long start = System.currentTimeMillis();
        //HTableInterface table = null;
        HTable table = null;
        table = (HTable)pool.getTable(tableName);
        table.setAutoFlush(false);
        table.setWriteBufferSize(24*1024*1024);
        //构造测试数据
        List<Put> list = new ArrayList<Put>();
        int count = 10000;
        byte[] buffer = new byte[350];
        Random rand = new Random();
        for(int i=0;i<count;i++)
        {
            Put put = new Put(String.format("row %d",i).getBytes());
            rand.nextBytes(buffer);
            put.add("f1".getBytes(), null, buffer);
            //wal=false
            put.setWriteToWAL(false);
            list.add(put);    
            if(i%10000 == 0)
            {
                table.put(list);
                list.clear();    
                table.flushCommits();
            }            
        }
        long stop = System.currentTimeMillis();
        //System.out.println("WAL="+wal+",autoFlush="+autoFlush+",buffer="+writeBuffer+",count="+count);
          
        System.out.println("插入数据:"+count+"共耗时:"+ (stop - start)*1.0/1000+"s");
        
        System.out.println("---------结束SingleThreadInsert测试----------");
    }
    /*
     * 多线程环境下线程插入函数 
     * 
     * */
    public static void InsertProcess()throws IOException
    {
        long start = System.currentTimeMillis();
        //HTableInterface table = null;
        HTable table = null;
        table = (HTable)pool.getTable(tableName);
        table.setAutoFlush(false);
        table.setWriteBufferSize(24*1024*1024);
        //构造测试数据
        List<Put> list = new ArrayList<Put>();
        int count = 10000;
        byte[] buffer = new byte[256];
        Random rand = new Random();
        for(int i=0;i<count;i++)
        {
            Put put = new Put(String.format("row %d",i).getBytes());
            rand.nextBytes(buffer);
            put.add("f1".getBytes(), null, buffer);
            //wal=false
            put.setWriteToWAL(false);
            list.add(put);    
            if(i%10000 == 0)
            {
                table.put(list);
                list.clear();    
                table.flushCommits();
            }            
        }
        long stop = System.currentTimeMillis();
        //System.out.println("WAL="+wal+",autoFlush="+autoFlush+",buffer="+writeBuffer+",count="+count);
          
        System.out.println("线程:"+Thread.currentThread().getId()+"插入数据:"+count+"共耗时:"+ (stop - start)*1.0/1000+"s");
    }
    
    
    /*
     * Mutil thread insert test
     * */
    public static void MultThreadInsert() throws InterruptedException
    {
        System.out.println("---------开始MultThreadInsert测试----------");
        long start = System.currentTimeMillis();
        int threadNumber = 10;
        Thread[] threads=new Thread[threadNumber];
        for(int i=0;i<threads.length;i++)
        {
            threads[i]= new ImportThread();
            threads[i].start();            
        }
        for(int j=0;j< threads.length;j++)
        {
             (threads[j]).join();
        }
        long stop = System.currentTimeMillis();
          
        System.out.println("MultThreadInsert:"+threadNumber*10000+"共耗时:"+ (stop - start)*1.0/1000+"s");        
        System.out.println("---------结束MultThreadInsert测试----------");
    }    

    /**
     * @param args
     */
    public static void main(String[] args)  throws Exception{
        // TODO Auto-generated method stub
        //SingleThreadInsert();        
        MultThreadInsert();
        
        
    }
    
    public static class ImportThread extends Thread{
        public void HandleThread()
        {                        
            //this.TableName = "T_TEST_1";
        
            
        }
        //
        public void run(){
            try{
                InsertProcess();            
            }
            catch(IOException e){
                e.printStackTrace();                
            }finally{
                System.gc();
                }
            }            
        }

}

三、说明

1.线程数设置需要根据本集群硬件参数,实际测试得出。否则线程过多的情况下,总耗时反而是下降的。

2.单笔提交数对性能的影响非常明显,需要在自己的环境下,找到最理想的数值,这个需要与单条记录的字节数相关。

四、测试结果

---------开始MultThreadInsert测试----------

线程:8插入数据:10000共耗时:1.328s
线程:16插入数据:10000共耗时:1.562s
线程:11插入数据:10000共耗时:1.562s
线程:10插入数据:10000共耗时:1.812s
线程:13插入数据:10000共耗时:2.0s
线程:17插入数据:10000共耗时:2.14s
线程:14插入数据:10000共耗时:2.265s
线程:9插入数据:10000共耗时:2.468s
线程:15插入数据:10000共耗时:2.562s
线程:12插入数据:10000共耗时:2.671s
MultThreadInsert:100000共耗时:2.703s
---------结束MultThreadInsert测试----------

 备注:该技术专题讨论正在群Hadoop高级交流群:293503507同步直播中,敬请关注。


相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
1月前
|
分布式计算 Shell MaxCompute
odps测试表及大量数据构建测试
odps测试表及大量数据构建测试
|
2月前
|
开发框架 .NET Java
C#集合数据去重的5种方式及其性能对比测试分析
C#集合数据去重的5种方式及其性能对比测试分析
38 11
|
2月前
|
开发框架 .NET Java
C#集合数据去重的5种方式及其性能对比测试分析
C#集合数据去重的5种方式及其性能对比测试分析
55 10
|
3月前
|
机器学习/深度学习 算法 UED
在数据驱动时代,A/B 测试成为评估机器学习项目不同方案效果的重要方法
在数据驱动时代,A/B 测试成为评估机器学习项目不同方案效果的重要方法。本文介绍 A/B 测试的基本概念、步骤及其在模型评估、算法改进、特征选择和用户体验优化中的应用,同时提供 Python 实现示例,强调其在确保项目性能和用户体验方面的关键作用。
68 6
|
28天前
|
SQL 数据可视化 大数据
从数据小白到大数据达人:一步步成为数据分析专家
从数据小白到大数据达人:一步步成为数据分析专家
214 92
|
3月前
|
存储 分布式计算 数据挖掘
数据架构 ODPS 是什么?
数据架构 ODPS 是什么?
758 7
|
3月前
|
存储 分布式计算 大数据
大数据 优化数据读取
【11月更文挑战第4天】
98 2
|
3月前
|
数据采集 监控 数据管理
数据治理之道:大数据平台的搭建与数据质量管理
【10月更文挑战第26天】随着信息技术的发展,数据成为企业核心资源。本文探讨大数据平台的搭建与数据质量管理,包括选择合适架构、数据处理与分析能力、数据质量标准与监控机制、数据清洗与校验及元数据管理,为企业数据治理提供参考。
155 1
|
26天前
|
存储 搜索推荐 大数据
数据大爆炸:解析大数据的起源及其对未来的启示
数据大爆炸:解析大数据的起源及其对未来的启示
91 15
数据大爆炸:解析大数据的起源及其对未来的启示
|
18天前
|
分布式计算 大数据 流计算
玩转数据:初学者的大数据处理工具指南
玩转数据:初学者的大数据处理工具指南
70 14