MaxCompute(原ODPS)任务优化之列裁剪

免费开通大数据服务:https://www.aliyun.com/product/odps

转自kaiding


最近因为几个ODPS任务节点扣分严重,计算健康度一度堕落至85分的红线以下,上了一次黑榜,立马开始了艰苦的优化之旅。刚刚前几天搞定了两个OpenMR列裁剪优化,略作记录。

什么是列裁剪以及为什么要做列裁剪

列裁剪,即针对OpenMR任务Map阶段的输入,如果只使用了其中的某几列,则裁剪掉不需要使用的列,只指定需要使用的列。这样做的好处也就很明显了,减少网络I/O,提升Map计算效率等等。其实从使用上来看或许叫做输入列指定更顾名思义一些。

如何看任务是否需要列裁剪

这里贴一个本次重点做了列裁剪的节点,在列裁剪优化之前的logview(由于系统只保留最近几天的日志,所以这里就不贴链接了),这里就贴部分内容吧,具体查看路径为:
1. 在待优化节点右键查看运行日志
2. 找到logview轻点打开
3. ODPS Tasks->Detail
4. Main Content->JSONSummary

其中某一路输入的相关统计数据如下:

"M1_U13" :
                {
                    "writer_dumps" :
                    {
                        "R2_1" :
                        [
                            0,
                            0,
                            0
                        ]
                    },
                    "planned_memory" : 4096,
                    "input_attrs" :
                    {
                        "split" : 256
                    },
                    "user_counters" :
                    {
                        "INPUTS-TOTAL" :
                        {
                            "name" : "INPUTS-TOTAL",
                            "counters" :
                            [
                                {
                                    "name" : "one_of_your_input_table[用户XX信息表][ds=20161009]",
                                    "value" : 1776171427
                                }
                            ]
                        },
                        "INPUTS-SUCCESS" :
                        {
                            "name" : "INPUTS-SUCCESS",
                            "counters" :
                            [
                                {
                                    "name" : "one_of_your_input_table[用户XX信息表]_SUCCESS",
                                    "value" : 1776171425
                                }
                            ]
                        },
                        "ODPS_SDK_FRAMEWORK_COUNTER_GROUP" :
                        {
                            "name" : "ODPS_SDK_FRAMEWORK_COUNTER_GROUP",
                            "counters" :
                            [
                                {
                                    "name" : "input_col_total_num",
                                    "value" : 14896
                                },
                                {
                                    "name" : "input_col_used_num",
                                    "value" : 3724
                                }
                            ]
                        }
                    },
                    "total_instance_run_time" : 77213,
                    "output_record_counts" :
                    {
                        "R2_1" :
                        {
                            "R2_1" : 1776171425
                        }
                    },
                    "custom_info" :
                    {
                        "StringMemoryPoolSize" : "41963520"
                    },
                    "input_record_counts" :
                    {
                        "input" : 1776171427
                    },
                    "writer_bytes" :
                    {
                        "R2_1" :
                        {
                            "R2_1" : 54021298354
                        }
                    },
                    "reader_dumps" :
                    {

                    },
                    "input_record_count_stats" :
                    {
                        "input" :
                        [
                            830754,
                            1017349,
                            953937
                        ]
                    },
                    "reader_bytes" :
                    {
                        "input" : 484089191296
                    },
                    "instance_run_times" :
                    [
                        23,
                        74,
                        41
                    ],
                    "output_record_count_stats" :
                    {
                        "R2_1" :
                        [
                            830754,
                            1017349,
                            953937
                        ]
                    },
                    "task_run_time" : 173,
                    "planned_cpu" : 100,
                    "instance_count" : 1862,
                    "output_bytes_stats" :
                    {
                        "R2_1" :
                        [
                            25216988,
                            31049145,
                            29013458
                        ]
                    }
                }

OK,没错,input_col_total_numinput_col_used_num两个字段的数据就是我们此次优化需要关注的要点。可以看到这张输入表在Map阶段总共输入了14896列(这里的意思不是说输入表有1万多列,而是map个数乘以表的列数得到的Map阶段总的输入列数),但是实际使用了3724列(同理),利用率非常低,扣分就是从这里开始的。。。

其实回到FuxiJobs点击M1_U13,然后在列出的任意一个FuxInstance中点击Debug可以看到这个Map任务中的统计数据:

{
    "endTime" : 1476055269,
    "finishedPercentage" : 100,
    "gblCounter" : "{"Counters":"{\"CountersValue\":[]}","CustomInfo":{"StringMemoryPoolSize":"41963520","UserCounters":"{\n    \"INPUTS-SUCCESS\": {\n        \"counters\": [{\n                \"name\": \"one_of_your_input_table[用户XX信息表]_SUCCESS\",\n                \"value\": 963577}],\n        \"name\": \"INPUTS-SUCCESS\"},\n    \"INPUTS-TOTAL\": {\n        \"counters\": [{\n                \"name\": \"one_of_your_input_table[用户XX信息表][ds=20161009]\",\n                \"value\": 963577}],\n        \"name\": \"INPUTS-TOTAL\"},\n    \"ODPS_SDK_FRAMEWORK_COUNTER_GROUP\": {\n        \"counters\": [{\n                \"name\": \"input_col_total_num\",\n                \"value\": 8},\n            {\n                \"name\": \"input_col_used_num\",\n                \"value\": 2}],\n        \"name\": \"ODPS_SDK_FRAMEWORK_COUNTER_GROUP\"}}"},"InputCounters":{"input":{"bytes":248006624,"records":963577}},"OutputCounters":{"R2_1":{"bytes":29108313,"dumps":0,"records":963577}},"PanguReadBytes":[0,48051280,48,199955317],"PanguWriteBytes":[0,0,0,0]}",
    "history" :
    [

    ],
    "id" : "Odps/your_project_20161009231931375gj8pn6sb1_LOT_0_0_0_job0/M1_U13#1045_0",
    "input_bytes" : 248006624,
    "input_records" : 963577,
    "logId" : "PU1UQXVNVEF6TGpFd01TNHhPRGdzVDJSd2N5OTBZbDl0WkdOZk1qQXhOakV3TURreU16RTVNekV6TnpWbmFqaHdialp6WWpGZlRFOVVYekJmTUY4d1gycHZZakF2VFRGZlZURXpRSEpoWVdJeE1qTTRPUzVsZERFak1qUWFiYw==",
    "output_bytes" : 29108313,
    "output_records" : 963577,
    "startTime" : 1476055236,
    "status" : "Terminated",
    "st" : "2016-10-10 07:20:36",
    "et" : "2016-10-10 07:21:09",
    "latencynum" : 33,
    "latency" : 33,
    "barleft" : "8%",
    "barlen" : "19%"
}

从这个Map统计数据可以看到,输入表共输入8列,但实际使用了2列,即使用率只有1/4而已,浪费严重。

如何进行列裁剪优化

  • 如果你使用的是Java SDK,那么可以在指定输入表的同时指定输入列,如下:

    InputUtils.addTable(TableInfo.builder().tableName("wc_in").cols(new String[]{"c1","c2"}).build(), job);
    
  • 如果你的MR任务输入表确定而不变,则可以参数化进行设置,如下:

    set odps.mapred.input.columns.test.wc_in=c1,c2; //设置test这个project的wc_in表的输入column为c1,c2这俩列
    

第二种方式我没有用过,目前只是实践了第一种方式,而这种方式的灵活性在于,如果你是通过配置文件来表达输入表和输入列时,可以实现只改配置而不改代码

由于目前提供的sdk不支持使用列索引来做列裁剪(个人觉得应该封装一下),如果因为历史原因,只在输入表的自定义配置文件中指定了输入表的列索引也没关系(如果配置文件比较大,输入表有几十个,我想你也不希望去重新找每张表列索引对应的列名),可以通过获取TableTableSchema,然后通过下标来获取到列名,简单看下代码:

Odps odps = SessionState.get().getOdps();
Tables tables = odps.tables();
Table table = tables.get(projectName, tableName);
TableSchema tableSchema = table.getSchema();

List<String> inputColumnNameList = new LinkedList<String>();
for(int index : oneInputTable.getInputTableIndexes) { // 这里是你从配置文件中获得的某张输入表的数据
    Column column = tableSchema.getColumn(index);
    if(column != null && !StringUtil.isBlank(column.getName())) {
        inputColumnNameList.add(column.getName());
    }
}

// 接下来拼成一个字符串数组就OK了

已经做了列裁剪优化,为啥利用率反而降低了

这里有一个注意点就是,如果优化前在Map任务中是通过输入表的列索引去获取记录值的话,需要同步做相应修改。这是因为指定了输入列之后,实际在Map阶段的输入表列数比原输入表要少,你可以理解为在Map阶段是一张全新的表,此时列索引应该使用这张经过裁剪后的新表的列索引,因此使用原来的列索引可能会取不到值或者取到错误列的值。建议在使用时需要使用列名来访问或者通过新的列索引来获取记录值。

其实我上面贴的logview中的内容就是因为我在Map阶段使用了原输入表的列索引来访问,导致了利用率反而降低了。比如表wc_in一共有0-9共10列,你只需要读索引为7、8、9这三列,但是你在进行了列裁剪优化之后,实际在Map阶段的输入表只有0、1、2这三个索引对应的值,此时如果你还是使用原来的列索引去读的话就读不到数据,导致该表的输入列利用率从原来的30%变成了0%。

因此在完成了列裁剪优化之后务必检查Map阶段读取Record的方式:
1. 如果是使用列索引的务必改成列名,
2. 或者如果不方便获取列名的,在setup阶段读取配置文件建立列索引的映射来解决该问题。


欢迎加入“数加·MaxCompute购买咨询”钉钉群(群号: 11782920)进行咨询,群二维码如下:

MaxCompute(原ODPS)任务优化之列裁剪 
上一篇:SSH 和 EasyUI结合的 combobox默认值问题解决方法


下一篇:简明python教程的读书笔记(一)