利用MaxCompute内建函数及UDTF转换json格式日志数据

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 本文介绍了如何使用MaxCompute UDF对JSON格式的日志进行信息提取和转换。

一、业务场景分析:

由于业务的复杂性,数据开发者需要面对不同来源的不同类型数据,需要把这些数据抽取到数据平台,按照设计好的数据模型对关键业务字段进行抽取,形成一张二维表,以便后续在大数据平台/数据仓库中进行统计分析、关联计算。

本文结合一个具体的案例来介绍如何使用MaxCompute对json格式的日志数据进行转换处理。

1.数据来源:应用实时写入ECS主机的指定目录下的日志文件中;

2.数据格式:日志文件中,每条日志的格式如下图所示(示例中对数据进行了简化和脱敏),每一条日志中包含了设备信息,以及1个或多个Session信息,且每条日志中的Session数量是动态的:1个或多个Session。每条日志的内容示例如下:

1975d5bf9da1ca01f06c090833d312dd46d41696

3.数据处理需求:采集日志数据,对日志数据进行解析、转换,对转换后的日志数据在MaxCompute进行统计分析。由于日志数据是json格式的,其中包含了多个业务字段信息,需要将业务字段提前出来,才能在MaxCompute进行后续的业务统计(如进行按照时段进行PV/UV统计、按照设备类型进行统计、关联设备ID与会员信息进行统计等),所以本文的关键需求就是如何把json格式数据的关键信息解析为一张包含业务字段的二维表。

二、解决方案:

本文的解决方案中,选择使用阿里云的日志服务+MaxCompute产品组合来满足以上业务需求,其中日志服务仅仅完成日志采集和投递的职能,不做数据解析和转化工作。

1.日志采集:通过日志服务获取日志数据到logstore(这部分内容可参考日志服务帮助文档)58fd3efbab724e7e4b123109a2d9e944d4d71b91

2.通过日志服务的投递功能(帮助文档)将日志定时投递归档到MaxCompute的1张原始日志表,其中每条日志所有信息都写入到原始日志表的1个字段content中。

e381b2b726115b320116cb3a9da2fbd6d4b09326

3.利用MaxCompute对原始数据进行字段解析和提取。

1)利用内建函数get_json_object进行数据提取

select
get_json_object(content,'$.DeviceID') as DeviceID,
get_json_object(content,'$.UniqueIdentifier') as UniqueIdentifier,
get_json_object(content,'$.GameID') as GameID,
get_json_object(content,'$.Device') as Device,
get_json_object(content,'$.Sessions\[0].SessionID') as Session1_ID,
get_json_object(content,'$.Sessions\[0].Events\[0].Name') as Session1_EventName,
get_json_object(content,'$.Sessions\[1].SessionID') as Session2_ID,
get_json_object(content,'$.Sessions\[1].Events\[0].Name') as Session2_EventName
from log_target_json where pt='20180725' limit 10

提取的结果如下:

36f79492550a87845e50180ae9896ae8f6f080d0

方案总结:以上处理逻辑,是把一条日志的业务字段分别提取成为行字段,适合每个json记录中的信息固定且可以映射为表字段,例如上面的例子,把session1和session2的信息提取出来后,分别看做不同的列字段来处理。但如果每条日志记录包含的session数量是动态不固定的时候,这种处理逻辑就难以满足需要,例如下一条日志就包含了3个session,如果要提取每个session的信息,就要求解析的SQL增加Session3_ID, Session3_EventName逻辑,如果再下一条日志包含100个session呢?这种提取方式就很难处理了。

155533412bd56a44a6e3f055927ec8abcf1db6a4

这种情况,可以使用UDTF自定义函数来实现。

2)开发MaxCompute UDTF函数,对日志进行处理

根据数据特点,1条日志包含了多个session信息,属于1:N的关系,转换到数据仓库的二维表时,需要解析到最小粒度的session信息,把1行转成N行,提取所有session信息。业务目标如下所示:

bc8ec1ae61a9953f12d83b933d0d0342c7113d31

在MaxCompute中,对1行记录处理转换为多行记录需要使用UDTF来实现。

我们这里以JAVA UDTF为例,对content字段中的每条json记录进行解析,获取并返回需要提取的业务字段。这里的UDTF的处理逻辑会深入到json的第3级,循环解析出最小粒度的数据并返回多条记录。


package com.aliyun.odps;

import com.aliyun.odps.udf.UDFException;
import com.aliyun.odps.udf.UDTF;
import com.aliyun.odps.udf.annotation.Resolve;
import com.google.gson.Gson;

import java.io.IOException;
import java.util.List;
import java.util.Map;

@Resolve("string->string,string,string,string,string,string,string,string")
public class get_json_udtf extends UDTF {
    @Override
    public void process(Object[] objects) throws UDFException, IOException {
        String input = (String) objects[0];
        Map map = new Gson().fromJson(input, Map.class);

        Object deviceID = map.get("DeviceID");
        Object uniqueIdentifier = map.get("UniqueIdentifier");
        Object gameID = map.get("GameID");
        Object device = map.get("Device");

        List sessions = (List) map.get("Sessions");
        for (Object session : sessions) {
            Map sMap = (Map) session;
            Object sessionID = sMap.get("SessionID");
            List events = (List) sMap.get("Events");
            for (Object event : events) {
                String name = (String) ((Map) event).get("Name");
                String timestamp = (String) ((Map) event).get("Timestamp");
                String networkStatus = (String) ((Map) event).get("NetworkStatus");
                forward(deviceID, uniqueIdentifier,gameID,device,
                        sessionID,name,timestamp,networkStatus);
            }
        }
    }
}

注:关于UDF本身编写、打包上传、创建Function等知识请参阅官方文档https://help.aliyun.com/document_detail/27867.html。
程序编写完毕后,需要打包、上传UDTF并创建UDF函数:
对编译好的程序进行打包处理,生成jar包,在MaxCompute客户端(odpscmd)中,上传这个资源:
add jar maxcompute_demo-1.0-SNAPSHOT.jar -f;
然后通过命令行创建function:
create function get_json_udtf as com.aliyun.odps.get_json_udtf using maxcompute_demo-1.0-SNAPSHOT.jar';
创建后查看函数:
61b835bda61bca19230f38c6e335c23357d91adc
测试验证:
对包含原始日志的表进行查询,使用创建的get_json_udtf对content字段进行查询:30a15e6aadd76e9996edeb747f93b04fb15b5574
查询结果如下,UDFT函数对每条json记录进行处理,生成了多条记录,符合预期:03cbcc1cf7b051055a8a4b2c5b5c338781c4d441 
同时,如需要固化处理逻辑,还可以使用insert into语法,将解析后的结果查询到一张新表,通过作业调度来实现周期性的数据转换。


三、总结:

本文通过一个日志分析的大数据分析场景,通过一个常见的json日志处理的需求为例,介绍了通过日志服务采集日志到MaxCompute,同时使用MaxCompute的内建函数/UDF等方式,对json格式的日志数据进行解析和转换,提取关键业务字段、生成了可用于后续分析的日志表。

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
19天前
|
存储 Oracle 关系型数据库
【赵渝强老师】MySQL InnoDB的数据文件与重做日志文件
本文介绍了MySQL InnoDB存储引擎中的数据文件和重做日志文件。数据文件包括`.ibd`和`ibdata`文件,用于存放InnoDB数据和索引。重做日志文件(redo log)确保数据的可靠性和事务的持久性,其大小和路径可由相关参数配置。文章还提供了视频讲解和示例代码。
128 11
【赵渝强老师】MySQL InnoDB的数据文件与重做日志文件
|
2月前
|
PyTorch 算法框架/工具
Pytorch学习笔记(七):F.softmax()和F.log_softmax函数详解
本文介绍了PyTorch中的F.softmax()和F.log_softmax()函数的语法、参数和使用示例,解释了它们在进行归一化处理时的作用和区别。
439 1
Pytorch学习笔记(七):F.softmax()和F.log_softmax函数详解
|
19天前
|
SQL Oracle 关系型数据库
【赵渝强老师】Oracle的联机重做日志文件与数据写入过程
在Oracle数据库中,联机重做日志文件记录了数据库的变化,用于实例恢复。每个数据库有多组联机重做日志,每组建议至少有两个成员。通过SQL语句可查看日志文件信息。视频讲解和示意图进一步解释了这一过程。
|
2月前
|
数据采集 机器学习/深度学习 存储
使用 Python 清洗日志数据
使用 Python 清洗日志数据
39 2
|
3月前
|
SQL 人工智能 运维
在阿里云日志服务轻松落地您的AI模型服务——让您的数据更容易产生洞见和实现价值
您有大量的数据,数据的存储和管理消耗您大量的成本,您知道这些数据隐藏着巨大的价值,但是您总觉得还没有把数据的价值变现出来,对吗?来吧,我们用一系列的案例帮您轻松落地AI模型服务,实现数据价值的变现......
221 3
|
4月前
|
数据库 Java 监控
Struts 2 日志管理化身神秘魔法师,洞察应用运行乾坤,演绎奇幻篇章!
【8月更文挑战第31天】在软件开发中,了解应用运行状况至关重要。日志管理作为 Struts 2 应用的关键组件,记录着每个动作和决策,如同监控摄像头,帮助我们迅速定位问题、分析性能和使用情况,为优化提供依据。Struts 2 支持多种日志框架(如 Log4j、Logback),便于配置日志级别、格式和输出位置。通过在 Action 类中添加日志记录,我们能在开发过程中获取详细信息,及时发现并解决问题。合理配置日志不仅有助于调试,还能分析用户行为,提升应用性能和稳定性。
58 0
|
4月前
|
开发者 前端开发 编解码
Vaadin解锁移动适配新境界:一招制胜,让你的应用征服所有屏幕!
【8月更文挑战第31天】在移动互联网时代,跨平台应用开发备受青睐。作为一款基于Java的Web应用框架,Vaadin凭借其组件化设计和强大的服务器端渲染能力,助力开发者轻松构建多设备适应的Web应用。本文探讨Vaadin与移动设备的适配策略,包括响应式布局、CSS媒体查询、TouchKit插件及服务器端优化,帮助开发者打造美观且实用的移动端体验。通过这些工具和策略的应用,可有效应对屏幕尺寸、分辨率及操作系统的多样性挑战,满足广大移动用户的使用需求。
67 0
|
4月前
|
存储 运维 监控
Entity Framework Core 实现审计日志记录超棒!多种方法助你跟踪数据变化、监控操作,超实用!
【8月更文挑战第31天】在软件开发中,审计日志记录对于跟踪数据变化、监控用户操作及故障排查至关重要。Entity Framework Core (EF Core) 作为强大的对象关系映射框架,提供了多种实现审计日志记录的方法。例如,可以使用 EF Core 的拦截器在数据库操作前后执行自定义逻辑,记录操作类型、时间和执行用户等信息。此外,也可通过在实体类中添加审计属性(如 `CreatedBy`、`CreatedDate` 等),并在保存实体时更新这些属性来记录审计信息。这两种方法都能有效帮助我们追踪数据变更并满足合规性和安全性需求。
86 0
|
存储 JSON 关系型数据库
【笔记】开发指南—JSON函数
本文介绍了PolarDB-X支持的JSON函数。
168 0
|
存储 JSON 关系型数据库
开发指南—JSON函数
本文介绍了PolarDB-X支持的JSON函数。
362 0

相关产品

  • 云原生大数据计算服务 MaxCompute