阿里云大数据MaxCompute基于UDTF解析JSON日志的案例

简介: 因为MaxCompute提供的系统函数有限,所以平台提供了强大的自定义函数(UDF)来进行复杂的数据处理,因为MaxCompute的沙箱机制,所以解析JSON日志串的时候需要使用GSON来进行解析,本例中原始数据可能是从其他DB通过数据集成同步到MaxCompute平台上的,所以MaxComput.

因为MaxCompute提供的系统函数有限,所以平台提供了强大的自定义函数(UDF)来进行复杂的数据处理,因为MaxCompute的沙箱机制,所以解析JSON日志串的时候需要使用GSON来进行解析,本例中原始数据可能是从其他DB通过数据集成同步到MaxCompute平台上的,所以MaxCompute可能有个待处理的原始表如下(按照天来同步日志会有个年月日的分区,根据实际业务加上即可):


create table t_biz_log(
    BIGINT id,
    STRING logcontent
)

上表logcontent待解析JSON日志串案例如下(字符串中的VALUE文本可能出现特殊字符如反斜线\等)


[{"acsRegion":"cn-huhehaote","apiVersion":"2016-04-28","errorCode":"Forbindden","errorMessage":"The specified Instance already bind eip",
"eventId":"01168520-E248-4949-84AC-48EF6FA59292","eventName":"CreateForwardEntry","eventSource":"aliyuncs.com","eventTime":"2018-04-11T07:32:47Z",
"eventType":"ApiCall","eventVersion":"1","isGlobal":false,"requestId":"01168520-E248-48EF6FA59292",
"requestParameters":{"_response_json_parse":"\"true\"","IpProtocol":"\"tcp\"","ExternalIp":"\"39.10.2.1\"","ForwardTableId":"\"ftb-hp3bbrmtlho\"",
"SecureTransport":"\"true\"","needarrayitemname":"\"true\"","RequestId":"\"01168-E248-4949-84AC-48EF6FA59292\"","ExternalPort":"\"12\"",
"RegionId":"\"cn-huhehaote\"","InternalPort":"\"112\"","HostId":"\"huhehaote.aliyuncs.com\"","InternalIp":"\"192.168.1.167\""},"serviceName":"Vpc",
"sourceIpAddress":"106.11.34.11","userIdentity":{"accountId":"44404","principalId":"44404",
"sessionContext":{"attributes":{"creationDate":"2018-04-11T07:32:47Z","mfaAuthenticated":"false"}},"type":"root-account","userName":"root"}}]

案例过程如下:

第一步:分析上面的JSON找出想要的关键信息并创建MaxCompute表:


create table t_analysis_log(
    String acsRegion,
    String apiVersion,
    String eventId,
    String eventName,
    String eventSource,
    String eventTime,
    String eventType,
    String eventVersion,
    String requestId,
    String SourceCidrIp,
    String SecurityGroupId,
    String IpProtocol,
    String NicType,
    String Policy,
    String PortRange,
    String serviceName,
    String sourceIpAddress,
    String userAgent,
    String accessKeyId,
    String accountId,
    String principalId,
    String type,
    String userName
)

第二步:创建实体类(因为上面的json有嵌套结构,所以需要类结构也是关联模式)


package com.kangyu;

public class AnalysisObj {

    private String            acsRegion;
    private String            apiVersion;
    private String            eventId;
    private String            eventName;
    private String            eventSource;
    private String            eventTime;
    private String            eventType;
    private String            eventVersion;
    private String            requestId;
    private RequestParameters requestParameters;
    private String            serviceName;
    private String            sourceIpAddress;
    private String            userAgent;
    private UserIdentity      userIdentity;

    public String getApiVersion() {
        return apiVersion;
    }

    public void setApiVersion(String apiVersion) {
        this.apiVersion = apiVersion;
    }

    public String getEventId() {
        return eventId;
    }

    public void setEventId(String eventId) {
        this.eventId = eventId;
    }

    public String getEventName() {
        return eventName;
    }

    public void setEventName(String eventName) {
        this.eventName = eventName;
    }

    public String getEventSource() {
        return eventSource;
    }

    public void setEventSource(String eventSource) {
        this.eventSource = eventSource;
    }

    public String getEventTime() {
        return eventTime;
    }

    public void setEventTime(String eventTime) {
        this.eventTime = eventTime;
    }

    public String getEventType() {
        return eventType;
    }

    public void setEventType(String eventType) {
        this.eventType = eventType;
    }

    public String getEventVersion() {
        return eventVersion;
    }

    public void setEventVersion(String eventVersion) {
        this.eventVersion = eventVersion;
    }

    public String getRequestId() {
        return requestId;
    }

    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    public String getServiceName() {
        return serviceName;
    }

    public void setServiceName(String serviceName) {
        this.serviceName = serviceName;
    }

    public String getSourceIpAddress() {
        return sourceIpAddress;
    }

    public void setSourceIpAddress(String sourceIpAddress) {
        this.sourceIpAddress = sourceIpAddress;
    }

    public String getUserAgent() {
        return userAgent;
    }

    public void setUserAgent(String userAgent) {
        this.userAgent = userAgent;
    }

    public RequestParameters getRequestParameters() {
        return requestParameters;
    }

    public void setRequestParameters(RequestParameters requestParameters) {
        this.requestParameters = requestParameters;
    }

    public UserIdentity getUserIdentity() {
        return userIdentity;
    }

    public void setUserIdentity(UserIdentity userIdentity) {
        this.userIdentity = userIdentity;
    }

    public String getAcsRegion() {
        return acsRegion;
    }

    public void setAcsRegion(String acsRegion) {
        this.acsRegion = acsRegion;
    }

}

class RequestParameters {

    private String SourceCidrIp;
    private String SecurityGroupId;
    private String IpProtocol;
    private String NicType;
    private String Policy;
    private String PortRange;

    public String getSourceCidrIp() {
        return SourceCidrIp;
    }

    public void setSourceCidrIp(String sourceCidrIp) {
        SourceCidrIp = sourceCidrIp;
    }

    public String getSecurityGroupId() {
        return SecurityGroupId;
    }

    public void setSecurityGroupId(String securityGroupId) {
        SecurityGroupId = securityGroupId;
    }

    public String getIpProtocol() {
        return IpProtocol;
    }

    public void setIpProtocol(String ipProtocol) {
        IpProtocol = ipProtocol;
    }

    public String getNicType() {
        return NicType;
    }

    public void setNicType(String nicType) {
        NicType = nicType;
    }

    public String getPolicy() {
        return Policy;
    }

    public void setPolicy(String policy) {
        Policy = policy;
    }

    public String getPortRange() {
        return PortRange;
    }

    public void setPortRange(String portRange) {
        PortRange = portRange;
    }

}

class UserIdentity {

    private String accessKeyId;
    private String accountId;
    private String principalId;
    private String type;
    private String userName;

    public String getAccessKeyId() {
        return accessKeyId;
    }

    public void setAccessKeyId(String accessKeyId) {
        this.accessKeyId = accessKeyId;
    }

    public String getAccountId() {
        return accountId;
    }

    public void setAccountId(String accountId) {
        this.accountId = accountId;
    }

    public String getPrincipalId() {
        return principalId;
    }

    public void setPrincipalId(String principalId) {
        this.principalId = principalId;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

}

第三步:创建JSON处理业务

说明:

1.传参为一个String类型的JSON,传出参数为多个解析后的String类型

2.需要引用GSON包等,在帮助文档的下载客户端页面的odpscmd_public.zip的lib目录下就有对应jar包


package com.kangyu;

import java.util.ArrayList;
import java.util.List;

import com.aliyun.odps.udf.UDFException;
import com.aliyun.odps.udf.UDTF;
import com.aliyun.odps.udf.annotation.Resolve;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import com.google.gson.reflect.TypeToken;

@Resolve({ "string->string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string" })
public class AnalysisLog extends UDTF {

    @Override
    public void process(Object[] arg0) throws UDFException {
        String log = (String) arg0[0];

        log = log.replaceAll("\\\\", "").replace("\"{", "{").replace("}\"", "}").replace("\"\"", "\"");
        log = log.replace("\"[", "[").replace("]\"", "]");

        if (log.indexOf("\"errorCode\"") != -1) {
            return;
        }

        String acsRegion = null;
        String apiVersion = null;
        String eventId = null;
        String eventName = null;
        String eventSource = null;
        String eventTime = null;
        String eventType = null;
        String eventVersion = null;
        String requestId = null;

        // requestParameters;
        String sourceCidrIp = null;
        String securityGroupId = null;
        String ipProtocol = null;
        String nicType = null;
        String policy = null;
        String portRange = null;

        String serviceName = null;
        String sourceIpAddress = null;
        String userAgent = null;

        // userIdentity;
        String accessKeyId = null;
        String accountId = null;
        String principalId = null;
        String type = null;
        String userName = null;

        List<AnalysisObj> list = GsonUtil.fromJsonList(log, AnalysisObj.class);
        for (AnalysisObj obj : list) {
            acsRegion = obj.getAcsRegion();
            apiVersion = obj.getApiVersion();
            eventId = obj.getEventId();
            eventName = obj.getEventName();
            eventSource = obj.getEventSource();
            eventTime = obj.getEventTime();
            eventType = obj.getEventType();
            eventVersion = obj.getEventVersion();
            requestId = obj.getRequestId();
            serviceName = obj.getServiceName();
            sourceIpAddress = obj.getSourceIpAddress();
            userAgent = obj.getUserAgent();

            RequestParameters paramObj = obj.getRequestParameters();
            if (paramObj != null) {
                sourceCidrIp = paramObj.getSourceCidrIp();
                securityGroupId = paramObj.getSecurityGroupId();
                ipProtocol = paramObj.getIpProtocol();
                nicType = paramObj.getNicType();
                policy = paramObj.getPolicy();
                portRange = paramObj.getPortRange();
            }

            UserIdentity identityObj = obj.getUserIdentity();
            if (identityObj != null) {
                accessKeyId = identityObj.getAccessKeyId();
                accountId = identityObj.getAccountId();
                principalId = identityObj.getPrincipalId();
                type = identityObj.getType();
                userName = identityObj.getUserName();
            }

            forward(acsRegion, apiVersion, eventId, eventName, eventSource, eventTime, eventType, eventVersion,
                    requestId, serviceName, sourceIpAddress, userAgent, sourceCidrIp, securityGroupId, ipProtocol,
                    nicType, policy, portRange, accessKeyId, accountId, principalId, type, userName);
        }
    }

class GsonUtil {

    // 将Json数据解析成相应的映射对象
    public static <T> T parseJsonWithGson(String jsonData, Class<T> type) {
        Gson gson = new Gson();
        T result = gson.fromJson(jsonData, type);
        return result;
    }

    // 将Json数组解析成相应的映射对象列表
    public static <T> List<T> parseJsonArrayWithGson(String jsonData, Class<T> type) {
        Gson gson = new Gson();
        List<T> result = gson.fromJson(jsonData, new TypeToken<List<T>>() {
        }.getType());
        return result;
    }

    public static <T> ArrayList<T> fromJsonList(String json, Class<T> cls) {
        ArrayList<T> mList = new ArrayList<T>();
        Gson gson = new Gson();
        try {
            JsonArray array = new JsonParser().parse(json).getAsJsonArray();

            for (final JsonElement elem : array) {
                mList.add(gson.fromJson(elem, cls));
            }
        } catch (Exception e) {
            System.out.println("json=" + json);
            e.printStackTrace();
        }
        return mList;
    }
}

第四步:生成代码的UDTF的jar包,可以使用jar -cvf命令,也可以直接使用如eclipse工具导出

file--->export 选择 java下面的jar file

8afefdd53f0a2df9960645534c9a1124bc82d0a5


第五步:使用DataWorks上传上面导出的jar包

3e632ca57d8c81a83f92f3d072ac81f546030c0d


第六步:根据上传的jar包来创建自定义函数


create function analysis_log_udf as 'com.kangyu.AnalysisLog' using 'analysisLog.jar'

第七步:可以使用上面创建的函数进行查询


select analysis_log_udf(logcontent) 
as (acsRegion,
    apiVersion,
    eventId,
    eventName,
    eventSource,
    eventTime,
    eventType,
    eventVersion,
    requestId,
    SourceCidrIp,
    SecurityGroupId,
    IpProtocol,
    NicType,
    Policy,
    PortRange,
    serviceName,
    sourceIpAddress,
    userAgent,
    accessKeyId,
    accountId,
    principalId,
    type,
    userName)
from t_biz_log

第八步:可以在上面的SQL外面增加insert overwrite操作并在DataWorks中配置同步任务

注意:

如果您的两张表有分区请自行加上 partitioned by来进行分区并且查询的时候也增加where条件


insert overwrite table t_analysis_log
select analysis_log_udf(logcontent) 
as (acsRegion,
    apiVersion,
    eventId,
    eventName,
    eventSource,
    eventTime,
    eventType,
    eventVersion,
    requestId,
    SourceCidrIp,
    SecurityGroupId,
    IpProtocol,
    NicType,
    Policy,
    PortRange,
    serviceName,
    sourceIpAddress,
    userAgent,
    accessKeyId,
    accountId,
    principalId,
    type,
    userName)
from t_biz_log





相关实践学习
基于MaxCompute的热门话题分析
Apsara Clouder大数据专项技能认证配套课程:基于MaxCompute的热门话题分析
目录
相关文章
|
9月前
|
SQL 人工智能 分布式计算
ODPS十五周年实录|构建 AI 时代的大数据基础设施
本文根据 ODPS 十五周年·年度升级发布实录整理而成,演讲信息如下: 张治国:阿里云智能集团技术研究员、阿里云智能计算平台事业部 ODPS-MaxCompute 负责人 活动:【数据进化·AI 启航】ODPS 年度升级发布
421 9
|
11月前
|
存储 分布式计算 大数据
【赵渝强老师】阿里云大数据存储计算服务:MaxCompute
阿里云MaxCompute是快速、全托管的TB/PB级数据仓库解决方案,提供海量数据存储与计算服务。支持多种计算模型,适用于大规模离线数据分析,具备高安全性、低成本、易用性强等特点,助力企业高效处理大数据。
510 0
|
9月前
|
SQL 存储 分布式计算
【万字长文,建议收藏】《高性能ODPS SQL章法》——用古人智慧驾驭大数据战场
本文旨在帮助非专业数据研发但是有高频ODPS使用需求的同学们(如数分、算法、产品等)能够快速上手ODPS查询优化,实现高性能查数看数,避免日常工作中因SQL任务卡壳、失败等情况造成的工作产出delay甚至集群资源稳定性问题。
1639 36
【万字长文,建议收藏】《高性能ODPS SQL章法》——用古人智慧驾驭大数据战场
|
9月前
|
存储 分布式计算 资源调度
【赵渝强老师】阿里云大数据MaxCompute的体系架构
阿里云MaxCompute是快速、全托管的EB级数据仓库解决方案,适用于离线计算场景。它由计算与存储层、逻辑层、接入层和客户端四部分组成,支持多种计算任务的统一调度与管理。
771 1
|
XML 安全 Java
【日志框架整合】Slf4j、Log4j、Log4j2、Logback配置模板
本文介绍了Java日志框架的基本概念和使用方法,重点讨论了SLF4J、Log4j、Logback和Log4j2之间的关系及其性能对比。SLF4J作为一个日志抽象层,允许开发者使用统一的日志接口,而Log4j、Logback和Log4j2则是具体的日志实现框架。Log4j2在性能上优于Logback,推荐在新项目中使用。文章还详细说明了如何在Spring Boot项目中配置Log4j2和Logback,以及如何使用Lombok简化日志记录。最后,提供了一些日志配置的最佳实践,包括滚动日志、统一日志格式和提高日志性能的方法。
4977 32
【日志框架整合】Slf4j、Log4j、Log4j2、Logback配置模板
|
监控 安全 Apache
什么是Apache日志?为什么Apache日志分析很重要?
Apache是全球广泛使用的Web服务器软件,支持超过30%的活跃网站。它通过接收和处理HTTP请求,与后端服务器通信,返回响应并记录日志,确保网页请求的快速准确处理。Apache日志分为访问日志和错误日志,对提升用户体验、保障安全及优化性能至关重要。EventLog Analyzer等工具可有效管理和分析这些日志,增强Web服务的安全性和可靠性。
603 9
|
监控 容灾 算法
阿里云 SLS 多云日志接入最佳实践:链路、成本与高可用性优化
本文探讨了如何高效、经济且可靠地将海外应用与基础设施日志统一采集至阿里云日志服务(SLS),解决全球化业务扩展中的关键挑战。重点介绍了高性能日志采集Agent(iLogtail/LoongCollector)在海外场景的应用,推荐使用LoongCollector以获得更优的稳定性和网络容错能力。同时分析了多种网络接入方案,包括公网直连、全球加速优化、阿里云内网及专线/CEN/VPN接入等,并提供了成本优化策略和多目标发送配置指导,帮助企业构建稳定、低成本、高可用的全球日志系统。
1208 55
|
存储 缓存 关系型数据库
图解MySQL【日志】——Redo Log
Redo Log(重做日志)是数据库中用于记录数据页修改的物理日志,确保事务的持久性和一致性。其主要作用包括崩溃恢复、提高性能和保证事务一致性。Redo Log 通过先写日志的方式,在内存中缓存修改操作,并在适当时候刷入磁盘,减少随机写入带来的性能损耗。WAL(Write-Ahead Logging)技术的核心思想是先将修改操作记录到日志文件中,再择机写入磁盘,从而实现高效且安全的数据持久化。Redo Log 的持久化过程涉及 Redo Log Buffer 和不同刷盘时机的控制参数(如 `innodb_flush_log_at_trx_commit`),以平衡性能与数据安全性。
900 5
图解MySQL【日志】——Redo Log

热门文章

最新文章

相关产品

  • 云原生大数据计算服务 MaxCompute