MaxCompute提供的系统函数有限,因此平台提供了强大的自定义函数(UDF)来支持复杂的数据处理。受MaxCompute沙箱机制的限制,解析JSON日志串时需要使用GSON。本例中的原始数据可能是从其他DB通过数据集成同步到MaxCompute平台上的,因此MaxCompute中可能有一张如下的待处理原始表(如果按天同步日志,还会有年月日分区,根据实际业务加上即可):
-- Raw source table: logcontent holds the JSON log string that the UDTF parses.
-- MaxCompute DDL declares each column as "name TYPE" (the name comes first).
create table t_biz_log(
    id BIGINT,
    logcontent STRING
);
上表logcontent字段中待解析的JSON日志串示例如下(字符串中的VALUE文本可能出现特殊字符,如反斜线\等):
[{"acsRegion":"cn-huhehaote","apiVersion":"2016-04-28","errorCode":"Forbindden","errorMessage":"The specified Instance already bind eip",
"eventId":"01168520-E248-4949-84AC-48EF6FA59292","eventName":"CreateForwardEntry","eventSource":"aliyuncs.com","eventTime":"2018-04-11T07:32:47Z",
"eventType":"ApiCall","eventVersion":"1","isGlobal":false,"requestId":"01168520-E248-48EF6FA59292",
"requestParameters":{"_response_json_parse":"\"true\"","IpProtocol":"\"tcp\"","ExternalIp":"\"39.10.2.1\"","ForwardTableId":"\"ftb-hp3bbrmtlho\"",
"SecureTransport":"\"true\"","needarrayitemname":"\"true\"","RequestId":"\"01168-E248-4949-84AC-48EF6FA59292\"","ExternalPort":"\"12\"",
"RegionId":"\"cn-huhehaote\"","InternalPort":"\"112\"","HostId":"\"huhehaote.aliyuncs.com\"","InternalIp":"\"192.168.1.167\""},"serviceName":"Vpc",
"sourceIpAddress":"106.11.34.11","userIdentity":{"accountId":"44404","principalId":"44404",
"sessionContext":{"attributes":{"creationDate":"2018-04-11T07:32:47Z","mfaAuthenticated":"false"}},"type":"root-account","userName":"root"}}]
案例过程如下:
第一步:分析上面的JSON找出想要的关键信息并创建MaxCompute表:
-- Target table for the parsed log fields; every column is a string.
-- MaxCompute DDL declares each column as "name TYPE" (the name comes first).
-- The column order below is the order used by the "as (...)" alias list of the
-- analysis_log_udf queries, which is what "insert overwrite" relies on.
create table t_analysis_log(
    -- top-level event fields
    acsRegion STRING,
    apiVersion STRING,
    eventId STRING,
    eventName STRING,
    eventSource STRING,
    eventTime STRING,
    eventType STRING,
    eventVersion STRING,
    requestId STRING,
    -- fields taken from the nested requestParameters object
    SourceCidrIp STRING,
    SecurityGroupId STRING,
    IpProtocol STRING,
    NicType STRING,
    Policy STRING,
    PortRange STRING,
    -- remaining top-level fields
    serviceName STRING,
    sourceIpAddress STRING,
    userAgent STRING,
    -- fields taken from the nested userIdentity object
    accessKeyId STRING,
    accountId STRING,
    principalId STRING,
    type STRING,
    userName STRING
);
第二步:创建实体类(因为上面的json有嵌套结构,所以需要类结构也是关联模式)
package com.kangyu;
/**
 * Plain data holder for one log event.
 *
 * <p>The class carries no annotations, so a JSON binder that maps by field name
 * (such as Gson's default behaviour) needs the field names to stay exactly as
 * they are. The two nested JSON objects are modelled by
 * {@link RequestParameters} and {@link UserIdentity}.
 */
public class AnalysisObj {

    // ---- top-level scalar fields ----
    private String acsRegion;
    private String apiVersion;
    private String eventId;
    private String eventName;
    private String eventSource;
    private String eventTime;
    private String eventType;
    private String eventVersion;
    private String requestId;

    // ---- nested "requestParameters" object ----
    private RequestParameters requestParameters;

    private String serviceName;
    private String sourceIpAddress;
    private String userAgent;

    // ---- nested "userIdentity" object ----
    private UserIdentity userIdentity;

    /** @return the acsRegion value, or {@code null} when it was never set */
    public String getAcsRegion() {
        return this.acsRegion;
    }

    /** @param acsRegion new acsRegion value */
    public void setAcsRegion(String acsRegion) {
        this.acsRegion = acsRegion;
    }

    /** @return the apiVersion value, or {@code null} when it was never set */
    public String getApiVersion() {
        return this.apiVersion;
    }

    /** @param apiVersion new apiVersion value */
    public void setApiVersion(String apiVersion) {
        this.apiVersion = apiVersion;
    }

    /** @return the eventId value, or {@code null} when it was never set */
    public String getEventId() {
        return this.eventId;
    }

    /** @param eventId new eventId value */
    public void setEventId(String eventId) {
        this.eventId = eventId;
    }

    /** @return the eventName value, or {@code null} when it was never set */
    public String getEventName() {
        return this.eventName;
    }

    /** @param eventName new eventName value */
    public void setEventName(String eventName) {
        this.eventName = eventName;
    }

    /** @return the eventSource value, or {@code null} when it was never set */
    public String getEventSource() {
        return this.eventSource;
    }

    /** @param eventSource new eventSource value */
    public void setEventSource(String eventSource) {
        this.eventSource = eventSource;
    }

    /** @return the eventTime value, or {@code null} when it was never set */
    public String getEventTime() {
        return this.eventTime;
    }

    /** @param eventTime new eventTime value */
    public void setEventTime(String eventTime) {
        this.eventTime = eventTime;
    }

    /** @return the eventType value, or {@code null} when it was never set */
    public String getEventType() {
        return this.eventType;
    }

    /** @param eventType new eventType value */
    public void setEventType(String eventType) {
        this.eventType = eventType;
    }

    /** @return the eventVersion value, or {@code null} when it was never set */
    public String getEventVersion() {
        return this.eventVersion;
    }

    /** @param eventVersion new eventVersion value */
    public void setEventVersion(String eventVersion) {
        this.eventVersion = eventVersion;
    }

    /** @return the requestId value, or {@code null} when it was never set */
    public String getRequestId() {
        return this.requestId;
    }

    /** @param requestId new requestId value */
    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    /** @return the nested request parameters, or {@code null} when absent */
    public RequestParameters getRequestParameters() {
        return this.requestParameters;
    }

    /** @param requestParameters new nested request parameters */
    public void setRequestParameters(RequestParameters requestParameters) {
        this.requestParameters = requestParameters;
    }

    /** @return the serviceName value, or {@code null} when it was never set */
    public String getServiceName() {
        return this.serviceName;
    }

    /** @param serviceName new serviceName value */
    public void setServiceName(String serviceName) {
        this.serviceName = serviceName;
    }

    /** @return the sourceIpAddress value, or {@code null} when it was never set */
    public String getSourceIpAddress() {
        return this.sourceIpAddress;
    }

    /** @param sourceIpAddress new sourceIpAddress value */
    public void setSourceIpAddress(String sourceIpAddress) {
        this.sourceIpAddress = sourceIpAddress;
    }

    /** @return the userAgent value, or {@code null} when it was never set */
    public String getUserAgent() {
        return this.userAgent;
    }

    /** @param userAgent new userAgent value */
    public void setUserAgent(String userAgent) {
        this.userAgent = userAgent;
    }

    /** @return the nested user identity, or {@code null} when absent */
    public UserIdentity getUserIdentity() {
        return this.userIdentity;
    }

    /** @param userIdentity new nested user identity */
    public void setUserIdentity(UserIdentity userIdentity) {
        this.userIdentity = userIdentity;
    }
}
/**
 * Plain data holder for the nested request-parameter values of a log event.
 *
 * <p>The field names start with an upper-case letter, unlike usual Java
 * naming. They must stay as they are for any binder that maps JSON keys to
 * fields by name (such as Gson's default behaviour), because the JSON keys are
 * spelled the same way (for example {@code IpProtocol}).
 */
class RequestParameters {

    private String SourceCidrIp;
    private String SecurityGroupId;
    private String IpProtocol;
    private String NicType;
    private String Policy;
    private String PortRange;

    // ---- getters (each returns null until the matching setter is called) ----

    public String getSourceCidrIp() {
        return this.SourceCidrIp;
    }

    public String getSecurityGroupId() {
        return this.SecurityGroupId;
    }

    public String getIpProtocol() {
        return this.IpProtocol;
    }

    public String getNicType() {
        return this.NicType;
    }

    public String getPolicy() {
        return this.Policy;
    }

    public String getPortRange() {
        return this.PortRange;
    }

    // ---- setters ----

    public void setSourceCidrIp(String sourceCidrIp) {
        this.SourceCidrIp = sourceCidrIp;
    }

    public void setSecurityGroupId(String securityGroupId) {
        this.SecurityGroupId = securityGroupId;
    }

    public void setIpProtocol(String ipProtocol) {
        this.IpProtocol = ipProtocol;
    }

    public void setNicType(String nicType) {
        this.NicType = nicType;
    }

    public void setPolicy(String policy) {
        this.Policy = policy;
    }

    public void setPortRange(String portRange) {
        this.PortRange = portRange;
    }
}
/**
 * Plain data holder for the nested user-identity values of a log event.
 *
 * <p>There are no annotations, so the field names must stay as they are for
 * any binder that maps JSON keys to fields by name.
 */
class UserIdentity {

    private String accessKeyId;
    private String accountId;
    private String principalId;
    private String type;
    private String userName;

    // ---- getters (each returns null until the matching setter is called) ----

    public String getAccessKeyId() {
        return this.accessKeyId;
    }

    public String getAccountId() {
        return this.accountId;
    }

    public String getPrincipalId() {
        return this.principalId;
    }

    public String getType() {
        return this.type;
    }

    public String getUserName() {
        return this.userName;
    }

    // ---- setters ----

    public void setAccessKeyId(String accessKeyId) {
        this.accessKeyId = accessKeyId;
    }

    public void setAccountId(String accountId) {
        this.accountId = accountId;
    }

    public void setPrincipalId(String principalId) {
        this.principalId = principalId;
    }

    public void setType(String type) {
        this.type = type;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }
}
第三步:创建JSON处理业务
说明:
1.传参为一个String类型的JSON,传出参数为多个解析后的String类型
2.需要引用GSON包等,在帮助文档的下载客户端页面的odpscmd_public.zip的lib目录下就有对应jar包
package com.kangyu;
import java.util.ArrayList;
import java.util.List;
import com.aliyun.odps.udf.UDFException;
import com.aliyun.odps.udf.UDTF;
import com.aliyun.odps.udf.annotation.Resolve;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import com.google.gson.reflect.TypeToken;
@Resolve({ "string->string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string" })
public class AnalysisLog extends UDTF {
@Override
public void process(Object[] arg0) throws UDFException {
String log = (String) arg0[0];
log = log.replaceAll("\\\\", "").replace("\"{", "{").replace("}\"", "}").replace("\"\"", "\"");
log = log.replace("\"[", "[").replace("]\"", "]");
if (log.indexOf("\"errorCode\"") != -1) {
return;
}
String acsRegion = null;
String apiVersion = null;
String eventId = null;
String eventName = null;
String eventSource = null;
String eventTime = null;
String eventType = null;
String eventVersion = null;
String requestId = null;
// requestParameters;
String sourceCidrIp = null;
String securityGroupId = null;
String ipProtocol = null;
String nicType = null;
String policy = null;
String portRange = null;
String serviceName = null;
String sourceIpAddress = null;
String userAgent = null;
// userIdentity;
String accessKeyId = null;
String accountId = null;
String principalId = null;
String type = null;
String userName = null;
List<AnalysisObj> list = GsonUtil.fromJsonList(log, AnalysisObj.class);
for (AnalysisObj obj : list) {
acsRegion = obj.getAcsRegion();
apiVersion = obj.getApiVersion();
eventId = obj.getEventId();
eventName = obj.getEventName();
eventSource = obj.getEventSource();
eventTime = obj.getEventTime();
eventType = obj.getEventType();
eventVersion = obj.getEventVersion();
requestId = obj.getRequestId();
serviceName = obj.getServiceName();
sourceIpAddress = obj.getSourceIpAddress();
userAgent = obj.getUserAgent();
RequestParameters paramObj = obj.getRequestParameters();
if (paramObj != null) {
sourceCidrIp = paramObj.getSourceCidrIp();
securityGroupId = paramObj.getSecurityGroupId();
ipProtocol = paramObj.getIpProtocol();
nicType = paramObj.getNicType();
policy = paramObj.getPolicy();
portRange = paramObj.getPortRange();
}
UserIdentity identityObj = obj.getUserIdentity();
if (identityObj != null) {
accessKeyId = identityObj.getAccessKeyId();
accountId = identityObj.getAccountId();
principalId = identityObj.getPrincipalId();
type = identityObj.getType();
userName = identityObj.getUserName();
}
forward(acsRegion, apiVersion, eventId, eventName, eventSource, eventTime, eventType, eventVersion,
requestId, serviceName, sourceIpAddress, userAgent, sourceCidrIp, securityGroupId, ipProtocol,
nicType, policy, portRange, accessKeyId, accountId, principalId, type, userName);
}
}
class GsonUtil {
// 将Json数据解析成相应的映射对象
public static <T> T parseJsonWithGson(String jsonData, Class<T> type) {
Gson gson = new Gson();
T result = gson.fromJson(jsonData, type);
return result;
}
// 将Json数组解析成相应的映射对象列表
public static <T> List<T> parseJsonArrayWithGson(String jsonData, Class<T> type) {
Gson gson = new Gson();
List<T> result = gson.fromJson(jsonData, new TypeToken<List<T>>() {
}.getType());
return result;
}
public static <T> ArrayList<T> fromJsonList(String json, Class<T> cls) {
ArrayList<T> mList = new ArrayList<T>();
Gson gson = new Gson();
try {
JsonArray array = new JsonParser().parse(json).getAsJsonArray();
for (final JsonElement elem : array) {
mList.add(gson.fromJson(elem, cls));
}
} catch (Exception e) {
System.out.println("json=" + json);
e.printStackTrace();
}
return mList;
}
}
第四步:生成代码的UDTF的jar包,可以使用jar -cvf命令,也可以直接使用如eclipse工具导出
file--->export 选择 java下面的jar file
第五步:使用DataWorks上传上面导出的jar包
第六步:根据上传的jar包来创建自定义函数
-- Registers the class com.kangyu.AnalysisLog, packaged in the uploaded resource
-- analysisLog.jar, as the SQL function analysis_log_udf.
CREATE FUNCTION analysis_log_udf
    AS 'com.kangyu.AnalysisLog'
    USING 'analysisLog.jar'
第七步:可以使用上面创建的函数进行查询
-- Runs the UDTF over every logcontent value of t_biz_log.
-- The 23 aliases name the function's output columns by position.
SELECT analysis_log_udf(logcontent) AS (
    acsRegion, apiVersion, eventId, eventName, eventSource, eventTime, eventType, eventVersion,
    requestId,
    SourceCidrIp, SecurityGroupId, IpProtocol, NicType, Policy, PortRange,
    serviceName, sourceIpAddress, userAgent,
    accessKeyId, accountId, principalId, type, userName
)
FROM t_biz_log
第八步:可以在上面的SQL外面增加insert overwrite操作并在DataWorks中配置同步任务
注意:
如果您的两张表有分区,请在建表语句中自行加上 partitioned by 子句来进行分区,并在查询时增加对应分区的 where 条件。
-- Replaces the contents of t_analysis_log with the rows the UDTF produces from t_biz_log.
-- The 23 aliases name the function's output columns by position.
INSERT OVERWRITE TABLE t_analysis_log
SELECT analysis_log_udf(logcontent) AS (
    acsRegion, apiVersion, eventId, eventName, eventSource, eventTime, eventType, eventVersion,
    requestId,
    SourceCidrIp, SecurityGroupId, IpProtocol, NicType, Policy, PortRange,
    serviceName, sourceIpAddress, userAgent,
    accessKeyId, accountId, principalId, type, userName
)
FROM t_biz_log