基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-历史数据分析篇

前言

在大规模订单系统中,离线的数据分析必不可少。工程师和运营人员使用批处理工具如 Spark 对历史数据进行计算分析,其计算结果可以用于用户画像、产品推荐等多个场景。

传统的 MySQL 分库分表架构是无法应对这种需求的,一般会引入 Hive 对数据进行归档存储,然后利用 Hive 来对接 Spark 或者 HiveSQL 等分析工具。这样的架构会带来很高的维护成本,首先 Hive、HDFS、Hadoop 等大数据工具的搭建和维护需要专业的大数据运维团队;其次,从 MySQL 或 Oracle 向 Hive 中导入数据的 ETL 作业需要开发、维护、监控。运维成本高,系统链路复杂。

阿里云的表格存储,结合对象存储服务 OSS,实现了数据的自动投递归档,开发者无需关注运维细节,这极大降低了归档和离线数据分析的运维成本。Tablestore 结合 OSS 为此类订单场景下的离线数据分析提供了一种更为低成本、便捷的解决方案。

OSS 简介

对象存储服务(Object Storage Service,OSS)是一种海量、安全、低成本、高可靠的云存储服务,适合存放任意类型的文件。

使用 Tablestore 结合 OSS 系统,架构如下:

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-历史数据分析篇

MySQL 中数据通过 DTS 同步进入 Tablestore,数据再从 Tablestore 被投递进入 OSS。这样 OSS 中可以存储全量历史数据,成为 Tablestore 的归档备份库,其中数据分区,使用列式存储,更适合对接批处理工具。这样的架构可以实现如下场景需求:

  • 全量数据备份数据湖投递可以自动将表格存储的全表数据投递到 OSS Bucket 中,作为备份归档数据。
  • 投递的数据按系统时间分区、Parquet 列存存储;利用 OSS 的高读带宽和列存面向扫描场景优化,对接批处理工具如 Spark,实现历史数据分析、画像分析等需求。

下文,我们将展示如何配置 Tablestore 向 OSS 中的数据自动同步任务。 

OSS 数据投递

OSS 服务开通

首先需要开通 OSS 服务。在阿里云官网,进入对象存储 OSS 首页。点击立即开通

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-历史数据分析篇

勾选服务协议,然后点击立即开通。

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-历史数据分析篇

然后需要在 OSS 创建 BUCKET。进入 OSS 管理控制台,点击 Bucket 列表,然后点击创建 Bucket

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-历史数据分析篇

填入 Bucket 名称,选择对应地域,其余选项选用默认项即可,点击确定,完成 Bucket 的创建。

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-历史数据分析篇

至此,OSS 服务开通完毕,然后要在 Tablestore 中配置投递任务。

投递服务开通

进入表格存储首页

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-历史数据分析篇

点击管理控制台,进入控制台页面。点击进入实例。可以看到我们前面创建的订单相关表。

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-历史数据分析篇

我们要将订单表 order_contract 投递到 OSS 。点击数据湖投递

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-历史数据分析篇

点击创建投递任务

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-历史数据分析篇

各参数如图。任务名称自定。目标 Table 选择 order_contract。目标 OSS BUCKET 选择 otssearch-vpc-hangzhou。投递路径自定义,这里使用 order_contract/year=$yyyy/month=$MM。投递类型选择全量&增量,Schema 采用自动生成

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-历史数据分析篇

点击确定,完成投递任务的创建。

此时,在 Tablestore 数据湖投递这一页里,可以看到刚刚新建的同步任务。任务会先同步历史数据,完成后,其状态显示为增量同步中

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-历史数据分析篇

同步结束后,可以在 OSS 控制台首页,对应 Bucket 下可以看到投递到 OSS 下的数据文件。

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-历史数据分析篇

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-历史数据分析篇

至此投递服务创建完毕。Tablestore 也支持使用 SDK 创建同步任务,具体可以参考数据湖投递->使用SDK

DLA 查询

自动发现元数据

进入 DLA 控制台首页。点击元信息发现,然后点击 OSS数据源旁的进入向导。这里如果提示要开通 DLA 到 OSS 的访问权限,按照提示开通即可。

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-历史数据分析篇

选择数仓模式。选择 parquet。填入 Schema 名称。

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-历史数据分析篇

选择 OSS 目录位置,弹出

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-历史数据分析篇

选择对应的 bucket。选择到对应路径,这里 order_contract 数据就在根路径/下,因此这里选择到根路径,不需要勾选 order_contract。然后点击确定,完成 OSS 目录位置的配置。

最后点击外层的创建,点击立即发现,完成元数据发现的创建。

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-历史数据分析篇

此时,可以在元信息发现页面看到刚刚创建的任务。

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-历史数据分析篇

SQL 查询数据

在 DLA 控制台首页,点击 SQL 执行,点击登录 DML 执行 SQL,进入 DML 页面。

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-历史数据分析篇

在 DML 页面,可以看到刚刚自动创建出的 Schema “test_tablestore_oss”以及创建出的映射表 order_contract。对映射表执行对应的 SQL,可以得到对应结果。这里也可以使用程序读取表中的数据,相关内容在前面关于 DLA 的文章中已经讲述过,这里不再展开。

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-历史数据分析篇

数据分析

下面将展示如何使用 Spark 对接 OSS,并完成如下需求,

  • 统计各店铺历史总交易额,并将结果数据入库

创建集群

创建阿里云E-MapReduce的Hadoop集群,文档参见创建集群

登录 Spark-sql 客户端

集群管理页面,点击创建的集群。

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-历史数据分析篇

点击主机列表,点击emr-header-1机器。

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-历史数据分析篇

点击远程连接

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-历史数据分析篇

选择立即登录

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-历史数据分析篇

输入创建集群时设定的密码。登录机器。

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-历史数据分析篇

在指令行输入以下指令,登录 Spark-sql 客户端,

streaming-sql --driver-class-path emr-datasources_shaded_*.jar --jars emr-datasources_shaded_*.jar --master yarn-client --num-executors 8 --executor-memory 2g --executor-cores 2

批处理

创建结果表

在 Tablestore 中创建用于存储批处理结果的结果表,当然,结果表可以存放于 MySQL、Tablestore、OSS对象存储等任意数据库以及存储结构。本例中,Tablestore 中结果表表名为 store_statistic,表结构如下,只预设一个主键。

序号

主键名称

主键类型

说明

1

s_id

STRING

分区键,店铺id

执行 Spark-sql

首先在 Spark-sql 客户端输入如下 SQL,创建 Spark 中的源表。

create table order_contract_oss(
oId string,
create_time string,
total_price double,
p_brand string,
p_price double,
pay_time bigint,
has_paid bigint,
s_id string,
p_name string,
c_id string,
c_name string,
s_name string,
p_count bigint,
p_id string
) using parquet
location 'oss://{accessSecret}:{accessKey}@otssearch-vpc-hangzhou.oss-cn-hangzhou-internal.aliyuncs.com/order_contract';

然后在 Spark-sql 客户端输入如下 SQL,创建 Spark 中的目标表。目标表中除了店铺 id 字段 s_id 外,还有字段 total,作为店铺总交易额字段。

DROP TABLE IF EXISTS store_statistic;
CREATE TABLE store_statistic
USING tablestore
OPTIONS(
endpoint="https://test-20210609.cn-hangzhou.vpc.tablestore.aliyuncs.com",
access.key.id="",
access.key.secret="",
instance.name="test-20210609",
table.name="store_statistic",
catalog='{"columns":{"s_id":{"col":"s_id","type":"string"},"total":{"col":"total","type":"string"}}}'
);

最后输入计算 SQL,如下,即完成批处理任务。

insert into store_statistic
select s_id,sum(total_price) as total from order_contract_oss  where group by s_id

查看结果

在结果表中查看结果数据,如图。

基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-历史数据分析篇

总结

Tablestore 将数据投递进入 OSS,以 OSS 作为其全量数据备份。OSS 中数据以时间分区,列式存储,存储成本更低,且更适合支持数据分析需求。

上一篇:基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-数据同步 Canal 篇


下一篇:基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-数据同步 DTS 篇