在DataWorks标准模式下统计个人账号使用资源情况

背景
在使用MaxCompute的时候通常情况下,用户会通过Information Schema的task_history视图表来分析具体某个账号执行的SQL任务情况,来做到SQL成本分摊或SQL的时间成本优化。但大多数用户通过DataWorks标准模式下使用MaxCompute,这样在MaxCompute提供的元数据视图信息中将记录所有的生产作业执行账号为同一个主账号,只有小部分的开发作业执行账号为个人RAM子账号。本文主要介绍如何在DataWorks标准模式下统计个人账号使用资源情况。
在DataWorks标准模式下统计个人账号使用资源情况
如上图所示,这样大多数成本都无法升级到具体RAM账号上,进而做不到成本审计和分摊。
解决方案
在使用DataWorks开发MaxCompute作业时,每个节点都有一个责任人,默认为当前开发者即当前RAM子账号。如果能够获取到这个信息与Information Schema匹配即可。
获取节点责任人ID
在DataWorks页面中,一个节点的责任人如下示意图:
在DataWorks标准模式下统计个人账号使用资源情况

以一个真实的生产环境执行的作业logview为例,如下:
在DataWorks标准模式下统计个人账号使用资源情况
其中"SKYNET_ONDUTY":"219292777233523137"即为我们要获取的值。SKYNET_ONDUTY表示该节点的责任人,后面的value为该节点责任人(云账号&子账号)的ID。或者我们可以通过Information Schema的task_history表去解析获取到该ID。该值被记录在Information Schema的task_history表的settings字段里,需要进行一次解析。解析得到的账号ID就是该节点责任人(云账号&子账号)的ID。
在DataWorks标准模式下统计个人账号使用资源情况

--示例代码:
SELECT  inst_id,settings,
REGEXP_EXTRACT(settings,'"SKYNET_ONDUTY":"(.*?)"',1) AS SKYNET_ONDUTY
FROM information_schema.tasks_history
WHERE ds = 20200402 and inst_id='20200402064857130g8zorjim';

获取RAM子账号
通过上述步骤只能获取一个ID,还需要通过RAM API 来获取ID对应的子账号或云账号是哪一个。

{
    "requestId":"1CFF97CC-DD17-4C69-9AC7-7E869B8857AC",
    "isTruncated":false,
        "users":[
            {
                "userId":"219292777233523137",
                "userName":"mc_oss",
            "displayName":"mc_oss",
            "comments":"",
            "createDate":"2019-12-25T00:25:23Z",
            "updateDate":"2020-02-24T03:44:04Z"
          },
          {
            "userId":"218915375439469278",
            "userName":"bigdata_wei",
            "displayName":"bigdata_wei",
            "comments":"",
            "createDate":"2019-12-04T06:04:29Z",
            "updateDate":"2020-02-21T01:43:37Z"
          }
        ]
}

这里需要我们自己将结果抓取下来存储并解析出UserName和UserId,UserId即为节点责任人ID。
API实现代码逻辑参考如下
(1)创建一张表用来存储获取到的UserName和UserID

create table users_list(
  UserName string,
  UserId string
);

(2)ListUsers.java


import Utils.Configurations;
import Utils.ODPS;
import com.aliyun.odps.Odps;
import com.aliyun.odps.Table;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordWriter;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TunnelException;
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.exceptions.ServerException;
import com.aliyuncs.profile.DefaultProfile;
import com.aliyuncs.ram.model.v20150501.ListUsersRequest;
import com.aliyuncs.ram.model.v20150501.ListUsersResponse;

import java.io.IOException;
import java.util.List;
import java.util.Properties;

public class ListUsers {

    public static void main(String[] args) {
        Properties properties = Configurations.properties();
        String ai = properties.getProperty("access_id");
        String ak = properties.getProperty("access_key");
        String project = properties.getProperty("project_name");
        DefaultProfile profile = DefaultProfile.getProfile("cn-beijing", ai, ak);
        IAcsClient client = new DefaultAcsClient(profile);
        ListUsersRequest request = new ListUsersRequest();
        request.setRegionId("cn-beijing");
        try {
            ListUsersResponse response = client.getAcsResponse(request);
            List<ListUsersResponse.User> users = response.getUsers();
            //打开表
            Odps odps = ODPS.newOdps(ai, ak, project);
            Table table = odps.tables().get("users_list");
            TableTunnel.UploadSession uploadSession = new TableTunnel(odps).createUploadSession(project, table.getName());
            //遍历写到表中
            RecordWriter writer = uploadSession.openBufferedWriter();
            for (ListUsersResponse.User user : users) {
                Record record = uploadSession.newRecord();
                record.setString("username", user.getUserName());
                record.setString("userid", user.getUserId());
                System.out.println("写入数据" + user.getUserId() + ":" + user.getUserName());
                writer.write(record);
            }
            writer.close();
            uploadSession.commit();
            System.out.println("写入完成...");
        } catch (ServerException e) {
            e.printStackTrace();
        } catch (ClientException e) {
            System.out.println("ErrCode:" + e.getErrCode());
            System.out.println("ErrMsg:" + e.getErrMsg());
            System.out.println("RequestId:" + e.getRequestId());
        } catch (TunnelException e) {
            System.out.println("创建table上传session失败");
            System.out.println(e.getMessage());
        } catch (IOException e) {
            System.out.println("写数据到表失败");
            System.out.println(e.getMessage());
        }

    }
}

(3)在src下创建一个Utils目录,放以下文件Configurations.java、ODPS.java。

 a、Configurations.java
 package Utils;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Properties;
 import java.util.ResourceBundle;
 public class Configurations {
  * 获取配置文件信息
  *
  * @return
  */
 public static Properties properties() {
     Properties properties = new Properties();
     // 使用ClassLoader加载properties配置文件生成对应的输入流
     InputStream in = Configurations.class.getClassLoader().getResourceAsStream("common.properties");
     // 使用properties对象加载输入流
     try {
         properties.load(in);
         return properties;
     } catch (IOException e) {
         System.out.println("配置文件读取有误" + e.getMessage());
     }
     return null;

 }

 /**
  * 读取配置文件
  *
  * @return
  */
 public static ResourceBundle read() {
     //config为属性文件名,放在包com.test.config下,如果是放在src下,直接用config即可
     ResourceBundle resourceBundle = ResourceBundle.getBundle("common");
     return resourceBundle;
 }
}
b、ODPS.java
package Utils;
import com.aliyun.odps.Odps;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;

public class ODPS {
    /**
     * 在外网中使用,获取odps的连接对象
     *
     * @param ai
     * @param ak
     * @param project
     * @return
     */
    static public Odps newOdps(String ai, String ak, String project) {
        Account account = new AliyunAccount(ai, ak);
        Odps odps = new Odps(account);
        odps.setDefaultProject(project);
        return odps;
    }

    /**
     * 在内网中使用,获取odps的连接对象
     *
     * @param ai
     * @param ak
     * @param project
     * @param odpsUrl
     * @return
     */
    static public Odps newOdps(String ai, String ak, String project, String odpsUrl) {
        Account account = new AliyunAccount(ai, ak);
        Odps odps = new Odps(account);
        odps.setEndpoint(odpsUrl);
        odps.setDefaultProject(project);
        return odps;
    }
}

(4)common.properties放resource路径

# 主账号:
project_name=
access_id=
access_key=

通过“获取节点责任ID”和“获取RAM子账号”步骤后,基础信息已经可以拿到,还需要进行一次join拿到instanceid对应的具体云账号or子账号
在DataWorks标准模式下统计个人账号使用资源情况

--示例代码:
select  a.UserName,b.SKYNET_ONDUTY 
from    users_list a
join 
    (
     SELECT inst_id,settings,REGEXP_EXTRACT(settings,'"SKYNET_ONDUTY":"(.*?)"',1) AS SKYNET_ONDUTY
     FROM   information_schema.tasks_history
     WHERE  ds = 20200402 and inst_id='20200402064857130g8zorjim'
)b
on a.UserId = b.SKYNET_ONDUTY
;

⚠️:此处任务执行日期ds和inst_id需要根据自己的任务做相应的替换。
计算成本分摊/审计需求
1、MaxCompute提供了开放元数据的Information_Schema服务,通过元数据服务Information_Schema里面的作业历史表tasks_history,可以查询到准实时的项目作业历史明细。包括:项目名称、任务名称、Instance id、开始时间、结束时间、任务复杂度、任务CPU使用情况等字段。
2、用户可以通过费用中心账号总览消费记录去查询具体的消费情况。
同时,阿里云交易和账单管理OpenAPI为用户提供管理阿里云产品售卖和财资能力,通过该API可以程序化获取MaxCompute作业计费明细数据。
调用QueryUserOmsData接口(阿里云的账单系统OMS),可以查询到具体计量信息编号、数据分类、存储、SQL读取量、公网上下行流量等字段信息。
通过Information_Schema.tasks_history和账单系统进一步统计计算成本分摊/审计需求。
欢迎加入“MaxCompute开发者社区2群”,点击链接MaxCompute开发者社区2群申请申请加入或扫描以下二维码加入。
在DataWorks标准模式下统计个人账号使用资源情况

上一篇:服务器异常关闭导致msyql InnoDB数据库崩溃,数据库重启失败的问题


下一篇:MaxCompute SQL与Hive对比分析及使用注意事项