点一下关注吧!!!非常感谢!!持续更新!!!
Java篇开始了!
目前开始更新 MyBatis,一起深入浅出!
目前已经更新到了:
- Hadoop(已更完)
- HDFS(已更完)
- MapReduce(已更完)
- Hive(已更完)
- Flume(已更完)
- Sqoop(已更完)
- Zookeeper(已更完)
- HBase(已更完)
- Redis (已更完)
- Kafka(已更完)
- Spark(已更完)
- Flink(已更完)
- ClickHouse(已更完)
- Kudu(已更完)
- Druid(已更完)
- Kylin(已更完)
- Elasticsearch(已更完)
- DataX(已更完)
- Tez(已更完)
- 数据挖掘(已更完)
- Prometheus(已更完)
- Grafana(已更完)
- 离线数仓(正在更新…)
章节内容
上节我们完成了如下的内容:
- 电商核心交易
- 增量数据导入 订单、订单明细、产品明细表
基本介绍
- ODS层表结构域源数据基本类似(列表、数据类型)
- ODS层表名遵循统一的规范
在大数据体系中,ODS(Operational Data Store),即操作数据存储,是数据仓库中的重要组成部分,起着承上启下的作用。ODS主要是用于存储原始的、经过轻度处理的数据,通常直接从业务系统(如ERP、CRM等)中抽取而来。
ODS是大数据架构中的数据层之一,它是指在数据从业务系统到数据仓库的过程中,用于临时存放和管理数据的区域。ODS一般用来存储接近实时的、较为原始的操作型数据,为上层的数据清洗、加工、分析提供基础。
功能定位
- 作为数据仓库的前置层。
- 承接从业务系统或外部来源抽取的数据。
- 数据通常不进行深度加工和聚合,保持业务的原貌。
数据特点
- 原始性:数据通常是经过轻度标准化后的原始数据,保留业务系统的字段格式。
- 实时性:相比数据仓库,ODS的数据更新更为及时,可接近实时。
- 短期存储:ODS中的数据存储周期通常较短,一般只保存最近一段时间的数据(如7天、30天)。
ODS的作用
缓冲层作用
- ODS是数据进入数据仓库的缓冲区,避免直接从业务系统中抽取数据对业务系统造成压力。
- 将业务系统与数据仓库解耦,降低系统之间的耦合性。
数据整合和清洗
- 对原始数据进行轻度的清洗(如去重、格式转换、简单校验等)。
- 为不同来源的数据提供统一的格式和标准,便于后续分析和处理。
支持实时查询
- 提供较实时的数据查询服务,支持业务运营和分析需求。
- 适合处理短期内的数据报表、运营监控等需求。
数据追溯
保留业务数据的原始状态,方便数据问题的追溯和修复。
ODS层建表
所有的表都是分区表,字段之间的分隔符号都是",",为表的数据文件指定的位置。
ods_trade_orders
use ods;
DROP TABLE IF EXISTS `ods.ods_trade_orders`;
CREATE EXTERNAL TABLE `ods.ods_trade_orders`(
`orderid` int,
`orderno` string,
`userid` bigint,
`status` tinyint,
`productmoney` decimal(10, 0),
`totalmoney` decimal(10, 0),
`paymethod` tinyint,
`ispay` tinyint,
`areaid` int,
`tradesrc` tinyint,
`tradetype` int,
`isrefund` tinyint,
`dataflag` tinyint,
`createtime` string,
`paytime` string,
`modifiedtime` string)
COMMENT '订单表'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by ','
location '/user/data/trade.db/orders/';
加载结果如下所示:
ods_trade_order_product
use ods;
DROP TABLE IF EXISTS `ods.ods_trade_order_product`;
CREATE EXTERNAL TABLE `ods.ods_trade_order_product`(
`id` string,
`orderid` decimal(10,2),
`productid` string,
`productnum` string,
`productprice` string,
`money` string,
`extra` string,
`createtime` string)
COMMENT '订单明细表'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by ','
location '/user/data/trade.db/order_product/';
执行结果如下图所示:
ods_trade_product_info
use ods;
DROP TABLE IF EXISTS `ods.ods_trade_product_info`;
CREATE EXTERNAL TABLE `ods.ods_trade_product_info`(
`productid` bigint,
`productname` string,
`shopid` string,
`price` decimal(10,0),
`issale` tinyint,
`status` tinyint,
`categoryid` string,
`createtime` string,
`modifytime` string)
COMMENT '产品信息表'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by ','
location '/user/data/trade.db/product_info/';
对应的结果如下图所示:
ods_trade_product_category
use ods;
DROP TABLE IF EXISTS `ods.ods_trade_product_category`;
CREATE EXTERNAL TABLE `ods.ods_trade_product_category`(
`catid` int,
`parentid` int,
`catname` string,
`isshow` tinyint,
`sortnum` int,
`isdel` tinyint,
`createtime` string,
`level` tinyint)
COMMENT '产品分类表'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by ','
location '/user/data/trade.db/product_category';
对应的结果如下图所示:
ods_trade_shop
use ods;
DROP TABLE IF EXISTS `ods.ods_trade_shops`;
CREATE EXTERNAL TABLE `ods.ods_trade_shops`(
`shopid` int,
`userid` int,
`areaid` int,
`shopname` string,
`shoplevel` tinyint,
`status` tinyint,
`createtime` string,
`modifytime` string)
COMMENT '商家店铺表'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by ','
location '/user/data/trade.db/shops';
执行结果如下图所示:
ods_trade_shop_admin_org
use ods;
DROP TABLE IF EXISTS `ods.ods_trade_shop_admin_org`;
CREATE EXTERNAL TABLE `ods.ods_trade_shop_admin_org`(
`id` int,
`parentid` int,
`orgname` string,
`orglevel` tinyint,
`isdelete` tinyint,
`createtime` string,
`updatetime` string,
`isshow` tinyint,
`orgType` tinyint)
COMMENT '商家地域组织表'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by ','
location '/user/data/trade.db/shop_org/';
执行结果如下图所示:
ods_trade_payments
use ods;
DROP TABLE IF EXISTS `ods.ods_trade_payments`;
CREATE EXTERNAL TABLE `ods.ods_trade_payments`(
`id` string,
`paymethod` string,
`payname` string,
`description` string,
`payorder` int,
`online` tinyint)
COMMENT '支付方式表'
PARTITIONED BY (`dt` string)
row format delimited fields terminated by ','
location '/user/data/trade.db/payments/';
执行结果如下图所示:
ODS层数据加载
编写脚本
DataX仅仅是将数据从MySQL导入到了HDFS,数据并没有与Hive表建立起关联。现在我们要编写脚本,任务就是:数据迁移、数据加载到ODS层
编写脚本:
vim /opt/wzk/hive/ods_load_trade.sh
写入的内容如下所示:
#!/bin/bash
source /etc/profile
if [ -n "$1" ] ;then
do_date=$1
else
do_date=`date -d "-1 day" +%F`
fi
# 创建目录
hdfs dfs -mkdir -p /user/data/trade.db/product_category/dt=$do_date
hdfs dfs -mkdir -p /user/data/trade.db/shops/dt=$do_date
hdfs dfs -mkdir -p /user/data/trade.db/shop_org/dt=$do_date
hdfs dfs -mkdir -p /user/data/trade.db/payments/dt=$do_date
hdfs dfs -mkdir -p /user/data/trade.db/orders/dt=$do_date
hdfs dfs -mkdir -p /user/data/trade.db/order_product/dt=$do_date
hdfs dfs -mkdir -p /user/data/trade.db/product_info/dt=$do_date
# 数据迁移
python $DATAX_HOME/bin/datax.py -p "-Ddo_date=$do_date" /opt/wzk/datax/product_category.json
python $DATAX_HOME/bin/datax.py -p "-Ddo_date=$do_date" /opt/wzk/datax/shops.json
python $DATAX_HOME/bin/datax.py -p "-Ddo_date=$do_date" /opt/wzk/datax/shop_org.json
python $DATAX_HOME/bin/datax.py -p "-Ddo_date=$do_date" /opt/wzk/datax/payments.json
python $DATAX_HOME/bin/datax.py -p "-Ddo_date=$do_date" /opt/wzk/datax/orders.json
python $DATAX_HOME/bin/datax.py -p "-Ddo_date=$do_date" /opt/wzk/datax/order_product.json
python $DATAX_HOME/bin/datax.py -p "-Ddo_date=$do_date" /opt/wzk/datax/product_info.json
# 加载 ODS 层数据
sql="
alter table ods.ods_trade_orders add partition(dt='$do_date');
alter table ods.ods_trade_order_product add
partition(dt='$do_date');
alter table ods.ods_trade_product_info add
partition(dt='$do_date');
alter table ods.ods_trade_product_category add
partition(dt='$do_date');
alter table ods.ods_trade_shops add partition(dt='$do_date');
alter table ods.ods_trade_shop_admin_org add
partition(dt='$do_date');
alter table ods.ods_trade_payments add
partition(dt='$do_date');
"
hive -e "$sql"
编写的结果如下所示:
运行测试
sh /opt/wzk/hive/ods_load_trade.sh 2020-07-12
执行结果如下所示: