大数据之Python实现每日钉钉数据自动推送
最近有一个需求,领导们需要关注每日的一些指标。简单研究了一下,发现用python+impala+钉钉可以实现,最终敲定了两个方式
- python+钉钉
- python+邮件推送
下面简单的记录一下实现第一种方案的:
一,钉钉群内添加推送的小机器人
在这里可以随意选择一种安全设置:我尝试了自定义的方式
最后会生成一个Webhook,这个后面我们代码里面会用到
二,代码实现
from datetime import datetime
import json
import urllib.request
import pymysql as pms
import requests
import pandas as pd
from sqlalchemy import create_engine
import datetime
from pyhive import hive
from impala.dbapi import connect as impala_connect
from impala.util import as_pandas
# 连接impala数据库
def impala_db(sql):
conn = impala_connect(host='188.88.88.88', port=88888)
cursor = conn.cursor()
cursor.execute(sql)
results = as_pandas(cursor)
cursor.close()
conn.close()
return results
# 昨日
today = datetime.date.today()
last_day = today + datetime.timedelta(days=-1)
last_day = datetime.datetime.strftime(last_day,"%Y%m%d")#因为是每天推送,所以设置的是根据系统时间变化的参数 格式如:20211116(查询的表里面的格式是这样的)
#where 条件传入的参数是根据系统时间变化的
df_sql1 ='''
SELECT
sum(consumption) as sum
FROM cl_cdm.dws_flash_sdk_acc_profit_di
where ptt_day = \'''' + last_day + '''\' '''
df1 = impala_db(sql = df_sql1)
#生成的df1(dataframe)输出到控制台应该是如下结构
#print(df1)
# sum
# 0 9897
#最终输出到钉钉上的指标是这个了,我们只需要在对这个数据拼接一些字符串就可以钉钉输出了
#str(df1['sum'].loc[0] 这样就拿到了sum列名第0行的数据
def get_ddmodel_datas(type):
#返回钉钉模型数据,1:文本;
if type == 1:
my_data = {
"msgtype": "text",
"title":"昨日数据推送如下",
"text": {
"content":"昨日数据统计如下测试:" +"\n"
+"总消耗收入: "+ str(df1['sum'].loc[0])+"元\n"
+"如有疑问请及时沟通,感谢您的支持!\n"
},
"at": {
"atMobiles": [
"17318598888"
],
"isAtAll":True #这个参数为true好像是@所有人的意思
}
}
return my_data
if __name__ == "__main__":
my_data = get_ddmodel_datas(1)
my_data["text"]["title"] = " 测试:"
header = {
"Content-Type": "application/json",
"Charset": "UTF-8"
}
my_url ="https://oapi.dingtalk(这个输入的是上面添加机器人时生成的Webhook)"
requests.post(url=my_url, data=json.dumps(my_data), headers=header)
三,运行测试
最终也完成了这个需求,简单记录一下方法,后面的一些其他简单的指标推送也会省力很多。
如果是一些指标是一些比较复杂的表格的话,建议使用python+邮件推送(查询结果封装成一个csv文件),这个我也实现了,后面记录一下实现的方法。
四,调度实现每日推送
因为这个py文件是放在公司集群上的,所以我在此直接使用了ubantu自带的调度系统crontab,
crontab -e 进入定时任务界面
敲入:
0 6 * * * /home/user/me/test2.py > /home/user/app/tmp/chuw/pl/test.txt
:wq 保存退出
这样就实现了每天早晨6:00系统会帮我们运行/home/user/me/test2.py 这个py文件
这个需求也就完美结束了。。。
下面附带一点简单的crontab 调度案例,大家可以简单作为参考:
# 注意单纯echo,从屏幕上看不到任何输出,因为cron把任何输出都email到root的信箱了。
# 每天早上6点,输出Good morning到/tmp/test.txt文件中。
0 6 * * * echo "Good morning" >> /tmp/test.txt
# 每两个小时,输出Good morning到/tmp/test.txt文件中。
0 */2 * * * echo "Good morning" >> /tmp/test.txt
# 晚上11点到早上8点之间每两个小时和早上八点,输出Good morning到/tmp/test.txt文件中。
0 23-7/2,8 * * * echo "Good morning" >> /tmp/test.txt
# 每个月的4号和每个礼拜的礼拜一到礼拜三的早上11点,输出Good morning到/tmp/test.txt文件中。
0 11 4 * 1-3 echo "Good morning" >> /tmp/test.txt
# 每天的下午4点、5点、6点的5 min、15 min、25 min、35 min、45 min、55 min时,输出Good morning到/tmp/test.txt文件中。
5,15,25,35,45,55 16,17,18 * * * echo "Good morning" >> /tmp/test.txt
# 1月1日早上4点,,输出Good morning到/tmp/test.txt文件中,如果出现错误,或者有数据输出,数据作为邮件发给root
0 4 1 1 * echo "Good morning" >> /tmp/test.txt SHELL=/bin/bash PATH=/sbin:/bin:/usr/sbin:/usr/bin MAILTO=root
# 如果去掉"run-parts"这个参数,后面要运行的脚本名,而不是文件夹名。
# 每小时以root用户执行/etc/cron.hourly内的脚本(1个或N个脚本程序)
01 * * * * root run-parts /etc/cron.hourly
# 每天早上4点02分以root用户执行/etc/cron.daily内的脚本(1个或N个脚本程序)
02 4 * * * root run-parts /etc/cron.daily
# 每周日早上4点02分以root用户执行/etc/cron.weekly内的脚本(1个或N个脚本程序)
02 4 * * 0 root run-parts /etc/cron.weekly
# 每月1号早上4点02分以root用户去执行/etc/cron.monthly内的脚本(1个或N个脚本程序)
02 4 1 * * root run-parts /etc/cron.monthly
# 每周一,三,五的下午3:00系统进入维护状态,重新启动系统。
00 15 * * 1,3,5 shutdown -r +5
# 每天早晨3点20分执行用户目录下如下所示的两个指令(每个指令以;分隔):
20 3 * * * (/bin/rm -f /tmp/test.txt;/bin/rm -rf /home/ec2-user/1.txt)
# 每年的一月和四月,4号到9号的3点12分和3点55分执行/bin/rm -f mm.txt 这个文件(mm.txt文件位于用户自己的目录位置)。
12,55 3 4-9 1,4 * /bin/rm -f mm.txt
五:心得
做为21年刚毕业的大学生,参加工作之后第一次接到这个需求差点给我吓到,因为我再次之前从来也没接触过python,没想到还真给我解决了。并且除了这种方式,还掌握了python进行邮件推送到指定邮箱的方式,也算是一种进步。大数据之路果然任重道远,还要继续加油!
希望以后有机会还是要多总结多记录。。。