Hive过滤脏数据的一些经验

简介: 如下文件需要处理,每个文件大概13G,其中字段以空格(32)分隔的7个字段;最麻烦的是中间有脏数据: -rw-r--r-- 1 hadoop ifengdev 1895843464 May 6 14:56 feedback201503_201.

如下文件需要处理,每个文件大概13G,其中字段以空格(32)分隔的7个字段;最麻烦的是中间有脏数据:

-rw-r--r-- 1 hadoop ifengdev 1895843464 May  6 14:56 feedback201503_201.tar.gz
-rw-r--r-- 1 hadoop ifengdev 1896885848 May  6 14:59 feedback201503_202.tar.gz
-rw-r--r-- 1 hadoop ifengdev 1891790676 May  6 15:00 feedback201503_203.tar.gz
-rw-r--r-- 1 hadoop ifengdev 1894197100 May  6 15:01 feedback201503_204.tar.gz
-rw-r--r-- 1 hadoop ifengdev 1894074074 May  6 15:02 feedback201503_205.tar.gz
-rw-r--r-- 1 hadoop ifengdev 1829224750 May  6 16:13 feedback201504_201.tar.gz
-rw-r--r-- 1 hadoop ifengdev 1831709571 May  6 16:14 feedback201504_202.tar.gz
-rw-r--r-- 1 hadoop ifengdev 1824710879 May  6 16:30 feedback201504_203.tar.gz
-rw-r--r-- 1 hadoop ifengdev 1827164031 May  6 16:31 feedback201504_204.tar.gz
-rw-r--r-- 1 hadoop ifengdev 1827911208 May  6 16:31 feedback201504_205.tar.gz

直接Load进Hive报错:

Loading data to table default.tmp_20150506
Failed with exception Wrong file format. Please check the file's format.
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.MoveTask

没办法中间格式有问题:

网上说改变存储格式可以避免报错:

CREATE  TABLE tmp_20150506(
  dt string,
  unknown1 string,
  unknown2 string,
  reurl string,
  uid string,
  num1 int,
  num2 int)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '32'
  LINES TERMINATED BY '10'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.RCFileInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.RCFileOutputFormat'

改为:

CREATE  TABLE tmp_20150506(
  dt string,
  unknown1 string,
  unknown2 string,
  reurl string,
  uid string,
  num1 int,
  num2 int)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '32'
  LINES TERMINATED BY '10'
STORED AS TEXTFILE;

确实不报错了,根据具体需求也算一个方法;

 

最直接的方法:

zcat feedback201503_201.tar.gz|gawk -F ' ' 'NF==7 {print $1, "\t", $2, "\t", $3, "\t", $4, "\t", $5, F ' ' 'NF==7 {print $1, "\t", $2, "\t", $3, "\t", $4, "\t", $5, "\t", $6, "\t", $7}' >> feedback20150, "\t", $6, "\t", $7}' >> feedback201503_204.log

功能:替换空格为制表符;并且过滤字段不满足要求的脏数据;

接着Load进Hive即可;

上述方法比较直接,但觉得“体力劳动“过多,可能我比较懒,所以相对喜欢下边的方法:

基本思路就是把一行作为一个字段load进Hive,利用Hive本身筛选数据:

CREATE  TABLE tmp_20150506_raw(
  allfilds string
)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '10'
  LINES TERMINATED BY '10'
STORED AS TEXTFILE;
FIELDS TERMINATED BY '10'
LINES TERMINATED BY '10'
都设置成换行符即可,进入Hive以后使用Hive筛选数据即可。
筛选数据并存入另外一张表中,
本例的后续处理过程如下
from
(
from
(
select allfilds from tmp_20150506_raw where size(split(allfilds, ' ')) = 7) a
select split(allfilds, ' ')[0] as dt, split(allfilds, ' ')[1] as unknown1, split(allfilds, ' ')[2] as unknown2, split(allfilds, ' ')[3] as reurl, split(allfilds, ' ')[4] as uid, split(allfilds, ' ')[5] as num1, split(allfilds, ' ')[6] as num2) b
insert overwrite table tmp_20150506 partition(month = '2015-04')
select *




 

目录
相关文章
|
SQL HIVE
Hive 常用的窗口函数【高频重点】(上)
Hive 常用的窗口函数【高频重点】
194 0
|
SQL HIVE
Hive 常用的窗口函数【高频重点】(下)
Hive 常用的窗口函数【高频重点】(下)
109 0
|
SQL Java Apache
【原创】hive关联hbase表后导致统计数据报错
环境说明: 搭建好的hadoop+hbase+zookeeper集群,因为hbase里面查询数据不支持select语句,所以搭建起了hive(数据仓库)。我的hive搭建过程也不做太多的介绍,用的是第三方数据库mysql存储hive的元数据。
1066 0
|
SQL 存储 分布式计算
数仓面试高频考点--解决hive小文件过多问题
小文件产生原因、小文件过多产生的影响以及怎么解决小文件过多问题
1089 0
|
9月前
|
SQL Java 数据库连接
Flink报错问题之查询维表报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
SQL 大数据 流计算
Flink SQL 功能解密系列 —— 解决热点问题的大杀器MiniBatch
在Blink的流式任务中,State相关的操作通常都会成为整个任务的性能瓶颈。实时计算部-查询和优化团队开发了MiniBatch功能,大幅降低了State操作的开销,在今年的双11中,几乎所有适用的任务都启用了MiniBatch功能。
|
SQL 消息中间件 监控
​实战:Flink 1.12 维表 Join Hive 最新分区功能体验
我们生产常有将实时数据流与 Hive 维表 join 来丰富数据的需求,其中 Hive 表是分区表,业务上需要关联上 Hive 最新分区的数据。上周 Flink 1.12 发布了,刚好支撑了这种业务场景,我也将 1.12 版本部署后做了一个线上需求并上线。对比之前生产环境中实现方案,最新分区直接作为时态表提升了很多开发效率,在这里做一些小的分享。
​实战:Flink 1.12 维表 Join Hive 最新分区功能体验
|
SQL 分布式计算 负载均衡
Hive SQL优化思路
Hive的优化主要分为:配置优化、SQL语句优化、任务优化等方案。其中在开发过程中主要涉及到的可能是SQL优化这块。
655 0
|
6月前
|
数据采集 Oracle 关系型数据库
实时计算 Flink版产品使用问题之怎么实现从Oracle数据库读取多个表并将数据写入到Iceberg表
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
JSON API 流计算
flink 读取iceberg 表数据流程
flink 如何读取iceberg 表数据,包括删除的数据文件如何合并
flink 读取iceberg 表数据流程

热门文章

最新文章