Spark与HBase的集成

简介: 笔记

版本说明:


hbase版本:hbase-1.3.1

spark版本:spark-2.4.7-bin-hadoop2.7


一、Spark与HBase的集成


背景:

Spark支持多种数据源,但是Spark对HBase的读写都没有相对优雅的api,但spark和HBase整合的场景又比较多,故通过spark的数据源API自己实现了一套比较方便操作HBase的API。


数据模型:

row,addres,age,username
001,guangzhou,20,alex
002,shenzhen,34,jack
003,beijing,23,lili

需求分析:

通过spark读取hbase中的数据或者将数据写入到hbase中。

添加配置:

在pom.xml文件中添加如下配置:

<!-- hbase依赖包 -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>${hbase.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

代码实现:

package com.kfk.spark.sql
import com.kfk.spark.common.CommSparkSessionScala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.sql.SparkSession
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/10
 * @time : 8:53 下午
 */
object HBaseSpark {
    def main(args: Array[String]): Unit = {
        // configuration
        val spark = CommSparkSessionScala.getSparkSession()
        val hbaseConf = HBaseConfiguration.create()
        hbaseConf.set("hbase.zookeeper.property.clientPort","2181")
        hbaseConf.set("hbase.zookeeper.quorum","bigdata-pro-m04")
        hbaseConf.set("hbase.master","bigdata-pro-m04:60010")
        // get
        getHBase(hbaseConf,spark)
        // write
        writeHBase(hbaseConf,spark)
    }
    /**
     * 读取hbase中的数据
     * @param hbaseConf
     * @param spark
     */
    def getHBase(hbaseConf : Configuration,spark : SparkSession): Unit ={
        // 获取表名
        hbaseConf.set(TableInputFormat.INPUT_TABLE,"stu")
        // 将hbase中的数据转换成rdd
        val hbaseRDD =  spark.sparkContext.newAPIHadoopRDD(hbaseConf,
            classOf[TableInputFormat],
            classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
            classOf[org.apache.hadoop.hbase.client.Result])
        // 打印数据
        hbaseRDD.foreach(result => {
            val key = Bytes.toString(result._2.getRow)
            val addres = Bytes.toString(result._2.getValue("info".getBytes(),"addres".getBytes()))
            val age = Bytes.toString(result._2.getValue("info".getBytes(),"age".getBytes()))
            val username = Bytes.toString(result._2.getValue("info".getBytes(),"username".getBytes()))
            println("row key:" + key + " addres=" + addres + " age=" + age + " username=" + username)hashCode()
            /**
             * row key:001 addres=guangzhou age=20 username=alex
             * row key:002 addres=shenzhen age=34 username=jack
             * row key:003 addres=beijing age=23 username=lili
             */
        })
    }
    /**
     * 将数据写入到hbase
     * @param hbaseConf
     * @param spark
     */
    def writeHBase(hbaseConf : Configuration,spark : SparkSession): Unit ={
        // 初始化job,设置输出格式,TableOutputFormat 是 org.apache.hadoop.hbase.mapred 包下的
        val jobConf = new JobConf(hbaseConf)
        jobConf.setOutputFormat(classOf[TableOutputFormat])
        // 获取表名
        jobConf.set(TableOutputFormat.OUTPUT_TABLE,"stu")
        // 准备数据
        val array = Array("004,shanghai,25,jone",
                          "005,nanjing,31,cherry",
                          "006,wuhan,18,pony")
        val rdd = spark.sparkContext.makeRDD(array)
        // 将写入到hbase的数据转换成rdd
        val saveRDD = rdd.map(line => line.split(",")).map(x => {
            /**
             * 一个Put对象就是一行记录,在构造方法中指定主键
             * 所有插入的数据 须用 org.apache.hadoop.hbase.util.Bytes.toBytes 转换
             * Put.addColumn 方法接收三个参数:列族,列名,数据
             */
            val put = new Put(Bytes.toBytes(x(0)))
            put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("addres"),Bytes.toBytes(x(1)))
            put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("age"),Bytes.toBytes(x(2)))
            put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("username"),Bytes.toBytes(x(3)))
            (new ImmutableBytesWritable,put)
        })
        // 写入到hbase中
        saveRDD.saveAsHadoopDataset(jobConf)
    }
}

运行结果:

row key:001 addres=guangzhou age=20 username=alex
row key:002 addres=shenzhen age=34 username=jack
row key:003 addres=beijing age=23 username=lili
row key:004 addres=shanghai age=25 username=jone
row key:005 addres=nanjing age=31 username=cherry
row key:006 addres=wuhan age=18 username=pony

本来源数据只有前三行,通过写入后三行,再打印出结果。


二、Spark SQL与HBase的集成


Spark SQL与HBase集成,其核心就是Spark Sql通过hive外部表来获取HBase的表数据。


将hbase、hive、mysql相关jar包拷贝到spark的jars目录下

hbase:
hbase-client-1.3.1.jar
hbase-common-1.3.1.jar
hbase-protocol-1.3.1.jar
hbase-server-1.3.1.jar
metrics-core-2.2.0.jar
hive:
hive-hbase-handler-2.3.3.jar
htrace-core-3.1.0-incubating.jar
mysql:
mysql-connector-java-5.1.48-bin.jar

创建与HBase集成的Hive的外部表:

CREATE EXTERNAL TABLE stu(
id string,
addres string,
age string,
username string) 
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES (
"hbase.columns.mapping" = 
":key,info:addres,info:age,info:username") 
TBLPROPERTIES ("hbase.table.name" = "stu");


相关实践学习
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
4月前
|
分布式计算 API Apache
Spark与Elasticsearch的集成与全文搜索
Spark与Elasticsearch的集成与全文搜索
|
2月前
|
SQL 分布式计算 大数据
Paimon 与 Spark 的集成(二):查询优化
通过一系列优化,我们将 Paimon x Spark 在 TpcDS 上的性能提高了37+%,已基本和 Parquet x Spark 持平,本文对其中的关键优化点进行了详细介绍。
117542 30
|
4月前
|
存储 缓存 分布式计算
Spark与云存储的集成:S3、Azure Blob Storage
Spark与云存储的集成:S3、Azure Blob Storage
|
4月前
|
消息中间件 分布式计算 Kafka
Spark与Kafka的集成与流数据处理
Spark与Kafka的集成与流数据处理
|
4月前
|
存储 分布式计算 NoSQL
Spark与Cassandra的集成与数据存储
Spark与Cassandra的集成与数据存储
|
4月前
|
分布式计算 分布式数据库 API
Spark与HBase的集成与数据访问
Spark与HBase的集成与数据访问
|
15天前
|
前端开发 Java 应用服务中间件
从零手写实现 tomcat-08-tomcat 如何与 springboot 集成?
该文是一系列关于从零开始手写实现 Apache Tomcat 的教程概述。作者希望通过亲自动手实践理解 Tomcat 的核心机制。文章讨论了 Spring Boot 如何实现直接通过 `main` 方法启动,Spring 与 Tomcat 容器的集成方式,以及两者生命周期的同步原理。文中还提出了实现 Tomcat 的启发,强调在设计启动流程时确保资源的正确加载和初始化。最后提到了一个名为 mini-cat(嗅虎)的简易 Tomcat 实现项目,开源于 [GitHub](https://github.com/houbb/minicat)。
|
1月前
|
消息中间件 Java Kafka
Springboot集成高低版本kafka
Springboot集成高低版本kafka
|
16天前
|
前端开发 Java 应用服务中间件
从零手写实现 tomcat-08-tomcat 如何与 springboot 集成?
本文探讨了Spring Boot如何实现像普通Java程序一样通过main方法启动,关键在于Spring Boot的自动配置、内嵌Servlet容器(如Tomcat)以及`SpringApplication`类。Spring与Tomcat集成有两种方式:独立模式和嵌入式模式,两者通过Servlet规范、Spring MVC协同工作。Spring和Tomcat的生命周期同步涉及启动、运行和关闭阶段,通过事件和监听器实现。文章鼓励读者从实现Tomcat中学习资源管理和生命周期管理。此外,推荐了Netty权威指南系列文章,并提到了一个名为mini-cat的简易Tomcat实现项目。
|
2天前
|
消息中间件 JSON Java
RabbitMQ的springboot项目集成使用-01
RabbitMQ的springboot项目集成使用-01