大数据应用之HBase数据插入性能优化之多线程并行插入测试案例

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介:

一、引言:

  上篇文章提起关于HBase插入性能优化设计到的五个参数,从参数配置的角度给大家提供了一个性能测试环境的实验代码。根据网友的反馈,基于单线程的模式实现的数据插入毕竟有限。通过个人实测,在我的虚拟机环境下,单线程插入数据的值约为4w/s。集群指标是:CPU双核1.83,虚拟机512M内存,集群部署单点模式。本文给出了基于多线程并发模式的,测试代码案例和实测结果,希望能给大家一些启示:

二、源程序:


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;

public class HBaseImportEx {
    static Configuration hbaseConfig = null;
    public static HTablePool pool = null;
    public static String tableName = "T_TEST_1";
    static{
         //conf = HBaseConfiguration.create();
         Configuration HBASE_CONFIG = new Configuration();
         HBASE_CONFIG.set("hbase.master", "192.168.230.133:60000");
         HBASE_CONFIG.set("hbase.zookeeper.quorum", "192.168.230.133");
         HBASE_CONFIG.set("hbase.zookeeper.property.clientPort", "2181");
         hbaseConfig = HBaseConfiguration.create(HBASE_CONFIG);
         
         pool = new HTablePool(hbaseConfig, 1000); 
    }
    /*
     * Insert Test single thread
     * */
    public static void SingleThreadInsert()throws IOException
    {
        System.out.println("---------开始SingleThreadInsert测试----------");
        long start = System.currentTimeMillis();
        //HTableInterface table = null;
        HTable table = null;
        table = (HTable)pool.getTable(tableName);
        table.setAutoFlush(false);
        table.setWriteBufferSize(24*1024*1024);
        //构造测试数据
        List<Put> list = new ArrayList<Put>();
        int count = 10000;
        byte[] buffer = new byte[350];
        Random rand = new Random();
        for(int i=0;i<count;i++)
        {
            Put put = new Put(String.format("row %d",i).getBytes());
            rand.nextBytes(buffer);
            put.add("f1".getBytes(), null, buffer);
            //wal=false
            put.setWriteToWAL(false);
            list.add(put);    
            if(i%10000 == 0)
            {
                table.put(list);
                list.clear();    
                table.flushCommits();
            }            
        }
        long stop = System.currentTimeMillis();
        //System.out.println("WAL="+wal+",autoFlush="+autoFlush+",buffer="+writeBuffer+",count="+count);
          
        System.out.println("插入数据:"+count+"共耗时:"+ (stop - start)*1.0/1000+"s");
        
        System.out.println("---------结束SingleThreadInsert测试----------");
    }
    /*
     * 多线程环境下线程插入函数 
     * 
     * */
    public static void InsertProcess()throws IOException
    {
        long start = System.currentTimeMillis();
        //HTableInterface table = null;
        HTable table = null;
        table = (HTable)pool.getTable(tableName);
        table.setAutoFlush(false);
        table.setWriteBufferSize(24*1024*1024);
        //构造测试数据
        List<Put> list = new ArrayList<Put>();
        int count = 10000;
        byte[] buffer = new byte[256];
        Random rand = new Random();
        for(int i=0;i<count;i++)
        {
            Put put = new Put(String.format("row %d",i).getBytes());
            rand.nextBytes(buffer);
            put.add("f1".getBytes(), null, buffer);
            //wal=false
            put.setWriteToWAL(false);
            list.add(put);    
            if(i%10000 == 0)
            {
                table.put(list);
                list.clear();    
                table.flushCommits();
            }            
        }
        long stop = System.currentTimeMillis();
        //System.out.println("WAL="+wal+",autoFlush="+autoFlush+",buffer="+writeBuffer+",count="+count);
          
        System.out.println("线程:"+Thread.currentThread().getId()+"插入数据:"+count+"共耗时:"+ (stop - start)*1.0/1000+"s");
    }
    
    
    /*
     * Mutil thread insert test
     * */
    public static void MultThreadInsert() throws InterruptedException
    {
        System.out.println("---------开始MultThreadInsert测试----------");
        long start = System.currentTimeMillis();
        int threadNumber = 10;
        Thread[] threads=new Thread[threadNumber];
        for(int i=0;i<threads.length;i++)
        {
            threads[i]= new ImportThread();
            threads[i].start();            
        }
        for(int j=0;j< threads.length;j++)
        {
             (threads[j]).join();
        }
        long stop = System.currentTimeMillis();
          
        System.out.println("MultThreadInsert:"+threadNumber*10000+"共耗时:"+ (stop - start)*1.0/1000+"s");        
        System.out.println("---------结束MultThreadInsert测试----------");
    }    

    /**
     * @param args
     */
    public static void main(String[] args)  throws Exception{
        // TODO Auto-generated method stub
        //SingleThreadInsert();        
        MultThreadInsert();
        
        
    }
    
    public static class ImportThread extends Thread{
        public void HandleThread()
        {                        
            //this.TableName = "T_TEST_1";
        
            
        }
        //
        public void run(){
            try{
                InsertProcess();            
            }
            catch(IOException e){
                e.printStackTrace();                
            }finally{
                System.gc();
                }
            }            
        }

}

三、说明

1.线程数设置需要根据本集群硬件参数,实际测试得出。否则线程过多的情况下,总耗时反而是下降的。

2.单笔提交数对性能的影响非常明显,需要在自己的环境下,找到最理想的数值,这个需要与单条记录的字节数相关。

四、测试结果

---------开始MultThreadInsert测试----------

线程:8插入数据:10000共耗时:1.328s
线程:16插入数据:10000共耗时:1.562s
线程:11插入数据:10000共耗时:1.562s
线程:10插入数据:10000共耗时:1.812s
线程:13插入数据:10000共耗时:2.0s
线程:17插入数据:10000共耗时:2.14s
线程:14插入数据:10000共耗时:2.265s
线程:9插入数据:10000共耗时:2.468s
线程:15插入数据:10000共耗时:2.562s
线程:12插入数据:10000共耗时:2.671s
MultThreadInsert:100000共耗时:2.703s
---------结束MultThreadInsert测试----------

 备注:该技术专题讨论正在群Hadoop高级交流群:293503507同步直播中,敬请关注。


相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
16天前
|
SQL 数据建模 BI
【YashanDB 知识库】用 yasldr 配置 Bulkload 模式作单线程迁移 300G 的业务数据到分布式数据库,迁移任务频繁出错
问题描述 详细版本:YashanDB Server Enterprise Edition Release 23.2.4.100 x86_64 6db1237 影响范围: 离线数据迁移场景,影响业务数据入库。 外场将部分 NewCIS 的报表业务放到分布式数据库,验证 SQL 性能水平。 操作系统环境配置: 125G 内存 32C CPU 2T 的 HDD 磁盘 问题出现的步骤/操作: 1、部署崖山分布式数据库 1mm 1cn 3dn 单线启动 yasldr 数据迁移任务,设置 32 线程的 bulk load 模式 2、观察 yasldr.log 是否出现如下错
|
1月前
|
缓存 安全 Java
面试中的难题:线程异步执行后如何共享数据?
本文通过一个面试故事,详细讲解了Java中线程内部开启异步操作后如何安全地共享数据。介绍了异步操作的基本概念及常见实现方式(如CompletableFuture、ExecutorService),并重点探讨了volatile关键字、CountDownLatch和CompletableFuture等工具在线程间数据共享中的应用,帮助读者理解线程安全和内存可见性问题。通过这些方法,可以有效解决多线程环境下的数据共享挑战,提升编程效率和代码健壮性。
67 6
|
2月前
|
分布式计算 Shell MaxCompute
odps测试表及大量数据构建测试
odps测试表及大量数据构建测试
|
4月前
|
消息中间件 监控 Java
线程池关闭时未完成的任务如何保证数据的一致性?
保证线程池关闭时未完成任务的数据一致性需要综合运用多种方法和机制。通过备份与恢复、事务管理、任务状态记录与恢复、数据同步与协调、错误处理与补偿、监控与预警等手段的结合,以及结合具体业务场景进行分析和制定策略,能够最大程度地确保数据的一致性,保障系统的稳定运行和业务的顺利开展。同时,不断地优化和改进这些方法和机制,也是提高系统性能和可靠性的重要途径。
154 62
|
4月前
|
存储 设计模式 分布式计算
Java中的多线程编程:并发与并行的深度解析####
在当今软件开发领域,多线程编程已成为提升应用性能、响应速度及资源利用率的关键手段之一。本文将深入探讨Java平台上的多线程机制,从基础概念到高级应用,全面解析并发与并行编程的核心理念、实现方式及其在实际项目中的应用策略。不同于常规摘要的简洁概述,本文旨在通过详尽的技术剖析,为读者构建一个系统化的多线程知识框架,辅以生动实例,让抽象概念具体化,复杂问题简单化。 ####
|
5月前
|
缓存 安全 Java
使用 Java 内存模型解决多线程中的数据竞争问题
【10月更文挑战第11天】在 Java 多线程编程中,数据竞争是一个常见问题。通过使用 `synchronized` 关键字、`volatile` 关键字、原子类、显式锁、避免共享可变数据、合理设计数据结构、遵循线程安全原则和使用线程池等方法,可以有效解决数据竞争问题,确保程序的正确性和稳定性。
91 2
|
5月前
|
分布式计算 Hadoop 大数据
大数据体系知识学习(一):PySpark和Hadoop环境的搭建与测试
这篇文章是关于大数据体系知识学习的,主要介绍了Apache Spark的基本概念、特点、组件,以及如何安装配置Java、PySpark和Hadoop环境。文章还提供了详细的安装步骤和测试代码,帮助读者搭建和测试大数据环境。
149 1
|
5月前
|
运维 监控 数据可视化
大数据-171 Elasticsearch ES-Head 与 Kibana 配置 使用 测试
大数据-171 Elasticsearch ES-Head 与 Kibana 配置 使用 测试
242 1
|
5月前
|
SQL 分布式计算 NoSQL
大数据-170 Elasticsearch 云服务器三节点集群搭建 测试运行
大数据-170 Elasticsearch 云服务器三节点集群搭建 测试运行
91 4
|
5月前
|
SQL 消息中间件 大数据
大数据-159 Apache Kylin 构建Cube 准备和测试数据(一)
大数据-159 Apache Kylin 构建Cube 准备和测试数据(一)
140 1

热门文章

最新文章