Tablestore结合Spark的流批一体SQL实战

背景介绍

电子商务模式是指在网络环境和大数据环境下基于一定技术基础的商务运作方式和盈利模式,对于数据的分析和可视化是电商运营中最重要的部分之一,而电商大屏提供了数据分析和可视化的完美结合。电商大屏包含有全量订单和实时订单的聚合,全量订单的聚合提供的是全景的综合数据视图,而实时订单的聚合展示的是实时的运营指标数据。本文将通过结合Tablestore和Spark的流批一体存储和计算,来自建电商大屏完成电商数据的分析和可视化,其效果图如下。
Tablestore结合Spark的流批一体SQL实战

架构设计

在本次的电商大屏实战中,客户端会实时向Tablestore插入原始订单数据,实时流计算会通过Spark Structured Streaming实时统计一个窗口周期时间内的订单数和订单金额统计,并将聚合结果写回Tablestore,最终在DataV大屏上进行展示,而离线批计算通过Spark SQL进行原始订单数据的总金额和用户维度总金额的离线聚合,聚合结果也会写回Tablestore, 并最终在DataV大屏上进行展示,整个场景的架构图如下图所示。
Tablestore结合Spark的流批一体SQL实战

准备工作

  1. 创建阿里云E-MapReduce的Hadoop集群,文档参见创建集群
  2. 下载E-MapReduce的最新SDK包,包名的格式为emr-datasources_shaded_*.jar,里面会包含有Tablestore相关的Spark批流Source和Sink。

数据源说明

数据源是一张简单的原始订单表OrderSource,表有两个主键UserId(用户ID)和OrderId(订单ID)和两个属性列price(价格)和timestamp(订单时间),数据示例如下图所示。
Tablestore结合Spark的流批一体SQL实战

批流SQL流程详解

创建数据源表

1.登陆EMR Header机器,执行以下命令,启动sql客户端,该客户端用于批流SQL计算,其中emr-datasources_shaded_*.jar为准备工作中下载的EMR最新版的SDK包。

streaming-sql --driver-class-path emr-datasources_shaded_*.jar --jars emr-datasources_shaded_*.jar --master yarn-client --num-executors 8 --executor-memory 2g --executor-cores 2

2.创建原始订单数据表(Source表)的外表order_source,该外表将用于后续的流批SQL执行。

DROP TABLE IF EXISTS order_source;
CREATE TABLE order_source
USING tablestore
OPTIONS(
endpoint="http://vehicle-test.cn-hangzhou.vpc.tablestore.aliyuncs.com",
access.key.id="",
access.key.secret="",
instance.name="vehicle-test",
table.name="OrderSource",
tunnel.id="2b7bbf3d-d6c4-4cea-89fe-71998bccaf19",
catalog='{"columns": {"UserId": {"col": "UserId", "type": "string"}, "OrderId": {"col": "OrderId", "type": "string"},"price": {"cols": "price", "type": "double"}, "timestamp": {"cols": "timestamp", "type": "long"}}}'
);

参数说明:

参数名 解释
endpoint 表格存储实例的访问地址
access.key.id 阿里云账号AK ID
access.key.secret 阿里云账号AK Secret
instance.name 表格存储实例名
table.name 表格存储表名
tunnel.id 表格存储的增量通道ID, 该参数用于实时的增量SQL, 批量SQL时非必须。
catalog 表的字段Schema定义,上述示例中对应的四个列为UserId(主键), OrderId(主键), price, timestamp,数据类型分别为string, string, double, long。

实时流计算

实时流计算将实时统计一个窗口周期时间内的订单数和订单金额统计,并将聚合结果写回Tablestore。首先创建流计算的Sink外表order_stream_sink(对应Tablestore表OrderStreamSink),然后运行流计算SQL进行实时聚合,最后将聚合结果实时写回Tablestore目的表中。
Sink表的各参数含义和Source表一致,其中catalog字段的内容有所不同,对应的Sink表中有四个字段,begin(开始时间,主键列,格式为2019-11-27 14:54:00),end(结束时间,主键列),count(订单数),totalPrice(订单总金额)。

// 创建Sink表order_stream_sink对应Tablestore的表OrderStreamSink(主键为begin和end两列)
DROP TABLE IF EXISTS order_stream_sink;
CREATE TABLE order_stream_sink
USING tablestore
OPTIONS(
endpoint="http://vehicle-test.cn-hangzhou.vpc.tablestore.aliyuncs.com",
access.key.id="",
access.key.secret="",
instance.name="vehicle-test",
table.name="OrderStreamSink",
catalog='{"columns": {"begin": {"col": "begin", "type": "string"},"end": {"col": "end", "type": "string"}, "count": {"col": "count", "type": "long"}, "totalPrice": {"col": "totalPrice", "type": "double"}}}'
);

// 在order_source表上创建视图order_source_stream_view
CREATE SCAN order_source_stream_view ON order_source USING STREAM OPTIONS ("maxoffsetsperchannel"="10000");

// 在视图order_source_stream_view上运行STREAM SQL作业,以下样例会按30s粒度进行订单数和订单金额的聚合,
// 聚合结果将写回Tablestore表OrderStreamSink。
CREATE STREAM job1
options(
checkpointLocation='/tmp/spark/cp/job1',
outputMode='update'
)
INSERT INTO order_stream_sink
SELECT CAST(window.start AS String) AS begin, CAST(window.end AS String) AS end, count(*) AS count, CAST(sum(price) AS Double) AS totalPrice FROM order_source_stream_view GROUP BY window(to_timestamp(timestamp / 1000), "30 seconds");

在运行Stream SQL后,可以实时得到聚合结果,聚合结果样例如下图所示,聚合结果存放在OrderStreamSink表中,通过Tablestore和DataV的直连功能,可以很容易的将结果绘制在DataV的大屏上。
Tablestore结合Spark的流批一体SQL实战

离线批计算

离线批计算将进行原始订单数据的总金额和用户维度总金额的离线聚合,首先会创建两张Sink表分别存放历史总金额和用户维度总金额的聚合数据,然后直接在源表order_source上运行批计算SQL,最后得到聚合结果。

// 批计算任务
// 用户维度结果表:OrderBatchSink(主键UserId, 属性列count,totalPrice)
// 总数据维度结果表:OrderTotalSink(主键Count, 属性列totalPrice)
DROP TABLE IF EXISTS order_batch_sink;
CREATE TABLE order_batch_sink
USING tablestore
OPTIONS(
endpoint="http://vehicle-test.cn-hangzhou.vpc.tablestore.aliyuncs.com",
access.key.id="",
access.key.secret="",
instance.name="vehicle-test",
table.name="OrderBatchSink",
tunnel.id="",
catalog='{"columns": {"UserId": {"col": "UserId", "type": "string"}, "count": {"col": "count", "type": "long"}, "totalPrice": {"col": "totalPrice", "type": "double"}}}'
);

DROP TABLE IF EXISTS order_totol_sink;
CREATE TABLE order_total_sink
USING tablestore
OPTIONS(
endpoint="http://vehicle-test.cn-hangzhou.vpc.tablestore.aliyuncs.com",
access.key.id="",
access.key.secret="",
instance.name="vehicle-test",
table.name="OrderTotalSink",
tunnel.id="",
catalog='{"columns": {"count": {"col": "count", "type": "long"}, "totalPrice": {"col": "totalPrice", "type": "double"}}}'
);

运行以下批计算SQL进行用户维度聚合结果的更新。

// SQL命令
INSERT INTO order_batch_sink SELECT UserId, count(*) AS count, sum(price) AS totalPrice FROM order_source GROUP BY UserId;
// 实际运行
spark-sql> INSERT INTO order_batch_sink SELECT UserId, count(*) AS count, sum(price) AS totalPrice FROM order_source GROUP BY UserId;
Time taken: 5.107 seconds

Tablestore结合Spark的流批一体SQL实战

运行以下批计算SQL进行总数据维度结果的更新。

// SQL命令
INSERT INTO order_total_sink SELECT count(*) AS count, sum(price) AS totalPrice FROM order_source;
// 实际运行
spark-sql> INSERT INTO order_total_sink SELECT count(*) AS count, sum(price) AS totalPrice FROM order_source;
Time taken: 4.272 seconds

Tablestore结合Spark的流批一体SQL实战

写在最后

本文通过使用一套存储(Tablestore)和一套计算(Spark)完成了批流计算的有效结合,更多有关批流一体的细节和干货可以参见Tablestore结合Spark的云上流批一体大数据架构
对Tablestore有任何问题,随时欢迎同我们进行交流,钉钉群号:11789671(1群)、23307953(2群)。

上一篇:《算法技术手册》一2.3.3最好情况


下一篇:跟我一起学习ASP.NET 4.5 MVC4.0(五)