使用Hive进行OSS数据处理的一个最佳实践

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介:

本文主要介绍如何使用Hive来处理保存在OSS上的数据源,并通过E-MapReduce计算,最终的结果保存在OSS上,并能够每天自动的进行Hive的分区数据的调度

处理条件:

数据源:我们假设在OSS上我们的数据是按照一定的目录格式来保存的,比如时间,按照类似2016/06/01这样的年/月/日的方式存放。而原始数据内容都是一些非格式化的数据,完全没有经过处理。
类似如下的一个格式:

123|service control exceed 100. others content|192.168.0.1|2016-05-31

结果数据:我们需要把每个目录下的数据经过处理,写到OSS上类似2016/06/01的一个结果目录下

处理过程:

创建元数据表

CREATE EXTERNAL TABLE logoss (logcontent string) partitioned by (year string, month string, day string) stored AS textfile location 'oss://akid:aksecret@bucket.oss-cn-hangzhou-internal.aliyuncs.com/path';

通过这一步,我们有了一张Hive的分区表,Hive只是在它的元数据库中记录了这个表的信息,这个时候还没有数据的处理。而数据也还在我们的OSS上躺着。

接着把需要的分区都加入到表中,这里我假设我们有很多个分区

ALTER TABLE logoss ADD PARTITION (year='2016', month='05', day='31') location 'oss://akid:aksecret@bucket.oss-cn-hangzhou-internal.aliyuncs.com/path/2016/05/31' PARTITION (year='2016', month='06', day='01') location 'oss://akid:aksecret@bucket.oss-cn-hangzhou-internal.aliyuncs.com/path/2016/06/01' PARTITION (year='2016', month='06', day='02') location 'oss://akid:aksecret@bucket.oss-cn-hangzhou-internal.aliyuncs.com/path/2016/06/02' PARTITION (year='2016', month='06', day='03') location 'oss://akid:aksecret@bucket.oss-cn-hangzhou-internal.aliyuncs.com/path/2016/06/03';

接下来我们select数据看一下,执行如下

select * from logoss limit 100;

我们就会看到我们的分区中的内容了。

处理原始数据

我们要把原来OSS上的原始数据,经过处理然后写到一个HDFS上的表,然后用这个HDFS的表进行后续的一系列处理。这里把所有的中间步骤都在HDFS上走,这样速度会快很多。

首先建立一个基于HDFS的Hive表,目前数据也还是空

CREATE TABLE loghdfs (id string, content string, ip string, oridate string) partitioned by (year string, month string, day string) stored AS textfile;

然后将OSS的数据进行处理并写入到HDFS的表中,这里我们使用IF NOT EXISTS,为了防止这个分区已经存在被我们覆盖掉,如果你希望数据直接覆盖,可以去掉这个条件判断。

INSERT OVERWRITE TABLE loghdfs PARTITION (year='2016', month='05', day='31') IF NOT EXISTS select split(logcontent,'\\|')[0] as id, split(logcontent,'\\|')[1] as content, split(logcontent,'\\|')[2] as ip, split(logcontent,'\\|')[3] as oridate FROM logoss;

业务处理

好了,到了这一步,我们就已经有了一个hdfs上的表了,我们可以对这个表进行任意的后续处理,
比如groupby 所有的ip,然后看他们的总数值

CREATE TABLE userip as select ip, count(id) from loghdfs group by ip;

中间可以进行类似的各种操作,由你的业务决定。
当所有的操作都完成以后,如果要把数据写到OSS上,那么来到最后一步

写回OSS

首先我们会创建一个对应OSS路径的Hive表,与第一步很类似

CREATE EXTERNAL TABLE resultoss (ip string, count int) partitioned by (year string, month string, day string) stored AS textfile location 'oss://akid:aksecret@bucket.oss-cn-hangzhou-internal.aliyuncs.com/path';

最后把我们的业务数据写入到对应的分区中去

INSERT OVERWRITE TABLE resultoss PARTITION (year='2016', month='05', day='31') IF NOT EXISTS select ip, count FROM userip;

这样我们的结果数据就写到了OSS上对应的目录下,类似这样的路径

/path/year=2016/month=05/day=31/

如何自动化

看了上面的这个过程,会发现这中间这个时间的分区需要我们手工写在里面,实在是太麻烦了,完全没有办法自动跑啊,那么下面我们就来更加进化一下。

job上配置自动时间

我们首先在E-MapReduce控制台上编辑的时候使用hivevar来指定时间变量,如下

-hivevar year='2016' -hivevar month='05' -hivevar day='31' -f ossref://mypath/job.hql

然后,我们需要把这个里面的常量变成每天自动变化的时间,我们使用E-MapReduce提供的时间变量
如下

-hivevar year=' ${yyyy-1d}' -hivevar month=' ${MM-1d}' -hivevar day=' ${dd-1d}' -f ossref://mypath/job.hql

时间配置的说明请参考这里

完整的作业配置及代码

screenshot

现在我们看看修改完成以后的完整的代码,中间的分区时间都是用变量进行了替换

CREATE EXTERNAL TABLE logoss (logcontent string) partitioned by (year string, month string, day string) stored AS textfile location 'oss://akid:aksecret@bucket.oss-cn-hangzhou-internal.aliyuncs.com/path/';

ALTER TABLE logoss ADD PARTITION (year='${hivevar:year}', month='${hivevar:month}', day='${hivevar:day}') location 'oss://akid:aksecret@bucket.oss-cn-hangzhou-internal.aliyuncs.com/path/${hivevar:year}/${hivevar:month}/${hivevar:day}';

CREATE TABLE loghdfs (id string, content string, ip string, oridate string) partitioned by (year string, month string, day string) stored AS textfile;

INSERT OVERWRITE TABLE loghdfs PARTITION (year='${hivevar:year}', month='${hivevar:month}', day='${hivevar:day}') IF NOT EXISTS select split(logcontent,'\\|')[0] as id, split(logcontent,'\\|')[1] as content, split(logcontent,'\\|')[2] as ip, split(logcontent,'\\|')[3] as oridate FROM logoss;

CREATE TABLE userip as select ip, count(id) as count from loghdfs group by ip;

CREATE EXTERNAL TABLE resultoss (ip string, count int) partitioned by (year string, month string, day string) stored AS textfile location 'oss://akid:aksecret@bucket.oss-cn-hangzhou-internal.aliyuncs.com/outpath/';

INSERT OVERWRITE TABLE resultoss PARTITION (year='${hivevar:year}', month='${hivevar:month}', day='${hivevar:day}') IF NOT EXISTS select ip, count FROM userip;

然后你可以把这个作业加到一个周期执行的执行计划中,每天运行一次,就可以完全的自动每天跑数据啦。

相关实践学习
借助OSS搭建在线教育视频课程分享网站
本教程介绍如何基于云服务器ECS和对象存储OSS,搭建一个在线教育视频课程分享网站。
目录
相关文章
|
4月前
|
SQL 消息中间件 数据处理
DataX读取Hive Orc格式表丢失数据处理记录
DataX读取Hive Orc格式表丢失数据处理记录
212 0
|
1月前
|
SQL 物联网 数据处理
"颠覆传统,Hive SQL与Flink激情碰撞!解锁流批一体数据处理新纪元,让数据决策力瞬间爆表,你准备好了吗?"
【8月更文挑战第9天】数据时代,实时性和准确性至关重要。传统上,批处理与流处理各司其职,但Apache Flink打破了这一界限,尤其Flink与Hive SQL的结合,开创了流批一体的数据处理新时代。这不仅简化了数据处理流程,还极大提升了效率和灵活性。例如,通过Flink SQL,可以轻松实现流数据与批数据的融合分析,无需在两者间切换。这种融合不仅降低了技术门槛,还为企业提供了更强大的数据支持,无论是在金融、电商还是物联网领域,都将发挥巨大作用。
39 6
|
2月前
|
存储 JSON 自然语言处理
OSS数据源一站式RAG最佳实践
本文介绍了如何使用OpenSearch LLM智能问答版通过OSS数据源一站式构建RAG系统。
7061 11
|
4月前
|
存储 Cloud Native Serverless
云原生最佳实践系列 7:基于 OSS Object FC 实现非结构化文件实时处理
阿里云OSS对象存储方案利用函数计算FC,在不同终端请求时实时处理OSS中的原图,减少衍生图存储,降低成本。
|
12月前
|
SQL JSON 数据处理
大数据Hive JSON数据处理
大数据Hive JSON数据处理
159 0
|
4月前
|
SQL 关系型数据库 MySQL
Flink CDC + Hudi + Hive + Presto构建实时数据湖最佳实践
Flink CDC + Hudi + Hive + Presto构建实时数据湖最佳实践
295 0
|
4月前
|
SQL 分布式计算 关系型数据库
Sqoop数据导入到Hive表的最佳实践
Sqoop数据导入到Hive表的最佳实践
|
SQL 存储 监控
通过sdk查看oss投递(新版)延迟情况最佳实践
在投递任务中,日志服务会将运行日志写入到给定的logstore中,因而可以使用SDK来查看投递任务的当前状态,并进行批量查询,以了解多个Project和投递任务的状态。下面以查看oss投递的延迟为例,介绍客户提供操作步骤和常见的使用场景,以帮助客户更加方便地监控和管理投递任务。
通过sdk查看oss投递(新版)延迟情况最佳实践
|
4月前
|
存储 运维 监控
运维编排最佳实践:将运维编排任务执行记录投递到OSS/SLS
运维编排服务(Operation Orchestration Service),简称OOS,是全面、免费的云上自动化运维平台,提供运维任务的管理和执行。典型使用场景包括:事件驱动运维,批量操作运维,定时运维任务,跨地域运维等,OOS为重要运维场景提供审批,通知等功能。OOS帮您实现标准化运维任务,从...
运维编排最佳实践:将运维编排任务执行记录投递到OSS/SLS