大数据开发笔记(四):Hive数据仓库

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
日志服务 SLS,月写入数据量 50GB 1个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: Hive主要解决海量结构化日志的数据统计分析,它是hadoop上的一种数据仓库工具,可以将结构化的数据文件映射成一张表,并提供类似于SQL的查询方式,本质上来说是将Hive转化成MR程序。

Hive思维导图


a82a5c8df91547ec1ad4d350d710976c.png

Hive介绍


Hive主要解决海量结构化日志的数据统计分析,它是hadoop上的一种数据仓库工具,可以将结构化的数据文件映射成一张表,并提供类似于SQL的查询方式,本质上来说是将Hive转化成MR程序。


Hive与其它数据库的区别


Hive数据是存储在HDFS,本质上是转换成mr程序执行,因此查询效率比较慢,涉及mr程序的资源调度和任务计算;


HDFS的数据操作是支持覆盖追加,它不支持update和事务;

扩展性好,可以在多个集群上做应用开发;


Hive的读时速度快,因为在加载数据时并不会做数据校验,在读取数据时才会校验数据;处理数据规模大,适合于海量数据查询。


Hive的优缺点:

优点:操作接口采用类SQL语法;不用写MR程序来计算;支持用户自定义函数

缺点:不支持update和事务;查询延时严重。

Hive的架构原理:

Hive首先是一个客户端工具,它提供了一些用户可操作的接口,可以通过交互shell,JDBC和web UI方式连接Hive,在Hive的内部有个Driver驱动器,驱动器里面实现了解析器,编译器,优化器和执行器的功能,在用Hsql查询表时,sql语句在驱动器中会先做语法和语义解析,解析之后再进行相应的语法编译,然后在通过优化器时产生逻辑计划和物理计划,并进行优化,最后在执行器中转换成对应的mr jar包,打包给hadoop集群运行获得结果.

其中,在用SQL查询语句之前,Hive会将存放在hdfs的数据和对应的表建立映射关系,而记录这些映射信息和表结构信息的元数据,会存放在Hive指定的数据库中,比如mysql或者它自身warehourse目录下。

大体流程如下:

用户接口(shell、JDBC、Web UI) --> Driver(解析、编译、优化、执行)–> MR程序 --> hadoop集群


Hive的交互方式有三种:Hive交互Shell,Hive JDBC服务和Hive的命令


Hive交互Shell:直接输入在hive中/bin目录下的hive命令,进行sql查询


JDBC服务:


启动hiveServer2服务:bin/hive --service hiveserver2


然后 beeline命令连接hiveserver2:bin/beelinebeeline>!connect jdbc:hive2://node1:10000


Hive命令:在hive命令后面加 -e 选项后,接sql查询语句即可。


bin/hive -e "show databases;"


Hive的数据类型:

和mysql的类似,常用的int、bigint、double、string、date、boolean,还有smallint、tinyint、float、varchar、timestamp,另外还有三种复合数据类型:array(数组),map(键值对),struct(一组命名的字段),复合类型在建表时需要特别指定。


Hive的建表操作

Hive的表可以分为外部表和内部表,外部表创建时需要执行EXTERNAL关键字,它仅记录数据所在的存储路径,删除数据时外部表只是删除表的元数据,在重新建表后能直接关联上原来的数据,通常用作底层表;而内部表不需要指定EXTERNAL关键字,在删除数据后会把表的元数据和真实数据一起删除,通常用作中间表。


建表时建议指定分区,partitioned by,分区是把表的数据分目录存储在不同文件夹中,后期查询时可以避免全量扫描提升查询效率,可以指定一级分区,二级分区等;分区有动态和静态之分,在插入数据时可以给定具体的分区值做静态分区的方式插入,也可以只写明分区键做动态分区的方式插入,动态分区的方式会根据插入数据的值自动划分分区,但可能产生较多的小文件,浪费系统内存和IO。


还可以指定分桶,Clustered by,分桶类似于将文件切分,它是将整个数据内容按照分桶键的值做hash算法,得出的结果再和分桶数做模运算来进行切分,指定了分桶表之后,后续select查询条件要加tablesample(bucket x out of y),适用于需要抽样调查的情况。


最后建表时可以指定每一行中字段的分隔符,用”row format delimited fields terminated by“指定;可以指定文件存储类型,stored as 二进制文件,文本文件、或列式存储文件。以及指定表在hdfs上的存储位置location。


Hive数据的导入和导出

Hive数据导入:

第一种,用load data导入数据:load data [local] inpath 'dataPath' override | into table student [partition 分区值];


第二种,创建表时指定location数据路径,后面如果该路径本身有数据会导入到表中,如果是空文件可以用手动上传数据文件到hdfs中:hdfs fs -put /opt/bigdata/student.txt /user/hive/warehouse/student1


第三种,可以在建表时as select * from 某张表,也可以insert into|override table时select * from某张表。


最后一种是直接import table,导入某个数据文件,前提是数据文件要先export准备好。


Hive数据导出:

第一种是insert导出:可以insert导出到本地或者hdfs,还可以指定导出文件后的分隔符


 

#加local导出到本地路径,默认文件分隔符时“\001”,之后本地会生成一个日志型的文件。
 insert override local directory          '/opt/bigdata/student';
 #格式化导出文件
insert override local directory '/opt/bigdata/student' row format delimited fields   terminated by ',';
#这里没有local  insert override directory '/export/student' row format delimited fields terminated by ','


第二种是Hadoop命令直接下载,由于表和数据有映射关系,每张表在hdfs上都能找到对应数据存储位置,所以我们可以直接下载下来的,后期要检查下数据和分隔符是否有问题。

hdfs fs -get /usr/hive/warehouse/student/student.txt /opt/bigdata/data


第三种是Hive shell命令导出,hive命令后面加-e或-f选项,再加sql查询语句指定到某个目录下,比如:


#1.hive -e “sql语句” >> file; 这种是直接执行sql语句,把结果导出到文件中。
 #2.hive -f "sql文件" > file; 这种是执行完sql文件后,将查询结果写入到file中
 bin/hive -e 'select * from default.student;' >> /opt/bigdata/student.txt


最后一种是export导出到hdfs


hive>export table student to '/usr/hive/warehouse/student';


Hive的文件存储格式和压缩方式:企业有效方式文件存储压缩是采用orc + snappy方式。


Hive的SerDe 序列化和反序列化,是使用Serde对行对象序列化和反序列化,方便数据加载到表中,最后实现把文件内容映射到hive表。如下所示:


HDFS file -> InputFileFormat -> key,value -> Deserializer(反序列化) -> Row object
 Row object -> Serializer(序列化) -> key,value -> OutputFileFormat -> HDFS file


建表时可以指定row format来使用SerDe。常用于企业解决多字符分割场景


数据倾斜现象和解决办法?(重要)

2. 将数据直接传到HDFS分区目录上,怎么让分区表和数据产生关联?

   因为上传到hdfs后,hive没有对应元数据信息所以无法查询到对应数据。可以上传数据后给分区表添加该目录的分区


   dfs -mkdir -p 分区目录dfs -put 分区数据hive>altertable 表明 addpartition(分区);


3. 桶表是否可以直接通过load将数据导入?

   不可以,因为load数据的话hdfs下只会有一个文件无法完成分桶的效果,需要通过中间表导入数据


4. hive的分区可以提高效率,那么分区是否越多越好?为什么?

   不是越多越好


hive底层是存储在hdfs上的,hdfs是适合存储大文件而不适合小文件,如果有越多的分区,那么会增加namenode的负担。

hive会转化成mr程序,mr会转化为多个task任务,多个小文件的话,每个文件一个task,每个task运行一个JVM实例,JVM的开启和销毁都会降低系统性能。

        所以分区数要合理设计,一般在3个以内。


5. 什么情况下Hive可以避免进行mapreduce?

如果是进行简单的查询,直接select,不带count,sum这些聚合函数的,都不会走mapreduce,而是直接读取hdfs目录中的文件。(fetch抓取)

另外如果查询语句中的过滤条件只是分区字段的情况下,也不会走mapreduce(fetch抓取)

         

select*from order_partition wheremonth='2019-03';

还有就是可以手动设置,让hive使用本地模式,当然这种有限制,需要查询的文件不超过256M或者文件数量不超过4个,否则系统还是会自动走mapreduce

       

set hive.exec.mode.local.auto =true;


6. order by ,sort by , distribute by , cluster by 的区别?

Order by会对所给的全部数据进行全局排序,只启动一个reduce来处理。

Sort by是局部排序,它可以根据数据量的大小启动一到多个reducer来工作,并且在每个reduce中单独排序。

Distribute by 类似于mr中的partition,采用hash算法,在map端将查询结果中hash值相同的结果分发到对应的reduce中,结合sort by使用

Cluster by 可以看作是distribute by 和sort by的结合,当两者后面所跟的字段列名相同时,效果就等同于使用cluster by,但是cluster by最终的结果只能是降序,无法指定升序和降序。


7. 如何将数据以动态分区的方式插入分区表中?

1.首先创建对应分区表和一张普通表

2.然后将数据加载到普通表

         load data local inpath '/opt/bigdata/order_partition'intotable tt_order;


3.最后利用普通表来将数据加载到动态分区表中

   

#先设置使用动态分区的参数和使用非严格模式
         set hive.exec.dynamic.partition=true;set hive.exec.dynamic.partition.mode=nonstrict;
         #然后通过普通表导入分区表
         insert into table order_partition partition(year,month) select order_number,order_price,substring(order_time,0,4) as                     year,substring(order_time,6,12) as month from tt_order;
          #注意导入的字段顺序,分区键一定要放在最后,否则会报错。


8. 数据倾斜现象和解决办法?(重要)

1.什么是数据倾斜?

大量相同特征的key出现在同一个reduce任务中,或者某个key对应的数据量远超过其它key的数据量,这种导致数据分布不均匀的现象就叫做数据倾斜。


2.数据倾斜的现象

在执行任务的时候,任务进度长时间卡在99%左右,查看任务监控页面或者详细日志信息,发现只有少量,一个或者几个reduce子任务没有跑完,主要因为这几个reduce任务处理的数据量和其它reduce任务差异过大。这种单一reduce任务的记录数与平均记录数差异过大,就会极大拖长计算时间。


现实工作中可能会遇到这样的情况比较多:比如大表join小表,其中小表有特别的key值比较集中,这样分发到某一个reduce上的数据就会高于平均值;或者是大表join大表中,作为连接判断的字段0值或者空值较多的,这些0值和空值后续都会由一个reduce处理,导致这个reduce处理量过多;再有的情况就是group by、**count( distinct )**某个字段值数据多而导致reduce处理耗时的情况。


3.数据倾斜的原因

key分布不均匀,比如空值,0值

业务数据本身的特性。

建表时考虑不周,导致后期join操作时数据倾斜

某些sql语句本身就有数据倾斜。比如用count(distinct),它会单独用一个reduce来计算统计,如果数据量很大,就会导致整个job很难完成。这种情况可以先用group by分出需要统计的字段,再进行sum或者count


4.数据倾斜的解决方案,有三个层面可以思考处理:

第一,SQL语句调优

查询语句加上具体需要的列和分区键,有些复杂表的字段会存储json格式的文本,这些字段不一定是需要查询的就可以过滤掉,减轻reduce计算负担

大表join小表时用map jion,让小表先进内存,然后大表与小表在map端完成join操作,避免reduce端处理。

大表join大表中,可以把空值的key变成一个字符串然后加上rand()随机数,后续mr的分区操作会把倾斜的数据重新分发到不同的reduce上,从而避免数据倾斜。或者在join 的on条件中先让key为空的值 不参与关联,等key不为空的数据相互合并连接后再union all加回key为空的数据。

                   

select*from a leftouterjoin b oncasewhere id isnullthen concat('任意字符串',rand())else id end= b.id;
                         select*from log a join  users b on a.id isnotnulland a.id = b.idunionallselct *from log a where a.id isnull;
查询语句中count(distinct) 改成group by + sum(),比如
                        select count(distinct id) from test; ==> select sum(id) from (select id from test group by id);

                       这种可能会多开一个reduce来完成group by的操作,但会明显提高查询速度。


针对不同数据类型产生的数据倾斜,存在这样的情况,A表中的id字段的数据类型是int,但join的B表中id字段存在脏数据,有一些是int类型但也有string类型的,那么再join操作时,默认的hash操作就会对int类型的key进行分配,而对于string类型的key会被统一分配到一个reduce中,这种情况就需要先进行类型转换,如 a join b on a.id = cast(b.id as int);

还有一些时候可以把数据倾斜的数据单独拿出来处理,然后再union all回去。

第二,通过设置hive参数配置解决,这种主要是优化计算速度,避免数据倾斜发生

开启map端聚合

                      并不是所有的聚合操作都需要在reducec端完成,很多聚合操作都可以现在map端先进行部分聚合,最后在reduce端得出最终结                         果(类似于mr过程中的combiner,预先合并压缩数据,再提供给reduce统计计算)再hive开启map端聚合后,一旦发现数据倾                         斜,系统就能自动负载均衡,把相同特征的key分发到不同的reduce中,主要通过hive.groupby.skewindata参数完成。


#开启map端聚合的设置
#是否在map段進行聚合,默认是true
set hive.map.aggr = true
#在map端进行聚合操作的条目数目
set hive.groupby.mapaggr.checkinterval = 100000;#有数据倾斜的时候进行负载均衡,比如把相同特征的key分发到不同的reduce中(默认是false)set hive.groupby.skewindata = true;

设置并行执行

        和oracle一样也可以利用并行执行提高查询速度,不同的是hive是靠参数来空值的


     

#开启并行执行set hive.exec.parallel = true;#设置同一个sql允许的最大并行度,默认是8set hive.exec.parallel.thread.number = 16;

设置压缩

         压缩可以在map端要进行shuffle时压缩和在完成reduce输出时压缩


Hive表中间数据压缩

     

#设置为true为激活中间数据压缩功能,默认是false,没有开启set hive.exec.compress.intermediate = true;#设置中间数据的压缩算法               set mapred map.output.compression = codec = org.apache.hadoop.io.compress.SnappyCodec;
                                 Hive表最终输出结果压缩
            set hive.exec.compress.output =true;set mapred map.output.compression = codec = org.apache.hadoop.io.compress.SnappyCodec;


推测执行

         说简单点就是Hadoop用了一个备份任务来同时执行,跟原来的任务相比较,谁先执行完成就用谁的计算结果作为最终的计算结果。具体            定义如下: Hadoop采用了推测执行机制,它根据一定的法则推测出”拖后腿“的任务,并为这样的任务启动一个备份任务,让备份任务              和原始任务同时处理一份数据,并最后选择优先执行完成的任务计算结果作为最终结果。


                 

#开启推测执行机制set hive.mapred.reduce.tasks.speculative.exection = true;
JVM重用
                    JVM重用可以使得JVM实例在同一个job中重新使用多次,减少进程的启动和销毁时间
                   #设置jvm重用个数set mapred.job.reuse.jvm.num.tasks = 5;
合理设置map数和reduce数
在map执行之前将小文件合并可以减少map数
                   #系统默认的格式,可以不用设置。set hive.input.format = org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;


对复杂文件可以增加map数

                    增加map方法有一个公式:compute(SliteSize(Math.max(minSize,Math.min(maxSize,blocksizs))))


                   公式- 调整maxSize最大值,让maxSize小于blocksize就可以增加map数- minSize默认等于1,maxSize默认等于blockSize大小。


               

#比如这样设置就可以达到增加map数的效果#设置每个map处理的文件maxSize大小为10M,这样小于一个block128M的话,系                        统就会分配更多10M的map来处理复杂任务。
                     set mapreduce.input.fileinputformat.split.maxsize = 10485760;
合理设置Reduce数,比如设置每个job中reduce的个数为3个
                                set mapreduce.job.reduces =3;


第三,修改MR程序去避免数据倾斜

可以在MR程序的reduce方法中追踪每个键的最大值,并且设置阈值,当超过该阈值时就可以认为发生了数据倾斜,然后输出到日志文件进行分析。第二种是在编写MR程序时,从业务层面去考虑自定义的分区键是否合理。就跟ADS库建表时可以默认指定哪个字段作为分区键。

MR程序中改用TotalOrderPartitioner替换HashPartitioner,它可以通过对原始数据进行抽样得到的结果集来预设分区边界值,也就是能找出导致数据倾斜的key值,再分散处理。

MR程序中使用Combiner。

优化相关


1.MaxCompute SQL中用到哪些优化?


首先优化SQL的过程,实际上就是要尽可能减少IO读取,尽可能减少计算资源的使用,尽可能减少SQL复杂度,尽可能提升运行速度。


建分区表,但建议分区层数不超过3层,后续查询时为了避免全表扫描需要分区裁剪,分区值尽量常量化,避免不可确定值;插入数据时尽量采用写入静态分区的方式,优化数据存储,如果用动态分区,会生成较多的小文件,增加系统负担。

只select有效列,并用limit限制返回的条数。

读取相同源表时可以合并成一条sql,系统会优化只读取一次。


mapjoin优化

将full outer join 改为left outer join + union all 并对小表使用上mapjoin

尽可能保证表达式两边的数据类型一致,如果发生隐式转换容易造成精度问题,比如string和bigint都转成double来相等比较,悲观情况下,可能触发数据倾斜。这时要cast显式转换一下

少用distinct,容易触发数据倾斜,count(distinct)处理的时间会很长可以转换成count()+ group by

多个表join时,join顺序很重要,优先选择join结果输出小的表先关联,能有效减少中间数据量,节省IO和计算资源。

尽量使用内置的UDF函数和窗口函数,内置UDF在实现做了很多优化,运行块,省资源,窗口函数本身能处理很多复杂问题。

尽量避免Order by,order by会触发全局排序,只能单点运行,效率低,如果业务允许,可以改成distribute by + sort by


相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
4月前
|
SQL 分布式计算 DataWorks
DataWorks产品使用合集之如何开发ODPS Spark任务
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
2月前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
39 0
|
3月前
|
SQL JavaScript 前端开发
用Java来开发Hive应用
用Java来开发Hive应用
37 7
|
3月前
|
SQL JavaScript 前端开发
用Java、Python来开发Hive应用
用Java、Python来开发Hive应用
34 6
|
4月前
|
存储 机器学习/深度学习 数据采集
深入解析大数据核心概念:数据平台、数据中台、数据湖与数据仓库的异同与应用
深入解析大数据核心概念:数据平台、数据中台、数据湖与数据仓库的异同与应用
|
4月前
|
消息中间件 存储 大数据
大数据-数据仓库-实时数仓架构分析
大数据-数据仓库-实时数仓架构分析
146 1
|
3月前
|
SQL 分布式计算 大数据
代码编码原则和规范大数据开发
此文档详细规定了SQL代码的编写规范,包括代码的清晰度,执行效率,以及注释的必要性。它强调所有SQL关键字需统一使用大写或小写,并禁止使用select *操作。此外,还规定了代码头部的信息模板,字段排列方式,INSERT, SELECT子句的格式,运算符的使用,CASE语句编写规则,查询嵌套规范,表别名定义,以及SQL注释的添加方法。这些规则有助于提升代码的可读性和可维护性。
50 0
|
3月前
|
SQL 分布式计算 大数据
大数据开发SQL代码编码原则和规范
这段SQL编码原则强调代码的功能完整性、清晰度、执行效率及可读性,通过统一关键词大小写、缩进量以及禁止使用模糊操作如select *等手段提升代码质量。此外,SQL编码规范还详细规定了代码头部信息、字段与子句排列、运算符前后间隔、CASE语句编写、查询嵌套、表别名定义以及SQL注释的具体要求,确保代码的一致性和维护性。
99 0
|
4月前
|
存储 SQL 分布式计算
MaxCompute 在大规模数据仓库中的应用
【8月更文第31天】随着大数据时代的到来,企业面临着海量数据的存储、处理和分析挑战。传统的数据仓库解决方案在面对PB级甚至EB级的数据规模时,往往显得力不从心。阿里云的 MaxCompute(原名 ODPS)是一个专为大规模数据处理设计的服务平台,它提供了强大的数据存储和计算能力,非常适合构建和管理大型数据仓库。本文将探讨 MaxCompute 在大规模数据仓库中的应用,并展示其相对于传统数据仓库的优势。
132 0
|
5月前
|
SQL 分布式计算 MaxCompute
SQL开发问题之对于ODPS中的UNION操作,执行计划的问题如何解决
SQL开发问题之对于ODPS中的UNION操作,执行计划的问题如何解决