阿里云大数据MaxCompute基于UDTF解析JSON日志的案例

简介: 因为MaxCompute提供的系统函数有限,所以平台提供了强大的自定义函数(UDF)来进行复杂的数据处理,因为MaxCompute的沙箱机制,所以解析JSON日志串的时候需要使用GSON来进行解析,本例中原始数据可能是从其他DB通过数据集成同步到MaxCompute平台上的,所以MaxComput.

因为MaxCompute提供的系统函数有限,所以平台提供了强大的自定义函数(UDF)来进行复杂的数据处理,因为MaxCompute的沙箱机制,所以解析JSON日志串的时候需要使用GSON来进行解析,本例中原始数据可能是从其他DB通过数据集成同步到MaxCompute平台上的,所以MaxCompute可能有个待处理的原始表如下(按照天来同步日志会有个年月日的分区,根据实际业务加上即可):


create table t_biz_log(
    BIGINT id,
    STRING logcontent
)

上表logcontent待解析JSON日志串案例如下(字符串中的VALUE文本可能出现特殊字符如反斜线\等)


[{"acsRegion":"cn-huhehaote","apiVersion":"2016-04-28","errorCode":"Forbindden","errorMessage":"The specified Instance already bind eip",
"eventId":"01168520-E248-4949-84AC-48EF6FA59292","eventName":"CreateForwardEntry","eventSource":"aliyuncs.com","eventTime":"2018-04-11T07:32:47Z",
"eventType":"ApiCall","eventVersion":"1","isGlobal":false,"requestId":"01168520-E248-48EF6FA59292",
"requestParameters":{"_response_json_parse":"\"true\"","IpProtocol":"\"tcp\"","ExternalIp":"\"39.10.2.1\"","ForwardTableId":"\"ftb-hp3bbrmtlho\"",
"SecureTransport":"\"true\"","needarrayitemname":"\"true\"","RequestId":"\"01168-E248-4949-84AC-48EF6FA59292\"","ExternalPort":"\"12\"",
"RegionId":"\"cn-huhehaote\"","InternalPort":"\"112\"","HostId":"\"huhehaote.aliyuncs.com\"","InternalIp":"\"192.168.1.167\""},"serviceName":"Vpc",
"sourceIpAddress":"106.11.34.11","userIdentity":{"accountId":"44404","principalId":"44404",
"sessionContext":{"attributes":{"creationDate":"2018-04-11T07:32:47Z","mfaAuthenticated":"false"}},"type":"root-account","userName":"root"}}]

案例过程如下:

第一步:分析上面的JSON找出想要的关键信息并创建MaxCompute表:


create table t_analysis_log(
    String acsRegion,
    String apiVersion,
    String eventId,
    String eventName,
    String eventSource,
    String eventTime,
    String eventType,
    String eventVersion,
    String requestId,
    String SourceCidrIp,
    String SecurityGroupId,
    String IpProtocol,
    String NicType,
    String Policy,
    String PortRange,
    String serviceName,
    String sourceIpAddress,
    String userAgent,
    String accessKeyId,
    String accountId,
    String principalId,
    String type,
    String userName
)

第二步:创建实体类(因为上面的json有嵌套结构,所以需要类结构也是关联模式)


package com.kangyu;

public class AnalysisObj {

    private String            acsRegion;
    private String            apiVersion;
    private String            eventId;
    private String            eventName;
    private String            eventSource;
    private String            eventTime;
    private String            eventType;
    private String            eventVersion;
    private String            requestId;
    private RequestParameters requestParameters;
    private String            serviceName;
    private String            sourceIpAddress;
    private String            userAgent;
    private UserIdentity      userIdentity;

    public String getApiVersion() {
        return apiVersion;
    }

    public void setApiVersion(String apiVersion) {
        this.apiVersion = apiVersion;
    }

    public String getEventId() {
        return eventId;
    }

    public void setEventId(String eventId) {
        this.eventId = eventId;
    }

    public String getEventName() {
        return eventName;
    }

    public void setEventName(String eventName) {
        this.eventName = eventName;
    }

    public String getEventSource() {
        return eventSource;
    }

    public void setEventSource(String eventSource) {
        this.eventSource = eventSource;
    }

    public String getEventTime() {
        return eventTime;
    }

    public void setEventTime(String eventTime) {
        this.eventTime = eventTime;
    }

    public String getEventType() {
        return eventType;
    }

    public void setEventType(String eventType) {
        this.eventType = eventType;
    }

    public String getEventVersion() {
        return eventVersion;
    }

    public void setEventVersion(String eventVersion) {
        this.eventVersion = eventVersion;
    }

    public String getRequestId() {
        return requestId;
    }

    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    public String getServiceName() {
        return serviceName;
    }

    public void setServiceName(String serviceName) {
        this.serviceName = serviceName;
    }

    public String getSourceIpAddress() {
        return sourceIpAddress;
    }

    public void setSourceIpAddress(String sourceIpAddress) {
        this.sourceIpAddress = sourceIpAddress;
    }

    public String getUserAgent() {
        return userAgent;
    }

    public void setUserAgent(String userAgent) {
        this.userAgent = userAgent;
    }

    public RequestParameters getRequestParameters() {
        return requestParameters;
    }

    public void setRequestParameters(RequestParameters requestParameters) {
        this.requestParameters = requestParameters;
    }

    public UserIdentity getUserIdentity() {
        return userIdentity;
    }

    public void setUserIdentity(UserIdentity userIdentity) {
        this.userIdentity = userIdentity;
    }

    public String getAcsRegion() {
        return acsRegion;
    }

    public void setAcsRegion(String acsRegion) {
        this.acsRegion = acsRegion;
    }

}

class RequestParameters {

    private String SourceCidrIp;
    private String SecurityGroupId;
    private String IpProtocol;
    private String NicType;
    private String Policy;
    private String PortRange;

    public String getSourceCidrIp() {
        return SourceCidrIp;
    }

    public void setSourceCidrIp(String sourceCidrIp) {
        SourceCidrIp = sourceCidrIp;
    }

    public String getSecurityGroupId() {
        return SecurityGroupId;
    }

    public void setSecurityGroupId(String securityGroupId) {
        SecurityGroupId = securityGroupId;
    }

    public String getIpProtocol() {
        return IpProtocol;
    }

    public void setIpProtocol(String ipProtocol) {
        IpProtocol = ipProtocol;
    }

    public String getNicType() {
        return NicType;
    }

    public void setNicType(String nicType) {
        NicType = nicType;
    }

    public String getPolicy() {
        return Policy;
    }

    public void setPolicy(String policy) {
        Policy = policy;
    }

    public String getPortRange() {
        return PortRange;
    }

    public void setPortRange(String portRange) {
        PortRange = portRange;
    }

}

class UserIdentity {

    private String accessKeyId;
    private String accountId;
    private String principalId;
    private String type;
    private String userName;

    public String getAccessKeyId() {
        return accessKeyId;
    }

    public void setAccessKeyId(String accessKeyId) {
        this.accessKeyId = accessKeyId;
    }

    public String getAccountId() {
        return accountId;
    }

    public void setAccountId(String accountId) {
        this.accountId = accountId;
    }

    public String getPrincipalId() {
        return principalId;
    }

    public void setPrincipalId(String principalId) {
        this.principalId = principalId;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

}

第三步:创建JSON处理业务

说明:

1.传参为一个String类型的JSON,传出参数为多个解析后的String类型

2.需要引用GSON包等,在帮助文档的下载客户端页面的odpscmd_public.zip的lib目录下就有对应jar包


package com.kangyu;

import java.util.ArrayList;
import java.util.List;

import com.aliyun.odps.udf.UDFException;
import com.aliyun.odps.udf.UDTF;
import com.aliyun.odps.udf.annotation.Resolve;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import com.google.gson.reflect.TypeToken;

@Resolve({ "string->string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string" })
public class AnalysisLog extends UDTF {

    @Override
    public void process(Object[] arg0) throws UDFException {
        String log = (String) arg0[0];

        log = log.replaceAll("\\\\", "").replace("\"{", "{").replace("}\"", "}").replace("\"\"", "\"");
        log = log.replace("\"[", "[").replace("]\"", "]");

        if (log.indexOf("\"errorCode\"") != -1) {
            return;
        }

        String acsRegion = null;
        String apiVersion = null;
        String eventId = null;
        String eventName = null;
        String eventSource = null;
        String eventTime = null;
        String eventType = null;
        String eventVersion = null;
        String requestId = null;

        // requestParameters;
        String sourceCidrIp = null;
        String securityGroupId = null;
        String ipProtocol = null;
        String nicType = null;
        String policy = null;
        String portRange = null;

        String serviceName = null;
        String sourceIpAddress = null;
        String userAgent = null;

        // userIdentity;
        String accessKeyId = null;
        String accountId = null;
        String principalId = null;
        String type = null;
        String userName = null;

        List<AnalysisObj> list = GsonUtil.fromJsonList(log, AnalysisObj.class);
        for (AnalysisObj obj : list) {
            acsRegion = obj.getAcsRegion();
            apiVersion = obj.getApiVersion();
            eventId = obj.getEventId();
            eventName = obj.getEventName();
            eventSource = obj.getEventSource();
            eventTime = obj.getEventTime();
            eventType = obj.getEventType();
            eventVersion = obj.getEventVersion();
            requestId = obj.getRequestId();
            serviceName = obj.getServiceName();
            sourceIpAddress = obj.getSourceIpAddress();
            userAgent = obj.getUserAgent();

            RequestParameters paramObj = obj.getRequestParameters();
            if (paramObj != null) {
                sourceCidrIp = paramObj.getSourceCidrIp();
                securityGroupId = paramObj.getSecurityGroupId();
                ipProtocol = paramObj.getIpProtocol();
                nicType = paramObj.getNicType();
                policy = paramObj.getPolicy();
                portRange = paramObj.getPortRange();
            }

            UserIdentity identityObj = obj.getUserIdentity();
            if (identityObj != null) {
                accessKeyId = identityObj.getAccessKeyId();
                accountId = identityObj.getAccountId();
                principalId = identityObj.getPrincipalId();
                type = identityObj.getType();
                userName = identityObj.getUserName();
            }

            forward(acsRegion, apiVersion, eventId, eventName, eventSource, eventTime, eventType, eventVersion,
                    requestId, serviceName, sourceIpAddress, userAgent, sourceCidrIp, securityGroupId, ipProtocol,
                    nicType, policy, portRange, accessKeyId, accountId, principalId, type, userName);
        }
    }

class GsonUtil {

    // 将Json数据解析成相应的映射对象
    public static <T> T parseJsonWithGson(String jsonData, Class<T> type) {
        Gson gson = new Gson();
        T result = gson.fromJson(jsonData, type);
        return result;
    }

    // 将Json数组解析成相应的映射对象列表
    public static <T> List<T> parseJsonArrayWithGson(String jsonData, Class<T> type) {
        Gson gson = new Gson();
        List<T> result = gson.fromJson(jsonData, new TypeToken<List<T>>() {
        }.getType());
        return result;
    }

    public static <T> ArrayList<T> fromJsonList(String json, Class<T> cls) {
        ArrayList<T> mList = new ArrayList<T>();
        Gson gson = new Gson();
        try {
            JsonArray array = new JsonParser().parse(json).getAsJsonArray();

            for (final JsonElement elem : array) {
                mList.add(gson.fromJson(elem, cls));
            }
        } catch (Exception e) {
            System.out.println("json=" + json);
            e.printStackTrace();
        }
        return mList;
    }
}

第四步:生成代码的UDTF的jar包,可以使用jar -cvf命令,也可以直接使用如eclipse工具导出

file--->export 选择 java下面的jar file

8afefdd53f0a2df9960645534c9a1124bc82d0a5


第五步:使用DataWorks上传上面导出的jar包

3e632ca57d8c81a83f92f3d072ac81f546030c0d


第六步:根据上传的jar包来创建自定义函数


create function analysis_log_udf as 'com.kangyu.AnalysisLog' using 'analysisLog.jar'

第七步:可以使用上面创建的函数进行查询


select analysis_log_udf(logcontent) 
as (acsRegion,
    apiVersion,
    eventId,
    eventName,
    eventSource,
    eventTime,
    eventType,
    eventVersion,
    requestId,
    SourceCidrIp,
    SecurityGroupId,
    IpProtocol,
    NicType,
    Policy,
    PortRange,
    serviceName,
    sourceIpAddress,
    userAgent,
    accessKeyId,
    accountId,
    principalId,
    type,
    userName)
from t_biz_log

第八步:可以在上面的SQL外面增加insert overwrite操作并在DataWorks中配置同步任务

注意:

如果您的两张表有分区请自行加上 partitioned by来进行分区并且查询的时候也增加where条件


insert overwrite table t_analysis_log
select analysis_log_udf(logcontent) 
as (acsRegion,
    apiVersion,
    eventId,
    eventName,
    eventSource,
    eventTime,
    eventType,
    eventVersion,
    requestId,
    SourceCidrIp,
    SecurityGroupId,
    IpProtocol,
    NicType,
    Policy,
    PortRange,
    serviceName,
    sourceIpAddress,
    userAgent,
    accessKeyId,
    accountId,
    principalId,
    type,
    userName)
from t_biz_log





相关实践学习
基于MaxCompute的热门话题分析
Apsara Clouder大数据专项技能认证配套课程:基于MaxCompute的热门话题分析
目录
相关文章
|
存储 Java 文件存储
微服务——SpringBoot使用归纳——Spring Boot使用slf4j进行日志记录—— logback.xml 配置文件解析
本文解析了 `logback.xml` 配置文件的详细内容,包括日志输出格式、存储路径、控制台输出及日志级别等关键配置。通过定义 `LOG_PATTERN` 和 `FILE_PATH`,设置日志格式与存储路径;利用 `&lt;appender&gt;` 节点配置控制台和文件输出,支持日志滚动策略(如文件大小限制和保存时长);最后通过 `&lt;logger&gt;` 和 `&lt;root&gt;` 定义日志级别与输出方式。此配置适用于精细化管理日志输出,满足不同场景需求。
3125 1
|
开发框架 .NET 中间件
.net8 使用 license 证书授权案例解析
本文介绍了如何使用 `.NET CLI` 创建并改造一个 `ASP.NET Core Web API` 项目,以实现基于许可证的授权机制。具体步骤包括创建项目、添加必要的 NuGet 包(如 `Standard.Licensing` 和 `Swashbuckle.AspNetCore`),以及修改 `Program.cs` 文件以集成自定义的许可证验证中间件。项目结构中新增了 `LicenseController` 接口用于处理授权相关操作,并通过测试流程验证了默认天气接口在未授权和授权状态下的响应情况。整个过程确保了应用程序能够在启动时正确验证许可证,保障系统的安全性与可控性。
718 10
.net8 使用 license 证书授权案例解析
|
数据采集 JSON 数据可视化
JSON数据解析实战:从嵌套结构到结构化表格
在信息爆炸的时代,从杂乱数据中提取精准知识图谱是数据侦探的挑战。本文以Google Scholar为例,解析嵌套JSON数据,提取文献信息并转换为结构化表格,通过Graphviz制作技术关系图谱,揭示文献间的隐秘联系。代码涵盖代理IP、请求头设置、JSON解析及可视化,提供完整实战案例。
826 4
JSON数据解析实战:从嵌套结构到结构化表格
|
监控 Java 应用服务中间件
Tomcat log日志解析
理解和解析Tomcat日志文件对于诊断和解决Web应用中的问题至关重要。通过分析 `catalina.out`、`localhost.log`、`localhost_access_log.*.txt`、`manager.log`和 `host-manager.log`等日志文件,可以快速定位和解决问题,确保Tomcat服务器的稳定运行。掌握这些日志解析技巧,可以显著提高运维和开发效率。
1639 13
|
监控 Shell Linux
Android调试终极指南:ADB安装+多设备连接+ANR日志抓取全流程解析,覆盖环境变量配置/多设备调试/ANR日志分析全流程,附Win/Mac/Linux三平台解决方案
ADB(Android Debug Bridge)是安卓开发中的重要工具,用于连接电脑与安卓设备,实现文件传输、应用管理、日志抓取等功能。本文介绍了 ADB 的基本概念、安装配置及常用命令。包括:1) 基本命令如 `adb version` 和 `adb devices`;2) 权限操作如 `adb root` 和 `adb shell`;3) APK 操作如安装、卸载应用;4) 文件传输如 `adb push` 和 `adb pull`;5) 日志记录如 `adb logcat`;6) 系统信息获取如屏幕截图和录屏。通过这些功能,用户可高效调试和管理安卓设备。
|
JSON 前端开发 搜索推荐
关于商品详情 API 接口 JSON 格式返回数据解析的示例
本文介绍商品详情API接口返回的JSON数据解析。最外层为`product`对象,包含商品基本信息(如id、name、price)、分类信息(category)、图片(images)、属性(attributes)、用户评价(reviews)、库存(stock)和卖家信息(seller)。每个字段详细描述了商品的不同方面,帮助开发者准确提取和展示数据。具体结构和字段含义需结合实际业务需求和API文档理解。
|
机器学习/深度学习 人工智能 搜索推荐
技术革新下的培训新趋势:案例解析
从最初的“试试看”,到如今的“非做不可”,企业培训已经成为央国企和上市公司不可或缺的战略环节。无论是AI与大模型的赋能,DeepSeek,还是具身智能、智算技术和数据科学的实战应用,这些课程都在为企业打开新的可能性。
|
JSON 小程序 UED
微信小程序 app.json 配置文件解析与应用
本文介绍了微信小程序中 `app.json` 配置文件的详细
2303 12
|
JSON 缓存 API
解析电商商品详情API接口系列,json数据示例参考
电商商品详情API接口是电商平台的重要组成部分,提供了商品的详细信息,支持用户进行商品浏览和购买决策。通过合理的API设计和优化,可以提升系统性能和用户体验。希望本文的解析和示例能够为开发者提供参考,帮助构建高效、可靠的电商系统。
678 12
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
结构型模式描述如何将类或对象按某种布局组成更大的结构。它分为类结构型模式和对象结构型模式,前者采用继承机制来组织接口和类,后者釆用组合或聚合来组合对象。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象结构型模式比类结构型模式具有更大的灵活性。 结构型模式分为以下 7 种: • 代理模式 • 适配器模式 • 装饰者模式 • 桥接模式 • 外观模式 • 组合模式 • 享元模式
884 140
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析

热门文章

最新文章

相关产品

  • 云原生大数据计算服务 MaxCompute
  • 推荐镜像

    更多
  • DNS