Spark查询Hbase小案例

本文涉及的产品
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
云数据库 RDS MySQL Serverless,价值2615元额度,1个月
简介: 写作目的1)正好有些Spark连接HBase的需求,当个笔记本,到时候自己在写的时候,可以看2)根据rowkey查询其实我还是查询了好久才找到,所以整理了一下3)好久没发博客了,水一篇

版本


Scala 2.11.1

Spark 2.11

HBase 2.0.5


代码


其中hbase-site.xml为hbase安装目录下/hbase/conf里的hbase-site.xml


1.png


pom依赖


 <properties>
        <mysql.version>6.0.5</mysql.version>
        <spring.version>4.3.6.RELEASE</spring.version>
        <spring.data.jpa.version>1.11.0.RELEASE</spring.data.jpa.version>
        <log4j.version>1.2.17</log4j.version>
        <quartz.version>2.2.3</quartz.version>
        <slf4j.version>1.7.22</slf4j.version>
        <hibernate.version>5.2.6.Final</hibernate.version>
        <camel.version>2.18.2</camel.version>
        <config.version>1.10</config.version>
        <jackson.version>2.8.6</jackson.version>
        <servlet.version>3.0.1</servlet.version>
        <net.sf.json.version>2.4</net.sf.json.version>
        <activemq.version>5.14.3</activemq.version>
        <spark.version>2.1.1</spark.version>
        <scala.version>2.11.11</scala.version>
        <hadoop.version>2.7.3</hadoop.version>
    </properties>
<dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>jcl-over-slf4j</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/cn.hutool/hutool-all -->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.4.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.3.1</version>
        </dependency>
    </dependencies>


查询


查全表


package com.bjfu.spark.demo.hbasedemo
import com.google.common.collect.Table.Cell
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration}
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
  * @author chaird
  * @create 2020-11-13 22:04
  */
object HBaseDemo {
  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HBaseDemo")
    //创建Spark上下文对象
    val sc = new SparkContext(config)
    //创建HBase配置对象
    val conf: Configuration = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, "EcodataFei")
    //全表扫描
    val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(conf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])
    hbaseRDD.foreach {
      //第一种展示方式
      case (rowkey, result) => {
        val cells: Array[hbase.Cell] = result.rawCells()
        for (cell <- cells) {
          println(Bytes.toString(CellUtil.cloneValue(cell)))
        }
      }
      第二种展示方式
      //      case (rowkey,result)=>{
      //        //rowKey
      //        val key: String = Bytes.toString(result.getRow)
      //        //列族,列,值
      //        val value: String = Bytes.toString(result.getValue(Bytes.toBytes("datatype"),Bytes.toBytes("288")))
      //        println("rowKey:"+key+"  "+"value:"+value)
      //
      //      }
    }
    //释放资源
    sc.stop()
  }
}


根据rowKey查询


package com.bjfu.spark.demo.hbasedemo
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{HTable, Scan}
import org.apache.hadoop.hbase.filter.FilterList.Operator
import org.apache.hadoop.hbase.filter._
import java.util
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
/**
  * @author chaird
  * @create 2020-11-14 20:39
  */
object HBaseConditionDemo {
  def main(args: Array[String]): Unit = {
    //创建Spark上下文对象
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HBaseDemo")
    val sc = new SparkContext(config)
    //创建HBaseConf
    val hbaseConf: Configuration = HBaseConfiguration.create()
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "EcodataFei")
    //创建过滤器(主键)
    val scan = new Scan()
    val rowkeys = List("17-2020-41-12 17:09", "2-2020-37-12 17:09")
    val filters = new util.ArrayList[Filter]()
    for (cookieid <- rowkeys) {
      val filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(cookieid)))
      filters.add(filter)
    }
    val filterList = new FilterList(Operator.MUST_PASS_ONE, filters)
    scan.setFilter(filterList)
    hbaseConf.set(TableInputFormat.SCAN, convertScanToString(scan))
    val hBaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
    val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(hbaseConf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])
    hbaseRDD.foreach {
      case (rowkey, result) => {
        //rowKey
        val key: String = Bytes.toString(result.getRow)
        //列族,列,值
        val value: String = Bytes.toString(result.getValue(Bytes.toBytes("datatype"), Bytes.toBytes("288")))
        println("rowKey:" + key + "  " + "value:" + value)
      }
    }
    //释放资源
    sc.stop()
  }
  def convertScanToString(scan: Scan) = {
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray)
  }
}


将结果集保存下来


 val list: List[String] = hbaseRDD.map(
      x => (Bytes.toString(x._2.getValue(Bytes.toBytes("datatype"), Bytes.toBytes("customer_id"))))).collect().toList
    list.foreach(println(_))
相关实践学习
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
5月前
|
SQL 分布式计算 数据可视化
Spark SQL案例【电商购买数据分析】
Spark SQL案例【电商购买数据分析】
|
5月前
|
Java Shell 分布式数据库
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
87 0
|
7月前
|
SQL 分布式计算 资源调度
线上 hive on spark 作业执行超时问题排查案例分享
线上 hive on spark 作业执行超时问题排查案例分享
|
7月前
|
SQL 存储 分布式数据库
【通过Hive清洗、处理和计算原始数据,Hive清洗处理后的结果,将存入Hbase,海量数据随机查询场景从HBase查询数据 】
【通过Hive清洗、处理和计算原始数据,Hive清洗处理后的结果,将存入Hbase,海量数据随机查询场景从HBase查询数据 】
113 0
|
21天前
|
分布式计算 Hadoop Scala
Spark【环境搭建 01】spark-3.0.0-without 单机版(安装+配置+测试案例)
【4月更文挑战第13天】Spark【环境搭建 01】spark-3.0.0-without 单机版(安装+配置+测试案例)
22 0
|
5月前
|
SQL 分布式计算 数据挖掘
Spark_Day07:Spark SQL(DataFrame是什么和数据分析(案例讲解))
Spark_Day07:Spark SQL(DataFrame是什么和数据分析(案例讲解))
82 0
|
4月前
|
分布式计算 分布式数据库 API
Spark与HBase的集成与数据访问
Spark与HBase的集成与数据访问
|
5月前
|
数据可视化 JavaScript 关系型数据库
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(五)FineBI可视化
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(五)FineBI可视化
49 0
|
5月前
|
SQL 消息中间件 关系型数据库
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(四)实时计算需求及技术方案
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(四)实时计算需求及技术方案
82 0
|
5月前
|
SQL 消息中间件 分布式数据库
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(三)离线分析
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(三)离线分析
67 0