使用 Spark 抽取 MySQL 数据到 Hive 时某列字段值出现异常(字段错位)

本文涉及的产品
实时数仓Hologres,5000CU*H 100GB 3个月
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
智能开放搜索 OpenSearch行业算法版,1GB 20LCU 1个月
简介: 在 MySQL 的 `order_info` 表中,包含 `order_id` 等5个字段,主要存储订单信息。执行按 `create_time` 降序的查询,显示了部分结果。在 Hive 中复制此表结构时,所有字段除 `order_id` 外设为 `string` 类型,并添加了 `etl_date` 分区字段。然而,由于使用逗号作为字段分隔符,当 `address` 字段含逗号时,数据写入 Hive 出现错位,导致 `create_time` 值变为中文字符串。问题解决方法包括更换字段分隔符或使用 Hive 默认分隔符 `\u0001`。此案例提醒在建表时需谨慎选择字段分隔符。

@[toc]

源数据描述

在 MySQL 中建立了表 order_info ,其字段信息如下所示:

+--------------------+------------------+------+-----+-------------------+-----------------------------+
| Field              | Type             | Null | Key | Default           | Extra                       |
+--------------------+------------------+------+-----+-------------------+-----------------------------+
| order_id           | int(10) unsigned | NO   | PRI | NULL              | auto_increment              |
| order_sn           | varchar(100)     | NO   |     | NULL              |                             |
| address            | varchar(100)     | NO   |     | NULL              |                             |
| create_time        | varchar(100)     | NO   |     | NULL              |                             |
| pay_time           | varchar(100)     | NO   |     | NULL              |                             |
+--------------------+------------------+------+-----+-------------------+-----------------------------+

除了 order_id 字段,其余字段类型都为 varchar(100)

根据 create_time 降序查询前 20 列,输出结果如下:

+----------+------------------+----------------------------------------------------------------------------------------+----------------+----------------+
| order_id | order_sn         | address                                                                                | create_time    | pay_time       |
+----------+------------------+----------------------------------------------------------------------------------------+----------------+----------------+
|    36876 | 2022091196411460 | 江苏省南京市中央路2014271号第一层110店铺11| 20220911220617 | NULL           |
|    36877 | 2022091196411460 | 江苏省南京市中央路2014271号第一层110店铺11| 20220911220617 | 20220911232917 |
|    36878 | 2022091196411460 | 江苏省南京市中央路2014271号第一层110店铺11| 20220911220617 | 20220911232917 |
|    36879 | 2022091196411460 | 江苏省南京市中央路2014271号第一层110店铺11| 20220911220617 | 20220911232917 |
|    36880 | 2022091196411460 | 江苏省南京市中央路2014271号第一层110店铺11| 20220911220617 | 20220911232917 |
|    36798 | 2022091196213791 | 江苏省昆山市柏庐南路9991713号商业街商铺1171313| 20220911220552 | NULL           |
|    36799 | 2022091196213791 | 江苏省昆山市柏庐南路9991713号商业街商铺1171313| 20220911220552 | 20220912115152 |
|    36800 | 2022091196213791 | 江苏省昆山市柏庐南路9991713号商业街商铺1171313| 20220911220552 | 20220912115152 |
|    36801 | 2022091196213791 | 江苏省昆山市柏庐南路9991713号商业街商铺1171313| 20220911220552 | 20220912115152 |
|    36811 | 2022091010006041 | 江苏省南京市中山路3213071号鼓楼医院南扩新院区门诊大楼一楼5| 20220911220057 | NULL           |
|    36812 | 2022091010006041 | 江苏省南京市中山路3213071号鼓楼医院南扩新院区门诊大楼一楼5| 20220911220057 | 20220912103157 |
|    36813 | 2022091010006041 | 江苏省南京市中山路3213071号鼓楼医院南扩新院区门诊大楼一楼5| 20220911220057 | 20220912103157 |
|    36814 | 2022091010006041 | 江苏省南京市中山路3213071号鼓楼医院南扩新院区门诊大楼一楼5| 20220911220057 | 20220912103157 |
|    37448 | 2022091154427501 | 上海市上海市杨树浦路26887789号上海国际时尚中心18| 20220911213924 | NULL           |
|    37449 | 2022091154427501 | 上海市上海市杨树浦路26887789号上海国际时尚中心18| 20220911213924 | 20220912115924 |
|    37450 | 2022091154427501 | 上海市上海市杨树浦路26887789号上海国际时尚中心18| 20220911213924 | 20220912115924 |
|    37451 | 2022091154427501 | 上海市上海市杨树浦路26887789号上海国际时尚中心18| 20220911213924 | 20220912115924 |
|    37227 | 2022091178845429 | 浙江省宁波市鄞州区四明中路9996584号万达广场银泰百货1F15层                              | 20220911212820 | NULL           |
|    37228 | 2022091178845429 | 浙江省宁波市鄞州区四明中路9996584号万达广场银泰百货1F15层                              | 20220911212820 | 20220912110220 |
|    37229 | 2022091178845429 | 浙江省宁波市鄞州区四明中路9996584号万达广场银泰百货1F15层                              | 20220911212820 | 20220912110220 |
+----------+------------------+----------------------------------------------------------------------------------------+----------------+----------------+

可以在这里看到原始数据的形态。

问题复现

现在我对这个问题进行复现,在下面这段代码里,我在 Hive 中的 ods 层建立了 MySQL 的对应表 order_info,其字段信息如下所示:

order_id                int                                         
order_sn                string
address                 string
create_time             string
pay_time                string                                      
etl_date                string

同样除了 order_id 字段,其余字段类型都为 string,其中 etl_date 字段为该表的分区字段。

import org.apache.spark.sql.SparkSession

object Test{

    def main(args: Array[String]): Unit = {

        val spark: SparkSession = SparkSession
                .builder()
                .appName("Test")
                .master("local[*]")
                .enableHiveSupport()
                .getOrCreate()

        // TODO 读取 MySQL 表数据
        spark.read.format("jdbc")
                .option("driver", "com.mysql.jdbc.Driver")
                .option("url", "jdbc:mysql://localhost:3306/test?useSSL=false&useUnicode=true&characterEncoding=utf8")
                .option("user", "root")
                .option("password", "123456")
                .option("dbtable", "order_info")
                .load()
                .createOrReplaceTempView("order_info")

        // TODO 在 Hive 中 ods 层创建表
        spark.sql("create database if not exists ods")

        spark.sql("drop table if exists ods.order_info")

        spark.sql(
            """
              |create table ods.order_info(
              |     order_id            int,
              |     order_sn            string,
              |     address             string,
              |     create_time         string,
              |     pay_time            string)
              |partitioned by(etl_date string)
              |row format delimited fields terminated by ','
              |""".stripMargin)

        // TODO 往表中写入数据
        spark.sql(
            """
              |insert into ods.order_info partition(etl_date="20230425")
              |select
              |     order_id,
              |     order_sn,
              |        address,
              |     create_time,
              |     pay_time
              |from
              |     order_info
              |""".stripMargin)

        // TODO 读取写入的数据
        spark.sql(
            """
              |select
              |     order_id,
              |     order_sn,
              |     address,
              |     create_time,
              |     pay_time
              |from
              |     ods.order_info
              |where
              |     order_sn = "2022031949619563"
              |""".stripMargin).show(20,truncate = false)

        spark.stop()


    }

}

抽取完成后,在 Hive 中查询 order_info 表订单编号值为 2022031949619563 的数据,输出结果如下:

+--------+----------------+-----------------------------------------------------------+-----------+--------------+
|order_id|order_sn        |address                                                    |create_time|pay_time      |
+--------+----------------+-----------------------------------------------------------+-----------+--------------+
|688     |2022031949619563|浙江省杭州市教工路264370号世贸丽晶城欧美中心24370号楼G区107         |10813|20220320184807|
|689     |2022031949619563|浙江省杭州市教工路264370号世贸丽晶城欧美中心24370号楼G区107         |10813|20220320184807|
|690     |2022031949619563|浙江省杭州市教工路264370号世贸丽晶城欧美中心24370号楼G区107         |10813|20220320184807|
|691     |2022031949619563|浙江省杭州市教工路264370号世贸丽晶城欧美中心24370号楼G区107         |10813|20220320184807|
+--------+----------------+-----------------------------------------------------------+-----------+--------------+

我们会发现,其中 create_time 的值变成了一个中文字符串。

我们去 MySQL 源数据中核查一下该订单编号的数据,查询结果如下:

+----------+------------------+-----------------------------------------------------------------------------------------------+----------------+----------------+
| order_id | order_sn         | address                                                                                       | create_time    | pay_time       |
+----------+------------------+-----------------------------------------------------------------------------------------------+----------------+----------------+
|      688 | 2022031949619563 | 浙江省杭州市教工路264370号世贸丽晶城欧美中心24370号楼G区107,10813| 20220320184807 | NULL           |
|      689 | 2022031949619563 | 浙江省杭州市教工路264370号世贸丽晶城欧美中心24370号楼G区107,10813| 20220320184807 | 20220321044607 |
|      690 | 2022031949619563 | 浙江省杭州市教工路264370号世贸丽晶城欧美中心24370号楼G区107,10813| 20220320184807 | 20220321044607 |
|      691 | 2022031949619563 | 浙江省杭州市教工路264370号世贸丽晶城欧美中心24370号楼G区107,10813| 20220320184807 | 20220321044607 |
+----------+------------------+-----------------------------------------------------------------------------------------------+----------------+----------------+

可以通过上下文对比轻松发现问题,数据抽取完成后,字段值出现了异常,那么为什么会这样呢?

问题解析

这是由于我们在创建 Hive 对应表时指定的字段分隔符是逗号 ",",所以在写入数据时,会默认将逗号作为字段分隔符。

        spark.sql(
            """
              |create table ods.order_info(
              |     order_id            int,
              |     order_sn            string,
              |     address             string,
              |     create_time         string,
              |     pay_time            string)
              |partitioned by(etl_date string)
              |row format delimited fields terminated by ','
              |""".stripMargin)

这里导致了字段 address 中的逗号被识别成了字段间隔符,在那之后的字段整体发生了错位,产生了数据异常。

问题解决

  1. 替换成其它分隔符

  2. 不指定分隔符,用 Hive 的默认分隔符 \u0001

在问题复现这里我们可以很容易的发现问题产生的原因,在实际开发中,字段特别多,而且不一定每个字段都会使用到,我这里是因为使用了发生错误的字段,所以才发现了这个问题,提醒各位在建表时需要格外注意指定的字段分隔符。

虽然是个很小的 BUG,但是往往容易错的就是这种小 BUG!

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
1月前
|
存储 SQL 关系型数据库
MySQL批量添加数据并取外表的某个字段值
MySQL批量添加数据并取外表的某个字段值
67 1
|
3月前
|
SQL DataWorks 关系型数据库
DataWorks操作报错合集之如何处理数据同步时(mysql->hive)报:Render instance failed
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
1月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
50 3
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-21 Sqoop 数据迁移工具 简介与环境配置 云服务器 ETL工具 MySQL与Hive数据互相迁移 导入导出
Hadoop-21 Sqoop 数据迁移工具 简介与环境配置 云服务器 ETL工具 MySQL与Hive数据互相迁移 导入导出
53 3
|
1月前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
37 0
|
1月前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
86 0
|
3月前
|
SQL 存储 关系型数据库
从Hive建表语句到MySQL的转换
【8月更文挑战第11天】
216 7
|
4月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错合集之从mysql读数据写到hive报错,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
SQL HIVE 分布式计算
|
SQL 分布式计算 HIVE
spark集成hbase与hive数据转换与代码练习
  帮一个朋友写个样例,顺便练手啦~一直在做平台的各种事,但是代码后续还要精进啊。。。 1 import java.util.Date 2 3 import org.apache.hadoop.
1259 0
下一篇
无影云桌面