大数据Sqoop借助Hive将Mysql数据导入至Hbase

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS PostgreSQL,高可用系列 2核4GB
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 大数据Sqoop借助Hive将Mysql数据导入至Hbase

1. BulkLoad 介绍

  • 目标
  • 理解 BulkLoad 的大致原理
  • 步骤
  1. 为什么要抽取
  2. 为什么使用 BulkLoad

1.1. 为什么要抽取

大数据Sqoop快速入门

因为数据仓库是甲方自建的, 所以如果我们需要数仓中的数据, 需要申请, 申请完成后, 甲方会将对应的 Hive 表开放给我们, 所以我们需要把 Hive 表中需要的数据抽取到我们的 HBase 中, 如下

抽取方向: Hive -> HBase


1.2. 为什么使用 BulkLoad

在大量数据需要写入HBase时,通常有 put方式和bulkLoad 两种方式。

1、put方式为单条插入,在put数据时会先将数据的更新操作信息和数据信息 写入WAL ,在写入到WAL后, 数据就会被放到MemStore中 ,当MemStore满后数据就会被 flush到磁盘(即形成HFile文件) ,在这种写操作过程会涉及到flush、split、compaction等操作,容易造成节点不稳定,数据导入慢,耗费资源等问题,在海量数据的导入过程极大的消耗了系统性能,避免这些问题最好的方法就是使用BulkLoad的方式来加载数据到HBase中。


b9312adc2de34a8c8ba3652abddc354f.png

2、BulkLoader利用HBase数据按照HFile格式存储在HDFS的原理,使用MapReduce直接批量

生成HFile格式文件后,RegionServers再将HFile文件移动到相应的Region目录下。

1)、Extract,异构数据源数据导入到 HDFS 之上。

2)、Transform,通过用户代码,可以是 MR 或者 Spark 任务将数据转化为 HFile。

3)、Load,HFile 通过 loadIncrementalHFiles 调用将 HFile 放置到 Region 对应的

HDFS 目录上,该过程可能涉及到文件切分。

1、不会触发WAL预写日志,当表还没有数据时进行数据导入不会产生Flush和Split。

2、减少接口调用的消耗,是一种快速写入的优化方式。

Spark读写HBase之使用Spark自带的API以及使用Bulk Load将大量数据导入HBase:

https://www.jianshu.com/p/b6c5a5ba30af

  • 直接使用 HBase 的 Java 客户端, 一条一条插入 HBase 的
  • 使用 BulkLoad

先说说要使用 BulkLoad 的原因

  • 从 Hive 抽取数据到 HBase 是将全量的数据抽取到 HBase, 日增量大概 260 G
  • 如果把这五千万条数据一条一条的插入 HBase, 会影响 HBase 的运行
  • 如果把这五千万条数据一条一条的插入 HBase, 会非常的慢

为什么会非常慢呢? 因为一条数据插入 HBase 的大致步骤如下

  1. 查询元数据表, 得到所要插入的表的 Region 信息
  2. 插入到对应 Region 的 WAL 预写日志
  3. 插入到 Region 中的 Memstore
  1. 达到条件后, Memstore 把数据刷写为 HFile
  2. 达到条件后, 触发 Minor Compaction
  3. 达到条件后, 触发 Major Compaction
  4. 达到条件后, 分裂 Region
  1. 达到条件后, 分裂 Region
  2. 分区再平衡

而我们有 260G 的数据要插入, 触发很多次 Compaction, 会分裂 Region 几百次, 这无疑会造成 HBase 集群的不稳定, 并且, 我们插入的速度也会很慢

所以, 当一次性要插入的数据太多时, 要通过 HBase 的 BulkLoad 方式加载

  1. Spark 读取 Hive 的数据
  1. 生成 HFile
  2. 通过 HBase 的 BulkLoad API 把 HFile 直接交给 RegionServer
  3. RegionServer 切分, 直接放入对应的 Region

1.3 hive导入Hbase

将MySQL数据库中表的数据导入到Hive表中,以便加载到HBase表中。

启动HiveMetastore服务和HiveServer2服务,使用beeline命令行连接,相关命令如下:

[root@bigdata-cdh01 ~]# /export/servers/hive/bin/beeline
Beeline version 1.1.0-cdh5.14.0 by Apache Hive
beeline> !connect jdbc:hive2://bigdata-cdh01.itcast.cn:10000
scan complete in 2ms
Connecting to jdbc:hive2://bigdata-cdh01.itcast.cn:10000
Enter username for jdbc:hive2://bigdata-cdh01.itcast.cn:10000: root
Enter password for jdbc:hive2://bigdata-cdh01.itcast.cn:10000: ****
Connected to: Apache Hive (version 1.1.0-cdh5.14.0)
Driver: Hive JDBC (version 1.1.0-cdh5.14.0)
Transaction isolation: TRANSACTION_REPEATABLE_READ
0: jdbc:hive2://bigdata-cdh01.itcast.cn:10000>

1.3.1 创建表

创建Hive中数据库Database:

CREATE DATABASE tags_dat;

根据MySQL数据库表在Hive数据仓库中构建相应的表:

用户信息表: tbl_users

/export/servers/sqoop/bin/sqoop create-hive-table \
--connect jdbc:mysql://bigdata-cdh01.oldlu.cn:3306/tags_dat \
--table tbl_users \
--username root \
--password 123456 \
--hive-table tags_dat.tbl_users \
--fields-terminated-by '\t' \
--lines-terminated-by '\n'

1.3.2 导入数据至Hive表

使用Sqoop将MySQL数据库表中的数据导入到Hive表中(本质就是存储在HDFS上),具体命

令如下:

用户信息表: tbl_users

/export/servers/sqoop/bin/sqoop import \
--connect jdbc:mysql://bigdata-cdh01.itcast.cn:3306/tags_dat \
--username root \
--password 123456 \
--table tbl_users \
--direct \
--hive-overwrite \
--delete-target-dir \
--fields-terminated-by '\t' \
--lines-terminated-by '\n' \
--hive-table tags_dat.tbl_users \
--hive-import \
--num-mappers 1

2. 从 Hive 中抽取数据到 HBase

  • 目标
  • 将 Hive 的表抽取到 HBase 中
  • 步骤
  1. 准备数据
  1. 导入 MySQL
  2. 导入 Hive
  1. 建立工程 tag-data
  2. 编写 Spark 任务
  3. 运行任务
  4. 查看结果

2.1. 准备数据

2.1.1. 将数据导入到 MySQL

步骤:

  1. 打开 SQL 脚本
  2. 运行 SQL 脚本

MySQL 密码 : itcastmysqlroot

详细解释:

  1. 通过 IDEA 打开脚本, 文件位置在 files/tags_data.sql

  1. 运行脚本

  1. 等待结果

2.1.2. 将数据导入到 Hive

步骤:

  1. 编写 Sqoop 任务脚本
  2. 通过 Hue 上传脚本
  3. 创建 Hive 数据库
  4. 创建 Oozie job 执行脚本, 导入数据

详细解释:

  1. 编写 Sqoop 任务脚本
#!/bin/sh
sqoop import \
  --hive-import \
  --create-hive-table \
  --hive-table tags_data.tbl_goods \
  --connect "jdbc:mysql://master01:3306/tags_dat" \
  --username root \
  --password itcastmysqlroot \
  --query "SELECT * FROM tags_dat.tbl_goods WHERE \$CONDITIONS" \
  --split-by id \
  --direct \
  --target-dir /user/admin/hive/tags_dat \
  --m 2
sqoop import \
  --hive-import \
  --create-hive-table \
  --hive-table tags_data.tbl_goods_new \
  --connect "jdbc:mysql://master01:3306/tags_dat" \
  --username root \
  --password itcastmysqlroot \
  --query "SELECT * FROM tags_dat.tbl_goods_new WHERE \$CONDITIONS" \
  --split-by id \
  --direct \
  --target-dir /user/admin/hive/tags_dat \
  --m 2
sqoop import \
  --hive-import \
  --create-hive-table \
  --hive-table tags_data.tbl_logs \
  --connect "jdbc:mysql://master01:3306/tags_dat?useUnicode=true" \
  --username root \
  --password itcastmysqlroot \
  --query "SELECT * FROM tags_dat.tbl_logs WHERE \$CONDITIONS" \
  --split-by id \
  --direct \
  --target-dir /user/admin/hive/tags_dat \
  --m 2
sqoop import \
  --hive-import \
  --create-hive-table \
  --hive-table tags_data.tbl_orders \
  --connect "jdbc:mysql://master01:3306/tags_dat" \
  --username root \
  --password itcastmysqlroot \
  --query "SELECT * FROM tags_dat.tbl_orders WHERE \$CONDITIONS" \
  --split-by id \
  --direct \
  --target-dir /user/admin/hive/tags_dat \
  --m 2
sqoop import \
  --hive-import \
  --create-hive-table \
  --hive-table tags_data.tbl_users \
  --connect "jdbc:mysql://master01:3306/tags_dat" \
  --username root \
  --password itcastmysqlroot \
  --query "SELECT * FROM tags_dat.tbl_users WHERE \$CONDITIONS" \
  --split-by id \
  --direct \
  --target-dir /user/admin/hive/tags_dat \
  --m 2
  1. 通过 Hue 上传脚本文件

  1. 创建 Hive 数据库

  1. 创建 Oozie Job 执行脚本

  1. 查看执行结果

2.2. 工程配置

数据抽取:

  1. 导入 MySQL
  2. MySQL to Hive
  3. Hive to HBase
  1. 建立工程, 导入 Maven 配置
  2. 代码编写, 通过 Spark 读取 Hive 数据, 落地成 HFile, 通过 BulkLoad 加载到 HBase
  3. 提交, 运行

步骤:

  1. 配置 Maven
  2. 导入 HBase 的配置文件到resource目录中
  • 配置文件在 Files/hbase_conf

从目标来看, 这个工程中需要 Spark, HBase, Hive 的相关依赖, pom.xml 如下

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-mllib_2.11</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.11</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <exclusions>
            <exclusion>
                <artifactId>jersey-container-servlet-core</artifactId>
                <groupId>org.glassfish.jersey.containers</groupId>
            </exclusion>
            <exclusion>
                <artifactId>guice-servlet</artifactId>
                <groupId>com.google.inject.extensions</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-mapreduce</artifactId>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
        </plugin>
    </plugins>
</build>

2.3. Spark 任务

  • 步骤
  1. 从 Hive 中读取表数据
  2. 创建 HBase 和 Hadoop 的配置对象
  3. 将数据以 HFile 的形式写入到 HDFS 中
  1. 通过 HBase Client API 将数据 BulkLoad 到 HBase 对应的表中
  • 注意点
  • 如果希望程序要通用一些, 我们可以把 Hive 的表明和 RowKey 列的信息通过 main 方法传进来
  • 要先写 HFile 再 BulkLoad 才是 BulkLoad
  • Job 和 HBaseClient 的配置是模板代码, 不需要记忆
  • add framework support 的时候, 加载 Scala 的 SDK 不能使用 Maven 的 SDK, 要使用自己电脑上安装的 Scala SDK
object HiveToHBase {
  val defaultCF = "default"
  val defaultNameSpace = "default"
  val tempFileDir = "/user/admin/Spark/extra_temp/"
  def main(args: Array[String]): Unit = {
    if (args.length < 3) {
      return
    }
    val sourceDBName = args(0)
    val sourceTableName = args(1)
    val rkeyField = args(2)
    val conf = HBaseConfiguration.create
    conf.set(TableOutputFormat.OUTPUT_TABLE, sourceTableName)
    conf.set("hbase.mapreduce.hfileoutputformat.table.name", sourceTableName)
    val job = Job.getInstance(conf)
    job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setMapOutputValueClass(classOf[KeyValue])
    val hfilePath = tempFileDir + sourceTableName
    hive2HFile(sourceDBName, sourceTableName, rkeyField, defaultCF, conf, hfilePath)
    bulkLoad2Table(job, hfilePath, defaultNameSpace, sourceTableName, defaultCF)
  }
  def hive2HFile(sourceDB: String, sourceTable: String, rkeyField: String, cf: String, hadoopConfig: Configuration, hfilePath: String): Unit = {
    val fs = FileSystem.get(hadoopConfig)
    if (fs.exists(new Path(hfilePath))) {
      fs.delete(new Path(hfilePath), true)
    }
    val spark = SparkSession.builder()
      .appName("bulk load from hive")
      .enableHiveSupport()
      .getOrCreate()
    spark.read
      .table(sourceDB + "." + sourceTable)
      .rdd
      .filter(row => row.getAs(rkeyField) != null)
      .flatMap(row => {
        val cfBytes = Bytes.toBytes(cf)
        val rowKeyBytes = Bytes.toBytes(row.getAs(rkeyField).toString)
        row.schema
          .sortBy(field => field.name)
          .map(field => {
            val fieldNameBytes = Bytes.toBytes(field.name)
            val valueBytes = Bytes.toBytes(row.getAs(field.name).toString)
            val kv = new KeyValue(rowKeyBytes, cfBytes, fieldNameBytes, valueBytes)
            (new ImmutableBytesWritable(rowKeyBytes), kv)
          })
      })
      .filter(item => item != null)
      .saveAsNewAPIHadoopFile(
        hfilePath,
        classOf[ImmutableBytesWritable],
        classOf[KeyValue],
        classOf[HFileOutputFormat2],
        hadoopConfig
      )
  }
  def bulkLoad2Table(job: Job, hfilePath: String, namespace: String, name: String, cf: String): Unit = {
    val connection = ConnectionFactory.createConnection(job.getConfiguration)
    val admin = connection.getAdmin
    val tableName = TableName.valueOf(Bytes.toBytes(namespace), Bytes.toBytes(name))
    if (!admin.tableExists(tableName)) {
      admin.createTable(
        TableDescriptorBuilder.newBuilder(tableName)
          .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(cf)).build())
          .build()
      )
    }
    val table = connection.getTable(tableName)
    val regionLocator = new HRegionLocator(tableName, connection.asInstanceOf[ClusterConnection])
    HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator)
    val loader = new LoadIncrementalHFiles(job.getConfiguration)
    loader.doBulkLoad(new Path(hfilePath), admin, table, regionLocator)
  }
}

2.4. 运行任务

一共有五张表需要导入

Hive table name RowKey field
商品表 tbl_goods id
商品表_new tbl_goods_new id
日志表 tbl_logs id
订单表 tbl_orders id
用户表 tbl_users id

需要五个 Oozie Job 去调度执行, 创建方式如下

  1. 打包 Spark 程序
  2. 上传 Spark Jar 包到 HDFS
  3. 创建 Workflow
  • 选择 Spark Jar 包
  • 加入参数, 一个是要导入 HBase 的 Hive 表名, 一个是表中的 RowKey 列
  • 创建 Cordinator
  • Cordinator 会每天执行一次导入增量数据的任务
  • 为了尽快看到效果, 可以先运行 Workflow
  • 如果是导入日增量数据, 可以在 Sqoop 任务的 SQL 中过滤当日数据

详细介绍如下

  1. 打包 Spark 程序


  1. 上传 Spark Jar 包

  1. 创建 Workflow

  1. 创建 Cordinator
相关实践学习
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
1月前
|
负载均衡 算法 关系型数据库
大数据大厂之MySQL数据库课程设计:揭秘MySQL集群架构负载均衡核心算法:从理论到Java代码实战,让你的数据库性能飙升!
本文聚焦 MySQL 集群架构中的负载均衡算法,阐述其重要性。详细介绍轮询、加权轮询、最少连接、加权最少连接、随机、源地址哈希等常用算法,分析各自优缺点及适用场景。并提供 Java 语言代码实现示例,助力直观理解。文章结构清晰,语言通俗易懂,对理解和应用负载均衡算法具有实用价值和参考价值。
大数据大厂之MySQL数据库课程设计:揭秘MySQL集群架构负载均衡核心算法:从理论到Java代码实战,让你的数据库性能飙升!
|
2月前
|
存储 关系型数据库 MySQL
大数据新视界--大数据大厂之MySQL 数据库课程设计:开启数据宇宙的传奇之旅
本文全面剖析数据库课程设计 MySQL,展现其奇幻魅力与严峻挑战。通过实际案例凸显数据库设计重要性,详述数据安全要点及学习目标。深入阐述备份与恢复方法,并分享优秀实践项目案例。为开发者提供 MySQL 数据库课程设计的全面指南,助力提升数据库设计与管理能力,保障数据安全稳定。
大数据新视界--大数据大厂之MySQL 数据库课程设计:开启数据宇宙的传奇之旅
|
1月前
|
SQL 分布式计算 大数据
大数据新视界 --大数据大厂之Hive与大数据融合:构建强大数据仓库实战指南
本文深入介绍 Hive 与大数据融合构建强大数据仓库的实战指南。涵盖 Hive 简介、优势、安装配置、数据处理、性能优化及安全管理等内容,并通过互联网广告和物流行业案例分析,展示其实际应用。具有专业性、可操作性和参考价值。
大数据新视界 --大数据大厂之Hive与大数据融合:构建强大数据仓库实战指南
|
1月前
|
存储 关系型数据库 MySQL
大数据新视界 --面向数据分析师的大数据大厂之 MySQL 基础秘籍:轻松创建数据库与表,踏入大数据殿堂
本文详细介绍了在 MySQL 中创建数据库和表的方法。包括安装 MySQL、用命令行和图形化工具创建数据库、选择数据库、创建表(含数据类型介绍与选择建议、案例分析、最佳实践与注意事项)以及查看数据库和表的内容。文章专业、严谨且具可操作性,对数据管理有实际帮助。
大数据新视界 --面向数据分析师的大数据大厂之 MySQL 基础秘籍:轻松创建数据库与表,踏入大数据殿堂
|
2月前
|
关系型数据库 MySQL 数据安全/隐私保护
大数据新视界--大数据大厂之MySQL 数据库课程设计:数据安全深度剖析与未来展望
本文深入探讨数据库课程设计 MySQL 的数据安全。以医疗、电商、企业案例,详述用户管理、数据加密、备份恢复及网络安全等措施,结合数据安全技术发展趋势,与《大数据新视界 -- 大数据大厂之 MySQL 数据库课程设计》紧密关联,为 MySQL 数据安全提供全面指南。
大数据新视界--大数据大厂之MySQL 数据库课程设计:数据安全深度剖析与未来展望
|
2月前
|
负载均衡 算法 关系型数据库
大数据新视界--大数据大厂之MySQL数据库课程设计:MySQL集群架构负载均衡故障排除与解决方案
本文深入探讨 MySQL 集群架构负载均衡的常见故障及排除方法。涵盖请求分配不均、节点无法响应、负载均衡器故障等现象,介绍多种负载均衡算法及故障排除步骤,包括检查负载均衡器状态、调整算法、诊断修复节点故障等。还阐述了预防措施与确保系统稳定性的方法,如定期监控维护、备份恢复策略、团队协作与知识管理等。为确保 MySQL 数据库系统高可用性提供全面指导。
|
2月前
|
SQL 关系型数据库 MySQL
大数据新视界--大数据大厂之MySQL数据库课程设计:MySQL 数据库 SQL 语句调优方法详解(2-1)
本文深入介绍 MySQL 数据库 SQL 语句调优方法。涵盖分析查询执行计划,如使用 EXPLAIN 命令及理解关键指标;优化查询语句结构,包括避免子查询、减少函数使用、合理用索引列及避免 “OR”。还介绍了索引类型知识,如 B 树索引、哈希索引等。结合与 MySQL 数据库课程设计相关文章,强调 SQL 语句调优重要性。为提升数据库性能提供实用方法,适合数据库管理员和开发人员。
|
2月前
|
关系型数据库 MySQL 大数据
大数据新视界--大数据大厂之MySQL 数据库课程设计:MySQL 数据库 SQL 语句调优的进阶策略与实际案例(2-2)
本文延续前篇,深入探讨 MySQL 数据库 SQL 语句调优进阶策略。包括优化索引使用,介绍多种索引类型及避免索引失效等;调整数据库参数,如缓冲池、连接数和日志参数;还有分区表、垂直拆分等其他优化方法。通过实际案例分析展示调优效果。回顾与数据库课程设计相关文章,强调全面认识 MySQL 数据库重要性。为读者提供综合调优指导,确保数据库高效运行。
|
8月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
101 3
|
8月前
|
消息中间件 分布式计算 关系型数据库
大数据-140 - ClickHouse 集群 表引擎详解5 - MergeTree CollapsingMergeTree 与其他数据源 HDFS MySQL
大数据-140 - ClickHouse 集群 表引擎详解5 - MergeTree CollapsingMergeTree 与其他数据源 HDFS MySQL
124 0

推荐镜像

更多