MaxCompute与OSS非结构化数据读写互通(及图像处理实例)

0. 前言

MaxCompute作为阿里巴巴集团内部绝大多数大数据处理需求的核心计算组件,拥有强大的计算能力,随着集团内外大数据业务的不断扩展,新的数据使用场景也在不断产生。在这样的背景下,MaxCompute(ODPS)计算框架持续演化,而原来主要面对内部特殊格式数据的强大计算能力,也正在一步步的通过新增的非结构化数据处理框架,开放给不同的外部数据。 我们相信阿里巴巴集团的这种需求,也代表着业界大数据领域的最前沿实践和走向,具有相当的普适性。在之前我们已经对MaxCompute 2.0新增的非结构化框架做过整体介绍,描述了在MaxCompute上如何处理存储在OSS上面的非结构化数据,侧重点在怎样从OSS读取各种非结构化数据并在MaxCompute上进行计算。 而一个完整数据链路,读取和计算处理之后,必然也会涉及到非结构化数据的 写出。 在这里我们着重介绍一下从MaxCompute往OSS输出非结构化数据,并提供一个具体的在MaxCompute上进行图像处理的实例, 来展示从【OSS->MaxCompute->OSS】的整个数据链路闭环的实现。 至于对于KV NoSQL类型数据的输出,在对TableStore数据处理介绍 中已经有所介绍,这里就不再重复。

1. 使用前提和假设

1.1 MaxCompute 2.0 功能

这里介绍的功能基于MaxCompute新一代的2.0计算框架,目前2.0计算框架已经全面上线,默认就可使用。

另外本文中使用了MaxCompute 2.0新引进的一个BINARY类型,目前在使用BINARY类型时,还需要显性设置set odps.sql.type.system.odps2=true

1.2 网络连通性与访问权限

另外因为MaxCompute与OSS是两个分开的云计算,与云存储服务,所以在不同的部署集群上的网络连通性有可能影响MaxCompute访问OSS的数据的可达性。 关于OSS的节点,实例,服务地址等概念,可以参见OSS相关介绍。 在MaxCompute公共云服务访问OSS存储,推荐使用OSS私网地址(即以-internal.aliyuncs.com结尾的host地址)。

此外需要指出的是,MaxCompute计算服务要访问TableStore数据需要有一个安全的授权通道。 在这个问题上,MaxCompute结合了阿里云的访问控制服务(RAM)和令牌服务(STS)来实现对数据的安全反问:

首先需要在RAM中授权MaxCompute访问OSS的权限。登录RAM控制台,创建角色AliyunODPSDefaultRole,并将策略内容设置为:

{
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Effect": "Allow",
      "Principal": {
        "Service": [
          "odps.aliyuncs.com"
        ]
      }
    }
  ],
  "Version": "1"
}

然后编辑该角色的授权策略,将权限AliyunODPSRolePolicy授权给该角色。

如果觉得这些步骤太麻烦,还可以登录阿里云账号点击此处完成一键授权

2. MaxCompute内置的OSS数据输出handler

2.1 创建External Table

MaxCompute非结构化数据框架希望从根本上提供MaxCompute与各种数据的联通,这里的“各种数据”是两个维度上的:

  1. 各种存储介质,比如OSS
  2. 各种数据格式, 比如文本文件,视频,图像,音频,基因,气象等格式的数据

而数据的这两个维度的特征,都是通过EXTERNAL TABLE的概念来引入MaxCompute的计算体系的。 与读取OSS数据的使用方法类似,对OSS数据进行写操作,在如上打开安全授权通道后,也是先通过CREATE EXTERNAL TABLE语句创建出一个外部表,再通过标准MaxCompute SQL的INSERT INTO/OVERWRITE等语句来实现的,这里先用MaxCompute内置的TsvStorageHandler为例来说明一下用法:

DROP TABLE IF EXISTS tpch_lineitem_tsv_external;

CREATE EXTERNAL TABLE IF NOT EXISTS tpch_lineitem_tsv_external
(
orderkey BIGINT,
suppkey BIGINT,
discount DOUBLE,
tax DOUBLE,
shipdate STRING,
linestatus STRING,
shipmode STRING,
comment STRING
)
STORED BY 'com.aliyun.odps.TsvStorageHandler'    ----------------------------------------- (1)
LOCATION 'oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/tsv_output_folder/';  --(2)

这个DDL语句建立了一个外部表tpch_lineitem_tsv_external,并将前面提到的两个维度的外部数据信息关联到这个外部表上。

  1. 数据存储介质: LOCATION 将一个OSS上的地址关联到外部表上,对这个外部表的进行读写操作都会反映到这个OSS地址上。
  2. 数据存储格式: StorageHandler用来表明对这些数据的读写操作方式,这里使用了MaxCompute内置的 com.aliyun.odps.TsvStorageHandler, 用户可以使用这个由系统自带的实现来读取和写出TSV文件。 同时用户也可以通过MaxCompute的SDK来自定义StorageHandler, 这个将在后面的章节介绍。

其中OSS数据存储的具体地址的URI格式为:

LOCATION 'oss://${endpoint}/${bucket}/${userPath}/'

最后还要提到的是,在上面的DDL语句中定义了外部表的Schema, 对于数据输出而言,这表示输出的数据格式将由这个Schema描述。 就TSV格式而言,这个schema描述比较直观容易理解; 而在用户自定义的输出数据格式上,这个schema与输出数据的联系则更松散一些,有着更大的*度。 在后面介绍通过自定义StorageHandler/Outputer的时候会详细展开。

2.2 通过对External Table的 INSERT 操作实现TSV文本文件的写出

在将OSS数据通过External Table关联上后,对OSS文件的写出可以对External Table做标准的SQL INSERT OVERWRITE/INSERT INTO来操作。 具体输出数据的来源可以有两种

  1. 数据源为MaxCompute的内部表: 也就是说可以通过对外表INSERT操作来实现MaxCompute内部表数据到外部存储介质的写出
  2. 数据源为之前通过External Table引入MaxCompute计算体系的外部数据: 这可以用来将外部数据引入MaxCompute进行计算,然后再存储到(不同的)外部存储地址,或者甚至是不同的外部存储介质(比如将TableStore数据经由MaxCompute导出到OSS)。

2.2.1 从MaxCompute内部表输出数据到OSS

这里先来看第一种场景:假设我们已经有一个名为tpch_lineitem的MaxCompute内部表,其schema可以通过

DESCRIBE tpch_lineitem; 

得到:

+------------------------------------------------------------------------------------+
| InternalTable: YES      | Size: 241483831680                                       |
+------------------------------------------------------------------------------------+
| Native Columns:                                                                    |
+------------------------------------------------------------------------------------+
| Field           | Type       | Label | Comment                                     |
+------------------------------------------------------------------------------------+
| l_orderkey      | bigint     |       |                                             |
| l_partkey       | bigint     |       |                                             |
| l_suppkey       | bigint     |       |                                             |
| l_linenumber    | bigint     |       |                                             |
| l_quantity      | double     |       |                                             |
| l_extendedprice | double     |       |                                             |
| l_discount      | double     |       |                                             |
| l_tax           | double     |       |                                             |
| l_returnflag    | string     |       |                                             |
| l_linestatus    | string     |       |                                             |
| l_shipdate      | string     |       |                                             |
| l_commitdate    | string     |       |                                             |
| l_receiptdate   | string     |       |                                             |
| l_shipinstruct  | string     |       |                                             |
| l_shipmode      | string     |       |                                             |
| l_comment       | string     |       |                                             |
+------------------------------------------------------------------------------------+

其中有16个columns。 现在我们希望将其中的一部分数据以TSV格式导出到OSS上面。 那么在用上述DDL创建出External Table之后,使用如下INSERT OVERWRITE操作就可以实现:

INSERT OVERWRITE TABLE tpch_lineitem_tsv_external
SELECT l_orderkey, l_suppkey, l_discount, l_tax, l_shipdate, l_linestatus, l_shipmode, l_comment
    FROM tpch_lineitem
    WHERE l_discount = 0.07 and l_tax = 0.01;

这里将从内部的tpch_lineitem表中,在符合l_discount = 0.07 并 l_tax = 0.01的行中选出8个列(对应tpch_lineitem_tsv_external这个外部表的schema)按照TSV的格式写到OSS上。 在上面这个INSERT OVERWRITE操作成功完成后,就可以看到OSS上的对应LOCATION产生了一系列文件:

osscmd ls oss://oss-odps-test/tsv_output_folder/

2017-01-14 06:48:27 39.00B Standard oss://oss-odps-test/tsv_output_folder/.odps/.meta
2017-01-14 06:48:12 4.80MB Standard oss://oss-odps-test/tsv_output_folder/.odps/20170113224724561g9m6csz7/M1_0_0-0.tsv
2017-01-14 06:48:05 4.78MB Standard oss://oss-odps-test/tsv_output_folder/.odps/20170113224724561g9m6csz7/M1_1_0-0.tsv
2017-01-14 06:47:48 4.79MB Standard oss://oss-odps-test/tsv_output_folder/.odps/20170113224724561g9m6csz7/M1_2_0-0.tsv
...

这里可以看到,通过上面LOCATION指定的oss-odps-test这个OSS bucket下的tsv_output_folder文件夹下产生了一个.odps文件夹,这其中将有一些.tsv文件,以及一个.meta文件。 这样子的文件结构是MaxCompute(ODPS)往OSS上输出所特有的:

  1. 通过MaxCompute对一个OSS地址,使用INSERT INTO/OVERWRITE 外部表来做写出操作,所有的数据将在指定的LOCATION下的.odps文件夹产生;
  2. 其中.odps文件夹中的.meta文件为MaxCompute额外写出的宏数据文件,其中用于记录当前文件夹中有效的数据。 正常情况下,如果INSERT操作成功完成的话,可以认为当前文件夹的所有数据均是有效数据。 只有在有作业失败的情况下需要对这个宏数据进行解析。 即使是在作业中途失败或被kill的情况下,对于INSERT OVERWRITE操作,再跑一次成功即可。 如果对于高级用户,一定需要解析.meta文件的话,可以联系MaxCompute技术团队。

这里迅速看一下这些tsv文件的内容:

osscmd cat oss://oss-odps-test/tsv_output_folder/.odps/20170113232648738gam6csz7/M1_0_0-0.tsv  

4236000067      9992377 0.07    0.01    1992-11-06      F       RAIL     across the ideas nag
4236000290      3272628 0.07    0.01    1998-04-28      O       RAIL    uriously. furiously unusual dinos int
4236000386      8081402 0.07    0.01    1994-02-19      F       RAIL    its. express, iron
4236000710      3879271 0.07    0.01    1995-03-10      F       AIR     es are carefully fluffily spe
...

可以看到确实在OSS上产生了对应的TSV数据文件。

最后,大家可能也注意到了,这个INSERT OVERWRITE操作产生了多个TSV文件,对于MaxCompute内置的TSV/CSV处理来说,产生的文件数目与对应SQL stage的并发度是相同的,在上面这个例子中,INSER OVERWITE ... SELECT ... FROM ...; 的操作在源数据表(tpch_lineitem) 上分配了1000个mapper,所以最后产生了1000个TSV文件的。 如果需要控制TSV文件的数目,可以配合MaxCompute的各种灵活语义和配置来实现。 比如如果需要强制产生一个TSV文件,那在这个特定例子中,可以在INSER OVERWITE ... SELECT ... FROM ...最后加上一个DISTRIBUTE BY l_discount, 就可以在最后插入仅有一个Reducer的Reduce stage, 也就会只输出一个TSV文件了:

osscmd ls oss://oss-odps-test/tsv_output_folder/

2017-01-14 08:03:41 39.00B Standard oss://oss-odps-test/tsv_output_folder/.odps/.meta
2017-01-14 08:03:35 4.20GB Standard oss://oss-odps-test/tsv_output_folder/.odps/20170113234037735gcm6csz7/R2_1_33_0-0.tsv

可以看到在增加了DISTRIBUTE BY l_discount后,现在同样的数据只了一个输出TSV文件,当然这个文件的size就大多了。 这方面的调控技巧还有很多,都是可以依赖SQL语言的灵活性,数据本身的特性,以及MaxCompute计算相关设置来实现的,这里就不深入展开了。

2.2.2 以MaxCompute为计算介质,实现不同存储介质之间的数据转移

External Table作为一个MaxCompute与外部存储介质的一个切入点,之前已经介绍过对OSS数据的读取以及TableStore数据的操作,结合对外部数据读取和写出的功能,就可以实现通过External Table实现各种各样的数据计算/存储链路,比如:

  1. 读取External Table A关联的OSS数据,在MaxCompute上做复杂计算处理,并输出到External Table B关联的OSS地址
  2. 读取External Table A关联的TableStore数据,在MaxCompute上做复杂计算处理,并输出到External Table B关联的OSS地址

而这些操作与上面数据源为MaxCompute内部表的场景, 唯一的区别只是SELECT的来源变成一个External table,而不是MaxCompute内置表。

3. 通过自定义StorageHandler来实现数据输出

除了使用内置的StorageHandler来实现在OSS上输出TSV/CSV等常见文本格式,MaxCompute非结构化框架提供了通用的SDK,允许用户对外输出自定义数据格式文件,包括图像,音频,视频等等。 这种对于用户自定义的完全非结构化数据格式支持,也是MaxCompute从结构化/文本类数据的一个向外扩展,在这里我们会以一个图像处理的例子,来走通整个【OSS->MaxCompute->OSS】数据链路,尤其着重介绍对OSS输出文件的功能。

为了方便大家理解,这里先提供一个在使用用户自定义代码的场景下,数据在MaxCompute计算平台上的流程:

MaxCompute与OSS非结构化数据读写互通(及图像处理实例)

从上图可以看出,从数据的流动和处理逻辑上理解,用户可以简单地把非结构化处理框架理解成在MaxCompute计算平台两端有机耦合的的数据导入(Ingres)以及导出(Egress):

  1. 外部的(OSS)数据经过非结构化框架转换,会使用Java用户容易理解的InputStream类提供给自定义代码接口。 用户自实现Extract逻辑只需要负责对输入的InputStream做读取/解析/转化/计算,最终返回MaxCompute计算平台通用的Record格式;
  2. 这些Record可以*的参与MaxCompute的SQL逻辑运算,这一部分计算是基于MaxCompute内置的强大结构化SQL运算引擎,并可能产生新的Record
  3. 运算过后的Record中再传递给用户自定义的Output逻辑,用户在这里可以进行进一步的计算转换,并最终将Record里面需要输出的信息通过系统提供的OutputStream输出,由系统负责写到OSS。

值得指出的是,这里面所有的步骤都是可以由用户根据需要来进行*的选择与拼接的。 比如如果用户的输入就是MaxCompute的内部表,那步骤1.就没有必要了,事实上在前面的章节2中的例子,我们就实现了将内部表直接写成OSS上的TSV文件的流程。 同理, 如果用户没有输出的需求,步骤3. 就没有必要,比如我们之前介绍的OSS数据的读取。 最后,步骤2.也是可以省略的,比如如果用户的所有计算逻辑都是在自定义的Extract/Output中完成,没有进行SQL逻辑运算的需要,那步骤1.是可以直接连接到步骤3.的。

理解了上面这个数据变换的流程,我们就可以来通过一个图像处理例子来看看怎么具体的通过非结构化框架在MaxCompute SQL上完整的实现非结构化数据的读取,计算以及输出了:

3.1 范例:OSS图像文件 -> MaxCompute计算处理 -> OSS图像输出

这里我们先提供实现这整个【OSS->MaxCompute->OSS】数据链路需要用到的MaxCompute SQL query,并做简单的注解,详细的用户代码实现逻辑将在后面的3.2子章节中介绍SDK接口的时候做展开解释。

3.1.1 关联OSS上的原始输入图像到External Table: images_input_external

DROP TABLE IF EXISTS images_input_external;
CREATE EXTERNAL TABLE IF NOT EXISTS images_input_external
(
name STRING,
width BIGINT,
height BIGINT,
image BINARY
)
STORED BY 'com.aliyun.odps.udf.example.image.ImageStorageHandler'   --- (1)
WITH SERDEPROPERTIES ('inputImageFormat'='BMP' , 'transformedImageFormat' = 'JPG')  --- (2)
LOCATION 'oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/dev/SampleData/test_images/mixed_bmp/'  --- (3)
USING 'odps-udf-example.jar';   --- (4)

说明:

  1. 用户指明使用的用户代码wrapper class名字是com.aliyun.odps.udf.example.image.ImageStorageHandler,这个class及其依赖的三方库用户通过jar提供,具体jar名字会通过下面的USING语句(见第4点)指定。
  2. 通过SERDEPROPERTIES来实现参数传递,格式为'key'='value', 具体用法可以参见基本功能介绍 以及下面的用户代码说明
  3. 指定输入图像地址,这个地址上存放了一系列不同分辨率的bmp图像文件
  4. 指定包含用户JAR包,内含自定义的StorageHandler/Extractor/Outputer,以及需要的三方库(这里用到了Java ImageIO库,具体见下面用户代码范例)。JAR包通过ADD JAR命令上传,可以参见基本功能介绍

另外要说明的是这里指定的External Table的schema就是用户在进行Extract操作后构造的Record格式,具体怎么构造这个Schema用户可以根据需要自己根据能从输入数据中抽取到的信息定义。 在这里我们定义了对于输入图片数据,会将图片名称,图片的长和宽,以及图片的二进制bytes抽取出来放进Record(见后面的Extractor代码说明),所以就有了上面的【STRING,BIGINT,BIGINT,BINARY】的schema。

3.1.2 关联OSS输出地址到External Table: images_output_external

CREATE EXTERNAL TABLE IF NOT EXISTS images_output_external
(
image_name STRING,
image_width BIGINT,
image_height BIGINT,
outimage BINARY
)
STORED BY 'com.aliyun.odps.udf.example.image.ImageStorageHandler'
LOCATION 'oss://oss-cn-shanghai-internal.aliyuncs.com/oss-odps-test/dev/output/images_output/' ---(1)
USING 'odps-udf-example.jar';

说明: 可以看到这里创建关联输出图像文件的External Table,使用的DDL语句,与前面关联输入图像时使用的DDL语句是非常类似的:只是LOCATION不一样,表明图像数据处理后将输出到另外一个地址。 另外还有一点就是这里我们没有使用SERDEPROPERTIES来进行传参,这个只是在这个场景上没有需求,在有需求的时候可以用同样的方法把参数传递给outputer。 当然这里两个DDL语句如此相似,有一个原因是因为我们这个例子中用户代码中对于Extract出的Record以及输入给Outputer的Record使用了一样的schema, 同时这一对Extractor和Outputer都被封装在了同一个ImageStorageHandler里放在同一个JAR包里。 在实际应用中,这些都是可以根据实际需求自己调整的,由用户自己选择组合和打包方式

3.1.3 从OSS读取原始图片数据到MaxCompute, 计算处理,并输出图像到OSS

在上面的3.1.1以及3.1.2子章节中的两个DDL语句,分别实现了把输入OSS数据,以及计划输出OSS数据,分别绑定到两个LOCATION以及指定对应的用户处理代码,参数等设置。 然而这两个DDL语句对系统而言,只是进行了一些宏数据的记录操作,并不会涉及具体的数据计算操作。 在这两个DDL语句运行成功后,运行如下SQL语句才会引发真正的运算。 换句话说,在Fig.1中描述的整个【OSS->MaxCompute->OSS】数据读取/计算/输出链路,实际上都是通过下面一个简单的SQL 语句完成的:

INSERT OVERWRITE TABLE images_output_external
SELECT * FROM images_input_external
WHERE width = 1024;

这看起来就是一个标准的MaxCompute SQL语句,只不过因为涉及了images_output_externalimages_input_external这两个外部表,所以真正进行的物理操作与传统的SQL操作会有一些区别:在这个过程中,涉及了读写OSS,以及通过ImageStorageHandler这个wrapper,调用自定义的Extractor,Outputer代码来对数据进行操作。 下面就来具体看看在这个例子中的用户自定义代码实现了怎样的功能,以及具体是如何实现的。

3.2 ImageStorageHandler实现

如同之前介绍过的,MaxCompute非结构化框架通过StorageHandler这个接口来描述对各种数据存储格式的处理。 具体来说,StorageHandler作为一个wrapper class, 让用户指定自己定义的Exatractor(用于数据的读入,解析,处理等) 以及Outputer(用于数据的处理和输出等)。 用户自定义的StorageHandler 应该继承 OdpsStorageHandler,实现getExtractorClass以及getOutputerClass 两个接口。

通常作为wrapper class, StorageHandler的实现都很简单,比如这里的ImageStorageHandler 就只是通过这两个接口指定了我们将使用ImageExtractor以及ImageOutputer:

package com.aliyun.odps.udf.example.image;

public class ImageStorageHandler extends OdpsStorageHandler {
  @Override
  public Class<? extends Extractor> getExtractorClass() {
    return ImageExtractor.class;
  }

  @Override
  public Class<? extends Outputer> getOutputerClass() {
    return ImageOutputer.class;
  }
}

另外要说明的是如果确定在使用某个StorageHandler的时候,只需要用到Extractor,或者只需要用到Outputer功能,那不需要的接口则不用实现。 比如如果我们只需要读取OSS数据而不需要做INSERT操作,那getOutputerClass()的实现只需要扔个NotImplemented exception就可以了,不会被调用到。

3.3 ImageExtractor实现

因为对于SDK中Extractor接口的介绍以及对用户如何写一个自定义的Extractor,在之前介绍的OSS数据的读取中已经有所涉及,所以这里就不再对这方面做深入的介绍。

Extractor的工作在于读取输入数据并进行用户自定义处理,那么我们首先来看看这里由images_input_external这个外表绑定的OSS输入LOCATION上存放的具体数据内容:

osscmd ls oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/                                                                                                                       

2017-01-09 14:02:01 1875.05KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/barbara.bmp
2017-01-09 14:02:00 768.05KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/cameraman.bmp
2017-01-09 14:02:00 1054.74KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/fishingboat.bmp
2017-01-09 14:01:59 257.05KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/goldhill.bmp
2017-01-09 14:01:59 468.80KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/house.bmp
2017-01-09 14:01:59 468.80KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/jetplane.bmp
2017-01-09 14:02:01 2.32MB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/lake.bmp
2017-01-09 14:01:59 257.05KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/lena.bmp
2017-01-09 14:02:00 768.05KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/livingroom.bmp
2017-01-09 14:02:00 768.05KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/pirate.bmp
2017-01-09 14:02:00 768.05KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/walkbridge.bmp
2017-01-09 14:02:00 1054.74KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/woman_blonde.bmp
2017-01-09 14:02:00 768.05KB Standard oss://oss-odps-test/dev/SampleData/test_images/mixed_bmp/woman_darkhair.bmp

可以看到这个LOCATION存放了一系列bmp图像数据,分辨率从 400 x 400 到 1200 x 1200不等。 具体在这个例子中用到的ImageExtractor的详细代码在github上可以找到, 这里只做一些简单介绍说明该Extractor做了些什么工作:

  1. 从输入的OSS地址上使用非结构化框架提供的InputStream接口读取图像数据,并在本地进行如下操作

    • 对于图像宽度小于1024的图片,统一放大到1024 x 1024; 对于图像宽度大于1024的图片,跳过不进行处理
    • 处理过的图片,在内存中转存成由输入参数指定的格式(JPG)
  2. 把处理后在内存中的JPG数据的原始字节存入输出的Record中的BINARY field, 同一个Record中还将存放处理后图像的长和宽(都是1024), 以及原始的图像名字(这个可以从输入的InputStream上获取);
  3. 填充后的Record从Extract接口返回进入MaxCompute系统;
  4. 在这个过程中,用户可以灵活的进行各种操作,比如额外的参数验证等。

另外要说明的是,目前Record作为MaxCompute结构化数据处理的基本单元,有一些额外的限制,比如BINARY/STRING类型都有8MB大小的限制,但是在大部分场景下这个大小应该是能满足存储需求的。

3.4 ImageOutputer的实现

接下来我们着重讲一下ImageOutputer的实现。 首先所有的用户输出逻辑都必须实现Outputer接口,具体来说有如下三个:setup, output和close, 这和Extractor的setup, extract和close三个接口基本上是对称的。

// Base outputer class, custom outputer shall extend from this class
public abstract class Outputer{

  public abstract void setup(ExecutionContext ctx, OutputStreamSet outputStreamSet, DataAttributes attributes);

  public abstract void output(Record record) throws IOException;

  public abstract void close() throws IOException;
}

这其中setup()和close()在一个outputer中只会调用一次。 用户可以在setup里面做初始化准备工作,另外通常需要把setup()传递进来的这三个参数保存成ouputerd的class variable, 方便之后output()或者close()接口中使用。 而close()这个接口用于方便用户代码的扫尾工作。

通常情况下大部分的数据处理发生在output(Record)这个接口内。 MaxCompute系统会根据当前outputer分配处理的Record数目不断调用,也就是对每个输入Record系统会调用一次 output(Record)。 系统假设在一个output(Record) 调用返回的时候,用户代码已经消费完这个Record, 因此在当前output(Record)返回后,系统可能将这个Record所使用的内存用作它用: 所以不推荐一个Record中的信息在跨多个output()函数调用被使用,如果一定有这个需求的话,用户必须把相关信息通过class variable等方式自行另外保存。

3.4.1 ImageOutputer.setup()

setup用于初始化整个outputer, 在这个接口上提供了整个outputer操作过程中可能需要的参数:

  • ExecutionContext: 用于提供一些系统信息和接口,比如读取resource等,在ImageOutputer这个例子中我们没有用到这个参数;
  • OutputStreamSet: 用户可以从这个类的next()接口获取对外输出所需要的OutputStream,具体用法我们在下面详细介绍;
  • DataAttributes: 用户通过SERDEPROPERTIES设置的key-value参数可以通过这个类获取,参数获取这里ImageOutputer例子中没有用到,但是Extractor上的setup参数中也有这个类,在上面的ImageExtractor用到了改功能,可以参考一下。 同时这个类上面还提供了一些helper接口,比如方便用户验证schema等。

在我们这个ImageOutputer里,setup()的实现比较简单:

@Override
  public void setup(ExecutionContext ctx, OutputStreamSet outputStreamSet, DataAttributes attributes) {
    this.outputStreamSet = outputStreamSet;
    this.attributes = attributes;
    this.attributes.verifySchema(new OdpsType[]{ OdpsType.STRING, OdpsType.BIGINT, OdpsType.BIGINT, OdpsType.BINARY });
  }

只是做了简单的初始化以及对schema的验证。

3.4.2 ImageOutputer.output(Record) 以及 OutputStreamSet的使用

在介绍具体output()接口之前,首先我们要来看看 OutputStreamSet, 这个类有两个接口:

public interface OutputStreamSet{

SinkOutputStream next();
SinkOutputStream next(String fileNamePostfix);

}

两个接口都是用来获取一个新的SinkOutputStream(一个Java OutputStream的实现,可以按照OutputStream使用),两个接口唯一的区别是next()获取的OutputStream写出的文件名完全由MaxCompute系统决定,而next(String fileNamePostfix)则允许用户提供文件名的postfix。 提供这个postfix的意义是,在输出文件具体地址和名字格式总体由MaxCompute系统决定的前提下,用户依然可以定制一个方便理解的postfix。 比如使用next("_boat.jpg") 得到的OutputStream可能对应如下一个输出文文件:

oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-0_boat.jpg

这其中尾端的"_boat.jpg"可以帮助用户理解输出文件的涵义。 如果这个 OutputStream是由next()获得的话,那对应的输出文件可能就是这样的:

oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-0

用户可能就需要具体读取这个文件才能知道这个文件中具体存放了什么内容。

前面提到output(Record)这个接口会由系统不断调用,但是应该强调的是,并不一定在每一个Record都需要调用一次OutputStreamSet.next()接口来获得一个新的OutputStream事实上在大多数情况下,我们建议在一个Outputer里面尽可能减少调用next()的次数(最好只调用一次)。 也就是说理想情况下,一个outpuer只应该产生一个输出文件。 比如处理TSV这种文本格式文件,假设有5000个Record对应5000行TSV数据,那么最理想的情况是应该把这5000行数据全部写到一个TSV文件中。 当然用户可能会有各种各样不同的切分输出文件的需求:比如希望每个文件大小控制在一定范围,或比如文件的边界有显著的意义等等。

具体到当前这个图像例子,从下面的ImageOutputer代码实现中可以看出,这个例子中确实是处理每个Record就调用一次next()的,因为在当前场景中,每一个输入的Record都表示一张图片的信息(binary bytes, 图像名字,图像长宽),所以这里通过多次调用next()来输出多个图片文件。 但是我们还是需要再次强调,调用next()的次数过多可能有一些其他弊端,比如造成碎片化小数据在OSS上的存储等等。 尤其在MaxCompute这种分布式计算系统上,因为系统本身就会调度起多个outputer进行并行计算处理,如果每个outpuer都输出过多文件的话,最后产生的文件数目会有一个乘性效应。 回头来看我们这个例子中,即使在这里,多个图像其实也可以通过一个OutputStream,按照tar/tar.gz的方式写到单个文件中,这些都是在实现具体系统中用户需要根据自己的场景, 以及处理逻辑,输出数据类型等信息来进行优化和tradeoff的。

在理解了这些之后,现在来具体看看ImageOutputer的实现output接口实现:

  @Override
  public void output(Record record) throws IOException {
    String name = record.getString(0);
    Long width = record.getBigint(1);
    Long height = record.getBigint(2);
    ByteArrayInputStream input =  new ByteArrayInputStream(record.getBytes(3));
    BufferedImage sobelEdgeImage = getEdgeImage(input);
    OutputStream outputStream = this.outputStreamSet.next(name + "_" + width + "x" + height + "." + outputFormat);
    ImageIO.write(sobelEdgeImage, this.outputFormat, outputStream);
  }

可以看到这里主要就做了三件事情:

  1. 根据之前保存的图像名字,长宽信息,和编码方式(".jpg")拼出一个带扩展名的输出文件名postfix。
  2. 读取图像binary bytes,并用getEdgeImage()来利用sobel算子对图像做边缘检测。 具体getEdgeImage()的实现这里就不进行深入解释了: 使用了标准的sobel模板卷积算法, 有兴趣看ImageOutputer源码即可。
  3. 对每一个图像产生一个新的OutputStream并将数据写出,至此当前Record处理完毕,写出一张图片到OSS,output()函数返回。

3.4.3 ImageOutputer.close()

在这个例子中,outputer.close()接口没有包含具体的实现逻辑,是个no-op。

至此我们就介绍完了一个output的实现,现在可以看看在运行完这个SQL query,对应OSS地址的数据:

osscmd ls oss://oss-odps-test/dev/output/images_output/ 

2017-01-15 14:36:50 215.19KB Standard oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-0-barbara_1024x1024.jpg
2017-01-15 14:36:50 108.90KB Standard oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-1-cameraman_1024x1024.jpg
2017-01-15 14:36:50 169.54KB Standard oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-2-fishingboat_1024x1024.jpg
2017-01-15 14:36:50 214.94KB Standard oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-3-goldhill_1024x1024.jpg
2017-01-15 14:36:50 71.00KB Standard oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-4-house_1024x1024.jpg
2017-01-15 14:36:50 126.50KB Standard oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-5-jetplane_1024x1024.jpg
2017-01-15 14:36:50 169.63KB Standard oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-6-lake_1024x1024.jpg
2017-01-15 14:36:50 194.18KB Standard oss://oss-odps-test/dev/output/images_output/.odps/20170115148446219dicjab270/M1_0_-1--1-7-lena_1024x1024.jpg
...

可以看到图像数据按照期待格式写到了指定地址,这里我们就选一个输入图像(lena.bmp)以及对应的输出图像(M1_0_-1--1-7-lena_1024x1024.jpg)看一下对比:

MaxCompute与OSS非结构化数据读写互通(及图像处理实例)

这个例子中整个图像处理流程已经通过如上的SQL query完成。 而从上面展示的ImageExtractor以及ImageOutputer 源代码,我们可以看出整个过程中用户的逻辑基本与写单机图像处理程序无异,用户的代码只需要在Extractor上做InputStreamRecord的准换,而在Outputer上做反向的RecordOutputSteam的写出处理,其他核心的处理逻辑实现基本和单机算法实现相同,在用户的层面,并不用去操心底层分布式系统的细节以及MaxCompute和OSS的交互

3.5 数据处理步骤的灵活性

从上面这个例子中我们也可以看出,在一个完整的【OSS->MaxCompute->OSS】数据流程中,Extractor和Outputer中涉及的具体计算逻辑其实也并不一定会有一个非常明确的边界。 Extractor和Outputer只要各自完成所需的转换Record/Stream的转换,具体的额外算法逻辑在两个地方都有机会完成。 比如上面这个例子的整个流程涉及了如下图像处理相关的运算:

  1. 图像的缩放 (统一到 1024 x 1024)
  2. 图像格式的转换 (BMP -> JPG)
  3. 图像的Sobel边缘检测

上面的例子实现中,把1. 和 2. 放在ImageExtractor中完成,而3.则放在ImageOutputer中完成,但并不是唯一的选择。 我们完全可以把所有3个步骤都放在ImageExtractor中完成,让ImageOutputer只做Record到写出最后图像的操作;也可以在ImageExtractor中只做读取原始binary到Recrod, 而把所有3个图像处理步骤都放在ImageOutputer中进行,等等。 具体进行怎样的选择,用户可以完全根据需要自己实现。

另外一个系统设计的点是如果对于一个数据需要做重复的运算,那可以考虑将数据从OSS中通过Extractor读出进MaxCompute,然后存储成MaxCompute的内置表格再进行(多次)的计算。 这个对于MaxCompute和OSS没有进行混布,不在一个物理网络上的场景尤其有意义: MaxCompute从内置表中读取数据无疑要比从外部OSS存储服务中读出数据要有效得多。 在上面3.1.3子章节中的图像处理例子,这个INSER OVERWITE操作:

INSERT OVERWRITE TABLE images_output_external
SELECT * FROM images_input_external
WHERE width = 1024;

就可以改写成两个分开的语句:

INSERT OVERWRITE TABLE images_internal
SELECT * FROM images_input_external
WHERE width = 1024;

INSERT OVERWRITE TABLE images_output_external
SELECT * FROM image_internal;

通过把数据写到一个内部images_internal表中,后面如果有多次读取数据的需求的话,就可以不再去访问外部OSS了。 这里也可以看到MaxCompute非结构化框架以及SQL语法本身提供了非常高的灵活性和可扩展性,用户可以根据实际计算的不同模式/场景/需求,来在上面完成各种各样的数据计算工作流。

5. 结语

非结构化数据处理框架随着MaxCompute 2.0一起推出,意在丰富MaxCompute平台的数据处理生态,来打通阿里云核心计算平台与阿里云各个重要存储服务之间的数据链路。 在之前介绍过的读取OSS以及处理TableStore数据的整体方案后,本文侧重介绍数据往OSS的输出方案,并依托一个图像处理的处理实例,展示了【OSS->MaxCompute->OSS】整个数据链路的实现。 在这些新功能的基础上,我们希望实现整个阿里云计算与数据的生态融合: 在不同的项目上,我们已经看到了在MaxCompute上处理OSS上的海量视频,图像等非结构化数据的巨大潜力。 今后随着这个生态的丰富,我们期望OSS数据,TableStore数据以及MaxCompute内部存储的数据,都能在MaxCompute的核心计算引擎上进行融合,从而产生更大的价值。

上一篇:异步导入导出Redis数据(利用Hiredis、Libevent)


下一篇:实用 | Apache Kudu读写路径