大数据Hive多字节分隔符

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
简介: 大数据Hive多字节分隔符

1 应用场景

1.1 Hive中的分隔符

Hive中默认使用单字节分隔符来加载文本数据,例如逗号、制表符、空格等等,默认的分隔符为\001。根据不同文件的不同分隔符,我们可以通过在创建表时使用 row format delimited fields terminated by ‘单字节分隔符’ 来指定文件中的分割符,确保正确将表中的每一列与文件中的每一列实现一一对应的关系。

1.2 特殊数据

在实际工作中,我们遇到的数据往往不是非常规范化的数据,例如我们会遇到以下的两种情况

➢ 上图中每列的分隔符为||,为多字节分隔符

➢ 情况二:数据的字段中包含了分隔符

➢ 上图中每列的分隔符为空格,但是数据中包含了分割符,时间字段中也有空格

192.168.88.134 [08/Nov/2020:10:44:32 +0800] “GET / HTTP/1.1” 404 951

2.2 问题与需求

2.2.1 问题

基于上述的两种特殊数据,我们如果使用正常的加载数据的方式将数据加载到表中,就会出以下两种错误:

➢ 情况一:加载数据的分隔符为多字节分隔符

➢ 创建表

–如果表已存在就删除表

drop table if exists singer;
--创建表
create table singer(
 id string,--歌手id
 name string,--歌手名称
 country string,--国家
 province string,--省份
 gender string,--性别
 works string--作品
)
--指定列的分隔符为||
row format delimited fields terminated by '||';

➢ 加载数据

load data local inpath ‘/export/data/test01.txt’ into table singer;

➢ 查看结果

select * from singer;

➢ 问题

数据发生了错位,没有正确的加载每一列的数据

➢ 原因

Hive中默认只支持单字节分隔符,无法识别多字节分隔符

➢ 情况二:数据中包含了分隔符

➢ 创建表

--如果表存在,就删除表
drop table if exists apachelog;
--创建表
create table apachelog(
 ip string,      --IP地址
 stime string,    --时间
 mothed string,  --请求方式
 url string,     --请求地址
 policy string,  --请求协议
 stat string,    --请求状态
 body string     --字节大小
)
--指定列的分隔符为空格
row format delimited fields terminated by ' ';

➢ 加载数据

load data local inpath ‘/export/data/apache_web_access.log’ into table apachelog;

➢ 查看结果

select * from apachelog;

➢ 问题

时间字段被切分成了两个字段,后面所有的字段出现了错位

➢ 原因

时间数据中包含了分隔符,导致Hive认为这是两个字段,但实际业务需求中,为一个字段

2.2 需求

基于上面两种情况的测试发现,当数据中出现了多字节分隔符或者数据中的某个字段包含了分隔符,就会导致数据加载错位的问题。基于出现的问题,我们需要通过特殊的方法来解决该问题,即使当数据中出现多字节分隔符等情况时,Hive也能正确的加载数据,实现列与数据的一一对应。

3 解决方案一:替换分隔符

3.1 方案概述

面对情况一,如果数据中的分隔符是多字节分隔符,可以使用程序提前将数据中的多字节分隔符替换为单字节分隔符,然后使用Hive加载,就可以实现正确加载对应的数据。

例如:原始数据中的分隔符为“||”

3.2 程序开发

可以在ETL阶段通过一个MapReduce程序,将“||”替换为单字节的分隔符“|”,示例程序如下:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
/**
 * @ClassName ChangeSplitCharMR
 * @Description TODO MapReduce实现将多字节分隔符转换为单字节符
 * @Create By  itcast
 */
public class ChangeSplitCharMR extends Configured implements Tool {
    public int run(String[] arg) throws Exception {
        /**
         * 构建Job
         */
        Job job = Job.getInstance(this.getConf(),"changeSplit");
        job.setJarByClass(ChangeSplitCharMR.class);
        /**
         * 配置Job
         */
        //input:读取需要转换的文件
        job.setInputFormatClass(TextInputFormat.class);
        Path inputPath = new Path("datas/split/test01.txt");
        FileInputFormat.setInputPaths(job,inputPath);
        //map:调用Mapper
        job.setMapperClass(ChangeSplitMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        //reduce:不需要Reduce过程
        job.setNumReduceTasks(0);
        //output
        job.setOutputFormatClass(TextOutputFormat.class);
        Path outputPath = new Path("datas/output/changeSplit");
        TextOutputFormat.setOutputPath(job,outputPath);
        /**
         * 提交Job
         */
        return job.waitForCompletion(true) ? 0 : -1;
    }
    //程序入口
    public static void main(String[] args) throws Exception {
        //调用run
        Configuration conf = new Configuration();
        int status = ToolRunner.run(conf, new ChangeSplitCharMR(), args);
        System.exit(status);
    }
    public static class ChangeSplitMapper extends Mapper<LongWritable,Text,Text,NullWritable>{
        //定义输出的Key
        private Text outputKey = new Text();
        //定义输出的Value
        private NullWritable outputValue = NullWritable.get();
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //获取每条数据
            String line = value.toString();
            //将里面的||转换为|
            String newLine = line.replaceAll("\\|\\|", "|");
            //替换后的内容作为Key
            this.outputKey.set(newLine);
            //输出结果
            context.write(this.outputKey,this.outputValue);
        }
    }
}

➢ 程序执行结果如下:

3.3 重新建表加载数据

➢ 重新创建Hive表

--如果表已存在就删除表
drop table if exists singer;
--创建表
create table singer(
 id string,--歌手id
 name string,--歌手名称
 country string,--国家
 province string,--省份
 gender string,--性别
 works string--作品
)
--指定列的分隔符为||
row format delimited fields terminated by '|';

➢ 在Hive中重新加载数据

load data local inpath ‘/export/data/part-m-00000’ into table singer;

3.4 查看结果

➢ 查看结果

3.5 总结

在ETL阶段可以直接对数据进行分隔符的替换,通过替换分隔符将多字节分隔符更改为单字节分隔符,就可以解决数据加载的问题,但是这种方式有对应的优缺点,并不是所有的场景适用于该方法。

优点:实现方式较为简单,基于字符串替换即可

缺点:无法满足情况2的需求

4 解决方案二:RegexSerDe正则加载

4.1 方案概述

面对情况一和情况二的问题,Hive中提供了一种特殊的方式来解决,Hive提供了一种特殊的Serde来加载特殊数据的问题,使用正则匹配来加载数据,匹配每一列的数据。

官网地址:https://cwiki.apache.org/confluence/display/Hive/GettingStarted#GettingStarted-ApacheWeblogData

4.2 什么是SerDe?

Hive的SerDe提供了序列化和反序列化两个功能,SerDe是英文Serialize和Deserilize的组合缩写,用于实现将Hive中的对象进行序列化和将数据进行反序列化。

Serialize就是序列化,用于将Hive中使用的java object转换成能写入hdfs的字节序列,或者其他系统能识别的流文件。Hive中的insert语句用于将数据写入HDFS,所以就会调用序列化实现。Hive中的调用过程如下:

e523894eb6674c029a1ebfc203fabcd7.png

Deserilize就是反序列化,用于将字符串或者二进制数据流转换成Hive能识别的java object对

象。所有Hive中的Select语句在查询数据时,需要将HDFS中的数据解析为Hive中对象,就需要进行

反序列化。Hive可以方便的将数据加载到表中而不需要对数据进行转换,这样在处理海量数据时可

以节省大量的时间。Hive中的调用过程如下:


2ea4daad2ec54febae068a3256a39a27.png

4.3 Hive中包含的SerDe

官网地址:https://cwiki.apache.org/confluence/display/Hive/SerDe

Hive中默认提供了多种SerDe用于解析和加载不同类型的数据文件,常用的有ORCSerde 、RegexSerde、JsonSerDe等。

4.4 RegexSerDe的功能

RegexSerde是Hive中专门为了满足复杂数据场景所提供的正则加载和解析数据的接口,使用RegexSerde可以指定正则表达式加载数据,根据正则表达式匹配每一列数据。上述过程中遇到的情况一和情况二的问题,都可以通过RegexSerDe使用正则表达式来加载实现。

4.5 RegexSerDe解决多字节分隔符

➢ 分析数据格式,构建正则表达式

➢ 原始数据格式

01||周杰伦||中国||台湾||男||七里香

➢ 正则表达式定义每一列

([0-9])\|\|(.)\|\|(.)\|\|(.)\|\|(.)\|\|(.)

➢ 正则校验

➢ 基于正则表达式,使用RegexSerde建表

–如果表已存在就删除表

drop table if exists singer;
--创建表
create table singer(
 id string,--歌手id
 name string,--歌手名称
 country string,--国家
 province string,--省份
 gender string,--性别
 works string--作品
)
--指定使用RegexSerde加载数据
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
--指定正则表达式
WITH SERDEPROPERTIES (
  "input.regex" = "([0-9]*)\\|\\|([^}]*)\\|\\|([^}]*)\\|\\|([^}]*)\\|\\|([^}]*)\\|\\|([^}]*)"
);

➢ 加载数据

load data local inpath ‘/export/data/test01.txt’ into table singer;

➢ 查看数据结果

select * from singer;

每一列的数据都被正常的加载,没有错位

4.6 RegexSerDe解决数据中包含分割符

➢ 分析数据格式,构建正则表达式

➢ 原始数据格式

192.168.88.100 [08/Nov/2020:10:44:33 +0800] “GET /hpsk_sdk/index.html HTTP/1.1” 200 328


➢ 正则表达式定义每一列

([^ ]) ([^}]) ([^ ]) ([^ ]) ([^ ]) ([0-9]) ([^ ]*)

➢ 正则校验

➢ 基于正则表达式,使用RegexSerde建表

–如果表存在,就删除表

drop table if exists apachelog;

–创建表

create table apachelog(

ip string, --IP地址

stime string, --时间

mothed string, --请求方式

url string, --请求地址

policy string, --请求协议

stat string, --请求状态

body string --字节大小

)

–指定使用RegexSerde加载数据

ROW FORMAT SERDE ‘org.apache.hadoop.hive.serde2.RegexSerDe’

–指定正则表达式

WITH SERDEPROPERTIES (

“input.regex” = “([^ ]) ([^}]) ([^ ]) ([^ ]) ([^ ]) ([0-9]) ([^ ]*)”

);


➢ 加载数据

load data local inpath ‘/export/data/apache_web_access.log’ into table apachelog;

➢ 查看数据结果

4.7 总结

RegexSerde使用简单,对于各种复杂的数据场景,都可以通过正则定义匹配每行中的每个字段,基本上可以满足大多数场景的需求,工作中推荐使用该方式来实现对于复杂数据的加载。

5 解决方案三:自定义InputFormat

5.1 方案概述

Hive中也允许使用自定义InputFormat来解决以上问题,通过在自定义InputFormat,来自定义解析逻辑实现读取每一行的数据。

5.2 自定义InputFormat

➢ 自定义InputFormat继承自TextInputFormat,读取数据时将每条数据中的”||”全部替换成“|”

➢ 自定义InputFormat

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import java.io.IOException;
/**
 * @ClassName UserInputFormat
 * @Description TODO 用于实现自定义InputFormat,读取每行数据
 * @Create By     Itcast
 */
public class UserInputFormat extends TextInputFormat {
    @Override
    public RecordReader<LongWritable, Text> getRecordReader(InputSplit genericSplit, JobConf job,
                                                            Reporter reporter) throws IOException {
        reporter.setStatus(genericSplit.toString());
        UserRecordReader reader = new UserRecordReader(job,(FileSplit)genericSplit);
        return reader;
    }
}

➢ 自定义RecordReader

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import java.io.IOException;
import java.io.InputStream;
/**
 * @ClassName UserRecordReader
 * @Description TODO 用于自定义读取器,在自定义InputFormat中使用,将读取到的每行数据中的||替换为|
 * @Create By     Itcast
 */
public class UserRecordReader implements RecordReader<LongWritable, Text> {
    private static final Log LOG = LogFactory.getLog(LineRecordReader.class.getName());
    int maxLineLength;
    private CompressionCodecFactory compressionCodecs = null;
    private long start;
    private long pos;
    private long end;
    private LineReader in;
    private Seekable filePosition;
    private CompressionCodec codec;
    private Decompressor decompressor;
    public UserRecordReader(Configuration job, FileSplit split) throws IOException {
        this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
        start = split.getStart();
        end = start + split.getLength();
        final Path file = split.getPath();
        compressionCodecs = new CompressionCodecFactory(job);
        codec = compressionCodecs.getCodec(file);
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(split.getPath());
        if (isCompressedInput()) {
            decompressor = CodecPool.getDecompressor(codec);
            if (codec instanceof SplittableCompressionCodec) {
                final SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec)
                        .createInputStream(fileIn, decompressor, start, end,
                                SplittableCompressionCodec.READ_MODE.BYBLOCK);
                in = new LineReader(cIn, job);
                start = cIn.getAdjustedStart();
                end = cIn.getAdjustedEnd();
                filePosition = cIn; // take pos from compressed stream
            } else {
                in = new LineReader(codec.createInputStream(fileIn, decompressor), job);
                filePosition = fileIn;
            }
        } else {
            fileIn.seek(start);
            in = new LineReader(fileIn, job);
            filePosition = fileIn;
        }
        if (start != 0) {
            start += in.readLine(new Text(), 0, maxBytesToConsume(start));
        }
        this.pos = start;
    }
    private boolean isCompressedInput() {
        return (codec != null);
    }
    private int maxBytesToConsume(long pos) {
        return isCompressedInput() ? Integer.MAX_VALUE : (int) Math.min(Integer.MAX_VALUE, end - pos);
    }
    private long getFilePosition() throws IOException {
        long retVal;
        if (isCompressedInput() && null != filePosition) {
            retVal = filePosition.getPos();
        } else {
            retVal = pos;
        }
        return retVal;
    }
    public LongWritable createKey() {
        return new LongWritable();
    }
    public Text createValue() {
        return new Text();
    }
    /**
     * Read a line.
     */
    public synchronized boolean next(LongWritable key, Text value) throws IOException {
        while (getFilePosition() <= end) {
            key.set(pos);
            int newSize = in.readLine(value, maxLineLength, Math.max(maxBytesToConsume(pos), maxLineLength));
            String str = value.toString().replaceAll("\\|\\|", "\\|");
            value.set(str);
            pos += newSize;
            if (newSize == 0) {
                return false;
            }
            if (newSize < maxLineLength) {
                return true;
            }
            LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize));
        }
        return false;
    }
    public float getProgress() throws IOException {
        if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (getFilePosition() - start) / (float) (end - start));
        }
    }
    public synchronized long getPos() throws IOException {
        return pos;
    }
    public synchronized void close() throws IOException {
        try {
            if (in != null) {
                in.close();
            }
        } finally {
            if (decompressor != null) {
                CodecPool.returnDecompressor(decompressor);
            }
        }
    }
    public static class LineReader extends org.apache.hadoop.util.LineReader {
        LineReader(InputStream in) {
            super(in);
        }
        LineReader(InputStream in, int bufferSize) {
            super(in, bufferSize);
        }
        public LineReader(InputStream in, Configuration conf) throws IOException {
            super(in, conf);
        }
    }
}

5.3 基于自定义Input创建表

➢ 将开发好的InputFormat打成jar包,放入Hive的lib目录中

➢ 在Hive中,将jar包添加到环境变量中

add jar /export/server/hive-3.1.2-bin/lib/HiveUserInputFormat.jar;

该方法可以实现临时添加,如果希望永久生效,重启Hive即可

➢ 创建表,指定自定义的InputFormat读取数据

--如果表已存在就删除表
drop table if exists singer;
--创建表
create table singer(
 id string,--歌手id
 name string,--歌手名称
 country string,--国家
 province string,--省份
 gender string,--性别
 works string--作品
)
--指定使用分隔符为|
row format delimited fields terminated by '|'
stored as 
--指定使用自定义的类实现解析
inputformat 'bigdata.itcast.cn.hive.mr.UserInputFormat' 
outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';

➢ 加载数据

load data local inpath ‘/export/data/test01.txt’ into table singer;

5.4 查看结果

select * from singer;

数据正常匹配,没有出现错位。

6 总结

当数据文件中出现多字节分隔符或者数据中包含了分隔符时,会导致数据加载与实际表的字段不匹配的问题,基于这个问题我们提供了三种方案:替换分隔符、正则加载及自定义InputFormat来实现,其中替换分隔符无法解决数据中存在分隔符的问题,自定义InputFormat的开发成本较高,所以整体推荐使用正则加载的方式来实现对于特殊数据的处理。

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
29天前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
30 0
|
4月前
|
SQL 分布式计算 大数据
大数据处理平台Hive详解
【7月更文挑战第15天】Hive作为基于Hadoop的数据仓库工具,在大数据处理和分析领域发挥着重要作用。通过提供类SQL的查询语言,Hive降低了数据处理的门槛,使得具有SQL背景的开发者可以轻松地处理大规模数据。然而,Hive也存在查询延迟高、表达能力有限等缺点,需要在实际应用中根据具体场景和需求进行选择和优化。
|
4月前
|
分布式计算 Java 调度
MaxCompute产品使用合集之使用Tunnel Java SDK上传BINARY数据类型时,应该使用什么作为数据类字节
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
4月前
|
分布式计算 DataWorks 调度
MaxCompute产品使用合集之如何将数据迁移到CDH Hive
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
5月前
|
SQL 数据采集 数据可视化
基于Hive的招聘网站的大数据分析系统
基于Hive的招聘网站的大数据分析系统
128 2
|
5月前
|
SQL 关系型数据库 MySQL
基于Hive的天气情况大数据分析系统(通过hive进行大数据分析将分析的数据通过sqoop导入到mysql,通过Django基于mysql的数据做可视化)
基于Hive的天气情况大数据分析系统(通过hive进行大数据分析将分析的数据通过sqoop导入到mysql,通过Django基于mysql的数据做可视化)
174 0
|
5月前
|
SQL 分布式计算 大数据
MaxCompute产品使用合集之启用hive兼容的时候,某个字段是null,是否会把这个字段当成空白连起来
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
6月前
|
SQL 分布式计算 Hadoop
利用Hive与Hadoop构建大数据仓库:从零到一
【4月更文挑战第7天】本文介绍了如何使用Apache Hive与Hadoop构建大数据仓库。Hadoop的HDFS和YARN提供分布式存储和资源管理,而Hive作为基于Hadoop的数据仓库系统,通过HiveQL简化大数据查询。构建过程包括设置Hadoop集群、安装配置Hive、数据导入与管理、查询分析以及ETL与调度。大数据仓库的应用场景包括海量数据存储、离线分析、数据服务化和数据湖构建,为企业决策和创新提供支持。
681 1
|
6月前
|
SQL 分布式计算 大数据
[AIGC 大数据基础]hive浅谈
[AIGC 大数据基础]hive浅谈
|
28天前
|
存储 机器学习/深度学习 分布式计算
大数据技术——解锁数据的力量,引领未来趋势
【10月更文挑战第5天】大数据技术——解锁数据的力量,引领未来趋势