一、需求场景
很多数据开发者在使用MaxCompute开发过程中需要统计每个账号所属任务的费用使用情况以及每个任务耗时来做任务的合理性规划和调整。但是在使用MaxCompute的时候通常情况下大多数用户通过DataWorks标准模式下使用MaxCompute,这样在MaxCompute提供的元数据视图信息中将记录所有的生产作业执行账号为同一个主账号,只有小部分的开发作业执行账号为个人RAM子账号。那么如何去做到各个账户的费用分摊和任务时间成本的统计是大部分MaxCompute使用者所关注的问题。本文主要介绍如何通过MaxCompute元数据统计账号费用及任务耗时,同时定时通过钉钉推送到客户群。
二、需求实现
1、目前任务的费用可以通过账单详情中的用量明细来查询,但是没有归属到对应的子账号。我们需要通过元数据Information_Schema视图中的历史使用信息TASKS_HISTORY来统计。
2、任务耗时需要通过元数据来统计。
三、MaxCompute账号费用及任务耗时TOPN统计
1、元数据介绍
MaxCompute的Information Schema提供了项目元数据及使用历史数据等信息
注意:
(1)目前Information Schema提供的是当前项目的元数据视图,不支持跨项目的元数据访问。如果需要对多个项目的元数据进行统一查询、分析,需要分别获取各个项目中的元数据并整合在一起进行跨项目元数据分析。
(2)元数据及作业历史数据保存在Information_Schema空间下,如需对历史数据进行快照备份或获得超过14天的作业历史,您可以定期将Information Schema的数据保存备份到用户指定项目空间。
2、如何根据元数据去实现账号费用TOPN统计
(1)元数据下载
元数据Information_Schema视图中的历史使用信息TASKS_HISTORY记录MaxCompute项目内已完成的作业历史,保留近14天数据。需要通过元数据来做任务费用统计,因此需要定期将Information Schema的数据保存备份到用户指定项目空间。
开始使用前,需要以Project Owner身份安装Information Schema的权限包,获得访问本项目元数据的权限。安装方式有如下两种:
a、在MaxCompute客户端(odpscmd)中执行如下命令。
odps@myproject1>install package information_schema.systables;
b、在DataWorks中的数据开发 > 临时查询中执行如下语句。
install package information_schema.systables;
Information Schema的视图包含了项目级别的所有用户数据,默认Project Owner可以访问查看。如果项目内其他用户或角色访问查看,需要进行授权。
语法如下。
grant actions on package <pkgName> to user <username>;
grant actions on package <pkgName> to role <role_name>;
(2)元数据下载备份
--创建元数据数据备份表information_history
use project1;
CREATE TABLE IF NOT EXISTS project1.information_history
(
task_catalog STRING
,task_schema STRING
,task_name STRING
,task_type STRING
,inst_id STRING
,`status` STRING
,owner_id STRING
,owner_name STRING
,result STRING
,start_time DATETIME
,end_time DATETIME
,input_records BIGINT
,output_records BIGINT
,input_bytes BIGINT
,output_bytes BIGINT
,input_tables STRING
,output_tables STRING
,operation_text STRING
,signature STRING
,complexity DOUBLE
,cost_cpu DOUBLE
,cost_mem DOUBLE
,settings STRING
,ds STRING
)
STORED AS ALIORC
;
--定时将数据写入备份表information_history
use project1;
insert into table project1.information_history
select * from information_schema.tasks_history where ds ='${datetime1}';
注意:${datetime1}为DataWorks调度参数,参数配置如下:datetime1=${yyyymmdd}
如果要统计多个项目空间的元数据需要分别去各个项目空间安装元数据package。之后在其他工作空间执行相同的操作。把各个工作空间的元数据备份数据插入到同一个表中做集中统计分析。
本文将所有project元数据维护在project1.information_history表中。
use project2;
insert into table project1.information_history
select * from information_schema.tasks_history where ds ='${datetime1}';
备注:${datetime1}为DataWorks调度参数,参数配置如下:datetime1=${yyyymmdd}之后遇到的所有参数都是如此后续不在重复描述。
(3)通过元数据计算账号所属任务费用
通过DataWorks标准模式下使用MaxCompute,这样在MaxCompute提供的元数据视图信息中将记录所有的生产作业执行账号为同一个主账号,只有小部分的开发作业执行账号为个人RAM子账号。元数据视图TASKS_HISTORY中的字段settings记录上层调度或用户传入的信息,以JSON格式存储。包含字段:useragent、bizid、skynet_id和skynet_nodename。通过该字段可以具体到创建任务的子账号信息。
a、维护一张子账号明细表user_ram,记录需要统计的账号及账号ID
CREATE TABLE IF NOT EXISTS project1.user_ram
(
user_id STRING
,user_name STRING
)
STORED AS ALIORC
;
b、账号所属任务消费(按量付费)TOPN统计
CREATE TABLE IF NOT EXISTS project1.cost_topn
(
cost_sum DECIMAL(38,5)
,task_owner STRING
)
PARTITIONED BY
(
ds STRING
)
STORED AS ALIORC
;
---元数据TOPN按量付费消费统计
set odps.sql.decimal.odps2=true;
insert into table project1.cost_topn PARTITION (ds = '${datetime1}')
SELECT
nvl(cost_sum,0) cost_sum
,CASE WHEN a.task_owner='13************' OR a.task_owner='23************' OR a.task_owner='21************' THEN b.user_name
ELSE a.task_owner
END task_owner
---注释部分为账号ID
FROM (
SELECT inst_id
,owner_name
,task_type
,a.input_bytes
,a.cost_cpu
,a.STATUS
,CASE WHEN a.task_type = 'SQL' THEN CAST(a.input_bytes/1024/1024/1024 * a.complexity * 0.3 AS DECIMAL(18,5) )
WHEN a.task_type = 'SQLRT' THEN CAST(a.input_bytes/1024/1024/1024 * a.complexity * 0.3 AS DECIMAL(18,5) )
WHEN a.task_type = 'CUPID' AND a.STATUS='Terminated'THEN CAST(a.cost_cpu/100/3600 * 0.66 AS DECIMAL(18,5) )
ELSE 0
END cost_sum
,a.settings
,GET_JSON_OBJECT(settings, "$.SKYNET_ONDUTY") OWNER
,CASE WHEN GET_JSON_OBJECT(a.settings, "$.SKYNET_ONDUTY") IS NULL THEN owner_name
ELSE GET_JSON_OBJECT(a.settings, "$.SKYNET_ONDUTY")
END task_owner
FROM project1.information_history a
WHERE ds = '${datetime1}'
) a
LEFT JOIN project1.user_ram b
ON a.task_owner = b.user_id
;
备注:
task_type = 'SQL' 为SQL任务、task_type = 'SQLRT' 为查询加速(MCQA)任务,task_type = 'CUPID' 为Spark任务,其他计费任务如MapReduce、Lightning(交互式分析)、Mras计算公式如下,详细介绍请参考计算费用(按量计费)
MapReduce作业的计费公式为:
MapReduce作业当日计算费用=当日总计算时×单价(0.46元/计算时)
一个执行成功的MapReduce作业计算时=作业运行时间(小时)×作业调用的Core数量。
Lightning查询作业的计费公式为:
一次Lightning查询作业费用=查询输入数据量×单价(0.03元/GB)
Mars作业的计费公式为:
Mars作业当日计算费用=当日总计算时×单价(0.66元/计算时)
3、如何根据元数据去实现任务耗时TOPN统计
---创建任务耗时TOPN表time_topn
CREATE TABLE IF NOT EXISTS project1.time_topn
(
inst_id STRING
,cost_time BIGINT
,task_owner STRING
)
PARTITIONED BY
(
ds STRING
)
STORED AS ALIORC
;
---任务耗时TOPN统计
INSERT INTO TABLE project1.time_topn PARTITION(ds = '${datetime1}')
SELECT inst_id
,cost_time
,CASE WHEN a.task_owner='13**********' OR a.task_owner='23**********' OR a.task_owner='21**********' THEN b.user_name
ELSE a.task_owner
END task_owner
FROM (
SELECT inst_id
-- ,task_type
-- ,status
,datediff(a.end_time, a.start_time, 'ss') AS cost_time
,CASE WHEN GET_JSON_OBJECT(a.settings, "$.SKYNET_ONDUTY") IS NULL THEN owner_name
ELSE GET_JSON_OBJECT(a.settings, "$.SKYNET_ONDUTY")
END task_owner
FROM project1.information_history a
WHERE ds = '${datetime1}'
) a
LEFT JOIN project1.user_ram b
ON a.task_owner = b.user_id
;
4、通过钉钉机器人推送到钉群
(1)群机器人开发API
a、获取自定义机器人webhook
打开机器人管理页面。以PC端为例,打开PC端钉钉,点击头像,选择“机器人管理”。
在机器人管理页面选择“自定义”机器人,输入机器人名字并选择要发送消息的群,同时可以为机器人设置机器人头像。
完成必要的安全设置(至少选择一种),勾选 我已阅读并同意《自定义机器人服务及免责条款》,点击“完成”。安全设置目前有3种方式,设置说明参考 安全设置
完成安全设置后,复制出机器人的Webhook地址,可用于向这个群发送消息,格式如下:
https://oapi.dingtalk.com/robot/send?access_token=XXXXXX
注意:请保管好此Webhook 地址,不要公布在外部网站上,泄露后有安全风险。
(2)钉群消息推送demo
a、代码实现
package com.alibaba.sgri.message;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import com.aliyun.odps.Instance;
import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.ResultSet;
import com.aliyun.odps.task.SQLTask;
import com.dingtalk.api.DefaultDingTalkClient;
import com.dingtalk.api.DingTalkClient;
import com.dingtalk.api.request.OapiRobotSendRequest;
import com.dingtalk.api.response.OapiRobotSendResponse;
import com.taobao.api.ApiException;
/**
* @class: OdpsMessageSendNew
* @description:
* @author: Liujianwei
* @date: 2020-10-16 18:26:12
**/
public class test {
public static void main(String[] args) throws ApiException {
if (args.length < 1) {
System.out.println("请输入日期参数");
System.exit(0);
}
System.out.println("开始读取数据");
DingTalkClient client = new DefaultDingTalkClient(
"https://oapi.dingtalk"
+ ".com/robot/send?access_token=Webhook地址\n");
OapiRobotSendRequest request = new OapiRobotSendRequest();
request.setMsgtype("markdown");
OapiRobotSendRequest.Markdown markdown = new OapiRobotSendRequest.Markdown();
//这里的日期作为参数
markdown.setText(getContent(args[0]));
markdown.setTitle("任务消费TOPN");
request.setMarkdown(markdown);
OapiRobotSendResponse response = client.execute(request);
System.out.println("消息发送成功");
}
/**
* 读取ODPS,获取要发送的数据
*
* @return
*/
public static String getContent(String day) {
Odps odps = createOdps();
StringBuilder sb = new StringBuilder();
try {
//==================这是任务消费=====================
String costTopnSql = "select sum(cost_sum)cost_sum,task_owner from cost_topn where ds='" + day + "' " + "group by task_owner order by cost_sum desc limit 5;";
Instance costInstance = SQLTask.run(odps, costTopnSql);
costInstance.waitForSuccess();
ResultSet costTopnRecords = SQLTask.getResultSet(costInstance);
sb.append("<font color=#FF0000 size=4>").append("任务消费TOPN(").append(day).append(
")[按照阿里云按量付费计算]").append("</font>").append("\n\n");
AtomicInteger costIndex = new AtomicInteger(1);
costTopnRecords.forEach(item -> {
sb.append(costIndex.getAndIncrement()).append(".").append("账号:");
sb.append("<font color=#2E64FE>").append(item.getString("task_owner")).append("\n\n").append("</font>");
sb.append(" ").append(" ").append("消费:").append("<font color=#2E64FE>").append(item.get("cost_sum"))
.append("元").append(
"</font>").append("\n\n")
.append("</font>");
});
//==================这是任务耗时=====================
String timeTopnSql = "select * from time_topn where ds='" + day + "' ORDER BY cost_time DESC limit 5;";
Instance timeInstance = SQLTask.run(odps, timeTopnSql);
timeInstance.waitForSuccess();
ResultSet timeTopnRecords = SQLTask.getResultSet(timeInstance);
sb.append("<font color=#FF8C00 size=4>").append("任务耗时TOPN(").append(day).append(")")
.append("\n\n").append("</font>");
AtomicInteger timeIndex = new AtomicInteger(1);
timeTopnRecords.forEach(item -> {
sb.append(timeIndex.getAndIncrement()).append(".").append("任务:");
sb.append("<font color=#2E64FE>").append(item.getString("inst_id")).append("\n\n").append("</font>");
sb.append(" ").append("账号:").append("<font color=#2E64FE>").append(item.getString("task_owner")).append("\n\n").append("</font>");
sb.append(" ").append("耗时:").append("<font color=#2E64FE>").append(item.get("cost_time"))
.append("秒").append(
"</font>").append("\n\n");
});
} catch (OdpsException | IOException e) {
e.printStackTrace();
}
return sb.toString();
}
/**
* 创建ODPS
*
* @return
*/
public static Odps createOdps() {
String project = "******";
String access_id = ""******";";
String access_key = ""******";";
String endPoint = "http://service.odps.aliyun.com/api";
Account account = new AliyunAccount(access_id, access_key);
Odps odps = new Odps(account);
odps.setEndpoint(endPoint);
odps.setDefaultProject(project);
return odps;
}
}
备注:自定义钉钉群机器人开发API参考:钉钉开发平台
b、pom文件参考
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>DingTalk_Information</groupId>
<artifactId>DingTalk_Information</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-core</artifactId>
<version>0.35.5-public</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.15</version>
<exclusions>
<exclusion>
<groupId>com.sun.jmx</groupId>
<artifactId>jmxri</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jdmk</groupId>
<artifactId>jmxtools</artifactId>
</exclusion>
<exclusion>
<groupId>javax.jms</groupId>
<artifactId>jms</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>alibaba-dingtalk-service-sdk</artifactId>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-jdbc</artifactId>
<version>3.0.1</version>
<classifier>jar-with-dependencies</classifier>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4.1</version>
<configuration>
<!-- get all project dependencies -->
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<!-- MainClass in mainfest make a executable jar -->
<archive>
<manifest>
<mainClass>com.alibaba.sgri.message.test</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<!-- bind to the packaging phase -->
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
c、定时调度执行
程序调度可以打包提交服务器去设置定时调度。本文打包程序提交至DataWorks执行定时调度,同时元数据获取也是每日调度执行。
上传jar包为MaxCompute资源,然后引用资源执行jar包且配置定时调度。
上传MaxCompute资源及引用参考MaxCompute资源,这里不做详细介绍
各项目元数据采集、任务消费和耗时TOPN计算及钉群机器人推送上下游节点调度配置如下:
DataWorks节点上下游配置参考节点上下文
四、每日钉群推送任务消费及耗时TOPN效果展示
五、相关费用统计参考文档
1、MaxCompute账单分析最佳实践:MaxCompute账单分析最佳实践
2、查看账单详情:查看账单详情
3、在DataWorks标准模式下统计个人账号使用资源情况:在DataWorks标准模式下统计个人账号使用资源情况
4、利用InformationSchema与阿里云交易和账单管理API实现MaxCompute费用对账分摊统计:利用InformationSchema与阿里云交易和账单管理API实现MaxCompute费用对账分摊统计
六、MaxCompute开发者社区交流群
欢迎加入“MaxCompute开发者社区2群”,点击链接MaxCompute开发者社区2群申请申请加入或扫描以下二维码加入。