前言
在大规模订单系统中,离线的数据分析必不可少。工程师和运营人员使用批处理工具如 Spark 对历史数据进行计算分析,其计算结果可以用于用户画像、产品推荐等多个场景。
传统的 MySQL 分库分表架构是无法应对这种需求的,一般会引入 Hive 对数据进行归档存储,然后利用 Hive 来对接 Spark 或者 HiveSQL 等分析工具。这样的架构会带来很高的维护成本,首先 Hive、HDFS、Hadoop 等大数据工具的搭建和维护需要专业的大数据运维团队;其次,从 MySQL 或 Oracle 向 Hive 中导入数据的 ETL 作业需要开发、维护、监控。运维成本高,系统链路复杂。
阿里云的表格存储,结合对象存储服务 OSS,实现了数据的自动投递归档,开发者无需关注运维细节,这极大降低了归档和离线数据分析的运维成本。Tablestore 结合 OSS 为此类订单场景下的离线数据分析提供了一种更为低成本、便捷的解决方案。
OSS 简介
对象存储服务(Object Storage Service,OSS)是一种海量、安全、低成本、高可靠的云存储服务,适合存放任意类型的文件。
使用 Tablestore 结合 OSS 系统,架构如下:
MySQL 中数据通过 DTS 同步进入 Tablestore,数据再从 Tablestore 被投递进入 OSS。这样 OSS 中可以存储全量历史数据,成为 Tablestore 的归档备份库,其中数据分区,使用列式存储,更适合对接批处理工具。这样的架构可以实现如下场景需求:
- 全量数据备份数据湖投递可以自动将表格存储的全表数据投递到 OSS Bucket 中,作为备份归档数据。
- 投递的数据按系统时间分区、Parquet 列存存储;利用 OSS 的高读带宽和列存面向扫描场景优化,对接批处理工具如 Spark,实现历史数据分析、画像分析等需求。
下文,我们将展示如何配置 Tablestore 向 OSS 中的数据自动同步任务。
OSS 数据投递
OSS 服务开通
首先需要开通 OSS 服务。在阿里云官网,进入对象存储 OSS 首页。点击立即开通。
勾选服务协议,然后点击立即开通。
然后需要在 OSS 创建 BUCKET。进入 OSS 管理控制台,点击 Bucket 列表,然后点击创建 Bucket。
填入 Bucket 名称,选择对应地域,其余选项选用默认项即可,点击确定,完成 Bucket 的创建。
至此,OSS 服务开通完毕,然后要在 Tablestore 中配置投递任务。
投递服务开通
进入表格存储首页。
点击管理控制台,进入控制台页面。点击进入实例。可以看到我们前面创建的订单相关表。
我们要将订单表 order_contract 投递到 OSS 。点击数据湖投递。
点击创建投递任务。
各参数如图。任务名称自定。目标 Table 选择 order_contract。目标 OSS BUCKET 选择 otssearch-vpc-hangzhou。投递路径自定义,这里使用 order_contract/year=$yyyy/month=$MM。投递类型选择全量&增量,Schema 采用自动生成。
点击确定,完成投递任务的创建。
此时,在 Tablestore 数据湖投递这一页里,可以看到刚刚新建的同步任务。任务会先同步历史数据,完成后,其状态显示为增量同步中。
同步结束后,可以在 OSS 控制台首页,对应 Bucket 下可以看到投递到 OSS 下的数据文件。
至此投递服务创建完毕。Tablestore 也支持使用 SDK 创建同步任务,具体可以参考数据湖投递->使用SDK。
DLA 查询
自动发现元数据
进入 DLA 控制台首页。点击元信息发现,然后点击 OSS数据源旁的进入向导。这里如果提示要开通 DLA 到 OSS 的访问权限,按照提示开通即可。
选择数仓模式。选择 parquet。填入 Schema 名称。
选择 OSS 目录位置,弹出
选择对应的 bucket。选择到对应路径,这里 order_contract 数据就在根路径/下,因此这里选择到根路径,不需要勾选 order_contract。然后点击确定,完成 OSS 目录位置的配置。
最后点击外层的创建,点击立即发现,完成元数据发现的创建。
此时,可以在元信息发现页面看到刚刚创建的任务。
SQL 查询数据
在 DLA 控制台首页,点击 SQL 执行,点击登录 DML 执行 SQL,进入 DML 页面。
在 DML 页面,可以看到刚刚自动创建出的 Schema “test_tablestore_oss”以及创建出的映射表 order_contract。对映射表执行对应的 SQL,可以得到对应结果。这里也可以使用程序读取表中的数据,相关内容在前面关于 DLA 的文章中已经讲述过,这里不再展开。
数据分析
下面将展示如何使用 Spark 对接 OSS,并完成如下需求,
- 统计各店铺历史总交易额,并将结果数据入库
创建集群
创建阿里云E-MapReduce的Hadoop集群,文档参见创建集群。
登录 Spark-sql 客户端
在集群管理页面,点击创建的集群。
点击主机列表,点击emr-header-1机器。
点击远程连接,
选择立即登录。
输入创建集群时设定的密码。登录机器。
在指令行输入以下指令,登录 Spark-sql 客户端,
streaming-sql --driver-class-path emr-datasources_shaded_*.jar --jars emr-datasources_shaded_*.jar --master yarn-client --num-executors 8 --executor-memory 2g --executor-cores 2
批处理
创建结果表
在 Tablestore 中创建用于存储批处理结果的结果表,当然,结果表可以存放于 MySQL、Tablestore、OSS对象存储等任意数据库以及存储结构。本例中,Tablestore 中结果表表名为 store_statistic,表结构如下,只预设一个主键。
序号 |
主键名称 |
主键类型 |
说明 |
1 |
s_id |
STRING |
分区键,店铺id |
执行 Spark-sql
首先在 Spark-sql 客户端输入如下 SQL,创建 Spark 中的源表。
create table order_contract_oss(
oId string,
create_time string,
total_price double,
p_brand string,
p_price double,
pay_time bigint,
has_paid bigint,
s_id string,
p_name string,
c_id string,
c_name string,
s_name string,
p_count bigint,
p_id string
) using parquet
location 'oss://{accessSecret}:{accessKey}@otssearch-vpc-hangzhou.oss-cn-hangzhou-internal.aliyuncs.com/order_contract';
然后在 Spark-sql 客户端输入如下 SQL,创建 Spark 中的目标表。目标表中除了店铺 id 字段 s_id 外,还有字段 total,作为店铺总交易额字段。
DROP TABLE IF EXISTS store_statistic;
CREATE TABLE store_statistic
USING tablestore
OPTIONS(
endpoint="https://test-20210609.cn-hangzhou.vpc.tablestore.aliyuncs.com",
access.key.id="",
access.key.secret="",
instance.name="test-20210609",
table.name="store_statistic",
catalog='{"columns":{"s_id":{"col":"s_id","type":"string"},"total":{"col":"total","type":"string"}}}'
);
最后输入计算 SQL,如下,即完成批处理任务。
insert into store_statistic
select s_id,sum(total_price) as total from order_contract_oss where group by s_id
查看结果
在结果表中查看结果数据,如图。
总结
Tablestore 将数据投递进入 OSS,以 OSS 作为其全量数据备份。OSS 中数据以时间分区,列式存储,存储成本更低,且更适合支持数据分析需求。