5)为每个用户的所有下单记录按照订单金额进行排名
(1)期望结果
(2)需求实现
select
order_id,
user_id,
user_name,
order_date,
order_amount,
rank() over(partition by user_id order by order_amount desc) rk,
dense_rank() over(partition by user_id order by order_amount desc) drk,
row_number() over(partition by user_id order by order_amount desc) rn
from order_info;
8.6 自定义函数
1)Hive自带了一些函数,比如:max/min等,但是数量有限,自己可以通过自定义UDF来方便的扩展。
2)当Hive提供的内置函数无法满足你的业务处理需要时,此时就可以考虑使用用户自定义函数(UDF:user-defined function)。
3)根据用户自定义函数类别分为以下三种:
(1)UDF(User-Defined-Function)
一进一出。
(2)UDAF(User-Defined Aggregation Function)
用户自定义聚合函数,多进一出。
类似于:count/max/min
(3)UDTF(User-Defined Table-Generating Functions)
用户自定义表生成函数,一进多出。
如lateral view explode()
4)官方文档地址
https://cwiki.apache.org/confluence/display/Hive/HivePlugins
5)编程步骤
(1)继承Hive提供的类
org.apache.hadoop.hive.ql.udf.generic.GenericUDF
org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
(2)实现类中的抽象方法
(3)在hive的命令行窗口创建函数
添加jar。
add jar linux_jar_path
创建function。
create [temporary] function [dbname.]function_name AS class_name;
(4)在hive的命令行窗口删除函数
drop [temporary] function [if exists] [dbname.]function_name;
8.7 自定义UDF函数
0)需求
自定义一个UDF实现计算给定基本数据类型的长度,例如:
hive(default)> select my_len("abcd");
4
1)创建一个Maven工程Hive
2)导入依赖
<dependencies> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>3.1.3</version> </dependency> </dependencies>
3)创建一个类
package com.atguigu.hive.udf; import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException; import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; /** * 我们需计算一个要给定基本数据类型的长度 */ public class MyUDF extends GenericUDF { /** * 判断传进来的参数的类型和长度 * 约定返回的数据类型 */ @Override public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException { if (arguments.length !=1) { throw new UDFArgumentLengthException("please give me only one arg"); } if (!arguments[0].getCategory().equals(ObjectInspector.Category.PRIMITIVE)){ throw new UDFArgumentTypeException(1, "i need primitive type arg"); } return PrimitiveObjectInspectorFactory.javaIntObjectInspector; } /** * 解决具体逻辑的 */ @Override public Object evaluate(DeferredObject[] arguments) throws HiveException { Object o = arguments[0].get(); if(o==null){ return 0; } return o.toString().length(); } @Override // 用于获取解释的字符串 public String getDisplayString(String[] children) { return ""; } }
4)创建临时函数
(1)打成jar包上传到服务器/opt/module/hive/datas/myudf.jar
(2)将jar包添加到hive的classpath,临时生效
hive (default)> add jar /opt/module/hive/datas/myudf.jar;
(3)创建临时函数与开发好的java class关联
hive (default)>
create temporary function my_len
as "com.atguigu.hive.udf.MyUDF";
(4)即可在hql中使用自定义的临时函数
hive (default)>
select
ename,
my_len(ename) ename_len
from emp;
(5)删除临时函数
hive (default)> drop temporary function my_len;
注意:临时函数只跟会话有关系,跟库没有关系。只要创建临时函数的会话不断,在当前会话下,任意一个库都可以使用,其他会话全都不能使用。
创建永久函数
(1)创建永久函数
注意:因为add jar本身也是临时生效,所以在创建永久函数的时候,需要制定路径(并且因为元数据的原因,这个路径还得是HDFS上的路径)。
hive (default)>
create function my_len2
as "com.atguigu.hive.udf.MyUDF"
using jar "hdfs://hadoop102:8020/udf/myudf.jar";
(2)即可在hql中使用自定义的永久函数
hive (default)>
select
ename,
my_len2(ename) ename_len
from emp;
(3)删除永久函数
hive (default)> drop function my_len2;
注意:永久函数跟会话没有关系,创建函数的会话断了以后,其他会话也可以使用。
永久函数创建的时候,在函数名之前需要自己加上库名,如果不指定库名的话,会默认把当前库的库名给加上。
永久函数使用的时候,需要在指定的库里面操作,或者在其他库里面使用的话加上,库名.函数名。
第10章分区表和分桶表
10.1 分区表
Hive中的分区就是把一张大表的数据按照业务需要分散的存储到多个目录,每个目录就称为该表的一个分区。在查询时通过where子句中的表达式选择查询所需要的分区,这样的查询效率会提高很多。
简单来说就是本来在hdfs里最小的文件就是表,然后现在在表的目录下还有更小的文件就是分区,然后每个分区里又存储着相应的数据
这里的day=20220401就是分区目录
10.1.1 分区表基本语法
1. 创建分区表
hive (default)> create table dept_partition ( deptno int, --部门编号 dname string, --部门名称 loc string --部门位置 ) partitioned by (day string) row format delimited fields terminated by '\t';
2. 分区表读写数据
1)写数据
(1)load
1数据准备
在/opt/module/hive/datas/路径上创建文件dept_20220401.log,并输入如下内容。
[atguigu@hadoop102 datas]$ vim dept_20220401.log
10 行政部 1700
20 财务部 1800
2装载语句
hive (default)>
load data local inpath '/opt/module/hive/datas/dept_20220401.log' into table dept_partition
partition(day='20220401');
(2)insert
将day='20220401'分区的数据插入到day='20220402'分区,可执行如下装载语句
hive (default)>
insert overwrite table dept_partition partition (day = '20220402')
select deptno, dname, loc from dept_partition where day = '2020-04-01';
2)读数据
查询分区表数据时,可以将分区字段看作表的伪列,可像使用其他字段一样使用分区字段。(也就是读取分区表的所有数据的时候会显示day列的数据,也就是所有的分区信息)
select * from dept_partition where day = '20200401';
3. 分区表基本操作
1)查看所有分区信息
hive> show partitions dept_partition;
2)增加分区
(1)创建单个分区
hive (default)> alter table dept_partition add partition(day='20220403');
(2)同时创建多个分区(分区之间不能有逗号)
hive (default)> alter table dept_partition add partition(day='20220404') partition(day='20220405');
3)删除分区
(1)删除单个分区
hive (default)> alter table dept_partition drop partition (day='20220403');
(2)同时删除多个分区(分区之间必须有逗号)
hive (default)> alter table dept_partition drop partition (day='20220404'), partition(day='20220405');
4)修复分区
Hive将分区表的所有分区信息都保存在了元数据中,只有元数据与HDFS上的分区路径一致时,分区表才能正常读写数据。
若用户手动创建/删除分区路径,Hive都是感知不到的,这样就会导致Hive的元数据和HDFS的分区路径不一致。
再比如,若分区表为外部表,用户执行drop partition命令后,分区元数据会被删除,而HDFS的分区路径不会被删除,同样会导致Hive的元数据和HDFS的分区路径不一致。
若出现元数据和HDFS路径不一致的情况,可通过如下几种手段进行修复。以下第一第二两种方法在修复的时候都是根据hdfs的路径切修改mysql中的元数据信息,也就是把元数据信息删除或者新增,而不是hive去根据元数据信息然后去hdfs中去创建或者删除路径来进行修复
(1)add partition
若手动创建HDFS的分区路径,Hive无法识别,可通过add partition命令增加分区元数据信息,从而使元数据和分区路径保持一致。
(2)drop partition
若手动删除HDFS的分区路径,Hive无法识别,可通过drop partition命令删除分区元数据信息,从而使元数据和分区路径保持一致。
(3)msck
若分区元数据和HDFS的分区路径不一致,还可使用msck命令进行修复,以下是该命令的用法说明。
hive (default)>
msck repair table table_name [add/drop/sync partitions];
说明:
msck repair table table_name add partitions:该命令会增加HDFS路径存在但元数据缺失的分区信息。(这个跟上面的add partition效果是一样的,但是它可以批量的去修复,而add partition只能一条条精准的去修复)
msck repair table table_name drop partitions:该命令会删除HDFS路径已经删除但元数据仍然存在的分区信息。
msck repair table table_name sync partitions:该命令会同步HDFS路径和元数据分区信息,相当于同时执行上述的两个命令。
msck repair table table_name:等价于msck repair table table_name add partitions命令
10.1.2 二级分区表
思考:如果一天内的日志数据量也很大,如何再将数据拆分?答案是二级分区表,例如可以在按天分区的基础上,再对每天的数据按小时进行分区。
1)二级分区表建表语句
hive (default)>
create table dept_partition2(
deptno int, -- 部门编号
dname string, -- 部门名称
loc string -- 部门位置
)
partitioned by (day string, hour string)
row format delimited fields terminated by '\t';
2)数据装载语句
hive (default)>
load data local inpath '/opt/module/hive/datas/dept_20220401.log' into table dept_partition2
partition(day='20220401', hour='12');
3)查询分区数据
hive (default)>
select * from dept_partition2 where day='20220401' and hour='12';
10.1.3 动态分区
动态分区是指向分区表insert数据时,被写往的分区不由用户指定,而是由每行数据的最后一个字段的值来动态的决定。使用动态分区,可只用一个insert语句将数据写入多个分区。
动态分区是由每行数据的最后一个字段的值来动态的决定。这个字段其实就是你要手动添加一个分区字段loc,然后它根据这个字段来进行动态分区,跟下面的静态分区对比就发现下面的select汇总并没有day这个字段,因为已经指定了day是20220402
1)动态分区相关参数
(1)动态分区功能总开关(默认true,开启)
set hive.exec.dynamic.partition=true
(2)严格模式和非严格模式
动态分区的模式,默认strict(严格模式),要求必须指定至少一个分区为静态分区,nonstrict(非严格模式)允许所有的分区字段都使用动态分区。
set hive.exec.dynamic.partition.mode=nonstrict
(3)一条insert语句可同时创建的最大的分区个数,默认为1000。
set hive.exec.max.dynamic.partitions=1000
(4)单个Mapper或者Reducer可同时创建的最大的分区个数,默认为100。
set hive.exec.max.dynamic.partitions.pernode=100
(5)一条insert语句可以创建的最大的文件个数,默认100000。
hive.exec.max.created.files=100000
(6)当查询结果为空时且进行动态分区时,是否抛出异常,默认false。
hive.error.on.empty.partition=false
这里解释一下第六点,例如上面的select语句没有数据,那么就不会有分区产生,如果这个值设为false那就不会抛异常,反之(这个例子不是动态分区,但是意思都一样)
2)案例实操
需求:将dept表中的数据按照地区(loc字段),插入到目标表dept_partition_dynamic的相应分区中。
(1)创建目标分区表
hive (default)>
create table dept_partition_dynamic(
id int,
name string
)
partitioned by (loc int)
row format delimited fields terminated by '\t';
(2)设置动态分区
set hive.exec.dynamic.partition.mode = nonstrict;
hive (default)>
insert into table dept_partition_dynamic
partition(loc)
select
deptno,
dname,
loc
from dept;
(3)查看目标分区表的分区情况
hive (default)> show partitions dept_partition_dynamic;
10.2 分桶表
分区提供一个隔离数据和优化查询的便利方式。不过,并非所有的数据集都可形成合理的分区。对于一张表或者分区,Hive 可以进一步组织成桶,也就是更为细粒度的数据范围划分,分区针对的是数据的存储路径,分桶针对的是数据文件。
分桶表的基本原理是,首先为每行数据计算一个指定字段的数据的hash值,然后模以一个指定的分桶数,最后将取模运算结果相同的行,写入同一个文件中,这个文件就称为一个分桶(bucket)。
10.2.1 分桶表基本语法
1)建表语句
hive (default)>
create table stu_buck(
id int,
name string
)
clustered by(id) into 4 buckets //这里注意分桶字段必须是表中字段,而上面的分区字段必须是表以外的其他字段
row format delimited fields terminated by '\t';
2)数据装载
(1)数据准备
在/opt/module/hive/datas/路径上创建student.txt文件,并输入如下内容。
1001 student1
1002 student2
1003 student3
1004 student4
1005 student5
1006 student6
1007 student7
1008 student8
1009 student9
1010 student10
1011 student11
1012 student12
1013 student13
1014 student14
1015 student15
1016 student16
(2)导入数据到分桶表中
说明:Hive新版本load数据可以直接跑MapReduce,老版的Hive需要将数据传到一张表里,再通过查询的方式导入到分桶表里面(也就是insert.....select)。
hive (default)>
load data local inpath '/opt/module/hive/datas/student.txt'
into table stu_buck;
(3)查看创建的分桶表中是否分成4个桶
(4)观察每个分桶中的数据
10.2.2 分桶排序表
1)建表语句
hive (default)>
create table stu_buck_sort(
id int,
name string
)
clustered by(id) sorted by(id) into 4 buckets
row format delimited fields terminated by '\t';
2)数据装载
(1)导入数据到分桶表中
hive (default)>
load data local inpath '/opt/module/hive/datas/student.txt' into table stu_buck_sort;
(2)查看创建的分桶表中是否分成4个桶
(3)观察每个分桶中的数据,分区排序表其实就是对桶里的内容按照id进行排序
第11章文件格式和压缩
11.1 Hadoop压缩概述
压缩格式 |
算法 |
文件扩展名 |
是否可切分 |
DEFLATE |
DEFLATE |
.deflate |
否 |
Gzip |
DEFLATE |
.gz |
否 |
bzip2 |
bzip2 |
.bz2 |
是 |
LZO |
LZO |
.lzo |
是 |
Snappy |
Snappy |
.snappy |
否 |
为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器,如下表所示:
Hadoop查看支持压缩的方式hadoop checknative。
Hadoop在driver端设置压缩。
压缩性能的比较:
压缩算法 |
原始文件大小 |
压缩文件大小 |
压缩速度 |
解压速度 |
gzip |
8.3GB |
1.8GB |
17.5MB/s |
58MB/s |
bzip2 |
8.3GB |
1.1GB |
2.4MB/s |
9.5MB/s |
LZO |
8.3GB |
2.9GB |
49.3MB/s |
74.6MB/s |
On a single core of a Core i7 processor in 64-bit mode, Snappy compresses at about 250 MB/sec or more and decompresses at about 500 MB/sec or more.
11.2 Hive文件格式
为Hive表中的数据选择一个合适的文件格式,对提高查询性能的提高是十分有益的。Hive表数据的存储格式,可以选择text file、orc、parquet、sequence file等。
11.2.1 Text File
文本文件是Hive默认使用的文件格式,文本文件中的一行内容,就对应Hive表中的一行记录。
可通过以下建表语句指定文件格式为文本文件:
create table textfile_table
(column_specs)
stored as textfile;
11.2.2 ORC
1)文件格式
ORC(Optimized Row Columnar)file format是Hive 0.11版里引入的一种列式存储的文件格式。ORC文件能够提高Hive读写数据和处理数据的性能。
与列式存储相对的是行式存储,下图是两者的对比:
如图所示左边为逻辑表,右边第一个为行式存储,第二个为列式存储。
(1)行存储的特点
查询满足条件的一整行数据的时候,列存储则需要去每个聚集的字段找到对应的每个列的值,行存储只需要找到其中一个值,其余的值都在相邻地方,所以此时行存储查询的速度更快。
(2)列存储的特点
因为每个字段的数据聚集存储,在查询只需要少数几个字段的时候,能大大减少读取的数据量;每个字段的数据类型一定是相同的,列式存储可以针对性的设计更好的设计压缩算法。
前文提到的text file和sequence file都是基于行存储的,orc和parquet是基于列式存储的。
orc文件的具体结构如下图所示:
每个Orc文件由Header、Body和Tail三部分组成。
其中Header内容为ORC,用于表示文件类型。
Body由1个或多个stripe组成,每个stripe一般为HDFS的块大小,每一个stripe包含多条记录,这些记录按照列进行独立存储,每个stripe里有三部分组成,分别是Index Data,Row Data,Stripe Footer。
Index Data:一个轻量级的index,默认是为各列每隔1W行做一个索引。每个索引会记录第n万行的位置,和最近一万行的最大值和最小值等信息。
Row Data:存的是具体的数据,按列进行存储,并对每个列进行编码,分成多个Stream来存储。
Stripe Footer:存放的是各个Stream的位置以及各column的编码信息。
Tail由File Footer和PostScript组成。File Footer中保存了各Stripe的其实位置、索引长度、数据长度等信息,各Column的统计信息等;PostScript记录了整个文件的压缩类型以及File Footer的长度信息等。
在读取ORC文件时,会先从最后一个字节读取PostScript长度,进而读取到PostScript,从里面解析到File Footer长度,进而读取FileFooter,从中解析到各个Stripe信息,再读各个Stripe,即从后往前读。
3)建表语句
create table orc_table
(column_specs)
stored as orc
tblproperties (property_name=property_value, ...);//这里的property_value和property_name和下面的表格对应你
ORC文件格式支持的参数如下:
11.1.3 Parquet
Parquet文件是Hadoop生态中的一个通用的文件格式,它也是一个列式存储的文件格式。
Parquet文件的格式如下图所示:
上图展示了一个Parquet文件的基本结构,文件的首尾都是该文件的Magic Code,用于校验它是否是一个Parquet文件。
首尾中间由若干个Row Group和一个Footer(File Meta Data)组成。
每个Row Group包含多个Column Chunk,每个Column Chunk包含多个Page。以下是Row Group、Column Chunk和Page三个概念的说明:
行组(Row Group):一个行组对应逻辑表中的若干行。
列块(Column Chunk):一个行组中的一列保存在一个列块中。
页(Page):一个列块的数据会划分为若干个页。
Footer(File Meta Data)中存储了每个行组(Row Group)中的每个列快(Column Chunk)的元数据信息,元数据信息包含了该列的数据类型、该列的编码方式、该类的Data Page位置等信息。
3)建表语句
Create table parquet_table
(column_specs)
stored as parquet
tblproperties (property_name=property_value, ...);
支持的参数如下:
11.3 压缩
在Hive表中和计算过程中,保持数据的压缩,对磁盘空间的有效利用和提高查询性能都是十分有益的(这里的提高查询性能是因为压缩后会降低磁盘IO和降低网络IO)
11.2.1 Hive表数据进行压缩
在Hive中,不同文件类型的表,声明数据压缩的方式是不同的。
1)TextFile
若一张表的文件类型为TextFile,若需要对该表中的数据进行压缩,多数情况下,无需在建表语句做出声明。直接将压缩后的文件导入到该表即可,Hive在查询表中数据时,可自动识别其压缩格式,进行解压。
需要注意的是,在执行往表中导入数据的SQL语句时,用户需设置以下参数,来保证写入表中的数据是被压缩的。
--SQL语句的最终输出结果是否压缩 set hive.exec.compress.output=true; --输出结果的压缩格式(以下示例为snappy) set mapreduce.output.fileoutputformat.compress.codec =org.apache.hadoop.io.compress.SnappyCodec;
2)ORC
若一张表的文件类型为ORC,若需要对该表数据进行压缩,需在建表语句中声明压缩格式如下:
create table orc_table
(column_specs)
stored as orc
tblproperties ("orc.compress"="snappy");
3)Parquet
若一张表的文件类型为Parquet,若需要对该表数据进行压缩,需在建表语句中声明压缩格式如下:
create table orc_table
(column_specs)
stored as parquet
tblproperties ("parquet.compression"="snappy");
11.2.2 计算过程中使用压缩
1)单个MR的中间结果进行压缩
单个MR的中间结果是指Mapper输出的数据,对其进行压缩可降低shuffle阶段的网络IO,可通过以下参数进行配置:
--开启MapReduce中间数据压缩功能 set mapreduce.map.output.compress=true; --设置MapReduce中间数据数据的压缩方式(以下示例为snappy) set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
2)单条SQL语句的中间结果进行压缩
单条SQL语句的中间结果是指,两个MR(一条SQL语句可能需要通过MR进行计算)之间的临时数据,可通过以下参数进行配置:
--是否对两个MR之间的临时数据进行压缩 set hive.exec.compress.intermediate=true; --压缩格式(以下示例为snappy) set hive.intermediate.compression.codec= org.apache.hadoop.io.compress.SnappyCodec;
第12章 调优
12.1 计算资源配置
本教程的计算环境为Hive on MR。计算资源的调整主要包括Yarn和MR。
12.1.1 Yarn资源配置
1)Yarn配置说明
需要调整的Yarn参数均与CPU、内存等资源有关,核心配置参数如下
(1)yarn.nodemanager.resource.memory-mb
该参数的含义是,一个NodeManager节点分配给Container使用的内存。该参数的配置,取决于NodeManager所在节点的总内存容量和该节点运行的其他服务的数量(通常占总内存的二分之一到三分之二)。
考虑上述因素,此处可将该参数设置为64G,如下:
<property> <name>yarn.nodemanager.resource.memory-mb</name> <value>65536</value> </property>
(2)yarn.nodemanager.resource.cpu-vcores
该参数的含义是,一个NodeManager节点分配给Container使用的CPU核数。该参数的配置,同样取决于NodeManager所在节点的总CPU核数和该节点运行的其他服务。
考虑上述因素,此处可将该参数设置为16。
<property> <name>yarn.nodemanager.resource.cpu-vcores</name> <value>16</value> </property>
(3)yarn.scheduler.maximum-allocation-mb
该参数的含义是,单个Container能够使用的最大内存。推荐配置如下:
<property> <name>yarn.scheduler.maximum-allocation-mb</name> <value>16384</value> </property>
(4)yarn.scheduler.minimum-allocation-mb
该参数的含义是,单个Container能够使用的最小内存,推荐配置如下:
<property> <name>yarn.scheduler.minimum-allocation-mb</name> <value>512</value> </property>
可以看出在yarn的配置中主要是调整Container容器的cpu和内存,因为hive最后转成的MapReduce job任务实际上都是在Container容器里运行的
2)Yarn配置实操
(1)修改$HADOOP_HOME/etc/hadoop/yarn-site.xml文件
(2)修改如下参数
<property> <name>yarn.nodemanager.resource.memory-mb</name> <value>65536</value> </property> <property> <name>yarn.nodemanager.resource.cpu-vcores</name> <value>16</value> </property> <property> <name>yarn.scheduler.maximum-allocation-mb</name> <value>16384</value> </property> <property> <name>yarn.scheduler.minimum-allocation-mb</name> <value>512</value> </property>
(3)分发该配置文件
(4)重启Yarn。