Phoenix-基于HBase的低延迟操作 头歌——答案

简介: Phoenix-基于HBase的低延迟操作 头歌——答案

第1关:Phoenix初识


package com.educoder.bigData.sparksql4;
import java.sql.Connection;  
import java.sql.DriverManager;  
import java.sql.SQLException;
public class Case1 {  
    static {  
        try {  
            Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");  
        } catch (ClassNotFoundException e) {  
            // TODO Auto-generated catch block  
            e.printStackTrace();  
        }  
    }
    public static void createEducoderTable1() {  
        Connection connection = null;  
        try {  
            connection = DriverManager.getConnection("jdbc:phoenix:127.0.0.1:2181");  
            connection.createStatement().execute(  
                    "CREATE TABLE EDUCODER_TABLE1 (ID BIGINT not null primary key, info.BEGINTIME VARCHAR, ENDTIME VARCHAR, SALARY INTEGER, CITY VARCHAR)DATA_BLOCK_ENCODING='DIFF',VERSIONS=3,BLOCKSIZE='32000',MAX_FILESIZE=10000000");  
            connection.commit();  
        } catch (Exception e) {  
            // TODO Auto-generated catch block  
            e.printStackTrace();  
        }  
        finally {  
            if(connection != null) {  
                try {  
                    connection.close();  
                } catch (SQLException e) {  
                    // TODO Auto-generated catch block  
                    e.printStackTrace();  
                }  
            }  
        }  
    }  
    public static void createEducoderTable2() {  
        Connection connection = null;  
        try {  
            connection = DriverManager.getConnection("jdbc:phoenix:127.0.0.1:2181");  
            connection.createStatement().execute(  
                    "CREATE TABLE \"educoder_table2\" (\"id\" BIGINT not null primary key, \"info\".\"begintime\" VARCHAR, \"endtime\" VARCHAR, \"salary\" INTEGER, \"city\" VARCHAR)COMPRESSION='GZ',VERSIONS=5,BLOCKSIZE='65536',MAX_FILESIZE=20000000");  
            connection.commit();  
        } catch (Exception e) {  
            // TODO Auto-generated catch block  
            e.printStackTrace();  
        }  
        finally {  
            if(connection != null) {  
                try {  
                    connection.close();  
                } catch (SQLException e) {  
                    // TODO Auto-generated catch block  
                    e.printStackTrace();  
                }  
            }  
        }  
    }
}


第2关 Phoenix 查询和更新


package com.educoder.bigData.sparksql4;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
public class Test2 extends InitHdb2{
    public static void main(String[] args) throws SQLException {
        Connection connection = null;
        try {
            connection = DriverManager.getConnection("jdbc:phoenix:127.0.0.1:2181");
            connection.createStatement().executeUpdate("UPSERT INTO EDUCODER_TABLE1 VALUES('20190615','20190915', 1, 20, '上海') ON DUPLICATE KEY UPDATE SALARY = SALARY + 20");
            connection.createStatement().executeUpdate("UPSERT INTO EDUCODER_TABLE1 VALUES('20190618','20190918', 2, 10, '北京') ON DUPLICATE KEY UPDATE SALARY = SALARY + 20");
            connection.commit();
            queryTable(connection,"EDUCODER_TABLE1");
            connection.createStatement().executeUpdate("UPSERT INTO \"educoder_table2\"  SELECT * FROM EDUCODER_TABLE1 ");
            connection.commit();
            queryTable(connection,"educoder_table2");
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        finally {
            if(connection != null) {
                try {
                    connection.close();
                } catch (SQLException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }
    }
    private static void queryTable(Connection connection, String tableName) throws SQLException {
        ResultSet executeQuery = connection.createStatement().executeQuery("select * from \""+tableName+"\"");
        ResultSetMetaData metaData = executeQuery.getMetaData();
        int n = 0;
        while (executeQuery.next()) {
            System.out.println(tableName + "第" + ++n + "条数据:");
            for (int j = 0; j < metaData.getColumnCount(); j++) {
                String col_name = metaData.getColumnName(j + 1);
                Object value = executeQuery.getObject(col_name);
                System.out.print(col_name + ":" + value);
                if(j != metaData.getColumnCount() -1) {
                    System.out.print(",");
                }
                else {
                    System.out.println();
                }
            }
        }
    }
}


第3关 Phoenix 二级索引


package com.educoder.bigData.sparksql4;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
public class Test3 extends InitHdb3{
  public static void main(String[] args) throws SQLException {
    Connection connection = null;
    /********* Begin *********/
        String sql = "select * from my_index limit 10";         
        connection = DriverManager.getConnection("jdbc:phoenix:127.0.0.1:2181");
        connection.createStatement().executeUpdate("CREATE  INDEX my_index ON EDUCODER_TABLE1 (salary) ");  
    /********* End *********/
    queryTable(sql, connection);
  }
  public static void queryTable(String sql,Connection connection) throws SQLException {
    ResultSet executeQuery = connection.createStatement().executeQuery(sql);
    ResultSetMetaData metaData = executeQuery.getMetaData();
    System.out.println("索引表数据:");
    while (executeQuery.next()) {
      for (int j = 0; j < metaData.getColumnCount(); j++) {
        String col_name = metaData.getColumnName(j + 1);
        Object value = executeQuery.getObject(col_name);
        System.out.print(col_name + ":" + value);
        if(j != metaData.getColumnCount() -1) {
          System.out.print(",");
        }
        else {
          System.out.println();
        }
      }
    }
  }
}


第4关 Phoenix Spark操作


package com.educoder.bigData.sparksql4;
import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;  
import org.apache.spark.api.java.JavaSparkContext;  
import org.apache.spark.sql.Dataset;  
import org.apache.spark.sql.Row;  
import org.apache.spark.sql.SaveMode;  
import org.apache.spark.sql.SparkSession;
public class Case4 {
    public static void case4(TableVo1 vo) {  
        SparkSession spark = SparkSession.builder().appName("test1").master("local").getOrCreate();  
        JavaSparkContext context = JavaSparkContext.fromSparkContext(spark.sparkContext());  
        JavaRDD<TableVo1> parallelize = context.parallelize(Arrays.asList(vo));  
        Dataset<Row> df = spark.createDataFrame(parallelize, TableVo1.class);  
        df.write().mode(SaveMode.Overwrite).format("org.apache.phoenix.spark").option("zkUrl", "127.0.0.1:2181").option("table", "OUTPUT_TABLE").save();  
        spark.read().option("table", "OUTPUT_TABLE").option("zkUrl", "127.0.0.1:2181")  
        .format("org.apache.phoenix.spark").load().show();  
    }  
}
相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
SQL 存储 分布式计算
【Hive】(二十三)简单几招教你如何解决 Hive 中小文件过多的问题
【Hive】(二十三)简单几招教你如何解决 Hive 中小文件过多的问题
1769 0
|
6月前
|
存储 SQL 分布式计算
技术心得记录:深入学习HBase架构原理
技术心得记录:深入学习HBase架构原理
|
7月前
|
SQL 关系型数据库 分布式数据库
Flink报错问题之用flush方法写入hbase报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
7月前
|
存储 算法 分布式数据库
HBase原理 | HBase内部探险
HBase原理 | HBase内部探险
121 0
|
监控 大数据 分布式数据库
|
分布式计算 分布式数据库 Spark
Phoenix-基于HBase的低延迟操作 头歌——答案
Phoenix-基于HBase的低延迟操作 头歌——答案
434 0
|
分布式计算 分布式数据库 Hbase
当HBase遇上MapReduce头歌答案
当HBase遇上MapReduce头歌答案
629 0
|
缓存 分布式数据库 Hbase
【HBase】(六)详解 HBase 的读、写流程(面试重点)
【HBase】(六)详解 HBase 的读、写流程(面试重点)
281 0
【HBase】(六)详解 HBase 的读、写流程(面试重点)
|
调度
HBase2.0 procedureV2原理简析
总体流程图 就绪区: 这部分的核心实现类是MasterProcedureScheduler,主要的作用就是对Procedure进行调度; 从排队的角度看,可以认为存在三层队列调度; type队列: type包含meta、server、table,,三者之间存在优先级:meta>server>t.
1956 0
HBase2.0 procedureV2原理简析
|
监控 Java 大数据
【HBase从入门到精通系列】如何避免HBase写入过快引起的各种问题
首先我们简单回顾下整个写入流程 client api ==> RPC ==> server IPC ==> RPC queue ==> RPC handler ==> write WAL ==> write memstore ==> flush to filesystem 整个写入流程从客户端调用API开始,数据会通过protobuf编码成一个请求,通过scoket实现的IPC模块被送达server的RPC队列中。
18173 0