三、Sqoop 全量导入核心命令

本文涉及的产品
实时数仓Hologres,5000CU*H 100GB 3个月
实时计算 Flink 版,1000CU*H 3个月
智能开放搜索 OpenSearch行业算法版,1GB 20LCU 1个月
简介: 在大数据处理过程中,数据库表怎么高效导入到 Hadoop?这一篇我带大家实战讲解 Sqoop 全量导入 的用法,从基础命令到常用参数配置,再到导入到 HDFS、Hive 的各种格式案例,配合实操示例,帮你一步步掌握全量导入技巧。最后还有练习题,供大家动手巩固一下。

Sqoop 的全量导入 (Full Import) 是指将关系型数据库中整个表特定查询结果集所有数据一次性完整地导入到 Hadoop 生态系统 (通常是 HDFS 或 Hive) 中。

一、核心命令结构 (sqoop import)

执行全量导入的基本命令框架如下:

sqoop import \
--connect <jdbc-connect-string> \
--username <db-username> \
--password <db-password> \
# (或使用 --password-file / -P)
# --- 数据源指定 ---
[--table <db-table-name>] \
[--query <sql-query> --target-dir <hdfs-path> -m <num-mappers> [--split-by <column-name>]] \
# --- 导入目标指定 ---
[--target-dir <hdfs-path>] \
[--warehouse-dir <hdfs-base-path-for-hive>] \
[--hive-import --hive-table <hive-db.table_name> [--create-hive-table] [--hive-overwrite]] \
# --- 并行度与分割 ---
[-m <num-mappers>] \
[--split-by <column-name>] \
# --- 文件格式与压缩 ---
[--fields-terminated-by '<char>'] \
[--as-textfile | --as-sequencefile | --as-avrodatafile | --as-parquetfile] \
[--compress --compression-codec <codec>] \
# --- 其他常用选项 ---
[--delete-target-dir] \
[--verbose]
二、关键参数详解 (带示例)
  1. 连接参数 (Connection Parameters)
    • --connect <jdbc-connect-string>: 数据库的JDBC连接URL
      • 示例 (MySQL):
        --connect jdbc:mysql://localhost:3306/mydatabase
        
  • --username <db-username>: 数据库用户名

    • 示例:
      --username retail_user
      
  • --password <db-password>: 数据库密码生产环境强烈建议使用 --password-file /path/to/passwordfile-P (交互式输入密码)。

    • 示例 (不推荐直接写密码): --password 123456
    • 示例 (推荐): -P (执行时会提示输入)
  1. 数据源指定 (Data Source) - 至少提供一种

    • --table <db-table-name>: 要导入的数据库表名
      • 示例:
        --table products
        
    • --query "<sql-query>": 使用自定义的SQL查询来选择要导入的数据。
      • 示例:
        --query "SELECT id, name, price FROM items WHERE status = 'active' AND \$CONDITIONS"
        
    • 注意:使用 --query 时,必须同时提供 --target-dir <hdfs-path> (如果导入到HDFS) 和 -m <num-mappers>。如果 -m > 1,查询中必须包含 \$CONDITIONS 占位符,并通常需要 --split-by。如果 -m 1,则 \$CONDITIONS 不是必需的
  2. 导入目标 (Import Target) - 至少提供一种

    • --target-dir <hdfs-path>: 指定数据导入到HDFS的目录
      • 示例:
        --target-dir /user/cloudera/imported_data/products_text
        
    • --warehouse-dir <hdfs-base-path-for-hive>: 指定Hive仓库的根目录。当与 --hive-import 结合使用时,Sqoop 会在此目录下创建与表名对应的子目录

      • 示例:
        --warehouse-dir /user/hive/warehouse
        
    • --hive-import: 指示Sqoop将数据导入到Hive表中。

    • --hive-table <hive-db.table_name>: 指定目标Hive数据库和表名
      • 示例:
        --hive-table retail_stage.products_from_mysql
        
    • --create-hive-table: (可选) 如果目标Hive表不存在,Sqoop会根据源表结构自动创建它。
    • --hive-overwrite: (可选) 如果目标Hive表已存在,此选项会覆盖表中的现有数据
  3. 并行度与数据分割 (Parallelism & Splitting)

    • -m <num-mappers> (或 --num-mappers <num-mappers>): 指定用于导入数据的Map任务数量。默认通常是4。

    • --split-by <column-name>: 指定用于分割数据以供并行Map任务处理的列。该列最好是数值类型、日期类型或具有均匀分布的字符类型,并且有索引

  1. 文件格式与压缩 (File Format & Compression)

    • --fields-terminated-by '<char>': 指定字段分隔符

      • 示例:
        --fields-terminated-by '\t' # Tab分隔
        --fields-terminated-by '|' # 线分隔
        
    • --as-textfile | --as-sequencefile | --as-avrodatafile | --as-parquetfile: 指定存储格式

    • --compress --compression-codec <codec>: 启用压缩并指定压缩编解码器。

      • 示例:
        --compress --compression-codec snappy
        
  2. 其他常用选项

    • --delete-target-dir: 如果目标HDFS目录已存在,则先删除它再导入。谨慎使用!
    • --verbose: 输出更详细的执行日志。
    • --direct: (仅支持部分数据库如MySQL, PostgreSQL) 使用数据库特定的批量加载工具绕过MapReduce,通常速度更快
三、全量导入MySQL表数据示例

场景: 假设我们有一个MySQL数据库 testdb,其中有一个表 products (id INT PRIMARY KEY, name VARCHAR(255), price DECIMAL(10,2), category VARCHAR(100))。

1. 全量导入MySQL表数据到HDFS

(A) 导入为默认文本文件 (逗号分隔) 到HDFS

sqoop import \
--connect jdbc:mysql://localhost:3306/testdb \
--username root \
-password 123456 \
--table products \
--target-dir /user/cloudera/products_hdfs_text_default \
-m 2 \
--split-by id
  • 说明:数据将以逗号分隔的文本文件形式存储在HDFS的 /user/cloudera/products_hdfs_text_default 目录下。使用2个mapper并行导入,并根据 id 列进行数据切分。

(B) 导入为制表符分隔的文本文件到HDFS,并启用Snappy压缩

sqoop import \
--connect jdbc:mysql://localhost:3306/testdb \
--username root \
-P \
--table products \
--target-dir /user/cloudera/products_hdfs_tsv_snappy \
--fields-terminated-by '\t' \
--compress \
--compression-codec snappy \
-m 4 \
--split-by id
  • 说明:数据将以制表符分隔,并使用Snappy压缩

(C) 导入为Avro数据文件到HDFS

sqoop import \
--connect jdbc:mysql://localhost:3306/testdb \
--username root \
-P \
--table products \
--target-dir /user/cloudera/products_hdfs_avro \
--as-avrodatafile \
-m 2 \
--split-by id
  • 说明:数据将以Avro格式存储,Sqoop会自动生成相应的Avro schema。
2. 全量导入MySQL表数据到Hive

(A) 导入到Hive,如果表不存在则创建 (默认文本格式)

sqoop import \
--connect jdbc:mysql://localhost:3306/testdb \
--username root \
-P \
--table products \
--hive-import \
--hive-table default.products_from_mysql_text \
--create-hive-table \
-m 2 \
--split-by id
  • 说明:数据将导入到 default 数据库下的 products_from_mysql_text Hive表中。如果表不存在,Sqoop会根据MySQL表结构自动创建。HDFS上的数据文件将位于默认的Hive仓库路径下(通常是 /user/hive/warehouse/products_from_mysql_text/)。

(B) 导入到Hive并覆盖现有表,指定存储为Parquet格式

sqoop import \
--connect jdbc:mysql://localhost:3306/testdb \
--username root \
-P \
--table products \
--hive-import \
--hive-table default.products_from_mysql_parquet \
--create-hive-table \
--hive-overwrite \
--as-parquetfile \
-m 4 \
--split-by id
  • 说明:数据将以Parquet列式存储格式导入到Hive。如果 products_from_mysql_parquet 表已存在,其内容将被覆盖

(C) 使用自定义查询导入特定数据到Hive

sqoop import \
--connect jdbc:mysql://localhost:3306/testdb \
--username root \
-P \
--query "SELECT id, name, category FROM products WHERE price > 100.00 AND \$CONDITIONS" \
--split-by id \
--hive-import \
--hive-table default.expensive_products \
--create-hive-table \
--target-dir /user/cloudera/temp/expensive_products_staging \
# --query时,若hive-import,也建议指定一个临时的--target-dir
-m 2
  • 说明:仅导入 products 表中价格大于100的产品的 id, name, 和 category 列。\$CONDITIONS 允许Sqoop并行化这个自定义查询。为 --query 导入到Hive时,显式指定 --target-dir 作为数据在HDFS上的暂存位置通常是个好习惯,Sqoop随后会将这些数据加载到Hive表中。
四、执行流程与核心点
  1. 元数据探查:Sqoop连接数据库,获取表结构信息。
  2. 代码生成:自动生成Java类以映射表记录。
  3. MapReduce作业:(非--direct模式) 将导入任务转化为MapReduce作业。Map任务并行读取数据库数据分片并写入HDFS/Hive。
  4. 数据一致性:全量导入期间,源表若持续写入新数据,导入结果可能非严格事务快照。建议在业务低峰期操作。
  5. 数据库压力高并发导入会增加源数据库负载,需合理设置 -m
  6. 类型映射:关注数据库类型到Hadoop类型的自动映射,特殊类型可能需要手动调整

练习题与解析

假设环境:

  • MySQL数据库 sales_db,包含表 transactions (trans_id INT PRIMARY KEY, product_sku VARCHAR(50), sale_amount DECIMAL(12,2), trans_date DATE, region_id INT)。
  • Hadoop集群已配置。
  • MySQL连接信息:jdbc:mysql://dbserver.mycompany.com:3306/sales_db,用户 sales_importer,密码通过密码文件 /user/sqoop_user/sales.password 获取。

题目:

  1. 练习题1:HDFS导入 - 指定分隔符和压缩
    请编写Sqoop命令,将 sales_db.transactions 表的所有数据全量导入到HDFS的 /data_lake/raw/sales_transactions 目录下。使用4个Map任务,并以 trans_id 列进行数据分割。文件字段间使用逗号 , 分隔,并且使用Gzip压缩

  2. 练习题2:Hive导入 - 创建新表并指定数据类型
    请编写Sqoop命令,将 sales_db.transactions 表中 region_id5sale_amount 大于 500所有交易数据全量导入到Hive。目标Hive表为 processed_sales.high_value_region5_txns。如果该Hive表不存在,则自动创建它,并确保 sale_amount 在Hive中映射为 DOUBLE 类型,trans_date 映射为 DATE 类型。如果表已存在,则覆盖其内容。数据在Hive中以Avro格式存储。

解析:

  1. 练习题1答案与解析:

    sqoop import \
    --connect jdbc:mysql://dbserver.mycompany.com:3306/sales_db \
    --username sales_importer \
    --password-file /user/sqoop_user/sales.password \
    --table transactions \
    --target-dir /data_lake/raw/sales_transactions \
    -m 4 \
    --split-by trans_id \
    --fields-terminated-by ',' \
    --compress \
    --compression-codec gzip
    
    • --password-file /user/sqoop_user/sales.password: 使用HDFS上的密码文件,这比直接在命令行写密码更安全
    • --table transactions: 指定导入源表
    • --target-dir /data_lake/raw/sales_transactions: HDFS目标路径
    • -m 4 --split-by trans_id: 使用4个mapper并行处理,按 trans_id 分割
    • --fields-terminated-by ',': 字段以逗号分隔
    • --compress --compression-codec gzip: 启用Gzip压缩
  2. 练习题2答案与解析:

    sqoop import \
    --connect jdbc:mysql://dbserver.mycompany.com:3306/sales_db \
    --username sales_importer \
    --password-file /user/sqoop_user/sales.password \
    --query "SELECT trans_id, product_sku, sale_amount, trans_date, region_id FROM transactions WHERE region_id = 5 AND sale_amount > 500 AND \$CONDITIONS" \
    --split-by trans_id \
    --map-column-hive sale_amount=DOUBLE,trans_date=DATE \
    --hive-import \
    --hive-table processed_sales.high_value_region5_txns \
    --create-hive-table \
    --hive-overwrite \
    --as-avrodatafile \
    --target-dir /user/hive/warehouse/processed_sales.db/high_value_region5_txns_temp \
    -m 2
    
    • --query "...": 使用自定义查询筛选数据,并包含 \$CONDITIONS 以支持并行。
    • --split-by trans_id: 指定查询结果集如何分割给mappers。
    • --map-column-hive sale_amount=DOUBLE,trans_date=DATE: 关键参数,用于在创建Hive表时,将源表中的 sale_amount 列映射为Hive的 DOUBLE 类型,trans_date 列映射为Hive的 DATE 类型
    • --hive-import --hive-table ... --create-hive-table --hive-overwrite: 导入到Hive,不存在则创建,存在则覆盖
    • --as-avrodatafile: 指定数据以Avro格式存储
    • --target-dir ..._temp: --query 导入到Hive指定一个临时的HDFS暂存目录
    • -m 2: 使用2个mapper。对于自定义查询,mapper数量需要根据数据量和 --split-by 列的特性来合理设置。
目录
相关文章
|
24天前
|
分布式计算 Java 关系型数据库
二、Sqoop 详细安装部署教程
在大数据开发实战中,Sqoop 是数据库与 Hadoop 生态之间不可或缺的数据传输工具。这篇文章将以 Sqoop 1.4.7 为例,结合官方站点截图,详细讲解 Sqoop 的下载路径、安装步骤、环境配置,以及常见 JDBC 驱动的准备过程,帮你一步步搭建出能正常运行的 Sqoop 环境,并通过 list-databases 命令验证安装是否成功。如果你正打算学习 Sqoop,或者在搭建大数据平台过程中遇到安装配置问题,本文将是非常实用的参考指南。
107 6
|
人工智能 搜索推荐 算法
【推荐系统】UserCF(基于用户的协同过滤)(理论+图解+代码实践)
【推荐系统】UserCF(基于用户的协同过滤)(理论+图解+代码实践)
2582 0
【推荐系统】UserCF(基于用户的协同过滤)(理论+图解+代码实践)
|
25天前
|
小程序 JavaScript 搜索推荐
基于springboot的考研互助小程序
本项目基于SpringBoot开发考研互助小程序,整合优质资源,提供真题、视频、学习计划等功能,构建交流社区,助力考生高效备考,促进教育公平与信息化发展。
|
28天前
|
人工智能 Java API
构建基于Java的AI智能体:使用LangChain4j与Spring AI实现RAG应用
当大模型需要处理私有、实时的数据时,检索增强生成(RAG)技术成为了核心解决方案。本文深入探讨如何在Java生态中构建具备RAG能力的AI智能体。我们将介绍新兴的Spring AI项目与成熟的LangChain4j框架,详细演示如何从零开始构建一个能够查询私有知识库的智能问答系统。内容涵盖文档加载与分块、向量数据库集成、语义检索以及与大模型的最终合成,并提供完整的代码实现,为Java开发者开启构建复杂AI智能体的大门。
785 58
|
1天前
|
人工智能 运维 Kubernetes
Serverless 应用引擎 SAE:为传统应用托底,为 AI 创新加速
在容器技术持续演进与 AI 全面爆发的当下,企业既要稳健托管传统业务,又要高效落地 AI 创新,如何在复杂的基础设施与频繁的版本变化中保持敏捷、稳定与低成本,成了所有技术团队的共同挑战。阿里云 Serverless 应用引擎(SAE)正是为应对这一时代挑战而生的破局者,SAE 以“免运维、强稳定、极致降本”为核心,通过一站式的应用级托管能力,同时支撑传统应用与 AI 应用,让企业把更多精力投入到业务创新。
|
24天前
|
SQL 关系型数据库 MySQL
四、Sqoop 导入表数据子集
在实际数据导入场景中,我们经常只需要数据库中的一部分数据,比如按条件筛选的行、特定的几列。这篇文章详细讲解了如何使用 Sqoop 的 --where、--columns、--query 等方式灵活实现子集导入,配有完整示例和注意事项,助你更精准地控制数据流向 HDFS 或 Hive。
62 1
|
24天前
|
分布式计算 关系型数据库 Hadoop
一、Sqoop历史发展及原理
在大数据系统中,Sqoop 就像是一位干练的“数据搬运工”,帮助我们把 MySQL、Oracle 等数据库里的数据快速、安全地导入到 Hadoop、Hive 或 HDFS 中,反之亦然。这个专栏从基础原理讲起,配合实战案例、参数详解和踩坑提醒,让你逐步掌握 Sqoop 的使用技巧。不管你是初学者,还是正在构建数据管道的工程师,都能在这里找到实用的经验和灵感。
83 6
|
28天前
|
存储 安全 Unix
七、Linux Shell 与脚本基础
别再一遍遍地敲重复的命令了,把它们写进Shell脚本,就能一键搞定。脚本本质上就是个存着一堆命令的文本文件,但要让它“活”起来,有几个关键点:文件开头最好用#!/usr/bin/env bash来指定解释器,并用chmod +x给它执行权限。执行时也有讲究:./script.sh是在一个新“房间”(子Shell)里跑,不影响你;而source script.sh是在当前“房间”里跑,适合用来加载环境变量和配置文件。
307 9
|
24天前
|
数据采集 关系型数据库 MySQL
python爬取数据存入数据库
Python爬虫结合Scrapy与SQLAlchemy,实现高效数据采集并存入MySQL/PostgreSQL/SQLite。通过ORM映射、连接池优化与批量提交,支持百万级数据高速写入,具备良好的可扩展性与稳定性。
|
9天前
|
存储 分布式计算 Hadoop
七、Sqoop Job:简化与自动化数据迁移任务及免密执行
平时用 Sqoop 导入导出时,命令一长就容易出错,特别是增量任务还得记 last-value,很麻烦。其实 Sqoop 有 Job 功能,能把命令“存档”,以后直接 --exec 执行,配合调度工具特别省心。本文手把手讲 Job 创建、管理、免密执行技巧(密码文件、Credential Provider),还带实战例子,搞完你就能写出稳稳当当的自动化 Sqoop 作业了!
85 0