介 绍
Shell语言在Linux上有自己独到的优势,特别是在Job调度层面,为了调度项目的规范,往往我们都喜欢把配置写在数据库内或者xml文件上,这里就举列实战下Shell语言和MySQL数据库的交互,以实现经典的时间历史拉链表(数据仓库也把这种逻辑叫作缓慢渐变维)为例。
项目背景
很多情况下,新的数据不是固定时间如T+1生成的,可能是一周的某一天,或者是一个月的某一天,但是时间不固定,这个时候数据多了,就是工程师自己也无法知道某一时刻该用那一份数据,这个时候历史拉链表就是一种解决办法;
如图1,鬼谷资产公司,该公司有很多资产,但是有一些可能没有*备案,有一些坏了,有一些特殊原因不让卖,于是不定期销售部会产生一个可售资源的数据文件,把符合销售规范的数据标出来,销售人员根据这份数据文件内的数据对外销售,如20200102除了数据文件1,则没有新的数据文件出来前,销售一直按这份资产数据对外销售,直到20200115日后,新的数据文件2出来了,则改用数据文件2,以此类推,那么回顾历史数据分析,任意哪天event_day,该用那份数据呢?请看时间历史拉链表的实现。
mysql的DDL语句如下,其中create_date为数据文件生成日期,end_date为数据文件结束日期,my_partition是文件的存储分区,event_datetime为生成这条历史拉链记录的时间,这样你只要任意一天mydate,你的逻辑mydate>=create_date and mydate<end_date,取出来的my_partition就是你任意一天数据文件存储的分区,通过时间历史拉链表起到一个书本的目录作用,去把任意一天需要的分区数据取出来,问题得到解决。
CREATE TABLE `cfg_sale_his` (
`create_date` varchar(8) COLLATE utf8mb4_unicode_ci NOT NULL,
`end_date` varchar(8) COLLATE utf8mb4_unicode_ci NOT NULL,
`my_partition` varchar(8) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
`event_datetime` datetime DEFAULT NULL
) ENGINE=InnoDB AUTO_INCREMENT=67 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci |
cfg_sale_his的数据如下,最后一条记录的最近的一个数据文件,因为不知道什么时候结束,所以end_date=‘99991231’,当新的数据文件来了后,将原来end_date='99991231’的记录修改为end_date=今天,并且新增一条记录: create_date=今天,end_date=‘99991231’,my_partition=今天,event_datetime=now()即可。
+-------------+----------+-- --------+---------------------+
| create_date | end_date | my_partition | event_datetime |
+-------------+----------+-----------+---------------------+
| 20200102 | 20200115 | 20200102 | 2020-01-15 18:33:15 |
| 20200115 | 20200120 | 20200115 | 2020-01-20 21:36:47 |
| 20200120 | 20200220 | 20200120 | 2020-02-20 22:00:57 |
| 20200220 | 99991231 | 20200220 | 2020-02-20 22:15:46 |
+-------------+----------+-----------+---------------------+
Shell交互实现代码
【题外篇】前段时间读了篇博客文章,扫了个知识盲区,可以分享下,shell的 #! 符号,专业名称叫Shebang或者Shabang符号,Shebang都没有正式的中文名称。Linux中国翻译组的GOLinux将其翻译为“释伴”,即“解释伴随行”的简称,同时又是Shebang的音译。必须顶行写,在直接调用脚本时,系统的程序载入器会分析 Shebang 后的内容,将这些内容作为解释器指令,并调用该指令,将载有 Shebang 的文件路径作为该解释器的参数来执行脚本,如我们熟悉的Shell脚本开头顶行的#!/bin/sh,在执行时会实际调用 /bin/sh 程序(通常是 Bourne shell 或兼容的 shell,例如 bash、dash 等)来执行,同时Shebang 的内容会被这些脚本解释器自动忽略。
Shell实现cfg_sale_his表的逻辑具体如下,把这个脚本命名为cfs_sale_his.sh吧:
#! /bin/bash
#User:Liuxw
#cfs_sale_his.sh
#设置三个参数,如果参数不是3个直接退出
if [[ $# -ne 3 ]];then
echo "parameters are incorrect"
exit 5
fi
#参数1 mysql密码 参数2 重试连接mysql次数,有可能某次连接网络不好中断了 参数3传入 enddate的时间
PASSWORD=$1
RETRY_TIMES=$2
EVENT_DAY=$3
echo "PASSWORD: $PASSWORD"
echo "RETRY_TIMES: $RETRY_TIMES"
echo "EVENT_DAY: $EVENT_DAY"
#表有4列值,但是最后一列的event_time日期和时分秒的中间用了空格分隔,在linux的awk语法也会分列,所以是5列
SELECT_RET_FIELDS=5
#设置最新一条记录的值为99991231
ENDLESS_DATE="99991231"
#设置event_time的值
CURRENT_DATE_WITH_MINUS_TIME=`date "+%Y-%m-%d %H:%M:%S"`
echo "SELECT_RET_FIELDS: $SELECT_RET_FIELDS"
echo "ENDLESS_DATE: $ENDLESS_DATE"
echo "CURRENT_DATE_WITH_MINUS_TIME: $CURRENT_DATE_WITH_MINUS_TIME"
#配置你的mysql连接ip 10.198.101.30 用户 my_user 密码为输入的参数${PASSWORD},端口13300
#shell交互mysql的模板:mysql -h${db_ip} -u${db_user} -p${db_pawd} -P${db_port} -D${db_name} -s -e "${sql}"
MYSQL="mysql -h10.198.101.30 -umy_user -p${PASSWORD} -P13300 -A -N --default-character-set=utf8 dw_config"
#写好你的sql语句
INSERT_SQL="insert into cfg_sale_his (create_date,end_date,my_partition,event_datetime )
values ( '$EVENT_DAY', '$ENDLESS_DATE', '$EVENT_DAY', '$CURRENT_DATE_WITH_MINUS_TIME')"
UPDATE_SQL="update cfg_sale_his set end_date = '$EVENT_DAY' where end_date = '${ENDLESS_DATE}'"
SELECT_SQL="select * from cfg_sale_his where end_date = '${ENDLESS_DATE}' "
echo "insert sql: $INSERT_SQL"
echo "update sql: $UPDATE_SQL"
echo "select sql: $SELECT_SQL"
#配置执行sql的函数
function execute_sql() {
sql="$1"
select="select"
echo "ready to execute sql: $sql"
if [[ $sql =~ $select ]];then #$sql =~ $select的=~是正则匹配,看语句是否包含select
result=`$MYSQL -e "$sql"` #select语句有返回结果
echo "execute result: "
echo "$result"
ret=`echo "$result" | awk '{print NF}'`
#查看select的结果是不是4列,因为event_time前面解释了特殊,所以等于5,不然报错
if [[ $ret -eq $SELECT_RET_FIELDS ]];then
echo "record with end_date = ${ENDLESS_DATE_WITH_MINUS} is : $result"
return 0
fi
else #不带select的是insert,update语句
$MYSQL -e "$sql"
result=$?
echo "execute result: $result"
if [[ $result -ne 0 ]];then
echo "execute insert or update sql faild, sql: $sql"
ret=3
else
echo "execute insert or update sql success, sql: $sql"
ret=0
fi
fi
return $ret
}
#构建重试次数的函数,防止就试一次,这次连接还网络异常了
function retry_execute() {
ret=4
retry_times=$1 #接受传入重试的次数,一般3次就行,
sql=$2 #sql语句
#如果参数不等于2,函数调用失败
if [[ $# -ne 2 ]];then
echo "ERROR, parameters in function [retry_execute] are not correct, parameter number is $#"
exit 3
fi
echo "ready to execute $retry_times times, sql: $sql"
#利用for循环实现重试n次
for i in `seq 1 $retry_times`;do
echo "execute num: $i"
execute_sql "$sql" #调用execute_sql函数
ret=$?
if [[ $ret -ne 0 ]];then
#如果交互 mysql失败,则睡一会,再重试。
echo "WARN: execute sql failed, sql: $sql, will sleep 10s and retry"
sleep 10s #失败后多久重试的时间,这里是10s
else #如果成功了直接break跳出循环
echo "execute sql success, sql: $sql"
ret=0
break
fi
done
return $ret
}
#调用retry_execute函数
retry_execute $RETRY_TIMES "${SELECT_SQL}"
SELECT_RET=$? #记录这次调用的状态并赋值给SELECT_RET
#调用retry_execute函数,实现修改原来的end_date=99991231的记录
retry_execute $RETRY_TIMES "${UPDATE_SQL}"
UPDATE_RET=$? #记录这次调用的状态并赋值给UPDATE_RET
#判断更新语句是否成功
if [[ $UPDATE_RET -ne 0 ]];then
echo "ERROR: update mysql failed $RETRY_TIMES times, will exit"
exit 9
fi
#调用retry_execute函数,实现insert新记录记录
retry_execute $RETRY_TIMES "$INSERT_SQL"
INSERT_RET=$?
#判断insert语句是否成功
if [[ $INSERT_RET -ne 0 ]]; then
echo "ERROR: insert mysql failed with $RETRY_TIMES times, will exit"
exit 8
fi
echo "job complete"
最终的调用脚本:
#123456位你mysql的密码
#3为你重试次数
#20200301为新的数据文件来的日期
./cfs_sale_his.sh 123456 3 20200301
卖脐橙的鬼谷大师兄
发布了20 篇原创文章 · 获赞 3 · 访问量 2133
私信
关注