HBase Java编程示例

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: HelloWorld.zip 点击(此处)折叠或打开 package elementary; import java.
img_e25d4fb2f8de1caf41a735ec53088516.pngHelloWorld.zip

点击(此处)折叠或打开

  1. package elementary;

  2. import java.io.IOException;
  3. import java.text.SimpleDateFormat;
  4. import java.util.ArrayList;
  5. import java.util.Date;
  6. import java.util.List;
  7. import java.util.concurrent.atomic.AtomicInteger;
  8. import java.util.concurrent.ExecutorService;
  9. import java.util.concurrent.Executors;
  10. import java.util.concurrent.TimeUnit;

  11. import org.apache.hadoop.conf.Configuration;
  12. import org.apache.hadoop.hbase.Cell;
  13. import org.apache.hadoop.hbase.HBaseConfiguration;
  14. import org.apache.hadoop.hbase.HColumnDescriptor;
  15. import org.apache.hadoop.hbase.HTableDescriptor;
  16. import org.apache.hadoop.hbase.MasterNotRunningException;
  17. import org.apache.hadoop.hbase.TableName;
  18. import org.apache.hadoop.hbase.ZooKeeperConnectionException;
  19. import org.apache.hadoop.hbase.client.Delete;
  20. import org.apache.hadoop.hbase.client.Get;
  21. import org.apache.hadoop.hbase.client.Admin;
  22. import org.apache.hadoop.hbase.client.BufferedMutator;
  23. import org.apache.hadoop.hbase.client.BufferedMutatorParams;
  24. import org.apache.hadoop.hbase.client.Connection;
  25. import org.apache.hadoop.hbase.client.ConnectionFactory;
  26. import org.apache.hadoop.hbase.client.Table;
  27. import org.apache.hadoop.hbase.client.Put;
  28. import org.apache.hadoop.hbase.client.Result;
  29. import org.apache.hadoop.hbase.client.ResultScanner;
  30. import org.apache.hadoop.hbase.client.Scan;
  31. import org.apache.hadoop.hbase.util.Bytes;
  32. import org.apache.hadoop.util.ThreadUtil;

  33. public class HelloWorld {
  34.     private static Configuration conf = null;
  35.     private static Connection conn = null;
  36.     private static Admin admin = null;
  37.     public static AtomicInteger count = new AtomicInteger();

  38.     /**
  39.      * 初始化配置
  40.      */
  41.     static {
  42.         conf = HBaseConfiguration.create();
  43.         //如果沒有配置文件,一定要記得手動宣告

  44.         conf.set("hbase.zookeeper.quorum", "10.148.137.143");
  45.         conf.set("hbase.zookeeper.property.clientPort", "2181");
  46.     }
  47.     
  48.     static {
  49.         try {
  50.          conn = ConnectionFactory.createConnection();
  51.      admin = conn.getAdmin();
  52.      } catch (IOException e) {
  53.      e.printStackTrace();
  54.      }
  55.     }

  56.     static public class MyThread extends Thread
  57.     {
  58.         int _start;
  59.         String _tablename;
  60.         Connection conn;
  61.         //BufferedMutator table;
  62.         Table table;

  63.         public MyThread(int start, String tablename) {
  64.             _start = start;
  65.             _tablename = tablename;
  66.         }
  67.         
  68.         public void run() {
  69.             String tablename = _tablename;
  70.             Thread current = Thread.currentThread();
  71.             long thread_id = current.getId();
  72.             System.out.printf("thread[%d] run\n", thread_id);
  73.             
  74.             try {
  75.                 conn = ConnectionFactory.createConnection();
  76.                 //BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(tablename));
  77.                 //params.writeBufferSize(1024 * 4);
  78.                 //table = conn.getBufferedMutator(params);
  79.                 table = conn.getTable(TableName.valueOf(tablename));

  80.                 for (int j=_start; j100; ++j) {
  81.                     for (int i=0; i10000000; ++i) {
  82.                         // zkb_0_0
  83.                         String zkb = "zkb_" + String.valueOf(_start) + "_" + String.valueOf(i);
  84.                         Put put = new Put(Bytes.toBytes(zkb));
  85.                         put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field1"),Bytes.toBytes(String.valueOf(i+0)));                  
  86.                         put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field2"),Bytes.toBytes(String.valueOf(i+1)));
  87.                         put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field3"),Bytes.toBytes(String.valueOf(i+2)));
  88.                         put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field4"),Bytes.toBytes(String.valueOf(i+3)));
  89.                         put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field5"),Bytes.toBytes(String.valueOf(i+4)));
  90.                         put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field6"),Bytes.toBytes(String.valueOf(i+5)));
  91.                         put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field7"),Bytes.toBytes(String.valueOf(i+6)));
  92.                         put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field8"),Bytes.toBytes(String.valueOf(i+7)));
  93.                         put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field9"),Bytes.toBytes(String.valueOf(i+8)));
  94.                         put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field10"),Bytes.toBytes(String.valueOf(i+9)));
  95.                         put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field11"),Bytes.toBytes(String.valueOf(i+10)));
  96.                         put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field12"),Bytes.toBytes(String.valueOf(i+11)));
  97.                         put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field13"),Bytes.toBytes(String.valueOf(i+12)));
  98.                         put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field14"),Bytes.toBytes(String.valueOf(i+13)));
  99.                         put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field15"),Bytes.toBytes(String.valueOf(i+14)));
  100.                         //table.mutate(put);
  101.                         table.put(put);

  102.              int m = HelloWorld.count.incrementAndGet();
  103.              if (m % 10000 == 0) {
  104.                  Date dt = new Date();
  105.                  SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss aa");
  106.                  String now = sdf.format(dt);
  107.                  System.out.printf("[%s] thread[%d] m=%d, j=%d, i=%d\n", now, thread_id, m, j, i);
  108.              }
  109.                     }
  110.                 }

  111.                 System.out.printf("thread[%d] over\n", thread_id);
  112.             }
  113.             catch (Exception e) {
  114.                 e.printStackTrace();
  115.             }
  116.         }
  117.     }
  118.     
  119.     /**
  120.      * 建立表格
  121.      * @param tablename
  122.      * @param cfs
  123.      */
  124.     public static void createTable(String tablename, String[] cfs){
  125.         try {
  126.             if (admin.tableExists(TableName.valueOf(tablename))) {
  127.                 System.out.println("table already exists!");
  128.             } else {
  129.                 HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tablename));
  130.                 for (int i = 0; i cfs.length; i++) {
  131.                     HColumnDescriptor desc = new HColumnDescriptor(cfs[i]);
  132.                     desc.setMaxVersions(3650);     
  133.                     tableDesc.addFamily(desc);
  134.                 }

  135.                 byte[][] splitKeys = new byte[][] {
  136.                     Bytes.toBytes("zkb_0_0"),
  137.                     Bytes.toBytes("zkb_10_0"),
  138.                     Bytes.toBytes("zkb_20_0"),
  139.                     Bytes.toBytes("zkb_30_0"),
  140.                     Bytes.toBytes("zkb_40_0"),
  141.                     Bytes.toBytes("zkb_50_0"),
  142.                     Bytes.toBytes("zkb_60_0"),
  143.                     Bytes.toBytes("zkb_70_0"),
  144.                     Bytes.toBytes("zkb_80_0"),
  145.                     Bytes.toBytes("zkb_90_0"),
  146.                     Bytes.toBytes("zkb_100_0")
  147.                 };
  148.                 admin.createTable(tableDesc, splitKeys);
  149.                 admin.close();
  150.                 System.out.println("create table " + tablename + " ok.");
  151.             }
  152.         } catch (MasterNotRunningException e) {
  153.             e.printStackTrace();
  154.         } catch (ZooKeeperConnectionException e) {
  155.             e.printStackTrace();
  156.         } catch (IOException e) {
  157.             e.printStackTrace();
  158.         }
  159.     }
  160.     
  161.     /**
  162.      * 刪除表格
  163.      * @param tablename
  164.      */
  165.     public static void deleteTable(String tablename){
  166.         try {
  167.             //Connection conn = ConnectionFactory.createConnection();
  168.             //Admin admin = conn.getAdmin();     
  169.             admin.disableTable(TableName.valueOf(tablename));
  170.             admin.deleteTable(TableName.valueOf(tablename));
  171.             System.out.println("delete table " + tablename + " ok.");
  172.         } catch (IOException e) {
  173.             e.printStackTrace();
  174.         }
  175.     }

  176.     /**
  177.      * 刪除一筆資料
  178.      * @param tableName
  179.      * @param rowKey
  180.      */
  181.     public static void delRecord (String tableName, String rowKey){
  182.         try {
  183.             Table table = conn.getTable(TableName.valueOf(tableName));
  184.             
  185.             ListDelete> list = new ArrayListDelete>();
  186.             Delete del = new Delete(rowKey.getBytes());
  187.             list.add(del);
  188.             table.delete(list);
  189.             System.out.println("del recored " + rowKey + " ok.");
  190.         } catch (IOException e) {
  191.             e.printStackTrace();
  192.         }
  193.     }
  194.     
  195.     /**
  196.      * 取得一筆資料
  197.      * @param tableName
  198.      * @param rowKey
  199.      */
  200.     public static void getOneRecord (String tableName, String rowKey){
  201.         try {
  202.             Table table = conn.getTable(TableName.valueOf(tableName));
  203.             
  204.             Get get = new Get(rowKey.getBytes());
  205.             Result rs = table.get(get);
  206.             ListCell> list = rs.listCells();
  207.             for(Cell cell:list){
  208.                 System.out.print(new String(cell.getRowArray(),cell.getRowOffset(),cell.getRowLength()) + " " );
  209.                 System.out.print(new String(cell.getFamilyArray(),cell.getFamilyOffset(),cell.getFamilyLength()) + ":" );
  210.                 System.out.print(new String(cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength()) + " " );
  211.                 System.out.print(cell.getTimestamp() + " " );
  212.                 System.out.print(new String(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength()) + " " );
  213.                 System.out.println("");
  214.             }
  215.         } catch (IOException e) {
  216.             e.printStackTrace();
  217.         }
  218.     }
  219.     
  220.     /**
  221.      * 取得所有資料
  222.      * @param tableName
  223.      */
  224.     public static void getAllRecord (String tableName) {
  225.         try{
  226.             //Connection conn = ConnectionFactory.createConnection();
  227.             Table table = conn.getTable(TableName.valueOf(tableName));
  228.             
  229.             Scan scan = new Scan();
  230.             ResultScanner resultscanner = table.getScanner(scan);
  231.             for(Result rs:resultscanner){
  232.                 ListCell> list = rs.listCells();
  233.                 for(Cell cell:list){
  234.                     System.out.print(new String(cell.getRowArray(),cell.getRowOffset(),cell.getRowLength()) + " " );
  235.                     System.out.print(new String(cell.getFamilyArray(),cell.getFamilyOffset(),cell.getFamilyLength()) + ":" );
  236.                     System.out.print(new String(cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength()) + " " );
  237.                     System.out.print(cell.getTimestamp() + " " );
  238.                     System.out.print(new String(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength()) + " " );
  239.                     System.out.println("");
  240.                 }
  241.             }
  242.         } catch (IOException e){
  243.             e.printStackTrace();
  244.         }
  245.     }
  246.     
  247.     /**
  248.      * 取得Family清單
  249.      * @param tableName
  250.      * @return
  251.      */
  252.     public static ArrayListString> getAllFamilyName(String tableName) {
  253.         ArrayListString> familyname_list = new ArrayListString>();
  254.         try{
  255.             //Connection conn = ConnectionFactory.createConnection();
  256.             Table table = conn.getTable(TableName.valueOf(tableName));
  257.             
  258.             HTableDescriptor htabledescriptor = table.getTableDescriptor();
  259.             HColumnDescriptor[] hdlist = htabledescriptor.getColumnFamilies();
  260.             for(int i=0;ihdlist.length;i++){
  261.                 HColumnDescriptor hd = hdlist[i];
  262.                 familyname_list.add(hd.getNameAsString());
  263.             }
  264.         } catch (IOException e){
  265.             e.printStackTrace();
  266.         }
  267.         return familyname_list;
  268.     }

  269.     // java -cp HelloWorld.jar:`ls lib/*.jar|awk '{printf("%s:", $0)}'` elementary.HelloWorld 5
  270.     public static void main(String[] args) {
  271.         System.out.println("HelloWorldX");
  272.         if (args.length > 0)
  273.             System.out.println(args[0]);
  274.         
  275.         int start = 0;
  276.         if (args.length > 1)
  277.             start = Integer.valueOf(args[1]);
  278.         if (start 0)
  279.             start = 0;
  280.         
  281.         int num_threads = 16;
  282.         if (args.length > 2)
  283.             num_threads = Integer.valueOf(args[2]);
  284.         
  285.         try {
  286.             String tablename = "scores";
  287.             String[] familys = {"grade", "course"};
  288.             HelloWorld.createTable(tablename, familys);

  289.             //ExecutorService thread_pool = Executors.newSingleThreadExecutor();
  290.             ExecutorService thread_pool = Executors.newFixedThreadPool(num_threads);
  291.             Thread[] pool = new HelloWorld.MyThread[80];
  292.             for (int i=0; ipool.length; ++i) {
  293.                 pool[i] = new HelloWorld.MyThread(i, tablename);
  294.                 thread_pool.execute(pool[i]);
  295.             }
  296.             
  297.             thread_pool.shutdown();
  298.             System.out.println("over");
  299.         }
  300.         catch (Exception e) {
  301.             e.printStackTrace();
  302.         }
  303.     }
  304.     
  305. }

相关实践学习
云数据库HBase版使用教程
  相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情: https://cn.aliyun.com/product/hbase   ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
14天前
|
安全 Java 开发者
深入理解Java并发编程:线程安全与性能优化
【4月更文挑战第9天】本文将深入探讨Java并发编程的核心概念,包括线程安全和性能优化。我们将详细解析Java中的同步机制,包括synchronized关键字、Lock接口以及并发集合等,并探讨它们如何影响程序的性能。此外,我们还将讨论Java内存模型,以及它如何影响并发程序的行为。最后,我们将提供一些实用的并发编程技巧和最佳实践,帮助开发者编写出既线程安全又高效的Java程序。
22 3
|
15天前
|
Java
Java 并发编程:深入理解线程池
【4月更文挑战第8天】本文将深入探讨 Java 中的线程池技术,包括其工作原理、优势以及如何使用。线程池是 Java 并发编程的重要工具,它可以有效地管理和控制线程的执行,提高系统性能。通过本文的学习,读者将对线程池有更深入的理解,并能在实际开发中灵活运用。
|
11天前
|
安全 算法 Java
深入理解Java并发编程:线程安全与性能优化
【4月更文挑战第11天】 在Java中,高效的并发编程是提升应用性能和响应能力的关键。本文将探讨Java并发的核心概念,包括线程安全、锁机制、线程池以及并发集合等,同时提供实用的编程技巧和最佳实践,帮助开发者在保证线程安全的前提下,优化程序性能。我们将通过分析常见的并发问题,如竞态条件、死锁,以及如何利用现代Java并发工具来避免这些问题,从而构建更加健壮和高效的多线程应用程序。
|
15天前
|
Java
Java并发编程:深入理解线程池
【4月更文挑战第7天】在现代软件开发中,多线程编程已经成为一种不可或缺的技术。为了提高程序性能和资源利用率,Java提供了线程池这一强大工具。本文将深入探讨Java线程池的原理、使用方法以及如何根据实际需求定制线程池,帮助读者更好地理解和应用线程池技术。
15 0
|
17天前
|
缓存 安全 Java
Java并发编程进阶:深入理解Java内存模型
【4月更文挑战第6天】Java内存模型(JMM)是多线程编程的关键,定义了线程间共享变量读写的规则,确保数据一致性和可见性。主要包括原子性、可见性和有序性三大特性。Happens-Before原则规定操作顺序,内存屏障和锁则保障这些原则的实施。理解JMM和相关机制对于编写线程安全、高性能的Java并发程序至关重要。
|
4天前
|
IDE Java 物联网
《Java 简易速速上手小册》第1章:Java 编程基础(2024 最新版)
《Java 简易速速上手小册》第1章:Java 编程基础(2024 最新版)
10 0
|
4天前
|
安全 Java 开发者
Java并发编程:深入理解Synchronized关键字
【4月更文挑战第19天】 在Java多线程编程中,为了确保数据的一致性和线程安全,我们经常需要使用到同步机制。其中,`synchronized`关键字是最为常见的一种方式,它能够保证在同一时刻只有一个线程可以访问某个对象的特定代码段。本文将深入探讨`synchronized`关键字的原理、用法以及性能影响,并通过具体示例来展示如何在Java程序中有效地应用这一技术。
|
5天前
|
安全 Java 调度
Java并发编程:深入理解线程与锁
【4月更文挑战第18天】本文探讨了Java中的线程和锁机制,包括线程的创建(通过Thread类、Runnable接口或Callable/Future)及其生命周期。Java提供多种锁机制,如`synchronized`关键字、ReentrantLock和ReadWriteLock,以确保并发访问共享资源的安全。此外,文章还介绍了高级并发工具,如Semaphore(控制并发线程数)、CountDownLatch(线程间等待)和CyclicBarrier(同步多个线程)。掌握这些知识对于编写高效、正确的并发程序至关重要。
|
6天前
|
缓存 分布式计算 监控
Java并发编程:深入理解线程池
【4月更文挑战第17天】在Java并发编程中,线程池是一种非常重要的技术,它可以有效地管理和控制线程的执行,提高系统的性能和稳定性。本文将深入探讨Java线程池的工作原理,使用方法以及在实际开发中的应用场景,帮助读者更好地理解和使用Java线程池。
|
6天前
|
Java API 数据库
深研Java异步编程:CompletableFuture与反应式编程范式的融合实践
【4月更文挑战第17天】本文探讨了Java中的CompletableFuture和反应式编程在提升异步编程体验上的作用。CompletableFuture作为Java 8引入的Future扩展,提供了一套流畅的链式API,简化异步操作,如示例所示的非阻塞数据库查询。反应式编程则关注数据流和变化传播,通过Reactor等框架实现高度响应的异步处理。两者结合,如将CompletableFuture转换为Mono或Flux,可以兼顾灵活性和资源管理,适应现代高并发环境的需求。开发者可按需选择和整合这两种技术,优化系统性能和响应能力。