背景
随着新零售的概念慢慢崛起,互联网电商行业竞争越来越激烈!实时数据信息对于电商行业尤为重要,那如何从实时不断的数据流中获取我们想要的信息呢?以下案例是流计算的合作伙伴袋鼠云用阿里云流计算来解决电商订单管理案例。
场景案例
统计商铺的订单总数和总的销量
业务架构图
业务流程:
- 用阿里云的DTS(DTS信息同步)把用户的数据同步到大数据总线(DATAHUB)。
- 阿里云流计算订阅大数据总线(DATAHUB)的数据进行实时计算。
- 将实时数据插入到RDS的云数据库
- 再通过阿里云的DATAV或者是其他的大屏做数据展示。
准备工作
RDS->DataHub 数据实时同步,是将 RDS for MySQL 产生的增量数据数据实时同步到 DataHub 中的 topic。
由RDS经过DTS数据同步到大数据总线(DATAHUB)后 DataHub表Schema信息。
1.订单源表
字段名 | 数据类型 | 详情 |
---|---|---|
dts_ordercodeofsys | varchar | 订单编号 |
dts_paytime | varchar | 订单付款时间 |
dts_deliveredtime | varchar | 订单发货时间 |
dts_storecode | varchar | 店铺编号 |
dts_warehousecode | varchar | 仓库code |
dts_cancelled | bigint | 是否取消 |
dts_delivered | bigint | 是否发货 |
dts_receivercity | varchar | 收货人城市 |
dts_receiverprovince | varchar | 收货人省份 |
dts_record_id | varchar | 记录ID |
dts_operation_flag | varchar | 操作Flag |
dts_instance_id | varchar | 数据库instanceId |
dts_db_name | varchar | 数据库名 |
dts_table_name | varchar | 数据表 |
dts_utc_timestamp | varchar | 更新时间 |
dts_before_flag | varchar | 变更前标识 |
dts_after_flag | varchar | 变更后标识 |
2.订单详情源表
字段名 | 数据类型 | 详情 |
---|---|---|
dts_ordercodeofsys | varchar | 订单编号 |
dts_skuname | varchar | 商品名字 |
dts_skucode | varchar | 商品编号 |
dts_quantity | bigint | 数量 |
dts_dividedamount | double | 发货金额 |
dts_salechanneldividedamount | double | 渠道销售金额 |
dts_initialcost | double | 成本 |
dts_record_id | varchar | 记录ID |
dts_operation_flag | varchar | 操作Flag |
dts_instance_id | varchar | 数据库instanceId |
dts_db_name | varchar | 数据库名字 |
dts_table_name | varchar | 表名 |
dts_utc_timestamp | varchar | 更新时间 |
dts_before_flag | varchar | 变更前标识 |
dts_after_flag | varchar | 变更后标识 |
业务逻辑
--数据的订单源表
create table orders_real(
dts_ordercodeofsys varchar,
dts_paytime varchar,
dts_deliveredtime varchar,
dts_storecode varchar,
dts_warehousecode varchar,
dts_cancelled bigint,
dts_delivered bigint,
dts_receivercity varchar,
dts_receiverprovince varchar,
dts_record_id varchar,
dts_operation_flag varchar,
dts_instance_id varchar,
dts_db_name varchar,
dts_table_name varchar,
dts_utc_timestamp varchar,
dts_before_flag varchar,
dts_after_flag varchar
) with (
type='datahub',
endPoint='http://dh-cn-XXXXX.com',
project='项目名',
topic='表名',
accessId='自己的ID',
accessKey='自己的KEY'
);
create table orderdetail_real(
dts_ordercodeofsys varchar,
dts_skuname varchar,
dts_skucode varchar,
dts_quantity bigint ,
dts_dividedamount double,
dts_salechanneldividedamount double,
dts_initialcost double,
dts_record_id varchar,
dts_operation_flag varchar,
dts_instance_id varchar,
dts_db_name varchar,
dts_table_name varchar,
dts_utc_timestamp varchar,
dts_before_flag varchar,
dts_after_flag varchar
) with (
type='datahub',
endPoint='http://dh-cn-XXXX.com',
project='项目名',
topic='表名',
accessId='自己的ID',
accessKey='自己的KEY'
);
create table ads_all_count_amount(
bill_date varchar,--下单时间
bill_count bigint,--总的订单总数
qty bigint,--总的销售量
primary key (bill_date)
) with (
type='rds',
url='jdbc:mysql://rm-XXXX.mysql.rds.aXXXXcs.com:3306/XXXX',
tableName='数据库表名',
userName='数据库的账号',
password='数据库的密码'
);
--订单源表,最新交易时间的商品编号
CREATE VIEW new_paytime AS
SELECT
dts_ordercodeofsys,
MAX(dts_paytime) AS dts_paytime
FROM orders_real
GROUP BY dts_ordercodeofsys
--订单详情表,有效的订单的订单编码、商品名称、商品编号、数量的信息
CREATE VIEW new_orderdetail AS
SELECT
dts_ordercodeofsys,
dts_skuname,
dts_skucode,
CASE WHEN dts_operation_flag = 'U'
AND dts_before_flag = 'Y'
AND dts_after_flag = 'N' THEN -1 * dts_quantity
WHEN dts_operation_flag = 'U'
AND dts_before_flag = 'N'
AND dts_after_flag = 'Y' THEN dts_quantity
WHEN dts_operation_flag = 'D' THEN -1 * dts_quantity
ELSE dts_quantity
END AS dts_quantity
FROM
orderdetail_real
--订单总单数,总销售量
INSERT INTO ads_all_count_amount
SELECT
from_unixtime(CAST(a.dts_paytime AS bigint) / 1000000, 'yyyyMMdd') AS bill_date,
COUNT(DISTINCT a.dts_ordercodeofsys) AS bill_count,
SUM(b.dts_quantity) AS qty
from
(new_paytime) a
join
(new_orderdetail) b
ON a.dts_ordercodeofsys = b.dts_ordercodeofsys
GROUP BY
from_unixtime(CAST(a.dts_paytime AS bigint) / 1000000, 'yyyyMMdd')
难点解析
为了方便大家理解结构化代码和代码维护,我们推荐使用View(VIEW的语义)把业务逻辑差分成三个模块。
模块一
首先根据订单编号做分组,因为同一个编号订单会有多次业务操作(例如下单、付款、发货),会在Binlog日志中形成多条同一订单编号的订单流水记录。使用MAX(dts_paytime)获取同一编号的最后一次操作数据库最终付款交易时间。
CREATE VIEW new_paytime AS
SELECT
dts_ordercodeofsys,
MAX(dts_paytime) AS dts_paytime
FROM orders_real
GROUP BY dts_ordercodeofsys
模块二
数据库日志会获取所有的数据记录的变更,而每个订单是有状态的。如列表所示:
字段名 | 数据类型 | 详情 |
---|---|---|
dts_record_id | varchar | 记录ID |
dts_operation_flag | varchar | 操作Flag |
dts_instance_id | varchar | 数据库instanceId |
dts_db_name | varchar | 数据库名字 |
dts_table_name | varchar | 表名 |
dts_utc_timestamp | varchar | 更新时间 |
dts_before_flag | varchar | 变更前标识 |
dts_after_flag | varchar | 变更后标识 |
dts_record_id: 这条增量日志的唯一标识,唯一递增。如果变更类型为 update,那么增量更新会被拆分成 2 条,一条 Insert,一条 Delete。这两条记录具有相同的 record_id。
dts_instance_id: 这条增量日志所对应的数据库的 server id。
dts_db_name: 这条增量更新日志更新的表所在的数据库库名。
dts_table_name:这条增量更新日志更新的表。
dts_operation_flag: 标示这条增量日志的操作类型。取值包括:
I : insert 操作
D : delete 操作
U : update 操作
dts_utc_timestamp: 这条增量日志的操作时间戳,为这个更新操作记录 binlog 的时间戳。这个时间戳为 UTC 时间。
dts_before_flag: 表示这条增量日志后面带的各个 column 值是否更新前的值。取值包括:Y 和 N。当后面的 column 为更新前的值时,dts_before_flag=Y, 当后面的 column 值为更新后的值时,dts_before_flag=N.
dts_after_flag:表示这条增量日志后面带的各个 column 值是否更新后的值。取值包括:Y 和 N。 当后面的 column 为更新前的值时,dts_after_flag=N,当后面的 column 值为更新后的值时,dts_after_flag=Y.
对于不同的操作类型,增量日志中的 dts_before_flag 和 dts_after_flag 定义如下:
- 操作类型为:insert
当操作类型为 insert 时,后面的所有 column 值为新插入的记录值,即为更新后的值。所以 before_flag=N, after_flag=Y
- 操作类型为:update
当操作类型为 update 时,会将 update 操作拆为 2 条增量日志。这两条增量日志的 dts_record_id ,dts_operation_flag 及 dts_utc_timestamp 相同。
第一条日志记录了更新前的值,所以 dts_before_flag=Y, dts_after_flag=N
第二条日志记录了更新后的值,所以 dts_before_flag=N, dts_after_flag=Y
- 操作类型为:delete
当操作类型为 delete 时,后面的所有 column 值为被删除的记录值,即为更新前的值。所以 dts_before_flag=Y, dts_after_flag=N
CREATE VIEW new_orderdetail AS
SELECT
dts_ordercodeofsys,
dts_skuname,
dts_skucode,
CASE WHEN dts_operation_flag = 'U'
AND dts_before_flag = 'Y'
AND dts_after_flag = 'N' THEN -1 * dts_quantity
WHEN dts_operation_flag = 'U'
AND dts_before_flag = 'N'
AND dts_after_flag = 'Y' THEN dts_quantity
WHEN dts_operation_flag = 'D' THEN -1 * dts_quantity
ELSE dts_quantity
END AS dts_quantity
FROM
orderdetail_real
怎么判断是有效交易订单呢?
首先是要满足dts_operation_flag=U 或者 dts_operation_flag=I,
然后dts_before_flag代表的是变更前订单状态,dts_after_flag是变更后订单状态;
所以有效交易订单为:
dts_operation_flag = 'U'
AND dts_before_flag = 'N'
AND dts_after_flag = 'Y' THEN dts_quantity
- 为什么THEN -1 * dts_quantity呢?
订单的取消或者是交易没有成功在总的销量里也会记录;为了保证总销量的正确性,所以把没有成交的订单数量设为负数在计算总的销量会减去这个数量。
模块三
为什么订单源表和订单详情要做JOIN操作?
new_paytime查出的是最新交易的时间的所有的订单编号;new_orderdetail查询的是所有的有效的订单的订单编码、商品名称、商品编号、数量的信息;两张表JOIN是为整合成一张大表,方便用户来统计订单总数和总的销量。
SELECT
from_unixtime(CAST(a.dts_paytime AS bigint) / 1000000, 'yyyyMMdd') AS bill_date,
COUNT(DISTINCT a.dts_ordercodeofsys) AS bill_count,
SUM(b.dts_quantity) AS qty
from
(new_paytime) a
join
(new_orderdetail) b
ON
a.dts_ordercodeofsys = b.dts_ordercodeofsys
GROUP BY
from_unixtime(CAST(a.dts_paytime AS bigint) / 1000000, 'yyyyMMdd');