【阿里云流计算】- 电商订单和销量统计案例

背景

随着新零售的概念慢慢崛起,互联网电商行业竞争越来越激烈!实时数据信息对于电商行业尤为重要,那如何从实时不断的数据流中获取我们想要的信息呢?以下案例是流计算的合作伙伴袋鼠云用阿里云流计算来解决电商订单管理案例。

场景案例

统计商铺的订单总数和总的销量

业务架构图

【阿里云流计算】- 电商订单和销量统计案例

业务流程:

  1. 用阿里云的DTS(DTS信息同步)把用户的数据同步到大数据总线(DATAHUB)。
  2. 阿里云流计算订阅大数据总线(DATAHUB)的数据进行实时计算。
  3. 将实时数据插入到RDS的云数据库
  4. 再通过阿里云的DATAV或者是其他的大屏做数据展示。

准备工作

RDS->DataHub 数据实时同步,是将 RDS for MySQL 产生的增量数据数据实时同步到 DataHub 中的 topic。
由RDS经过DTS数据同步到大数据总线(DATAHUB)后 DataHub表Schema信息。

可以参考RDS 到 DataHub 数据实时同步

1.订单源表

字段名 数据类型 详情
dts_ordercodeofsys varchar 订单编号
dts_paytime varchar 订单付款时间
dts_deliveredtime varchar 订单发货时间
dts_storecode varchar 店铺编号
dts_warehousecode varchar 仓库code
dts_cancelled bigint 是否取消
dts_delivered bigint 是否发货
dts_receivercity varchar 收货人城市
dts_receiverprovince varchar 收货人省份
dts_record_id varchar 记录ID
dts_operation_flag varchar 操作Flag
dts_instance_id varchar 数据库instanceId
dts_db_name varchar 数据库名
dts_table_name varchar 数据表
dts_utc_timestamp varchar 更新时间
dts_before_flag varchar 变更前标识
dts_after_flag varchar 变更后标识

2.订单详情源表

字段名 数据类型 详情
dts_ordercodeofsys varchar 订单编号
dts_skuname varchar 商品名字
dts_skucode varchar 商品编号
dts_quantity bigint 数量
dts_dividedamount double 发货金额
dts_salechanneldividedamount double 渠道销售金额
dts_initialcost double 成本
dts_record_id varchar 记录ID
dts_operation_flag varchar 操作Flag
dts_instance_id varchar 数据库instanceId
dts_db_name varchar 数据库名字
dts_table_name varchar 表名
dts_utc_timestamp varchar 更新时间
dts_before_flag varchar 变更前标识
dts_after_flag varchar 变更后标识

业务逻辑

--数据的订单源表
create table orders_real(
     dts_ordercodeofsys   varchar, 
     dts_paytime          varchar, 
     dts_deliveredtime    varchar, 
     dts_storecode        varchar, 
     dts_warehousecode    varchar, 
     dts_cancelled        bigint, 
     dts_delivered        bigint, 
     dts_receivercity     varchar, 
     dts_receiverprovince varchar, 
     dts_record_id        varchar, 
     dts_operation_flag   varchar, 
     dts_instance_id      varchar, 
     dts_db_name          varchar, 
     dts_table_name       varchar, 
     dts_utc_timestamp    varchar, 
     dts_before_flag      varchar, 
     dts_after_flag       varchar 
) with (
  type='datahub',
  endPoint='http://dh-cn-XXXXX.com',
  project='项目名',
  topic='表名',
  accessId='自己的ID',
  accessKey='自己的KEY'
); 

create table orderdetail_real(
     dts_ordercodeofsys            varchar, 
     dts_skuname                   varchar,
     dts_skucode                   varchar,
     dts_quantity                  bigint ,
     dts_dividedamount             double,
     dts_salechanneldividedamount  double,
     dts_initialcost               double,
     dts_record_id                 varchar,
     dts_operation_flag            varchar,
     dts_instance_id               varchar,
     dts_db_name                   varchar,
     dts_table_name                varchar,
     dts_utc_timestamp             varchar,
     dts_before_flag               varchar,
     dts_after_flag                varchar
) with (
  type='datahub',
  endPoint='http://dh-cn-XXXX.com',
  project='项目名',
  topic='表名',
  accessId='自己的ID',
  accessKey='自己的KEY'
); 


create table ads_all_count_amount(
    bill_date     varchar,--下单时间
    bill_count    bigint,--总的订单总数
    qty           bigint,--总的销售量
    primary key (bill_date) 
) with (
  type='rds',
  url='jdbc:mysql://rm-XXXX.mysql.rds.aXXXXcs.com:3306/XXXX',
  tableName='数据库表名',
  userName='数据库的账号',
  password='数据库的密码'
);

--订单源表,最新交易时间的商品编号
CREATE VIEW new_paytime AS
SELECT 
dts_ordercodeofsys, 
MAX(dts_paytime) AS dts_paytime
    FROM orders_real
    GROUP BY dts_ordercodeofsys
    
--订单详情表,有效的订单的订单编码、商品名称、商品编号、数量的信息
CREATE VIEW new_orderdetail AS
SELECT 
dts_ordercodeofsys, 
dts_skuname, 
dts_skucode,
CASE WHEN dts_operation_flag = 'U'
        AND dts_before_flag = 'Y'
        AND dts_after_flag = 'N' THEN -1 * dts_quantity 
    WHEN dts_operation_flag = 'U'
        AND dts_before_flag = 'N'
        AND dts_after_flag = 'Y' THEN dts_quantity 
    WHEN dts_operation_flag = 'D' THEN -1 * dts_quantity 
    ELSE dts_quantity 
    END AS dts_quantity
        FROM 
        orderdetail_real
        
--订单总单数,总销售量
INSERT INTO ads_all_count_amount
SELECT 
    from_unixtime(CAST(a.dts_paytime AS bigint) / 1000000, 'yyyyMMdd') AS bill_date, 
    COUNT(DISTINCT a.dts_ordercodeofsys) AS bill_count, 
    SUM(b.dts_quantity) AS qty
from 
    (new_paytime) a 
join 
    (new_orderdetail) b
ON a.dts_ordercodeofsys = b.dts_ordercodeofsys
GROUP BY 
        from_unixtime(CAST(a.dts_paytime AS bigint) / 1000000, 'yyyyMMdd')

难点解析

为了方便大家理解结构化代码和代码维护,我们推荐使用View(VIEW的语义)把业务逻辑差分成三个模块。

模块一

首先根据订单编号做分组,因为同一个编号订单会有多次业务操作(例如下单、付款、发货),会在Binlog日志中形成多条同一订单编号的订单流水记录。使用MAX(dts_paytime)获取同一编号的最后一次操作数据库最终付款交易时间。

CREATE VIEW new_paytime AS
SELECT 
    dts_ordercodeofsys, 
    MAX(dts_paytime) AS dts_paytime
FROM orders_real
GROUP BY dts_ordercodeofsys

模块二

数据库日志会获取所有的数据记录的变更,而每个订单是有状态的。如列表所示:

字段名 数据类型 详情
dts_record_id varchar 记录ID
dts_operation_flag varchar 操作Flag
dts_instance_id varchar 数据库instanceId
dts_db_name varchar 数据库名字
dts_table_name varchar 表名
dts_utc_timestamp varchar 更新时间
dts_before_flag varchar 变更前标识
dts_after_flag varchar 变更后标识

dts_record_id: 这条增量日志的唯一标识,唯一递增。如果变更类型为 update,那么增量更新会被拆分成 2 条,一条 Insert,一条 Delete。这两条记录具有相同的 record_id。

dts_instance_id: 这条增量日志所对应的数据库的 server id。

dts_db_name: 这条增量更新日志更新的表所在的数据库库名。

dts_table_name:这条增量更新日志更新的表。

dts_operation_flag: 标示这条增量日志的操作类型。取值包括:

I : insert 操作
D : delete 操作
U : update 操作

dts_utc_timestamp: 这条增量日志的操作时间戳,为这个更新操作记录 binlog 的时间戳。这个时间戳为 UTC 时间。

dts_before_flag: 表示这条增量日志后面带的各个 column 值是否更新前的值。取值包括:Y 和 N。当后面的 column 为更新前的值时,dts_before_flag=Y, 当后面的 column 值为更新后的值时,dts_before_flag=N.

dts_after_flag:表示这条增量日志后面带的各个 column 值是否更新后的值。取值包括:Y 和 N。 当后面的 column 为更新前的值时,dts_after_flag=N,当后面的 column 值为更新后的值时,dts_after_flag=Y.

对于不同的操作类型,增量日志中的 dts_before_flag 和 dts_after_flag 定义如下:

  1. 操作类型为:insert

当操作类型为 insert 时,后面的所有 column 值为新插入的记录值,即为更新后的值。所以 before_flag=N, after_flag=Y
【阿里云流计算】- 电商订单和销量统计案例

  1. 操作类型为:update

当操作类型为 update 时,会将 update 操作拆为 2 条增量日志。这两条增量日志的 dts_record_id ,dts_operation_flag 及 dts_utc_timestamp 相同。
第一条日志记录了更新前的值,所以 dts_before_flag=Y, dts_after_flag=N
第二条日志记录了更新后的值,所以 dts_before_flag=N, dts_after_flag=Y
【阿里云流计算】- 电商订单和销量统计案例

  1. 操作类型为:delete

当操作类型为 delete 时,后面的所有 column 值为被删除的记录值,即为更新前的值。所以 dts_before_flag=Y, dts_after_flag=N
【阿里云流计算】- 电商订单和销量统计案例

CREATE VIEW new_orderdetail AS
SELECT 
dts_ordercodeofsys, 
dts_skuname, 
dts_skucode,
CASE WHEN dts_operation_flag = 'U'
        AND dts_before_flag = 'Y'
        AND dts_after_flag = 'N' THEN -1 * dts_quantity 
    WHEN dts_operation_flag = 'U'
        AND dts_before_flag = 'N'
        AND dts_after_flag = 'Y' THEN dts_quantity 
    WHEN dts_operation_flag = 'D' THEN -1 * dts_quantity 
    ELSE dts_quantity 
    END AS dts_quantity
        FROM 
        orderdetail_real

怎么判断是有效交易订单呢?

首先是要满足dts_operation_flag=U 或者 dts_operation_flag=I,
然后dts_before_flag代表的是变更前订单状态,dts_after_flag是变更后订单状态;
所以有效交易订单为:

        dts_operation_flag = 'U'
        AND dts_before_flag = 'N'
        AND dts_after_flag = 'Y' THEN dts_quantity 
  • 为什么THEN -1 * dts_quantity呢?

订单的取消或者是交易没有成功在总的销量里也会记录;为了保证总销量的正确性,所以把没有成交的订单数量设为负数在计算总的销量会减去这个数量。

模块三

为什么订单源表和订单详情要做JOIN操作?

new_paytime查出的是最新交易的时间的所有的订单编号;new_orderdetail查询的是所有的有效的订单的订单编码、商品名称、商品编号、数量的信息;两张表JOIN是为整合成一张大表,方便用户来统计订单总数和总的销量。


SELECT 
    from_unixtime(CAST(a.dts_paytime AS bigint) / 1000000, 'yyyyMMdd') AS bill_date, 
    COUNT(DISTINCT a.dts_ordercodeofsys) AS bill_count, 
    SUM(b.dts_quantity) AS qty
from 
    (new_paytime) a 
join 
    (new_orderdetail) b
ON 
    a.dts_ordercodeofsys = b.dts_ordercodeofsys
GROUP BY 
        from_unixtime(CAST(a.dts_paytime AS bigint) / 1000000, 'yyyyMMdd');
上一篇:ISC公司提供的新服务,使飞康FreeStor的服务范围延伸至中小企业


下一篇:在cygwin上运行apache2